Reactive Framework Related Posts by ThinqLinq

Reactive Translator for RxJs

For several years, I’ve been demonstrating using Reactive Extensions to handle taking user input and providing real time translations into multiple languages. This code sample has been so popular that even Soma posted an example using the Bing translator in his introducing Rx post. In most cases, they only issue a single request or issue multiple individual requests for multiple languages. To add a bit of fun to my demo, I decided to spice it up a bit and turn the list of destination languages into another observable that I can join with as well. I’ve had this sample available in my Rx downloads for some time including one for RxJs, but haven’t updated them to newer versions (curently 2.2.2) until now. I figured it was about time that I shared my version.

To begin, let’s create a UI. We’ll keep it simple to allow us to focus on the code that comes later:

    <input type="text" id="TextToTranslate" size="80" />

    <div id="output">
    </div>

As they say at Staples, “That was easy.”

Next comes the fun part. Before you can do anything though, you do need to sign up to get a free Bing App ID. I’m not going to share mine Winking smile. Once you have that, your ready to code. But what are we going to do in our code? This example combines three observable sequences: User key presses, destination languages, and service request/responses. In order to work with the observables and to make it easy to work with our DOM, we need to start by importing two JavaScript libraries: jQuery and Rx (we only need the rx.lite.js version for this example).

    <script type="text/javascript" src="Scripts/jquery-2.1.0.js"></script>
    <script type="text/javascript" src="Scripts/rx.lite.js"></script>

Of our three observables, the second one is the easiest, so let’s get it out of the way first. To turn an array of languages into an observable sequence, we use the fromArray generator on a simple JavaScript array.

var destLanguages = ["de", "es", "zh-CHT", "fr", "it", "ar", "ht", "tlh"];
var languagesObs = Rx.Observable.fromArray(destLanguages);

With that out of the way, Let’s take a look at how we are going to issue the service requests to Bing and turn them into an observable stream. We’ll use the jQuery ajax method to issue our request to the Bing translator using jsonp and pass in the required app id, destination language (to), source language (I speak American English, so ‘from’ is ‘en-us’) and the text we want to translate (text). We can then use JavaScript promise to access this invoker and use RxJs’s fromPromise to turn it into an observable stream that returns the string result.

var translate = function (text, lang) {
    var p = { appId: appId, to: lang, from: 'en-us', text: text };
    var promise = $.ajax({
        url: 'http://api.microsofttranslator.com/V2/Ajax.svc/Translate',
        data: p,
        dataType: 'jsonp',
        jsonp: 'oncomplete',
    }).promise();
    return Rx.Observable.fromPromise(promise);
};

Now that we have these two observables, we need to wire this up with the user’s key presses. Instead of going on and on about each step, here’s the code in it’s glory detail annotated with what each line is doing.

// Get the keypresses
var textInputDetected = Rx.Observable.fromEvent($("#TextToTranslate"), 'keyup')
    // Wait for 1/2 second to see if they're still typing
    .throttle(500)
    // They're done typing. Clear the last output
    .do(function (_) { $("#output").empty(); })
    // Get the input string
    .select(function () { return $("#TextToTranslate").val(); })
    // If it's the same as the last one, don't re-issue the request
    .distinctUntilChanged()
    // Merge the search string with the list of languages
   .selectMany(function (text) {
        return languagesObs.select(function (t1) { return { source: text, dest: t1 }; });
    })
    // Call the service passing the source string and destination language
   .selectMany(function (sourceDest) {
       return translate(sourceDest.source, sourceDest.dest)
            // Return the destination language and translated result from the service
            .select(function (result) { return { dest: sourceDest.dest, translation: result }; });
    });

d = textInputDetected.subscribe(function (x) {
        // Add the result to the list
        $("#output").append(x.dest + ' : ' + x.translation + '<br />');
    },
    function (err) {
        // In case of an error, show the error
        $("#output").append(err + '<br />')
    });

We start by getting the keyup events for the input textbox and turning that into an observable stream (using fromEvent). This is an expensive service operation, so we throttle the request for 1/2 of a second before continuing to let the user finish typing. There’s no reason to send each keypress through the translator. The do method allows us to cause side effects, in this case to clear the last translated results if any exist. We then transform (using select or map) the result to get the user’s supplied text and pass that as the resulting stream. Similar to throttle, we also want to make sure that the user hasn’t just entered the same value twice which would result in the same translation results. We optimize our requests by not reissuing the request using distinctUntilChanged.

The fun stuff comes next. Here we start to merge our observables using selectMany (or flatMap) to match up the input text with the observable list of destination languages. Once we have that, we selectMany again, this time passing in the source string and destination language into the translate method that returns the observable string result and based on that result, we project out the result along with the enclosing destination language.

To conclude, we subscribe to the end observable stream and either append the result to the output list in the case of onNext, or append the error message for onError. Since this is set up over observables, the user can just start typing another string and the process will react to the request all over again. To put this all together, here is the complete code. Remember to get your AppId before you use it because the one I’m showing here won’t work.

    <input type="text" id="TextToTranslate" size="80" />

    <div id="output">
    </div>

    <script type="text/javascript" src="Scripts/jquery-2.1.0.js"></script>
    <script type="text/javascript" src="Scripts/rx.lite.js"></script>
    <script type="text/javascript">
        var appId = "GetYoursAt http://www.bing.com/developers/appids.aspx";
        var destLanguages = ["de", "es", "zh-CHT", "fr", "it", "ar", "ht", "he", "ja", "ko", "no", "ru", "th", "tlh"];

        (function ($) {

            var translate = function (text, lang) {
                var p = { appId: appId, to: lang, from: 'en-us', text: text };
                var promise = $.ajax({
                    url: 'http://api.microsofttranslator.com/V2/Ajax.svc/Translate',
                    data: p,
                    dataType: 'jsonp',
                    jsonp: 'oncomplete',
                }).promise();
                return Rx.Observable.fromPromise(promise);
            };

            // Turn the language array into a cold observable for the selectMany below
            var languagesObs = Rx.Observable.fromArray(destLanguages);

            // Get the keypresses
            var textInputDetected = Rx.Observable.fromEvent($("#TextToTranslate"), 'keyup')
                // Wait for 1/2 second to see if they're still typing
                .throttle(500)
                // They're done typing. Clear the last output
                .do(function (_) { $("#output").empty(); })
                // Get the input string
                .select(function () { return $("#TextToTranslate").val(); })
                // If it's the same as the last one, don't re-issue the request
                .distinctUntilChanged()
                // Merge the search string with the list of languages
                .selectMany(function (text) {
                    return languagesObs.select(function (t1) { return { source: text, dest: t1 }; });
                })
                // Call the service passing the source string and destination language
                .selectMany(function (sourceDest) {
                    return translate(sourceDest.source, sourceDest.dest)
                        // Return the destination language and translated result from the service
                        .select(function (result) { return { dest: sourceDest.dest, translation: result }; });
                });

            d = textInputDetected.subscribe(function (x) {
                    // Add the result to the list
                    $("#output").append(x.dest + ' : ' + x.translation + '<br />');
                },
                function (err) {
                    // In case of an error, show the error
                    $("#output").append(err + '<br />')
                });
            
        })(jQuery);

    </script>

Thinq you can improve on this sample, let me know.

Posted on - Comment
Categories: RxJs - JavaScript - Rx -

Rx for Windows Phone article now available

A couple of years ago I gave a presentation at the last MIX for doing asynchronous programming using Reactive Extensions (Rx) on the Windows Phone. The video of the presentation is still available for watching. Around the same time, I was approached by Code Magazine to write a similar article, which I was happy to oblige. It took a while but I was happy to see the article finally published in the Sep/Oct 2013 edition of Code Magazine.

In the article I demonstrate how to use Rx to build a dice rolling game (like Yhatzee) for the Windows Phone 7 or 8 and communicating with a service tier for rolling the dice (hiding the fact that you may be loading the results). While the particular solution was targeted to Windows Phone, the concepts are valid across the wide variety of platforms that Rx supports. Give the article a read and let me know what you thinq.

Posted on - Comment
Categories: C# - Rx - VB - WP7 -

Multi Lingual LINQ and Rx

“Imitation is the sincerest [form] of flattery” – Charles Caleb Colton

When dealing with technology, or nearly any other creative endeavor, one of the ways that you know if you’ve discovered a successful creation is to see how much it is embraced by not only your peers, but also your competitors. We can see this in quotes by Shakespeare, the music of Bach, parodies of Peter SchickleHip Hop sampling, and countless others.

This blog is based on one of the more significant technologies that many peers and competitors embraced – LINQ. As developers on both the Microsoft stack, and others saw the power of LINQ and worked to incorporate the concepts into their native languages, including JavaScript, C++, Scala, Java, Ruby, Python, and the list goes on. While the ideas in concept are not totally unique in LINQ, the way that they were put together provided the simplicity and elegance that drew many to desire to replicate it in their native tongues.

We’re starting to see a similar wave of imitation with the Reactive Extensions as we did with LINQ. While Rx primarily extends the ideas and concepts from LINQ as can be seen by the nickname “LINQ to Events”, the way it puts them together is forging it’s own way. Indeed, it is already creating it’s own movement with the Reactive Manifesto.

RxFlavorsWhat began as a mechanism to bring asynchronous LINQ to C# and VB, has become a powerful mechanism to declaratively process real-time streams of data in a multitude of environments. Microsoft got the ball rolling by creating not only .Net implementations of RX, but also JavaScript and subsequently C++. More recently, Brian Benz announced the availability of Rx for Ruby and Python. We’re also seeing a number of efforts around Functional Reactive Programming (FRP) including Haskell and F#. Others have joined the party with the Scala based Reactive Web and ReactiveMongo (a scala based driver for MongoDB). Netflix has even contributed their own RxJava.

In many ways, I’ve found LINQ and Rx to be disruptive technologies (in a good way). The amount of effort around manipulating core language constructs to enable the concepts only speaks to the impact that they’ve had on developer’s lives. I’m not saying that Erik Meijer is a combination of JS Bach, Shakespeare, and Sir Mix-a-Lot, but I’ve definitely noticed his handprint on many as the father of this disruption. I can only hope that we’ll see this kind of continued energy and creativity far into the future.

I’m sure I’ve missed some language implementations of LINQ or Rx and can’t anticipate future versions that are inevitably yet to come. Please do others a favor and comment back with links to your favorite implementation so that we can build a complete list together.

Posted on - Comment
Categories: LINQ - Rx -

SignalR and Rx talk materials

Last night I gave a talk for the Gwinnett Georgia Microsoft User Group. There were plenty of questions and discussion. Everyone seemed interested in learning about SignalR at least even if most hadn’t heard of Rx (gasp.) The slides and demos from the presentation are available for download here.

Update – 7/18/2014:  The source code and slide deck is now available on Github for these projects. Head on over to https://github.com/jwooley/SignalrRxSamples to check them out.

Anyway, for those that want to dig deeper into these technologies, here are some resources that you might want to check out:

Posted on - Comment
Categories: Rx - SignalR -

New Paper.li for Reactive Extensions

Paper.li is a site that makes it easy to aggregate blog posts, twitter streams, Google+, Facebook, etc into a magazine format for easy reading. There are virtual papers on almost any topic you could imagine. While I’ve seen messages that some of my tweets and links have been featured, I didn’t recall seeing any on the Reactive Extensions (until Now).

