I have a stream containing an array, each element of which has an id. I need to split this into a stream by one identifier, which will be completed when the original stream no longer contains the identifier.
eg. sequence of input streams with these three values
[{a:1}, {b:1}] [{a:2}, {b:2}, {c:1}] [{b:3}, {c:2}]
should return three threads
a -> 1 2 |
b -> 1 2 3
c -> 1 2
If a completed the third value, since its identifier disappeared, and c was created in the second value, since its identifier appeared.
I'm trying groupByUntil, a bit like
var input = foo.share();
var output = input.selectMany(function (s) {
return rx.Observable.fromArray(s);
}).groupByUntil(
function (s) { return s.keys()[0]; },
null,
function (g) { return input.filter(
function (s) { return !findkey(s, g.key); }
); }
)
, id , . , , , groupByUntil .
?
. fromArray currentThread scheduler, , . ( , ).
fromArray (.., rx.Scheduler.immediate), .