Necessity: long-term program with TCP connections
A C # 4.0 program (VS1010, XP) must connect to the host using TCP, send and receive bytes, sometimes close the connection and reopen it later. The surrounding code is written using the Rx.Net style Observable. The data volume is low, but the program should run continuously (avoid memory leaks, taking care of the correct allocation of resources).
The text below is long because I explain what I searched and found. Now it works.
General questions: since Rx was once unintuitive, are the solutions good? Will it be reliable (say, can it work for years without problems)?
Solution so far
Submit
The program receives NetworkStreamas follows:
TcpClient tcpClient = new TcpClient();
LingerOption lingerOption = new LingerOption(false, 0); // Make sure that on call to Close(), connection is closed immediately even if some data is pending.
tcpClient.LingerState = lingerOption;
tcpClient.Connect(remoteHostPort);
return tcpClient.GetStream();
Asynchronous sending is quite simple. Rx.Net allows you to handle this with much shorter and cleaner code than traditional solutions. I created a dedicated thread with EventLoopScheduler. Transactions that require sending are expressed using IObservable. Use ObserveOn(sendRecvThreadScheduler)ensures that all send operations are performed on this thread.
sendRecvThreadScheduler = new EventLoopScheduler(
ts =>
{
var thread = new System.Threading.Thread(ts) { Name = "my send+receive thread", IsBackground = true };
return thread;
});
// Loop code for sending not shown (too long and off-topic).
So far, this is excellent and impeccable.
Get h3>
, Rx.Net , .
(, http://www.introtorx.com/) stackoverflow , , Rx.Net qaru.site/questions/1142820/...:
public static class Ext
{
public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
{
var buffer = new byte[bufferSize];
var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
stream.BeginRead,
stream.EndRead);
return Observable.While(
() => stream.CanRead,
Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
.Select(readBytes => buffer.Take(readBytes).ToArray()));
}
}
. .
.
. : , , .
, , BeginRead()/EndRead() , CPU, . ( Subscribe() ReadObservable , ) ( ). , , Subscribe().
someobject.readOneStreamObservableSubscription = myobject.readOneStreamObservable.Subscribe(buf =>
{
if (buf.Length == 0)
{
MyLoggerLog("Read explicitly returned zero bytes. Closing stream.");
this.pscDestroyIfAny();
}
});
. , -, . # - BeginRead BeginWrite? -
a CancellationToken, Observable.While() . , BeginRead() .
. .net - - , Catch, Observable .
:
public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize, CancellationToken token)
{
var buffer = new byte[bufferSize];
var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
stream.BeginRead,
stream.EndRead);
return Observable.While(
() =>
{
return (!token.IsCancellationRequested) && stream.CanRead;
},
Observable.Defer(() =>
{
if ((!token.IsCancellationRequested) && stream.CanRead)
{
return asyncRead(buffer, 0, bufferSize);
}
else
{
return Observable.Empty<int>();
}
})
.Catch(Observable.Empty<int>())
.Select(readBytes => buffer.Take(readBytes).ToArray()));
}
?
, , . , , , . .
, .
-, :
.Catch(Observable.Empty<int>())
catch . , , (. )?
.Catch((Func<Exception, IObservable<int>>)(ex =>
{
MyLoggerLogException("On asynchronous read from network.", ex);
return Observable.Empty<int>();
}))
, .
/ ?
- , Reactive Extensions?
.