Reactive Framework as a Background Worker by ThinqLinq


In this introduction to the Reactive Framework series, we’ve spent a bit of time setting up our Observable and Observers and wiring them up. If you haven’t been following along, the previous posts in this series cover that groundwork.

So far, our observers can listen to our sensor, but we can’t see the results because everything is happening on the main UI thread. Because that thread is continually busy running the sensor, the UI never gets a chance to refresh, and we are locked out of seeing the updates. To solve this, we need to run our sensor on a secondary thread.

With the Reactive Framework, we often talk about “Hot” and “Cold” observables. A hot observable produces values independently of any subscription. A cold observable starts producing values only when you subscribe to it. In our case, we’re simulating an ongoing sensor that many observers connect to, so we are dealing with a “Hot” observable. As a result, we’ll explicitly manage the sensor using the BackgroundWorker object in our “Start” button handler:


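        ' BackgroundWorker lives in the System.ComponentModel namespace.
        Dim worker As New BackgroundWorker
        ' DoWork is raised on a background thread, so the sensor loop no longer blocks the UI.
        AddHandler worker.DoWork, Sub(s As Object, args As DoWorkEventArgs)
                                      Sensor.StartSensor()
                                  End Sub
        ' The argument is exposed as args.Argument; the handler above refers to Sensor directly.
        worker.RunWorkerAsync(Sensor)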
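
To make the hot/cold distinction concrete, here is a minimal console sketch. It is not part of the sample project: it assumes a current System.Reactive package, and the module name and timings are purely illustrative. A cold Interval restarts for each subscriber, while the published (hot) version keeps running once connected, so a late subscriber should miss the early values.

```vb
Imports System
Imports System.Reactive.Linq
Imports System.Threading

Module HotColdSketch
    Sub Main()
        ' Cold: Interval starts a fresh timer for every subscriber,
        ' so each subscriber sees the sequence from 0.
        Dim cold = Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(5)

        ' Hot: Publish shares a single underlying subscription, and Connect
        ' starts it whether or not anyone is listening yet.
        Dim hot = cold.Publish()
        Dim connection = hot.Connect()

        ' Values the hot source emits during this pause are never seen below.
        Thread.Sleep(500)

        Dim coldSubscription = cold.Subscribe(Sub(v) Console.WriteLine("cold: {0}", v))
        Dim hotSubscription = hot.Subscribe(Sub(v) Console.WriteLine("hot:  {0}", v))

        ' Give both sequences time to finish before shutting down.
        Thread.Sleep(1500)
        connection.Dispose()
    End Sub
End Module
```

Our sensor behaves like the hot case: once started, it keeps producing readings regardless of how many observers are attached.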

Now, when we run our sample and output our results using Console.WriteLine, we see our results and we can continue to click on other buttons in our application. However, if we try to output the results to our user interface, we see the following exception:

     InvalidOperationException: The calling thread cannot access this object because a different thread owns it.

If you’ve ever worked with background threads in Windows Forms, WPF or Silverlight, you should recognize that you can’t access UI objects directly from a background thread. One of the key scenarios that the Reactive Framework was designed to address is asynchronous operations. As a result, the Reactive team took great effort to make synchronizing these threads easy. Two of the extension methods on IObservable are SubscribeOn and ObserveOn. SubscribeOn is used to indicate where the subscription itself, and therefore the work it starts, will run. ObserveOn is used to indicate where we want to process the results.
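
A small console sketch can show how the two operators divide the work. This is an illustration rather than code from the sample project: it assumes a current System.Reactive package, and the choice of TaskPoolScheduler and NewThreadScheduler is only for demonstration. The thread IDs printed for “produced” and “observed” should differ from each other and from the main thread.

```vb
Imports System
Imports System.Reactive.Concurrency
Imports System.Reactive.Linq
Imports System.Threading

Module SchedulerSketch
    Sub Main()
        Console.WriteLine("main thread {0}", Thread.CurrentThread.ManagedThreadId)

        ' The Select body runs wherever the source produces its values.
        Dim source = Observable.Range(1, 3).Select(
            Function(i As Integer) As Integer
                Console.WriteLine("produced {0} on thread {1}", i, Thread.CurrentThread.ManagedThreadId)
                Return i
            End Function)

        ' SubscribeOn moves the subscription (and the production it triggers) to the task pool.
        ' ObserveOn moves delivery of each notification to a separate, dedicated thread.
        Dim pipeline = source.SubscribeOn(TaskPoolScheduler.Default).ObserveOn(NewThreadScheduler.Default)

        Dim subscription = pipeline.Subscribe(
            Sub(i) Console.WriteLine("observed {0} on thread {1}", i, Thread.CurrentThread.ManagedThreadId))

        ' Crude wait so the background work can finish before the process exits.
        Thread.Sleep(1000)
        subscription.Dispose()
    End Sub
End Module
```

In a UI application the consumer side is the one that matters: the observer has to run on the thread that owns the controls.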

In our case, we need to move back to the UI thread when we process the results. That means synchronizing our threads when we observe, so we will use the ObserveOn option. To make matters easier, the Reactive team has included a special variant of ObserveOn that synchronizes on the dispatcher thread: ObserveOnDispatcher. We can alter our subscribing code as follows to make sure we observe our subscription on the UI thread:


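        ' Bind the list to a collection that raises change notifications.
        Dim items = New ObservableCollection(Of Double)
        FilteredList.ItemsSource = items

        ' Keep only readings from sensor type "4" and project out the value.
        Dim TypeSensors = From s In Sensor
                          Where s.SensorType = "4"
                          Select s.SensorValue

        ' Move each notification to the dispatcher (UI) thread before touching the bound collection.
        TypeSensors.ObserveOnDispatcher.Subscribe(
            Sub(item) items.Add(item))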

To see this sensor and various observables in action, download the corresponding WPF project for this series.
