您如何并行化从 Haskell 中的标准输入延迟读取的信息?

How do you parallelize lazily read information from stdin in Haskell?

我正在使用我编写的这段代码,出于某种原因,threadscope 一直告诉我它几乎从不一次使用多个内核。我认为问题是为了获得第二行,它需要完全评估第一行,但我想不出一种简单的方法让它一次读取 11 行。

module Main where

import Control.Parallel
import Control.Parallel.Strategies
import System.IO
import Data.List.Split
import Control.DeepSeq

process :: [String] -> [String]
process lines = do
    let xs = map (\x -> read x :: Double) lines
        ys = map (\x -> 1.0 / (1.0 + (exp (-x)))) xs
        retlines = map (\x -> (show x ) ++ "\n") ys
    retlines

main :: IO ()
main = do
    c <- getContents
    let xs = lines c
        ys = (process xs) `using` parBuffer 11 rdeepseq
    putStr (foldr (++) [] ys)

如果我没看错这段代码,parBuffer n 只会激发第一个 n 元素——所有其余元素都以通常的 Haskell 方式求值。

parBuffer :: Int -> Strategy a -> Strategy [a]
parBuffer n strat = parBufferWHNF n . map (withStrategy strat)

parBufferWHNF :: Int -> Strategy [a]
parBufferWHNF n0 xs0 = return (ret xs0 (start n0 xs0))
  where -- ret :: [a] -> [a] -> [a]
           ret (x:xs) (y:ys) = y `par` (x : ret xs ys)
           ret xs     _      = xs

        -- start :: Int -> [a] -> [a]
           start 0   ys     = ys
           start !_n []     = []
           start !n  (y:ys) = y `par` start (n-1) ys

请特别注意 start 0 ys = ys 而不是 start 0 ys = evaluateThePreviousChunk `pseq` start n0 ys 或其他会引发更多火花的东西。文档肯定没有说清楚——我不认为“滚动缓冲策略”明显暗示了这种行为,我同意这有点令人惊讶,以至于我想知道这是否只是 parallel 还没有人抓到的库。

您可能需要 parListChunk