如何原子地修改可变数组中的值?
How to atomically modify a value within a mutable array?
我在 ST
monad 中有这个可变数组。我有这个循环功能。
runST $ do
myarray <- newMArray (Sz 10) 0
loopM_ 0 (<10) (+1) (\j ->
loopM_ 0 (<10) (+1) (\i ->
when (mytruthcheck j i)
(modifyM_ myarray (pure . (+1)) ((funcofji j i) :: Int)
)))
我想像这样并行使用forkST_
到运行外循环。
runST $ do
myarray <- newMArray (Sz 10) 0
loopM_ 0 (<10) (+1) (\j ->
void (forkST_ (loopM_ 0 (<10) (+1) (\i ->
when (mytruthcheck j i)
(Data.Massiv.Array.Mutable.modifyM_
myarray (pure . (+1)) ((funcofji j i) :: Int)
))))
但我猜这会导致线程冲突,但我真的不知道,尽管我知道 funcofji
可能会针对 j
的不同值输出相同的值,并且因此循环可以为不同的 j
修改 myarray
的相同索引。有没有办法确保这是自动完成的,或者已经是这样了?
顺便说一下,这里是 loopM_
函数
loopM_ :: Monad m => Int -> (Int -> Bool) -> (Int -> Int) -> (Int -> m a) -> m ()
loopM_ !init' condition increment f = go init'
where
go !step
| condition step = f step >> go (increment step)
| otherwise = pure ()
作为评论中讨论的结果,我写了一个原型,可能会有用(我没有尝试编译,所以可能有一些 type/syntax 错误) .
runST $ do
let arrsz = 10 :: Int -- depends on codomain of funcofji
let ncaps = 8 :: Int64 -- see also getNumCapabilities
let outerLoopSize = 10^5 :: Int64
let innerLoopSize = 10^12 :: Int64
let chunksz = ceiling $ fromIntegral outerLoopSize / fromIntegral ncaps
sync <- newEmptyMVar
forM_ [0 .. ncaps - 1] $ \k -> forkST_ $ do
localArr <- newMArray (Sz arrsz) 0
forM_ [k * chunksz .. min outerLoopSize ((k + 1) * chunksz) - 1] $ \j -> do
forM_ [0 .. innerLoopSize - 1] $ \i -> do
when (mytruthcheck j i) $
modifyM_ localArr (pure . (+1)) $ funcofji j i
putMVar sync localArr
resultArr <- takeMVar sync
replicateM_ (ncaps - 1) $ do
localArr <- takeMVar sync
forM_ [0 .. arrsz - 1] $ do \ix ->
elm <- readM localArr ix
modifyM_ resultArr (pure . (+elm)) ix
...
正如评论中提到的,原子修改仅对并发有用这看起来不像这里需要的。您需要的是并行性。,它在 Int
s 的 massiv 中可用:atomicAddIntArray
在massiv
中还有一种内置的并行方式非常有效,所以绝对不需要重新发明轮子:
createArray_ Par (Sz 10) $ \scheduler myarray ->
loopM_ 0 (<10) (+1) $ \j ->
loopM_ 0 (<10) (+1) $ \i ->
when (mytruthcheck j i) $
scheduleWork_ scheduler $
void $ atomicAddIntArray myarray ((funcofji j i) :: Int) 1
也不要骗自己,ST(状态线程)不是为多线程构建的,而是使用 IO。但是,如果您可以保证,尽管进行了多线程设置,但最终产生的结果仍然是确定性的,那么使用 unsafePerformIO
.
是可以的
编辑
我刚注意到这条评论:
The loop size of j is close to 100,000 and the loop size of i is close to 1 Billion.
这让我相信以这种方式并行化会更好:
createArray_ Par (Sz 10) $ \scheduler myarray ->
iforSchedulerM_ scheduler (0 ..: (100000 :. 1000000000)) $ \_ (j :. i) ->
when (mytruthcheck j i) $
void $ atomicAddIntArray myarray ((funcofji j i) :: Int) 1
这将确保您只安排几个作业,而不是数十亿个。如果您对每个 i
和 j
的工作负载有更深入的了解,请查看 iforSchedulerM_
实现以自定义并行化。
基于@freestyle 和@lehins 的回答我写了这个。这更接近于@freestyle 的答案,但正如@lehins 所指出的那样使用了 unsafeAtomicAddIntArray 并保持对 loopM_ 的使用。出于某种原因,尽管充分利用了所有内核,@lehins 的回答并没有在合理的时间内完成计算。这似乎是使用 massiv
scheduler
的结果,但我不确定。值得一提的是,swap 没有被使用,内存一直可用。通过这个解决方案,与我问题中未使用 forkIO 或 forkST_ 的程序相比,我的速度提高了 3 倍。此外,这也比我的单线程程序多使用了 50% 的内存。我稍后可能会在这个答案中添加一个程序,该程序通过将线程从最外层循环分配到最内层循环来概括可交换操作并行嵌套循环处理,直到 ncaps
的所有线程都已分配,同时考虑到每个循环的大小.
unsafePerformIO $ do
let arrz = 10 :: Int
let ncaps = 8 :: Int -- see also getNumCapabilities
let outerLoopSize = 10^5 :: Int
let innerLoopSize = 10^12 :: Int
let chunksz = outerLoopSize `div` (ncaps-1)
myarray <- newMArray (Sz arrz) 0
Control.Monad.Parallel.forM_ [0 .. (ncaps-1)] (\k -> (loopM_ (k * chunksz) (< (min outerLoopSize ((k+1) * chunksz) )) (+ 1) (\j -> (loopM_ 0 (< innerLoopSize) (+ 1) (\i -> when (mytruthcheck j i) (void (unsafeAtomicAddIntArray myarray (funcofji j i) 1)))))))
unsafeFreeze Par myarray
我在 ST
monad 中有这个可变数组。我有这个循环功能。
runST $ do
myarray <- newMArray (Sz 10) 0
loopM_ 0 (<10) (+1) (\j ->
loopM_ 0 (<10) (+1) (\i ->
when (mytruthcheck j i)
(modifyM_ myarray (pure . (+1)) ((funcofji j i) :: Int)
)))
我想像这样并行使用forkST_
到运行外循环。
runST $ do
myarray <- newMArray (Sz 10) 0
loopM_ 0 (<10) (+1) (\j ->
void (forkST_ (loopM_ 0 (<10) (+1) (\i ->
when (mytruthcheck j i)
(Data.Massiv.Array.Mutable.modifyM_
myarray (pure . (+1)) ((funcofji j i) :: Int)
))))
但我猜这会导致线程冲突,但我真的不知道,尽管我知道 funcofji
可能会针对 j
的不同值输出相同的值,并且因此循环可以为不同的 j
修改 myarray
的相同索引。有没有办法确保这是自动完成的,或者已经是这样了?
顺便说一下,这里是 loopM_
函数
loopM_ :: Monad m => Int -> (Int -> Bool) -> (Int -> Int) -> (Int -> m a) -> m ()
loopM_ !init' condition increment f = go init'
where
go !step
| condition step = f step >> go (increment step)
| otherwise = pure ()
作为评论中讨论的结果,我写了一个原型,可能会有用(我没有尝试编译,所以可能有一些 type/syntax 错误) .
runST $ do
let arrsz = 10 :: Int -- depends on codomain of funcofji
let ncaps = 8 :: Int64 -- see also getNumCapabilities
let outerLoopSize = 10^5 :: Int64
let innerLoopSize = 10^12 :: Int64
let chunksz = ceiling $ fromIntegral outerLoopSize / fromIntegral ncaps
sync <- newEmptyMVar
forM_ [0 .. ncaps - 1] $ \k -> forkST_ $ do
localArr <- newMArray (Sz arrsz) 0
forM_ [k * chunksz .. min outerLoopSize ((k + 1) * chunksz) - 1] $ \j -> do
forM_ [0 .. innerLoopSize - 1] $ \i -> do
when (mytruthcheck j i) $
modifyM_ localArr (pure . (+1)) $ funcofji j i
putMVar sync localArr
resultArr <- takeMVar sync
replicateM_ (ncaps - 1) $ do
localArr <- takeMVar sync
forM_ [0 .. arrsz - 1] $ do \ix ->
elm <- readM localArr ix
modifyM_ resultArr (pure . (+elm)) ix
...
正如评论中提到的,原子修改仅对并发有用这看起来不像这里需要的。您需要的是并行性。,它在 Int
s 的 massiv 中可用:atomicAddIntArray
在massiv
中还有一种内置的并行方式非常有效,所以绝对不需要重新发明轮子:
createArray_ Par (Sz 10) $ \scheduler myarray ->
loopM_ 0 (<10) (+1) $ \j ->
loopM_ 0 (<10) (+1) $ \i ->
when (mytruthcheck j i) $
scheduleWork_ scheduler $
void $ atomicAddIntArray myarray ((funcofji j i) :: Int) 1
也不要骗自己,ST(状态线程)不是为多线程构建的,而是使用 IO。但是,如果您可以保证,尽管进行了多线程设置,但最终产生的结果仍然是确定性的,那么使用 unsafePerformIO
.
编辑
我刚注意到这条评论:
The loop size of j is close to 100,000 and the loop size of i is close to 1 Billion.
这让我相信以这种方式并行化会更好:
createArray_ Par (Sz 10) $ \scheduler myarray ->
iforSchedulerM_ scheduler (0 ..: (100000 :. 1000000000)) $ \_ (j :. i) ->
when (mytruthcheck j i) $
void $ atomicAddIntArray myarray ((funcofji j i) :: Int) 1
这将确保您只安排几个作业,而不是数十亿个。如果您对每个 i
和 j
的工作负载有更深入的了解,请查看 iforSchedulerM_
实现以自定义并行化。
基于@freestyle 和@lehins 的回答我写了这个。这更接近于@freestyle 的答案,但正如@lehins 所指出的那样使用了 unsafeAtomicAddIntArray 并保持对 loopM_ 的使用。出于某种原因,尽管充分利用了所有内核,@lehins 的回答并没有在合理的时间内完成计算。这似乎是使用 massiv
scheduler
的结果,但我不确定。值得一提的是,swap 没有被使用,内存一直可用。通过这个解决方案,与我问题中未使用 forkIO 或 forkST_ 的程序相比,我的速度提高了 3 倍。此外,这也比我的单线程程序多使用了 50% 的内存。我稍后可能会在这个答案中添加一个程序,该程序通过将线程从最外层循环分配到最内层循环来概括可交换操作并行嵌套循环处理,直到 ncaps
的所有线程都已分配,同时考虑到每个循环的大小.
unsafePerformIO $ do
let arrz = 10 :: Int
let ncaps = 8 :: Int -- see also getNumCapabilities
let outerLoopSize = 10^5 :: Int
let innerLoopSize = 10^12 :: Int
let chunksz = outerLoopSize `div` (ncaps-1)
myarray <- newMArray (Sz arrz) 0
Control.Monad.Parallel.forM_ [0 .. (ncaps-1)] (\k -> (loopM_ (k * chunksz) (< (min outerLoopSize ((k+1) * chunksz) )) (+ 1) (\j -> (loopM_ 0 (< innerLoopSize) (+ 1) (\i -> when (mytruthcheck j i) (void (unsafeAtomicAddIntArray myarray (funcofji j i) 1)))))))
unsafeFreeze Par myarray