Flow 中的并行处理不是并行的?
Parallel processing in Flow is not parallel?
我有一个简单的模块。
defmodule Flow.F1 do
def t1 do
["one", "two", "three"]
|> Flow.from_enumerable()
|> Flow.map(&p1(&1))
|> Flow.partition()
|> Enum.to_list()
end
defp p1(item) do
IO.puts("Processing #{item}")
:timer.sleep(2000)
IO.puts("Processed #{item}")
String.upcase(item)
end
end
现在 运行 Flow.F1.t1() 我得到这个结果:
Processing one
Processed one
Processing two
Processed two
Processing three
Processed three
["TWO", "ONE", "THREE"]
处理需要 6 秒,因为它在每个 P1 调用中等待 2 秒,但我希望(从文档中)Elixir.map
正在并行处理项目。所以我们应该看到类似的东西:
Processing one
Processing two
Processing three
Processed one
Processed two
Processed three
而且只需要 2 秒。有人可以解释一下我在这里缺少什么吗?
如 docs.
中所述,流程处理 500 个项目的批次
我认为您的输入数据不足以将其拆分为多个消费者,因此只有一个消费者承担了所有工作。
如果您将选项 max_demand: 1
添加到您的 Flow.from_enumerable
它将并行消耗。
我有一个简单的模块。
defmodule Flow.F1 do
def t1 do
["one", "two", "three"]
|> Flow.from_enumerable()
|> Flow.map(&p1(&1))
|> Flow.partition()
|> Enum.to_list()
end
defp p1(item) do
IO.puts("Processing #{item}")
:timer.sleep(2000)
IO.puts("Processed #{item}")
String.upcase(item)
end
end
现在 运行 Flow.F1.t1() 我得到这个结果:
Processing one
Processed one
Processing two
Processed two
Processing three
Processed three
["TWO", "ONE", "THREE"]
处理需要 6 秒,因为它在每个 P1 调用中等待 2 秒,但我希望(从文档中)Elixir.map
正在并行处理项目。所以我们应该看到类似的东西:
Processing one
Processing two
Processing three
Processed one
Processed two
Processed three
而且只需要 2 秒。有人可以解释一下我在这里缺少什么吗?
如 docs.
中所述,流程处理 500 个项目的批次我认为您的输入数据不足以将其拆分为多个消费者,因此只有一个消费者承担了所有工作。
如果您将选项 max_demand: 1
添加到您的 Flow.from_enumerable
它将并行消耗。