I took the liberty of creating a Paper.li specifically for tracking Reactive Extensions news and announcements. Feel free to follow it by clicking on the widget below. Currently I have it scheduled to only publish once a week, but could make it more frequent if necessary. Also, if you can think of a filter I should add or post I should feature, let me know and I’ll see what I can do.

Posted on - Comment
Categories: Rx - RxJs -

SignalR and Reactive Extensions are an Rx for server push notifications

Recently, I had the need to build a system where multiple clients needed to be notified as changes happened on the server. While watching Damian Edwards, Brad Wilson and Levi Broderick present on Async in ASP.Net during the AspConf I was introduced to the dynamic simplicity that SignalR brings to the table. I was even more intrigued by the fact that it integrates directly with IObservable and the Reactive Extensions. After using it for a week, I’m truly impressed by what they’ve done with this library. To give you an idea of what I mean, let’s take my ObservableSensor demo which generates random values and see how we can use SignalR to expose these values over a distributed client environment.

Reactive Extensions on the Server

To begin, let’s look at the server. Here we will use the Observable.Generate method to generate some random values with associated random categories and the timestamp when the value was generated:

Option Strict Off

Imports Microsoft.VisualBasic
Imports System.Reactive.Linq
Imports SignalR

Public Class ObservableSensor

    Public Sub New()
        Dim rand = New Random(Now.Millisecond)

        Dim Generator = Observable.Generate(Of Double, SensorData)(
            initialState:=0,
            condition:=Function(val) True,
            iterate:=Function(val) rand.NextDouble,
            resultSelector:=Function(val) New SensorData With 
                                          {
                                              .Time = Now,
                                              .Value = val * 20,
                                              .Category = (CInt(val * 4)).ToString()
                                          },
            timeSelector:=Function(val) TimeSpan.FromSeconds(val))

        Generator.Subscribe(Sub(value)
                                Dim context = GlobalHost.ConnectionManager.GetHubContext(Of ObservableSensorHub)()
                                context.Clients.Broadcast(value)
                            End Sub)
    End Sub
End Class

Public Class SensorData
    Public Property Time As DateTime
    Public Property Category As String
    Public Property Value As Double
End Class

If you recall, we discussed the Observable.Generate implementation last year. The new part here occurs in the Subscribe implementation.

SignalR across the tiers

In this case, we are going to “Broadcast” our newly created methods to anyone listening. In this case, we are publishing our notifications without having a direct connection to the SignalR hub. We can grab a hold of it using the GlobalHost ConnectionManager to get the hub in our AppDomain for a type of ObservableSensorHub. What is this Hub thing you may ask. Well, here is the implementation for the ObservableSensorHub:

Imports SignalR.Hubs

Public Class ObservableSensorHub
    Inherits Hub

End Class

In case you’re wondering, no I’m not missing code here. That’s the complete implementation. We’re just creating a strongly typed instance of the SignalR.Hubs.Hub type for the the ConnectionManager to work with. In this simple application, we’re just going to start generating values when the web application starts. In the Global.asax, add the following implementation:

    Sub Application_Start(ByVal sender As Object, ByVal e As EventArgs)
        Sensor = New ObservableSensor
    End Sub

 

At this point, we now have a server that can broadcast our random values over HTTP to any clients that wish to subscribe. Before moving to the client, I need to say a bit more about the Broadcast “Method” that we are calling on on the Hub’s Clients. If you look at the type definition of Clients, you will see that there is no Broadcast method. Clients is actually a Dynamic object. At run time, we are declaring that it has a method called Broadcast. The SignalR infrastructure then knows how to translate requests for an invocation method by that name into an HTTP message to be sent to any clients (serializing the results into a JSON object using Json.Net). Remember in Visual Basic, we enable the Dynamic functionality by specifying Option Strict Off at the top of our class definition.

Now how do we consume these messages? Let’s start with a console application. First, make sure that you’ve installed and added the references to the SignalR.Client library. The easiest way is to use NuGet. Rather than a bunch of text, let’s just jump to the code:

Option Strict Off

Imports SignalR.Client.Hubs
Imports System.Reactive.Linq
Imports Newtonsoft.Json.Linq

Module Module1

    Sub Main()
        Dim cn = New HubConnection("http://localhost:5687/")
        Dim sensor = cn.CreateProxy("observableSensorHub")

        sensor.On(Of SensorData)("broadcast", Sub(item) Console.WriteLine(item.Value))

        cn.Start().Wait()
        Console.ReadLine()

    End Sub

End Module

 

Here, we create a new HubConnection specifying the endpoint for our web application. SignalR does support self hosted servers if you want to use a Windows Service or other back end implementation. As long as the client can see the server over the network, you can wire it up. Second, we create a dynamic proxy specifying the hub type that we created on the server. Note here that the casing of the proxy is important and uses Camel casing even though the implementation on the server used Pascal casing. This is done to make the hubs and methods feel more natural to JavaScript clients.

Next, we specify how to handle the push notifications that the server sends. We do that using the .On method, specifying the type (SensorData) that the method should be deserialized back into. We then specify the name of the method (altering the case as mentioned above) that we are listening for along with the Action that should be invoked as each value is received. In this case, we’ll just output the value that we received to the console window.

RX in the Client

At this point we have Rx pushing messages to the client via SignalR. Let’s take it a step further and add some Rx goodness on the client side as well. In addition to the On method, the Proxy also supports an Observe method which turns the message pump into an IObservable of an array of Objects where each of the method parameters are contained in that array. Since our Broadcast method only has a single parameter of type SensorData, we will grab it by getting the first array element and calling the Json.Net .ToObject implementation to translate it back into our strongly typed object. From there, we work with it just as we would any other Observable sequence. For example, to output only the generated values for Category 1, we could use the following:

        Dim cn = New HubConnection("http://localhost:5687/")
        Dim sensor = cn.CreateProxy("observableSensorHub")

        Dim items = From item In sensor.Observe("broadcast")
                    Let instance = item(0).ToObject(Of SensorData)()
                    Where instance.Category = "1"
                    Select instance

        Using items.Subscribe(Sub(value) Console.WriteLine(value.Value))

            cn.Start().Wait()
            Console.ReadLine()

        End Using

In this case, we Start the connection inside of our subscription’s Using clause along with the Console.ReadLine. Once a key is pressed, the subscription is disposed freeing our resource.

One of the nice things is the flexibility that SignalR offers. Pretty much anything that speaks HTTP can consume these messages from our Rx Server. If we wanted to consume the same messages in a web client, we could use the following Javascript code:

      $(function () {
          // Proxy created on the fly
          var hub = $.connection.observableSensorHub;

          // Declare a function on the chat hub so the server can invoke it
          hub.Broadcast = function (value) {
              $('#values').append('&lt;li&gt;' + value.Time + ': ' + value.Value + '&lt;/li&gt;');
          };

          // Start the connection
          $.connection.hub.start();
      });

In the javascript, we connect to our server by referring to the $.connection.observableSensorHub. Notice the camel cased name here? That’s the SignalR translation in action again. We then specify the method handler for the dynamically invoked “Broadcast” method. Here we just add list item entries to the unordered list.

As I said above, I’m so impressed with SignalR so far. Don’t be surprised to see me post more about it in the future. For now however, if you want feel free to download this sample and try it out yourself. You will need to refresh the nuget packages to get it to run. Additionally, realize that the sample was built with RC builds of Visual Studio 2012, Rx 2.0, and SignalR. I don’t guarantee that the sample will continue to work once these are officially released. If you notice any issues or want to see more details, let me know what you Thinq below.

Posted on - Comment
Categories: VB Dev Center - Rx -

Reactive Extensions books

I’ve been presenting on Rx for some time know. Often I’m asked when I’m going to write another book. I’ve considered writing one on Rx expanding on my blog posts here, but suspected that the market might be a bit small. At this point there are two books on the market for Rx. The first one is free and serves as a good resource to the Rx methods.

Lee Campbell’s Introduction to Rx is a good overview of the various Reactive Extensions methods and coding techniques. It doesn’t focus as much on the practical uses of Rx, but would serve as a good companion to anyone trying to determine which method they should use for any given need. Even better, it’s free online at www.introtorx.com or for $0.99 at Amazon.
Jesse Liberty and Paul Betts released the first book (mostly) dedicated to Rx. It offers more practical examples, including a discussion of how to use Paul’s ReactiveUI MVVM implementation. I was hoping to like this book, but was disappointed by the thoroughness of covering Rx itself. Instead they used nearly a third of their 150 pages discussing C# LINQ features and LINQ to SQL for the Windows Phone. If you are already familiar with the basics or have gone through the free HOL below, you will probably want to skip directly to Lee’s book above. If you aren’t familiar with LINQ and Lambda’s this is a good book to start with as it covers the basics as well. It’s also a fairly short book as I read the entire thing on a single plane trip from Seattle to Atlanta.
  A couple of hands on Labs may be your best be to getting your feet wet and understanding the basics of Rx: Curing the asynchronous blues with the Reactive Extensions for .NET and JavaScript

 

In addition to these books, I still recommend the learning resources mentioned in my post: Reactive Framework Learning Resources for RxNet and RxJS. I still wonder if there is a market for more books on Rx. What do you Thinq?

Posted on - Comment
Categories: Rx -

Using Rx to consume a Task based WCF service

Among the many changes that Dev 11 brings is the new default when adding a service reference to generate Task based proxy methods rather than using the APM flavor (using the BeginXXX – EndXXX model). In this post, we’ll look at creating a simple service and then consuming it using the Reactive Extensions. Let’s start by defining the service interface and implementation:

Imports System.ServiceModel
Imports System.Threading 

<ServiceContract()>
Public Interface ISimpleServicesvc

    <OperationContract()>
    Function DoSomethingCool(input As String) As String

End Interface

Public Class SimpleServicesvc
    Implements ISimpleServicesvc

    Public Function DoSomethingCool(input As String) As String Implements ISimpleServicesvc.DoSomethingCool
        Return (String.Join("", From letter In input.ToCharArray()
               Order By letter
               Distinct))
    End Function

End Class

