When I started looking at the Reactive Framework, one of the first things I did was to try creating some of the same standard LINQ queries that I’ve used against LINQ to Objects:
Dim unsorted = From s In Sensor Where s.SensorType = "2" Order By s.SensorValue Select s
If you try this where Sensor is an IObservable rather than IEnumerable, you will find that the Order By clause generates the following compiler error in VB: Definition of Method OrderBy is not accessible in this context. C# generates a similar but different error: “Could not find an implementation of the query pattern for source type IObservable<T>. OrderBy not found.” Essentially, the compiler is telling you that there isn’t an extension method called OrderBy that extends IObservable. Did the reactive team make a mistake and forget to implement sorting? Far from it.
Let’s consider the uses of the standard query operators over a data source where you don’t necessarily know when the source ends. “From” doesn’t really exist, it’s just a query holder for identifying the local variable name (s) used later in the query and the source of the data (Sensor).
With “Where”, we are filtering the results. We can filter results over an ongoing stream without needing to know when the stream will end. As a result, filtering isn’t much of an issue.
Similarly, “Select” simply takes the input type and transforms it into another type. This is commonly referred to as a Projection. Since projections work equally well over data streams, we are fine implementing that in Reactive.
Sorting on the other hand is a bit more problematic. Consider the case where we process the following values: 1, 2, 4, 3, 5. It’s not difficult to sort these values and return them. However, what would happen to our sort if the next value that was sent was 0? We would need to reevaluate the entire result set and inject our new value before the first value that came in. In dealing with continuous event streams, we have no way of knowing whether the next value we are going to receive will need to be inserted prior to other results.
As a result, we need to partition the sets of data we receive if we need to sort these values so that we can be assured of knowing when the last value is received from this set. The Reactive Framework supports a number of partitioning methods, including BufferWithTime, BufferWithCount, and BufferWithTimeOrCount. With these methods, we can partition our streams into pre-determined chunks based on a timespan, and/or item count. The result is a new stream of IObserverable objects that contain an IList of the original data type. In the case of our Sensors, we can partition our result sets as follows:
Dim segmented = Sensor.BufferWithTime(TimeSpan.FromSeconds(3))
This creates a variable of type IObservable(Of IList(Of SensorInfo)). If we wanted, we could then display the sorted values in the partitioned lists as follows:
segmented.Subscribe(Sub(val) FilteredList.ItemsSource = From v In val Order By v.SensorValue)
As you can see, you CAN sort values using the Reactive Framework using partitioning schemes, but it doesn’t make as much sense over data streams as it does with IEnumerable data sources typically encountered with LINQ.