Reactive Framework Subscribing to Observables by ThinqLinq

Reactive Framework Subscribing to Observables

It’s been a while since I started the Reactive Framework series. In case you missed the earlier posts, here’s the links to what we’ve done so far:

At this point, we’ve created our observer and set up the the logic that handles our OnNext values. What we haven’t done yet is wired our LINQ based processing pipeline to the event source. To do this, we need to Subscribe the handler to the Observables. By default, we need to create a new class that implements IObserver. To keep this simple, let’s just output the values to the console for now:


Class ConsoleObserver(Of T)
    Implements IObserver(Of T)


    Public Sub OnCompleted() Implements System.IObserver(Of T).OnCompleted

    End Sub

    Public Sub OnError(ByVal [error] As System.Exception) Implements System.IObserver(Of T).OnError

    End Sub

    Public Sub OnNext(ByVal value As T) Implements System.IObserver(Of T).OnNext
        Console.WriteLine(value.ToString)
    End Sub
End Class

The IObserver interface has three methods: the method that is fired when the source is done (OnCompleted), the method that occurs when an exception occurs (OnError) and the method that is used when each new value is received (OnNext). In the case of this simple example, we only implement the OnNext method and output the value to the Console Window.

With this in place, we can tie this all together creating and starting our sensor, filtering and projecting from the values (using LINQ) and displaying the values (through the new ConsoleObserver):


Dim Sensor As New ObservableSensor
Sensor.Start()

Dim lowvalueSensors = From s In Sensor
                      Where s.SensorValue < 3
                      Select s.SensorValue

lowvalueSensors.Subscribe(New ConsoleObserver(Of Double))

Thankfully, consuming our Observable chain doesn’t require creating a new class. Observable.Subscribe offers an additional overload which, instead of taking an IObservable, we can use an Action Lambda. As a result, we can restate our example above as follows:


Dim Sensor As New ObservableSensor
Sensor.Start()

Dim lowvalueSensors = From s In Sensor
                      Where s.SensorValue < 3
                      Select s.SensorValue

lowvalueSensors.Subscribe(Sub(value) Console.WriteLine(value))

While this consuming code is slightly longer, the total net effect is significantly more maintainable code since we don’t need to declare a separate class to just output our results.

We now could have a fully functioning set of examples. Unfortunately or example at this point is extremely unresponsive because we are completely CPU bound constantly running all of the process on the current thread. Up next time, moving our logic to the background thread.

Posted on - Comment
Categories: VB - VB Dev Center - Rx -
comments powered by Disqus