Cancelling a Reactive Extensions Observable by ThinqLinq


I’m often asked how to cancel an Observable. In previous posts, I’ve shown how to stop observing by disposing the result of the subscription to the Observable. Of course, this isn’t the only way. If you are using one of the Observable.Generate methods, you can capture a reference to an object that carries a cancellation flag (like the System.Threading.CancellationTokenSource), check that flag in the condition lambda function, and set the flag when you want the sequence to stop. Here’s an example:


    ' CancellationTokenSource raises no events, so WithEvents is not needed here.
    Private ts As New System.Threading.CancellationTokenSource

    Protected Overrides Sub OnInitialized(e As System.EventArgs)
        MyBase.OnInitialized(e)

        Dim items As New ObservableCollection(Of Integer)
        ItemsListbox.ItemsSource = items

        ' The condition (second argument) is re-evaluated on every iteration,
        ' so the sequence completes once ts.Cancel() has been called.
        Dim obs = Observable.Generate(0,
                                      Function(x) Not ts.IsCancellationRequested,
                                      Function(index) index + 1,
                                      Function(index) index,
                                      Function(index) TimeSpan.FromSeconds(1))

        obs.ObserveOnDispatcher().
            Subscribe(Sub(item)
                          items.Add(item)
                      End Sub)
    End Sub

    Private Sub Cancel_Click(sender As System.Object, e As System.Windows.RoutedEventArgs) Handles Cancel.Click
        ts.Cancel()
    End Sub

In this example, we’re setting up a class-level field (ts) as a CancellationTokenSource. When we create our observable using Generate, the second parameter determines whether it should continue iterating. Because that lambda checks ts.IsCancellationRequested, the flag is evaluated each time we iterate. Because ts is a class-level field, we can set the flag by calling Cancel() in the Cancel button’s click event handler, and the sequence completes the next time the condition is evaluated.
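
To try the same idea outside of a WPF window, here is a minimal console sketch. It assumes the System.Reactive package is referenced; the module name TokenCancelSketch, the 200 ms interval, and the sleep durations are made up for illustration and are not part of the original sample.

    Imports System
    Imports System.Reactive.Linq
    Imports System.Threading

    Module TokenCancelSketch
        Sub Main()
            Dim ts As New CancellationTokenSource()

            ' Same Generate overload as above, with a shorter (assumed) interval.
            Dim obs = Observable.Generate(0,
                                          Function(x) Not ts.IsCancellationRequested,
                                          Function(index) index + 1,
                                          Function(index) index,
                                          Function(index) TimeSpan.FromMilliseconds(200))

            ' Supplying all three callbacks lets us see the sequence complete.
            Using obs.Subscribe(Sub(item) Console.WriteLine("Item: {0}", item),
                                Sub(ex As Exception) Console.WriteLine("Error: {0}", ex.Message),
                                Sub() Console.WriteLine("Completed"))
                Thread.Sleep(1000)   ' let a few items arrive
                ts.Cancel()          ' set the flag that the condition lambda checks
                Thread.Sleep(500)    ' give the next condition check time to run
            End Using
        End Sub
    End Module

Run as written, this should print a handful of items and then "Completed", because a false condition ends the sequence rather than just silencing it.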

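As another alternative, we can convert the Cancel click into an observable sequence as well by using Observable.FromEventPattern. Then on the main observable, combine it with the button observable using TakeUntil, which passes items through until the button observable produces its first value. The Generate condition can then simply return True, as follows: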


    Protected Overrides Sub OnInitialized(e As System.EventArgs)
        MyBase.OnInitialized(e)

        Dim items As New ObservableCollection(Of Integer)
        ItemsListbox.ItemsSource = items

        Dim cancelClicked = Observable.FromEventPattern(Of RoutedEventArgs)(Cancel, "Click")

        Dim obs = Observable.Generate(0,
                                      Function(x) True,
                                      Function(index) index + 1,
                                      Function(index) index,
                                      Function(index) TimeSpan.FromSeconds(1))

        obs.TakeUntil(cancelClicked).
            ObserveOnDispatcher().
            Subscribe(Sub(item)
                          items.Add(item)
                      End Sub)
    End Sub
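
For comparison, the disposal approach mentioned at the start of this post can be sketched as a small console program. This again assumes the System.Reactive package is referenced; the module name DisposeSketch and the timings are hypothetical and chosen only for the demo.

    Imports System
    Imports System.Reactive.Linq
    Imports System.Threading

    Module DisposeSketch
        Sub Main()
            Dim obs = Observable.Generate(0,
                                          Function(x) True,
                                          Function(index) index + 1,
                                          Function(index) index,
                                          Function(index) TimeSpan.FromMilliseconds(200))

            ' Subscribe returns an IDisposable that represents the subscription.
            Dim subscription As IDisposable =
                obs.Subscribe(Sub(item) Console.WriteLine("Item: {0}", item),
                              Sub(ex As Exception) Console.WriteLine("Error: {0}", ex.Message),
                              Sub() Console.WriteLine("Completed"))

            Thread.Sleep(1000)        ' let a few items arrive
            subscription.Dispose()    ' stop observing
            Thread.Sleep(500)         ' nothing further should be printed
        End Sub
    End Module

One difference worth noting: disposing the subscription only detaches the observer, so "Completed" is not expected to print here, whereas the flag and TakeUntil approaches end the sequence with a completion notification.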

If you have another favorite way to cancel an Observable, let me know.
