LINQ and Related Topics

Cancelling a Reactive Extensions Observable

I’m often asked how to cancel an Observable. In previous posts, I’ve shown how to stop observing by disposing the result of the subscription to the Observable. Of course, this isn’t the only way. If you are using one of the Observable.Generate methods, you can hold a reference to an object that carries a cancellation flag (like the System.Threading.CancellationTokenSource), check that flag in the condition lambda function, and set the flag when you want to stop. Here’s an example:


    Private WithEvents ts As New System.Threading.CancellationTokenSource

    Protected Overrides Sub OnInitialized(e As System.EventArgs)
        MyBase.OnInitialized(e)

        Dim items As New ObservableCollection(Of Integer)
        ItemsListbox.ItemsSource = items

        ' The condition lambda is re-evaluated before each value, so the
        ' sequence ends once ts.Cancel() has been called.
        Dim obs = Observable.Generate(0,
                                      Function(x) Not ts.IsCancellationRequested,
                                      Function(index) index + 1,
                                      Function(index) index,
                                      Function(index) TimeSpan.FromSeconds(1))

        obs.ObserveOnDispatcher().
            Subscribe(Sub(item)
                          items.Add(item)
                      End Sub)
    End Sub

    Private Sub Cancel_Click(sender As System.Object, e As System.Windows.RoutedEventArgs) Handles Cancel.Click
        ts.Cancel()
    End Sub

In this example, we’re setting up a class-level variable (ts) as a CancellationTokenSource. When we create our observable using Generate, the second parameter evaluates whether or not it should continue iterating. Because that lambda checks ts.IsCancellationRequested, the flag is evaluated each time we iterate. Because ts is a class-level variable, we can cancel the sequence by calling Cancel() in the Cancel button’s click event handler.
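
To see the flag-checking idea in isolation, here is a minimal sketch in plain JavaScript rather than Rx. The createToken and generate helpers are hypothetical stand-ins written for this illustration (they are not part of Rx), and the loop runs synchronously instead of once per second:

```javascript
// Hypothetical stand-in for a cancellation token: just a mutable flag.
function createToken() {
  return {
    cancelled: false,
    cancel() { this.cancelled = true; }
  };
}

// Minimal synchronous analogue of Generate: a for loop whose pieces are passed in.
function generate(initial, condition, iterate, resultSelector, onNext) {
  for (let state = initial; condition(state); state = iterate(state)) {
    onNext(resultSelector(state));
  }
}

const token = createToken();
const received = [];

generate(0,
  () => !token.cancelled,            // checked before every iteration
  (i) => i + 1,
  (i) => i,
  (value) => {
    received.push(value);
    if (value === 2) token.cancel(); // simulates the Cancel button click
  });
```

The loop stops after the third value because the condition is re-evaluated before every iteration, which is the same reason the Rx version stops once Cancel() has been called.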

As another alternative, we can convert the Cancel click into an observable sequence as well by using Observable.FromEventPattern. Then on the main observable, we combine it with the button observable using TakeUntil as follows:


    Protected Overrides Sub OnInitialized(e As System.EventArgs)
        MyBase.OnInitialized(e)

        Dim items As New ObservableCollection(Of Integer)
        ItemsListbox.ItemsSource = items

        Dim cancelClicked = Observable.FromEventPattern(Of RoutedEventArgs)(Cancel, "Click")

        Dim obs = Observable.Generate(0,
                                      Function(x) True,
                                      Function(index) index + 1,
                                      Function(index) index,
                                      Function(index) TimeSpan.FromSeconds(1))

        obs.TakeUntil(cancelClicked).
            ObserveOnDispatcher().
            Subscribe(Sub(item)
                          items.Add(item)
                      End Sub)
    End Sub
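
If it helps to picture what TakeUntil is doing, here is a rough sketch in plain JavaScript. The createSubject and takeUntil functions are hypothetical helpers written for this illustration, not the Rx implementation, and they ignore completion and error notifications:

```javascript
// Hypothetical minimal subject: remembers listeners and pushes values to them.
function createSubject() {
  const listeners = [];
  return {
    subscribe(listener) {
      listeners.push(listener);
      return {
        dispose() {
          const index = listeners.indexOf(listener);
          if (index >= 0) listeners.splice(index, 1);
        }
      };
    },
    next(value) {
      // Copy first so listeners may dispose themselves while we iterate.
      listeners.slice().forEach((listener) => listener(value));
    }
  };
}

// Forward values from source until stopper produces its first value.
function takeUntil(source, stopper) {
  return {
    subscribe(listener) {
      const subscriptions = [];
      const disposeAll = () => subscriptions.forEach((s) => s.dispose());
      subscriptions.push(source.subscribe(listener));
      subscriptions.push(stopper.subscribe(disposeAll));
      return { dispose: disposeAll };
    }
  };
}

const ticks = createSubject();
const cancelClicks = createSubject();
const shown = [];
takeUntil(ticks, cancelClicks).subscribe((value) => shown.push(value));

ticks.next(0);
ticks.next(1);
cancelClicks.next("click"); // the first click ends the subscription
ticks.next(2);              // ignored: nobody is listening any more
```

Only the first two values end up in shown; the click disposes both the source subscription and the click subscription.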

If you have another favorite alternative for cancelling an Observable, let me know.

Posted on 7/21/2011 9:22:00 AM - Comments(1)
Categories: Rx VB Dev Center

Porting Reactive Extension Samples to RxJs

In preparation for my upcoming talk at DevLink on RxJs, I decided to try porting some of my existing .Net/Silverlight samples over to the JavaScript version to see how different they would be. At this point, there is a disconnect between the .Net and .js versions and the signatures for the JavaScript methods are still lagging a bit behind. If you’re impatient and just want to see one of the samples in action, jump over to the Observable Sample to see how the sensor sample works and view code to see the code implementation.

Before we get started, let’s lay out the UI for our page. In this sample, we’re going to display 4 buttons to start a variety of RxJs queries and two Unordered Lists to display the results of two of these queries:


<p>
  <button id="btnLowValueSensors">Show Low Values</button>
  <button id="btnSelectedTypeSensors">Show Type x Values</button>
  <button id="btnWarnOutliers">Warn Outliers</button>
  <button id="btnHeartbeat">Heartbeat</button>
</p>

<div style="color: Blue; overflow-y: scroll; max-height: 300px; display: block; float: left;">
  Low Value Sensors:
  <ul id="LowValueSensors"></ul>
</div>
<div style="color: Red; overflow-y: scroll; max-height: 300px; display: block; float: right;">
  Type 2 Sensors:
  <ul id="SelectedTypeSensors"></ul>
</div>

So far, we haven’t added any reactive-specific code. In order to use RxJs, you need to download and install the latest version of the samples at Reactive Extensions for JavaScript experimental release version 1.0.2838.0 (2010-12-20). As they become available, you can download newer versions at http://msdn.microsoft.com/en-us/data/gg577610. Once you have the samples installed, copy the .js files into your project and include a script reference in your page to the rx.js file:


<script type="text/javascript" src="Scripts/rx.js"></script>

With the script reference in place, we can now start to implement some RxJs goodness. In this sample, I’m going to redo the observable sensor sample that we updated in the last post. If you remember in C#, we can use the Observable.Generate method to generate some random values. In JavaScript, we can do the same thing with a slightly different syntax. All of the RxJs Observable factory methods need to start with the Rx “Namespace” alias. Thus the time-based Generate method can be implemented using Rx.Observable.GenerateWithTime as follows:


<script type="text/javascript">

 var sensor;
 var lowValueSubsription;


 $(function () {
     sensor = Rx.Observable.GenerateWithTime(0,
                  function (i) { return true; },
                  function (i) { return i; },
                  function (i) { return { type: Math.floor(Math.random() * 4), 
                                          value: Math.random() * 20, 
                                          timeStamp: new Date() }; },
                  function (i) { return Math.random() * 200; }
     );
  });

While the parameters look different, they essentially function the same. Instead of using the C# Lambda syntax “(x) =>” we use the JavaScript inline function syntax: function(i) { return i; }. The first three parameters supply the logic of initializing, testing the condition, and incrementing the value, just as in a “for” or “while” loop. In this case, we’ll just generate values constantly (return true) and not worry about the initial value (0) or the increment (return i). On each iteration, we’ll generate a new JavaScript object literal rather than an anonymous type. Finally, we’ll tell the observable to delay for some random time between 0 and 200 milliseconds.
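
The mapping between the five arguments and a loop can be sketched in plain JavaScript. This is only an illustration under assumptions: the generateWithTime function below is a hypothetical re-creation (not the RxJs source), it assumes the delay is applied before each value is produced, and it takes an extra schedule argument so that a virtual clock can stand in for real timers:

```javascript
// Hypothetical re-creation of the five-argument shape; "schedule" is an extra
// argument added for this sketch so the timing can be swapped out.
function generateWithTime(initial, condition, iterate, resultSelector,
                          timeSelector, schedule, onNext) {
  function step(state) {
    if (!condition(state)) return;            // like the test in a for loop
    schedule(timeSelector(state), function () {
      onNext(resultSelector(state));          // the value handed to subscribers
      step(iterate(state));                   // like the increment in a for loop
    });
  }
  step(initial);                              // like the initializer in a for loop
}

// A virtual clock: records the requested delays instead of really waiting.
let clock = 0;
const pending = [];
const virtualSchedule = (delay, work) => pending.push({ delay, work });

const emitted = [];
generateWithTime(0,
  (i) => i < 3,
  (i) => i + 1,
  (i) => "reading " + i,
  (i) => 100,
  virtualSchedule,
  (value) => emitted.push({ at: clock, value }));

// Drain the virtual queue, advancing the clock by each requested delay.
while (pending.length > 0) {
  const next = pending.shift();
  clock += next.delay;
  next.work();
}
```

In a browser you could pass a function that calls setTimeout(work, delay) as the schedule argument to get real delays.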

With this in place, we can now start querying and subscribing to the observable source similar to how we did in VB or C# with the notable exception that we can’t use the LINQ query comprehension syntax and are forced to use the method syntax with JavaScript inline functions. For example, to add sensor values to our SelectedTypeSensors unordered list, but filter them to only those where the type is “2” we can do something like the following:


$("#btnSelectedTypeSensors").click(function (e) {
    var query = sensor.Where(function (next) { return (next.type == "2"); });
    query.Subscribe(function (next) { 
        $("<li/>").html(next.timeStamp 
                        + " - " + next.type 
                        + ": " + next.value)
                  .appendTo("#SelectedTypeSensors");
    });
  });

The first line uses jQuery to wire a click event handler up to the btnSelectedTypeSensors button. The next line illustrates using Rx to filter our sensor values using the Observable.Where method of RxJs. The last several lines perform the equivalent of Console.WriteLine in our old sample by subscribing to the observable sensor, passing each randomly generated value from our observable as a function parameter called “next” (like other lambda parameters, the name doesn’t matter here as long as you are consistent inside of your function implementation). We then create a new list item (li) containing the generated timestamp, type, and value from the JavaScript object, and append this new list item to the unordered SelectedTypeSensors list using jQuery. That’s a lot of power in a fairly small space.

Just like we do in regular Rx, we need to be able to not only subscribe to our observables, but unsubscribe as well. For consistency, when we subscribe to the observable, we get access to a disposable object which we hold onto. When we call dispose on it, we stop listening. Here’s the RxJs implementation of our low value sensor filter which manages the disposing of our resources:


$("#btnLowValueSensors").click(function (e) {
   if (lowValueSubsription) {
      lowValueSubsription.Dispose();
      lowValueSubsription = null;
   }
   else {
      var query = sensor
                  .Where(function (s) { return (s.value < 2) });
      lowValueSubsription = query.Subscribe(function (next) {
         $("<li/>").html(next.timeStamp + " - " 
                         + next.type + ": " 
                         + next.value).appendTo("#LowValueSensors");
      });
   };
});

In this case, we check to see whether we are already holding a subscription. If we are, we dispose it and clear the variable; otherwise we create a query to filter the random values to anything where the generated value is less than 2, and hold onto the disposable that Subscribe returns. Like the previous example, we generate list items with the filtered values and append them to the LowValueSensors list.
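
The same toggle can be tried without a page or the Rx library. In the sketch below, createSource and where are hypothetical helpers written for illustration; they only mimic the subscribe/dispose shape described above:

```javascript
// Hypothetical minimal source with the subscribe/dispose shape described above.
function createSource() {
  let listeners = [];
  return {
    subscribe(listener) {
      listeners.push(listener);
      return {
        dispose() { listeners = listeners.filter((l) => l !== listener); }
      };
    },
    push(value) { listeners.forEach((l) => l(value)); }
  };
}

// Filtering operator in the spirit of Where: only forwards matching values.
function where(source, predicate) {
  return {
    subscribe(listener) {
      return source.subscribe((value) => {
        if (predicate(value)) listener(value);
      });
    }
  };
}

const sensor = createSource();
const lowValues = [];
let subscription = null;

// Same toggle logic as the button handler: dispose if active, otherwise subscribe.
function toggleLowValues() {
  if (subscription) {
    subscription.dispose();
    subscription = null;
  } else {
    subscription = where(sensor, (s) => s.value < 2)
      .subscribe((s) => lowValues.push(s.value));
  }
}

toggleLowValues();            // first click: start listening
sensor.push({ value: 1.5 });  // kept
sensor.push({ value: 7 });    // filtered out
toggleLowValues();            // second click: stop listening
sensor.push({ value: 0.5 });  // not seen
```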

In the previous sample, we were able to display a message box when values exceed some level. We short-circuited the request by using the Any extension method, which doesn’t exist in the RxJs implementation at this point. To replace this, we can use Observable.Take and only take the first value. We then display a pop-up box using the JavaScript alert function.


$("#btnWarnOutliers").click(function (e) {
  sensor
  .Where(function (next) { return next.value > 19; })
  .Take(1) 
  .Subscribe(function (next) {
     alert(next.value + " Happened");
  });
});
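
Here is a small sketch of why Take(1) gives the same short-circuit effect. The createSource and take helpers are hypothetical and written only for this illustration; the Where step is left out to keep it short, and an array stands in for the alert box:

```javascript
// Hypothetical minimal source: push() delivers a value to current listeners.
function createSource() {
  const listeners = new Set();
  return {
    subscribe(listener) {
      listeners.add(listener);
      return { dispose() { listeners.delete(listener); } };
    },
    push(value) { Array.from(listeners).forEach((l) => l(value)); }
  };
}

// Deliver at most "count" values, then unsubscribe from the source.
function take(source, count) {
  return {
    subscribe(listener) {
      let remaining = count;
      const subscription = source.subscribe((value) => {
        if (remaining <= 0) return;
        remaining -= 1;
        listener(value);
        if (remaining === 0) subscription.dispose();
      });
      return subscription;
    }
  };
}

const sensor = createSource();
const warnings = [];
take(sensor, 1).subscribe((reading) => warnings.push(reading.value + " Happened"));

sensor.push({ value: 19.5 }); // first outlier: warning recorded, then unsubscribed
sensor.push({ value: 19.9 }); // no second warning
```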

The last query in the sensor samples should be straightforward by now with the exception that we don’t pass a TimeSpan object to the BufferWithTime, but rather just pass the number of milliseconds that we want to buffer. Thus to show the number of Type “2” values that occur every 3 seconds, we can do the following:


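$("#btnHeartbeat").click(function (e) {
  var heartbeat = sensor
    // Compare with ==; a single = would assign "2" to every reading's type.
    .Where(function (next) { return next.type == "2"; })
    .BufferWithTime(3000);

  heartbeat.Subscribe(function (next) {
     // Each buffer holds the readings from one 3 second window.
     var typesPerWindow = next.length;
     $("#btnHeartbeat").html(typesPerWindow);
  });
});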
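
To make the buffering idea concrete, here is a sketch that groups already-timestamped readings into 3 second windows. The bufferByTime function and the at property are assumptions made for this example; the real BufferWithTime works on live values rather than on an array:

```javascript
// Group timestamped items into consecutive fixed-length windows.
// "at" is an assumed property holding milliseconds since the start.
function bufferByTime(items, windowMs) {
  const buffers = [];
  for (const item of items) {
    const index = Math.floor(item.at / windowMs);
    while (buffers.length <= index) buffers.push([]); // keep empty windows too
    buffers[index].push(item);
  }
  return buffers;
}

const readings = [
  { at: 500, type: 2 },
  { at: 2900, type: 2 },
  { at: 3100, type: 2 },
  { at: 9200, type: 2 }
];

// One count per 3 second window, like the number shown on the Heartbeat button.
const counts = bufferByTime(readings, 3000).map((buffer) => buffer.length);
```

With these sample readings the counts come out as 2, 1, 0 and 1, so a quiet window shows up as a zero rather than being skipped.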

Working with RxJs can be a bit trickier than the .Net equivalent version, in part because the IntelliSense and variable names are restricted at this point to make the library a small download. Currently the Rx.js file is 30K, which although fairly small, is a consideration to keep in mind when using the library over slow network connections. The nice thing is that once you’ve gotten your head twisted around into thinking in a functional, declarative way rather than an imperative one, making the transition from Rx.Net to RxJs becomes a relatively painless experience (so far). If you want to try out this sample or check out the final code, try it out at http://www.thinqlinq.com/observablesensor.htm.

Posted on 6/16/2011 4:33:00 PM - Comments(4)
Categories: Rx

Revising the Reactive Sensor Generator

When I first created the Reactive sensor sample, I wasn't completely happy with it because if any of the various subscribers were disposed or triggered an OnCompleted (like the Any clause), it would trigger a completed state for all of the other “listeners”. This is not what I intended. To make it easy, let’s review how we created the sensor originally:


    Private _observers As New List(Of IObserver(Of SensorInfo))
    Private _running As Boolean

    Public Function Subscribe(ByVal observer As System.IObserver(Of SensorInfo)) _
                              As System.IDisposable _
                              Implements System.IObservable(Of SensorInfo).Subscribe
        _observers.Add(observer)
        Return Me
    End Function

    Public Sub StartSensor()
        If Not _running Then
            _running = True
            Dim randomizer = New Random(Date.Now.Millisecond)
            While _running
                Dim randVal = randomizer.NextDouble
                If _observers.Any Then
                    Dim info As New SensorInfo With {.SensorType = CInt(randVal * 4).ToString,
                                                     .SensorValue = randVal * 20,
                                                     .TimeStamp = Now}

                    _observers.ForEach(Sub(o) o.OnNext(info))
                End If
                Threading.Thread.Sleep(CInt(randomizer.NextDouble * 500))
            End While
        End If
    End Sub

For a quick review, we set up an internal list of observers and manually added new subscribers to this list. However, we didn’t have a good way of removing them from the list through the typical dispose implementation. Then as we are looping in the while loop, we generate a new sensor reading and announce it to each of the listeners through _observers.ForEach(Sub(o) o.OnNext(info)). Because this was constantly running (originally on the calling thread), we had to take the extra effort to run it on a background worker.
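
The missing piece in that version is a disposable that belongs to a single subscriber. As a rough sketch of the idea in JavaScript (the Sensor class and publish method here are hypothetical names made up for this illustration, not the sample code):

```javascript
// Hypothetical sensor that hands each subscriber its own disposable.
class Sensor {
  constructor() {
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
    // The returned object removes only this observer, not everyone.
    return {
      dispose: () => {
        this.observers = this.observers.filter((o) => o !== observer);
      }
    };
  }

  publish(info) {
    this.observers.forEach((o) => o.onNext(info));
  }
}

const sensor = new Sensor();
const first = [];
const second = [];
const firstSubscription = sensor.subscribe({ onNext: (info) => first.push(info) });
sensor.subscribe({ onNext: (info) => second.push(info) });

sensor.publish("a");
firstSubscription.dispose(); // only the first listener goes away
sensor.publish("b");
```

After the dispose call, the second listener keeps receiving readings while the first one stops.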

With more reflection and further work with Reactive, we can simplify the process quite a bit here by using the Observable.Generate (or in earlier builds Observable.GenerateWithTime) to create an observable that we expose to our subscribers. The subscriber is then responsible for subscribing and disposing itself from our centralized observable. Our revised implementation to generate random sensor readings at random time intervals is as follows:


Public Class Sensor

    Private _sensorObservable As IObservable(Of SensorInfo)

    Public Sub New()
        _sensorObservable =
            Observable.Generate(Of Int16, SensorInfo)(initialState:=0,
                condition:=Function(x) True,
                iterate:=Function(inVal) inVal,
                resultSelector:=Function(x)
                                    Dim randValue = New Random(Date.Now.Millisecond).NextDouble
                                    Return New SensorInfo With {.SensorType = CInt(randValue * 4).ToString,
                                                                .SensorValue = randValue * 20,
                                                                .TimeStamp = Now}
                                End Function,
                timeSelector:=Function(x) TimeSpan.FromMilliseconds(New Random(Date.Now.Millisecond).NextDouble * 100)
            )
    End Sub

    Public ReadOnly Property SensorObservable As IObservable(Of SensorInfo)
        Get
            Return _sensorObservable
        End Get
    End Property

End Class

In the new implementation we use the Observable.Generate that we discussed when mocking the phone accelerometer. We do still need to think about our threads, because the overload that takes a timeSelector schedules its work on a background thread automatically. As a result, our values are generated on a background thread, and any subscriber that updates the UI has to marshal back to the UI thread first.

When we subscribe to this, we first create a shared instance of the Sensor class and then Rx queries subscribe to the sensor’s SensorObservable property:


Dim AnySensor = sensor.SensorObservable.Any(
                  Function(s) s.SensorValue > 17)
AnySensor.Subscribe(Sub(s) MessageBox.Show(s.ToString, "OutOfRange"))

If you want to remove a subscription, keep a handle on the disposable that was returned when you subscribed and dispose it to stop listening.


Private ActiveLowValueSensors As IDisposable

Private Sub FilterLowValue_Click() Handles FilterLowValue.Click
    If ActiveLowValueSensors Is Nothing Then
        Dim lowValueSensors = From s In sensor.SensorObservable
                              Where s.SensorValue < 3
                              Select s.SensorValue

        ' Console.WriteLine returns no value, so the handler must be a Sub lambda.
        ActiveLowValueSensors = lowValueSensors.Subscribe(
           Sub(val) Console.WriteLine(val))
    Else
        ActiveLowValueSensors.Dispose()
        ActiveLowValueSensors = Nothing
    End If

End Sub

I've updated the WPF samples with this change so you can take it out for a spin if you would like. As always, let me know what you Thinq.

Posted on 6/8/2011 3:52:00 PM - Comments(3)
Categories: Rx VB Dev Center

Updating Reactive Samples to 10425 build

Today, I decided to take the plunge and update my WPF and Silverlight Reactive Extensions samples to the latest (at the time of this writing) build of Rx: version 1.1.10425.0. I have purposely left the phone samples on the blog targeting the original shipping bits, so they aren’t affected at this point.

(ED: the 1.0.10605 stable and experimental releases shipped as of 6/5/2011. Looking at the release notes, this update appears to have far fewer breaking changes beyond those in the 1.1.10425 build.)

As discussed in the Rx Forum post, this update has quite a number of breaking changes. Lee Campbell has a nice post discussing a number of these changes. If you don’t care about the changes and just want to download the revised samples, head on over to the Files page and try them out:

  • RX_Wpf
    Reactive Framework samples from the "Becoming an RxPusher with the Reactive Framework" talk. With the emergence of LINQ, we discovered the power and flexibility that comes from the IEnumerable interface. This pull model makes iterating over sets of data and performing filtering, transformation, and aggregation operations easy through LINQ. However, the pull model breaks down in asynchronous and event driven environments. In evaluating the options, we discovered that the IObservable interface and the push model were effectively analogous to the pull model of IEnumerable. As a result, we can make event driven asynchronous programming easier and more declarative by using the Reactive Framework and LINQ to Events.

    (Uploaded on 6/8/2011 - File Size 183442)

  • RX_Silverlight
    Slides and demos for using the Reactive Framework in Silverlight 4.

To help you update your existing projects, I figured I would share a bit of my experience with this update.

First of all, remove the existing references to the old builds of Rx, including System.CoreEx, System.Interactive, and System.Reactive. Next download the new build either by downloading it directly from the MSDN Data Development center, or by using the NuGet Package Manager and installing the Rx-Main package. You do want to be careful here, because the older 1.0.2856.0 build is still available via NuGet as well. There are some cases where you might need access to the older build, particularly in cases where you need access to the IEnumerable extensions that were added to complement the new ones in IObservable, like .Do. Also, note that the RxJs library is not currently updated to agree with the new method names, so you will need the older packages for RxJs support as well.

Once you have the new package downloaded and installed, add a reference to the System.Reactive assembly in your solution.

You won’t need to remove imports or using clauses from your classes because the older builds of Rx built the extensions directly on top of the System.Linq namespace. In the new builds, the LINQ extensions are now in the System.Reactive.Linq namespace, so you will need to add an Imports/using clause for System.Reactive.Linq. If you used the ObserveOnDispatcher/SubscribeOnDispatcher methods, these have been replaced by the simpler ObserveOn and SubscribeOn. As a result, you may want to add an Imports/using clause for System.Threading as well so that you can access the SynchronizationContext more directly. With that in place, change your calls to .ObserveOnDispatcher as follows:


ObserveOn(SynchronizationContext.Current)

Next, we need to update some of the Observable factory implementations that we have used to create observables. The Observable.FromEvent method has changed to more closely align with Observable.FromAsyncPattern and now uses “FromEventPattern”. For example, the mouse drag-drop example that we discussed in this post now starts out as follows:


Dim mouseDown = From evt In Observable.FromEventPattern(Of MouseButtonEventArgs)(image, "MouseDown")
                Select evt.EventArgs.GetPosition(image)
Dim mouseUp = Observable.FromEventPattern(Of MouseButtonEventArgs)(image, "MouseUp")
Dim mouseMove = From evt In Observable.FromEventPattern(Of MouseEventArgs)(image, "MouseMove")
                Select evt.EventArgs.GetPosition(Me)

Similarly, the Observable.GenerateWithTime method has now been refined to the more generalized Observable.Generate and uses a TimeSpan overload to specify the time interval.

Another such simplification was made to the Buffering and Windowing operations that we used in this post, including BufferWithTime, BufferWithCount, WindowWithTime and WindowWithCount. Now we just have a common Buffer operator that returns an IObservable<IList<T>> and Window which returns an IObservable<IObservable<T>> implementation. We can use the overload resolution to determine if we are passing in a TimeSpan or integer and use the appropriate flavor of Buffer and Window as necessary. As a result, we can change our sorting code to the following:


Dim segmented = Sensor.Buffer(TimeSpan.FromSeconds(3))
' Each buffer is a list holding the readings from one 3 second window.
segmented.Subscribe(Sub(val)
                        FilteredList.ItemsSource = From v In val
                                                   Order By v.SensorValue
                    End Sub)

One last item that I needed to change was to restore the Do extension method on IEnumerable because I have come to love that function. Fortunately, implementing Do is relatively easy. Unfortunately, the implementation relies on iterators, so we need to put that in a C# project. Here’s the definition of the new Do extension method:


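using System;
using System.Collections.Generic;

public static class IEnumerableEx
{
    // Lazily invokes action on each item as it is enumerated, then yields the item unchanged.
    public static IEnumerable<T> Do<T>(this IEnumerable<T> source, Action<T> action)
    {
        foreach (T item in source)
        {
            action(item);
            yield return item;
        }
    }
}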
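
For comparison, JavaScript generator functions behave much like C# iterators, so the same idea can be sketched there. The doEach name is made up for this illustration:

```javascript
// Lazily run "action" on each item as it is pulled, then pass the item along.
function* doEach(source, action) {
  for (const item of source) {
    action(item);
    yield item;
  }
}

const log = [];
const pipeline = doEach([1, 2, 3], (n) => log.push("saw " + n));
const before = log.length;            // nothing has run yet: generators are lazy
const result = Array.from(pipeline);  // pulling the items triggers the action
```

As with the C# version, the action only runs while the sequence is being enumerated, so creating the pipeline by itself has no side effects.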

I’m sure you may run into other issues when updating your projects due to the breaking changes. Refer to the forum post above if you need help on your particular issue.

Posted on 6/8/2011 2:48:00 PM - Comments(2)
Categories: VB Dev Center Rx

Disambiguate LINQ and Lync with Skype

If you’ve been following along with this little blog experiment, you probably know that I relish all things LINQ. More and more often, I’m finding that when I tell people I love LINQ and wrote a book on it, they start to ask me if I can help them migrate their communication server to Lync. I’m dumbfounded when the confusion occurs in people who should know better (.Net development managers).

It’s not completely their fault. The fault lies in the Microsoft marketing and branding teams that aren’t aware of the breadth of their product offerings in other areas. As a result, we have a situation where two unrelated products have the same name (or are pronounced the same in English). Like many of you out there, I’m often saddened when I see Microsoft marketing take a really cool name (Avalon, Indigo, Astoria) and distort it into something dry, boring, and utterly unsexy. Don’t even get me started on how they changed my “Become an Rx Pusher for WP7” title from Mix into “Rx: A Library for Managing Asynchronous Data and Events in your Windows Phone 7 Application”. Now, they’ve neutered the kinq of LINQ by creating another product with the same name.

If they’re smart, Microsoft should leverage their recent $8.5 billion purchase and resolve this travesty by rebranding Lync as the new Microsoft Skype Unified Communication Client (Mi-Succ).

If you have a better option, leave a comment and let me know what you thync. Or if you prefer, skype me with Mi-Succ (Skype).

Posted on 5/16/2011 10:06:00 PM - Comments(0)
Categories: