Saturday 7 June 2014

Mixing MVVM with RX - Part 1

In one of my previous positions, I was lucky enough to be involved in a project that used Reactive Extensions in C# extensively.  I also have done a lot of UI (WPF) development using MVVM, so I decided to have a go at combining MVVM with RX. To do this I wanted to write an application that is simple but to capture most of the aspects of MVVM. The application that will be implemented is an address book, which has the following features:

  • All persons will be displayed in a list on the main page
  • Details of the selected person can be viewed/edited by clicking Details button on the item on list
  • Selected person can be deleted from the list by clicking on Delete button on the item on list
  • A new person can be added
The application will use Autofac as an IoC and all dependencies will be injected in constructors. Internally in the ViewModel the properties and commands should be observables. So I will start with the properties first. Since the properties can be read/write then they need to implement both IObservable and IObserver  interfaces so we can push new values and also subscribe to the values.

Fortunately RX has already an interface defined like that and it is called ISubject. So our property will extend ISubject interface and will have only one property called Value, look like this:

public interface IPropertySubject<T> : ISubject<T>
{
    T Value { getset; }
}

And the implementation will look like:


public class PropertySubject<T>  :IPropertySubject<T>
{
    private readonly Subject<T> _subject = new Subject<T>();
    private T _value;
 
    public void OnNext(T value)
    {
        SetValue(value);
    }
 
    private void SetValue(T value)
    {
        _value = value;
        _subject.OnNext(value);            
    }
 
    public void OnError(Exception error)
    {
        _subject.OnError(error);
    }
 
    public void OnCompleted()
    {
        _subject.OnCompleted();
    }
 
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subject.Subscribe(observer);
    }
 
    public T Value
    {
        get { return _value; }
        set { SetValue(value); }
    }
}


From implementation we can see that we are using an instance of Subject class that is provided by RX. Subject implements both IObserver and IObservable. The value of the property can be set in two ways, either by usingOnNext() method, or by simply setting the  Value property.
Next task is to implement the command. The command interface will implement ICommand and look like:

public interface ICommandObserver<out T> : ICommand 
{
    IObservable<T> OnExecute { get; }
    IObserver<bool> SetCanExecute { get; } 
}

Every time Execute() method is called on ICommandOnExecute will push a new value of type T to an observable. SetCanExecute sets the value that will be returned by ICommand's CanExecute method and raisingCanExecuteChanged event (enabling/disabling the UI component).
The implementation of ICommandObserver looks like:

public class CommandObserver<T> : ICommandObserver<T>
{
    private bool _canExecute;
    private readonly ISubject<T> _executeSubject = new Subject<T>();
    private readonly ISubject<bool> _canExecuteSubject = new Subject<bool>();
 
    public CommandObserver() : this(true)
    {
    }
 
    public CommandObserver(bool value)
    {
        _canExecute = value;
        _canExecuteSubject.DistinctUntilChanged().Subscribe(v =>
        {
            _canExecute = v;
            if (CanExecuteChanged != null)
                CanExecuteChanged(thisEventArgs.Empty);
        });
    }
 
    void ICommand.Execute(object parameter)
    {
        if(parameter is T)
            _executeSubject.OnNext((T) parameter);
        else
            _executeSubject.OnNext(default(T));
    }
 
    bool ICommand.CanExecute(object parameter)
    {
        return _canExecute;
    }
 
    public event EventHandler CanExecuteChanged;
 
    public IObservable<T> OnExecute
    {
        get { return _executeSubject.AsObservable(); }
    }
 
    public IObserver<bool> SetCanExecute
    {
        get { return _canExecuteSubject.AsObserver(); }
    }
}

As you can see, one of the constructors takes a boolean value. This value is the initial value that will be returned by CanExecute() method (enabling/disabling the UI component). Another thing that happens in constructor is that, we only raise the event CanExecuteChanged if the value of pushed on SetCanExecute observer changes from true to false or vice versa. So basically even if we push a number of same values in the SetCanExecute the event is raised only once (another cool feature of RX using DistinctUntilChanged() method).
Now we create a base viewmodel that all our view models will inherit from. The class definition looks like:

public abstract class ViewModelBase : INotifyPropertyChangedIDisposable
{
    private readonly CompositeDisposable _disposables = new CompositeDisposable();
    public event PropertyChangedEventHandler PropertyChanged;
 
