Porting Reactive Extension Samples to RxJs by ThinqLinq

Porting Reactive Extension Samples to RxJs

In preparation for my upcoming talk at DevLink on RxJs, I decided to try porting some of my existing .Net/Silverlight samples over to the JavaScript version to see how different they would be. At this point, there is a disconnect between the .Net and .js versions and the signatures for the JavaScript methods are still lagging a bit behind. If you’re impatient and just want to see one of the samples in action, jump over to the Observable Sample to see how the sensor sample works and view code to see the code implementation.

Before we get started, let’s lay out the UI for our page. In this sample, we’re going to display 4 buttons to start a variety of RxJs queries and two Unordered Lists to display the results of two of these queries:

  <button id="btnLowValueSensors">Show Low Values</button>
  <button id="btnSelectedTypeSensors">Show Type x Values</button>
  <button id="btnWarnOutliers">Warn Outliers</button>
  <button id="btnHeartbeat">Heartbeat</button>

<div style="color: Blue; overflow-y: scroll; max-height: 300px; display: block; float: left;">
  Low Value Sensors:
  <ul id="LowValueSensors"></ul>
<div style="color: Red; overflow-y: scroll; max-height: 300px; display: block; float: right;">
  Type 2 Sensors:
  <ul id="SelectedTypeSensors"></ul>

So far, we haven’t added any reactive specific code. In order to use the RxJs, you need to download and install the latest version of the samples at Reactive Extensions for JavaScript experimental release version 1.0.2838.0(2010-12-20). As they become available, you can download newer versions at http://msdn.microsoft.com/en-us/data/gg577610. Once you have the samples installed, copy the .js files into your project and include a script reference in your page to the rx.js file:

<script type="text/javascript" src="Scripts/rx.js"></script>

With the script reference in place, we can now start to implement some RxJs goodness. In this sample, I’m going to redo the observable sensor sample that we updated in the last post. If you remember in C#, we can use the Observable.Generate method to generate some random values. In JavaScript, we can do the same thing with a slightly difference syntax. All of the RxJs Observable factory methods need to start with the Rx “Namespace” alias. Thus the time based Generate method can be implemented using Rx.Observable.GenerateWithTime as follows:

<script type="text/javascript">

 var sensor;
 var lowValueSubsription;

 $(function () {
     sensor = Rx.Observable.GenerateWithTime(0,
                  function (i) { return true; },
                  function (i) { return i; },
                  function (i) { return { type: Math.floor(Math.random() * 4), 
                                          value: Math.random() * 20, 
                                          timeStamp: new Date() }; },
                  function (i) { return Math.random() * 200; }

While the parameters look different, they essentially function the same. Instead of using the C# Lambda syntax “(x) =>” we use the JavaScript inline function syntax: function(i) { return i; }. The first three parameters supply the logic of initializing, setting the range, and incrementing the value for a “for” or “while” loop. In this case, we’ll just generate values constantly (return true) and not worry about the initial value (0) or the increment (return i). On each iteration, we’ll generate a new JSON object rather than anonymous type. Finally, we’ll tell the observable to delay for some random time between 0 and 200 milliseconds.

With this in place, we can now start querying and subscribing to the observable source similar to how we did in VB or C# with the notable exception that we can’t use the LINQ query comprehension syntax and are forced to use the method syntax with JavaScript inline functions. For example, to add sensor values to our SelectedTypeSensors unordered list, but filter them to only those where the type is “2” we can do something like the following:

$("#btnSelectedTypeSensors").click(function (e) {
    var query = sensor.Where(function (next) { return (next.type == "2"); });
    query.Subscribe(function (next) { 
                        + " - " + next.type 
                        + ": " + next.value)

The first line uses JQuery to wire an click event handler up to the btnSelectedTypeSensors button. The next line illustrates using Rx to filter our sensor values using the Observable.Where method of RxJs. The last several lines perform the equivalent of Console.WriteLine in our old sample by subscribing to the observable sensor passing the instance of the random generated values from our observable as a function parameter called “next” (like other lambda parameters, the name doesn’t matter here as long as you are consistent inside of your function implementation.) We then create a new List Item (li) and append the string containing the generated timestamp, type and value from the JavaScript object) and the append this new list item to the unordered SelectedTypeSensors list using JQuery. That’s a lot of power in a fairly small space.

Just like we do in regular Rx, we need to be able to not only subscribe to our observables, but unsubscribe as well. For consistency, when we subscribe to the observable, we get access to a disposable object which we hold onto. When we call dispose on it, we stop listening. Here’s the RxJs implementation of our low value sensor filter which manages the disposing of our resources:

$("#btnLowValueSensors").click(function (e) {
   if (lowValueSubsription) {
      lowValueSubsription = null;
   else {
      var query = sensor
                  .Where(function (s) { return (s.value < 2) });
      lowValueSubsription = query.Subscribe(function (next) {
         $("<li/>").html(next.timeStamp + " - " 
                         + next.type + ": " 
                         + next.value).appendTo("#LowValueSensors");

In this case, we check to see if the lowValueSubscription is already subscribed to anything. If it is, we dispose it, otherwise we create a query to filter the random values to anything where the generated value is less than 2. Like the previous example, we generate list items with the filtered values and append them to the LowValueSensors list.

In the previous sample, we were able to display a message box when values exceed some level. We short-circuited the request by using the Any extension method which doesn’t exist in the RxJs implementation at this point. To replace this, we can use Observable.Take and only take the first value. We then display a pop-up box using the Javascript alert function.

$("#btnWarnOutliers").click(function (e) {
  .Where(function (next) { return next.value > 19; })
  .Subscribe(function (next) {
     alert(next.value + " Happened");

The last query in the sensor samples should be straightforward by now with the exception that we don’t pass a TimeSpan object to the BufferWithTime, but rather just pass the number of milliseconds that we want to buffer. Thus to show the number of Type “2” values that occur every 3 seconds, we can do the following:

$("#btnHeartbeat").click(function (e) {
  var heartbeat = sensor
    .Where(function (next) { return next.type = "2"; })

  heartbeat.Subscribe(function (next) {
     var typesPerSecond = next.length;

Working with RxJs can be a bit trickier than the .Net equivalent version, in part because the intellisense and variable names are restricted at this point to make the library a small download. Currently the Rx.js file is 30K, which although fairly small, is a consideration to keep in mind when using the library over slow network connections. The nice thing is once you’ve gotten your head twisted around into thinking functionally declarative rather than imperative, making the transition from Rx.Net to RxJs becomes a relatively painless experience (so far). If you want to try out this sample or check out the final code, try it out at http://www.thinqlinq.com/observablesensor.htm.

Posted on - Comment
Categories: Rx) -
comments powered by Disqus