Revising the Reactive Sensor Generator by ThinqLinq

Revising the Reactive Sensor Generator

When first created the Reactive sensor sample I wasn't completely happy with it because if any of the various subscribers were disposed or triggered an OnCompleted (like the Any) clause, it would trigger a completed state to all of the other “listeners”. This is not what I intended.  To make it easy, let’s review how we created the sensor originally:


    Private _observers As New List(Of IObserver(Of SensorInfo))
    Private _running As Boolean

    Public Function Subscribe(ByVal observer As System.IObserver(Of SensorInfo)) 
                              As System.IDisposable 
                              Implements System.IObservable(Of SensorInfo).Subscribe
        _observers.Add(observer)
        Return Me
    End Function

    Public Sub StartSensor()
        If Not _running Then
            _running = True
            Dim randomizer = New Random(Date.Now.Millisecond)
            While _running
                Dim randVal = randomizer.NextDouble
                If _observers.Any Then
                    Dim info As New SensorInfo With {.SensorType = CInt(randVal * 4).ToString,
                                                     .SensorValue = randVal * 20,
                                                     .TimeStamp = Now}

                    _observers.ForEach(Sub(o) o.OnNext(info))
                End If
                Threading.Thread.Sleep(CInt(randomizer.NextDouble * 500))
            End While
        End If
    End Sub

For a quick review, we set up an internal list of observers and manually added new subscribers to this list. We didn’t have a good way of removing them from the list however through the typical dispose implementation. Then as we are looping in the while loop, we generate a new sensor reading and announce it to each of the listeners through the _observers.ForEach(0 => o.OnNext(info)). Because this was constantly running (originally on the calling thread), we had to take the extra effort to run it on a background worker.

With more reflection and further work with Reactive, we can simplify the process quite a bit here by using the Observable.Generate (or in earlier builds Observable.GenerateWithTime) to create an observable that we expose to our subscribers. The subscriber is then responsible for subscribing and disposing itself from our centralized observable. Our revised implementation to generate random sensor readings at random time intervals is as follows:


Public Class Sensor

    Private _sensorObservable As IObservable(Of SensorInfo)

    Public Sub New()
        _sensorObservable =
            Observable.Generate(Of Int16, SensorInfo)(initialState:=0,
                condition:=Function(x) True,
                iterate:=Function(inVal) inVal,
                resultSelector:=Function(x)
                                    Dim randValue = New Random(Date.Now.Millisecond).NextDouble
                                    Return New SensorInfo With {.SensorType = CInt(randValue * 4).ToString,
                                                                .SensorValue = randValue * 20,
                                                                .TimeStamp = Now}
                                End Function,
                timeSelector:=Function(x) TimeSpan.FromMilliseconds(New Random(Date.Now.Millisecond).NextDouble * 100)
            )
    End Sub

    Public ReadOnly Property SensorObservable As IObservable(Of SensorInfo)
        Get
            Return _sensorObservable
        End Get
    End Property

End Class

In new implementation we use the Obserable.Generate that we discussed when mocking the phone accelerometer. We do still need to worry about our threads because the timeSelector runs on a background thread automatically. As a result, our observables are being generated on a background thread.

When we subscribe to this, we first create a shared instance of the Sensor class and then Rx queries subscribe to the sensor’s SensorObservable property:


Dim AnySensor = sensor.SensorObservable.Any(
                  Function(s) s.SensorValue > 17)
AnySensor.Subscribe(Sub(s) MessageBox.Show(s.ToString, "OutOfRange"))

If you want to remove a subscription, keep a handle on the disposable that was returned when you subscribed and dispose it to stop listening.


Private ActiveLowValueSensors As IDisposable

Private Sub FilterLowValue_Click() Handles FilterLowValue.Click
    If ActiveLowValueSensors Is Nothing Then
        Dim lowValueSensors = From s In sensor.SensorObservable
                              Where s.SensorValue < 3
                              Select s.SensorValue

        ActiveLowValueSensors = lowValueSensors.Subscribe(
           Function(val) Console.WriteLine(val))
    Else
        ActiveLowValueSensors.Dispose()
        ActiveLowValueSensors = Nothing
    End If

End Sub

I've updated the WPF samples with this change so you can take it out for a spin if you would like. As always, let me know what you Thinq.

Posted on - Comment
Categories: Rx - VB Dev Center -
comments powered by Disqus