How to implement an atomic switch from one IObserver to another?

I have an IObservable<byte[]> that I transform into an IObservable<XDocument> using some intermediate steps:

var observedXDocuments =
    from b in observedBytes
    // Lot of intermediate steps to transform byte arrays into XDocuments
    select xDoc;

At some point in time I'm interested in the observed XDocuments, so I subscribe an IObserver<XDocument>. At a later moment, I would like to subscribe another IObserver<XDocument> and get rid of the old one.

How can I do this in one atomic operation without losing any observed XDocument? I could do something like:

oldObserver.Dispose();
observedXDocuments.Subscribe(newObserver);

I'm worried that between these two calls I might lose an XDocument. If I swap the two calls, it could happen that I receive the same XDocument twice.

+3
2 answers

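Add a layer of indirection. Write a wrapper class, ExchangeableObserver, subscribe the ExchangeableObserver to your observable once, and keep it subscribed permanently. Its only job is to delegate everything to an inner observer. When you want to switch observers, call Exchange(). Something like this: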

using System;
using System.Threading;

// Forwards every notification to an inner observer that can be swapped atomically.
public class ExchangeableObserver<T> : IObserver<T> {
  private IObserver<T> inner;

  public ExchangeableObserver(IObserver<T> inner) {
    this.inner = inner;
  }

  // Atomically installs newInner and returns the observer it replaced.
  public IObserver<T> Exchange(IObserver<T> newInner) {
    return Interlocked.Exchange(ref inner, newInner);
  }

  public void OnNext(T value) {
    inner.OnNext(value);
  }

  public void OnCompleted() {
    inner.OnCompleted();
  }

  public void OnError(Exception error) {
    inner.OnError(error);
  }
}
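A minimal usage sketch of the wrapper, under some assumptions: RecordingObserver and ExchangeDemo are hypothetical names introduced only for illustration, plain strings stand in for XDocument, and the source is simulated by calling OnNext directly instead of subscribing to a real observable. The wrapper class is repeated so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Same wrapper as in the answer, repeated so this sketch is self-contained.
public class ExchangeableObserver<T> : IObserver<T> {
  private IObserver<T> inner;
  public ExchangeableObserver(IObserver<T> inner) { this.inner = inner; }
  public IObserver<T> Exchange(IObserver<T> newInner) {
    return Interlocked.Exchange(ref inner, newInner);
  }
  public void OnNext(T value) { inner.OnNext(value); }
  public void OnCompleted() { inner.OnCompleted(); }
  public void OnError(Exception error) { inner.OnError(error); }
}

// Hypothetical observer that just records what it receives.
public class RecordingObserver<T> : IObserver<T> {
  public readonly List<T> Received = new List<T>();
  public void OnNext(T value) { Received.Add(value); }
  public void OnCompleted() { }
  public void OnError(Exception error) { }
}

public static class ExchangeDemo {
  public static void Main() {
    var oldObserver = new RecordingObserver<string>();
    var newObserver = new RecordingObserver<string>();
    var exchangeable = new ExchangeableObserver<string>(oldObserver);

    // In real code this would be a single, permanent subscription:
    //   observedXDocuments.Subscribe(exchangeable);
    // Here the source is simulated by calling OnNext directly.
    exchangeable.OnNext("doc1");

    // Swap observers; the previous inner observer is returned.
    IObserver<string> previous = exchangeable.Exchange(newObserver);
    exchangeable.OnNext("doc2");

    Console.WriteLine(ReferenceEquals(previous, oldObserver)); // True
    Console.WriteLine(string.Join(",", oldObserver.Received)); // doc1
    Console.WriteLine(string.Join(",", newObserver.Received)); // doc2
  }
}
```

Each element goes to exactly one inner observer, so nothing is lost or delivered twice. One caveat: if the source runs on another thread, an OnNext call that has already read the old inner reference can still deliver to the old observer shortly after Exchange() has returned.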
+6

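I'm not sure I understood everything, but the conversion from IObservable<byte[]> to IObservable<XDocument> and the replacement of the observer should be protected by a lock.

Something like this (not tested):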

  System.Threading.ReaderWriterLockSlim criticalSection
       = new System.Threading.ReaderWriterLockSlim(...);

  // ... converting from IObservable<byte[]> to IObservable<XDocument>
  criticalSection.EnterReadLock();
  // Call IObservable<XDocument>
  criticalSection.ExitReadLock();

  // ... replacing IObservable<XDocument>
  criticalSection.EnterWriteLock();
  // Call change IObservable<XDocument>
  criticalSection.ExitWriteLock();
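The locking idea could be sketched as a wrapper observer, as follows. This is one possible reading of the pseudocode above, not the answerer's actual code: the class name LockedExchangeObserver is hypothetical, and the lock is created with the default (non-recursive) constructor as an assumption.

```csharp
using System;
using System.Threading;

// Hypothetical wrapper: notifications take the read lock,
// replacing the inner observer takes the write lock.
public class LockedExchangeObserver<T> : IObserver<T>, IDisposable {
  private readonly ReaderWriterLockSlim criticalSection = new ReaderWriterLockSlim();
  private IObserver<T> inner;

  public LockedExchangeObserver(IObserver<T> inner) { this.inner = inner; }

  // Blocks until all in-flight notifications have finished, then swaps.
  public IObserver<T> Exchange(IObserver<T> newInner) {
    criticalSection.EnterWriteLock();
    try {
      IObserver<T> old = inner;
      inner = newInner;
      return old;
    } finally {
      criticalSection.ExitWriteLock();
    }
  }

  public void OnNext(T value) {
    criticalSection.EnterReadLock();
    try { inner.OnNext(value); }
    finally { criticalSection.ExitReadLock(); }
  }

  public void OnCompleted() {
    criticalSection.EnterReadLock();
    try { inner.OnCompleted(); }
    finally { criticalSection.ExitReadLock(); }
  }

  public void OnError(Exception error) {
    criticalSection.EnterReadLock();
    try { inner.OnError(error); }
    finally { criticalSection.ExitReadLock(); }
  }

  public void Dispose() { criticalSection.Dispose(); }
}
```

Compared with a lock-free swap, the write lock makes Exchange() wait for notifications that are already in progress, so the old observer receives nothing after Exchange() returns. The cost is that an observer must not call Exchange() from inside its own OnNext: with a non-recursive ReaderWriterLockSlim, trying to take the write lock while holding the read lock throws a LockRecursionException.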

Edit: about the line Call IObservable<XDocument>

  > What exactly do you mean with the line `Call IObservable<XDocument>`?

I understood your sentence

  > I have an `IObservable<byte[]>` that I transform 
  > into an `IObservable<XDocument>` using some intermediate steps...

to mean that you have an event handler subscribed to the IObservable<byte[]> that creates an XDocument from the byte[] and then fires the IObservable<XDocument> event.

Call IObservable<XDocument> means the code that fires that subsequent event.

+1