Non-Deterministically Interleaving Conduit Sources

I was hoping to find a non-deterministic interleaving operation for sources, with a signature such as

interleave :: WhateverIOMonadClassItWouldWant m => [(k, Source m a)] -> Source m (k, a)

The use case is that I have a p2p application that keeps connections open to many nodes on the network and basically just sits waiting for messages from any of them. When a message arrives, the application doesn't care where it came from, but it needs to process the message as soon as possible. In theory, this kind of application (at least when the sources are sockets) could bypass the GHC IO manager entirely and call select/epoll/etc. directly, but I don't really care how it is implemented as long as it works.

Is this possible with conduit? A less general, but probably more practical, approach might be to write a function [(k, Socket)] -> Source m (k, ByteString) that handles reception on all of the sockets for you.

I noticed the ResumableSource operations in conduit, but they all seem to want to know about a particular Sink, which feels like a bit of an abstraction leak, at least for this operation.

+5
2 answers

The stm-conduit package provides mergeSources, which does something similar to, though not identical with, what you are looking for. It is probably a good place to start.

+5

Yes, it is possible.

Run each Source into a Sink that writes to a shared concurrency channel:

concur :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Sink a m r

... then read from that channel with a Source:

synchronize :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Source a m r

[...]

The API would then be:

poll :: (WhateverIOMonadClassItWouldWant m) => [Source a m r] -> m (Source a m r)

... which leaves out the key k.
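To make the channel idea concrete, here is a rough sketch of how the pieces could fit together, including the key k from the question. It deliberately avoids the conduit API, since the exact Source and Sink types differ between versions: the Pull type below is a hypothetical stand-in for a Source, and worker, fromList, drain and demo are illustrative names that do not come from conduit or stm-conduit. Only base and stm are assumed.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (finally)
import Control.Monad (forM_)
import Data.IORef

-- | Hypothetical stand-in for a conduit Source: an action that returns
-- the next element, or Nothing once the source is exhausted.
type Pull a = IO (Maybe a)

-- | Build a Pull from a list (for illustration only).
fromList :: [a] -> IO (Pull a)
fromList xs = do
  ref <- newIORef xs
  pure $ atomicModifyIORef' ref $ \ys -> case ys of
    []       -> ([], Nothing)
    (z : zs) -> (zs, Just z)

-- | Plays the role of "source feeding a sink on a channel": tag every
-- element with its key and write it to the shared TChan. The live
-- counter is decremented when the source ends or throws.
worker :: TChan (k, a) -> TVar Int -> k -> Pull a -> IO ()
worker chan live k pull =
  loop `finally` atomically (modifyTVar' live (subtract 1))
  where
    loop = do
      r <- pull
      case r of
        Just a  -> atomically (writeTChan chan (k, a)) >> loop
        Nothing -> pure ()

-- | Fork one worker per keyed source and return a Pull that yields
-- elements in arrival order. It returns Nothing only when the channel
-- is empty and every worker has finished.
interleave :: [(k, Pull a)] -> IO (Pull (k, a))
interleave srcs = do
  chan <- newTChanIO
  live <- newTVarIO (length srcs)
  forM_ srcs $ \(k, pull) -> forkIO (worker chan live k pull)
  pure $ atomically $ do
    next <- tryReadTChan chan
    case next of
      Just x  -> pure (Just x)
      Nothing -> do
        n <- readTVar live
        if n == 0 then pure Nothing else retry

-- | Collect everything a Pull produces (for illustration only).
drain :: Pull a -> IO [a]
drain pull = pull >>= maybe (pure []) (\x -> (x :) <$> drain pull)

-- | Merge two small keyed sources; the interleaving order may vary.
demo :: IO [(String, Int)]
demo = do
  a <- fromList [1, 2, 3]
  b <- fromList [10, 20]
  interleave [("a", a), ("b", b)] >>= drain
```

Elements from the same source keep their relative order, but the order across sources depends on thread scheduling. In a real conduit version, worker would presumably become a Sink and the returned Pull a Source, and the unbounded TChan could be swapped for a bounded queue if a fast source must not run far ahead of the consumer.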

+3
