如何 return 完成第一个异步任务

How to return first async task to complete

我有几项任务正在 运行异步执行。根据输入,一个或多个可能 运行 长,但只有一个任务会 return :success 消息。

slowtask = Task.async(slow())
fasttask = Task.async(fast())

我怎样才能捕获上面两个任务中的第一个来完成,而不必等待另一个?我试过 Task.find/2,但因为它是用枚举实现的,所以它似乎在找到 ref/message 之前等待所有退出信号。我的另一个想法是在 Stream.cycle 中对此进行投票,忽略仍然存在的任务并捕获已经退出的任务。不过,似乎 un elixir 喜欢以这种方式进行投票。

目前还没有在 Elixir 上执行此操作的简单方法。你最好的选择是,如果你只是在给定的进程中等待这些消息,是这样的:

  defmodule TaskFinder do
    def run do
      task1 = Task.async fn -> :timer.sleep(1000); 1 end
      task2 = Task.async fn -> :timer.sleep(5000); 2 end
      await [task1, task2]
    end

    # Be careful, this will receive all messages sent
    # to this process. It will return the first task
    # reply and the list of tasks that came second.
    def await(tasks) do
      receive do
        message ->
          case Task.find(tasks, message) do
            {reply, task} ->
              {reply, List.delete(tasks, task)}
            nil ->
              await(tasks)
          end
      end
    end
  end

  IO.inspect TaskFinder.run

请注意,您还可以使用此模式在 GenServer 中生成任务并使用 Task.find/2 查找匹配的任务。我还将此示例添加到 Elixir 文档中。

要获得第一个结果,您应该等待消息,然后将消息传递给Task.find/2,并处理第一个结果,形式为{task_result, task}

defmodule Tasks do
  def run do
    :random.seed(:os.timestamp)
    durations = Enum.shuffle(1..10)

    Enum.map(durations, fn(duration) -> Task.async(fn -> run_task(duration) end) end)
    |> get_first_result
    |> IO.inspect
  end

  defp get_first_result(tasks) do
    receive do
      msg ->
        case Task.find(tasks, msg) do
          {result, _task} ->
            # got the result
            result

          nil -> 
            # no result -> continue waiting
            get_first_result(tasks)
        end
    end
  end


  defp run_task(1) do
    :success
  end

  defp run_task(duration) do
    :timer.sleep(duration * 100)
    :ok
  end
end

如果 "master" 进程是 GenServer,您应该从 handle_info/2 中调用 Task.find/2,而不是 运行 这个递归循环。

根据 Jose Valim 的回答,这是我用来匹配返回的回复的内容:

def run do
    task1 = Task.async(fn -> :timer.sleep(10000); :slow end)
    task2 = Task.async(fn -> :timer.sleep(2000); :fail end)
    task3 = Task.async(fn -> :timer.sleep(1000); :fail end)

    await([task1, task2, task3])
end

def await(tasks) do
  receive do
    message ->
      case Task.find(tasks, message) do
        {:fail, task} ->
          await(List.delete(tasks, task))
        {reply, _task} ->
          reply             
        nil ->
          await(tasks)
      end
  end
end

这让我可以将第一个函数与 return 除了 :fail 原子之外的东西相匹配,并给我回复。这是否有效,因为 receive/1 只是等待任何消息出现?