How to select a head and tail at the same time in Reactive Extensions (Rx)

I would like to create the following combinator:

/// <summary>
/// Desired combinator (signature only; the body is what the question asks for).
/// The selector <paramref name="fn"/> is meant to receive the current element
/// (the head) together with an observable of the elements that follow it (the tail).
/// </summary>
/// <typeparam name="T">Element type of the source sequence.</typeparam>
/// <typeparam name="U">Type produced by the selector.</typeparam>
/// <param name="source">Sequence whose elements are split into head and tail.</param>
/// <param name="fn">Selector invoked with the head and the tail.</param>
/// <returns>The sequence of values produced by <paramref name="fn"/>.</returns>
public static IObservable<U> HeadTailSelect<T, U>
    (this IObservable<T> source, Func<T, IObservable<T>, U> fn)
{
    // Intentionally empty: the implementation is the open question.

}

The selector function should be passed the current event (the head) and an observable of all future events (the tail). It must be guaranteed that, whenever the tail is subscribed to in the future, the first event received is the very next one that arrived after the head.

I know that this will require some buffering, but I'm not quite sure how to do this.

It has some good properties. You can do

// Example usage: for every element of `source`, project an inner sequence that
// emits Unit.Default once per second and is cut off by TakeUntil(tail), i.e.
// as soon as the tail produces a value.
IObservable<IObservable<Unit>> windows =
source
    .HeadTailSelect((h,tail)=>Observable
        .Interval(TimeSpan.FromSeconds(1))
        .TakeUntil(tail)
        .Select(_=>Unit.Default)
    )

and avoid the race condition in which TakeUntil is registered inside the window only after the first event has already been handled, so that some events are missed.

- , .

, , , .

/// <summary>
/// Virtual-time test for the <c>HeadTailSelect</c> operator.
/// </summary>
public class HeadTailSelect : ReactiveTest
{
    // Virtual-time scheduler that both plays the cold source and records the output.
    TestScheduler _Scheduler = new TestScheduler();

    /// <summary>
    /// For each head, collects up to two elements from its tail, joins them
    /// with "-", and checks the recorded output.
    /// </summary>
    [Fact]
    public void ShouldWork()
    {
        // Cold source: six values on consecutive ticks, completion at tick 700
        // (times are relative to the moment of subscription).
        var source = _Scheduler.CreateColdObservable(
            OnNext(10, "A"),
            OnNext(11, "B"),
            OnNext(12, "C"),
            OnNext(13, "D"),
            OnNext(14, "E"),
            OnNext(15, "F"),
            OnCompleted<string>(700));

        // Each head yields a list of at most two tail elements, flattened and joined.
        var joined = source
            .HeadTailSelect((head, tail) => tail.Take(2).ToList())
            .SelectMany(lists => lists)
            .Select(items => String.Join("-", items));

        var recorder = _Scheduler.Start(
            () => joined,
            created: 0,
            subscribed: 1,
            disposed: 1000);

        // Six values plus the completion notification.
        recorder.Messages.Count()
            .Should()
            .Be(7);

        var values = recorder.Messages
            .Take(6)
            .Select(recorded => recorded.Value.Value)
            .ToList();

        // The last two tails run short because the source has no more elements.
        var expected = new[] { "B-C", "C-D", "D-E", "E-F", "F", "" };
        for (var i = 0; i < expected.Length; i++)
        {
            values[i].Should().Be(expected[i]);
        }
    }
}
+2
1

, . , .

/// <summary>
/// Projects each element of <paramref name="source"/> (the head) together
/// with an observable of the elements that follow it (the tail) through the
/// selector function.
/// </summary>
/// <remarks>
/// The tail is backed by a subject and does not replay: it only delivers
/// elements to observers that are already subscribed when those elements
/// arrive. To be sure the first element seen on the tail is the one directly
/// after the head, subscribe to the tail before the source produces its next
/// element (for example inside <paramref name="fn"/>, or synchronously when
/// the projected value is received). The tail completes when the source
/// completes or fails, or when <paramref name="fn"/> throws.
/// </remarks>
/// <typeparam name="T">Element type of the source sequence.</typeparam>
/// <typeparam name="U">Type of the values produced by the selector.</typeparam>
/// <param name="source">Sequence whose elements are split into head and tail.</param>
/// <param name="fn">Selector invoked once per element with the head and the tail.</param>
/// <returns>
/// A sequence of the selector results; it terminates with the source, or with
/// the exception thrown by <paramref name="fn"/>.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="fn"/> is null.
/// </exception>
public static IObservable<U> HeadTailSelect<T, U>
    (this IObservable<T> source, Func<T, IObservable<T>, U> fn)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (fn == null)
    {
        throw new ArgumentNullException(nameof(fn));
    }

    return Observable.Create<U>(observer =>
    {
        // One subject per subscription: a shared subject would leak elements
        // between independent subscribers and would already be completed for
        // anyone subscribing after the first subscription terminated.
        var tail = new Subject<T>();

        return source.Subscribe(
            v =>
            {
                // Feed earlier tails first, so the tail handed out for this
                // head starts with the element that comes after it.
                tail.OnNext(v);

                U u;
                try
                {
                    u = fn(v, tail);
                }
                catch (Exception ex)
                {
                    // A failing selector must surface as OnError downstream
                    // rather than being thrown back into the source.
                    tail.OnCompleted();
                    observer.OnError(ex);
                    return;
                }

                observer.OnNext(u);
            },
            e => { tail.OnCompleted(); observer.OnError(e); },
            () => { tail.OnCompleted(); observer.OnCompleted(); });
    });
}

, u - IObservable . , , .

+1

All Articles