Reactive Framework Building an IObservable Event Generator by ThinqLinq

Reactive Framework Building an IObservable Event Generator

In my last post, I mentioned a number of cases where you may want to use the Reactive Framework. For some upcoming presentations, I wanted to focus on a couple of these scenarios, particularly on how you can use the Reactive Framework (rX) to work with events from device sensors. You can often find these kind of sensors in a number of industries, including Robotics, automated manufacturing systems, Medical monitors, Telecom usage, and Live financial feeds. In order to demonstrate using rX in this environment, I needed to build a module that simulated generating a bunch of sample random events. Below is the module that I created. We’ll use this module in some of the future discussions of Reactive Framework.

We’re going to start with a small class that will contain the state of the individual sensor events. We’ll call this our SensorInfo class. It will hold values for the date and time that the event occurred, an indicator on the sensor’s type and the value that the sensor returns. We'll also override the ToString method to allow us to output the values easily.


Public Class SensorInfo
    Public Property TimeStamp As DateTime
    Public Property SensorType As String
    Public Property SensorValue As Double

    Public Overrides Function ToString() As String
        Return String.Format("Time: {0}  , Type: {1}  Value: {2}", TimeStamp, SensorType, SensorValue)
    End Function
End Class

Now that we have our instance class, we can create a class that will generate these sensor items randomly. (This class is not thread safe, nor is it truly random so don't use it in production applications. It is merely designed for demonstration purposes.)


Public Class ObservableSensor

    Private _running As Boolean

    Public Sub StartSensor()
        If Not _running Then
            _running = True
            Dim randomizer = New Random(Date.Now.Millisecond)
            While _running
                Dim randVal = randomizer.NextDouble
                Dim info As New SensorInfo With {.SensorType = CInt(randVal * 4).ToString(),
                                                 .SensorValue = randVal * 20,
                                                 .TimeStamp = Now}
                End If
                Threading.Thread.Sleep(CInt(randomizer.NextDouble * 500))
            End While
        End If
    End Sub

    Public Sub StopSensor()
        _running = False
    End Sub

End Class

In this class, we maintain an internal variable (_running) which tracks whether the sensor is running or not. We also have a method that Starts the sensor and stops it. While the sensor is running, we essentially generate a number of SensorInfo instances with randomized values and then pause for a random period of time before creating another value. At this point, the values that are returned don’t have much meaning. We could easily change this to return stock quotes, manufacturing defects or other sensor responses by manipulating the values this randomizer generates.

Now that we can generate random SensorInfos, we need to actually do something with them. In the past, we could just raise an event for consumers to handle after we generate each sensor’s value. Since we want to leverage the power of the new IObservable/IObserver interfaces and the Reactive Framework, I’ll make this class implement IObservable(Of T) so that we can register a number of IObserver clients and notify them each time we generate a new sensor.

The IObservable(Of T) interface requires of a single method: Subscribe. This takes a single parameter which is the IObserver client that wants to listen to our sensor data. It returns a class that implements IDisposable (so that we can make sure each of our observers know when we’re done sending them data). Since the return object here is actually the ObservableSensor itself, we need to implement both IObservable and IDisposable. Here's our revised ObservableSensor class.


Public Class ObservableSensor
    Implements IObservable(Of SensorInfo)
    Implements IDisposable

    Private _observers As New List(Of IObserver(Of SensorInfo))
    Private _running As Boolean

    Public Function Subscribe(ByVal observer As System.IObserver(Of SensorInfo)) 
                              As System.IDisposable 
                              Implements System.IObservable(Of SensorInfo).Subscribe
        _observers.Add(observer)
        Return Me
    End Function

    Public Sub StartSensor()
        If Not _running Then
            _running = True
            Dim randomizer = New Random(Date.Now.Millisecond)
            While _running
                Dim randVal = randomizer.NextDouble
                If _observers.Any Then
                    Dim info As New SensorInfo With {.SensorType = CInt(randVal * 4).ToString,
                                                     .SensorValue = randVal * 20,
                                                     .TimeStamp = Now}

                    _observers.ForEach(Sub(o) o.OnNext(info))
                End If
                Threading.Thread.Sleep(CInt(randomizer.NextDouble * 500))
            End While
        End If
    End Sub

    Public Sub StopSensor()
        _running = False
    End Sub


#Region "IDisposable Support"
    Private disposedValue As Boolean ' To detect redundant calls

    ' IDisposable
    Protected Overridable Sub Dispose(ByVal disposing As Boolean)
        If Not Me.disposedValue Then
            If disposing Then
                If _observers IsNot Nothing Then
                    _observers.ForEach(Sub(o) o.OnCompleted())
                    _observers.Clear()
                End If
                ' TODO: dispose managed state (managed objects).
            End If

            ' TODO: free unmanaged resources (unmanaged objects) and override Finalize() below.
            ' TODO: set large fields to null.
        End If
        Me.disposedValue = True
    End Sub

    ' This code added by Visual Basic to correctly implement the disposable pattern.
    Public Sub Dispose() Implements IDisposable.Dispose
        ' Do not change this code.  Put cleanup code in Dispose(ByVal disposing As Boolean) above.
        Dispose(True)
        GC.SuppressFinalize(Me)
    End Sub
#End Region

End Class

In this new version, we now have a new _observers object that maintains a list of the observers (clients). This allows us to notify multiple sensor handlers and work with them how they deem appropriate. The subscribe method simply takes the supplied observer and sticks it in the collection.

When we start the sensor, we now check to see if there are any observers (using the LINQ .Any method). If we do, we’ll generate the random sensor data. We then notify all of the listeners using the list .ForEach method passing the lambda expression instructing the observer to invoke it’s OnNext handler (part of the IObserver(Of T) implementation. This is the method which corresponds to IEnumerable’s MoveNext. It is this method which will trigger our reactive framework’s event pipeline to begin processing our sensor notifications.

When we’re done, we need to clean up our resouces. In the Disposing event, we make sure that we call the OnCompleted method on each (ForEach) of the observers in our _observers collection. We also clear the observer collection to remove the reference pointers between the client and our sensor generator.

There you have it, a generic random event generator that we can consume with the Reactive Framework (or similar technologies like StreamInsight). Next time, we’ll start to consume these events.

As always, let me know what you Thinq and if there are any modifications I should consider.

Posted on - Comment
Categories: VB - VB Dev Center - Visual Studio - Rx -
comments powered by Disqus