Essentially here we are just taking a string input and returning the distinct characters sorted. The details of the service in this case are trivial. Our focus here is how to implement the service client. We start by adding a service reference in our client application by right clicking on the project and selecting Add Service Reference. (Alternatively, you can now press Ctrl-Q and request to “Add Service Reference” from there. From the dialog, you can still use the “Discover” button to locate the service as long as it is in your solution.

image

One thing to note is that the proxy classes are now by default generated using Task based methods rather than the previous IAsyncResult AMP method.

image

As a result, the definition of the proxy class is as follows:

Public Function DoSomethingCoolAsync(ByVal input As String) 
                 As System.Threading.Tasks.Task(Of String) 
                 Implements SimpleService.ISimpleServicesvc.DoSomethingCoolAsync
    Return MyBase.Channel.DoSomethingCoolAsync(input)
End Function

If we wanted to consume this using the new Async/Await, we could do it as follows:

Private Async Sub SubmitClicked() Handles SubmitButton.Click
   Dim svc = New SimpleService.SimpleServicesvcClient()
   Dim req = Await svc.DoSomethingCoolAsync(InputText.Text)
   OutputText.Text = req
End Sub

Of course, to put the LINQ spin on this, let’s see the Rx version to do the same thing:

Private Async Sub SubmitClicked() Handles SubmitButton.Click
    Dim svc = New SimpleService.SimpleServicesvcClient()
    Dim req = svc.DoSomethingCoolAsync(InputText.Text).ToObservable()
    req.ObserveOnDispatcher().Subscribe(Sub(val) OutputText.Text = val)
End Sub

We start by turning the Task into an Observable producer using the ToObservable extension method. We then subscribe to the observable making sure to return back to the dispatcher thread because the task based service is run on a taskpool thread. Of course in this case, we are subscribing on every button click. With Rx, we could wire the button click and service request up on form navigate and unwire it when navigating from the form as follows:

Private requestDisposable As IDisposable
Protected Overrides Sub OnNavigatedTo(e As Navigation.NavigationEventArgs)
    Dim svc = New SimpleService.SimpleServicesvcClient

    requestDisposable = (From click In Observable.FromEventPattern(Of RoutedEventArgs)(SubmitButton, "Click")
                        From req In svc.DoSomethingCoolAsync(InputText.Text).ToObservable()
                        Select req).
                        ObserveOnDispatcher().
                        Subscribe(Sub(val) OutputText.Text = val)
End Sub
Protected Overrides Sub OnNavigatedFrom(e As Navigation.NavigationEventArgs)
    MyBase.OnNavigatedFrom(e)
    requestDisposable.Dispose()
    requestDisposable = Nothing
End Sub
Posted on - Comment
Categories: WCF - Rx - VB Dev Center -

Rx samples with WinRT

Recently I decided to rebuild one of my presentation computers using the Windows 8 Consumer Preview and Visual Studio 11 Beta. While it is working quite well for my typical usage, I did run into a bit of an issue when prepping my Rx talks for the Rocky Mountain Trifecta this past weekend.

In the past, when I'm giving my Practical Rx talk, I show Silverlight, WPF, WP7 and RxJs samples. Unfortunately, the Windows 8 Consumer Preview does not support the Windows Phone 7 emulator and Visual Studio 11 does not support the WP7 targets. As an alternative, i decided to port the WP7 demos over to a Metro style Win-RT application, which you can download now.

The main changes were due to slightly different event names, but other all of the Rx methods that I needed were available in the Rx Beta 2 experimental build that Bart introduced on the Rx Blog. However, when preping the prototypical Dictionary Suggest sample, I ran into a TargetInvocationException when trying to dispose the event handler. Searching a bit further into the call-stack, I found that the exception was caused when disposing the observable subscriptions due to the TakeUntil clause. Even though the Throttle is what pushes this code off of the dispatcher thread. For reference, here's the broken code method:


    var svc = new SimpleService.SimpleServicesvcClient();
    var results = new ObservableCollection();
    Translations.ItemsSource = results;

    IObservable inputStrings =
        from keyup in Observable.FromEventPattern
            (InputText, "TextChanged")
        select InputText.Text;

    var svcResults = 
        from text in inputStrings
        .Throttle(TimeSpan.FromMilliseconds(250))
        from result in svc.DoSomethingCoolAsync(new DoSomethingCoolRequest { input = text })
        .ToObservable()
        .TakeUntil(inputStrings)
        select String.Format("{0} - {1}", text, result.DoSomethingCoolResult);

    svcResults
        .ObserveOnDispatcher()
        .Subscribe(result => results.Insert(0, result));
         

Thanks to a bit of additional snooping by the Rx and WinRT teams, we found that the real reason here is that we are attaching the TextChanged event handler on the core dispatcher thread. However, when we use the Throttle, the operation is shifted to another thread. When we add the TakeUntil, we instruct Rx to dispose of the current subscription when the second stream starts. Of course, the new keypress also starts another stream of observables.  While this works without errors in the .Net world (Silverlight/WP7/WPF), WinRt is a bit pickier and requires that the remove handler be called on the same thread that was used when the handle was added. To fix this situation, we need to explicitly force the subscription onto the Dispatcher thread using SubscribeOnDispatcher. Here's the fix:


    var svc = new SimpleService.SimpleServicesvcClient();
    var results = new ObservableCollection();
    Translations.ItemsSource = results;

    IObservable inputStrings =
        from keyup in Observable.FromEventPattern
            (InputText, "TextChanged")
        .SubscribeOnDispatcher()
        select InputText.Text;

    var svcResults = 
        from text in inputStrings
        .Throttle(TimeSpan.FromMilliseconds(250))
        from result in svc.DoSomethingCoolAsync(new DoSomethingCoolRequest { input = text })
        .ToObservable()
        .TakeUntil(inputStrings)
        select String.Format("{0} - {1}", text, result.DoSomethingCoolResult);

    svcResults
        .ObserveOnDispatcher()
        .Subscribe(result => results.Insert(0, result));
         
Posted on - Comment
Categories: Rx - WinRT - C# -

Ix Interactive Extensions return

If you’ve been following the Reactive Extensions for any time, you may have seen that the team utilized the duality between IEnumerable and IObservable to not only create parallel extension methods of the enumerable versions on IObservable, but they also created IEnumerable versions of the additional methods that they added to IObservable as well. This formerly was in the Interactive libraries that came as part of the Rx bits. When the version 1 release of Rx came out however, these IEnumerable extensions were not included as part of the shipping bits.

Yesterday Microsoft released the v1.1.10823 version of these extensions branded as Ix.Note that this is an Experimental release, which means that it is highly subject to change, so if you use it, be prepared to make changes as new versions are released. When you download and install it, you can find the binaries in your C:\Program Files (x86)\Microsoft Interactive Extensions SDK\v1.1.10823\ directory. The current release includes versions for .Net 3.5 and 4, Silverlight 4 and 5 and Windows Phone 7. Glancing at the methods added in the EnumerableEx class we can find the following:

  • While, DoWhile
  • If
  • Case
  • For
  • Do
  • Buffer
  • DistinctUntilChanged
  • Repeat
  • Throw
  • Catch
  • Finally
  • OnErrorResumeNext
  • Retry
  • Publish
  • Memoize

In addition, this release adds the System.Interactive.Async library including a set of extensions that allow you to turn Enumerables into AsyncEnumerables and perform the same sets of queries you could with IEnumerables and IObservables.

If you’re interested in these, make sure to keep an eye on the Rx team’s blog and Channel 9. Also, Bart talked about the System.Interactive extensions on Channel 9 in July. He also mentioned that the bits are also available on NuGet. Look for the Ix_Experimental-Main and Ix_Experimental-Providers packages.

Posted on - Comment
Categories: Rx - LINQ -

Select Many with Rx and RxJs

A while back, I showed how you could use the query syntax and Rx to code a drag-drop operation similar to the way you might describe  the process to your mother. As a review, let’s take another look at the code, this time in C# as a preparation for moving it over to javaScript.


var mouseDown = from evt in Observable.FromEventPattern<MouseButtonEventArgs>(image, "MouseLeftButtonDown")
                select evt.EventArgs.GetPosition(image);
var mouseUp = Observable.FromEventPattern&;lt;MouseButtonEventArgs>(this, "MouseLeftButtonUp");
var mouseMove = from evt in Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
                select evt.EventArgs.GetPosition(this);

var q = from startLocation in mouseDown
        from endLocation in mouseMove.TakeUntil(mouseUp)
        select new
        {
            X = endLocation.X - startLocation.X,
            Y = endLocation.Y - startLocation.Y
        };

What might not be evident from this code is that the query syntax is actually using the SelectMany extension method when we use the from x in xs from y in ys above. If we wanted to, we could re-write this using the Lambda/method syntax as follows:


var q = mouseDown
        .SelectMany(startPos =>
              mouseMove
                    .TakeUntil(mouseUp)
                    .Select(movePos =>
                        new
                        {
                            X = movePos.X - startPos.X,
                            Y = movePos.Y - startPos.Y
                        })
                     );

Personally, I prefer the query syntax and find the lambda syntax a bit messy with SelectMany. Let’s try to tear this down a bit to see if we can understand what’s going on. Here we still have 3 different sets of Observables—mouseDown, mouseMove and mouseUp. The SelectMany extends mouseDown taking in the instance of each observed value as it is produced. SelectMany then takes a function (lambda) which will generate a new set of Observables of some other type which may or may not include values from the first observed value and values from the second variable.

In this case, we create the second observable set from the mouseMoves that we start listening to as the result of the SelectMany. We can then generate an new observable set by projecting (Select) the mouseDown’s startPos and mouseMove’s movePos offsets. For those who think pictures say more than words, here’s a marble diagram illustrating how SelectMany works:

image

In this case the Ox represents the MouseDown observables. fx represents the function supplied in the Lambda parameter and Oy represents the resulting Observable set of offsets. Wes and Jeffrey walk through SelectMany in their Channel 9 video as well.

Moving to RxJs

So what does all of this have to do with JavaScript? In the process of rewriting some of my Rx talks to RxJs, I wanted to include the MouseMove sample. Since JavaScript doesn’t support the syntactic sugar that enable query expressions, we are forced to use the method syntax. If you want to skip to the chase, you can download my RxJs samples and look at the DragDrop.htm to try this out. As we did with the Silverlight/WPF version of DragDrop, we start by setting up three sets of Observables to track the MouseDown, MouseMove and MouseUp events.


    <script type="text/javascript" src="Scripts/rx.js"></script>
    <script type="text/javascript" src="Scripts/jquery-1.4.1.min.js"></script>
    <script type="text/javascript" src="Scripts/rx.jQuery.js"></script>

    <script type="text/javascript">
        $(document).ready(function () {
            var dragTarget = $("img");
            var mouseUp = dragTarget.toObservable("mouseup");
            var mouseMove = dragTarget.toObservable("mousemove");
            var mouseDown = dragTarget.toObservable("mousedown")
                .Select(function (event) {
                    return {
                        left: event.clientX - dragTarget.offset().left,
                        top: event.clientY - dragTarget.offset().top
                    };
                });
    </script>
    <img alt="logo" src="Images/RxLogo.png" style="position: absolute; top:400; left:400; height: 100;
        width: 100" id="RxLogo" />

Now that we have our observables, we can merge the streams using SelectMany and subscribe to the end result to actually move the image on the screen by altering the css left and top positions accordingly.


            var moves = mouseDown.SelectMany(function (imageOffset) {
                return mouseMove
                        .Do(function (event) { event.preventDefault(); })
                        .Select(function (pos) {
                            return {
                               left: pos.clientX - imageOffset.left,
                                top: pos.clientY - imageOffset.top
                            };
                        })
                        .TakeUntil(mouseUp);
            });
            moves.Subscribe(function (pos) {
                dragTarget.css("left", pos.left);
                dragTarget.css("top", pos.top);
            });

With the exception of the JavaScript function syntax which I tend to think of as a hybrid of C# and VB’s lambda syntaxes, we essentially have the same code that we did with the C# lambda syntax for SelectMany.

It can take a bit of mind twisting to get your head around the lambda syntax for SelectMany, but once you’ve done that, you can start doing some powerful manipulations to extend and coordinate observable event streams in both .Net and RxJs.

Posted on - Comment
Categories: Rx - JQuery - C# -

devLINK RxJs and Async session materials available

I would like to thank everyone who came out to my DevLINQ sessions this week. The materials for both of the sessions are not available on the Files tab  of this site. In addition, here are the descriptions and direct links to each of these downloads:

  • Reactive Extensions for JavaScript (RxJs)
  • The Reactive Extensions allow developers to build composabile, asynchronous event driven methods over observable collections. In web applications, you can use this same model in client side processing using the RxJs framework. We'll show you how you can take advantage of this framework to simplify your complex asynchronous client side operations.

    Includes slides and samples demonstrating some of the uses of the Reactive Extensions for JavaScript (RxJs). The samples illustrate using RxJs for LINQ style queries, Timer based web page rotator, Mock observable sensor, Mouse drag drop, Dictionary Suggest, and Bing Translator over multiple languages.

  • Async Programming in .Net
  • While .Net 4 added parallel programming capabilities, these tools primarily help when your application is CPU bound. In many cases, parallel processing doesn't address IO Latency issues. In these cases, we need to provide the perception of responsive applications by using asynchronous programming tools. We will explore some of these options including Delegates, Callbacks, Iterators, Observers, and the new C# Async/Await keywords.

    Includes slides and samples demonstrating ways of performing Asynchronous operations from .Net 1.0 through .Net 5.0. In addition to the standard Visual Studio 2010 install, you will also need to download and in stall the following libraries to use these samples:

Posted on - Comment
Categories: Code Camp - C# - Rx - JQuery -

Using RxJs for an Image Rotator with jQuery

In trying to come up with some compelling demos for RxJs, I happened upon a scenario that some of you may find helpful. I wanted to create the ability to display images on a timer loop and keep looping through them while the user is on the page. I realize that there are a plethora of jQuery plugins that do this already, but  I have a new hammer (RxJs), I might as well see how well it works myself.

As I did in my port of the ObservableSensor in the last RxJs post,  let’s start by defining the presentation portion:

<!DOCTYPE html>
<html>
<head>
     <title>Observable Rotator</title>
     <script type="text/javascript" src="http://code.jquery.com/jquery-1.6.2.min.js"></script>
     <script type="text/javascript" src="Scripts/rx.js"></script>
</head>
<body>
     <img id="imageRotator" alt="rotating" />
</body>
</html>

Clean and simple HTML5, except I didn’t set the source of the image. We’ll do that in our JavaScript code instead. Let’s get right to it then:

<script type="text/javascript">
    $(function () {
        var images = ["Images/image1.png",
                    "Images/somethingElse.png",
                    "Images/someAd.png",
                    "Images/JimHeadShot.jpg",
                    "Images/logo.gif"
                    ];

        $("#imageRotator").attr("src", images[0]);

        var delayedSites = Rx.Observable.GenerateWithTime(
             1,                                           // Starting index
            function (x) { return true; },                // Keep iterating always
            function (index) {               
                if (index < images.length - 1) 
                    return index + 1;                     // Increment index
                else
                    return 0; // if the current index exceeds the array bounds, reset the index 
            },
            function (index) { return images[index]; },  // OnNext
            function () { return 5000 });                // Time interval


        delayedSites
            .Subscribe(function (uri) {
                $("#imageRotator").attr("src", uri);
            });
    });
</script>

The start of this method should be easy enough. I’m setting up an array containing the locations of the images I want to rotate through. Of course, you could get this array from a xml file, json service request, using the FileSystemObject, or any number of other options. It doesn’t matter how you get the array.

Once we have the array, we’ll go ahead and populate the first item in that array as the starting image using the jQuery attr method setting the src attribute to images[0].  With that out of the way, we’re ready to start with the RxJs goodness.

In order to push out items from our array, we could use Rx.Observable.FromArray which is the same as the .ToObservable extension method in .Net, however we would need to set-up a custom scheduler to handle the delay. Instead, we’ll just use the GenerateWithTime method to pull items from the array OnNext passing in a time delay (30000 for 30 seconds). We also check to see if the current iteration index exceeds the number of items in the list and if so, reset the index to start looping from the start again.

Now that we are sending out new image uri’s every 30 seconds, we simply need to change the src attribute of the imgageRotator img tag, which we’ll do as the function we pass into the Subscribe method.

Naturally, there are plenty of enhancements that can be done to this example, including randomizing the starting index, setting the size of the imageRotator to keep all of them the same, adding jQuery fadeIn/fadeOut and other animation effects, etc. I’ll leave these tasks as an exercise for the reader.

My question dear reader is, do you thinq that the RxJs version is any better or worse that other image rotator examples you’ve seen? Why?

Posted on - Comment
Categories: Rx - JQuery -

Cancelling a Reactive Extensions Observable

I’m often asked how do you cancel an Observable. In previous posts, I’ve shown how to stop observing by disposing the result of the subscription to the Observable. Of course, this isn’t the only way. If you are using one of the Observable.Generate methods, you can pass in a pointer to an object (like the System.Threading.CancellationTokenSource) and change a flag on that object, then in the iterate lambda function, reference that object to see if the cancellation flag was set. Here’s an example:


    Private WithEvents ts As New System.Threading.CancellationTokenSource

    Protected Overrides Sub OnInitialized(e As System.EventArgs)
        MyBase.OnInitialized(e)

        Dim items As New ObservableCollection(Of Integer)
        ItemsListbox.ItemsSource = items

        Dim cancelClicked = Observable.FromEventPattern(Of RoutedEventArgs)(Cancel, "Click")

        Dim obs = Observable.Generate(0,
                                      Function(x) Not ts.IsCancellationRequested,
                                      Function(index) index + 1,
                                      Function(index) index,
                                      Function(index) TimeSpan.FromSeconds(1))

        obs.TakeUntil(cancelClicked).
            ObserveOnDispatcher().
            Subscribe(Sub(item)
                          items.Add(item)
                      End Sub)
    End Sub

    Private Sub Cancel_Click(sender As System.Object, e As System.Windows.RoutedEventArgs) Handles Cancel.Click
        ts.Cancel()
    End Sub

In this example, we’re setting up a class level variable (ts) as a CancellationTokenSource. When we create our observable using Generate, the second parameter evaluates whether or not it should continue iterating. By checking the ts.IsCancellationRequested, we will evaluate that each time we iterate. Because it is a module level variable, we can cancel it by calling Cancel() in the Cancel Button click event handler.

As another alternative, we can convert the Cancel click into an observable collection as well by using the Observable.FromEventPattern. Then on the main observable, join it with the button observable using TakeUntil as follows:


    Protected Overrides Sub OnInitialized(e As System.EventArgs)
        MyBase.OnInitialized(e)

        Dim items As New ObservableCollection(Of Integer)
        ItemsListbox.ItemsSource = items

        Dim cancelClicked = Observable.FromEventPattern(Of RoutedEventArgs)(Cancel, "Click")

        Dim obs = Observable.Generate(0,
                                      Function(x) True,
                                      Function(index) index + 1,
                                      Function(index) index,
                                      Function(index) TimeSpan.FromSeconds(1))

        obs.TakeUntil(cancelClicked).
            ObserveOnDispatcher().
            Subscribe(Sub(item)
                          items.Add(item)
                      End Sub)
    End Sub

Do you have another favorite alternative for cancelling an Observable, let me know.

Posted on - Comment
Categories: Rx - VB Dev Center -

Porting Reactive Extension Samples to RxJs

In preparation for my upcoming talk at DevLink on RxJs, I decided to try porting some of my existing .Net/Silverlight samples over to the JavaScript version to see how different they would be. At this point, there is a disconnect between the .Net and .js versions and the signatures for the JavaScript methods are still lagging a bit behind. If you’re impatient and just want to see one of the samples in action, jump over to the Observable Sample to see how the sensor sample works and view code to see the code implementation.

Before we get started, let’s lay out the UI for our page. In this sample, we’re going to display 4 buttons to start a variety of RxJs queries and two Unordered Lists to display the results of two of these queries:


<p>
  <button id="btnLowValueSensors">Show Low Values</button>
  <button id="btnSelectedTypeSensors">Show Type x Values</button>
  <button id="btnWarnOutliers">Warn Outliers</button>
  <button id="btnHeartbeat">Heartbeat</button>
</p>

<div style="color: Blue; overflow-y: scroll; max-height: 300px; display: block; float: left;">
  Low Value Sensors:
  <ul id="LowValueSensors"></ul>
</div>
<div style="color: Red; overflow-y: scroll; max-height: 300px; display: block; float: right;">
  Type 2 Sensors:
  <ul id="SelectedTypeSensors"></ul>
</div>

So far, we haven’t added any reactive specific code. In order to use the RxJs, you need to download and install the latest version of the samples at Reactive Extensions for JavaScript experimental release version 1.0.2838.0(2010-12-20). As they become available, you can download newer versions at http://msdn.microsoft.com/en-us/data/gg577610. Once you have the samples installed, copy the .js files into your project and include a script reference in your page to the rx.js file:


<script type="text/javascript" src="Scripts/rx.js"></script>

With the script reference in place, we can now start to implement some RxJs goodness. In this sample, I’m going to redo the observable sensor sample that we updated in the last post. If you remember in C#, we can use the Observable.Generate method to generate some random values. In JavaScript, we can do the same thing with a slightly difference syntax. All of the RxJs Observable factory methods need to start with the Rx “Namespace” alias. Thus the time based Generate method can be implemented using Rx.Observable.GenerateWithTime as follows:


<script type="text/javascript">

 var sensor;
 var lowValueSubsription;


 $(function () {
     sensor = Rx.Observable.GenerateWithTime(0,
                  function (i) { return true; },
                  function (i) { return i; },
                  function (i) { return { type: Math.floor(Math.random() * 4), 
                                          value: Math.random() * 20, 
                                          timeStamp: new Date() }; },
                  function (i) { return Math.random() * 200; }
     );
  });

While the parameters look different, they essentially function the same. Instead of using the C# Lambda syntax “(x) =>” we use the JavaScript inline function syntax: function(i) { return i; }. The first three parameters supply the logic of initializing, setting the range, and incrementing the value for a “for” or “while” loop. In this case, we’ll just generate values constantly (return true) and not worry about the initial value (0) or the increment (return i). On each iteration, we’ll generate a new JSON object rather than anonymous type. Finally, we’ll tell the observable to delay for some random time between 0 and 200 milliseconds.

With this in place, we can now start querying and subscribing to the observable source similar to how we did in VB or C# with the notable exception that we can’t use the LINQ query comprehension syntax and are forced to use the method syntax with JavaScript inline functions. For example, to add sensor values to our SelectedTypeSensors unordered list, but filter them to only those where the type is “2” we can do something like the following:


$("#btnSelectedTypeSensors").click(function (e) {
    var query = sensor.Where(function (next) { return (next.type == "2"); });
    query.Subscribe(function (next) { 
        $("<li/>").html(next.timeStamp 
                        + " - " + next.type 
                        + ": " + next.value)
                  .appendTo("#SelectedTypeSensors");
    });
  });

The first line uses JQuery to wire an click event handler up to the btnSelectedTypeSensors button. The next line illustrates using Rx to filter our sensor values using the Observable.Where method of RxJs. The last several lines perform the equivalent of Console.WriteLine in our old sample by subscribing to the observable sensor passing the instance of the random generated values from our observable as a function parameter called “next” (like other lambda parameters, the name doesn’t matter here as long as you are consistent inside of your function implementation.) We then create a new List Item (li) and append the string containing the generated timestamp, type and value from the JavaScript object) and the append this new list item to the unordered SelectedTypeSensors list using JQuery. That’s a lot of power in a fairly small space.

Just like we do in regular Rx, we need to be able to not only subscribe to our observables, but unsubscribe as well. For consistency, when we subscribe to the observable, we get access to a disposable object which we hold onto. When we call dispose on it, we stop listening. Here’s the RxJs implementation of our low value sensor filter which manages the disposing of our resources:


$("#btnLowValueSensors").click(function (e) {
   if (lowValueSubsription) {
      lowValueSubsription.Dispose();
      lowValueSubsription = null;
   }
   else {
      var query = sensor
                  .Where(function (s) { return (s.value < 2) });
      lowValueSubsription = query.Subscribe(function (next) {
         $("<li/>").html(next.timeStamp + " - " 
                         + next.type + ": " 
                         + next.value).appendTo("#LowValueSensors");
      });
   };
});

In this case, we check to see if the lowValueSubscription is already subscribed to anything. If it is, we dispose it, otherwise we create a query to filter the random values to anything where the generated value is less than 2. Like the previous example, we generate list items with the filtered values and append them to the LowValueSensors list.

In the previous sample, we were able to display a message box when values exceed some level. We short-circuited the request by using the Any extension method which doesn’t exist in the RxJs implementation at this point. To replace this, we can use Observable.Take and only take the first value. We then display a pop-up box using the Javascript alert function.


$("#btnWarnOutliers").click(function (e) {
  sensor
  .Where(function (next) { return next.value > 19; })
  .Take(1) 
  .Subscribe(function (next) {
     alert(next.value + " Happened");
  });
});

The last query in the sensor samples should be straightforward by now with the exception that we don’t pass a TimeSpan object to the BufferWithTime, but rather just pass the number of milliseconds that we want to buffer. Thus to show the number of Type “2” values that occur every 3 seconds, we can do the following:


$("#btnHeartbeat").click(function (e) {
  var heartbeat = sensor
    .Where(function (next) { return next.type = "2"; })
    .BufferWithTime(3000);

  heartbeat.Subscribe(function (next) {
     var typesPerSecond = next.length;
     $("#btnHeartbeat").html(typesPerSecond);
  });
});

Working with RxJs can be a bit trickier than the .Net equivalent version, in part because the intellisense and variable names are restricted at this point to make the library a small download. Currently the Rx.js file is 30K, which although fairly small, is a consideration to keep in mind when using the library over slow network connections. The nice thing is once you’ve gotten your head twisted around into thinking functionally declarative rather than imperative, making the transition from Rx.Net to RxJs becomes a relatively painless experience (so far). If you want to try out this sample or check out the final code, try it out at http://www.thinqlinq.com/observablesensor.htm.

Posted on - Comment
Categories: Rx -

Revising the Reactive Sensor Generator

When first created the Reactive sensor sample I wasn't completely happy with it because if any of the various subscribers were disposed or triggered an OnCompleted (like the Any) clause, it would trigger a completed state to all of the other “listeners”. This is not what I intended.  To make it easy, let’s review how we created the sensor originally:


    Private _observers As New List(Of IObserver(Of SensorInfo))
    Private _running As Boolean

    Public Function Subscribe(ByVal observer As System.IObserver(Of SensorInfo)) 
                              As System.IDisposable 
                              Implements System.IObservable(Of SensorInfo).Subscribe
        _observers.Add(observer)
        Return Me
    End Function

    Public Sub StartSensor()
        If Not _running Then
            _running = True
            Dim randomizer = New Random(Date.Now.Millisecond)
            While _running
                Dim randVal = randomizer.NextDouble
                If _observers.Any Then
                    Dim info As New SensorInfo With {.SensorType = CInt(randVal * 4).ToString,
                                                     .SensorValue = randVal * 20,
                                                     .TimeStamp = Now}

                    _observers.ForEach(Sub(o) o.OnNext(info))
                End If
                Threading.Thread.Sleep(CInt(randomizer.NextDouble * 500))
            End While
        End If
    End Sub

For a quick review, we set up an internal list of observers and manually added new subscribers to this list. We didn’t have a good way of removing them from the list however through the typical dispose implementation. Then as we are looping in the while loop, we generate a new sensor reading and announce it to each of the listeners through the _observers.ForEach(0 => o.OnNext(info)). Because this was constantly running (originally on the calling thread), we had to take the extra effort to run it on a background worker.

With more reflection and further work with Reactive, we can simplify the process quite a bit here by using the Observable.Generate (or in earlier builds Observable.GenerateWithTime) to create an observable that we expose to our subscribers. The subscriber is then responsible for subscribing and disposing itself from our centralized observable. Our revised implementation to generate random sensor readings at random time intervals is as follows:


Public Class Sensor

    Private _sensorObservable As IObservable(Of SensorInfo)

    Public Sub New()
        _sensorObservable =
            Observable.Generate(Of Int16, SensorInfo)(initialState:=0,
                condition:=Function(x) True,
                iterate:=Function(inVal) inVal,
                resultSelector:=Function(x)
                                    Dim randValue = New Random(Date.Now.Millisecond).NextDouble
                                    Return New SensorInfo With {.SensorType = CInt(randValue * 4).ToString,
                                                                .SensorValue = randValue * 20,
                                                                .TimeStamp = Now}
                                End Function,
                timeSelector:=Function(x) TimeSpan.FromMilliseconds(New Random(Date.Now.Millisecond).NextDouble * 100)
            )
    End Sub

    Public ReadOnly Property SensorObservable As IObservable(Of SensorInfo)
        Get
            Return _sensorObservable
        End Get
    End Property

End Class

In new implementation we use the Obserable.Generate that we discussed when mocking the phone accelerometer. We do still need to worry about our threads because the timeSelector runs on a background thread automatically. As a result, our observables are being generated on a background thread.

When we subscribe to this, we first create a shared instance of the Sensor class and then Rx queries subscribe to the sensor’s SensorObservable property:


Dim AnySensor = sensor.SensorObservable.Any(
                  Function(s) s.SensorValue > 17)
AnySensor.Subscribe(Sub(s) MessageBox.Show(s.ToString, "OutOfRange"))

If you want to remove a subscription, keep a handle on the disposable that was returned when you subscribed and dispose it to stop listening.


Private ActiveLowValueSensors As IDisposable

Private Sub FilterLowValue_Click() Handles FilterLowValue.Click
    If ActiveLowValueSensors Is Nothing Then
        Dim lowValueSensors = From s In sensor.SensorObservable
                              Where s.SensorValue < 3
                              Select s.SensorValue

        ActiveLowValueSensors = lowValueSensors.Subscribe(
           Function(val) Console.WriteLine(val))
    Else
        ActiveLowValueSensors.Dispose()
        ActiveLowValueSensors = Nothing
    End If

End Sub

I've updated the WPF samples with this change so you can take it out for a spin if you would like. As always, let me know what you Thinq.

Posted on - Comment
Categories: Rx - VB Dev Center -

Updating Reactive Samples to 10425 build

Today, I decided to take the plunge and update my WPF and Silverlight Reactive Extensions samples to the latest (at the time of this writing) build of Rx: version 1.1.10425.0. At this point, I have purposely left the phone samples on the blog targeting the original shipping bits, so they aren’t affected at this point.

(ED: the 1.0.10605 stable and experimental releases shipped as of 6/5/2011. Looking at the release notes, this update appears to have much less breaking changes beyond those in the 1.1.10425 build).

As discussed in the Rx Forum post, this update has quite a number of breaking changes. Lee Campbell has a nice post discussing a number of these changes. If you don’t care about the changes and just want to download the revised samples, head on over the Files page and try them out:

  • RX_Wpf
    Reactive Framework samples from the "Becoming an RxPusher with the Reactive Framework" talk. With the emergence of LINQ, we discovered the power and flexibility that comes from the IEnumerable interface. This pull model makes iterating over sets of data and performing filtering, transformation, and aggregation operations easy through LINQ. However, the pull model breaks down in asynchronous and event driven environments. In evaluating the options, we discovered that the IObserverable interface and the push model were effectively analogous to the pull model of IEnumerable. As a result, we can make event driven asynchronous programming easier and more declarative by using the Reactive Framework and LINQ to Events.

    (Uploaded on 6/8/2011 - File Size 183442)

  • RX_Silverlight
    Slides and demos for using the Reactive Framework in Silverlight 4.

To help you update your existing projects, I figured I would share a bit of my experience with this update.

First of all, remove the existing references to the old builds of RX, including System.CoreEx, System.Interactive, and System.Reactive. Next download the new build either by downloading it directly from the MSDN Data Development center, or by using the Nuget Package Manager and installing the Rx-Main package. You do want to be careful here, because the older 1.0.2856.0 build is still available via Nuget as well. There are some cases where you might need access to the older build, particularly in cases where you need access to the IEnumerable extenstions that were added to complement the new ones in IObservable, like .Do. Also, note that the RxJs library is not currently updated to agree with the new method names, so you will need the older packages for RxJs support as well.

Once you have the new package downloaded and installed, add a reference to the System.Reactive assembly in your solution.

You won’t need to remove imports or using clauses from your classes because the older builds of Rx built the extensions directly on top of the System.Linq namespace. In the new builds, the LINQ extensions are now in the System.Reactive.Linq namespace, so you will need to add an Imports/using clause to System.Reactive.Linq. If you used the ObserveOnDispatcher/SubscribeOnDispatcher methods, these have been replaced by the simpler ObserveOn and SubscribeOn. As a result, you may want to add an Import/using clause for System.Threading as well so that we can access the SynchronizationContext more directly. With that in place, change your calls to .ObserveOnDispatcher as follows:


ObserveOn(SynchronizationContext.Current)

Next, we need to update some of the Observable factory implementations that we have used to create observables. The Observable.FromEvent method has changed to more closely align with Observable.FromAsyncPattern and now uses “FromEventPattern”. For example, the Mouse drag-drop example that we discussed in this post, now starts out as follows:


Dim mouseDown = From evt In Observable.FromEventPattern(Of MouseButtonEventArgs)(image, "MouseDown")
                Select evt.EventArgs.GetPosition(image)
Dim mouseUp = Observable.FromEventPattern(Of MouseButtonEventArgs)(image, "MouseUp")
Dim mouseMove = From evt In Observable.FromEventPattern(Of MouseEventArgs)(image, "MouseMove")
                Select evt.EventArgs.GetPosition(Me)

Similarly, the Observable.GenerateWithTime method has now been refined to the more generalized Observable.Generate and uses a TimeSpan override to specify the time interval.

Another such simplification was made to the Buffering and Windowing operations that we used in this post, including BufferWithTime, BufferWithCount, WindowWithTime and WindowWithCount. Now we just have a common Buffer operator that returns an IObservable<IList<T>> and Window which returns an IObservable<IObservable<T>> implementation. We can use the overload resolution to determine if we are passing in a timespan or integer and use the appropiate flavor of Buffer and Window as necessary. As a result, we can change our sorting code to the following:


Dim segmented = Sensor.Buffer(TimeSpan.FromSeconds(3))
segmented.Subscribe(Sub(val)
FilteredList.ItemsSource = From v In val 
                           Order By v.SensorValue) 

One last item that I needed to change was to restore the Do extension method on IEnumerable because I have come to love that function. Fortunately, implementing Do is relatively easy. Unfortunately, the implementation relies on iterators, so we need to put that in a C# project. Here’s the definition of the new Do extension method:


public static class IEnumerableEx
{
   public static IEnumerable<T> Do<T>(this IEnumerable<T> source, Action<T> action)
   {
        foreach (T item in source)
        {
            action(item);
            yield return item;
        }
    }
}

I’m sure you may run into other issues when updating your projects due to the breaking changes. Refer to the forum post above if you need help on your particular issue.

Posted on - Comment
Categories: VB Dev Center - Rx -

Rx for WP7 on Telerik Wednesdays Webcast

windowsphonewednesdaysFor anyone who hasn’t had the opportunity to attend one of my in person presentations, or are frustrated by the audio quality of my MIX recording, join me on Wednesday, May 25 at 11:00 EST when I discuss using the Reactive Extensions on the Windows Phone as part of Telerik’s new Windows Phone 7 Wednesdays video series.

Here’s the session details:

Becoming an Rx Pusher on your Windows Phone
with Jim Wooley on May 25th at 11am EST
https://www1.gotomeeting.com/register/419991849

The Reactive Extensions are a set of libraries that shipped on the Windows Phone and allow you to compose asynchronous operations over observable collections. Learn how to use this amazing framework for turning pull-oriented apps into push-oriented apps. Find out why it is a mind-bending technology and how it improves your ability to make async calls on the Windows Phone 7 platform.

Update: This webcast was recorded and the screen cast is available on Telerik TV.

Posted on - Comment
Categories: WP7 - Rx -

Rx Mock Accelerometer using Observable.GenerateWithTime

When people ask about resources to learn how to use Rx on the Windows Phone, I often point them to the How to guides, including the How to: Use Reactive Extensions to Emulate and Filter Accelerometer Data for Windows Phone. Unfortunately, I have a problem with the underlying implementation of the emulation method which returns values through an iterator (yield) and IEnumerable<T> and then cast the IEnumerable to IObservable. As I pointed out in a MSDN forum post, It would be better to recommend returning just an IObservable natively and replace the yield with OnNext(T).

In reviewing this for my MIX 2011 presentation, I decided to take another look at the implementation and found a much better solution. When mocking the accelerometer, we essentially want to issue new values at a regular interval. To start, let’s take a look at the mocking method from the original How to guide:


static IEnumerable<Vector3> EmulateAccelerometerReading()
{
    // Create a random number generator
    Random random = new Random();

    // Loop indefinitely
    for (double theta = 0; ; theta+=.1 )
    {
        // Generate a Vector3 in which the values of each axes slowly drift between -1 and 1 and
        // then normalize the vector
        Vector3 reading = new Vector3((float)Math.Sin(theta), (float)Math.Cos(theta * 1.1), (float)Math.Sin(theta * .7));
        reading.Normalize();
                
        // At random intervals, generate a random spike in the data
        if (random.NextDouble() > .95)
        {
            reading = new Vector3((float)(random.NextDouble() * 3.0 - 1.5),
             (float)(random.NextDouble() * 3.0 - 1.5),
             (float)(random.NextDouble() * 3.0 - 1.5));

        }

        // return the vector and then sleep
        yield return reading;
        Thread.Sleep(100);
    }
}

 

Essentially, this implementation sets up an infinite loop and issues new values using the yield return statement and then sleeps 100 milliseconds. As I wrote last year in the MSDN forum, a better option is to return an IObservable and use OnNext rather than yield return as follows:


public static IObservable<Vector3> GetEmulator()
{
     var obs = Observable.Create<Vector3>(subscriber =>
                {
                    Random rnd = new Random();
                    for (double theta = 0; ; theta += .1)
                    {
                        Vector3 reading = new Vector3(
                            (float)Math.Sin(theta),
                            (float)Math.Cos(theta * 1.1),
                            (float)Math.Sin(theta * .7));
                        reading.Normalize();
                        if (rnd.NextDouble() > .95)
                        {
                            reading = new Vector3(
                                (float)(rnd.NextDouble() * 3.0 - 1.5),
                                (float)(rnd.NextDouble() * 3.0 - 1.5),
                                (float)(rnd.NextDouble() * 3.0 - 1.5));
                        }
                        // return the vector and sleep before repeating
                        Thread.Sleep(100);
                        subscriber.OnNext(reading);
                    }
                });
      return obs;
}

In preparation for Mix, I decided to revisit this and replace the looping implementation with the native Observable.GenerateWithTime method as follows:


public static IObservable<Vector3> GetAccelerometer()
{
    var obs = Observable.GenerateWithTime<double, Vector3>(
                initialState: 0, 
                condition: _ => true,
                resultSelector: theta => new Vector3((float)Math.Sin(theta),
                                        (float)Math.Cos(theta * 1.1),
                                        (float)Math.Sin(theta * .7)),
                timeSelector: _ => TimeSpan.FromMilliseconds(100),
                iterate: theta => theta + .1)
                .ObserveOnDispatcher();

     return obs;
 }

Notice now we no longer have a loop in our code or a timer sleep. Those pieces are essentially handled for us inside the implementation of the GenerateWithTime. Let’s break this object instantiation down a bit to know what’s going on.

Observable.GenerateWithTime takes two generic type arguments, the double represents the type which we will increment as new values are generated. This allows us to set range values and incrementors similar to how we setup the “for” loop and potentially escape from the loop as necessary. The second generic type indicates the type of values that the observable will create. In this case, we return Vector3 types from the phone’s XNA libraries.

The first input parameter (initialState) sets the starting value of the for loop.

The condition parameter takes a lambda expression allowing us to evaluate if we should continue issuing values. In this case, we’ll keep issuing values as long as something is subscribed to us, thus we return “true”.

The resultSelector parameter takes a input value which is the current value of our iteration and returns some new value based on that input value. Here is where we generate our new vector value based on the current value.

The timeSelector parameter allows us to specify how often we want to issue new values. This would be the same as our sleep value from the original examples.

The iterate parameter allows us to increment our looping value. This is the same as the final portion of the original “for” loop declaration.

Before we return the observable, I add a method to ObserveOnDispatcher to make sure that we delegate back to the dispatcher thread. If we didn’t do this, the GenerateWithTime moves our execution context to the timer’s thread that it generates rather than leaving us on the calling thread.

Using the GenerateWithTime method allows us to abstract the looping implementation details and provides a more declarative/functional mechanism for generating the observable values. The nice thing is that we can consume this just as we would any other observable.

If you want to see this example in action, download my Rx for WP7 samples from MIX and try out the WP7 accelerometer sample. You will see that the mock value is used when you are inside the emulator and the actual accelerometer values are used when connected to a device. Regardless of how the values are generated, we process them with the same Rx subscription pipeline:


subscribed = accelReadings.Subscribe(args =>
             {
                 Single multiplier;
                 if (Single.TryParse(this.MoveMultiplier.Text, out multiplier))
                 {
                     ButtonTransform.X = args.X * multiplier;
                     ButtonTransform.Y = args.Y * multiplier;
                 }
             });
Posted on - Comment
Categories: WP7 - Rx - C# -

Presenting Rx and Windows Phone 7 at Mix

MIX11_BB_I'mSpeakingAt_1I’m happy to have been selected to speak at MIX this year. This is an exciting conference combining developers and designers and focusing on increasing application User Experiences. The focus of my talk will be to show some practical uses of Rx in the context of the Windows Phone 7. If you’re going to attend MIX, I would love to see you at this session. Otherwise, the sessions are typically recorded. I’ll add a link to the recording once it becomes available. Here’s the session description as it’s listed on the Mix website.

Rx: A Library for Managing Asynchronous Data and Events in your Windows Phone 7 Application

Lagoon B on Tue, Apr 12 3:30 PM - 4:30 PM

How do you manage the dizzying array of input sources, ranging from traditional UI events and external service requests to new user interface touch gestures and device sensor detections, while keeping your Silverlight and Phone applications responsive? In this session, you’ll discover how the Reactive Extensions (Rx) library simplifies the programming model by letting you declaratively compose increasingly complex asynchronous operations over these diverse data sources. We will demonstrate some practical uses of Rx for Windows Phone 7 by building a dice playing game including responding to user interactions, creating gestures from the device’s sensors, and making Asynchronous service calls. In the end you’ll learn how to coordinate pushing data around using Rx.

Posted on - Comment
Categories: WP7 - Rx -

Using RX to detect shake Gestures

Part of the power of RX lies in it’s ability to compose complex operations  and keep the resulting code maintainable. I previously showed how to perform Drag-Drop operations with RX. This time, I want to take a look at a slightly more complex operation: Detecting “Shake” gestures on the Windows Phone 7.

The phone includes the ability to detect motion in 3D space through the built-in Accelerometer in the Microsoft.Devices.Sensors library. This sensor raises events when the phone is moved with information about how forcefully it was moved in the EventArgs. Detecting shakes is more complex than just knowing if the device was moved. We need to make sure that the user’s motion was aggressive enough to warrant a shake detection.

In addition, we need to know if the user moved the phone aggressively enough multiple times within a small enough time span. Simply monitoring the ReadingChanged event doesn’t fill the needs of detecting a real “Shake”. To manage all of these state changes and the times that each change occurs with traditional imperative code, we would either need to set up a number of queues remembering each motion that exceeds the tolerance and the times each happens and then act upon them when a sufficient number of these movements happen within a given time threshold. GoogleBinging this finds a number of sample implementations including Joel Johnson’s article and the recently released Shake Gesture Library. Both of these versions work with traditional events and manage the state internally.

If we use RX, we can simplify the code a bit by taking advantage of Observable.FromEvent to create an observable collection from the Accelerometer.ReadingChanged event, and the TimeInterval method to track the amount of time that passes between each accelerometer reading that exceeds the given tolerance (MinimumOffset).


Imports System.Linq
Imports Microsoft.Devices.Sensors
Imports Microsoft.Phone.Reactive

Public Module ShakeObserver
    Const MinimumOffset = 1.44
    Const TimeThreshold = 200

    Public Function GetObserver(ByVal accel As Accelerometer) As IObservable(Of IEvent(Of AccelerometerReadingEventArgs))

        Dim readingChangedObservable = Observable.FromEvent(Of AccelerometerReadingEventArgs)(accel, "ReadingChanged")

        Dim query = From knocks In
                    (From startEvent In readingChangedObservable
                     Where (startEvent.EventArgs.X ^ 2 + startEvent.EventArgs.Y ^ 2) > MinimumOffset).
                    TimeInterval
                    Where knocks.Interval.TotalMilliseconds < TimeThreshold
                    Select knocks.Value

        Return query
    End Function
End Module

We can then consume this ShakeObserver in client code as we would any other Observable collection.


Dim accel As New Accelerometer
accel.Start
Dim query = From shake in GetObserver(accel)
            Select shake

query.Subscribe(Sub(_) DoSomething())

Of course, if we are composing even more complex interactions, the power of using Observables here would be even greater as that’s where RX truly shines.

Posted on - Comment
Categories: Rx - VB Dev Center - WP7 -

Reactive Extensions responding to UI events

One of the great things about the Reactive Extensions is that they allow you to express rather complex interactions simply. For this example, we’ll consider the mouse drag drop operation in Silverlight. Note: The identical code works in both the web based Silverlight and the Windows 7 phone. If you want to download the phone version of this code, it is available in C# or VB.

One of the indicators of simplicity is if you can explain a concept to your mother. How would you explain drag-drop to your mother?  It may go something like this:

  • Record the start location when you press the mouse button down.
  • While you are moving the mouse, record the end location until you let the mouse button up.
  • Calculate the difference between the start and end locations and move the image you dragged accordingly.

Now, let’s see how we can do this in code. The beauty of the Reactive programming model is that we can compose multiple expressions together in a concise manner. Here’s how we accomplish the above process:


Dim q = From startLocation In mouseDown
        From endLocation In mouseMove.TakeUntil(mouseUp)
        Select New With {
            .X = endLocation.X - startLocation.X,
            .Y = endLocation.Y - startLocation.Y
        }

As you can see, this code reads nearly the same as the description you gave to your mother. It is actually a bit more concise than the slightly more verbose description.

At this point we have a couple details to wrap up. First, we need to declare the variables for mouseDown, mouseMove and mouseUp that we used above. We do this by using RX to subscribe to the events as discussed in this post.


Dim mouseDown = From evt In Observable.FromEvent(Of MouseButtonEventArgs)(image, "MouseLeftButtonDown")
                Select evt.EventArgs.GetPosition(image)
Dim mouseUp = Observable.FromEvent(Of MouseButtonEventArgs)(Me, "MouseLeftButtonUp")
Dim mouseMove = From evt In Observable.FromEvent(Of MouseEventArgs)(Me, "MouseMove")
                Select evt.EventArgs.GetPosition(Me)

While we’re here, notice that we are grabbing the MouseLeftButtonDown event of the image. However, we track the mouseMove and mouseUp on the form itself. We could use the MouseMove and MouseLeftButtonUp events of the image, but if we try to move too fast, the time Silverlight takes to calculate that the mouse is moving on the image rather than the canvas can mean that your movement is detected off of the image before you’ve been able to move the image. Tracking on the form itself drastically increases performance and reduces the possibility that you will move off of the image too soon.

The last thing we need to do is to move the image to the new location. In this sample, we placed the image on a Canvas. We just need to use the distance we recorded in our query and subscribe to the observable with an action that moves the image:


q.Subscribe(Sub(value)
                Canvas.SetLeft(image, value.X)
                Canvas.SetTop(image, value.Y)
            End Sub)

If you want to see this code in action, the VB version is available in the WPF samples and WP7 samples on the download page. The C# sample is in the Silverlight RX samples and WP7 samples also on the download page.

Posted on - Comment
Categories: VB Dev Center - Rx -

Reactive Extensions Phone 7 samples in VB

wp7_signature_banner_smFor a while, those of us who love Visual Basic have been struggling to make sure that newly released platforms include support for VB. When platforms that cater to the hobbyist, such as the new Windows Phone 7 tools are introduced without support for VB, we are particularly saddened. Happily, the team worked hard to overcome this shortcoming and announced today availability of the Windows Phone 7 tools in Visual Basic using Silverlight. You can download the tools now.image

In celebration of this opportunity, I have converted my RX samples over to VB and made them available on my downloads page. I’ll post explanations of each of the samples over the next few days. For now, feel free to download the samples and try them out for yourself. Here’s the list of samples that are included:

Posted on - Comment
Categories: VB Dev Center - Rx -

Reactive Framework Learning Resources for RxNet and RxJS

As I present more about the Reactive Framework, I often get people asking where they can learn more about it. Here are some resources that I’ve found useful:

Of course there’s nothing that beats learning by doing, so go out and try the bits yourself. Don’t be surprised when you hit a wall, but that’s when the real learning starts, trying to figure out how to overcome the challenges.

Is there a resource that I’m missing here that’s helped you? I’m happy to add it.

Posted on - Comment
Categories: Rx -

Reactive Framework Sorting it out

When I started looking at the Reactive Framework, one of the first things I did was to try creating some of the same standard LINQ queries that I’ve used against LINQ to Objects:


        Dim unsorted = From s In Sensor
                       Where s.SensorType = "2"
                       Order By s.SensorValue
                       Select s

If you try this where Sensor is an IObservable rather than IEnumerable, you will find that the Order By clause generates the following compiler error in VB: Definition of Method OrderBy is not accessible in this context. C# generates a similar but different error: “Could not find an implementation of the query pattern for source type IObservable<T>. OrderBy not found.” Essentially, the compiler is telling you that there isn’t an extension method called OrderBy that extends IObservable. Did the reactive team make a mistake and forget to implement sorting? Far from it.

Let’s consider the uses of the standard query operators  over a data source where you don’t necessarily know when the source ends. “From” doesn’t really exist, it’s just a query holder for identifying the local variable name (s) used later in the query and the source of the data (Sensor).

With “Where”, we are filtering the results. We can filter results over an ongoing stream without needing to know when the stream will end. As a result, filtering isn’t much of an issue.

Similarly, “Select” simply takes the input type and transforms it into another type. This is commonly referred to as a Projection. Since projections work equally well over data streams, we are fine implementing that in Reactive.

Sorting on the other hand is a bit more problematic. Consider the case where we process the following values: 1, 2, 4, 3, 5. It’s not difficult to sort these values and return them. However, what would happen to our sort if the next value that was sent was 0? We would need to reevaluate the entire result set and inject our new value before the first value that came in. In dealing with continuous event streams, we have no way of knowing whether the next value we are going to receive will need to be inserted prior to other results.

As a result, we need to partition the sets of data we receive if we need to sort these values so that we can be assured of knowing when the last value is received from this set. The Reactive Framework supports a number of partitioning methods, including BufferWithTime, BufferWithCount, and BufferWithTimeOrCount. With these methods, we can partition our streams into pre-determined chunks based on a timespan, and/or item count. The result is a new stream of IObserverable objects that contain an IList of the original data type. In the case of our Sensors, we can partition our result sets as follows:


Dim segmented = Sensor.BufferWithTime(TimeSpan.FromSeconds(3))

This creates a variable of type IObservable(Of IList(Of SensorInfo)). If we wanted, we could then display the sorted values in the partitioned lists as follows:


  segmented.Subscribe(Sub(val) FilteredList.ItemsSource =
                                         From v In val
                                         Order By v.SensorValue)

As you can see, you CAN sort values using the Reactive Framework using partitioning schemes, but it doesn’t make as much sense over data streams as it does with IEnumerable data sources typically encountered with LINQ.

Posted on - Comment
Categories: Rx - VB Dev Center -

Reactive Framework Subscribing to Events

Previously in my Reactive framework series, we saw how to create and subscribe to ongoing observable objects. While there are a number of cases where you would want to create your own observable type, often you simply want to compose reactive sequences in response to events raised by other means. Recently, I came across a simple example that can  show you how easy it is to subscribe to event and add functionality through the Reactive Framework’s extension methods.

In this scenario, I needed to update a list of most recently used files in real time. Whenever a new file was added , modified or deleted from a directory, I wanted my UI list to reflect this change. I’ve long known about the FileSystemWatcher class in Windows Forms. It is able to listen for create, change, delete and modify events in a specified file path and let us know when the file changes. Using Rx, we can create an observable using the following:


   Dim createWatcher As New FileSystemWatcher With {.Path = "C:\Temp", .EnableRaisingEvents = True}
   Dim createdEvent = Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Created")

Using the Observable.FromEvent, we indicate that we want to watch for events with the specified name (Created) from the supplied instance object (createWatcher). With this observable, we can now perform other operations on the resulting events. We’ll use the “Do” method to perform an action (refreshing the file list). Before we do this action on the UI, we’ll need to make sure to synchronize back to the UI thread:


    Dim AllEvents = Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Created").
                    ObserveOnDispatcher.
                    Do(Sub(fsArgs) RefreshFileList()).
                    Subscribe

This would be fine if we only wanted to watch for the events when the file is first created. However, in a Most Recently Used (MRU) list, we want to also know when a file is changed or deleted. Rather than wiring up separate handlers for each of these events, we can use the Merge method to listen to any of these events regardless of which event handler they came from:


    Dim AllEvents = Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Created").
                    Merge(Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Changed")).
                    Merge(Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Deleted")).
                    ObserveOnDispatcher.
                    Do(Sub(fsArgs) RefreshFileList()).
                    Subscribe

One of the great things about the Reactive Framework is the ability to inject functionality into the event pipeline easily. For example, if we want to avoid responding to multiple events on the same file, we could inject the DistinctUntilChanged method as follows:


    Dim AllEvents = Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Created").
                    Merge(Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Changed")).
                    Merge(Observable.FromEvent(Of FileSystemEventArgs)(createWatcher, "Deleted")).
                    DistinctUntilChanged.
                    ObserveOnDispatcher.
                    Do(Sub(fsArgs) RefreshFileList()).
                    Subscribe

Quick and easy (and elegant as well.) If you want to try this out, download RX_Wpf in the files section here and run the FileWatcher.xaml file.

Posted on - Comment
Categories: VB Dev Center - Rx -

Reactive Framework as a Background Worker

In this introduction to the Reactive Framework series, we’ve spent a bit of time setting up our Observable and Observers and wiring them up. If you haven’t been following along, here’s links to the previous posts:

So far, our observers can listen to our sensor, but it turns out, we can’t know about it because everything is happening on the main worker thread. Because the thread is continually processing, the UI locks us out of seeing the updates. In order to solve this, we need to run our sensor on a secondary thread.

With Reactive Framework, we often talk about “Hot” and “Cold” observables. Hot observables are ones which are running independently of the subscription. Cold observables are ones where the process starts when you subscribe to it. In our case, we’re simulating an ongoing sensor that we are connecting many observers to. In this case, we are dealing with a “Hot” observable. As a result, we’ll explicitly manage the sensor using the BackgroundWorker object in our “Start” button handler:


        Dim worker As New BackgroundWorker
        AddHandler worker.DoWork, Sub(s As Object, ars As DoWorkEventArgs)
                                      Sensor.StartSensor()
                                  End Sub
        worker.RunWorkerAsync(Sensor)

Now, when we run our sample and output our results using Console.WriteLine, we see our results and we can continue to click on other buttons in our application. However, if we try to output the results to our user interface, we see the following exception:

     InvalidOperationException: The calling thread cannot access this object because a different thread owns it.

If you’ve ever worked with background threads in Windows Forms, WPF or Silverlight, you should recognize that you can’t access the UI thread from a background thread directly. One of the key scenarios that the Reactive Framework was designed to combat was asynchronous operations. As a result, they took great effort to make synchronizing these threads easy. Two of the extension methods on IObservable are SubscribeOn and ObserveOn. SubscribeOn is used indicate where the operations that we are subscribing to will be performed. ObserveOn is used to indicate where we want to process the results.

In our case, we need to move back to the UI thread when we process the results, thus we need to synchronize our threads when we Observe, thus we will use the ObserveOn option. To make matters easier, the Reactive team have included a special variant of the ObserveOn to synchronize it on the dispatching thread: ObserveOnDispatcher. We can alter our subscribing code as follows to make sure we observe our subscription on the UI Thread:


        Dim items = New ObservableCollection(Of Double)
        FilteredList.ItemsSource = items

        Dim TypeSensors = From s In Sensor
                       Where s.SensorType = "4"
                       Select s.SensorValue

        TypeSensors.ObserveOnDispatcher.Subscribe(
            Sub(item) items.Add(item))

To see this sensor and various observables in action, download the corresponding WPF project for this series.

Posted on - Comment
Categories: VB Dev Center - Rx - Visual Studio -

Reactive Framework Subscribing to Observables

It’s been a while since I started the Reactive Framework series. In case you missed the earlier posts, here’s the links to what we’ve done so far:

At this point, we’ve created our observer and set up the the logic that handles our OnNext values. What we haven’t done yet is wired our LINQ based processing pipeline to the event source. To do this, we need to Subscribe the handler to the Observables. By default, we need to create a new class that implements IObserver. To keep this simple, let’s just output the values to the console for now:


Class ConsoleObserver(Of T)
    Implements IObserver(Of T)


    Public Sub OnCompleted() Implements System.IObserver(Of T).OnCompleted

    End Sub

    Public Sub OnError(ByVal [error] As System.Exception) Implements System.IObserver(Of T).OnError

    End Sub

    Public Sub OnNext(ByVal value As T) Implements System.IObserver(Of T).OnNext
        Console.WriteLine(value.ToString)
    End Sub
End Class

The IObserver interface has three methods: the method that is fired when the source is done (OnCompleted), the method that occurs when an exception occurs (OnError) and the method that is used when each new value is received (OnNext). In the case of this simple example, we only implement the OnNext method and output the value to the Console Window.

With this in place, we can tie this all together creating and starting our sensor, filtering and projecting from the values (using LINQ) and displaying the values (through the new ConsoleObserver):


Dim Sensor As New ObservableSensor
Sensor.Start()

Dim lowvalueSensors = From s In Sensor
                      Where s.SensorValue < 3
                      Select s.SensorValue

lowvalueSensors.Subscribe(New ConsoleObserver(Of Double))

Thankfully, consuming our Observable chain doesn’t require creating a new class. Observable.Subscribe offers an additional overload which, instead of taking an IObservable, we can use an Action Lambda. As a result, we can restate our example above as follows:


Dim Sensor As New ObservableSensor
Sensor.Start()

Dim lowvalueSensors = From s In Sensor
                      Where s.SensorValue < 3
                      Select s.SensorValue

lowvalueSensors.Subscribe(Sub(value) Console.WriteLine(value))

While this consuming code is slightly longer, the total net effect is significantly more maintainable code since we don’t need to declare a separate class to just output our results.

We now could have a fully functioning set of examples. Unfortunately or example at this point is extremely unresponsive because we are completely CPU bound constantly running all of the process on the current thread. Up next time, moving our logic to the background thread.

Posted on - Comment
Categories: Rx - VB - VB Dev Center -

Reactive Framework Getting your LINQ on

Last time in our exploration of the Reactive Framework, we built a random Observable event generator. Now that we have our data source, we can start working with it. In the past, we would have hooked up event handlers to the event delegate and imperatively interacted with the values passed in the sender and EventArgs. Of course, when we Thinq LINQ, we try to find simpler, more declarative models to represent our intent.

To start, we need to instantiate and start our event generator:


Private Sensor as New ObservableSensor
Sensor.StartSensor()

Now that we are generating Observables, we can process them using LINQ query comprehensions. For example, if we wanted to filter out only the sensors who's type is "4", we could use this LINQ:


Dim TypeSensors = From s In Sensor
                  Where s.SensorType = "4"
                  Select s

If we wanted to filter out only those sensor readings that are low (less than 3), and only return the sensor's value, we could include the filter (Where) and projection (Select). The following results in an IObservable(Of Double) rather than IObservable(Of SensorInfo) that we started with.


Dim lowValueSensors = From s In Sensor
                      Where s.SensorValue < 3
                      Select s.SensorValue

Of course, if you prefer the Lambda syntax over query comprehensions, you can use those interchangeably with rX just as you would with LINQ. The following query waits for the first case where the sensor value is high (over 17) and fires OnNext returning a boolean once the value is hit.


Dim AnySensor = Sensor.Any(Function(s) s.SensorValue > 17)

All's well in LINQ land with rX, right? Well kind of. Doing these simple projections and filters are straight forward. However, if we start trying to use sorting, grouping, and aggregations, we start running into additional challenges. With these query types, we can't start returning results until the entire set of events is known. Since we're working with a potentially infinite stream of events, we will need to figure out how to partition the results and work with those segments. That will be a task for a future post.

Posted on - Comment
Categories: Rx - VB Dev Center - VB -

Reactive Framework Why bother

Now that LINQ has been out for a while, I’m starting to work on a new set of talks that are related to some of the new extensions to LINQ. Although LINQ to databases (SQL, Entity Framework, etc) are interesting, I’ve always been partial to the more generalized abstraction that is found in LINQ to Objects and XML. One of the most compelling expansions on the core LINQ concepts at this point is the Reactive Framework (rX).

With the Reactive Framework, we flip the paradigm of Enumeration and pulling for more results upside down. Instead of pulling for the next record, the Reactive Framework “Reacts” to events as they are pushed through the event pipeline. Instead of calling MoveNext as we do with IEnumerable, we react to OnNext events.

So when would you use this Reactive framework? Essentially, you can use it anywhere you are responding to events or work with continuous streams. Some sample cases are:

  • Handling UI Events (Mouse clicks, Touch gestures)
  • Working with Asynchronous Web requests (Web services)
  • Analyzing sensor data from Manufacturing, Power, Health care, Telecom, etc. devices. real time.
  • Processing data from continuous data streams (Log files, Stock feeds, etc.)

Many of the samples out there currently demonstrate the first two scenarios, but there aren’t too many examples (yet) on the other scenarios. Over the next few months, I hope to dig into the Reactive Framework and related IObservable based technologies (like SQL Server 2008 R2 StreamInsight) and offer some practical examples where the Reactive Framework can make your programming life easier.

Update: Here are a couple reasons why Rx is useful as compared to traditional events as well:

  • You can replace implementation for mocking/testing purposes
  • Compose complex operations over multiple event/observable streams
  • LINQ like filtering and projection for common programming model.
  • Easy to clean up resources with Dispose
Posted on - Comment
Categories: Rx -

Reactive Framework Building an IObservable Event Generator

In my last post, I mentioned a number of cases where you may want to use the Reactive Framework. For some upcoming presentations, I wanted to focus on a couple of these scenarios, particularly on how you can use the Reactive Framework (rX) to work with events from device sensors. You can often find these kind of sensors in a number of industries, including Robotics, automated manufacturing systems, Medical monitors, Telecom usage, and Live financial feeds. In order to demonstrate using rX in this environment, I needed to build a module that simulated generating a bunch of sample random events. Below is the module that I created. We’ll use this module in some of the future discussions of Reactive Framework.

We’re going to start with a small class that will contain the state of the individual sensor events. We’ll call this our SensorInfo class. It will hold values for the date and time that the event occurred, an indicator on the sensor’s type and the value that the sensor returns. We'll also override the ToString method to allow us to output the values easily.


Public Class SensorInfo
    Public Property TimeStamp As DateTime
    Public Property SensorType As String
    Public Property SensorValue As Double

    Public Overrides Function ToString() As String
        Return String.Format("Time: {0}  , Type: {1}  Value: {2}", TimeStamp, SensorType, SensorValue)
    End Function
End Class

Now that we have our instance class, we can create a class that will generate these sensor items randomly. (This class is not thread safe, nor is it truly random so don't use it in production applications. It is merely designed for demonstration purposes.)


Public Class ObservableSensor

    Private _running As Boolean

    Public Sub StartSensor()
        If Not _running Then
            _running = True
            Dim randomizer = New Random(Date.Now.Millisecond)
            While _running
                Dim randVal = randomizer.NextDouble
                Dim info As New SensorInfo With {.SensorType = CInt(randVal * 4).ToString(),
                                                 .SensorValue = randVal * 20,
                                                 .TimeStamp = Now}
                End If
                Threading.Thread.Sleep(CInt(randomizer.NextDouble * 500))
            End While
        End If
    End Sub

    Public Sub StopSensor()
        _running = False
    End Sub

End Class

In this class, we maintain an internal variable (_running) which tracks whether the sensor is running or not. We also have a method that Starts the sensor and stops it. While the sensor is running, we essentially generate a number of SensorInfo instances with randomized values and then pause for a random period of time before creating another value. At this point, the values that are returned don’t have much meaning. We could easily change this to return stock quotes, manufacturing defects or other sensor responses by manipulating the values this randomizer generates.

Now that we can generate random SensorInfos, we need to actually do something with them. In the past, we could just raise an event for consumers to handle after we generate each sensor’s value. Since we want to leverage the power of the new IObservable/IObserver interfaces and the Reactive Framework, I’ll make this class implement IObservable(Of T) so that we can register a number of IObserver clients and notify them each time we generate a new sensor.

The IObservable(Of T) interface requires of a single method: Subscribe. This takes a single parameter which is the IObserver client that wants to listen to our sensor data. It returns a class that implements IDisposable (so that we can make sure each of our observers know when we’re done sending them data). Since the return object here is actually the ObservableSensor itself, we need to implement both IObservable and IDisposable. Here's our revised ObservableSensor class.


Public Class ObservableSensor
    Implements IObservable(Of SensorInfo)
    Implements IDisposable

    Private _observers As New List(Of IObserver(Of SensorInfo))
    Private _running As Boolean

    Public Function Subscribe(ByVal observer As System.IObserver(Of SensorInfo)) 
                              As System.IDisposable 
                              Implements System.IObservable(Of SensorInfo).Subscribe
        _observers.Add(observer)
        Return Me
    End Function

    Public Sub StartSensor()
        If Not _running Then
            _running = True
            Dim randomizer = New Random(Date.Now.Millisecond)
            While _running
                Dim randVal = randomizer.NextDouble
                If _observers.Any Then
                    Dim info As New SensorInfo With {.SensorType = CInt(randVal * 4).ToString,
                                                     .SensorValue = randVal * 20,
                                                     .TimeStamp = Now}

                    _observers.ForEach(Sub(o) o.OnNext(info))
                End If
                Threading.Thread.Sleep(CInt(randomizer.NextDouble * 500))
            End While
        End If
    End Sub

    Public Sub StopSensor()
        _running = False
    End Sub


#Region "IDisposable Support"
    Private disposedValue As Boolean ' To detect redundant calls

    ' IDisposable
    Protected Overridable Sub Dispose(ByVal disposing As Boolean)
        If Not Me.disposedValue Then
            If disposing Then
                If _observers IsNot Nothing Then
                    _observers.ForEach(Sub(o) o.OnCompleted())
                    _observers.Clear()
                End If
                ' TODO: dispose managed state (managed objects).
            End If

            ' TODO: free unmanaged resources (unmanaged objects) and override Finalize() below.
            ' TODO: set large fields to null.
        End If
        Me.disposedValue = True
    End Sub

    ' This code added by Visual Basic to correctly implement the disposable pattern.
    Public Sub Dispose() Implements IDisposable.Dispose
        ' Do not change this code.  Put cleanup code in Dispose(ByVal disposing As Boolean) above.
        Dispose(True)
        GC.SuppressFinalize(Me)
    End Sub
#End Region

End Class

In this new version, we now have a new _observers object that maintains a list of the observers (clients). This allows us to notify multiple sensor handlers and work with them how they deem appropriate. The subscribe method simply takes the supplied observer and sticks it in the collection.

When we start the sensor, we now check to see if there are any observers (using the LINQ .Any method). If we do, we’ll generate the random sensor data. We then notify all of the listeners using the list .ForEach method passing the lambda expression instructing the observer to invoke it’s OnNext handler (part of the IObserver(Of T) implementation. This is the method which corresponds to IEnumerable’s MoveNext. It is this method which will trigger our reactive framework’s event pipeline to begin processing our sensor notifications.

When we’re done, we need to clean up our resouces. In the Disposing event, we make sure that we call the OnCompleted method on each (ForEach) of the observers in our _observers collection. We also clear the observer collection to remove the reference pointers between the client and our sensor generator.

There you have it, a generic random event generator that we can consume with the Reactive Framework (or similar technologies like StreamInsight). Next time, we’ll start to consume these events.

As always, let me know what you Thinq and if there are any modifications I should consider.

Posted on - Comment
Categories: Rx - VB - VB Dev Center - Visual Studio -