Reading from a stream using Observable via FromAsyncPattern, how to close / undo

Necessity: long-term program with TCP connections

A C # 4.0 program (VS1010, XP) must connect to the host using TCP, send and receive bytes, sometimes close the connection and reopen it later. The surrounding code is written using the Rx.Net style Observable. The data volume is low, but the program should run continuously (avoid memory leaks, taking care of the correct allocation of resources).

The text below is long because I explain what I searched and found. Now it works.

General questions: since Rx was once unintuitive, are the solutions good? Will it be reliable (say, can it work for years without problems)?

Solution so far

Submit

The program receives NetworkStreamas follows:

TcpClient tcpClient = new TcpClient();
LingerOption lingerOption = new LingerOption(false, 0); // Make sure that on call to Close(), connection is closed immediately even if some data is pending.
tcpClient.LingerState = lingerOption;

tcpClient.Connect(remoteHostPort);
return tcpClient.GetStream();

Asynchronous sending is quite simple. Rx.Net allows you to handle this with much shorter and cleaner code than traditional solutions. I created a dedicated thread with EventLoopScheduler. Transactions that require sending are expressed using IObservable. Use ObserveOn(sendRecvThreadScheduler)ensures that all send operations are performed on this thread.

sendRecvThreadScheduler = new EventLoopScheduler(
    ts =>
    {
        var thread = new System.Threading.Thread(ts) { Name = "my send+receive thread", IsBackground = true };
        return thread;
    });
        // Loop code for sending not shown (too long and off-topic).

So far, this is excellent and impeccable.

Get h3>

, Rx.Net , . (, http://www.introtorx.com/) stackoverflow , , Rx.Net qaru.site/questions/1142820/...:

public static class Ext
{
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {
        // to hold read data
        var buffer = new byte[bufferSize];
        // Step 1: async signature => observable factory
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);
        return Observable.While(
            // while there is data to be read
            () => stream.CanRead,
            // iteratively invoke the observable factory, which will
            // "recreate" it such that it will start from the current
            // stream position - hence "0" for offset
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()));
    }
}

. .

.

. : , , .

, , BeginRead()/EndRead() , CPU, . ( Subscribe() ReadObservable , ) ( ). , , Subscribe().

    someobject.readOneStreamObservableSubscription = myobject.readOneStreamObservable.Subscribe(buf =>
    {
        if (buf.Length == 0)
        {
                MyLoggerLog("Read explicitly returned zero bytes.  Closing stream.");
                this.pscDestroyIfAny();
        }
    });

. , -, . # - BeginRead BeginWrite? -

a CancellationToken, Observable.While() . , BeginRead() .

. .net - - , Catch, Observable .

:

public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize, CancellationToken token)
{
    // to hold read data
    var buffer = new byte[bufferSize];
    // Step 1: async signature => observable factory
    var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
        stream.BeginRead,
        stream.EndRead);
    return Observable.While(
        // while there is data to be read
        () =>
        {
            return (!token.IsCancellationRequested) && stream.CanRead;
        },
        // iteratively invoke the observable factory, which will
        // "recreate" it such that it will start from the current
        // stream position - hence "0" for offset
        Observable.Defer(() =>
            {
                if ((!token.IsCancellationRequested) && stream.CanRead)
                {
                    return asyncRead(buffer, 0, bufferSize);
                }
                else
                {
                    return Observable.Empty<int>();
                }
            })
            .Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
        .Select(readBytes => buffer.Take(readBytes).ToArray()));
}

?

, , . , , , . .

, .

-, :

.Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.

catch . , , (. )?

.Catch((Func<Exception, IObservable<int>>)(ex =>
{
    MyLoggerLogException("On asynchronous read from network.", ex);
    return Observable.Empty<int>();
})) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.

, .

/ ?

- , Reactive Extensions?

.

+2

All Articles