Cancelling a Reactive Extensions Observable
I’m often asked how do you cancel an Observable. In previous posts, I’ve shown how to stop observing by disposing the result of the subscription to the Observable. Of course, this isn’t the only way. If you are using one of the Observable.Generate methods, you can pass in a pointer to an object (like the System.Threading.CancellationTokenSource) and change a flag on that object, then in the iterate lambda function, reference that object to see if the cancellation flag was set. Here’s an example:
Private WithEvents ts As New System.Threading.CancellationTokenSource
Protected Overrides Sub OnInitialized(e As System.EventArgs)
MyBase.OnInitialized(e)
Dim items As New ObservableCollection(Of Integer)
ItemsListbox.ItemsSource = items
Dim cancelClicked = Observable.FromEventPattern(Of RoutedEventArgs)(Cancel, "Click")
Dim obs = Observable.Generate(0,
Function(x) Not ts.IsCancellationRequested,
Function(index) index + 1,
Function(index) index,
Function(index) TimeSpan.FromSeconds(1))
obs.TakeUntil(cancelClicked).
ObserveOnDispatcher().
Subscribe(Sub(item)
items.Add(item)
End Sub)
End Sub
Private Sub Cancel_Click(sender As System.Object, e As System.Windows.RoutedEventArgs) Handles Cancel.Click
ts.Cancel()
End Sub
In this example, we’re setting up a class level variable (ts) as a CancellationTokenSource. When we create our observable using Generate, the second parameter evaluates whether or not it should continue iterating. By checking the ts.IsCancellationRequested, we will evaluate that each time we iterate. Because it is a module level variable, we can cancel it by calling Cancel() in the Cancel Button click event handler.
As another alternative, we can convert the Cancel click into an observable collection as well by using the Observable.FromEventPattern. Then on the main observable, join it with the button observable using TakeUntil as follows:
Protected Overrides Sub OnInitialized(e As System.EventArgs)
MyBase.OnInitialized(e)
Dim items As New ObservableCollection(Of Integer)
ItemsListbox.ItemsSource = items
Dim cancelClicked = Observable.FromEventPattern(Of RoutedEventArgs)(Cancel, "Click")
Dim obs = Observable.Generate(0,
Function(x) True,
Function(index) index + 1,
Function(index) index,
Function(index) TimeSpan.FromSeconds(1))
obs.TakeUntil(cancelClicked).
ObserveOnDispatcher().
Subscribe(Sub(item)
items.Add(item)
End Sub)
End Sub
Do you have another favorite alternative for cancelling an Observable, let me know.