Using Rx Repeat () and Replay () to cache and restart a DNS query

I am new to Rx, so I hope you can carry me. As an exercise for myself and, possibly, a sample that I can demonstrate for colleagues, I made two wrapper classes for Dns.BeginGetHostEntry () / EndGetHostEntry (): DnsResolver and DnsResolverRx.

Each class has one public static method:

void Resolve(string host, Action<IPHostEntry> getResult, Control context = null);

... and some additional requirements to make it interesting: 1. If context is provided, getResult must be called in the associated thread 2. previous results for the same host are cached for MaxResultAge seconds.

A non-Rx version works fine, but is not really relevant to this issue. The Rx version looks like this:

class DnsResolverRx
{
  static Func<string, IObservable<IPHostEntry>> _resolver = Observable.FromAsyncPattern<string, IPHostEntry>(Dns.BeginGetHostEntry, Dns.EndGetHostEntry);

  public static void Resolve(string host, Action<IPHostEntry> setResult, Control context = null)
  {
    IObservable<IPHostEntry> result;
    result = _cache.GetOrCreateValue( // a trivial TryGetValue wrapper
      host,
      () => _resolver(host)
        .Do(e => Debug.WriteLine("resolved"))
        .Repeat()
        .Do(e => Debug.WriteLine("repeated"))
        .Replay(MaxResultAge)
        .RefCount()
    );

    result = result.Take(1); // each request needs only 1 result

    if (context != null)
      result = result.ObserveOn(context);

    result.Subscribe(
      entry => setResult(entry),
      ex => setResult(null)
    );
  }
}

static void Main(string[] args)
{
  for (int i=0; i<10; ++i)
  {
    int num = i;
    Debug.WriteLine("start" + num);
    DnsResolverRx.Resolve("chief", e => Debug.WriteLine("done"+num));
    Thread.Sleep(200);
  }

  Console.ReadLine();
}

Replay(), , , MaxResultAge , . Repeat(), :

start0
start1
start2
start3
start4
start5
resolved
repeated
done0
done1
done2
done3
done4
done5
start6
resolved
repeated
resolved
repeated
... and so on ad infinitum

- , , ?

+3
1

- FromAsyncPatterns , , , .

_resolver(host)

Observable.Defer(() => _resolver(host))

, .

( ), .

+6

All Articles