Clojure swap! atom executed in parallel

I'm writing a script in Clojure that reads a sequence of URIs from a file as input and generates a status-code report for them.

I'm using clojure.core.async/pipeline-async to perform the HTTP calls to the URIs (asynchronous calls with http-kit).

I want to monitor the script's execution, so I have a state atom:

(let [processing (atom [(System/currentTimeMillis) 0])]

and a function that tracks the progress:

(defn track-progress [total progress]
  (swap! progress 
         (fn [[time count]]
            (let [incremented-count (inc count)
                  now (System/currentTimeMillis)]
              (if (= 0 (mod incremented-count (max 1 (int (/ total 20)))))
                (do
                  (println (str "Progress " incremented-count "/" total " | " (- now time) "ms"))
                  [now incremented-count])
                [time incremented-count])))))

I call it after each HTTP call:

(a/pipeline-async
      parallelism
      output-chan
      (fn [[an-id uri] result]
        (http/head uri {:throw-exceptions false
                        :timeout timeout}
                   (fn [{:keys [status error]}]
                     (track-progress total processing)
                     (a/go 
                        (if (nil? error)
                            (do (a/>! result [an-id (keyword (str status))])
                                (a/close! result))
                            (do (a/>! result [an-id :error])
                                (a/close! result)))))))
      input-chan)

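The processing atom is created in a let expression that wraps the pipeline-async part.
Everything seems to work fine except for that log. Sometimes the logging looks strange, with output like this: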


Progress 500/10000 | 11519ms
Progress 500/10000 | 11519msProgress 500/10000 | 11519ms

Progress 1000/10000 | 11446ms
Progress 1000/10000 | 11446ms
Progress 1500/10000 | 9503ms
Progress 2000/10000 | 7802ms
Progress 2500/10000 | 12822ms
Progress 2500/10000 | 12822msProgress 2500/10000 | 12822ms
Progress 2500/10000 | 12822ms

Progress 3000/10000 | 10623ms
Progress 3500/10000 | 9018ms
Progress 4000/10000 | 9618ms
Progress 4500/10000 | 13544ms
Progress 5000/10000 | 10541ms
Progress 5500/10000 | 10817ms
Progress 6000/10000 | 8921ms
Progress 6500/10000 | 9078ms
Progress 6500/10000 | 9078ms
Progress 7000/10000 | 9270ms
Progress 7500/10000 | 11826msProgress 7500/10000 | 11826msProgress 7500/10000 | 11826ms

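This is the output exactly as it appears in the shell. It looks as if the same println is sometimes executed several times, or as if the fn passed to swap! runs in parallel on the atom rather than one call at a time. (If I drop the str that builds the string and pass the pieces to println directly, I get several lines for the same progress, completely jumbled, like ProgressProgress 7500/10000 | 11826ms7500/100007500 | 11826msProgress/10000 | 11826ms)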

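Is there something wrong with my code?
Or have I misunderstood atoms? I thought they did not allow functions that change their state to run in parallel.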

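Clojure atoms are specifically designed so that, in a multi-threaded program, several threads can call swap! on a single atom. If your program does this, the update functions f given to swap! can run at the same time. The only synchronized part of swap! is what a 'compare and swap' operation effectively does: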

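  • Lock the atom's state.
  • Check whether its current value is identical? to the reference it held before f started executing. If so, replace it with the new object returned by f. If not, another thread changed the atom in the meantime, so swap! calls f again on the newer value and retries.
  • Unlock the atom's state.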
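The steps above amount to a retry loop. The following is a minimal sketch of that loop written with compare-and-set!, not clojure.core's actual implementation, which does the equivalent in Java inside clojure.lang.Atom. The names my-swap! and demo-retries are hypothetical and exist only for illustration. The demo counts how many times the update function runs while several threads contend for one atom:

```clojure
(defn my-swap!
  "Illustrative retry loop: compute a new value from a snapshot, then install
  it only if the atom still holds that exact snapshot; otherwise retry."
  [a f & args]
  (loop []
    (let [old-val @a
          new-val (apply f old-val args)]   ; f may run more than once per call
      (if (compare-and-set! a old-val new-val)
        new-val                             ; success: return the installed value
        (recur)))))                         ; another thread won: call f again

(defn demo-retries
  "Starts n-threads futures that each increment a shared atom n-swaps times
  through my-swap!. Returns [final-value number-of-calls-to-f]."
  [n-threads n-swaps]
  (let [counter (atom 0)
        calls   (atom 0)
        futs    (mapv (fn [_]
                        (future
                          (dotimes [_ n-swaps]
                            (my-swap! counter
                                      (fn [n]
                                        (swap! calls inc)  ; side effect: count calls to f
                                        (inc n))))))
                      (range n-threads))]
    (run! deref futs)
    [@counter @calls]))
```

With (demo-retries 8 10000), the final value is always 80000. The call count is at least 80000 and can be larger under contention. Every extra call is a retry, and a retry is how a println inside the update function can fire more than once for the same count.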

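The function f may take a long time to compute the new value from the current one, but the critical section above is only a pointer comparison and, if the pointers are equal, a pointer assignment.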
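You can see that the comparison checks identity rather than value equality with compare-and-set!, which performs the same check: an equal but separately built value does not match. A small sketch follows. The function name cas-identity-demo is hypothetical:

```clojure
(defn cas-identity-demo
  "Returns [result-with-equal-copy result-with-same-object final-value]."
  []
  (let [a    (atom (vec (range 3)))   ; atom holds the vector [0 1 2]
        copy (vec (range 3))]         ; equal to the atom's value, but a different object
    [(compare-and-set! a copy :replaced)  ; fails: (identical? @a copy) is false
     (compare-and-set! a @a :replaced)    ; succeeds: passes the very object the atom holds
     @a]))

(cas-identity-demo) ; => [false true :replaced]
```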

That is why the docstring of swap! says "Note that f may be called multiple times, and thus should be free of side effects."

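What you want is to serialize the output coming from a group of concurrently executing threads. You could use an agent to serialize access to a piece of mutable state, but this is a degenerate case with no state, only side effects. For this case, the locking macro is all you need.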
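For comparison, here is a minimal sketch of the agent route mentioned above. Actions sent to a single agent run one at a time, so routing every println through one agent also serializes the output, at the cost of printing asynchronously. The names log-agent and log-line are hypothetical. You also need to call await before reading the output or exiting, so that queued lines are actually written:

```clojure
;; One agent whose state is ignored; it exists only to queue print actions.
(def log-agent (agent nil))

(defn log-line
  "Queues a println on log-agent. Actions on a single agent never overlap,
  so each line is written whole. send-off conveys the caller's *out* binding."
  [& args]
  (send-off log-agent (fn [_] (apply println args) nil)))

;; Usage from many threads:
;; (run! deref (mapv (fn [i] (future (log-line "wait-and-print" i "is complete")))
;;                   (range 10)))
;; (await log-agent)
```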

An example:

(ns tst.demo.core
  (:use demo.core tupelo.core tupelo.test))

(defn do-println
  [& args]
  (apply println args))

(def lock-obj (Object.))
(defn do-println-locking
  [& args]
  (locking lock-obj
    (apply println args)))

(def sleep-millis 500)
(defn wait-and-print
  [print-fn id]
  (Thread/sleep sleep-millis)
  (print-fn (format "wait-and-print %s is complete" id)))

(defn start-threads
  [print-fn count]
  (println "-----------------------------------------------------------------------------")
  (let [futures (forv [i (range count)]
                  (future (wait-and-print print-fn i)))]
    (doseq [fut futures]
      ; block until the future is complete
      (deref fut))))

(dotest
  (start-threads do-println 10)
  (start-threads do-println-locking 10))

Typical result:

--------------------------------------
   Clojure 1.10.2-alpha1    Java 15
--------------------------------------

Testing tst.demo.core
-----------------------------------------------------------------------------
wait-and-print 4 is completewait-and-print 3 is completewait-and-print 2 is complete
wait-and-print 8 is completewait-and-print 9 is complete
wait-and-print 6 is completewait-and-print 1 is complete

wait-and-print 7 is complete
wait-and-print 0 is complete

wait-and-print 5 is complete


-----------------------------------------------------------------------------
wait-and-print 5 is complete
wait-and-print 8 is complete
wait-and-print 7 is complete
wait-and-print 9 is complete
wait-and-print 6 is complete
wait-and-print 3 is complete
wait-and-print 0 is complete
wait-and-print 4 is complete
wait-and-print 2 is complete
wait-and-print 1 is complete

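So you can see that without locking to serialize it, the output is jumbled. In the second case, each println is allowed to complete one at a time, although the order is still random.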

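If println printed one character at a time instead of a whole string at once, the unsynchronized output would be even more jumbled. Modify the output function to print each character separately: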

(defn do-println
  [& args]
  (doseq [ch (apply str args)]
    (print ch))
  (newline))

(def lock-obj (Object.))
(defn do-println-locking
  [& args]
  (locking lock-obj
    (apply do-println args)))

Typical result:

--------------------------------------
   Clojure 1.10.2-alpha1    Java 15
--------------------------------------

Testing tst.demo.core
-----------------------------------------------------------------------------
wwwwwaaawwiiiattti--taaa--nnaiddnaa--dwpp-irrptaiir-niiantnttn  -dw2ta-  ani96ipds trn- i-pcndrota-impn nrpd4itl- n eipt5tr s e7i 
 incisots   mc0cpo olmmieppstll ee
etctteo
e-
 amnidps-l pectroeai
intt- a1n di-sip rcsio nmctmpo plm3lew etaiei
spt t-lceeatone
d
m-pplreitnet
 8 is complete
-----------------------------------------------------------------------------
wait-and-print 3 is complete
wait-and-print 9 is complete
wait-and-print 8 is complete
wait-and-print 4 is complete
wait-and-print 6 is complete
wait-and-print 7 is complete
wait-and-print 0 is complete
wait-and-print 1 is complete
wait-and-print 5 is complete
wait-and-print 2 is complete

Again, locking serializes the function calls, so the current call must finish before the next one starts.
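Applied to the question's track-progress, one possible restructuring keeps the function passed to the atom pure and does the printing afterwards under a lock. The following is a minimal sketch, assuming Clojure 1.9+ for swap-vals! (which returns both the old and the new value). The name print-lock is hypothetical, and the [time count] state layout is the one from the question:

```clojure
(def print-lock (Object.))  ; hypothetical lock shared by all progress printing

(defn track-progress
  "Increments the [time count] progress atom; prints at every 5% milestone.
  The update fn has no side effects, so retries only recompute a vector."
  [total progress]
  (let [step (max 1 (quot total 20))
        [[old-time _] [new-time new-count]]
        (swap-vals! progress
                    (fn [[t cnt]]
                      (let [n (inc cnt)]
                        (if (zero? (mod n step))
                          [(System/currentTimeMillis) n]  ; milestone: reset the timer
                          [t n]))))]
    ;; Each successful swap produces a distinct count, so exactly one caller
    ;; sees each milestone value and prints it; locking keeps lines whole.
    (when (zero? (mod new-count step))
      (locking print-lock
        (println (str "Progress " new-count "/" total " | " (- new-time old-time) "ms"))))))
```

Because the count grows by exactly one per successful swap, a milestone can no longer be printed twice. The lock only prevents two different milestones from interleaving their output.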