Use RX to fire events at different times?

I have a large collection of simple paired classes:

public class Pair { public DateTime Timestamp; public double Value; }

They are sorted by ascending timestamp. I want to trigger an event with a value (e.g. Action <double>) for each item in the list at the appropriate time. Time is in the past, so I need to normalize the timestamps so that the first one is "now." Can we set this with Reactive Extensions so that it fires the next event after the time difference between the two elements?

+5
source share
3 answers

Tell me pairs- your sequence:

var obs = pairs.OrderBy(p => p.Timestamp).ToObservable();

Now obs- pairs as ordered observables.

Observable.Zip(
    obs,
    obs.Take(1).Concat(obs),
    (pair1, pair2) => Observable.Timer(pair1.Timestamp - pair2.Timestamp)
      .Select(_ => pair1.Value))
.Concat()
.Subscribe(/* Do something here */);

Zip , . , , .

Original 1--2--4--7--11
Offset   1--1--2--4--7--11
Joined   0--1--2--3--4

Observable.Timer, . Concat IObservable<IObservable<double>> IObservable<double>. , .

+6

"Rx" Rx, :

Action<double> action =
    x =>
        Console.WriteLine(x);

var ts0 = pairs.Select(p => p.Timestamp).Min();

pairs
    .ForEach(p => 
        Scheduler
            .ThreadPool
            .Schedule(
                p.Timestamp.Subtract(ts0),
                () => action(p.Value)));

System.Interactive ForEach, ForEach .

:

var pairs = new []
{
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 30), Value = 1.1, },
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 45), Value = 1.2, },
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 40), Value = 1.3, },
};

, .

+2

, , .

static void RunPairs(IEnumerable<Pair> pairs, Action<double> pairEvent)
{
  if (pairs == null || !pairs.Any() || pairEvent == null)
    return;

  // if we can promise the pairs are already sorted
  // obviously we don't need this next line
  pairs = pairs.OrderBy(p => p.Timestamp);
  var first = pairs .First().Timestamp;
  var wrapped = pairs.Select(p => new { Offset = (p.Timestamp - first), Pair = p });

  var start = DateTime.Now;

  double interval = 250; // 1/4 second
  Timer timer = new Timer(interval);

  timer.AutoReset = true;
  timer.Elapsed += (sender, elapsedArgs) =>
  {
    var signalTime = elapsedArgs.SignalTime;
    var elapsedTime = (signalTime - start);

    var pairsToTrigger = wrapped.TakeWhile(wrap => elapsedTime > wrap.Offset).Select(w => w.Pair);
    wrapped = wrapped.Skip(pairsToTrigger.Count());

    if (!wrapped.Any())
      timer.Stop();

    foreach (var pair in pairsToTrigger)
      pairEvent(pair.Value);    
  };

  timer.Start();
}
0

All Articles