如何在 GenServer Elixir 中每秒发出请求而不等待前一个请求完成

how to make request per second without waiting for previous one to complete in GenServer Elixir

这与上述问题有关,所以我发布了link。

我做了这样的GenServer worker

这是我的整个 GenServer

defmodule Recording.Worker do
  use GenServer
  require Logger

  def start_link(opts) do
    {id, opts} = Map.pop!(opts, :id)
    GenServer.start_link(__MODULE__, opts, name: id)
  end

  def init(state) do
    schedule_fetch_call(state.sleep)
    {:ok, state}
  end

  def handle_info(:jpeg_fetch, state) do
    {for_jpeg_bank, running} =
      make_jpeg_request(state.camera)
      |> running_map()

    IO.inspect("request")
    IO.inspect(DateTime.utc_now())
    put_it_in_jpeg_bank(for_jpeg_bank, state.camera.name)
    schedule_fetch_call(state.sleep)
    {:noreply, Map.put(state, :running, running)}
  end

  def get_state(pid) do
    GenServer.call(pid, :get)
  end

  def handle_call(:get, _from, state),
    do: {:reply, state, state}

  defp schedule_fetch_call(sleep),
    do: Process.send_after(self(), :jpeg_fetch, sleep)

  defp make_jpeg_request(camera) do
    headers = get_request_headers(camera.auth, camera.username, camera.password)
    requested_at = DateTime.utc_now()
    Everjamer.request(:get, camera.url, headers)
    |> get_body_size(requested_at)
  end

  defp get_body_size({:ok, %Finch.Response{body: body, headers: headers, status: 200}}, requested_at) do
    IO.inspect(headers)
    {body, "9", requested_at}
  end

  defp get_body_size(_error, requested_at), do: {:failed, requested_at}

  defp running_map({body, file_size, requested_at}),
    do:
      {%{datetime: requested_at, image: body, file_size: file_size},
       %{datetime: requested_at}}

  defp running_map({:failed, requested_at}), do: {%{}, %{datetime: requested_at}}

  defp get_request_headers("true", username, password),
    do: [{"Authorization", "Basic #{Base.encode64("#{username}:#{password}")}"}]

  defp get_request_headers(_, _username, _password), do: []

  defp put_it_in_jpeg_bank(state, process) do
    String.to_atom("storage_#{process}")
    |> Process.whereis()
    |> JpegBank.add(state)
  end
end

我正在尝试每秒发出一个 HTTP 请求。即使在使用 GenSever 并使用 DynamicSupervisor 启动它时,例如

General.Supervisor.start_child(Recording.Worker, %{id: String.to_atom(detailed_camera.name), camera: detailed_camera, sleep: detailed_camera.sleep})

这部分

  def handle_info(:jpeg_fetch, state) do
    {for_jpeg_bank, running} =
      make_jpeg_request(state.camera)
      |> running_map()

    IO.inspect("request")
    IO.inspect(DateTime.utc_now())
    put_it_in_jpeg_bank(for_jpeg_bank, state.camera.name)
    schedule_fetch_call(state.sleep)
    {:noreply, Map.put(state, :running, running)}
  end

仍在等待上一个请求完成,它变为(请求完成所花费的时间)/每个请求而不是每秒请求。

IO.inspect的结果如

"request"
~U[2020-12-30 05:27:21.466262Z]
"request"
~U[2020-12-30 05:27:24.184548Z]
"request"
~U[2020-12-30 05:27:26.967173Z]

"request"
~U[2020-12-30 05:27:29.831532Z]

在 Elixir 中有没有什么方法可以在不使用 spawn 的情况下,handle_info 保持 运行 而不等待上一个请求或上一个方法完成?

一个人可能会带着两个子进程启动 SupervisorDynamicSupervisor 进程和会产生工人的进程。后者会每秒生成一个 worker 并立即重新安排。

主要主管

defmodule MyApp.Supervisor do
  use Supervisor

  def start_link(),
    do: Supervisor.start_link(__MODULE__, nil, name: __MODULE__)

  @impl Supervisor
  def init(nil) do
    children = [
      {DynamicSupervisor, strategy: :one_for_one, name: MyApp.DS},
      {MyApp.WorkerStarter, [%{sleep: 1_000}]}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

工人初学者

defmodule MyApp.WorkerStarter do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  def init(state) do
    schedule_fetch_call(state)
    {:ok, state}
  end

  def handle_info(:request, state) do
    schedule_fetch_call(state.sleep)
    DynamicSupervisor.start_child(MyApp.DS, {MyApp.Worker, [state])
    {:noreply, state}
  end

  defp schedule_fetch_call(state),
    do: Process.send_after(self(), :request, state.sleep)
end

工人

defmodule MyApp.Worker do
  use GenServer

  def start_link(state), 
    do: GenServer.start_link(__MODULE__, state)
  
  def init(state) do
    perform_work(state)
    {:stop, :normal}
  end

  defp perform_work(state), do: ...