Listening to a stream created from a list in Dart

I slightly modified the example from the tutorial https://www.dartlang.org/docs/tutorials/streams/, adding an item after the subscription:

import 'dart:async';

main() {
  var data = new List<int>();
  var stream = new Stream.fromIterable(data);  // create the stream

  // subscribe to the stream's events
  stream.listen((value) {       //
    print("Received: $value");  // onData handler
  });                           //
  data.add(1);
}

After running this program, I get:

Uncaught Error: Concurrent modification during iteration: _GrowableList len:1.
Stack Trace: 
#0      ListIterator.moveNext (dart:_collection-dev/iterable.dart:315)
#1      _IterablePendingEvents.handleNext (dart:async/stream_impl.dart:532)
#2      _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:661)
#3      _asyncRunCallback (dart:async/schedule_microtask.dart:18)
#4      _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:11)
#5      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:151)
#6      _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:166)
#7      _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:93)

Moving data.add(1) before adding the listener works as expected.

I checked the Stream documentation and did not find what I am doing wrong. I expected the listener to fire at best, or simply not to fire at worst, but not an exception.

Is this behavior expected? If so, please explain why.

2 answers

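The exception is thrown because the list is modified while it is being iterated. This is unspecified behavior in Dart (*), and the implementation used here throws an exception. Although it is hidden behind the asynchronous machinery of Stream.fromIterable, it is essentially the same as doing this: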

var data = [1,2,3];
for(var d in data) {
    print(d);
    data.add(d+10);
}

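If you wrapped the data.add call in another asynchronous call, for example Timer.run(() => data.add(2)), it would "work". That is, it would not throw an exception.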

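Received: 2 would still not be printed. A stream created with new Stream.fromIterable only delivers the elements it finds while iterating the list. After that the stream is closed (onDone is called), and later modifications to the list are not sent to the listener.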
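As a rough illustration of the deferred-add case, here is a minimal sketch, assuming a reasonably recent Dart SDK. The function name run and the log list are hypothetical helpers introduced only for this example. The list is modified in a later event-loop turn via Timer.run, so no exception is thrown, but the added element never reaches the listener.

```dart
import 'dart:async';

// Hypothetical helper: records what the listener sees and returns the log.
Future<List<String>> run() {
  final log = <String>[];
  final finished = new Completer<List<String>>();
  final data = <int>[1];
  final stream = new Stream<int>.fromIterable(data);

  stream.listen(
    (value) => log.add('Received: $value'),
    onDone: () => log.add('done'),
  );

  // Defer the modification to a later event-loop turn, after the stream
  // has had the chance to finish iterating the list.
  Timer.run(() {
    data.add(2);
    // Wait one more turn before reporting, so a late event would be visible.
    Timer.run(() => finished.complete(log));
  });
  return finished.future;
}

void main() {
  run().then((log) => log.forEach(print));
}
```

The log should contain the entry for 1 but no entry for 2, which matches the explanation above: the stream has already finished with the list by the time the timer callback runs.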

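(*) Source: iterator.dart in SDK 1.1.3, whose documentation says that the behavior is unspecified if the object being iterated over is changed during the iteration. The text on api.dartlang.org is different.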

Edit: an alternative is to use a StreamController.

// or new StreamController<int>.broadcast(), if you want to listen to the stream more than once
StreamController s = new StreamController<int>();
// produce periodic errors
new Timer.periodic(new Duration(seconds: 5), (Timer t) {
    s.isClosed ? t.cancel() : s.addError("I AM ERROR");
});
// add some elements before subscribing
s.add(6);
s.add(9);
// this will close the stream eventually
new Timer(new Duration(seconds: 20), () => s.close());
// start listening to the stream
s.stream.listen((v) => print(v), 
        onError: (err) => print("An error occurred: $err"),
        onDone: () => print("The stream was closed"));
// add another element before the next event loop iteration
Timer.run(() => s.add(4711));
// periodically add an element
new Timer.periodic(new Duration(seconds: 3), (Timer t) {
    s.isClosed ? t.cancel() : s.add(0);
});
// one more (will be sent before 4711)
s.add(4);
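For the original question's use case (adding items after subscribing), a much smaller sketch may be enough. It assumes a reasonably recent Dart SDK; the function name collect and the received list are hypothetical names used only for illustration.

```dart
import 'dart:async';

// Hypothetical helper: subscribes first, adds values afterwards, and
// completes with everything the listener received once the stream closes.
Future<List<int>> collect() {
  final controller = new StreamController<int>();
  final received = <int>[];
  final finished = new Completer<List<int>>();

  controller.stream.listen(
    received.add,
    onDone: () => finished.complete(received),
  );

  // Values added after listen() are still delivered to the listener.
  controller.add(1);
  controller.add(2);
  controller.close();
  return finished.future;
}

void main() {
  collect().then(print); // prints [1, 2]
}
```

Unlike the Stream.fromIterable version, nothing is being iterated here, so adding values after the subscription does not conflict with the stream.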

