When reading data from NetworkStream using ReadUntilClosedObservable1, the returned data is corrupted, as some data reading blocks overlap.
However, when I read data using ReadUntilClosedObservable2, the data comes in without a problem.
I want to use ReadUntilClosedObservable1, because multiple reads from the stream in ReadUntilClosedObservable2 burn the CPU.
How can I receive messages in sync order?
UPDATE:
return Observable.Timer(TimeSpan.Zero, interval, TaskPoolScheduler.Default)
.SelectMany(_ => readToEnd)
.Where(dataChunk => dataChunk.Length > 0);
I just noticed that readToEndto quit again and again before he completes the previous task. Do I need to synchronize it? If this Observable.Timeris a problem, how can I achieve the same effect without it by reading the intervals, but starting without waiting?
public static IObservable<int> ReadObservable(this Stream stream, byte[] buffer
,int offset, int count)
{
return stream.ReadAsync(buffer, offset, count)
.ToObservable();
}
public static IObservable<byte[]> ReadObservable(this Stream stream,
int bufferSize)
{
var buffer = new byte[bufferSize];
return stream.ReadObservable(buffer, 0, buffer.Length)
.Select(cbRead =>
{
if (cbRead == 0)
{
return new byte[0];
}
if (cbRead == buffer.Length)
{
return buffer;
}
var dataChunk = new byte[cbRead];
Buffer.BlockCopy(buffer, 0, dataChunk,
0, cbRead);
return dataChunk;
});
}
public static IObservable<byte[]> ReadUntilClosedObservable1(this NetworkStream
stream, int bufferSize, TimeSpan interval)
{
var readToEnd = Observable.Defer(() => stream.ReadObservable(bufferSize))
.DoWhile(() => stream.DataAvailable)
.ToList()
.Select(dataChunks =>
{
var buffer = new List<byte>();
foreach (var dataChunk in dataChunks)
{
buffer.AddRange(dataChunk);
}
return buffer.ToArray();
});
return Observable.Timer(TimeSpan.Zero, interval, TaskPoolScheduler.Default)
.SelectMany(_ => readToEnd)
.Where(dataChunk => dataChunk.Length > 0);
}
public static IObservable<byte[]> ReadUntilClosedObservable2(this Stream stream
,int bufferSize)
{
return Observable.Defer(() => stream.ReadObservable(bufferSize))
.Repeat()
.Where(dataChunk => dataChunk.Length > 0);
}