    public void OnPropertyChanged(string propertyName)
    {
        PropertyChangedEventHandler handler = PropertyChanged;
        if (handler != null) 
            handler(thisnew PropertyChangedEventArgs(propertyName));
    }
 
    public void ShouldDispose(IDisposable disposable)
    {
        _disposables.Add(disposable);   
    }
 
    public IDoFluidCommand<T> For<T>(ICommandObserver<T> cmd)
    {
        var  fluidCommand = new FluidCommand<T>(cmd);
        _disposables.Add(fluidCommand);
 
        return fluidCommand;
    }
 
    public void Dispose()
    {
        _disposables.Dispose();
    }
}

When we subscribe to an observable an IDisposable is returned. To unsubscribe from the observable we callDispose() method on the returned IDisposable. Since all our properties and commands on our view models will be observables, everytime we subscribe to an observable we need to keep track of the returned disposable so when the viewmodel is disposed then we need to unsubscribe from all observables. We do this by adding the returned disposable (when subscribing) to the CompositeDisposable instance (_disposables object).CompositeDisposable is another class from RX which is a list of disposables and when Dispose() method is called on it, it will call Dispose() method of all items that it contains.
Next step is to create a property provider that will create a property or command that is declared in the viewmodels' interface. So the property provider will look like:

public interface IPropertyProvider<T> : IDisposable
{
    IPropertySubject<K> CreateProperty<K>(Expression<Func<TK>> expression);
    IPropertySubject<K> CreateProperty<K>(Expression<Func<TK>> expression, K value);
    IPropertySubject<K> CreateProperty<K>(Expression<Func<TK>> expression, IObservable<K> values);
    ICommandObserver<K> CreateCommand<K>(Expression<Func<TICommand>> expression);
    ICommandObserver<K> CreateCommand<K>(Expression<Func<TICommand>> expression, bool isEnabled);
    ICommandObserver<K> CreateCommand<K>(Expression<Func<TICommand>> expression, IObservable<bool> isEnabled);
}

The interface declares three overloaded methods for creating properties. The first method returns a new instance of the right property type, the second method returns a new instance of the property and sets its initial value to the provided, and the third method returns an instance of the property and sets its value to the values of the provided observable (updating the value everytime a new element comes in the observable). The same is with the methods for creating the commands.
The implementation of the property provider interface looks like:

public class PropertyProvider<T> : IPropertyProvider<T>
{
    private readonly ViewModelBase _viewModel;
    private readonly ISchedulers _schedulers;
    private readonly CompositeDisposable _disposables = new CompositeDisposable();
 
    public PropertyProvider(ViewModelBase viewModel, ISchedulers schedulers)
    {
        _viewModel = viewModel;
        _schedulers = schedulers;
 
        _viewModel.ShouldDispose(_disposables);
    }
 
    private PropertySubject<K> GetProperty<K>(Expression<Func<TK>> expr)
    {
        var propertyName = ((MemberExpression) expr.Body).Member.Name;
        var propSubject = new PropertySubject<K>();
 
        _disposables.Add(propSubject.ObserveOn(_schedulers.Dispatcher)
                                    .Subscribe(v => _viewModel.OnPropertyChanged(propertyName)));
 
        return propSubject;
    }
 
    public void Dispose()
    {
        if(!_disposables.IsDisposed)
            _disposables.Dispose();
    }
 
 
    public IPropertySubject<K> CreateProperty<K>(Expression<Func<TK>> expression)
    {
        return GetProperty(expression);
    }
 
    public IPropertySubject<K> CreateProperty<K>(Expression<Func<TK>> expression, K value)
    {
        var propSubject = GetProperty(expression);
        propSubject.Value = value;
        return propSubject;
    }
 
    public IPropertySubject<K> CreateProperty<K>(Expression<Func<TK>> expression, IObservable<K> values)
    {
        var propSubject = GetProperty(expression);
        _disposables.Add(values.Subscribe(v => propSubject.Value = v));
        return propSubject;
    }
 
    public ICommandObserver<K> CreateCommand<K>(Expression<Func<TICommand>> expression)
    {
        return new CommandObserver<K>(true);
    }
 
