Vector.Stream 的流转换器 Monad

Stream transformer Monad for Vector.Stream

Data.Vector.Stream provides an nice Stream implementation that is very efficient thanks to the focus on fusability (see this paper for more). Since vector-0.1 this Stream implementation changed slightly by moving the Step type into a monad. (Now, the implementation is located in Data.Vector.Fusion.Stream.Monadic.)

简而言之,这里是 Stream 的定义:

data Step s a where
    Yield :: a -> s -> Step s a
    Skip  :: s -> Step s a
    Done  :: Step s a

data Stream a = forall s. Stream (s -> Step s a) s

Step s a 用更新函数 s -> Step s a 封装了状态 s 一次迭代的三种可能结果。流是 Done,或者跳过输出,或者产生输出。 (上面的定义使用了 GADT,但这里不相关。)

这个 Stream 的简单应用是:

empty :: Stream a
empty = Stream (const Done) ()

singleton :: a -> Stream a
singleton x = Stream step True where
    step True  = Yield x False
    step False = Done

fromList :: [a] -> Stream a
fromList zs = Stream step zs
where
    step (x:xs) = Yield x xs
    step []     = Done

严格左折是这样完成的:

foldl'S :: (a -> b -> a) -> a -> Stream b -> a
foldl'S f a0 (Stream step s) = go a0 s where
    go a s = a `seq`
                case step s of
                    Yield b s' -> go (f a b) s'
                    Skip    s' -> go a       s'
                    Done       -> a

并且给出了通常的类似列表的函数 lengthS = foldl'S (\n _ -> n+1) 0 等。这当然不如 Conduit or Pipes 优雅,但它简单而快速。到目前为止一切顺利。

现在,让我们尝试将 'low-level' 流聚合到更高级别的流。例如,如果你有一个比特流 Stream Bool,你可能想通过使用一些聪明的编解码器来解码这些比特以产生 Stream Int。当然,总是可以从给定 Stream step s 中提取的 step 函数构建一个新的阶跃函数 s -> Step s b。重复应用 step :: s->Step s a 函数会导致笨拙的 case (step s) of ... 级联,一遍又一遍地处理三种可能性 DoneSkipYield。理想情况下,aggregate 应该像这样:

aggregate :: Stream a -> M?? -> Stream b
newStream = aggregate oldStream $ do
    a1 <- get    -- a1 :: a
    if a1 == True then doSomething
    else do
        a2 <- get
        -- etc.

M??是一些monad,有待定义。让我们尝试输入 Appl s a:

newtype Appl s a = Appl ((s->Step s a) -> s -> Step s a)

之所以叫Appl是因为它有函数应用程序的签名。 monad 实例非常简单:

instance Monad (Appl s) where
    return a = Appl (\_ s -> Yield a s)
    (Appl ap) >>= f = Appl (\step s ->
        case (ap step s) of
            Done -> Done
            Skip s' -> untilNotSkip step s'
            Yield a' s' -> ap' step s' where Appl ap' = f a'

其中 untilNotSkip :: (s->Step s a) -> s -> Step s a 只是重复(嵌套)应用 step :: (s->Step s a) 直到返回 DoneYield

get函数只是普通的函数应用

get :: Appl s a
get = Appl(\step s -> step s)

为了解决问题,FunctorApplicative 需要完成,问题来了:Appl s 不能成为函子。签名是

fmap :: (a->b) -> Appl s a -> Appl s b

那是行不通的,因为为了从函数 (s->Step s a) -> s -> Step s a) 中创建一个函数 (s->Step s b) -> s -> Step s b),我需要一个 b->a。我可以在 a->b 上后退 Appl s b,但我不能向前推进 Appl s a - 即我可以有一个逆变函子,但不能有一个函子。那真是怪了。流 are quite naturally comonads,但我看不到连接。 Appl 的目的是将阶跃函数 s->Step s a 转换为另一个 s->Step s b.

这里出了点问题,Appl 不是正确的“M??”。有人可以帮忙吗?

更新

正如 Li-yao Xia 所指出的,类型应该类似于

data Walk a b = forall s. Walk ((s->Step s a) -> s -> Step s b)

Functor、Applicative 和 Monad 实例将是

instance Functor (Step s) where
    fmap f Done        = Done
    fmap f (Skip s)    = Skip s
    fmap f (Yield a s) = Yield (f a) s

instance Functor (Walk a) where
    fmap f (Walk t) = Walk ( \step s -> fmap f (t step s) )

-- default Applicative given a monad instance
ap :: (Monad m) => m (a -> b) -> m a -> m b
ap mf m = do
    f <- mf
    x <- m
    return (f x)

untilNotSkip :: (s->Step s a) -> s -> Step s a
untilNotSkip step s = case step s of
    Done        -> Done
    Skip s'     -> untilNotSkip step s'
    Yield a' s' -> Yield a' s'

instance Monad (Walk a) where
    return a = Walk (\_ s -> Yield a s)
    Walk t >>= f =
        Walk (\step s -> case t (untilNotSkip step) s of
            Done        -> Done
            Skip _      -> error "Internal error."
            Yield b' s' -> case f b' of Walk t' -> t' step s'   -- bad
    )

instance Applicative (Walk a) where
    pure = return
    (<*>) = ap

然而,类型检查器不允许这个 monad 实例。在>>=的定义中,Walk (\step s -> ...中的sYield b' s' -> ...中的s'不同,但必须相同。这里的根本问题是 (>>=) :: Walk a b -> (b->Walk a c) -> Walk a c 有两个独立的全量化状态 s,一个在第一个参数中,另一个由 b->Walk a c 返回。实际上这是(滥用符号) (forall s. Walk s a b) -> (forall s'. b->Walk s' a' c) -> (forall s''. Walk s'' a c),这在概念上和类型检查器上都没有意义。所有三个 ss's'' 必须是同一类型。

Walk 未在 s 上全部量化的变体:

data Walk s a b = Walk ((s->Step s a) -> s -> Step s b)

允许正确定义绑定,但是 aggregate 将不起作用:

-- does not compile
aggregate :: Stream a -> Walk s a b -> Stream b
aggregate (Stream step s) (M t) = Stream (t step) s

同样,流状态 s 必须始终相同。解决此问题的一种方法是引入 data PreStream s a = PreStream (s -> Step s a) s,但这也不允许 aggregate :: Stream a -> ?? -> Stream b

源码在github.

让我们再看一下Appl,因为看起来差不多。

newtype Appl s a = Appl ((s->Step s a) -> s -> Step s a)

我们的想法是通过将 "low-level" 阶跃函数转换为 "high-level" 阶跃函数来定义流换能器。从这个角度来看,这两个步骤函数不应​​该有相同的输出。例如,如果我们将位转换为字节,我们需要 (s -> Step s Bit) -> s -> Step s Byte.

因此,更好的类型是

newtype Walk s b a = Walk ((s -> Step s b) -> s -> Step s a)
-- A walk is many steps.

此外,由于 Streams 上存在量化,你需要在某些时候对 s 进行一些通用量化才能使用 Walk,所以你可能很好地把它放在类型定义中。

newtype Walk b a = Walk (forall s. (s -> Step s b) -> s -> Step s a)