I would like to create the following combinator:
    public static IObservable<U> HeadTailSelect<T, U>
        (this IObservable<T> source, Func<T, IObservable<T>, U> fn)
    {
    }
The selector function should be passed the current event (the head) and a tail observable of all future events. It must be guaranteed that whenever the tail is subscribed to, at any time in the future, the first event it delivers is the very next one that arrived after the head.
I know that this will require some buffering, but I'm not quite sure how to implement it.
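One possible shape for the buffering, offered as a rough sketch rather than a definitive implementation: record every event in a single unbounded `ReplaySubject<T>` and hand each head a tail that skips everything up to and including that head. The class name `HeadTailExtensions` and the demo in `Main` are made up for illustration, and the sketch assumes that keeping the whole history in memory is acceptable.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HeadTailExtensions
{
    // Sketch only: buffers the ENTIRE source history (assumption: that is acceptable).
    public static IObservable<U> HeadTailSelect<T, U>(
        this IObservable<T> source, Func<T, IObservable<T>, U> fn)
    {
        return Observable.Create<U>(observer =>
        {
            // One replay buffer per subscription to the combinator.
            var replay = new ReplaySubject<T>();
            var count = 0; // events recorded so far, including the current head

            // The replay subject is deliberately not disposed with the subscription,
            // so a tail can still be subscribed to after the source has completed.
            return source
                .Do(replay) // record the event before fn sees it
                .Select(head =>
                {
                    var skip = ++count;
                    // Skipping `skip` items drops the head and everything before it,
                    // no matter how late the tail is subscribed to.
                    return fn(head, replay.Skip(skip));
                })
                .Subscribe(observer);
        });
    }
}

public static class Program
{
    public static void Main()
    {
        var source = new Subject<string>();
        source.HeadTailSelect((head, tail) => tail.Take(2).ToList())
              .SelectMany(p => p)
              .Select(l => string.Join("-", l))
              .Subscribe(s => Console.WriteLine("[" + s + "]"));

        source.OnNext("A");
        source.OnNext("B");
        source.OnNext("C");   // prints [B-C]
        source.OnCompleted(); // prints [C] and then []
    }
}
```

Because a `ReplaySubject<T>` replays its buffer to a new subscriber and then continues with live events, a late subscription to a tail should see no gap. The price is that the buffer is never trimmed; a production version would presumably want to release history that no tail can reach any more.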
The combinator has some nice properties. For example, you can write
    IObservable<IObservable<Unit>> windows =
        source
            .HeadTailSelect((h, tail) => Observable
                .Interval(TimeSpan.FromSeconds(1))
                .TakeUntil(tail)
                .Select(_ => Unit.Default)
            );
and avoid the race condition in which TakeUntil is registered inside the window only after the first event has already been handled, so that some events are missed.
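To make the race concrete, here is a small sketch of a naive variant that passes the shared, unbuffered stream as the tail. `NaiveHeadTailSelect` and `RaceDemo` are hypothetical names used only for this illustration; the point is that anything arriving between the head and the moment the tail is subscribed to is lost.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class NaiveHeadTail
{
    // Naive version: the "tail" is just the published source, with no buffering.
    public static IObservable<U> NaiveHeadTailSelect<T, U>(
        this IObservable<T> source, Func<T, IObservable<T>, U> fn)
    {
        return source.Publish(shared => shared.Select(head => fn(head, shared)));
    }
}

public static class RaceDemo
{
    public static List<int> Run()
    {
        var source = new Subject<int>();
        var tails = new List<IObservable<int>>();

        // Collect the tails, but do not subscribe to them yet.
        source.NaiveHeadTailSelect((head, tail) => tail)
              .Subscribe(t => tails.Add(t));

        source.OnNext(1); // head = 1, its tail is stored but not observed
        source.OnNext(2); // nobody is listening on the tail of 1: this event is lost

        var seen = new List<int>();
        tails[0].Subscribe(v => seen.Add(v)); // late subscription
        source.OnNext(3);

        return seen; // contains only 3; the 2 was missed
    }
}
```

With the guarantee asked for above, the tail of 1 would deliver 2 and then 3, regardless of when it was subscribed to.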
Here is a test case for the expected behaviour:
    using System;
    using System.Linq;
    using System.Reactive.Linq;
    using FluentAssertions;
    using Microsoft.Reactive.Testing;
    using Xunit;

    public class HeadTailSelect : ReactiveTest
    {
        TestScheduler _Scheduler = new TestScheduler();

        [Fact]
        public void ShouldWork()
        {
            // Six values in quick succession, then completion much later.
            var o = _Scheduler.CreateColdObservable
                ( OnNext(10, "A")
                , OnNext(11, "B")
                , OnNext(12, "C")
                , OnNext(13, "D")
                , OnNext(14, "E")
                , OnNext(15, "F")
                , OnCompleted<string>(700)
                );

            // For each head, collect up to the next two events from its tail.
            var data = o.HeadTailSelect((head, tail) => tail.Take(2).ToList())
                .SelectMany(p => p)
                .Select(l => String.Join("-", l));

            var actual = _Scheduler.Start(() =>
                  data
                , created: 0
                , subscribed: 1
                , disposed: 1000
                );

            // Six OnNext messages plus the final OnCompleted.
            actual.Messages.Count()
                .Should()
                .Be(7);

            var messages = actual.Messages.Take(6)
                .Select(v => v.Value.Value)
                .ToList();

            messages[0].Should().Be("B-C");
            messages[1].Should().Be("C-D");
            messages[2].Should().Be("D-E");
            messages[3].Should().Be("E-F");
            messages[4].Should().Be("F");
            messages[5].Should().Be("");
        }
    }