如何并行化 N 个对象上的 M 个顺序操作(带同步点)

how to parallelize M sequential operations over N objects (with sync point)

假设我有N个对象和M个操作(其中一些在做网络I/O)。我想按顺序为 N 个对象中的每一个调用操作序列,但尽可能允许并行性(跨对象)。管道中有一个同步(扇入)点,比方说在操作 M-1 处。在 core.async 中执行此操作的 best/easiest 方法是什么? [此外,这是在 ClojureScript 中,所以 thread 不是一个选项)。

猫的 aletpromesa/promisepromesa/all 的组合就可以了。

这是代码(省略):

(ns example
  (:require [[cats.context :as ctx :include-macros true]
            [cats.core :as m :include-macros true]
            [promesa.monad :as pm]
            [promesa.core :as p :include-macros true]]))

(defn foo []
  (ctx/with-context pm/promise-context
    (let [last-stage (fn [obj] (final-operation obj)) ;; could be chain a la first-stage
          first-stage (fn [obj]
                        (p/chain
                         (operation-m0 args)
                         (partial operation-m1 some-args)
                         (partial operation-m2 some-more-args)))
          first-stage-results (p/all (mapv first-stage objects))]
      (p/then (m/alet [_ (p/then first-stage-results global-operation-requiring-fan-in)
                       z (p/all (mapv last-stage objects))] z)
              (fn [_] (.info logger "Finished all operations."))))))

operation-m0operation-m1operation-m2final-operation 应该是 return 立即承诺和 resolve 结果时承诺的函数可用。

也许你可以使用 core.async/pipeline-async which applies a function asynchronously to the elements from a channel, placing the call results into a channel in the same order as the input. Here is an example