Haskell Conduit Aeson:解析大型 JSON 和过滤器匹配 key/values

Haskell Conduit Aeson: Parsing Large JSONs and filter matching key/values

我在 Haskell 中编写了一个应用程序,它执行以下操作:

  1. 递归列出一个目录,
  2. 从目录列表中解析 JSON 个文件,
  3. 寻找匹配的键值对,并且
  4. Return 个已找到匹配项的文件名。

我的这个应用程序的第一个版本是我能写的最简单、天真的版本,但我注意到 space 使用率似乎单调增加。

因此,我切换到 conduit,现在我的主要功能如下所示:

conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
conduitFilesFilter projFilter dirname' = do
  (_, allFiles) <- listDirRecur dirname'
  C.runConduit $
    C.yieldMany allFiles
    .| C.filterMC (filterMatchingFile projFilter)
    .| C.sinkList

现在我的应用程序已限制内存使用,但它仍然很慢。对此,我有两个问题。

1)

我使用 stack new 生成框架来创建这个应用程序,它默认使用 ghc 选项 -threaded -rtsopts -with-rtsopts=-N

令我惊讶的是,当我实际访问 运行 应用程序时,它使用了所有可用的处理器(目标机器中大约有 40 个)。但是,我并没有将应用程序的任何部分编写为 运行 并行(实际上我考虑过)。

什么是运行并行?

2)

此外,大多数 JSON 文件都非常大 (10mb),其中可能有 500k 需要遍历。这意味着由于所有 Aeson 解码,我的程序非常慢。我的想法是 运行 我的 filterMatchingFile 部分并行,但是查看 stm-conduit 库,我看不到并行 运行 这个中间动作的明显方法少数处理器。

任何人都可以建议一种使用 stm-conduit 或其他方式巧妙地并行化我的函数的方法吗?


编辑

我意识到我可以将我的 readFile -> decodeObject -> runFilterFunction 分解成 conduit 的单独部分,然后我可以在那里使用有界通道的 stm-conduit。也许我会试一试...


我 运行 我的应用程序 +RTS -s (我将其重新配置为 -N4),我看到以下内容:

 115,961,554,600 bytes allocated in the heap
  35,870,639,768 bytes copied during GC
      56,467,720 bytes maximum residency (681 sample(s))
       1,283,008 bytes maximum slop
             145 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0     108716 colls, 108716 par   76.915s  20.571s     0.0002s    0.0266s
  Gen  1       681 colls,   680 par    0.530s   0.147s     0.0002s    0.0009s

  Parallel GC work balance: 14.99% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.001s  (  0.007s elapsed)
  MUT     time   34.813s  ( 42.938s elapsed)
  GC      time   77.445s  ( 20.718s elapsed)
  EXIT    time    0.000s  (  0.010s elapsed)
  Total   time  112.260s  ( 63.672s elapsed)

  Alloc rate    3,330,960,996 bytes per MUT second

  Productivity  31.0% of total user, 67.5% of total elapsed

gc_alloc_block_sync: 188614
whitehole_spin: 0
gen[0].sync: 33
gen[1].sync: 811204

Michael Snoyman 在 Haskell Cafe 上提示,他指出我的第一个版本并没有真正利用 Conduit 的流媒体功能,我重写了我的应用程序的 Conduit 版本(不使用 stm-conduit)。这是一个很大的改进:我的第一个 Conduit 版本正在对所有数据进行操作,而我没有意识到这一点。

我还增加了托儿所的大小,这通过减少垃圾收集的频率提高了我的工作效率。

我修改后的函数最终看起来像这样:

module Search where

import           Conduit               ((.|))
import qualified Conduit               as C
import           Control.Monad
import           Control.Monad.IO.Class   (MonadIO, liftIO)
import           Control.Monad.Trans.Resource (MonadResource)
import qualified Data.ByteString       as B
import           Data.List             (isPrefixOf)
import           Data.Maybe            (fromJust, isJust)
import           System.Path.NameManip (guess_dotdot, absolute_path)
import           System.FilePath       (addTrailingPathSeparator, normalise)
import           System.Directory      (getHomeDirectory)

