SignalR and Reactive Extensions are an Rx for server push notifications by ThinqLinq

SignalR and Reactive Extensions are an Rx for server push notifications

Recently, I had the need to build a system where multiple clients needed to be notified as changes happened on the server. While watching Damian Edwards, Brad Wilson and Levi Broderick present on Async in ASP.Net during the AspConf I was introduced to the dynamic simplicity that SignalR brings to the table. I was even more intrigued by the fact that it integrates directly with IObservable and the Reactive Extensions. After using it for a week, I’m truly impressed by what they’ve done with this library. To give you an idea of what I mean, let’s take my ObservableSensor demo which generates random values and see how we can use SignalR to expose these values over a distributed client environment.

Reactive Extensions on the Server

To begin, let’s look at the server. Here we will use the Observable.Generate method to generate some random values with associated random categories and the timestamp when the value was generated:

Option Strict Off

Imports Microsoft.VisualBasic
Imports System.Reactive.Linq
Imports SignalR

Public Class ObservableSensor

    Public Sub New()
        Dim rand = New Random(Now.Millisecond)

        Dim Generator = Observable.Generate(Of Double, SensorData)(
            initialState:=0,
            condition:=Function(val) True,
            iterate:=Function(val) rand.NextDouble,
            resultSelector:=Function(val) New SensorData With 
                                          {
                                              .Time = Now,
                                              .Value = val * 20,
                                              .Category = (CInt(val * 4)).ToString()
                                          },
            timeSelector:=Function(val) TimeSpan.FromSeconds(val))

        Generator.Subscribe(Sub(value)
                                Dim context = GlobalHost.ConnectionManager.GetHubContext(Of ObservableSensorHub)()
                                context.Clients.Broadcast(value)
                            End Sub)
    End Sub
End Class

Public Class SensorData
    Public Property Time As DateTime
    Public Property Category As String
    Public Property Value As Double
End Class

If you recall, we discussed the Observable.Generate implementation last year. The new part here occurs in the Subscribe implementation.

SignalR across the tiers

In this case, we are going to “Broadcast” our newly created methods to anyone listening. In this case, we are publishing our notifications without having a direct connection to the SignalR hub. We can grab a hold of it using the GlobalHost ConnectionManager to get the hub in our AppDomain for a type of ObservableSensorHub. What is this Hub thing you may ask. Well, here is the implementation for the ObservableSensorHub:

Imports SignalR.Hubs

Public Class ObservableSensorHub
    Inherits Hub

End Class

In case you’re wondering, no I’m not missing code here. That’s the complete implementation. We’re just creating a strongly typed instance of the SignalR.Hubs.Hub type for the the ConnectionManager to work with. In this simple application, we’re just going to start generating values when the web application starts. In the Global.asax, add the following implementation:

    Sub Application_Start(ByVal sender As Object, ByVal e As EventArgs)
        Sensor = New ObservableSensor
    End Sub

 

At this point, we now have a server that can broadcast our random values over HTTP to any clients that wish to subscribe. Before moving to the client, I need to say a bit more about the Broadcast “Method” that we are calling on on the Hub’s Clients. If you look at the type definition of Clients, you will see that there is no Broadcast method. Clients is actually a Dynamic object. At run time, we are declaring that it has a method called Broadcast. The SignalR infrastructure then knows how to translate requests for an invocation method by that name into an HTTP message to be sent to any clients (serializing the results into a JSON object using Json.Net). Remember in Visual Basic, we enable the Dynamic functionality by specifying Option Strict Off at the top of our class definition.

Now how do we consume these messages? Let’s start with a console application. First, make sure that you’ve installed and added the references to the SignalR.Client library. The easiest way is to use NuGet. Rather than a bunch of text, let’s just jump to the code:

Option Strict Off

Imports SignalR.Client.Hubs
Imports System.Reactive.Linq
Imports Newtonsoft.Json.Linq

Module Module1

    Sub Main()
        Dim cn = New HubConnection("http://localhost:5687/")
        Dim sensor = cn.CreateProxy("observableSensorHub")

        sensor.On(Of SensorData)("broadcast", Sub(item) Console.WriteLine(item.Value))

        cn.Start().Wait()
        Console.ReadLine()

    End Sub

End Module

 

Here, we create a new HubConnection specifying the endpoint for our web application. SignalR does support self hosted servers if you want to use a Windows Service or other back end implementation. As long as the client can see the server over the network, you can wire it up. Second, we create a dynamic proxy specifying the hub type that we created on the server. Note here that the casing of the proxy is important and uses Camel casing even though the implementation on the server used Pascal casing. This is done to make the hubs and methods feel more natural to JavaScript clients.

Next, we specify how to handle the push notifications that the server sends. We do that using the .On method, specifying the type (SensorData) that the method should be deserialized back into. We then specify the name of the method (altering the case as mentioned above) that we are listening for along with the Action that should be invoked as each value is received. In this case, we’ll just output the value that we received to the console window.

RX in the Client

At this point we have Rx pushing messages to the client via SignalR. Let’s take it a step further and add some Rx goodness on the client side as well. In addition to the On method, the Proxy also supports an Observe method which turns the message pump into an IObservable of an array of Objects where each of the method parameters are contained in that array. Since our Broadcast method only has a single parameter of type SensorData, we will grab it by getting the first array element and calling the Json.Net .ToObject implementation to translate it back into our strongly typed object. From there, we work with it just as we would any other Observable sequence. For example, to output only the generated values for Category 1, we could use the following:

        Dim cn = New HubConnection("http://localhost:5687/")
        Dim sensor = cn.CreateProxy("observableSensorHub")

        Dim items = From item In sensor.Observe("broadcast")
                    Let instance = item(0).ToObject(Of SensorData)()
                    Where instance.Category = "1"
                    Select instance

        Using items.Subscribe(Sub(value) Console.WriteLine(value.Value))

            cn.Start().Wait()
            Console.ReadLine()

        End Using

In this case, we Start the connection inside of our subscription’s Using clause along with the Console.ReadLine. Once a key is pressed, the subscription is disposed freeing our resource.

One of the nice things is the flexibility that SignalR offers. Pretty much anything that speaks HTTP can consume these messages from our Rx Server. If we wanted to consume the same messages in a web client, we could use the following Javascript code:

      $(function () {
          // Proxy created on the fly
          var hub = $.connection.observableSensorHub;

          // Declare a function on the chat hub so the server can invoke it
          hub.Broadcast = function (value) {
              $('#values').append('<li>' + value.Time + ': ' + value.Value + '</li>');
          };

          // Start the connection
          $.connection.hub.start();
      });

In the javascript, we connect to our server by referring to the $.connection.observableSensorHub. Notice the camel cased name here? That’s the SignalR translation in action again. We then specify the method handler for the dynamically invoked “Broadcast” method. Here we just add list item entries to the unordered list.

As I said above, I’m so impressed with SignalR so far. Don’t be surprised to see me post more about it in the future. For now however, if you want feel free to download this sample and try it out yourself. You will need to refresh the nuget packages to get it to run. Additionally, realize that the sample was built with RC builds of Visual Studio 2012, Rx 2.0, and SignalR. I don’t guarantee that the sample will continue to work once these are officially released. If you notice any issues or want to see more details, let me know what you Thinq below.

Posted on - Comment
Categories: VB Dev Center - Rx -
comments powered by Disqus