I am new to Rx, so I hope you can carry me. As an exercise for myself and, possibly, a sample that I can demonstrate for colleagues, I made two wrapper classes for Dns.BeginGetHostEntry () / EndGetHostEntry (): DnsResolver and DnsResolverRx.
Each class has one public static method:
void Resolve(string host, Action<IPHostEntry> getResult, Control context = null);
... and some additional requirements to make it interesting: 1. If context is provided, getResult must be called in the associated thread 2. previous results for the same host are cached for MaxResultAge seconds.
A non-Rx version works fine, but is not really relevant to this issue. The Rx version looks like this:
class DnsResolverRx
{
static Func<string, IObservable<IPHostEntry>> _resolver = Observable.FromAsyncPattern<string, IPHostEntry>(Dns.BeginGetHostEntry, Dns.EndGetHostEntry);
public static void Resolve(string host, Action<IPHostEntry> setResult, Control context = null)
{
IObservable<IPHostEntry> result;
result = _cache.GetOrCreateValue(
host,
() => _resolver(host)
.Do(e => Debug.WriteLine("resolved"))
.Repeat()
.Do(e => Debug.WriteLine("repeated"))
.Replay(MaxResultAge)
.RefCount()
);
result = result.Take(1);
if (context != null)
result = result.ObserveOn(context);
result.Subscribe(
entry => setResult(entry),
ex => setResult(null)
);
}
}
static void Main(string[] args)
{
for (int i=0; i<10; ++i)
{
int num = i;
Debug.WriteLine("start" + num);
DnsResolverRx.Resolve("chief", e => Debug.WriteLine("done"+num));
Thread.Sleep(200);
}
Console.ReadLine();
}
Replay(), , , MaxResultAge , . Repeat(), :
start0
start1
start2
start3
start4
start5
resolved
repeated
done0
done1
done2
done3
done4
done5
start6
resolved
repeated
resolved
repeated
... and so on ad infinitum
- , , ?