Revising the Reactive Sensor Generator
When I first created the Reactive sensor sample, I wasn't completely happy with it: if any of the subscribers was disposed or triggered an OnCompleted (as the Any clause does), it pushed a completed state to all of the other “listeners”. This is not what I intended. To make it easy, let’s review how we created the sensor originally:
Private _observers As New List(Of IObserver(Of SensorInfo))
Private _running As Boolean
Public Function Subscribe(ByVal observer As System.IObserver(Of SensorInfo)) _
    As System.IDisposable _
    Implements System.IObservable(Of SensorInfo).Subscribe
    _observers.Add(observer)
    ' Hands back the sensor itself rather than a per-subscriber handle.
    Return Me
End Function
Public Sub StartSensor()
    If Not _running Then
        _running = True
        Dim randomizer = New Random(Date.Now.Millisecond)
        ' Blocks the calling thread until _running is set to False.
        While _running
            Dim randVal = randomizer.NextDouble
            If _observers.Any Then
                Dim info As New SensorInfo With {.SensorType = CInt(randVal * 4).ToString,
                                                 .SensorValue = randVal * 20,
                                                 .TimeStamp = Now}
                ' Push the same reading to every registered observer.
                _observers.ForEach(Sub(o) o.OnNext(info))
            End If
            Threading.Thread.Sleep(CInt(randomizer.NextDouble * 500))
        End While
    End If
End Sub
For a quick review, we set up an internal list of observers and manually added new subscribers to this list. However, we didn’t have a good way of removing them from the list through the typical dispose implementation. Then, as we loop in the While loop, we generate a new sensor reading and announce it to each of the listeners through _observers.ForEach(Sub(o) o.OnNext(info)). Because this loop ran constantly (originally on the calling thread), we had to take the extra effort to run it on a background worker.
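For comparison, the missing piece in the hand-rolled version (a handle that removes just one observer) could be sketched roughly as follows. This is not the original sample: ManualSensor, Publish and the SensorInfo stand-in are hypothetical names, and the sketch assumes an Rx release where Disposable.Create lives in System.Reactive.Disposables.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Reactive.Disposables

' Stand-in for the post's SensorInfo type (shape assumed from the initializers above).
Public Class SensorInfo
    Public Property SensorType As String
    Public Property SensorValue As Double
    Public Property TimeStamp As DateTime
End Class

' Hypothetical hand-rolled observable with per-subscriber unsubscribe.
Public Class ManualSensor
    Implements IObservable(Of SensorInfo)

    Private ReadOnly _observers As New List(Of IObserver(Of SensorInfo))
    Private ReadOnly _gate As New Object

    Public Function Subscribe(ByVal observer As IObserver(Of SensorInfo)) As IDisposable _
        Implements IObservable(Of SensorInfo).Subscribe
        SyncLock _gate
            _observers.Add(observer)
        End SyncLock
        ' Each caller gets its own handle; disposing it removes only that observer.
        Return Disposable.Create(Sub()
                                     SyncLock _gate
                                         _observers.Remove(observer)
                                     End SyncLock
                                 End Sub)
    End Function

    ' Pushes one reading to a snapshot of the current observers, so an observer
    ' that unsubscribes during the loop does not invalidate the iteration.
    Public Sub Publish(ByVal info As SensorInfo)
        Dim snapshot As IObserver(Of SensorInfo)()
        SyncLock _gate
            snapshot = _observers.ToArray()
        End SyncLock
        For Each o In snapshot
            o.OnNext(info)
        Next
    End Sub
End Class
```

Even with this fix we would still own the list, the locking and the background loop ourselves, which is the bookkeeping the rest of this post hands over to Rx.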
With more reflection and further work with Reactive, we can simplify the process quite a bit by using Observable.Generate (Observable.GenerateWithTime in earlier builds) to create an observable that we expose to our subscribers. Each subscriber is then responsible for subscribing to our centralized observable and for disposing its own subscription. Our revised implementation, which generates random sensor readings at random time intervals, is as follows:
Public Class Sensor
    Private _sensorObservable As IObservable(Of SensorInfo)
    Public Sub New()
        ' The state is never used: it stays at 0 and the condition is always True,
        ' so the sequence keeps producing readings until the subscriber disposes.
        _sensorObservable =
            Observable.Generate(Of Int16, SensorInfo)(initialState:=0,
                condition:=Function(x) True,
                iterate:=Function(inVal) inVal,
                resultSelector:=Function(x)
                                    Dim randValue = New Random(Date.Now.Millisecond).NextDouble
                                    Return New SensorInfo With {.SensorType = CInt(randValue * 4).ToString,
                                                                .SensorValue = randValue * 20,
                                                                .TimeStamp = Now}
                                End Function,
                timeSelector:=Function(x) TimeSpan.FromMilliseconds(New Random(Date.Now.Millisecond).NextDouble * 100)
            )
    End Sub
    Public ReadOnly Property SensorObservable As IObservable(Of SensorInfo)
        Get
            Return _sensorObservable
        End Get
    End Property
End Class
In the new implementation we use the Observable.Generate that we discussed when mocking the phone accelerometer. We do still need to think about threading, because the timeSelector overload schedules its work on a background thread automatically. As a result, our sensor readings are generated, and handed to subscribers, on a background thread.
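If a subscriber needs to touch UI elements, one way to deal with this is to marshal the notifications back with ObserveOn. The helper below is only a sketch under assumptions: SubscribeOnUi and SensorUiHelpers are hypothetical names, and it assumes the call is made on the UI thread of a WPF or Windows Forms application, where SynchronizationContext.Current is set.

```vb
Imports System
Imports System.Reactive.Linq
Imports System.Threading

' Hypothetical helper, not part of the original sample.
Public Module SensorUiHelpers
    ' Call this from the UI thread: it captures that thread's SynchronizationContext
    ' and uses ObserveOn so the handler runs there instead of on the background
    ' thread that produces the values.
    Public Function SubscribeOnUi(Of T)(ByVal source As IObservable(Of T),
                                        ByVal handler As Action(Of T)) As IDisposable
        Dim uiContext = SynchronizationContext.Current
        If uiContext Is Nothing Then
            Throw New InvalidOperationException(
                "No SynchronizationContext on this thread; call from the UI thread.")
        End If
        Return source.ObserveOn(uiContext).Subscribe(handler)
    End Function

    ' Possible usage from a window's code-behind (ReadingsList is a hypothetical ListBox):
    '   Dim subscription = SubscribeOnUi(sensor.SensorObservable,
    '                                    Sub(info) ReadingsList.Items.Add(info.SensorValue))
End Module
```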
To subscribe, we first create a shared instance of the Sensor class, and then our Rx queries subscribe to the sensor’s SensorObservable property:
Dim AnySensor = sensor.SensorObservable.Any(
    Function(s) s.SensorValue > 17)
AnySensor.Subscribe(Sub(s) MessageBox.Show(s.ToString, "OutOfRange"))
If you want to remove a subscription, keep a handle on the disposable that was returned when you subscribed, and dispose it to stop listening:
Private ActiveLowValueSensors As IDisposable
Private Sub FilterLowValue_Click() Handles FilterLowValue.Click
    If ActiveLowValueSensors Is Nothing Then
        ' First click: start listening for low readings and keep the handle.
        Dim lowValueSensors = From s In sensor.SensorObservable
                              Where s.SensorValue < 3
                              Select s.SensorValue
        ActiveLowValueSensors = lowValueSensors.Subscribe(
            Sub(val) Console.WriteLine(val))
    Else
        ' Second click: dispose only this subscription; other listeners keep running.
        ActiveLowValueSensors.Dispose()
        ActiveLowValueSensors = Nothing
    End If
End Sub
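To see that disposing one subscription leaves the others untouched, a small console experiment along these lines may help. It is a sketch rather than part of the WPF sample: the module name and the counter sequence are made up for illustration, and it emits an incrementing integer instead of SensorInfo so it can stand alone. Note that each Subscribe call starts its own Generate loop, so every subscriber receives its own sequence of values.

```vb
Imports System
Imports System.Reactive.Linq
Imports System.Threading

' Hypothetical console harness.
Module IndependentSubscriptionsDemo
    Sub Main()
        ' Same shape as the sensor's Generate call, but emitting a counter every 50 ms.
        Dim ticks = Observable.Generate(Of Integer, Integer)(
            initialState:=0,
            condition:=Function(i) True,
            iterate:=Function(i) i + 1,
            resultSelector:=Function(i) i,
            timeSelector:=Function(i) TimeSpan.FromMilliseconds(50))

        Dim firstCount = 0
        Dim secondCount = 0
        Dim first = ticks.Subscribe(Sub(i) Interlocked.Increment(firstCount))
        Dim second = ticks.Subscribe(Sub(i) Interlocked.Increment(secondCount))

        Thread.Sleep(500)
        first.Dispose()                 ' stop only the first listener
        Dim firstAtDispose = firstCount
        Dim secondAtDispose = secondCount

        Thread.Sleep(500)
        second.Dispose()

        ' The first counter should stop growing after its Dispose,
        ' while the second keeps counting until it is disposed too.
        Console.WriteLine("first:  {0} at dispose, {1} at end", firstAtDispose, firstCount)
        Console.WriteLine("second: {0} at dispose, {1} at end", secondAtDispose, secondCount)
    End Sub
End Module
```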
I've updated the WPF samples with this change so you can take it out for a spin if you would like. As always, let me know what you Thinq.