Reactive Extensions Stream Arithmetic

I am currently working on an application that integrates many data streams through equations. What I would like to do is something like:

var result = (stream1 + stream2) / stream3 + stream4 * 2;

where result is updated whenever any of the streams produces a new value. Currently, the only way I know to express this in Rx is as follows:

var result = stream1.CombineLatest(stream2, (x,y) => x+y)
  .CombineLatest(stream3, (x,y) => x / y)
  .CombineLatest(stream4, (x,y) => x + y*2);

This is hard to read.

My current idea is this:

public class ArithmeticStream : IObservable<double>
{
    public static ArithmeticStream operator +(ArithmeticStream xx, ArithmeticStream yy)
    {
        return Observable.CombineLatest(xx,yy, (x,y) => x + y);
    }
    ...
}

The problem is that CombineLatest returns IObservable<double> instead of ArithmeticStream.

Two possible questions:

How can I transparently convert IObservable<double> to ArithmeticStream?

Is there an alternative route that will give me the result I want?

+5
4 answers

One option is a small DSL-like wrapper class around IObservable<double> that defines the arithmetic operators:

public static class ArithmeticStreamExt
{
    public static ArithmeticStream Wrap(this IObservable<double> src)
    {
        return new ArithmeticStream(src);
    }
    public static ArithmeticStream Const(this double constValue)
    {
        return new ArithmeticStream(Observable.Return(constValue));
    }
}
public class ArithmeticStream 
{
    private IObservable<double> _inner;

    public ArithmeticStream(IObservable<double> source)
    {
        _inner = source;
    }

    public IObservable<double> Source {get { return _inner; }}

    public static ArithmeticStream operator +(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l + r));
    }
    public static ArithmeticStream operator -(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l - r));
    }
    public static ArithmeticStream operator *(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l * r));
    }
    public static ArithmeticStream operator /(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l / r));
    }

    public static ArithmeticStream operator +(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l + r));
    }
    public static ArithmeticStream operator -(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l - r));
    }
    public static ArithmeticStream operator *(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l * r));
    }
    public static ArithmeticStream operator /(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
             left._inner.CombineLatest(right, (l, r) => l / r));
    }
}

Usage:

void Main()
{
    var s1 = new Subject<double>();
    var s2 = new Subject<double>();
    var s3 = new Subject<double>();
    var s4 = new Subject<double>();

    var result = (s1.Wrap() + s2) / s3 + (s4.Wrap() * 2.0.Const());
    using(result.Source.Subscribe(Console.WriteLine))
    {
        s1.OnNext(1.0);
        s2.OnNext(2.0);
        s3.OnNext(3.0);
        s4.OnNext(4.0);
    }
}
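The usage above needs 2.0.Const() because every operand must be a stream. As a minimal sketch of one way around that, assuming a reduced, hypothetical variant of the wrapper (ScalarStream is not part of the answer's code), operators that take a plain double on either side can be built on Select instead of CombineLatest:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical, reduced variant of the wrapper above (only + and *),
// written out in full so that the sketch compiles on its own.
public sealed class ScalarStream
{
    public IObservable<double> Source { get; }

    public ScalarStream(IObservable<double> source) => Source = source;

    // Stream + stream: recompute whenever either side emits.
    public static ScalarStream operator +(ScalarStream left, ScalarStream right) =>
        new ScalarStream(left.Source.CombineLatest(right.Source, (l, r) => l + r));

    // Stream * constant: a plain Select is enough, no second observable is needed.
    public static ScalarStream operator *(ScalarStream left, double right) =>
        new ScalarStream(left.Source.Select(l => l * right));

    // Constant * stream, so the scalar can appear on either side.
    public static ScalarStream operator *(double left, ScalarStream right) =>
        new ScalarStream(right.Source.Select(r => left * r));
}

public static class ScalarStreamDemo
{
    public static void Main()
    {
        var a = new Subject<double>();
        var b = new Subject<double>();

        // The usual C# precedence applies: b * 2 is evaluated before the addition.
        var result = new ScalarStream(a) + new ScalarStream(b) * 2;

        using (result.Source.Subscribe(Console.WriteLine))
        {
            a.OnNext(1.0); // nothing printed yet: b has not produced a value
            b.OnNext(4.0); // prints 9
        }
    }
}
```

The same pattern should extend to the other operators; whether the extra overloads are worth the added surface area is a design choice.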
+2

Another option is a DSL-like set of extension methods:

public static class Ext
{
    public static IObservable<double> Const(this double constant)
    {
        return Observable.Return(constant);
    }

    public static IObservable<double> Plus(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l + r);
    }
    public static IObservable<double> Minus(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l - r);
    }
    public static IObservable<double> Times(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l * r);
    }
    public static IObservable<double> Over(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right,(l,r) => l / r);
    }
}

With these, you can write:

// (s1 + s2) / s3 + s4 * 2: Times must wrap s4 only, not the whole chain
var result = s1.Plus(s2).Over(s3)
        .Plus(s4.Times(2.0.Const()));

Or, to log each input value and the final result:

var verboseResult = 
    (s1.Do(Console.WriteLine).Plus(s2.Do(Console.WriteLine)))
    .Over(s3.Do(Console.WriteLine))
    // multiply s4 by 2 before adding, so the expression matches the log message
    .Plus(s4.Do(Console.WriteLine).Times(2.0.Const()))
    .Do(x => Console.WriteLine("(s1 + s2) / s3 + s4 * 2 = " + x));
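One thing to watch with chained methods like these: they apply strictly left to right, with no operator precedence, so where Times is attached changes the result. The sketch below illustrates this under a few assumptions: the helpers are redefined in a hypothetical ChainOrderDemo class so the code compiles on its own, and BehaviorSubject supplies fixed starting values purely for the demonstration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ChainOrderDemo
{
    // Same shape as the Plus/Over/Times helpers above, repeated here
    // only to keep the sketch self-contained.
    public static IObservable<double> Plus(this IObservable<double> left, IObservable<double> right) =>
        left.CombineLatest(right, (l, r) => l + r);

    public static IObservable<double> Over(this IObservable<double> left, IObservable<double> right) =>
        left.CombineLatest(right, (l, r) => l / r);

    public static IObservable<double> Times(this IObservable<double> left, IObservable<double> right) =>
        left.CombineLatest(right, (l, r) => l * r);

    public static void Main()
    {
        // BehaviorSubject replays its current value to each new subscriber.
        var s1 = new BehaviorSubject<double>(1.0);
        var s2 = new BehaviorSubject<double>(2.0);
        var s3 = new BehaviorSubject<double>(3.0);
        var s4 = new BehaviorSubject<double>(4.0);
        var two = Observable.Return(2.0);

        // Times at the end of the chain multiplies everything before it:
        // ((s1 + s2) / s3 + s4) * 2
        var trailing = s1.Plus(s2).Over(s3).Plus(s4).Times(two);

        // Times nested inside Plus multiplies only s4:
        // (s1 + s2) / s3 + s4 * 2
        var nested = s1.Plus(s2).Over(s3).Plus(s4.Times(two));

        using (trailing.Subscribe(v => Console.WriteLine("trailing Times: " + v))) { } // prints "trailing Times: 10"
        using (nested.Subscribe(v => Console.WriteLine("nested Times: " + v))) { }     // prints "nested Times: 9"
    }
}
```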
+2

CombineLatest has overloads that take 3, 4, 5, etc. IObservables, so the whole expression can go into a single CombineLatest call.

+1

This is not as nice as operator overloading, but I am fairly sure you cannot add such operators to IObservable<double> itself, because extension methods cannot define operators.

var stream1 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.5, x => x);
var stream2 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.25, x => x);
var stream3 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.125, x => x);
var stream4 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.0625, x => x);
var result = stream1.CombineLatest(stream2, stream3, stream4, (w, x, y, z) => (w + x) / y + z * 2);

Otherwise, JerKimball gave a good answer.

0