How to compress the output when writing to a file?
I have a computation which, along with other things, generates some data (a lot of it) that I want to write to a file.
The way the code is structured right now is (simplified):
writeRecord :: Handle -> Record -> IO ()
writeRecord h r = hPutStrLn h (toByteString r)
This function is then called periodically during a larger computation. It is almost like a log, and in fact multiple files are written to simultaneously. Now I want the output files to be compressed with gzip.
In a language like Java I would do something like:
outStream = new GzipOutputStream(new FileOutputStream(path))
and then just write to the wrapped output stream.
What is the Haskell way of doing this? I would like to write something like
writeRecord h r = hPut h ((compressed . toByteString) r)
but that is not right, because compressing each small piece separately is not efficient (I even tried it, and the compressed file came out larger than the uncompressed one).
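Roughly, the kind of experiment that shows this (a simplified sketch with made-up record contents, using Codec.Compression.GZip from the zlib package):
import qualified Codec.Compression.GZip as GZip
import qualified Data.ByteString.Lazy.Char8 as BL
main :: IO ()
main = do
  let recs = replicate 1000 (BL.pack "some record\n")
  -- per-record compression: every chunk pays for its own gzip header and
  -- trailer and gets no shared dictionary, so the total comes out larger
  print (BL.length (BL.concat (map GZip.compress recs)))
  -- compressing the whole stream at once, for comparison
  print (BL.length (GZip.compress (BL.concat recs)))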
I also don't think I can simply produce a lazy ByteString (or even a list of chunks) and write it out with compressed . fromChunks, because that would require my "generator" to build the whole thing in memory. And the fact that several files are generated at the same time makes it even more complicated.
So what is the Haskell way to approach this: writing to files and compressing them along the way?
Doing this with conduit is pretty straightforward, though you will need to tweak your code a bit. I've put together before-and-after code samples to demonstrate it. The basic idea is:
- Replace hPutStr h with yield
- Add some liftIO wrappers
- Instead of using withBinaryFile and the like, use runConduitRes, gzip, and sinkFile
Here's the example:
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, hPutStr)
import Data.Conduit (ConduitM, (.|), yield, runConduitRes)
import Data.Conduit.Binary (sinkFile)
import Data.Conduit.Zlib (gzip)
import System.IO (Handle)
-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a string\n"
-- Original version
producerHandle :: Handle -> IO ()
producerHandle h = do
str <- someAction
hPutStr h str
-- Conduit version
producerConduit :: MonadIO m => ConduitM i ByteString m ()
producerConduit = do
str <- liftIO someAction
yield str
main :: IO ()
main = runConduitRes $ producerConduit
.| gzip
.| sinkFile "some-file.txt.gz"
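For instance, a minimal sketch of how the question's writeRecord could look in this style; Record and toByteString here are only placeholders standing in for the asker's own definitions, and the output file name is made up:
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString (ByteString)
import Data.Monoid ((<>))
import Data.Conduit (ConduitM, (.|), yield, runConduitRes)
import Data.Conduit.Binary (sinkFile)
import Data.Conduit.Zlib (gzip)
-- Placeholder stand-ins for the asker's Record type and toByteString
newtype Record = Record ByteString
toByteString :: Record -> ByteString
toByteString (Record bs) = bs
-- The question's writeRecord, reworked to yield into a conduit instead of
-- writing to a Handle; appending "\n" mirrors the original hPutStrLn
writeRecord :: Monad m => Record -> ConduitM i ByteString m ()
writeRecord r = yield (toByteString r <> "\n")
main :: IO ()
main = runConduitRes
     $ mapM_ writeRecord [Record "first", Record "second"]
    .| gzip
    .| sinkFile "records.gz"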
You can learn more about conduit in the conduit tutorial.
Your Java idea is interesting, though; give me a few more minutes and I'll add an answer that looks more like that.
EDIT
Here is a version that is closer to your Java-style approach. It relies on a SinkFunc.hs module which is available as a Gist at: https://gist.github.com/snoyberg/283154123d30ff9e201ea4436a5dd22d
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -Wall -Werror #-}
import Data.ByteString (ByteString)
import Data.Conduit ((.|))
import Data.Conduit.Binary (sinkHandle)
import Data.Conduit.Zlib (gzip)
import System.IO (withBinaryFile, IOMode (WriteMode))
import SinkFunc (withSinkFunc)
-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a string\n"
producerFunc :: (ByteString -> IO ()) -> IO ()
producerFunc write = do
str <- someAction
write str
main :: IO ()
main = withBinaryFile "some-file.txt.gz" WriteMode $ \h -> do
let sink = gzip .| sinkHandle h
withSinkFunc sink $ \write -> producerFunc write
EDIT 2 And for good measure, here's an example that actually uses ZipSink to stream the data to multiple different files. There are plenty of different ways of slicing this, but this is one that works:
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString (ByteString)
import Data.Conduit (ConduitM, (.|), yield, runConduitRes, ZipSink (..))
import Data.Conduit.Binary (sinkFile)
import qualified Data.Conduit.List as CL
import Data.Conduit.Zlib (gzip)
data Output = Foo ByteString | Bar ByteString
fromFoo :: Output -> Maybe ByteString
fromFoo (Foo bs) = Just bs
fromFoo _ = Nothing
fromBar :: Output -> Maybe ByteString
fromBar (Bar bs) = Just bs
fromBar _ = Nothing
producer :: Monad m => ConduitM i Output m ()
producer = do
yield $ Foo "This is going to Foo"
yield $ Bar "This is going to Bar"
sinkHelper :: MonadResource m
=> FilePath
-> (Output -> Maybe ByteString)
-> ConduitM Output o m ()
sinkHelper fp f
= CL.mapMaybe f
.| gzip
.| sinkFile fp
main :: IO ()
main = runConduitRes
$ producer
.| getZipSink
(ZipSink (sinkHelper "foo.txt.gz" fromFoo) *>
ZipSink (sinkHelper "bar.txt.gz" fromBar))
For incremental compression, I think you can make use of compressIO/foldCompressStream in Codec.Compression.Zlib.Internal.
If you are able to represent your producer action as an IO (Maybe a) (such as an MVar take or an InputStream/Chan read), where Nothing signifies end of input, something like this should work:
import System.IO (Handle)
import qualified Data.ByteString as BS
import qualified Codec.Compression.Zlib.Internal as ZLib
compressedWriter :: Handle -> (IO (Maybe BS.ByteString)) -> IO ()
compressedWriter handle source =
ZLib.foldCompressStream
(\next -> source >>= maybe (next BS.empty) next)
(\chunk next -> BS.hPut handle chunk >> next)
(return ())
(ZLib.compressIO ZLib.rawFormat ZLib.defaultCompressParams)
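A hypothetical usage sketch (the Chan-based producer, the literal records, and the file name are made up here), assuming the compressedWriter above is in scope:
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import System.IO (IOMode (WriteMode), withBinaryFile)
main :: IO ()
main = do
  chan <- newChan
  -- the producer pushes Just chunks and finally Nothing to signal end of input
  mapM_ (writeChan chan . Just) ["first record\n", "second record\n"]
  writeChan chan Nothing
  -- note that rawFormat above produces a raw deflate stream, not gzip
  withBinaryFile "records.deflate" WriteMode $ \h ->
    compressedWriter h (readChan chan)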
All of the streaming libraries support compression. If I understand the particular problem, and the way you are thinking about it, io-streams might be the simplest for your purposes. Here I alternate between writing to the trump and clinton output streams, which are written out as compressed files. After that, I show the pipes equivalent of Michael's conduit program.
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package io-streams
{-# LANGUAGE OverloadedStrings #-}
import qualified System.IO.Streams as IOS
import qualified System.IO as IO
import Data.ByteString (ByteString)
analyzer :: IOS.OutputStream ByteString -> IOS.OutputStream ByteString -> IO ()
analyzer clinton trump = do
IOS.write (Just "This is a string\n") clinton
IOS.write (Just "This is a string\n") trump
IOS.write (Just "Clinton string\n") clinton
IOS.write (Just "Trump string\n") trump
IOS.write (Just "Another Clinton string\n") clinton
IOS.write (Just "Another Trump string\n") trump
IOS.write Nothing clinton
IOS.write Nothing trump
main:: IO ()
main =
IOS.withFileAsOutput "some-file-clinton.txt.gz" $ \clinton_compressed ->
IOS.withFileAsOutput "some-file-trump.txt.gz" $ \trump_compressed -> do
clinton <- IOS.gzip IOS.defaultCompressionLevel clinton_compressed
trump <- IOS.gzip IOS.defaultCompressionLevel trump_compressed
analyzer clinton trump
Obviously you can mix arbitrary IO inside analyzer between the acts of writing to the two output streams; I am only showing the writes, so to speak. In particular, if analyzer is understood to be responding to an input stream, the writes can depend on reads from that input stream. Here's a (slightly!) more complicated program along those lines. If I run the program above I see:
$ stack gzip_so.hs
$ gunzip some-file-clinton.txt.gz
$ gunzip some-file-trump.txt.gz
$ cat some-file-clinton.txt
This is a string
Clinton string
Another Clinton string
$ cat some-file-trump.txt
This is a string
Trump string
Another Trump string
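Staying with io-streams, here is a hedged sketch of that read-driven style (not part of the answer above; the routing rule, file names, and the use of stdin are made up for illustration). Each line read from the input stream is routed to one of the two gzip-wrapped output streams:
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package io-streams
{-# LANGUAGE OverloadedStrings #-}
import qualified System.IO.Streams as IOS
import qualified Data.ByteString as BS
import Data.ByteString (ByteString)
-- Route each line from the input stream to one of the two (already
-- gzip-wrapped) output streams, based on its contents
router :: IOS.InputStream ByteString
       -> IOS.OutputStream ByteString
       -> IOS.OutputStream ByteString
       -> IO ()
router input clinton trump = loop
  where
    loop = do
      mline <- IOS.read input
      case mline of
        Nothing -> do
          IOS.write Nothing clinton
          IOS.write Nothing trump
        Just line -> do
          let out = if "Clinton" `BS.isInfixOf` line then clinton else trump
          IOS.write (Just (line `BS.append` "\n")) out
          loop
main :: IO ()
main =
  IOS.withFileAsOutput "clinton-lines.txt.gz" $ \clintonFile ->
  IOS.withFileAsOutput "trump-lines.txt.gz" $ \trumpFile -> do
    clinton <- IOS.gzip IOS.defaultCompressionLevel clintonFile
    trump   <- IOS.gzip IOS.defaultCompressionLevel trumpFile
    input   <- IOS.lines IOS.stdin
    router input clinton trump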
With pipes and conduit there are various ways of achieving the effect above, with a higher level of decomposition of parts. Writing to separate files is a little more subtle, though. In any case, here is the pipes equivalent of Michael S's conduit program:
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package pipes-zlib
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, hPutStr)
import System.IO (IOMode(..), withFile, Handle)
import Pipes
import qualified Pipes.ByteString as PB
import qualified Pipes.GZip as P
-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a string\n"
-- Original version
producerHandle :: Handle -> IO ()
producerHandle h = do
str <- someAction
hPutStr h str
producerPipe :: MonadIO m => Producer ByteString m ()
producerPipe = do
str <- liftIO someAction
yield str
main :: IO ()
main = withFile "some-file-pipes.txt.gz" WriteMode $ \h ->
runEffect $ P.compress P.defaultCompression producerPipe >-> PB.toHandle h
EDIT
Here, for what it's worth, is another way of superimposing several producers on a single thread with pipes or conduit, adding to the different approaches mentioned by Michael S and danidiaz:
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package pipes-zlib
{-# LANGUAGE OverloadedStrings #-}
import Pipes
import Pipes.GZip
import qualified Pipes.Prelude as P
import qualified Pipes.ByteString as Bytes
import System.IO
import Control.Monad (replicateM_)
producer = replicateM_ 50000 $ do
marie "This is going to Marie\n" -- arbitary IO can be interspersed here
arthur "This is going to Arthur\n" -- with liftIO
sylvia "This is going to Sylvia\n"
where
marie = yield; arthur = lift . yield; sylvia = lift . lift . yield
sinkHelper h p = runEffect (compress bestSpeed p >-> Bytes.toHandle h)
main :: IO ()
main =
withFile "marie.txt.gz" WriteMode $ \marie ->
withFile "arthur.txt.gz" WriteMode $ \arthur ->
withFile "sylvia.txt.gz" WriteMode $ \sylvia ->
sinkHelper sylvia
$ sinkHelper arthur
$ sinkHelper marie
$ producer
It is very simple and fast, and can be written with conduit with the obvious alterations, but seeing it as natural requires a bit more buy-in to the 'monad transformer stack' point of view. From the perspective of a library like streaming, it is the most natural way of writing such a program.
This solution is similar to Michael Snoyman's EDIT 2, but it uses the foldl, pipes, pipes-zlib and streaming-eversion packages.
{-# language OverloadedStrings #-}
module Main where
-- cabal install bytestring foldl pipes pipes-zlib streaming-eversion
import Data.Foldable
import Data.ByteString
import qualified Control.Foldl as L
import Pipes
import qualified Pipes.Prelude
import Pipes.Zlib (compress,defaultCompression,defaultWindowBits)
import Streaming.Eversion.Pipes (transvertMIO)
import System.IO
type Tag = String
producer :: Monad m => Producer (Tag,ByteString) m ()
producer = do
yield $ ("foo","This is going to Foo")
yield $ ("bar","This is going to Bar")
foldForTag :: Handle -> Tag -> L.FoldM IO (Tag,ByteString) ()
foldForTag handle tag =
L.premapM (\(tag',bytes) -> if tag' == tag then Just bytes else Nothing)
. L.handlesM L.folded
. transvertMIO (compress defaultCompression defaultWindowBits)
$ L.mapM_ (Data.ByteString.hPut handle)
main :: IO ()
main = do
withFile "foo.txt" WriteMode $ \h1 ->
withFile "bar.txt" WriteMode $ \h2 ->
let multifold = traverse_ (uncurry foldForTag) [(h1,"foo"),(h2,"bar")]
in L.impurely Pipes.Prelude.foldM multifold producer
This solution is similar to Michael Snoyman's EDIT 2, but it uses the streaming, streaming-bytestring, pipes and pipes-zlib packages.
{-# language OverloadedStrings #-}
module Main where
-- cabal install bytestring streaming streaming-bytestring pipes pipes-zlib
import Data.ByteString
import qualified Data.ByteString.Streaming as B
import Streaming
import qualified Streaming.Prelude as S
import Pipes (next)
import qualified Pipes.Prelude
import Pipes.Zlib (compress,defaultCompression,defaultWindowBits)
import System.IO
type Tag = String
producer :: Monad m => Stream (Of (Tag,ByteString)) m ()
producer = do
S.yield ("foo","This is going to Foo")
S.yield ("bar","This is going to Bar")
-- I couldn't find a streaming-zlib on Hackage, took a pipes detour
compress' :: MonadIO m
=> Stream (Of ByteString) m r -> Stream (Of ByteString) m r
compress' = S.unfoldr Pipes.next
. compress defaultCompression defaultWindowBits
. Pipes.Prelude.unfoldr S.next
keepTag :: Monad m
=> Tag -> Stream (Of (Tag,ByteString)) m r -> Stream (Of ByteString) m r
keepTag tag = S.map snd . S.filter ((tag==) . fst)
main :: IO ()
main = runResourceT
. B.writeFile "foo.txt" . B.fromChunks . compress' . keepTag "foo"
. B.writeFile "bar.txt" . B.fromChunks . compress' . keepTag "bar"
$ S.copy producer
I use the copy function from Streaming.Prelude, which lets you
Duplicate the content of stream, so that it can be acted on twice in
different ways, but without breaking streaming.