Single stage channel

I want to do something along ArrowChoice strings, but with cables . I want to expect Aither values, and then pass Left values ​​to one channel and Right values ​​to another, and then merge the results back into the Either stream.

Presumably, this can be done by creating internal channels, such as automata: turn the pipeline into a function that takes an argument and returns a monodic list of outputs:

newtype AutomataM i m o = Automata (i -> m (o, Automata i o))

conduitStep :: Conduit i m o -> AutomataM i m [o]

The reason for the list of outputs is that a channel can give 0 or more outputs for each input.

I looked at ResumableConduit and its relatives, and probably the answer is there somewhere. But I can’t understand how this is done.

+3
source share
2 answers

, , :

import Data.Conduit
import Data.Conduit.Internal (Pipe (..), ConduitM (..))

newtype Automata i o m r = Automata (m ([o], Either r (i -> Automata i o m r)))

conduitStep :: Monad m => ConduitM i o m r -> Automata i o m r
conduitStep (ConduitM con0) =
    Automata $ go [] id con0
  where
    go _ front (Done r) = return (front [], Left r)
    go ls front (HaveOutput p _ o) = go ls (front . (o:)) p
    go ls front (NeedInput p _) =
        case ls of
            [] -> return (front [], Right $ conduitStep . ConduitM . p)
            l:ls' -> go ls' front (p l)
    go ls front (PipeM mp) = mp >>= go ls front
    go ls front (Leftover p l) = go (l:ls) front p

:

  • , .
  • .

, ZipConduit, ZipSource ZipSink, , .


ZipConduit - 0.1.5. , :

import           Control.Applicative
import           Data.Conduit
import           Data.Conduit.Extra
import qualified Data.Conduit.List   as CL

conduit1 :: Monad m => Conduit Int m String
conduit1 = CL.map $ \i -> "conduit1: " ++ show i

conduit2 :: Monad m => Conduit Double m String
conduit2 = CL.map $ \d -> "conduit2: " ++ show d

conduit :: Monad m => Conduit (Either Int Double) m String
conduit = getZipConduit $
    ZipConduit (lefts =$= conduit1) *>
    ZipConduit (rights =$= conduit2)
  where
    lefts = CL.mapMaybe (either Just (const Nothing))
    rights = CL.mapMaybe (either (const Nothing) Just)

main :: IO ()
main = do
    let src = do
            yield $ Left 1
            yield $ Right 2
            yield $ Left 3
            yield $ Right 4
        sink = CL.mapM_ putStrLn
    src $$ conduit =$ sink
+2

, pipes, "push-category" Pipes. . , pipes, "sequencing", , , Arrow .

newtype Edge ( ), push-based pipe Category, Arrow, ArrowChoice Functor, Applicative . . , Arrow/ArrowChoice/Applicative Edge .

(Edit: https://github.com/Gabriel439/Haskell-RCPL-Library)


{-# LANGUAGE RankNTypes           #-}
{-# LANGUAGE TypeSynonymInstances #-}

import Prelude hiding ((.), id)
import Pipes.Core
import Pipes.Lift
import Control.Monad.Morph
import Control.Category
import Control.Monad.State.Strict
import Control.Arrow

pipes; Pipes.Core push. Push-based

-- push :: a -> Proxy a' a a' a m r

, , , , Proxy. , "kickstarted", push-Proxy .

push-based pipe, Category, Arrow ArrowChoice. Edge typeclass, Category Arrow

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

Category "push", push id (<~<) :

instance Monad m => Category (Edge m r) where
  id = Edge push
  Edge a . Edge b = Edge (a <~< b)

Edge arr, id (.. push) . respond, p />/ respond == p, f .

instance Monad m => Arrow (Edge m r) where
  arr f = Edge (push />/ respond . f)

snd "" first

  first (Edge p) = Edge $ \(b, d) ->
    evalStateP d $ (up \>\ hoist lift . p />/ dn) b
    where
      up () = do
        (b, d) <- request ()
        lift (put d)
        return b
      dn c = do
        d <- lift get
        respond (c, d)

, ArrowChoice left. left Right, , .

instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
          bef x = case x of
              Left b -> return b
              Right d -> do
                  _ <- respond (Right d)
                  x2 <- request ()
                  bef x2
          up () = do
              x <- request ()
              bef x
          dn c = respond (Left c)

Edge "push-based"

type PProducer m r b =            Edge m r () b
type PConsumer m r a = forall b . Edge m r a  b

Functor Applicative PProducer. case Pipe, . , , , , f yield Pipe.

instance Functor (PProducer m r) where
  fmap f (Edge k) = $ Edge $ \() -> go (k ()) where
    go p = case p of
      Request () ku -> Request ()    (\() -> go (ku ()))
      -- This is the only interesting line
      Respond b  ku -> Respond (f b) (\() -> go (ku ()))
      M          m  -> M (m >>= \p' -> return (go p'))  
      Pure    r     -> Pure r

, Applicative , , .

instance (Monad m) => Applicative (Edge m r ()) where
    pure b = Edge $ \() -> forever $ respond b
    (Edge k1) <*> (Edge k2) = Edge (\() -> goL (k1 ()) (k2 ()))
      where
        goL p1 p2 = case p1 of
            Request () ku -> Request () (\() -> goL   (ku ()) p2)
            Respond f  ku ->                    goR f (ku ()) p2
            M          m  -> M (m >>= \p1' -> return (goL p1' p2))
            Pure    r     -> Pure r
        goR f p1 p2 = case p2 of
            Request () ku -> Request ()    (\() -> goR f p1 (ku ()))
            Respond x  ku -> Respond (f x) (\() -> goL   p1 (ku ()))
            M          m  -> M (m >>= \p2' -> return (goR f p1 p2'))
            Pure    r     -> Pure r
+1

All Articles