I was hoping to find a non-deterministic interleaving operation for sources, with a signature such as
interleave :: WhateverIOMonadClassItWouldWant m => [(k, Source m a)] -> Source m (k, a)
The use case is a p2p application that keeps open connections to many nodes on the network and mostly just sits waiting for messages from any of them. When a message arrives, the application doesn't care where it came from, but it needs to process the message as soon as possible. In theory, this kind of application (at least when used for sources like sockets) could bypass the GHC IO manager entirely and make the select/epoll/etc. calls directly, but I don't really care how it is implemented as long as it works.
Is this possible with conduit? A less general, but probably more appropriate, approach might be to write a function [(k, Socket)] -> Source m (k, ByteString) that handles reception on all the sockets for you.
I noticed the ResumableSource operations in conduit, but they all seem to want to know about a particular Sink, which feels like a bit of an abstraction leak, at least for this operation.
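To make the semantics I have in mind concrete, here is a rough sketch rather than a proposed implementation. It rests on several assumptions that are mine: conduit 1.3 or later (where Source m a is written ConduitT () a m ()), m fixed to IO, one thread per source, and an unbounded Chan from base as the meeting point. It does not touch select/epoll directly and has no back-pressure.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (finally)
import Control.Monad (forM_)
import Control.Monad.IO.Class (liftIO)
import Data.Conduit (ConduitT, runConduit, yield, (.|))
import qualified Data.Conduit.List as CL

-- Sketch only: merge keyed sources, yielding items in arrival order.
-- Specialised to IO (an assumption; the signature above is generic in m).
interleave :: [(k, ConduitT () a IO ())] -> ConduitT () (k, a) IO ()
interleave keyed = do
  chan <- liftIO newChan
  -- One worker thread per source; each tags its items with the key.
  liftIO $ forM_ keyed $ \(k, src) -> forkIO $
    -- Always send the end marker (Nothing), even if the source throws,
    -- so the consumer loop below cannot wait forever on a dead worker.
    runConduit (src .| CL.mapM_ (\a -> writeChan chan (Just (k, a))))
      `finally` writeChan chan Nothing
  -- Keep reading until every worker has sent its end marker.
  let loop :: Int -> ConduitT () (k, a) IO ()
      loop 0 = return ()
      loop n = do
        msg <- liftIO (readChan chan)
        case msg of
          Nothing -> loop (n - 1)
          Just ka -> yield ka >> loop n
  loop (length keyed)
```

With this, something like runConduit (interleave [('a', srcA), ('b', srcB)] .| CL.mapM_ handle) would process items as they arrive, where srcA, srcB and handle are hypothetical names. Order is preserved per key but not across keys. If the downstream stops early, the worker threads in this sketch are not cleaned up, which is part of why I would prefer a library-provided operation.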