Conduit: sequential decoding of binary data

The goal is to have a conduit with the following type signature:

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a

The conduit should repeatedly parse protocol buffers (using the function ByteString -> a) received over TCP/IP (using the network-conduit package).

Wire message format

{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...

(The curly braces are not part of the protocol; they are only used here to separate the entities.)

The first idea was to use sequenceSink to reuse a Sink that can parse a single protobuf:

[...]
import qualified Data.Binary         as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util   as CU

-- | First attempt: decode length-prefixed protobuf messages by running a
-- one-message 'Sink' repeatedly via 'CU.sequenceSink'.
--
-- Each iteration reads a 4-byte length (decoded as a 'Word32' with
-- 'B.decode'), then reads that many bytes and emits the result of
-- @protobufDecode@. The unit state @()@ carries nothing between iterations.
--
-- NOTE(review): bytes already pulled from upstream but not consumed by
-- 'CB.take' appear to be discarded between iterations, so only the first
-- message decodes reliably with this approach.
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
    CU.sequenceSink () $ \() ->
        do lenBytes <- CB.take 4                                -- read protobuf length
           let len :: Word32
               len = B.decode lenBytes                          -- decode ProtoBuf length
               intLen = fromIntegral len
           protobufBytes <- CB.take intLen                      -- read the ProtoBuf bytes
           return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf

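It does not work (it only works for the first protocol buffer), apparently because some leftover bytes have already been read from the source but not consumed by CB.take, and these get discarded.

I found no proper way to pass "the rest" on to the next iteration of the sink.

Am I overlooking something, or is there a better way to do this?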

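PS: Even though I am using protocol buffers here, this is not a protocol buffer problem. To debug it, I always use {length}{UTF8 encoded string}{length}{UTF8 encoded string}... and a conduit analogous to the one above (utf8StringConduit :: MonadResource m => Conduit ByteString m Text).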

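Update:

I tried replacing the state (the unit state () in the sample above) with the remaining bytes. I also replaced the CB.take calls with a function that first consumes the bytes already read (kept in the state) and only calls await when the state does not hold enough bytes. Unfortunately, that does not work either. As soon as the source has no bytes left, sequenceSink stops running the sink, even though the state may still contain buffered bytes :-(.

If you are interested in the code (it is neither optimized nor very good, but it should be enough for testing):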

-- | Decode a stream of length-prefixed UTF-8 strings.
--
-- Each frame is a 4-byte length (decoded as a 'Word32' via 'B.decode')
-- followed by that many bytes of text. Bytes read past the end of a frame
-- are carried in the 'CU.sequenceSink' state so that the next frame sees them.
utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit = CU.sequenceSink [] decodeFrame
  where
    -- One frame: header, then payload; the unconsumed chunks become the
    -- state for the next frame.
    decodeFrame buffered =
        do (header, afterHeader) <- takeWithState BS.empty buffered 4
           let payloadLen = fromIntegral (B.decode (BSL.fromChunks [header]) :: Word32)
           (payload, remaining) <- takeWithState BS.empty afterHeader payloadLen
           return (CU.Emit remaining [TE.decodeUtf8 payload])

-- | Take exactly @neededLen@ bytes, first from the buffered chunks in
-- @state@ and then by awaiting further chunks from upstream as needed.
--
-- Returns @acc@ followed by the taken bytes, together with the buffered
-- chunks that were not consumed (in stream order). The caller can thread
-- those chunks into its next read instead of losing them.
--
-- Calls 'error' if upstream ends before enough bytes have arrived.
takeWithState :: Monad m
              => ByteString
              -> [ByteString]
              -> Int
              -> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state neededLen =
    fill (reverse state) (foldl' (+) 0 (map BS.length state))
  where
    -- Buffer upstream chunks until at least neededLen bytes are held.
    -- The buffer is kept newest-first so each new chunk is added in O(1),
    -- and the running byte count avoids re-summing the buffer every step.
    fill revBuf buffered
      | buffered >= neededLen = return (splitOff [] neededLen (reverse revBuf))
      | otherwise = do
          aM <- await
          case aM of
            Just a -> fill (a : revBuf) (buffered + BS.length a)
            Nothing -> error "to be fixed later"

    -- Move bytes from the front of the buffer into 'taken' (newest-first)
    -- until n bytes have been collected. A partially used chunk has its
    -- unused tail pushed back. Concatenating once at the end avoids
    -- repeated 'BS.append'.
    splitOff taken n rest
      | n <= 0 = (BS.concat (acc : reverse taken), rest)
    splitOff taken n (chunk : chunks) =
      let (used, unused) = BS.splitAt n chunk
          rest' = if BS.null unused then chunks else unused : chunks
      in splitOff (used : taken) (n - BS.length used) rest'
    splitOff _ _ [] =
      error "takeWithState: impossible, buffered byte count was too small"
+5
1

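Your code could use messageWithLengthPutM and messageWithLengthGetM (see the code below), but they encode the length as a varint, which does not match your format. Instead, you can adapt the messageWithLength Get/Put pair, for example: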

myMessageWithLengthGetM =
   getWord32be >>= getMessageWithSize

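Unfortunately, getMessageWithSize is not exported, so you would have to copy it. Alternatively, read the payload with getByteString and decode it "by hand".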

As for conduits: have you tried implementing the conduit without Data.Conduit.Util? Something like:

-- | Sketch of the same decoder written as a plain recursive conduit
-- (no 'Data.Conduit.Util'): read a 4-byte length, read that many bytes,
-- yield the decoded message, and repeat.
--
-- Before each message the loop checks for end of input with 'await'.
-- If data is present, it pushes the chunk back with 'leftover' so that
-- 'CB.take' sees it. This check is the loop's only exit, which lets the
-- conduit finish cleanly once upstream is exhausted.
--
-- NOTE(review): 'convertLen' is not defined here; presumably it decodes
-- the 4-byte length prefix into an 'Int'. Also assumes upstream does not
-- yield empty chunks; an empty chunk followed by end of input would still
-- reach 'CB.take'.
protobufConduit protobufDecode = loop
   where
      loop =
         do next <- await
            case next of
              Nothing -> return ()
              Just chunk ->
                 do leftover chunk
                    len <- liftM convertLen (CB.take 4)
                    bs <- CB.take len
                    yield (protobufDecode bs)
                    loop

Here is the code I use:

-- | Serialize each incoming message with its length prefix (via
-- 'messageWithLengthPutM') and stream the encoded bytes downstream,
-- one strict chunk at a time.
pbufSerialize :: (ReflectDescriptor w, Wire w) => Conduit w IO ByteString
pbufSerialize = awaitForever encodeAndYield
    where encodeAndYield msg =
              let encoded = runPut (messageWithLengthPutM msg)
              in M.mapM_ yield (BSL.toChunks encoded)

-- | Parse a stream of length-prefixed protobuf messages (as written by
-- 'messageWithLengthPutM') and yield each decoded message.
--
-- Input chunks are fed incrementally to 'messageWithLengthGetM'. Bytes left
-- over after one message are parsed immediately as the start of the next.
-- Ending the stream between messages finishes cleanly. Ending it in the
-- middle of a message hands 'Nothing' to the parser's continuation, so
-- the truncation surfaces through 'fail' instead of being dropped silently.
pbufParse :: (ReflectDescriptor w, Wire w, Show w) => Conduit ByteString IO w
pbufParse = awaitStart
    where
      startParse = runGet messageWithLengthGetM

      -- At a message boundary: end of input here is a clean finish.
      -- Empty chunks are skipped so they do not start a parse that would
      -- later be reported as truncated.
      awaitStart =
          do mbs <- await
             case mbs of
               Nothing -> return ()
               Just bs ->
                   let lbs = BSL.fromChunks [bs]
                   in if BSL.null lbs
                        then awaitStart
                        else checkResult (startParse lbs)

      -- Mid-message: feed the next chunk, or 'Nothing' at end of input so
      -- the parser can report the incomplete message.
      resume cont =
          do mbs <- await
             checkResult (cont (fmap (BSL.fromChunks . (:[])) mbs))

      checkResult result =
          case result of
            Failed _ errmsg -> fail errmsg
            Partial cont -> resume cont
            Finished rest _ msg ->
                do yield msg
                   if BSL.null rest
                     then awaitStart
                     else checkResult (startParse rest)
+4
source

All Articles