import           Filters


sourceFilesFilter :: (MonadResource m, MonadIO m) => ProjectFilter -> FilePath -> C.ConduitM () String m ()
sourceFilesFilter projFilter dirname' =
    C.sourceDirectoryDeep False dirname'
    .| parseProject projFilter

parseProject :: (MonadResource m, MonadIO m) => ProjectFilter -> C.ConduitM FilePath String m ()
parseProject (ProjectFilter filterFunc) = do
  C.awaitForever go
  where
    go path' = do
      bytes <- liftIO $ B.readFile path'
      let isProj = validProject bytes
      when (isJust isProj) $ do
        let proj' = fromJust isProj
        when (filterFunc proj') $ C.yield path'

我的主要只是 运行 管道并打印通过过滤器的管道:

mainStreamingConduit :: IO ()
mainStreamingConduit = do
  options <- getRecord "Search JSON Files"
  let filterFunc = makeProjectFilter options
  searchDir <- absolutize (searchPath options)
  itExists <- doesDirectoryExist searchDir
  case itExists of
    False -> putStrLn "Search Directory does not exist" >> exitWith (ExitFailure 1)
    True -> C.runConduitRes $ sourceFilesFilter filterFunc searchDir .| C.mapM_ (liftIO . putStrLn)

我 运行 是这样的(通常没有统计数据):

stack exec search-json -- --searchPath $FILES --name NAME +RTS -s -A32m -n4m

在不增加苗圃规模的情况下,我的生产率提高了 30% 左右。然而,有了上面的内容,它看起来像这样:

  72,308,248,744 bytes allocated in the heap
     733,911,752 bytes copied during GC
       7,410,520 bytes maximum residency (8 sample(s))
         863,480 bytes maximum slop
             187 MB total memory in use (27 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0       580 colls,   580 par    2.731s   0.772s     0.0013s    0.0105s
  Gen  1         8 colls,     7 par    0.163s   0.044s     0.0055s    0.0109s

  Parallel GC work balance: 35.12% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.001s  (  0.006s elapsed)
  MUT     time   26.155s  ( 31.602s elapsed)
  GC      time    2.894s  (  0.816s elapsed)
  EXIT    time   -0.003s  (  0.008s elapsed)
  Total   time   29.048s  ( 32.432s elapsed)

  Alloc rate    2,764,643,665 bytes per MUT second

  Productivity  90.0% of total user, 97.5% of total elapsed

gc_alloc_block_sync: 3494
whitehole_spin: 0
gen[0].sync: 15527
gen[1].sync: 177

我仍然想弄清楚如何并行化 filterProj . parseJson . readFile 部分,但现在我对此很满意。

从您的程序描述来看,它没有理由增加内存使用量。我认为这是由于错过了惰性计算而导致的意外内存泄漏。这可以通过堆分析轻松检测到:https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/profiling.html#hp2ps-rendering-heap-profiles-to-postscript。其他可能的原因是运行时没有将所有内存释放回 OS。在达到某个阈值之前,它将保持与处理的最大文件成比例的内存。如果通过进程 RSS 大小进行跟踪,这可能看起来像是内存泄漏。

-A32m 选项增加苗圃规模。它允许您的程序在触发垃圾收集之前分配更多内存。统计数据显示在 GC 期间保留的内存非常少,因此发生的频率较低,程序花在实际工作上的时间更多。

在有关并行性的 Haskell wiki 和讨论等待线程的 的帮助下,我想出了如何使用 stm-conduit 运行 这个应用程序在 main 退出之前结束。

它的工作方式是我创建一个通道来保存所有要操作的文件名。然后,我 fork 了一堆线程,每个线程 运行 是一个 Conduit,filepath-channel 是一个 Source。我跟踪所有子线程并等待它们完成。

也许这个解决方案对其他人有用?

并非我所有的 lower-level 过滤器函数都存在,但要点是我有一个 Conduit 可以测试一些 JSON。如果它通过了,那么它 yield 就是 FilePath

这是我的主要内容:

{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}


module Main where

import           Conduit                      ((.|))
import qualified Conduit                      as C
import           Control.Concurrent
import           Control.Monad                (forM_)
import           Control.Monad.IO.Class       (liftIO)
import           Control.Concurrent.STM
import           Control.Monad.Trans.Resource (register)

import qualified Data.Conduit.TMChan          as STMChan
import           Data.Maybe                   (isJust, fromJust)
import qualified Data.Text                    as T
import           Options.Generic
import           System.Directory            (doesDirectoryExist)
import           System.Exit

import           Search


data Commands =
  Commands { searchPath  :: String
           , par         :: Maybe Int
           , project     :: Maybe T.Text
           , revision    :: Maybe T.Text
           } deriving (Generic, Show)

instance ParseRecord Commands

makeProjectFilter :: Commands -> ProjectFilter
makeProjectFilter options =
  let stdFilts = StdProjectFilters
        (ProjName <$> project options)
        (Revision <$> revision options)
  in makeProjectFilters stdFilts

main :: IO ()
main = do
  options <- getRecord "Search JSON Files"
  -- Would user like to run in parallel?
  let runner = if isJust $ par options
        then mainSTMConduit (fromJust $ par options)
        else mainStreamingConduit

  -- necessary things to search files: search path, filters to use, search dir exists
  let filterFunc = makeProjectFilter options
  searchDir <- absolutize (searchPath options)
  itExists <- doesDirectoryExist searchDir

  -- Run it if it exists
  case itExists of
    False -> putStrLn "Search Directory does not exist" >> exitWith (ExitFailure 1)
    True -> runner filterFunc searchDir

-- Single-threaded version with bounded memory usage
mainStreamingConduit :: ProjectFilter -> FilePath -> IO ()
mainStreamingConduit filterFunc searchDir = do
  C.runConduitRes $
    sourceFilesFilter filterFunc searchDir .| C.mapM_C (liftIO . putStrLn)

-- Multiple-threaded version of this program using channels from `stm-conduit`
mainSTMConduit :: Int -> ProjectFilter -> FilePath -> IO ()
mainSTMConduit nrWorkers filterFunc searchDir = do
  children <- newMVar []
  inChan <- atomically $ STMChan.newTBMChan 16
  _ <- forkIO . C.runResourceT $ do
         _ <- register $ atomically $ STMChan.closeTBMChan inChan
         C.runConduitRes $ C.sourceDirectoryDeep False searchDir .| STMChan.sinkTBMChan inChan True
  forM_ [1..nrWorkers] (\_ -> forkChild children $ runConduitChan inChan filterFunc)
  waitForChildren children
  return ()


runConduitChan :: STMChan.TBMChan FilePath -> ProjectFilter -> IO ()
runConduitChan inChan filterFunc = do
  C.runConduitRes $
       STMChan.sourceTBMChan inChan
       .| parseProject filterFunc
       .| C.mapM_C (liftIO . putStrLn)

waitForChildren :: MVar [MVar ()] -> IO ()
waitForChildren children = do
  cs <- takeMVar children
  case cs of
    []   -> return ()
    m:ms -> do
      putMVar children ms
      takeMVar m
      waitForChildren children

forkChild :: MVar [MVar ()] -> IO () -> IO ThreadId
forkChild children io = do
  mvar <- newEmptyMVar
  childs <- takeMVar children
  putMVar children (mvar:childs)
  forkFinally io (\_ -> putMVar mvar ())

注意:我将 stm-conduit 3.0.0conduit 1.12.1 一起使用,这就是我需要包含布尔参数的原因:

STMChan.sinkTBMChan inChan True

stm-conduit 版本 4.0.0 中,此函数自动关闭频道,因此布尔参数已被删除。