Ordering Periodic Tasks With ReactiveX

Invoking tasks periodically with ReactiveX and controlling the order of completion.

Published on January 1, 2018

ReactiveX has a useful operator, Observable.Timer, which creates an observable that fires at a given interval. It can be used to run a task periodically, such as routinely checking a server for content updates. ReactiveX makes this especially convenient, but there is one ambiguity: how do we control the order in which overlapping operations complete? If a task is repeated at a given interval and a later instance completes before an earlier one, in what order should the observer receive the results?

Three common scenarios are covered in this post:

  • Unordered Completion: Results will be pushed to the observer in the order the operations complete.
  • Ordered Completion: Results will be pushed to the observer in the order the operations were started.
  • Recent Only Completion: Only the most recent results will be pushed. If a later operation completes before an earlier operation, any earlier operations still running will be cancelled.

In most scenarios, the tasks will not take long enough to overlap, but it is best to be explicit. Recent Only Completion is likely the most applicable to a periodic update scenario: once the newest data is available, it is reasonable to cancel any previous attempts to retrieve data.

Unordered Completion

Unordered Completion pushes results to the observer as the operations complete, regardless of when or in what order they were started. The following method achieves this. It receives an operation to run periodically and a TimeSpan giving the time between invocations. The invoked operation receives a cancellation token that will be cancelled if and when the subscription to the returned observable is disposed. The first invocation of the provided operation occurs immediately after the returned observable is subscribed to.

/// <summary>
///     Returns an observable that periodically invokes the specified operation and
///     emits the results as they arrive regardless of when the operations were invoked.
/// </summary>
public static IObservable<TResult> UnorderedCompletion<TResult>(
    Func<CancellationToken, Task<TResult>> operation,
    TimeSpan period)
{
    return
        Observable
            .Timer(TimeSpan.Zero, period)
            .Select(_ => Observable.FromAsync(operation))
            .Merge();
}

A potential name for the containing class is PeriodicObservable. The call would then read as follows.

var subscription = PeriodicObservable
    .UnorderedCompletion(myAsyncTask, period)
    .Subscribe(result => ... );
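
To make the ordering concrete, here is a minimal sketch of the same FromAsync and Merge pipeline. It is not part of the original code: the timer is swapped for a two-element array, and the two operations are hypothetical TaskCompletionSource instances completed by hand so the outcome does not depend on timing. MergeOrderDemo and its members are names made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class MergeOrderDemo
{
    public static List<string> Run()
    {
        // Hypothetical operations whose completion is controlled by hand.
        var first = new TaskCompletionSource<string>();
        var second = new TaskCompletionSource<string>();

        var received = new List<string>();
        var secondSeen = new ManualResetEventSlim();
        var completed = new ManualResetEventSlim();

        // Assumption: an array stands in for Observable.Timer so the demo is finite.
        using var subscription = new[] { first, second }
            .ToObservable()
            .Select(source => Observable.FromAsync(() => source.Task))
            .Merge()
            .Subscribe(
                result => { received.Add(result); secondSeen.Set(); },
                () => completed.Set());

        second.SetResult("second"); // the later operation finishes first
        secondSeen.Wait();          // wait until that result has been observed
        first.SetResult("first");   // the earlier operation finishes last
        completed.Wait();

        return received;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

Run returns the results in completion order, "second" followed by "first", even though the first operation was started earlier.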

Ordered Completion

Ordered Completion pushes results to the observer in the order the operations were started. If a later operation completes before an earlier operation, the later result is held until the earlier result is available. It is otherwise similar to Unordered Completion above.

/// <summary>
///     Returns an observable that periodically invokes the specified operation and emits
///     the results in the order the operations were invoked. If a later operation completes
///     before an earlier one the result will be held until the earlier operation completes.
/// </summary>
public static IObservable<TResult> OrderedCompletion<TResult>(
    Func<CancellationToken, Task<TResult>> operation,
    TimeSpan period)
{
    return Observable.Create((IObserver<TResult> observer) =>
    {
        var disposable = new CancellationDisposable();
        Observable
            .Timer(TimeSpan.Zero, period)
            .Select(_ => operation(disposable.Token))
            .Concat()
            .Subscribe(observer.OnNext, observer.OnError, disposable.Token);
        return disposable;
    });
}

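What made implementing the method above particularly tricky was having the tasks start on a regular interval while also supporting cancellation. The Concat operator supports either but not both together: concatenating FromAsync observables provides a cancellation token but delays each operation until the previous one completes, while concatenating tasks lets them start on schedule but provides no token. It was, therefore, necessary to inject a locally instantiated cancellation token and wrap the whole thing in another observable.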
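
The holding behaviour of Concat over tasks can be seen in isolation with a small sketch. As an assumption for the sake of a deterministic demo, the timer is again replaced by a fixed array, and the operations are hypothetical TaskCompletionSource instances completed by hand; ConcatOrderDemo is an illustrative name, not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConcatOrderDemo
{
    public static List<string> Run()
    {
        // Hypothetical operations that are already "running" when Concat sees them.
        var first = new TaskCompletionSource<string>();
        var second = new TaskCompletionSource<string>();

        var received = new List<string>();
        var completed = new ManualResetEventSlim();

        // Uses the Concat overload for IObservable<Task<T>>, as OrderedCompletion does.
        using var subscription = new[] { first.Task, second.Task }
            .ToObservable()
            .Concat()
            .Subscribe(
                result => received.Add(result),
                () => completed.Set());

        second.SetResult("second"); // finishes early, but its result is held back
        first.SetResult("first");   // releases "first", then the held "second"
        completed.Wait();

        return received;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

Here Run returns "first" followed by "second": the later result waits for the earlier one, which is the ordering OrderedCompletion is meant to guarantee.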

Recent Only Completion

This behaviour is often the most desirable but also the trickiest to get right. As long as the tasks complete in the order they were started, regardless of whether or not they overlap, results arrive sequentially as one would expect. What makes Recent Only Completion powerful is that if a later operation completes before an earlier operation, any earlier operations that have not yet completed are cancelled and their results discarded. This is desirable when a server or external source is being polled periodically for the most recent data: once newer data is available, outstanding requests for older data can reasonably be cancelled.

/// <summary>
///     Returns an observable that periodically invokes the specified operation and emits
///     only the most recent results. If a later operation completes before an earlier
///     one then the earlier operation will be cancelled and the later result returned.
/// </summary>
public static IObservable<TResult> RecentOnlyCompletion<TResult>(
    Func<CancellationToken, Task<TResult>> operation,
    TimeSpan period)
{
    return Observable.Create((IObserver<TResult> observer) =>
    {
        // Disposable that cancels all operations
        var masterDisposable = new CancellationDisposable();

        // Queue of all running operations with corresponding cancellation disposables
        var operationObservableQueue = new ConcurrentQueue<Tuple<IObservable<TResult>, IDisposable>>();

        Observable
            .Timer(TimeSpan.Zero, period)
            .Select(_ => Observable.FromAsync(operation))
            // When a new operation starts prepare cancellation mechanisms and add it to the queue 
            .Subscribe(operationObservable =>
            {
                // Disposable that cancels this operation only
                var operationDisposable = new CancellationDisposable();

                // Cancels on either master or operation level cancellation
                var combinedDisposable =
                    CancellationTokenSource.CreateLinkedTokenSource(
                        masterDisposable.Token,
                        operationDisposable.Token);

                operationObservableQueue.Enqueue(new Tuple<IObservable<TResult>, IDisposable>(
                    operationObservable,
                    operationDisposable));

                // When an operation completes cancel all previous operations, dequeue, and return result
                operationObservable
                    .Synchronize(gate: operationObservableQueue)
                    .Subscribe(result =>
                {
                    Tuple<IObservable<TResult>, IDisposable> lastDequeued = null;
                    while (lastDequeued?.Item1 != operationObservable)
                    {
                        // An empty queue means this operation was already superseded; drop its result
                        if (!operationObservableQueue.TryDequeue(out lastDequeued))
                        {
                            return;
                        }
                        lastDequeued.Item2.Dispose();
                    }

                    observer.OnNext(result);

                    // This operation will cancel on either an operation or master level cancellation
                }, observer.OnError, combinedDisposable.Token);
            }, masterDisposable.Token);

        return masterDisposable;
    });
}

The method above is long and unruly and should rightfully be refactored into its own stateful class. I have left it as a single method to make it easier to contain in this post. There is a lot going on so I have provided some preliminary documentation through comments.
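
The per-operation cancellation above rests on one link in the chain: cancelling the token passed to Subscribe disposes the subscription, which in turn cancels the token that FromAsync handed to the operation. The following sketch isolates that link. The operation is a hypothetical one that never finishes on its own, and OperationCancellationDemo is a name invented for this example.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class OperationCancellationDemo
{
    public static bool Run()
    {
        var cancelled = new ManualResetEventSlim();

        // Hypothetical operation that only ends when its token is cancelled.
        Func<CancellationToken, Task<int>> operation = async token =>
        {
            token.Register(cancelled.Set);
            await Task.Delay(Timeout.Infinite, token);
            return 0;
        };

        // Same shape as operationDisposable in RecentOnlyCompletion.
        var operationDisposable = new CancellationDisposable();

        Observable
            .FromAsync(operation)
            .Subscribe(_ => { }, _ => { }, operationDisposable.Token);

        // Cancelling the token disposes the subscription, which cancels the operation's token.
        operationDisposable.Dispose();

        return cancelled.Wait(TimeSpan.FromSeconds(5));
    }

    public static void Main() => Console.WriteLine(Run());
}
```

Run returns true once the operation has observed the cancellation. An operation that ignores its token would keep running to completion, so cancellation here is best effort.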

Action Synchronization

If any of the methods above are used to refresh content or otherwise manipulate mutable data when new results become available, then some consideration should be given to ensuring the subscription does not invoke OnNext multiple times concurrently. For example, if two results come in at about the same time and the application tries to update the UI for both at once, the UI could become corrupted. Rx.NET (ReactiveX) provides a useful operator, Synchronize, to ensure this does not happen. With Synchronize, notifications are serialized: a second OnNext call blocks until the first has returned.

var subscription = PeriodicObservable
    .RecentOnlyCompletion(myAsyncTask, period)
    .Synchronize()
    .Subscribe(result => ... );
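
As a rough check of what Synchronize buys, the sketch below pushes values into a Subject from several threads at once and records how many handlers ever run at the same time. The Subject stands in for one of the periodic observables, and SynchronizeDemo and its members are hypothetical names used only here.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class SynchronizeDemo
{
    public static int MaxConcurrentHandlers()
    {
        var subject = new Subject<int>();
        var statsLock = new object();
        int running = 0, maxRunning = 0;

        using var subscription = subject
            .Synchronize()
            .Subscribe(_ =>
            {
                // Count handlers currently executing and remember the peak.
                var now = Interlocked.Increment(ref running);
                lock (statsLock) { maxRunning = Math.Max(maxRunning, now); }

                Thread.Sleep(10); // simulate a slow UI update
                Interlocked.Decrement(ref running);
            });

        // Eight notifications raised concurrently from worker threads.
        Parallel.For(0, 8, value => subject.OnNext(value));

        return maxRunning;
    }

    public static void Main() => Console.WriteLine(MaxConcurrentHandlers());
}
```

With Synchronize in place the method returns 1, meaning no two handlers overlapped. Removing the Synchronize call from the sketch lets the handlers overlap, so the peak can exceed 1.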

Closing Notes

I highly recommend Tamir Dresher's Rx.NET in Action. It offers an exceptional in-depth overview of Rx.NET and many related concepts.

Code for this post, including the methods above, unit tests, and example usages, is available on my GitHub. All work is my own. You are welcome to use this code for whatever you like but I cannot guarantee that it will work as described or as expected under any circumstances.