    public ICommandObserver<K> CreateCommand<K>(Expression<Func<TICommand>> expression, bool isEnabled)
    {
        return new CommandObserver<K>(isEnabled);
    }
 
    public ICommandObserver<K> CreateCommand<K>(Expression<Func<TICommand>> expression, IObservable<bool> isEnabled)
    {
        var cmd = new CommandObserver<K>(true);
        _disposables.Add(isEnabled.Subscribe(cmd.SetCanExecute));
        return cmd;
    }
}

From the implementation we can see that PropertyProvider takes an instance of ViewModelBase andISchedulers in its constructor. ISchedulers  (the definition and implementation are defined further down) is just a wrapper around different schedulers (Thread, ThreadPool, Task, Dispatcher, ...) that are provided by RX.
As we can see from the GetProperty<>(...) method, everytime a new property subject is created, we subscribe to it so whenever a new value is pushed to it, the PropertyChanged event is fired on the view model automatically, so on our view models we don't need to worry about raising the PropertyChanged event. Another cool feature that is provided by RX is that since we are observing on dispatcher scheduler (using ObserveOn(...)method provided by RX) we can change the value of the property on any thread and the property will be updated on the dispatcher. So no need for the ugly check InvokeRequired or CheckAccess() and BeginInvoke().
Now, instead of creating concrete instances of the PropertyProvider class in our viewmodels for the properties, we are going to defer the creation of the property provider to a factory class and then inject this factory on the constructor of our viewmodels. This allows us when we write unit tests to fake the factory, and are also we adhere to SRP (Single Responsibility Principle) on the viewmodel (a pattern that I use in my projects, especially when using IoC).
So the interface for the factory looks like:

public interface IPropertyProviderFactory
{
    IPropertyProvider<T> Create<T>(ViewModelBase viewModelBase);
}

and the implementation looks like:

public class PropertyProviderFactory : IPropertyProviderFactory
{
    private readonly ISchedulers _schedulers;
 
    public PropertyProviderFactory(ISchedulers schedulers)
    {
        _schedulers = schedulers;
    }
 
    public IPropertyProvider<T> Create<T>(ViewModelBase viewModelBase)
    {
        return new PropertyProvider<T>(viewModelBase, _schedulers);
    }
}

The next thing to implement is a messaging bus that will be used by viewmodels to communicate with each other (similar to Event Aggregator Pattern). The interface of the messaging bus is very simple

public interface IMessageBus
{
    IDisposable Subscribe<T>(Action<T> action);
    void Publish<T>(T item);
}

So if we want to publish a message we use the Publish(...) method by providing the instance of message type (messages are declared as classes), and if we want to listen to a particular type of a message we subscribe to a relevant type of message and pass an action that will be executed when the message is published.

The implementation of the message bus looks like :

public class MessageBus : IMessageBus
{
    private readonly ISubject<object> _messageBus = new Subject<object>();
 
    public IDisposable Subscribe<T>(Action<T> action)
    {
        return _messageBus.OfType<T>().Subscribe(action);
    }
 
    public void Publish<T>(T item)
    {
        _messageBus.OnNext(item);
    }
}


The last thing left before we can move to our application is signature and implementation of the ISchedulersinterface which looks like this:

public interface ISchedulers
{
    IScheduler Dispatcher { get; }
    IScheduler NewThread { get; }
    IScheduler NewTask { get; }
    IScheduler ThreadPool { get; }
    IScheduler Timer { get; }
}

and the implementation is :

public class Schedulers : ISchedulers
{
    public IScheduler Dispatcher
    {
        get { return DispatcherScheduler.Current; }
    }
 
    public IScheduler NewThread
    {
        get { return NewThreadScheduler.Default; }
    }
 
    public IScheduler NewTask
    {
        get { return TaskPoolScheduler.Default; }
    }
 
    public IScheduler ThreadPool
    {
        get { return ThreadPoolScheduler.Instance; }
    }
 
    public IScheduler Timer
    {
        get { return ImmediateScheduler.Instance; }
    }
}

As mentioned before, the reason for defining ISchedulers interface and not using the RX schedulers is that when we write unit tests, we can fake ISchedulers interface and return test scheduler.

On the next part I will start implementing the application itself, so we can see how this framework fits nicely with the MVVM.

No comments:

Post a Comment