如何并行化 N 个对象上的 M 个顺序操作(带同步点)
how to parallelize M sequential operations over N objects (with sync point)
假设我有N个对象和M个操作(其中一些在做网络I/O)。我想按顺序为 N 个对象中的每一个调用操作序列,但尽可能允许并行性(跨对象)。管道中有一个同步(扇入)点,比方说在操作 M-1 处。在 core.async 中执行此操作的 best/easiest 方法是什么? [此外,这是在 ClojureScript 中,所以 thread
不是一个选项)。
猫的 alet
、promesa/promise
和 promesa/all
的组合就可以了。
这是代码(省略):
(ns example
(:require [[cats.context :as ctx :include-macros true]
[cats.core :as m :include-macros true]
[promesa.monad :as pm]
[promesa.core :as p :include-macros true]]))
(defn foo []
(ctx/with-context pm/promise-context
(let [last-stage (fn [obj] (final-operation obj)) ;; could be chain a la first-stage
first-stage (fn [obj]
(p/chain
(operation-m0 args)
(partial operation-m1 some-args)
(partial operation-m2 some-more-args)))
first-stage-results (p/all (mapv first-stage objects))]
(p/then (m/alet [_ (p/then first-stage-results global-operation-requiring-fan-in)
z (p/all (mapv last-stage objects))] z)
(fn [_] (.info logger "Finished all operations."))))))
operation-m0
、operation-m1
、operation-m2
和 final-operation
应该是 return 立即承诺和 resolve
结果时承诺的函数可用。
也许你可以使用 core.async/pipeline-async which applies a function asynchronously to the elements from a channel, placing the call results into a channel in the same order as the input. Here is an example。
假设我有N个对象和M个操作(其中一些在做网络I/O)。我想按顺序为 N 个对象中的每一个调用操作序列,但尽可能允许并行性(跨对象)。管道中有一个同步(扇入)点,比方说在操作 M-1 处。在 core.async 中执行此操作的 best/easiest 方法是什么? [此外,这是在 ClojureScript 中,所以 thread
不是一个选项)。
猫的 alet
、promesa/promise
和 promesa/all
的组合就可以了。
这是代码(省略):
(ns example
(:require [[cats.context :as ctx :include-macros true]
[cats.core :as m :include-macros true]
[promesa.monad :as pm]
[promesa.core :as p :include-macros true]]))
(defn foo []
(ctx/with-context pm/promise-context
(let [last-stage (fn [obj] (final-operation obj)) ;; could be chain a la first-stage
first-stage (fn [obj]
(p/chain
(operation-m0 args)
(partial operation-m1 some-args)
(partial operation-m2 some-more-args)))
first-stage-results (p/all (mapv first-stage objects))]
(p/then (m/alet [_ (p/then first-stage-results global-operation-requiring-fan-in)
z (p/all (mapv last-stage objects))] z)
(fn [_] (.info logger "Finished all operations."))))))
operation-m0
、operation-m1
、operation-m2
和 final-operation
应该是 return 立即承诺和 resolve
结果时承诺的函数可用。
也许你可以使用 core.async/pipeline-async which applies a function asynchronously to the elements from a channel, placing the call results into a channel in the same order as the input. Here is an example。