Task.async 在 Elixir Stream
Task.async in Elixir Stream
我想在一个大列表上做一个平行映射。代码看起来有点像这样:
big_list
|> Stream.map(&Task.async(Module, :do_something, [&1]))
|> Stream.map(&Task.await(&1))
|> Enum.filter filter_fun
但我正在检查 Stream 的实现,据我所知,Stream.map
组合函数并将组合函数应用于流中的元素,这意味着序列是这样的:
- 取第一个元素
- 创建异步任务
- 等待它完成
- 取第二个元素...
在那种情况下,它不会并行执行。我是对的还是我漏掉了什么?
如果我是对的,那么这段代码呢?
Stream.map Task.async ...
|> Enum.map Task.await ...
是否会并行 运行?
第二个也不符合你的要求。你可以通过这段代码清楚地看到它:
defmodule Test do
def test do
[1,2,3]
|> Stream.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))
end
def job(number) do
:timer.sleep 1000
IO.inspect(number)
end
end
Test.test
您会看到一个数字,然后等待 1 秒,然后是另一个数字,依此类推。这里的关键是你想尽快创建任务,所以你不应该使用
完全懒惰 Stream.map
。而是在那个时候使用急切的 Enum.map
:
|> Enum.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))
另一方面,你可以在等待时使用Stream.map
,只要你稍后做一些急切的操作,就像你的filter
。这样,等待将穿插在您可能对结果进行的任何处理中。
Elixir 1.4 提供了新的 Task.async_stream/5 函数,它将 return 一个流 运行 在可枚举的每个项目上同时执行给定函数。
还有一些选项可以使用 :max_concurrency
和 :timeout
选项参数来指定最大工作人员数和超时时间。
请注意,您不必等待此任务,因为函数 return 是一个流,因此您可以使用 Enum.to_list/1 or use Stream.run/1。
这将使您的示例 运行 同时发生:
big_list
|> Task.async_stream(Module, :do_something, [])
|> Enum.filter(filter_fun)
你可以试试Parallel Stream.
stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end)
stream |> Enum.into([])
[2,4,6,8,10,12,14,16,18,20]
更新
或者更好地使用 Flow
我想在一个大列表上做一个平行映射。代码看起来有点像这样:
big_list
|> Stream.map(&Task.async(Module, :do_something, [&1]))
|> Stream.map(&Task.await(&1))
|> Enum.filter filter_fun
但我正在检查 Stream 的实现,据我所知,Stream.map
组合函数并将组合函数应用于流中的元素,这意味着序列是这样的:
- 取第一个元素
- 创建异步任务
- 等待它完成
- 取第二个元素...
在那种情况下,它不会并行执行。我是对的还是我漏掉了什么?
如果我是对的,那么这段代码呢?
Stream.map Task.async ...
|> Enum.map Task.await ...
是否会并行 运行?
第二个也不符合你的要求。你可以通过这段代码清楚地看到它:
defmodule Test do
def test do
[1,2,3]
|> Stream.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))
end
def job(number) do
:timer.sleep 1000
IO.inspect(number)
end
end
Test.test
您会看到一个数字,然后等待 1 秒,然后是另一个数字,依此类推。这里的关键是你想尽快创建任务,所以你不应该使用
完全懒惰 Stream.map
。而是在那个时候使用急切的 Enum.map
:
|> Enum.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))
另一方面,你可以在等待时使用Stream.map
,只要你稍后做一些急切的操作,就像你的filter
。这样,等待将穿插在您可能对结果进行的任何处理中。
Elixir 1.4 提供了新的 Task.async_stream/5 函数,它将 return 一个流 运行 在可枚举的每个项目上同时执行给定函数。
还有一些选项可以使用 :max_concurrency
和 :timeout
选项参数来指定最大工作人员数和超时时间。
请注意,您不必等待此任务,因为函数 return 是一个流,因此您可以使用 Enum.to_list/1 or use Stream.run/1。
这将使您的示例 运行 同时发生:
big_list
|> Task.async_stream(Module, :do_something, [])
|> Enum.filter(filter_fun)
你可以试试Parallel Stream.
stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end)
stream |> Enum.into([])
[2,4,6,8,10,12,14,16,18,20]
更新 或者更好地使用 Flow