LINQ and Related Topics

Reactive Framework Subscribing to Observables

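It’s been a while since I started the Reactive Framework series. In case you missed the earlier posts, here’s what we’ve done so far:

  • Reactive Framework Why bother
  • Reactive Framework Building an IObservable Event Generator
  • Reactive Framework Getting your LINQ on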

At this point, we’ve created our observable and set up the logic that handles our OnNext values. What we haven’t done yet is wire our LINQ-based processing pipeline to the event source. To do this, we need to subscribe a handler to the observable. By default, that means creating a new class that implements IObserver. To keep this simple, let’s just output the values to the console for now:


Class ConsoleObserver(Of T)
    Implements IObserver(Of T)


    Public Sub OnCompleted() Implements System.IObserver(Of T).OnCompleted

    End Sub

    Public Sub OnError(ByVal [error] As System.Exception) Implements System.IObserver(Of T).OnError

    End Sub

    Public Sub OnNext(ByVal value As T) Implements System.IObserver(Of T).OnNext
        Console.WriteLine(value.ToString)
    End Sub
End Class

The IObserver interface has three methods: OnCompleted, which is called when the source is done; OnError, which is called when an exception occurs; and OnNext, which is called as each new value is received. In this simple example, we only implement the OnNext method and output the value to the console window.
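The empty OnCompleted and OnError bodies are fine for a demo, but they silently swallow failures and completion. As a rough sketch only (the VerboseObserver class and the Demo module are hypothetical names, not part of this series), an observer that reports all three notifications might look like this:

```vb
Imports System

' Hypothetical observer that reports every notification it receives.
Class VerboseObserver(Of T)
    Implements IObserver(Of T)

    Public Sub OnCompleted() Implements IObserver(Of T).OnCompleted
        Console.WriteLine("Source completed")
    End Sub

    Public Sub OnError(ByVal [error] As Exception) Implements IObserver(Of T).OnError
        Console.WriteLine("Source failed: " & [error].Message)
    End Sub

    Public Sub OnNext(ByVal value As T) Implements IObserver(Of T).OnNext
        Console.WriteLine(value)
    End Sub
End Class

Module Demo
    Sub Main()
        ' Drive the observer by hand to show the order of notifications.
        Dim observer As New VerboseObserver(Of Double)
        observer.OnNext(1.5)
        observer.OnNext(2.25)
        observer.OnCompleted()
    End Sub
End Module
```

Calling the methods by hand like this is only for illustration; in the real pipeline the observable source makes these calls.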

With this in place, we can tie it all together by creating our sensor, filtering and projecting its values (using LINQ), displaying the values (through the new ConsoleObserver) and starting the sensor:


Dim Sensor As New ObservableSensor

Dim lowvalueSensors = From s In Sensor
                      Where s.SensorValue < 3
                      Select s.SensorValue

lowvalueSensors.Subscribe(New ConsoleObserver(Of Double))

' StartSensor loops on the current thread, so subscribe before starting it.
Sensor.StartSensor()

Thankfully, consuming our Observable chain doesn’t require creating a new class. Rx offers an additional Subscribe overload which takes an Action lambda instead of an IObserver. As a result, we can restate our example above as follows:


Dim Sensor As New ObservableSensor

Dim lowvalueSensors = From s In Sensor
                      Where s.SensorValue < 3
                      Select s.SensorValue

lowvalueSensors.Subscribe(Sub(value) Console.WriteLine(value))

' StartSensor loops on the current thread, so subscribe before starting it.
Sensor.StartSensor()

While this consuming code is slightly longer, the net effect is significantly more maintainable code, since we don’t need to declare a separate class just to output our results.
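The lambda form can also cover errors and completion. The sketch below assumes a reference to the System.Reactive package as Rx is packaged today (the 2010 preview builds used different assembly and namespace names), and it substitutes a fixed array of readings for the sensor so that it runs on its own:

```vb
Imports System
Imports System.Reactive.Linq

Module Demo
    Sub Main()
        ' Stand-in source for the sensor query: a fixed set of readings.
        Dim values As Double() = {1.5, 12.0, 2.25, 18.5}
        Dim lowValues = values.ToObservable().Where(Function(v) v < 3)

        ' One lambda per IObserver method: OnNext, OnError, OnCompleted.
        lowValues.Subscribe(
            Sub(value As Double) Console.WriteLine(value),
            Sub(ex As Exception) Console.WriteLine("Error: " & ex.Message),
            Sub() Console.WriteLine("Done"))
    End Sub
End Module
```

Supplying all three lambdas gives the same coverage as a full IObserver class without having to declare one.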

We now have a fully functioning set of examples. Unfortunately, our example at this point is extremely unresponsive because StartSensor runs its loop on the current thread and never returns control to the caller. Up next time: moving our logic to a background thread.

Posted on 6/28/2010 7:17:00 PM - Comments(1)
Categories: Rx VB VB Dev Center

Using LINQPad with MongoDb and NoRM

I’ve recently been working on a project that might be a good fit for the wave of Document Databases espoused by the rising NoSQL movement. Unlike traditional Relational Databases like SQL Server, Document Databases can store object hierarchies or unshaped data for quick and efficient saving and retrieval. There are quite a number of these databases appearing, including MongoDb, CouchDb and RavenDb. Wikipedia also has a listing of Document Databases with links to other options.

Typically, these data stores expose their data through XML or JSON interfaces. Luckily, many of these databases also have LINQ providers to make interfacing with the systems easier. For example, if you are using MongoDb, you can use a provider like the NoRM (No-ORM) provider. Today I had the pleasure of sitting in on a webcast that Michael Kennedy of DevelopMentor did showing MongoDb with NoRM. During the presentation, the question came up of how to use LINQPad in this environment. I promised to tell them how to do this, so here you go:

Install and Startup Mongo

The first step is to download and start up MongoDb. To download it, go to the MongoDb site and download the version appropriate to your OS. Once you’ve downloaded it, unzip the files to your favorite directory. From the command line, start Mongo by entering the following from the bin directory of the files that you unzipped:

mongod --dbpath c:\projects\data\mongo

Of course, this assumes you want to put the database in the c:\projects\data\mongo path. The path does need to be present before running this command. You will be prompted to open a firewall hole for Mongo. Once you’re done, Mongo should be up and running.

Preparing NoRM

Now that the database is running, we need to prepare the .Net provider. Download the NoRM provider through Github, or directly from their download site. Unzip these files and open the appropriate sln file in Visual Studio: the NoRM solution works in VS 2008 and the NoRM – VS2010 solution works in VS 2010. Once you open it, compile it. Now you’re ready to roll. If you’re not sure how to get started with the NoRM provider, take a look at the many unit tests that they’ve provided to get up and running quickly.

Using LINQPad

Assuming you’ve downloaded and installed LINQPad, you should now be ready to start consuming Mongo through NoRM in LINQPad. Start by opening LINQPad and creating a new query. In order to work with our database, we need to create some class structures. As a result, you need to select the C# Program or VB Program option under the Language drop-down. (I’m choosing to do this in C# this time because the NoRM provider doesn’t fully support VB expression trees at this point.)

[Image: selecting the Program option in the LINQPad Language drop-down]

By selecting the Program option, we can now create not only LINQ queries, but entire classes and methods. Scroll to the bottom of the code window and add a class describing the shape that we want to save and retrieve.


class Post
{
	public ObjectId Id { get; set; }
	public string Title { get; set; }
	public string Content { get; set; }
	public DateTime PubDate { get; set; }
	public string[] Categories { get; set; }
}

Keeping with the typical samples I have on this site, I’m modeling a Blog post. This will create a hierarchical shape containing posts with a collection of categories. Now that the shape is defined, we need to figure out how to insert and retrieve it using LINQPad.

To start, press F4 (or select Query and Query Properties from the menu). This will bring up a dialog to add references. In this dialog, click the Add button and locate the Norm.dll that you generated when you compiled the provider in the Preparing NoRM step above.

[Image: adding the Norm.dll reference in the Query Properties dialog]

Before you close this window, select the “Additional Namespace Imports” tab at the top. This will allow us to add the Norm imports/using clauses. Add an import for Norm and Norm.Linq. (Notice here, you don’t include the “using” or “imports” keywords, just the namespaces that you want to import.) Once you’ve added the namespaces, close the dialog so that we can continue consuming Mongo.

[Image: the Additional Namespace Imports tab listing Norm and Norm.Linq]

In the code window, enter the following inside the body of the void Main() method that was generated when we selected the Program option in LINQPad:


using (var cn = MongoQueryProvider.Create("mongodb://127.0.0.1/ThinqLinq?strict=false"))
{
	var posts = cn.DB.GetCollection<Post>();
	posts.Insert(new Post
	{
		Title="Testing Linqpad",
		Content="This is a test using LinqPad",
		PubDate=DateTime.Now,
		Categories = new string[] {"NoSql", "MongoDb", "LinqPad"}
	});

	var foundPosts = from p in new MongoQuery<Post>(cn)
			where p.Title == "Testing Linqpad"
			select p;
		
	foundPosts.Dump();
}

Let’s step through this code a bit. The first line sets up our connection to the Mongo Database. We pass in the URI of the server (mongodb://127.0.0.1/) along with the name of the “database” that we want to use, or create if it hasn’t already been created (“ThinqLinq”).

This connection behaves similarly to the LINQ to SQL DataContext or Entity Framework’s ObjectContext. As with those contexts, we next need to access the “Table” for the type that we are trying to create. We do that by accessing the GetCollection method of the DB object referencing the type that we want to get (Post).

With the reference to the posts collection, we can insert a new Post object into our database by calling the Insert method. Notice that, unlike with a traditional RDBMS, we have not actually created anything in the database yet. This is the great thing about a Document Database: we can save our objects directly in the database without having to create the structures ahead of time.

With the object inserted into the database, we can now fetch it back using a standard LINQ query. The only difference we see here to the pattern in other LINQ providers is that we access the data source by calling MongoQuery passing in the Type that we want to fetch and the connection object that we declared at the top of the method. We view the results by calling the LINQPad Dump extension method on the results. Once completed, here’s a screen shot including the final code and results. Notice that the results do include the three items we added in the Categories array along with the single post record.

[Image: LINQPad showing the final code and results]

Interestingly, the built-in performance timer and Lambda expression options in LINQPad still work here even though we aren’t accessing a traditional database. The SQL tab remains blank because we aren’t issuing a SQL statement to Mongo.

Document Databases and the whole NoSQL movement are quite intriguing. There are plenty of times where relational data stores are a better fit, particularly when you need to report on related objects rather than work with object hierarchies natively. However, tools like MongoDB and NoRM make working with non-traditional hierarchical document stores quite easy as well, and they point to some interesting options for data storage and retrieval for other needs.

Posted on 5/25/2010 10:26:00 PM - Comments(3)
Categories: C# LinqPad

Reactive Framework Getting your LINQ on

Last time in our exploration of the Reactive Framework, we built a random Observable event generator. Now that we have our data source, we can start working with it. In the past, we would have hooked up event handlers to the event delegate and imperatively interacted with the values passed in the sender and EventArgs. Of course, when we Thinq LINQ, we try to find simpler, more declarative models to represent our intent.

To start, we need to instantiate and start our event generator:


Dim Sensor As New ObservableSensor
Sensor.StartSensor()

Now that we are generating Observables, we can process them using LINQ query comprehensions. For example, if we wanted to keep only the sensors whose type is "4", we could use this LINQ:


Dim TypeSensors = From s In Sensor
                  Where s.SensorType = "4"
                  Select s

If we wanted to keep only those sensor readings that are low (less than 3), and only return the sensor's value, we could include both the filter (Where) and the projection (Select). The following results in an IObservable(Of Double) rather than the IObservable(Of SensorInfo) that we started with.


Dim lowValueSensors = From s In Sensor
                      Where s.SensorValue < 3
                      Select s.SensorValue

Of course, if you prefer the Lambda syntax over query comprehensions, you can use those interchangeably with rX just as you would with LINQ. The following query waits for the first case where the sensor value is high (over 17) and fires OnNext returning a boolean once the value is hit.


Dim AnySensor = Sensor.Any(Function(s) s.SensorValue > 17)
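To make the interchangeability concrete, here is a small sketch that writes the same low-value filter both ways and then applies Any. It assumes a reference to the System.Reactive package as Rx is packaged today, and it uses a fixed array of Double readings in place of the ObservableSensor so that it runs on its own:

```vb
Imports System
Imports System.Reactive.Linq

Module Demo
    Sub Main()
        ' Stand-in for the sensor: a fixed set of readings.
        Dim values As Double() = {1.5, 12.0, 2.25, 18.5}
        Dim readings = values.ToObservable()

        ' Query comprehension syntax.
        Dim lowQuery = From v In readings
                       Where v < 3
                       Select v

        ' The same filter written with a lambda.
        Dim lowLambda = readings.Where(Function(v) v < 3)

        lowQuery.Subscribe(Sub(v As Double) Console.WriteLine("Query:  " & v))
        lowLambda.Subscribe(Sub(v As Double) Console.WriteLine("Lambda: " & v))

        ' Any produces an observable of a single Boolean.
        Dim anyHigh = readings.Any(Function(v) v > 17)
        anyHigh.Subscribe(Sub(found As Boolean) Console.WriteLine("High reading seen: " & found))
    End Sub
End Module
```

Both forms compile down to the same Where call, so the choice between them is a matter of readability.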

All's well in LINQ land with rX, right? Well, kind of. Doing these simple projections and filters is straightforward. However, if we start trying to use sorting, grouping, and aggregations, we start running into additional challenges. With these query types, we can't start returning results until the entire set of events is known. Since we're working with a potentially infinite stream of events, we will need to figure out how to partition the results and work with those segments. That will be a task for a future post.

Posted on 4/28/2010 9:53:00 PM - Comments(2)
Categories: Rx VB Dev Center VB

Reactive Framework Why bother

Now that LINQ has been out for a while, I’m starting to work on a new set of talks that are related to some of the new extensions to LINQ. Although LINQ to databases (SQL, Entity Framework, etc) are interesting, I’ve always been partial to the more generalized abstraction that is found in LINQ to Objects and XML. One of the most compelling expansions on the core LINQ concepts at this point is the Reactive Framework (rX).

With the Reactive Framework, we flip the paradigm of enumerating and pulling for more results upside down. Instead of pulling for the next record, the Reactive Framework “Reacts” to events as they are pushed through the event pipeline. Instead of calling MoveNext as we do when enumerating an IEnumerable, we react to OnNext events.
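The difference between pulling and pushing can be sketched with nothing but the base class library. In this illustration (PrintObserver and Demo are hypothetical names), the same three values are first pulled through an enumerator and then pushed to an observer:

```vb
Imports System
Imports System.Collections.Generic

' Hypothetical observer used only for this illustration.
Class PrintObserver
    Implements IObserver(Of Integer)

    Public Sub OnNext(ByVal value As Integer) Implements IObserver(Of Integer).OnNext
        Console.WriteLine("Pushed " & value)
    End Sub

    Public Sub OnError(ByVal [error] As Exception) Implements IObserver(Of Integer).OnError
    End Sub

    Public Sub OnCompleted() Implements IObserver(Of Integer).OnCompleted
        Console.WriteLine("Push complete")
    End Sub
End Class

Module Demo
    Sub Main()
        Dim values As New List(Of Integer) From {1, 2, 3}

        ' Pull: the consumer asks for each value by calling MoveNext.
        Using enumerator As IEnumerator(Of Integer) = values.GetEnumerator()
            While enumerator.MoveNext()
                Console.WriteLine("Pulled " & enumerator.Current)
            End While
        End Using

        ' Push: the producer decides when the consumer sees each value.
        Dim observer As IObserver(Of Integer) = New PrintObserver()
        For Each value In values
            observer.OnNext(value)
        Next
        observer.OnCompleted()
    End Sub
End Module
```

In the pull half the loop owns the timing; in the push half the observer simply waits to be called, which is what makes the model suit events that arrive on their own schedule.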

So when would you use this Reactive framework? Essentially, you can use it anywhere you are responding to events or work with continuous streams. Some sample cases are:

  • Handling UI Events (Mouse clicks, Touch gestures)
  • Working with Asynchronous Web requests (Web services)
  • Analyzing sensor data from Manufacturing, Power, Health care, Telecom, etc. devices in real time
  • Processing data from continuous data streams (Log files, Stock feeds, etc.)

Many of the samples out there currently demonstrate the first two scenarios, but there aren’t too many examples (yet) on the other scenarios. Over the next few months, I hope to dig into the Reactive Framework and related IObservable based technologies (like SQL Server 2008 R2 StreamInsight) and offer some practical examples where the Reactive Framework can make your programming life easier.

Posted on 4/26/2010 7:52:00 PM - Comments(3)
Categories: Rx

Reactive Framework Building an IObservable Event Generator

In my last post, I mentioned a number of cases where you may want to use the Reactive Framework. For some upcoming presentations, I wanted to focus on a couple of these scenarios, particularly on how you can use the Reactive Framework (rX) to work with events from device sensors. You can often find these kinds of sensors in a number of industries, including Robotics, automated manufacturing systems, Medical monitors, Telecom usage, and Live financial feeds. In order to demonstrate using rX in this environment, I needed to build a module that simulates generating a stream of random sample events. Below is the module that I created. We’ll use this module in some of the future discussions of the Reactive Framework.

We’re going to start with a small class that will contain the state of the individual sensor events. We’ll call this our SensorInfo class. It will hold values for the date and time that the event occurred, an indicator on the sensor’s type and the value that the sensor returns. We'll also override the ToString method to allow us to output the values easily.


Public Class SensorInfo
    Public Property TimeStamp As DateTime
    Public Property SensorType As String
    Public Property SensorValue As Double

    Public Overrides Function ToString() As String
        Return String.Format("Time: {0}  , Type: {1}  Value: {2}", TimeStamp, SensorType, SensorValue)
    End Function
End Class

Now that we have our instance class, we can create a class that will generate these sensor items randomly. (This class is not thread safe, nor is it truly random so don't use it in production applications. It is merely designed for demonstration purposes.)


Public Class ObservableSensor

    Private _running As Boolean

    Public Sub StartSensor()
        If Not _running Then
            _running = True
            Dim randomizer = New Random(Date.Now.Millisecond)
            While _running
                Dim randVal = randomizer.NextDouble
                Dim info As New SensorInfo With {.SensorType = CInt(randVal * 4).ToString(),
                                                 .SensorValue = randVal * 20,
                                                 .TimeStamp = Now}
                Threading.Thread.Sleep(CInt(randomizer.NextDouble * 500))
            End While
        End If
    End Sub

    Public Sub StopSensor()
        _running = False
    End Sub

End Class

In this class, we maintain an internal variable (_running) which tracks whether the sensor is running or not. We also have methods that start and stop the sensor. While the sensor is running, we generate SensorInfo instances with randomized values, pausing for a random period of time before creating each new value. At this point, the values that are returned don’t have much meaning. We could easily change this to return stock quotes, manufacturing defects or other sensor responses by manipulating the values this randomizer generates.

Now that we can generate random SensorInfos, we need to actually do something with them. In the past, we could just raise an event for consumers to handle after we generate each sensor’s value. Since we want to leverage the power of the new IObservable/IObserver interfaces and the Reactive Framework, I’ll make this class implement IObservable(Of T) so that we can register a number of IObserver clients and notify them each time we generate a new sensor.

The IObservable(Of T) interface requires a single method: Subscribe. This takes a single parameter, which is the IObserver client that wants to listen to our sensor data. It returns an object that implements IDisposable (so that we can make sure each of our observers knows when we’re done sending them data). Since the return object here is actually the ObservableSensor itself, we need to implement both IObservable and IDisposable. Here's our revised ObservableSensor class.


Public Class ObservableSensor
    Implements IObservable(Of SensorInfo)
    Implements IDisposable

    Private _observers As New List(Of IObserver(Of SensorInfo))
    Private _running As Boolean

    Public Function Subscribe(ByVal observer As System.IObserver(Of SensorInfo)) _
                              As System.IDisposable _
                              Implements System.IObservable(Of SensorInfo).Subscribe
        _observers.Add(observer)
        Return Me
    End Function

    Public Sub StartSensor()
        If Not _running Then
            _running = True
            Dim randomizer = New Random(Date.Now.Millisecond)
            While _running
                Dim randVal = randomizer.NextDouble
                If _observers.Any Then
                    Dim info As New SensorInfo With {.SensorType = CInt(randVal * 4).ToString,
                                                     .SensorValue = randVal * 20,
                                                     .TimeStamp = Now}

                    _observers.ForEach(Sub(o) o.OnNext(info))
                End If
                Threading.Thread.Sleep(CInt(randomizer.NextDouble * 500))
            End While
        End If
    End Sub

    Public Sub StopSensor()
        _running = False
    End Sub


#Region "IDisposable Support"
    Private disposedValue As Boolean ' To detect redundant calls

    ' IDisposable
    Protected Overridable Sub Dispose(ByVal disposing As Boolean)
        If Not Me.disposedValue Then
            If disposing Then
                If _observers IsNot Nothing Then
                    _observers.ForEach(Sub(o) o.OnCompleted())
                    _observers.Clear()
                End If
                ' TODO: dispose managed state (managed objects).
            End If

            ' TODO: free unmanaged resources (unmanaged objects) and override Finalize() below.
            ' TODO: set large fields to null.
        End If
        Me.disposedValue = True
    End Sub

    ' This code added by Visual Basic to correctly implement the disposable pattern.
    Public Sub Dispose() Implements IDisposable.Dispose
        ' Do not change this code.  Put cleanup code in Dispose(ByVal disposing As Boolean) above.
        Dispose(True)
        GC.SuppressFinalize(Me)
    End Sub
#End Region

End Class

In this new version, we now have a new _observers object that maintains a list of the observers (clients). This allows us to notify multiple sensor handlers and work with them how they deem appropriate. The subscribe method simply takes the supplied observer and sticks it in the collection.

When we start the sensor, we now check to see if there are any observers (using the LINQ .Any method). If there are, we generate the random sensor data. We then notify all of the listeners using the list's .ForEach method, passing a lambda expression that instructs each observer to invoke its OnNext handler (part of the IObserver(Of T) implementation). This is the method which corresponds to IEnumerator’s MoveNext. It is this method which will trigger our reactive framework’s event pipeline to begin processing our sensor notifications.

When we’re done, we need to clean up our resources. In the Dispose method, we make sure that we call the OnCompleted method on each (ForEach) of the observers in our _observers collection. We also clear the observer collection to remove the references between the clients and our sensor generator.
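One consequence of returning the sensor itself from Subscribe is that disposing any one subscription completes and removes every observer. If that is not what you want, a common alternative is to hand each subscriber its own small IDisposable token. The following is only a sketch of that alternative, not the class used in this series; Unsubscriber, TokenSource, NamedObserver and Publish are all hypothetical names, and like the sensor class it is not thread safe:

```vb
Imports System
Imports System.Collections.Generic

' Hypothetical subscription token: removes one observer when disposed.
Class Unsubscriber(Of T)
    Implements IDisposable

    Private ReadOnly _observers As List(Of IObserver(Of T))
    Private ReadOnly _observer As IObserver(Of T)

    Public Sub New(ByVal observers As List(Of IObserver(Of T)), ByVal observer As IObserver(Of T))
        _observers = observers
        _observer = observer
    End Sub

    Public Sub Dispose() Implements IDisposable.Dispose
        ' Remove only this observer; other subscribers keep receiving values.
        _observers.Remove(_observer)
    End Sub
End Class

' Hypothetical source that hands out one token per subscriber.
Class TokenSource(Of T)
    Implements IObservable(Of T)

    Private ReadOnly _observers As New List(Of IObserver(Of T))

    Public Function Subscribe(ByVal observer As IObserver(Of T)) As IDisposable _
        Implements IObservable(Of T).Subscribe
        _observers.Add(observer)
        Return New Unsubscriber(Of T)(_observers, observer)
    End Function

    Public Sub Publish(ByVal value As T)
        ' Copy the list so an observer can unsubscribe during notification.
        For Each o In _observers.ToArray()
            o.OnNext(value)
        Next
    End Sub
End Class

' Hypothetical observer that labels each value it receives.
Class NamedObserver
    Implements IObserver(Of Integer)

    Private ReadOnly _name As String

    Public Sub New(ByVal name As String)
        _name = name
    End Sub

    Public Sub OnNext(ByVal value As Integer) Implements IObserver(Of Integer).OnNext
        Console.WriteLine(_name & " received " & value)
    End Sub

    Public Sub OnError(ByVal [error] As Exception) Implements IObserver(Of Integer).OnError
    End Sub

    Public Sub OnCompleted() Implements IObserver(Of Integer).OnCompleted
    End Sub
End Class

Module Demo
    Sub Main()
        Dim source As New TokenSource(Of Integer)
        Dim first = source.Subscribe(New NamedObserver("A"))
        Dim second = source.Subscribe(New NamedObserver("B"))

        source.Publish(1)      ' Both A and B are notified.
        first.Dispose()        ' Only A is removed.
        source.Publish(2)      ' Only B is notified.
        second.Dispose()
    End Sub
End Module
```

The trade-off is one extra small class in exchange for subscriptions that can be torn down independently.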

There you have it, a generic random event generator that we can consume with the Reactive Framework (or similar technologies like StreamInsight). Next time, we’ll start to consume these events.

As always, let me know what you Thinq and if there are any modifications I should consider.

Posted on 4/26/2010 7:48:00 PM - Comments(3)
Categories: Rx VB VB Dev Center VS 2010