ReactiveExtensions BufferWithPredicate

Rx has methods for BufferWithTime, BufferWithCount and BufferWithTimeOrCount, I want to write a BufferWithPredicate method that will look like this:

public IObservable<IList<T>> BufferWithPredicate<T>(this IObservable<T> input, Func<T, IList<T>, bool> predicate)

Essentially, new elements will be added to the existing buffer if the predicate does not return false, in which case the buffer will be returned and the new one will be launched. The predicate takes the next element, and the buffer is the parameters.

How can i achieve this?

+3
source share
1 answer

That should do it for you. I use Observable.Defer to make it work with cold observables:

public static class MyObservableExtensions
{
    public static IObservable<IList<T>> BufferWithPredicate<T>(this IObservable<T> input, Func<T, IList<T>, bool> predicate)
    {
        return Observable.Defer(() =>
            {
                var result = new Subject<IList<T>>();
                var list = new List<T>();
                input.Subscribe(item =>
                    {
                        if (predicate(item, list))
                        {
                            list.Add(item);
                        }
                        else
                        {
                            result.OnNext(list);
                            list = new List<T>();
                            list.Add(item);
                        }
                    }, 
                    () => result.OnNext(list));
                return result;
            });
    }
}

Using:

var observable = new[] { 2, 4, 6, 8, 10, 12, 13, 14, 15 }.ToObservable();
var result = observable.BufferWithPredicate((item, list) => item % 2 == 0);
result.Subscribe(l => Console.WriteLine("New list arrived. Count = {0}", l.Count));

Conclusion:

"New list arrived. Count = 6" 
"New list arrived. Count = 3"
+3
source

All Articles