mapcat在core.async中打破背压时内存泄漏在哪里?
Where is the memory leak when mapcat breaks backpressure in core.async?
我在 Clojure 中写了一些 core.async 代码,当我 运行 它消耗了所有可用内存并因错误而失败。似乎在 core.async 管道中使用 mapcat
会破坏背压。 (不幸的是,出于超出此问题范围的原因。)
下面是一些代码,通过计算 mapcat
ing t运行sducer 的 :x
s 来说明问题:
(ns mapcat.core
(:require [clojure.core.async :as async]))
(defn test-backpressure [n length]
(let [message (repeat length :x)
input (async/chan)
transform (async/chan 1 (mapcat seq))
output (async/chan)
sent (atom 0)]
(async/pipe input transform)
(async/pipe transform output)
(async/go
(dotimes [_ n]
(async/>! input message)
(swap! sent inc))
(async/close! input))
(async/go-loop [x 0]
(when (= 0 (mod x (/ (* n length) 10)))
(println "in:" (* @sent length) "out:" x))
(when-let [_ (async/<! output)]
(recur (inc x))))))
=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000
生产者远远领先于消费者。
看来我不是第一个发现这一点的人。
但是 here 给出的解释似乎并没有完全涵盖它。 (尽管它确实提供了足够的解决方法。)
从概念上讲,我希望生产者领先,但只能领先于通道中可能缓冲的几条消息的长度。
我的问题是,所有其他消息在哪里?到输出的第四行 7000 :x
下落不明。
更新 2020-01-14:内存泄漏现已修复。
这个问题有两种可能的解释"Where is the memory leak?"
首先,数据保存在哪里?答案似乎就在扩展变换下游的通道缓冲区中。
频道默认使用一个FixedBuffer
(clojure.core.async.impl.buffers/FixedBuffer),可以判断是否已满但不反对过满。
其次,哪一段代码导致缓冲区溢出?这(如果我错了请纠正我)似乎在 the take!
method of ManyToManyChannel
(clojure.core.async.impl.channels/ManyToManyChannel) where the first call to add!
on the buffer occurs before any calls to full?
已经发生。
似乎 take!
假设它可以为它删除的每个项目至少添加一个项目到缓冲区。对于长 运行 扩展换能器,例如 mapcat
,这并不总是一个安全的假设。
通过在 core.async 的本地副本中将 this line 更改为 (when (and (.hasNext iter) (not (impl/full? buf)))
我可以使问题中的代码按预期运行。 (N.B。我对 core.async 的理解不足以保证这对 您的 用例来说是一个可靠的解决方案。)
更新 2016-09-17:现在有一个问题:http://dev.clojure.org/jira/browse/ASYNC-178
更新 2020-01-14:现在已修复:https://clojure.atlassian.net/browse/ASYNC-210(尽管较早的票已关闭为 'Declined')
我在 Clojure 中写了一些 core.async 代码,当我 运行 它消耗了所有可用内存并因错误而失败。似乎在 core.async 管道中使用 mapcat
会破坏背压。 (不幸的是,出于超出此问题范围的原因。)
下面是一些代码,通过计算 mapcat
ing t运行sducer 的 :x
s 来说明问题:
(ns mapcat.core
(:require [clojure.core.async :as async]))
(defn test-backpressure [n length]
(let [message (repeat length :x)
input (async/chan)
transform (async/chan 1 (mapcat seq))
output (async/chan)
sent (atom 0)]
(async/pipe input transform)
(async/pipe transform output)
(async/go
(dotimes [_ n]
(async/>! input message)
(swap! sent inc))
(async/close! input))
(async/go-loop [x 0]
(when (= 0 (mod x (/ (* n length) 10)))
(println "in:" (* @sent length) "out:" x))
(when-let [_ (async/<! output)]
(recur (inc x))))))
=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000
生产者远远领先于消费者。
看来我不是第一个发现这一点的人。 但是 here 给出的解释似乎并没有完全涵盖它。 (尽管它确实提供了足够的解决方法。) 从概念上讲,我希望生产者领先,但只能领先于通道中可能缓冲的几条消息的长度。
我的问题是,所有其他消息在哪里?到输出的第四行 7000 :x
下落不明。
更新 2020-01-14:内存泄漏现已修复。
这个问题有两种可能的解释"Where is the memory leak?"
首先,数据保存在哪里?答案似乎就在扩展变换下游的通道缓冲区中。
频道默认使用一个FixedBuffer
(clojure.core.async.impl.buffers/FixedBuffer),可以判断是否已满但不反对过满。
其次,哪一段代码导致缓冲区溢出?这(如果我错了请纠正我)似乎在 the take!
method of ManyToManyChannel
(clojure.core.async.impl.channels/ManyToManyChannel) where the first call to add!
on the buffer occurs before any calls to full?
已经发生。
似乎 take!
假设它可以为它删除的每个项目至少添加一个项目到缓冲区。对于长 运行 扩展换能器,例如 mapcat
,这并不总是一个安全的假设。
通过在 core.async 的本地副本中将 this line 更改为 (when (and (.hasNext iter) (not (impl/full? buf)))
我可以使问题中的代码按预期运行。 (N.B。我对 core.async 的理解不足以保证这对 您的 用例来说是一个可靠的解决方案。)
更新 2016-09-17:现在有一个问题:http://dev.clojure.org/jira/browse/ASYNC-178
更新 2020-01-14:现在已修复:https://clojure.atlassian.net/browse/ASYNC-210(尽管较早的票已关闭为 'Declined')