What is the correct way to create an Observable that reads a stream to the end?

I'm struggling here. Normally I would read a book, but there isn't one yet. I have found many examples of various things to do with reading streams using Rx, but I find it very hard to get my head around them.

I know that I can use Observable.FromAsyncPattern to create a wrapper for the Stream's BeginRead/EndRead or BeginReadLine/EndReadLine methods.

But this only reads once, when the first observer subscribes.

I want an Observable that keeps reading and pumping OnNext until the stream errors or ends.

In addition, I would like to know how I can share that observable among several subscribers so that they all receive the items.


The solution is to use Observable.Create. Here is an example, which can be adapted for reading any kind of stream:

    // Requires: using System; using System.Diagnostics; using System.Reactive.Linq; using System.Reactive.Subjects;
    public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
    {
        return Observable.Create<Command>(async (observer, token) =>
        {
            try
            {
                // Keep reading until the subscription is cancelled (disposed).
                while (!token.IsCancellationRequested)
                {
                    // This part can be changed to something like this:
                    // int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive);
                    Command cmd = await reader.ReadCommandAsync();

                    observer.OnNext(cmd);
                }

                observer.OnCompleted();
            }
            catch (Exception ex)
            {
                try
                {
                    // Pass read failures on to the subscribers.
                    observer.OnError(ex);
                }
                catch (Exception)
                {
                    Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
                    throw;
                }
            }
        }).Publish(); // Publish shares a single read loop among all subscribers.
    }

Remember to call Connect() on the returned IConnectableObservable; nothing is read until you do.
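As a rough sketch of how a connectable observable is usually consumed: attach every subscriber first, then call Connect() once to start the shared source. To keep the example self-contained, Observable.Range stands in for the stream-backed observable (that substitution, and the class name, are assumptions for illustration only). It needs the System.Reactive package.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishConnectSketch
{
    public static void Main()
    {
        // Stand-in source (assumption): a real program would use something
        // like reader.GetReadObservable() here instead of Observable.Range.
        IConnectableObservable<int> commands = Observable.Range(1, 3).Publish();

        // Subscribe all observers before connecting so none of them miss items.
        IDisposable first = commands.Subscribe(
            x => Console.WriteLine("first: " + x),
            ex => Console.WriteLine("first failed: " + ex.Message),
            () => Console.WriteLine("first completed"));

        IDisposable second = commands.Subscribe(
            x => Console.WriteLine("second: " + x),
            ex => Console.WriteLine("second failed: " + ex.Message),
            () => Console.WriteLine("second completed"));

        // Connect() subscribes to the underlying source exactly once.
        // Disposing the returned handle disconnects from the source.
        using (commands.Connect())
        {
            // With a real stream the connection would be kept alive here
            // for as long as reading should continue.
        }

        first.Dispose();
        second.Dispose();
    }
}
```

If subscribers may arrive after reading has started and still need earlier items, Replay() can be used in place of Publish(), at the cost of buffering.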

Alternatively, using Rxx:

using (new FileStream(@"filename.txt", FileMode.Open)
       .ReadToEndObservable()
       .Subscribe(x => Console.WriteLine(x.Length)))
{
  Console.ReadKey();
}


This reuses part of an answer to a related question about reading from a NetworkStream.

It uses an extension method like this:

public static class Ext
{        
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {        
        // to hold read data
        var buffer = new byte[bufferSize];
        // Step 1: async signature => observable factory
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead, 
            stream.EndRead);
        return Observable.While(
            // while there is data to be read
            () => stream.CanRead, 
            // iteratively invoke the observable factory; Defer "recreates"
            // the read on each pass, so it continues from the current stream
            // position. The "0" is the offset into the buffer, not the stream.
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()));
    }
}

You can then use it like this:

// Note: ToEnumerable works here because your filestream
// has a finite length - don't do this with infinite streams!
var blobboData  = stream
     .ReadObservable(bufferSize)
     // take while we're still reading data
     .TakeWhile(returnBuffer => returnBuffer.Length > 0)
     .ToEnumerable()
     // mash them all together
     .SelectMany(buffer => buffer)
     .ToArray();

You can use Repeat, and then Publish or Replay to control sharing among multiple subscribers.

An example of a simple complete Rx solution for reading lines from any stream to the end:

public static IObservable<string> ReadLines(Stream stream)
{
    return Observable.Using(
        () => new StreamReader(stream),
        reader => Observable.FromAsync(reader.ReadLineAsync)
                            .Repeat()
                            .TakeWhile(line => line != null));
}

This solution also exploits the fact that ReadLine returns null when the end of the stream is reached.
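A minimal usage sketch, assuming the System.Reactive package and an in-memory stream as test input (the MemoryStream contents and the class name are made up for illustration). ReadLines is repeated here, with an explicit lambda around ReadLineAsync, so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Linq;
using System.Text;

public static class ReadLinesSketch
{
    // Same shape as the ReadLines method above.
    public static IObservable<string> ReadLines(Stream stream)
    {
        return Observable.Using(
            () => new StreamReader(stream),
            reader => Observable.FromAsync(() => reader.ReadLineAsync())
                                .Repeat()                           // read again after each line
                                .TakeWhile(line => line != null));  // stop at end of stream
    }

    public static void Main()
    {
        // Hypothetical input; any readable Stream could be passed instead.
        var stream = new MemoryStream(Encoding.UTF8.GetBytes("alpha\nbeta\ngamma"));

        // ToList().Wait() blocks until the sequence completes. That is
        // convenient in a console demo but should be avoided in UI or
        // server code, where Subscribe or await is the better fit.
        IList<string> lines = ReadLines(stream).ToList().Wait();

        foreach (string line in lines)
        {
            Console.WriteLine(line);
        }
    }
}
```

Because Observable.Using creates the StreamReader per subscription and disposes it when the sequence ends, each stream should be subscribed to only once; to feed several observers, share the result with Publish or Replay.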
