Clojure swap! atom executed in parallel
I'm writing a Clojure script that reads a sequence of URIs from a file and produces a report of their status codes.
I'm using clojure.core.async/pipeline-async to make the HTTP calls to the URIs (using httpkit async calls).
I want to monitor the script's execution, so I have a state atom:
(let [processing (atom [(System/currentTimeMillis) 0])]
and a function that tracks progress:
(defn track-progress [total progress]
  (swap! progress
         (fn [[time count]]
           (let [incremented-count (inc count)
                 now (System/currentTimeMillis)]
             (if (= 0 (mod incremented-count (max 1 (int (/ total 20)))))
               (do
                 (println (str "Progress " incremented-count "/" total " | " (- now time) "ms"))
                 [now incremented-count])
               [time incremented-count])))))
I call it after each HTTP call:
(a/pipeline-async
  parallelism
  output-chan
  (fn [[an-id uri] result]
    (http/head uri {:throw-exceptions false
                    :timeout timeout}
               (fn [{:keys [status error]}]
                 (track-progress total processing)
                 (a/go
                   (if (nil? error)
                     (do (a/>! result [an-id (keyword (str status))])
                         (a/close! result))
                     (do (a/>! result [an-id :error])
                         (a/close! result)))))))
  input-chan)
The processing atom is created in a let expression that wraps the pipeline-async part.
Everything seems to work fine except for that logging.
Sometimes the log output looks strange, like this:
Progress 500/10000 | 11519ms
Progress 500/10000 | 11519msProgress 500/10000 | 11519ms
Progress 1000/10000 | 11446ms
Progress 1000/10000 | 11446ms
Progress 1500/10000 | 9503ms
Progress 2000/10000 | 7802ms
Progress 2500/10000 | 12822ms
Progress 2500/10000 | 12822msProgress 2500/10000 | 12822ms
Progress 2500/10000 | 12822ms
Progress 3000/10000 | 10623ms
Progress 3500/10000 | 9018ms
Progress 4000/10000 | 9618ms
Progress 4500/10000 | 13544ms
Progress 5000/10000 | 10541ms
Progress 5500/10000 | 10817ms
Progress 6000/10000 | 8921ms
Progress 6500/10000 | 9078ms
Progress 6500/10000 | 9078ms
Progress 7000/10000 | 9270ms
Progress 7500/10000 | 11826msProgress 7500/10000 | 11826msProgress 7500/10000 | 11826ms
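This is the output exactly as it was written to the shell. It looks as if the same println is sometimes executed several times, or as if the fn passed to swap! is executed in parallel inside the atom (with no mutual exclusion).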
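(If I remove the str that builds the string passed to println, the lines for the same progress value appear several times and are completely jumbled together, like ProgressProgress 7500/10000 | 11826ms7500/100007500 | 11826msProgress/10000 | 11826ms)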
Is there something wrong with my code?
Or am I misunderstanding atom? I thought it did not allow functions that change its state to run in parallel.
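Clojure atoms are specifically designed so that, in a multi-threaded program, several threads can call swap! on the same atom, and if your program does this, the update functions f passed to swap! can run at the same time. The only synchronized part of swap! is what is effectively a 'compare and swap' operation: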
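- lock the atom's state
- check whether its current value is identical? to the reference it held before f started executing, and if so, replace it with the new object returned by f
- unlock the atom's state.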
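The function f may take a long time to compute the new value from the current one, but the critical section above is just a pointer comparison and, if the pointers are equal, a pointer assignment. If the comparison fails because another thread installed a different value in the meantime, swap! throws away f's result and calls f again on the new current value. (In the JVM implementation there is no explicit lock: the critical section is a single compareAndSet on a java.util.concurrent.atomic.AtomicReference, which behaves as if the three steps above ran indivisibly.)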
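To make the retry behaviour concrete, here is a minimal sketch of the loop that swap! runs, written with compare-and-set! (which, per its docstring, only succeeds if the atom's current value is identical to the expected old value). cas-swap! is a hypothetical name for illustration; the real swap! lives in clojure.lang.Atom and also runs validators and notifies watches, which this sketch omits.

```clojure
(defn cas-swap!
  "Illustrative sketch of the loop behind swap! (hypothetical name).
  f runs outside any lock; only compare-and-set! is atomic."
  [a f & args]
  (loop []
    (let [old-val @a                      ; snapshot the current value
          new-val (apply f old-val args)] ; may be slow, may run concurrently
      (if (compare-and-set! a old-val new-val)
        new-val                           ; nobody changed the atom meanwhile: done
        (recur)))))                       ; lost the race: call f again
```

Nothing stops two threads from being inside f at the same time; the loop only guarantees that each successful update was computed from the value it replaced.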
That is why the swap! docstring says "Note that f may be called multiple times, and thus should be free of side effects."
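One way to see that warning in action is to count how often the update function actually runs. In this sketch (count-update-calls is a hypothetical helper), final-value always equals n-threads times swaps-per-thread, while update-calls is at least that and exceeds it whenever some swap! had to retry. How many retries occur depends on timing and hardware, so no particular number is promised; on a given run it may even be zero.

```clojure
(defn count-update-calls
  "Runs n-threads futures that each call swap! swaps-per-thread times on one
  shared atom, and counts how often the update function actually ran."
  [n-threads swaps-per-thread]
  (let [state (atom 0)
        calls (atom 0)
        update-fn (fn [x]
                    (swap! calls inc) ; side effect: runs on every attempt, retries included
                    (inc x))
        workers (doall (repeatedly n-threads
                                   #(future (dotimes [_ swaps-per-thread]
                                              (swap! state update-fn)))))]
    (run! deref workers)
    {:final-value @state
     :update-calls @calls}))
```

For example, (count-update-calls 8 100000) on a multi-core machine will usually report more update calls than swaps; a println in update-fn would be repeated the same way.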
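What you want is to serialize the output of a group of concurrently executing threads. You could use an agent to serialize access to a piece of mutable state, but here you have a degenerate case with no state, only side effects. For this case, the locking macro is all you need.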
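For comparison, a minimal sketch of the agent approach mentioned above, assuming a single agent used only as an output queue (printer and agent-println are hypothetical names). An agent runs its actions one at a time, so each println finishes before the next starts. Unlike locking, the caller does not wait for the print, so a script has to await the agent before exiting or it may lose output.

```clojure
;; Hypothetical agent used only to serialize console output; its state is unused.
(def printer (agent nil))

(defn agent-println
  "Queues a println on printer. Agent actions run one at a time, so whole
  lines never interleave; the calling thread returns immediately."
  [& args]
  (send-off printer (fn [_] (apply println args) nil)))
```

The trade-off is latency versus blocking: locking makes callers wait their turn, while the agent makes printing asynchronous at the cost of having to flush (await) at the end.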
An example:
(ns tst.demo.core
  (:use demo.core tupelo.core tupelo.test))

(defn do-println
  [& args]
  (apply println args))

(def lock-obj (Object.))

(defn do-println-locking
  [& args]
  (locking lock-obj
    (apply println args)))

(def sleep-millis 500)

(defn wait-and-print
  [print-fn id]
  (Thread/sleep sleep-millis)
  (print-fn (format "wait-and-print %s is complete" id)))

(defn start-threads
  [print-fn count]
  (println "-----------------------------------------------------------------------------")
  (let [futures (forv [i (range count)]
                  (future (wait-and-print print-fn i)))]
    (doseq [future futures]
      ; block until future is complete
      (deref future))))

(dotest
  (start-threads do-println 10)
  (start-threads do-println-locking 10))
Typical results:
--------------------------------------
Clojure 1.10.2-alpha1 Java 15
--------------------------------------
Testing tst.demo.core
-----------------------------------------------------------------------------
wait-and-print 4 is completewait-and-print 3 is completewait-and-print 2 is complete
wait-and-print 8 is completewait-and-print 9 is complete
wait-and-print 6 is completewait-and-print 1 is complete
wait-and-print 7 is complete
wait-and-print 0 is complete
wait-and-print 5 is complete
-----------------------------------------------------------------------------
wait-and-print 5 is complete
wait-and-print 8 is complete
wait-and-print 7 is complete
wait-and-print 9 is complete
wait-and-print 6 is complete
wait-and-print 3 is complete
wait-and-print 0 is complete
wait-and-print 4 is complete
wait-and-print 2 is complete
wait-and-print 1 is complete
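So you can see that without the serialization provided by locking, the output is jumbled, whereas in the second case each println is allowed to complete one at a time (even though the order is still random).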
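If println printed one character at a time instead of a whole string at once, the unsynchronized results would be even more jumbled. Here the output function is modified to print each character separately: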
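(defn do-println
  [& args]
  ;; print one character at a time; str/join needs (:require [clojure.string :as str]) in the ns form
  (doseq [ch (str/join args)]
    (print ch))
  (newline))

(def lock-obj (Object.))

(defn do-println-locking
  [& args]
  (locking lock-obj
    (apply do-println args)))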
Typical results:
--------------------------------------
Clojure 1.10.2-alpha1 Java 15
--------------------------------------
Testing tst.demo.core
-----------------------------------------------------------------------------
wwwwwaaawwiiiattti--taaa--nnaiddnaa--dwpp-irrptaiir-niiantnttn -dw2ta- ani96ipds trn- i-pcndrota-impn nrpd4itl- n eipt5tr s e7i
incisots mc0cpo olmmieppstll ee
etctteo
e-
amnidps-l pectroeai
intt- a1n di-sip rcsio nmctmpo plm3lew etaiei
spt t-lceeatone
d
m-pplreitnet
8 is complete
-----------------------------------------------------------------------------
wait-and-print 3 is complete
wait-and-print 9 is complete
wait-and-print 8 is complete
wait-and-print 4 is complete
wait-and-print 6 is complete
wait-and-print 7 is complete
wait-and-print 0 is complete
wait-and-print 1 is complete
wait-and-print 5 is complete
wait-and-print 2 is complete
Again, we see that locking serializes the function calls, so that each call must complete before the next one begins.
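Applied to the question's code, locking around println only fixes the interleaving. The repeated lines with identical numbers (such as the copies of Progress 2500/10000 | 12822ms) most likely come from retries: several threads read the same [time 2499] state, each runs f and prints, one compare-and-swap wins and the others retry on the new state. Below is a sketch of one way to avoid both problems, assuming Clojure 1.9+ for swap-vals! (which returns the [old new] pair of the swap that succeeded): keep the update function pure and print after the swap, under a lock. print-lock is a hypothetical name; the [time count] state layout is the question's.

```clojure
;; Sketch only: requires Clojure 1.9+ for swap-vals!.
(def print-lock (Object.))

(defn track-progress
  "Increments the [time count] pair held in the progress atom and prints a
  line every (total / 20) items. The update fn is pure apart from reading the
  clock, so retries are harmless; printing happens once, after the swap wins."
  [total progress]
  (let [step (max 1 (quot total 20))
        [[old-time _] [new-time new-count]]
        (swap-vals! progress
                    (fn [[t n]]
                      (let [c (inc n)]
                        (if (zero? (mod c step))
                          [(System/currentTimeMillis) c] ; milestone: restart the timer
                          [t c]))))]
    (when (zero? (mod new-count step))
      ;; Each count value is produced by exactly one successful swap, so each
      ;; milestone prints once; the lock keeps lines from interleaving.
      (locking print-lock
        (println (str "Progress " new-count "/" total " | "
                      (- new-time old-time) "ms"))))))
```

Inside the pipeline-async callback it would be called exactly as before, with (track-progress total processing).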