Rx: unfold an array into multiple streams

I have a stream whose values are arrays, and each array element has an id. I need to split this into one stream per id, where each of those streams completes once the source stream no longer contains that id.

E.g., an input stream that emits these three values:

[{a:1}, {b:1}]    [{a:2}, {b:2}, {c:1}]     [{b:3}, {c:2}]

should produce three streams:

a -> 1 2 |
b -> 1 2 3
c ->   1 2

Here a completes on the third value, because its id has disappeared, and c is created on the second value, because its id has appeared.

I'm trying groupByUntil, a bit like

 // Share the source: `input` is used twice below, once as the data that is
 // flattened and grouped, and once as the signal that ends each group.
 var input = foo.share();
 var output = input.selectMany(function (s) {
                         // Flatten each emitted array into its individual elements.
                         // NOTE(review): the accompanying text points at fromArray's
                         // default scheduler as a source of ordering problems between
                         // these elements and `input`; that behavior is left unchanged.
                         return rx.Observable.fromArray(s);
                 }).groupByUntil(
                         // Group key = the element's first own property name. The
                         // sample elements are plain objects such as {a: 1}, which
                         // have no keys() method, so Object.keys is used.
                         function (s) { return Object.keys(s)[0]; },
                         // No element selector: elements are passed through as-is.
                         null,
                         // Duration selector: the group ends on the first `input`
                         // value for which findkey(s, g.key) is falsy.
                         function (g) { return input.filter(
                                 function (s) { return !findkey(s, g.key); }
                         ); }
                 )

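So, I group by the id and end a group when the input stream no longer contains that id. This seems to work, but using input twice, both to feed groupByUntil and to end its groups, looks like it could introduce an order dependency.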

Is there a better way?

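Update: there is indeed a timing problem here. By default fromArray uses the currentThread scheduler, so the events from the array are interleaved with the events from input, and the group-ending conditions are evaluated at the wrong time (before the elements of the previous input value have been processed).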

A possible workaround is fromArray(.., rx.Scheduler.immediate), which keeps the grouped events in sync with input.

+3
1

, , , - . , .

// Duration subjects of the groups that are currently open, indexed by group
// key. Object.create(null) yields a dictionary without a prototype, so group
// keys cannot collide with inherited property names.
var d = Object.create(null);
var output = input
    .flatMap(function (s) {
        // Before forwarding the elements of this array, end every open group
        // whose key findKey does not report for it.
        Object
            .keys(d)
            .filter(function (k) { return !findKey(s, k); })
            .forEach(function (k) {
                // Signal the group's duration subject, complete it, and drop
                // it from the dictionary so the key can start a new group later.
                d[k].onNext(1);
                d[k].onCompleted();
                delete d[k];
            });
        return Rx.Observable.fromArray(s);
    })
    .groupByUntil(
        // Group key = the element's first own property name. The sample
        // elements are plain objects such as {a: 1}, which have no keys()
        // method, so Object.keys is used.
        function (s) { return Object.keys(s)[0]; },
        // No element selector: elements are passed through as-is.
        null,
        // Duration selector: create the subject that ends this group and
        // register it under the group's key for the flatMap step above.
        function (g) { return d[g.key] = new Rx.AsyncSubject(); });
+1

All Articles