LINQ and Related Topics

Reactive Framework Learning Resources for RxNet and RxJS

As I present more about the Reactive Framework, I often get people asking where they can learn more about it. Here are some resources that I’ve found useful:

Of course, nothing beats learning by doing, so go out and try the bits yourself. Don’t be surprised when you hit a wall; that’s when the real learning starts, as you figure out how to overcome the challenges.

Is there a resource that I’m missing here that’s helped you? I’m happy to add it.

Posted on 8/25/2010 9:04:00 PM - Comments(0)
Categories: Rx

Reactive Framework Sorting it out

When I started looking at the Reactive Framework, one of the first things I did was to try creating some of the same standard LINQ queries that I’ve used against LINQ to Objects:


        Dim unsorted = From s In Sensor
                       Where s.SensorType = "2"
                       Order By s.SensorValue
                       Select s

If you try this where Sensor is an IObservable rather than IEnumerable, you will find that the Order By clause generates the following compiler error in VB: Definition of Method OrderBy is not accessible in this context. C# generates a similar but different error: “Could not find an implementation of the query pattern for source type IObservable<T>. OrderBy not found.” Essentially, the compiler is telling you that there isn’t an extension method called OrderBy that extends IObservable. Did the reactive team make a mistake and forget to implement sorting? Far from it.

Let’s consider how the standard query operators behave over a data source where you don’t necessarily know when the source ends. “From” isn’t really an operator; it’s just a placeholder that identifies the local variable name (s) used later in the query and the source of the data (Sensor).

With “Where”, we are filtering the results. We can filter results over an ongoing stream without needing to know when the stream will end. As a result, filtering isn’t much of an issue.

Similarly, “Select” simply takes the input type and transforms it into another type. This is commonly referred to as a Projection. Since projections work equally well over data streams, we are fine implementing that in Reactive.

Sorting on the other hand is a bit more problematic. Consider the case where we process the following values: 1, 2, 4, 3, 5. It’s not difficult to sort these values and return them. However, what would happen to our sort if the next value that was sent was 0? We would need to reevaluate the entire result set and inject our new value before the first value that came in. In dealing with continuous event streams, we have no way of knowing whether the next value we are going to receive will need to be inserted prior to other results.

As a result, if we need to sort these values, we have to partition the data we receive so that we know when the last value of each set has arrived. The Reactive Framework supports a number of partitioning methods, including BufferWithTime, BufferWithCount, and BufferWithTimeOrCount. With these methods, we can partition our streams into pre-determined chunks based on a timespan and/or an item count. The result is a new IObservable whose items are each an IList of the original data type. In the case of our Sensors, we can partition our result sets as follows:


Dim segmented = Sensor.BufferWithTime(TimeSpan.FromSeconds(3))

This creates a variable of type IObservable(Of IList(Of SensorInfo)). If we wanted, we could then display the sorted values in the partitioned lists as follows:


  segmented.Subscribe(Sub(val) FilteredList.ItemsSource =
                                         From v In val
                                         Order By v.SensorValue)

As you can see, you CAN sort values in the Reactive Framework by partitioning the stream first, but sorting doesn’t make as much sense over data streams as it does with the IEnumerable data sources typically encountered with LINQ.
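To make the effect of partitioning concrete, here is a minimal sketch that uses plain LINQ to Objects rather than Rx. The helper names (ChunkByCount, SortEachChunk) are made up for illustration and only stand in for what a count-based buffer does on a live stream. It assumes Option Infer is on and .NET 4 or later.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Linq

Module BufferedSortSketch
    ' Hypothetical helper: splits a sequence into consecutive chunks of at most
    ' "size" items, standing in for a count-based buffer on a live stream.
    Function ChunkByCount(Of T)(source As IEnumerable(Of T), size As Integer) As List(Of List(Of T))
        Dim chunks As New List(Of List(Of T))
        Dim current As New List(Of T)
        For Each item In source
            current.Add(item)
            If current.Count = size Then
                chunks.Add(current)
                current = New List(Of T)
            End If
        Next
        ' Keep a trailing partial chunk, if any.
        If current.Count > 0 Then chunks.Add(current)
        Return chunks
    End Function

    ' Sorts each chunk on its own, as the subscriber does for each buffer.
    Function SortEachChunk(source As IEnumerable(Of Integer), size As Integer) As List(Of List(Of Integer))
        Return ChunkByCount(source, size).
            Select(Function(chunk) chunk.OrderBy(Function(v) v).ToList()).
            ToList()
    End Function

    Sub Main()
        ' The values from the example above, followed by the late 0.
        Dim readings = {1, 2, 4, 3, 5, 0}
        For Each chunk In SortEachChunk(readings, 5)
            Console.WriteLine(String.Join(", ", chunk))
        Next
    End Sub
End Module
```

With a chunk size of 5, the first five readings come out sorted as one group, and the late 0 lands in a second group of its own. It is never moved ahead of values that were already delivered, which is the trade-off that partitioning accepts.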

Posted on 7/26/2010 10:45:00 PM - Comments(1)
Categories: Rx VB Dev Center

Reactive Framework Subscribing to Events

Previously in my Reactive Framework series, we saw how to create and subscribe to ongoing observable objects. While there are a number of cases where you would want to create your own observable type, often you simply want to compose reactive sequences in response to events raised by other means. Recently, I came across a simple example that shows how easy it is to subscribe to events and add functionality through the Reactive Framework’s extension methods.

In this scenario, I needed to update a list of most recently used files in real time. Whenever a file was added to, modified in, or deleted from a directory, I wanted my UI list to reflect this change. I’ve long known about the FileSystemWatcher class from Windows Forms (it lives in the System.IO namespace). It is able to listen for created, changed, deleted and renamed events in a specified directory and let us know when a file changes. Using Rx, we can create an observable using the following:


   Dim createWatcher As New FileSystemWatcher With {.Path = "C:\Temp", .EnableRaisingEvents = True}
   Dim createdEvent = Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Created")

With Observable.FromEvent, we indicate that we want to watch for events with the specified name (Created) from the supplied instance object (createWatcher). With this observable, we can now perform other operations on the resulting events. We’ll use the “Do” method to perform an action (refreshing the file list). Before we perform this action on the UI, we’ll need to make sure to synchronize back to the UI thread:


    Dim AllEvents = Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Created").
                    ObserveOnDispatcher.
                    Do(Sub(fsArgs) RefreshFileList()).
                    Subscribe

This would be fine if we only wanted to watch for the events raised when a file is first created. However, in a Most Recently Used (MRU) list, we also want to know when a file is changed or deleted. Rather than wiring up separate handlers for each of these events, we can use the Merge method to listen to all of them, regardless of which event they came from:


    Dim AllEvents = Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Created").
                    Merge(Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Changed")).
                    Merge(Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Deleted")).
                    ObserveOnDispatcher.
                    Do(Sub(fsArgs) RefreshFileList()).
                    Subscribe

One of the great things about the Reactive Framework is the ability to inject functionality into the event pipeline easily. For example, if we want to avoid responding to multiple events on the same file, we could inject the DistinctUntilChanged method as follows:


    Dim AllEvents = Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Created").
                    Merge(Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Changed")).
                    Merge(Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Deleted")).
                    DistinctUntilChanged.
                    ObserveOnDispatcher.
                    Do(Sub(fsArgs) RefreshFileList()).
                    Subscribe

Quick and easy, and elegant as well. If you want to try this out, download RX_Wpf in the files section here and run the FileWatcher.xaml file.
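One detail worth keeping in mind is that DistinctUntilChanged only suppresses a value that equals the one immediately before it; it does not remember everything it has seen. The sketch below models that behavior over a finite list of file names, without using Rx. The helper name and the file names are made up for illustration. It also assumes the comparison is made on the file path, which in the real pipeline would likely require projecting a key from the event arguments; that detail is an assumption, not something shown in the post.

```vb
Imports System
Imports System.Collections.Generic

Module DistinctUntilChangedSketch
    ' Hypothetical helper modelling DistinctUntilChanged over a finite list:
    ' an item is dropped only when it equals the item immediately before it.
    Function DropConsecutiveDuplicates(paths As IEnumerable(Of String)) As List(Of String)
        Dim result As New List(Of String)
        Dim hasPrevious As Boolean = False
        Dim previous As String = Nothing
        For Each filePath In paths
            If Not hasPrevious OrElse Not String.Equals(filePath, previous) Then
                result.Add(filePath)
            End If
            previous = filePath
            hasPrevious = True
        Next
        Return result
    End Function

    Sub Main()
        ' Made-up file names standing in for the path carried by each event.
        Dim events = {"a.txt", "a.txt", "b.txt", "a.txt"}
        Console.WriteLine(String.Join(", ", DropConsecutiveDuplicates(events)))
    End Sub
End Module
```

Here the second "a.txt" is dropped because it repeats its neighbor, but the last "a.txt" passes through because "b.txt" came in between. For an MRU list that is usually what you want: a burst of events on one file collapses, while a later touch of the same file still counts.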

Posted on 7/24/2010 10:13:00 PM - Comments(0)
Categories: VB Dev Center Rx

Reactive Framework as a Background Worker

In this introduction to the Reactive Framework series, we’ve spent a bit of time setting up our Observable and Observers and wiring them up. If you haven’t been following along, here are links to the previous posts:

So far, our observers can listen to our sensor, but we can’t see the results because everything is happening on the main thread. Because that thread is continually processing sensor values, the UI can’t refresh to show the updates. To solve this, we need to run our sensor on a secondary thread.

With Reactive Framework, we often talk about “Hot” and “Cold” observables. Hot observables are ones which are running independently of the subscription. Cold observables are ones where the process starts when you subscribe to it. In our case, we’re simulating an ongoing sensor that we are connecting many observers to. In this case, we are dealing with a “Hot” observable. As a result, we’ll explicitly manage the sensor using the BackgroundWorker object in our “Start” button handler:


        ' Run the sensor loop on a background thread so the UI thread stays responsive.
        Dim worker As New BackgroundWorker
        AddHandler worker.DoWork, Sub(s As Object, args As DoWorkEventArgs)
                                      Sensor.StartSensor()
                                  End Sub
        worker.RunWorkerAsync(Sensor)
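The hot versus cold distinction described before this handler can be modelled without Rx. The sketch below is only an illustration: ColdSource and HotSource are hypothetical classes, not Rx types, and they use a plain callback in place of a real observer.

```vb
Imports System
Imports System.Collections.Generic

Module HotColdSketch
    ' Cold (hypothetical model): production starts inside Subscribe,
    ' so every subscriber gets the full sequence from the beginning.
    Class ColdSource
        Public Sub Subscribe(onNext As Action(Of Integer))
            For i = 1 To 3
                onNext.Invoke(i)
            Next
        End Sub
    End Class

    ' Hot (hypothetical model): Publish runs whether or not anyone listens,
    ' so a subscriber only sees values published after it attaches.
    Class HotSource
        Private ReadOnly handlers As New List(Of Action(Of Integer))

        Public Sub Subscribe(onNext As Action(Of Integer))
            handlers.Add(onNext)
        End Sub

        Public Sub Publish(value As Integer)
            For Each handler In handlers
                handler.Invoke(value)
            Next
        End Sub
    End Class

    Function LateColdSubscriber() As List(Of Integer)
        Dim received As New List(Of Integer)
        Dim source As New ColdSource
        source.Subscribe(Sub(v) received.Add(v))
        Return received
    End Function

    Function LateHotSubscriber() As List(Of Integer)
        Dim received As New List(Of Integer)
        Dim source As New HotSource
        source.Publish(1)                          ' nobody is listening yet
        source.Subscribe(Sub(v) received.Add(v))
        source.Publish(2)
        source.Publish(3)
        Return received
    End Function

    Sub Main()
        Console.WriteLine("Cold: " & String.Join(", ", LateColdSubscriber()))
        Console.WriteLine("Hot:  " & String.Join(", ", LateHotSubscriber()))
    End Sub
End Module
```

The cold subscriber receives 1, 2 and 3, while the late hot subscriber misses the 1 that was published before it attached and receives only 2 and 3. The sensor in this series behaves like the hot case, which is why it is started explicitly rather than by subscribing.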

Now, when we run our sample and output our results using Console.WriteLine, we see our results and we can continue to click on other buttons in our application. However, if we try to output the results to our user interface, we see the following exception:

     InvalidOperationException: The calling thread cannot access this object because a different thread owns it.

If you’ve ever worked with background threads in Windows Forms, WPF or Silverlight, you should recognize the problem: you can’t touch UI objects directly from a background thread. Asynchronous operations are one of the key scenarios the Reactive Framework was designed to handle, so the team took great care to make synchronizing these threads easy. Two of the extension methods on IObservable are SubscribeOn and ObserveOn. SubscribeOn is used to indicate where the operations that we are subscribing to will be performed. ObserveOn is used to indicate where we want to process the results.

In our case, we need to move back to the UI thread when we process the results, so we need to synchronize when we observe, and that means ObserveOn. To make matters easier, the Reactive team has included a special variant of ObserveOn that synchronizes on the dispatcher thread: ObserveOnDispatcher. We can alter our subscribing code as follows to make sure we observe our subscription on the UI thread:


        Dim items = New ObservableCollection(Of Double)
        FilteredList.ItemsSource = items

        Dim TypeSensors = From s In Sensor
                          Where s.SensorType = "4"
                          Select s.SensorValue

        TypeSensors.ObserveOnDispatcher.Subscribe(
            Sub(item) items.Add(item))

To see this sensor and various observables in action, download the corresponding WPF project for this series.

Posted on 7/24/2010 3:19:00 PM - Comments(1)
Categories: VB Dev Center Rx VS 2010

LINQ in Action in Chinese

Today, I received an unexpected surprise in the mail: a copy of LINQ in Action translated into Chinese. We were aware that someone was making a Chinese translation, but only expected it to cover a couple of chapters. It turns out they translated the entire book, including the bonus chapter 14 (LINQ to Datasets), which didn't make the printed English version of the book. Hopefully nothing got lost in translation for this version. If you read Chinese, check the book out and let us know what you Thinq.

Posted on 7/10/2010 2:43:00 PM - Comments(2)
Categories: LINQ