如何 return 完成第一个异步任务
How to return first async task to complete
我有几项任务正在 运行异步执行。根据输入,一个或多个可能 运行 长,但只有一个任务会 return :success 消息。
slowtask = Task.async(slow())
fasttask = Task.async(fast())
我怎样才能捕获上面两个任务中的第一个来完成,而不必等待另一个?我试过 Task.find/2
,但因为它是用枚举实现的,所以它似乎在找到 ref/message 之前等待所有退出信号。我的另一个想法是在 Stream.cycle
中对此进行投票,忽略仍然存在的任务并捕获已经退出的任务。不过,似乎 un elixir 喜欢以这种方式进行投票。
目前还没有在 Elixir 上执行此操作的简单方法。你最好的选择是,如果你只是在给定的进程中等待这些消息,是这样的:
defmodule TaskFinder do
def run do
task1 = Task.async fn -> :timer.sleep(1000); 1 end
task2 = Task.async fn -> :timer.sleep(5000); 2 end
await [task1, task2]
end
# Be careful, this will receive all messages sent
# to this process. It will return the first task
# reply and the list of tasks that came second.
def await(tasks) do
receive do
message ->
case Task.find(tasks, message) do
{reply, task} ->
{reply, List.delete(tasks, task)}
nil ->
await(tasks)
end
end
end
end
IO.inspect TaskFinder.run
请注意,您还可以使用此模式在 GenServer 中生成任务并使用 Task.find/2
查找匹配的任务。我还将此示例添加到 Elixir 文档中。
要获得第一个结果,您应该等待消息,然后将消息传递给Task.find/2
,并处理第一个结果,形式为{task_result, task}
。
defmodule Tasks do
def run do
:random.seed(:os.timestamp)
durations = Enum.shuffle(1..10)
Enum.map(durations, fn(duration) -> Task.async(fn -> run_task(duration) end) end)
|> get_first_result
|> IO.inspect
end
defp get_first_result(tasks) do
receive do
msg ->
case Task.find(tasks, msg) do
{result, _task} ->
# got the result
result
nil ->
# no result -> continue waiting
get_first_result(tasks)
end
end
end
defp run_task(1) do
:success
end
defp run_task(duration) do
:timer.sleep(duration * 100)
:ok
end
end
如果 "master" 进程是 GenServer
,您应该从 handle_info/2
中调用 Task.find/2
,而不是 运行 这个递归循环。
根据 Jose Valim 的回答,这是我用来匹配返回的回复的内容:
def run do
task1 = Task.async(fn -> :timer.sleep(10000); :slow end)
task2 = Task.async(fn -> :timer.sleep(2000); :fail end)
task3 = Task.async(fn -> :timer.sleep(1000); :fail end)
await([task1, task2, task3])
end
def await(tasks) do
receive do
message ->
case Task.find(tasks, message) do
{:fail, task} ->
await(List.delete(tasks, task))
{reply, _task} ->
reply
nil ->
await(tasks)
end
end
end
这让我可以将第一个函数与 return 除了 :fail 原子之外的东西相匹配,并给我回复。这是否有效,因为 receive/1 只是等待任何消息出现?
我有几项任务正在 运行异步执行。根据输入,一个或多个可能 运行 长,但只有一个任务会 return :success 消息。
slowtask = Task.async(slow())
fasttask = Task.async(fast())
我怎样才能捕获上面两个任务中的第一个来完成,而不必等待另一个?我试过 Task.find/2
,但因为它是用枚举实现的,所以它似乎在找到 ref/message 之前等待所有退出信号。我的另一个想法是在 Stream.cycle
中对此进行投票,忽略仍然存在的任务并捕获已经退出的任务。不过,似乎 un elixir 喜欢以这种方式进行投票。
目前还没有在 Elixir 上执行此操作的简单方法。你最好的选择是,如果你只是在给定的进程中等待这些消息,是这样的:
defmodule TaskFinder do
def run do
task1 = Task.async fn -> :timer.sleep(1000); 1 end
task2 = Task.async fn -> :timer.sleep(5000); 2 end
await [task1, task2]
end
# Be careful, this will receive all messages sent
# to this process. It will return the first task
# reply and the list of tasks that came second.
def await(tasks) do
receive do
message ->
case Task.find(tasks, message) do
{reply, task} ->
{reply, List.delete(tasks, task)}
nil ->
await(tasks)
end
end
end
end
IO.inspect TaskFinder.run
请注意,您还可以使用此模式在 GenServer 中生成任务并使用 Task.find/2
查找匹配的任务。我还将此示例添加到 Elixir 文档中。
要获得第一个结果,您应该等待消息,然后将消息传递给Task.find/2
,并处理第一个结果,形式为{task_result, task}
。
defmodule Tasks do
def run do
:random.seed(:os.timestamp)
durations = Enum.shuffle(1..10)
Enum.map(durations, fn(duration) -> Task.async(fn -> run_task(duration) end) end)
|> get_first_result
|> IO.inspect
end
defp get_first_result(tasks) do
receive do
msg ->
case Task.find(tasks, msg) do
{result, _task} ->
# got the result
result
nil ->
# no result -> continue waiting
get_first_result(tasks)
end
end
end
defp run_task(1) do
:success
end
defp run_task(duration) do
:timer.sleep(duration * 100)
:ok
end
end
如果 "master" 进程是 GenServer
,您应该从 handle_info/2
中调用 Task.find/2
,而不是 运行 这个递归循环。
根据 Jose Valim 的回答,这是我用来匹配返回的回复的内容:
def run do
task1 = Task.async(fn -> :timer.sleep(10000); :slow end)
task2 = Task.async(fn -> :timer.sleep(2000); :fail end)
task3 = Task.async(fn -> :timer.sleep(1000); :fail end)
await([task1, task2, task3])
end
def await(tasks) do
receive do
message ->
case Task.find(tasks, message) do
{:fail, task} ->
await(List.delete(tasks, task))
{reply, _task} ->
reply
nil ->
await(tasks)
end
end
end
这让我可以将第一个函数与 return 除了 :fail 原子之外的东西相匹配,并给我回复。这是否有效,因为 receive/1 只是等待任何消息出现?