An Rx newbie observes property changes

I recently started taking some tentative steps into Reactive Extensions (Rx) (yes, I realise I’m way behind on this). The case I started off with was observing a property, and performing an action only when it changed to specific values. The property is exposed through an interface like this:

    public interface IProperty<T> {
        T Value { get; set; }
        event Action ValueChanged;
    }

The traditional way of doing this in .NET is to subscribe to events:

    [Test]
    public void WithEvents() {
        _property.ValueChanged += OnValueChanged;

        _property.Value = 5;
        _property.Value = 10;
        _property.Value = 15;
        _property.Value = 2;

        _property.ValueChanged -= OnValueChanged;
        Assert.That(_propertyValues.ToArray(), Is.EqualTo(new[] { 10, 15 }));
    }

    private void OnValueChanged() {
        var value = _property.Value;
        if (value >= 10) {
            _propertyValues.Add(value);
        }
    }

Here we subscribe to the ValueChanged event. Every time the event fires our OnValueChanged() method will be called, which will check if the property value meets our condition, and if so then add it to _propertyValues (a list of int).

The alternative is to stop treating this as a procedure (respond to each event by looking up the Value) and instead treat the changes as a collection of values produced by the events. This is where Rx comes in. (At least this is my newbie interpretation; please correct me in the comments.)
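One way to picture that shift: if the values were already sitting in an ordinary collection, the filtering would be a one-line LINQ query. Here is a small sketch using plain LINQ to Objects (no Rx yet; the class and method names are made up for this illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PullBasedSketch {
    // Filters an ordinary, already-populated sequence of values.
    // We pull each value out of the collection and keep the ones we care about.
    public static int[] ValuesOfInterest(IEnumerable<int> values) {
        return values.Where(x => x >= 10).ToArray();
    }
}
```

Calling `PullBasedSketch.ValuesOfInterest(new[] { 5, 10, 15, 2 })` gives back 10 and 15. The idea with Rx is to write the same kind of query over values that arrive over time, rather than over values we already have.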

Everything I know about Rx in under 200 words

Rx seems to revolve around two main interfaces (which I believe shipped with .NET 4, and are also available from Rx for .NET 3.5). The first is IObserver<T>, which has OnNext(T value), OnError(Exception error) and OnCompleted(). This interface is the dual of the iterator interface (IEnumerator<T>), which just means that rather than pulling each member out of a collection by iterating over it, we get each member of the collection pushed to us.

The other interface is IObservable<T>, which just has a Subscribe(IObserver<T> observer) method that returns an IDisposable. This lets us register an observer to start receiving pushed items, and dispose of the returned IDisposable to end that subscription.
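To make the two interfaces a bit more concrete, here is a minimal hand-rolled sketch that uses only the interfaces from the System namespace (no Rx package). ListObserver, ManualObservable and Unsubscriber are names I've made up for this illustration; in real code we would normally let Rx build these for us.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records everything pushed to it.
public sealed class ListObserver<T> : IObserver<T> {
    public List<T> Items { get; } = new List<T>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { Completed = true; }
}

// Hypothetical observable that pushes values to whoever is currently subscribed.
public sealed class ManualObservable<T> : IObservable<T> {
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer) {
        _observers.Add(observer);
        // Disposing the returned object removes this observer again.
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    public void Push(T value) {
        // Copy the list so observers can unsubscribe while we iterate.
        foreach (var observer in _observers.ToArray()) {
            observer.OnNext(value);
        }
    }

    private sealed class Unsubscriber : IDisposable {
        private readonly Action _unsubscribe;
        public Unsubscriber(Action unsubscribe) { _unsubscribe = unsubscribe; }
        public void Dispose() { _unsubscribe(); }
    }
}
```

If we subscribe a ListObserver<int>, push 1, dispose of the subscription and then push 2, the observer only ever sees the 1. This sketch ignores thread safety and never calls OnError or OnCompleted, which a real implementation would need to think about.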

Rx itself is a whole heap of supporting implementations and extension methods to help us do incredibly cool things with those two interfaces*. Rather than looking at these incredibly cool things, let’s look at our previous example instead, partly because the cool things are covered in many places, and partly because this example is adapted from a real problem I had to solve, so I know it’s something we might be able to use in day-to-day development. :)

* For a real (i.e. factual :)) explanation, try Lee Campbell’s Rx series.

Observing the values of our changing property

I initially stuffed up my effort to get this working using Rx by creating a terribly naive implementation, but thanks to a very helpful StackOverflow answer from Scott Weinstein it’s now looking pretty neat.

The first thing we need to do is create an IObservable<T> over our property value. Rx has built-in methods to do this from events, but only when they follow the standard event handler signature (whereas ours is declared as event Action). Instead we can use Observable.Create<T>() to help us. (We could do this in-line where we want to use it, but I’ve wrapped it in a reusable method because I’ve got lots of properties where this will come in handy.)

    public static class ObservableEx {
        public static IObservable<T> FromEvent<T>(Func<T> selector, Action<Action> subscribe, Action<Action> unsubscribe) {
            return Observable.Create<T>(obs => {
                Action pushNext = () => obs.OnNext(selector());
                subscribe(pushNext);
                return () => unsubscribe(pushNext);
            });
        }
    }

This looks a little more confusing than it actually is. Observable.Create<T>() takes a delegate which itself takes an IObserver<T> and returns an action to run when that observer’s subscription is disposed. In the body of that delegate we put the logic necessary to push values to the observer. This is the subscription logic that runs for each observer that subscribes to our observable.

Aside: If we want to push the initial state of the value to observers when they subscribe, we can add an invocation of `pushNext()` after `subscribe(pushNext)`. This will ensure our collection always has a starting value.
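As a sketch of that aside, here is what the variant might look like. It assumes the System.Reactive (Rx) package is referenced; ObservableExWithInitialValue and FromEventWithInitialValue are names I've made up so it can sit alongside the original method.

```csharp
using System;
using System.Reactive.Linq; // from the System.Reactive (Rx) package

public static class ObservableExWithInitialValue {
    // Hypothetical variant of FromEvent that also pushes the current value
    // to each observer as soon as it subscribes.
    public static IObservable<T> FromEventWithInitialValue<T>(
            Func<T> selector, Action<Action> subscribe, Action<Action> unsubscribe) {
        return Observable.Create<T>(obs => {
            Action pushNext = () => obs.OnNext(selector());
            subscribe(pushNext);
            pushNext(); // push the starting value straight away
            return () => unsubscribe(pushNext);
        });
    }
}
```

With this version an observer of a property whose value is currently 3 receives 3 immediately, and then whatever the property holds each time the event is raised.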

In this case we push whatever value is returned by our selector, and subscribe to the required event so we push whenever that event is raised. When the subscription is disposed we unsubscribe from the event. And here’s how we use it:

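    [Test]
    public void WithObservable() {
        // Keep the IDisposable returned by Subscribe so we can unsubscribe afterwards.
        var subscription = ObservableEx.FromEvent(
                () => _property.Value,
                action => _property.ValueChanged += action,
                action => _property.ValueChanged -= action)
            .Where(x => x >= 10)
            .Subscribe(_propertyValues.Add);

        _property.Value = 5;
        _property.Value = 10;
        _property.Value = 15;
        _property.Value = 2;

        // Disposing the subscription detaches our handler from ValueChanged.
        subscription.Dispose();
        Assert.That(_propertyValues.ToArray(), Is.EqualTo(new[] { 10, 15 }));
    }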

This is the same test as before but rewritten to use observables. We create our observable over our property value based on its ValueChanged events using our helper method. We then filter this collection of values using a standard, LINQy Where clause. Finally we supply our logic for handling each value that makes it through the filter by calling Subscribe and giving it a callback to execute; in this case our logic to add the value to _propertyValues.

Compared with the previous approach, we have consolidated event subscriptions, unsubscriptions, filtering the values and the logic of handling the values we’re interested in to a single observable declaration. Having all this related information together is great, but being able to declare our intention in the code rather than the procedure we need to implement it is, IMO, awesome. Not to mention how nice it is to have our event subscription cleaned up for us just by disposing of the IDisposable that Subscribe returns. :)

Note: Whenever I’ve shown this to people the first thing they’ve asked is ‘how can I test this?’ Normally we try to test behaviour rather than implementation, so in the case I tried this for I didn’t need to modify any tests; I just replaced event subscriptions with the observable creation and they all passed. That said, Rx has a package available to support testing, or you could mock the interfaces if, despite my warnings, you still think it’s a good idea. ;)

This may seem like a silly example, so let’s look at a more realistic case where the same code could come in handy. Say we want to run some diagnostics whenever a device gets connected. We could observe the device’s connection state, filter it to just look at Connected values, then run the required diagnostics code when we get this value. Better yet, we can also combine observables, so we could subscribe to notifications only when the device is connected, its driver has reported it is ready to use, and the software is running in diagnostics mode. This would make for messy event handling code, but be quite readable as code combining and filtering observables.
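Here is a rough sketch of how that combination might read, assuming the System.Reactive package and assuming each of the three conditions is already available as an IObservable<bool>. The class, method and parameter names are all hypothetical; CombineLatest, DistinctUntilChanged and Where are standard Rx operators.

```csharp
using System;
using System.Reactive.Linq; // from the System.Reactive (Rx) package

public static class DeviceDiagnosticsSketch {
    // All three inputs are hypothetical streams of true/false states.
    public static IObservable<bool> ShouldRunDiagnostics(
            IObservable<bool> connected,
            IObservable<bool> driverReady,
            IObservable<bool> diagnosticsMode) {
        return connected
            // Recompute the combined state whenever any input changes
            // (once every input has produced at least one value).
            .CombineLatest(driverReady, diagnosticsMode, (c, d, m) => c && d && m)
            // Ignore repeats of the same combined state.
            .DistinctUntilChanged()
            // Only notify when all three conditions hold.
            .Where(allTrue => allTrue);
    }
}
```

A subscriber to the result would then just run the diagnostics in its callback, getting one notification each time the three conditions become true together. Note that CombineLatest stays silent until each source has produced a value, so each input needs to report its current state up front (for example by pushing the initial value on subscription).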

I’m definitely keen to play around with this more, and hope I’ve managed to get those of you that have been putting it off (like me) interested enough in it to do the same. :)
