Wednesday, January 20, 2016

Rate Limiting Events with the Reactive Extensions Rx

A fun little challenge that came up at work...  we have a stream of events that we publish / subscribe via a Reactive IObservable.   One of the things we're testing for is swamping the subscribers with events for periods of time.  The tools in System.Reactive.Linq namespace include utilities like .Throttle(...), .Sample(...).  None of these supported our needs.

For our needs in this particular event stream, in this point in our stream, we can't afford to drop events.  

Sample(...) picks off an item at various intervals, dropping the rest.  Throttle(...) sets up a window of time. It won't release any object until there's been a case where only one object was buffered in the given time window.  If you get another while the window's open, the window widens to the original timespan.

Then there's .Buffer(...) which can store event objects for a window, and then release them.  That amounts to releasing all the events in periodic bursts, but it's not a rate limit.

Finally there's the .Delay(...) method... which, ironically, delays publishing objects by an offset amount...  but that delays all objects by the same time offset.  If you have three events come in, 1 millisecond apart each, and put a 1 minute delay on them, they'll enter the collection, and one minute later, will be published out... in a three-millisecond burst.

I want to be able to constrain my publisher such that I only want n number of entities per second. 

My solution separates the pub/sub.  It loads a queue with incoming events, and emits them immediately, up to the limit on the subscriber side. On the publisher side, it resets the counter and emits any overflow objects in the queue, also up to the limit.   

Yes, this model has problems, so address your risks appropriately... ("use at your own risk").    You can run out of memory if the origin provides more items than the limiter is allowed to emit over long periods of time.

Anyway, here's a Program.cs with the RxHelper extension RateLimit(...).  The program has a decent little before/after RateLimit(...).



using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;


using System.Reactive.Linq;
 
namespace ConsoleApplication2
{
    class Program
    {
        static void Main(string[] args)
        {
 
            // simulate very fast (no delay) data feed
            var unThrottledFeed = GetFastObservableSequence();
 


            unThrottledFeed.Subscribe(Console.WriteLine);
            Console.WriteLine("That was an example of an event stream (with 100 events) only constrained by system resources.");


            Console.WriteLine();

            Console.WriteLine("Now rate-limiting at 10 items per second...\n");

            const int itemsPerSecond = 10;
 
            var throttledFeed = GetFastObservableSequence()
                    .RateLimit(itemsPerSecond);
 
 
            throttledFeed.Subscribe(Console.WriteLine);
 
            Console.WriteLine("END OF LINE");
            Console.WriteLine("Note that the Main() method would be done here, were it not for the ReadKey(), the RateLimit subscriber is scheduled.");
            Console.WriteLine();
            Console.WriteLine("Rate limited events will appear here:");
            Console.ReadKey(true);
 
 
        }
 
        #region Example Artifacts
        private static IObservable<TestClass> GetFastObservableSequence()
        {
            var counter = 0;
            var rnd = new Random();
            return Observable.Defer(() =>
                Observable.Return(0)
                    .Select(p =>;
                    {
                        counter++;
                        var x = new TestClass(30.0, counter);
                        x.Value += Math.Round(rnd.NextDouble(), 2);
                        return x;
                    })
                    .Repeat(100));
        }
 
        private class TestClass
        {
            public TestClass(double value, int instance)
            {
                Value = value;
                Instance = instance;
            }
            private int Instance { get; set; }
            public double Value { get; set; }
            public override string ToString()
            {
                return $"{Instance}: {Value}";
            }
        }
        #endregion
    }
 
 
   
    internal static class RxHelper
    {
        public static IObservable<TSource> RateLimit<TSource>(
            this IObservable<TSource> source,
            int itemsPerSecond,
            IScheduler scheduler = null)
        {
            scheduler = scheduler ?? Scheduler.Default;
            var timeSpan = TimeSpan.FromSeconds(1);
            var itemsEmitted = 0L;
            return Observable.Create<TSource>(
                observer =>
                {
                    var buffer = new ConcurrentQueue<TSource>();
                    Action emit = delegate()
                    {
                        while (Interlocked.Read(ref itemsEmitted) < itemsPerSecond)
                        {
                            TSource item;
                            if (!buffer.TryDequeue(out item))
                                break;
                            observer.OnNext(item);
                            Interlocked.Increment(ref itemsEmitted);

                        }
                    };
                   
                    var sourceSub = source
                        .Subscribe(x =>
                        {
                            buffer.Enqueue(x);
                            emit();

                        });
                    var timer = Observable.Interval(timeSpan, scheduler)
                        .Subscribe(x =>
                        {
                            Interlocked.Exchange(ref itemsEmitted, 0);
                            emit();
                        }, observer.OnError, observer.OnCompleted);
                    return new CompositeDisposable(sourceSub, timer);
                });
        }
    }

}
 
 

Edit 1/27/2016:  Had to tweak RateLimiter(...) to immediately emit objects as long as it hadn't hit it's limit for the time span.  It always queues, just in case, to maintain order.