The goal is to have a conduit with the following type signature:
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
The conduit should repeatedly parse protocol buffers (using the ByteString -> a function) received via TCP/IP (using the network-conduit package).
The wire message format is:
{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...
(The curly braces are not part of the protocol; they are used here only to separate the entities.)
The first idea was to use sequenceSink to repeatedly apply a Sink that can parse one ProtoBuf:
[...]
import qualified Data.Binary as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util as CU
-- | Turn a stream of length-prefixed messages into a stream of decoded values.
--
-- Each step reads a 4-byte length prefix, decodes it as a 'Word32' with
-- 'B.decode', reads that many further bytes and emits the result of applying
-- @protobufDecode@ to them.
--
-- NOTE(review): 'CB.take' yields a lazy ByteString; confirm that the
-- @ByteString@ accepted by @protobufDecode@ is the lazy type as well.
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
    CU.sequenceSink () $ \() ->
        do lenBytes <- CB.take 4                                -- read protobuf length
           let len :: Word32
               len = B.decode lenBytes                          -- decode ProtoBuf length (was the undefined name lengthBytes)
               intLen = fromIntegral len
           protobufBytes <- CB.take intLen                      -- read the ProtoBuf bytes
           return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf
It does not work (it only works for the first protocol buffer), because there seem to be some leftover bytes that were already read from the source but not consumed via CB.take, and these get discarded.
And I found no way of pushing "the rest back into the source".
So, did I get the concept entirely wrong?
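PS: Even though I use protocol buffers here, the problem is not specific to protocol buffers. To debug the problem, I use the format {length}{UTF8 encoded string}{length}{UTF8 encoded string}... , and a conduit similar to the one above (utf8StringConduit :: MonadResource m => Conduit ByteString m Text).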
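Answer:
With the stateless version (state ()) built on CB.take, any bytes that were already pulled from upstream (by await) but not consumed are lost (see above). So, for now, the leftover chunks are kept in the sequenceSink state and passed along by hand, which is not pretty :-(.
My workaround (not pretty, but it works):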
-- | Decode a stream of @{length}{UTF-8 bytes}@ records into 'Text' values.
--
-- The 'CU.sequenceSink' state is the list of chunks that have been pulled
-- from upstream but not yet consumed. It starts empty, is threaded through
-- both reads of a record, and is handed back via 'CU.Emit' for the next one.
utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit = CU.sequenceSink [] step
  where
    step pending = do
        -- 4-byte length prefix, decoded as a Word32
        (prefix, afterPrefix) <- takeWithState BS.empty pending 4
        let payloadLen = fromIntegral (B.decode (BSL.fromChunks [prefix]) :: Word32)
        -- the record body: exactly payloadLen bytes
        (payload, afterPayload) <- takeWithState BS.empty afterPrefix payloadLen
        return (CU.Emit afterPayload [TE.decodeUtf8 payload])
-- | Take exactly @neededLen@ bytes, consuming buffered chunks before
-- awaiting new ones from upstream.
--
-- Arguments, in order:
--
--   * the bytes collected so far (callers start with 'BS.empty');
--   * the buffered chunks, oldest first, that were received earlier but
--     not yet consumed;
--   * the number of bytes still required.
--
-- Returns the collected bytes together with the remaining buffer. A
-- partially consumed chunk is put back at the front of the buffer.
--
-- A request of zero or fewer bytes returns immediately. Previously a
-- negative count with an empty buffer hit a failing lazy pattern match.
--
-- Calls 'error' if upstream ends before enough bytes have arrived.
takeWithState :: Monad m
              => ByteString
              -> [ByteString]
              -> Int
              -> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state neededLen
    | neededLen <= 0 = return (acc, state)
takeWithState acc [] neededLen = do
    -- Buffer exhausted: pull the next chunk from upstream.
    aM <- await
    case aM of
        Just a  -> takeWithState acc [a] neededLen
        -- NOTE(review): placeholder message kept as-is; end-of-input
        -- handling is still an open item.
        Nothing -> error "to be fixed later"
takeWithState acc (firstChunk:state') neededLen =
    let (neededChunk, pushBack) = BS.splitAt neededLen firstChunk
        acc'       = acc `BS.append` neededChunk
        neededLen' = neededLen - BS.length neededChunk
        -- Keep the unconsumed tail of the chunk, dropping it when empty.
        state''    = if BS.null pushBack
                       then state'
                       else pushBack : state'
    in takeWithState acc' state'' neededLen'