使用 Cowboy Websocket 客户端对 Elixir 进行测试

Using Cowboy Websocket Client for Testing with Elixir

首先,确实缺乏 Cowboy 的文档,尤其是 Websockets,但一般来说,一旦它被破译就可以很好地使用。然后将信息从 Erlang 传输到 Elixir 是另一个步骤。多亏了 我才能够得到一个正常运行的 websocket 用于测试目的,但我无法让它同时收听和选择性地发送消息。我认为这是因为接收阻塞了需要发送的线程,并且它本质上链接到 websocket 连接,因此它在等待接收时无法发送。也许这种理解是有缺陷的。我很想被纠正。我尝试生成无济于事,这就是为什么我认为接收阻塞了 websocket 线程。

def ws do
    localhost = 'localhost'
    path = '/ws/app/1'
    port = 5000

    {:ok, _} = :application.ensure_all_started(:gun)
    connect_opts = %{
      connect_timeout: :timer.minutes(1),
      retry: 10,
      retry_timeout: 100
    }

    {:ok, conn_pid} = :gun.open(localhost, port, connect_opts)
    IO.inspect(conn_pid, label: "conn_pid")
    {:ok, protocol} = :gun.await_up(conn_pid)
    IO.inspect(protocol, label: "protocol")
    # Set custom header with cookie for device id
    stream_ref = :gun.ws_upgrade(conn_pid, path, [{"cookie", "device_id=1235"}])
    IO.inspect(stream_ref, label: "stream_ref")
    receive do
      {:gun_upgrade, ^conn_pid, ^stream_ref, ["websocket"], headers} ->
              upgrade_success(conn_pid, headers, stream_ref)
      {:gun_response, ^conn_pid, _, _, status, headers} ->
              exit({:ws_upgrade_failed, status, headers})
      {:gun_error, _conn_pid, _stream_ref, reason} ->
              exit({:ws_upgrade_failed, reason})
      whatever ->
        IO.inspect(whatever, label: "Whatever")
      # More clauses here as needed.
    after 5000 ->
        IO.puts "Took too long!"
        :erlang.exit("barf!")
    end
    :ok
  end

  def upgrade_success(conn_pid, headers, stream_ref) do
    IO.puts("Upgraded #{inspect(conn_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")

    IO.inspect(self(), label: "upgrade self")
    # This one runs and message is received
    run_test(conn_pid)
    # This should spawn and therefore not block
    listen(conn_pid, stream_ref)
    # This never runs
    run_test(conn_pid)
  end

  def listen(conn_pid, stream_ref) do
    spawn receive_messages(conn_pid, stream_ref)
  end
  def receive_messages(conn_pid, stream_ref) do
    IO.inspect conn_pid, label: "conn_pid!"
    IO.inspect stream_ref, label: "stream_ref!"
    IO.inspect(self(), label: "self pid")
    receive do
      {:gun_ws, ^conn_pid, ^stream_ref, {:text, msg} } ->
          IO.inspect(msg, label: "Message from websocket server:")
      other_messages ->
        IO.inspect(other_messages, label: "Other messages")
    after 5000 ->
      IO.puts "Receive timed out"
    end
    receive_messages(conn_pid, stream_ref)
  end

  def send_message(message, conn_pid) do
    :gun.ws_send(conn_pid, {:text, message})
  end

  def run_test(conn_pid) do
    IO.puts "Running test"
    message = "{\"type\":\"init\",\"body\":{\"device_id\":1234}}"
    send_message(message, conn_pid)
  end

  def stop(conn_pid) do
    :gun.shutdown(conn_pid)
  end

来自gun docs

Receiving data

Gun sends an Erlang message to the owner process for every Websocket message it receives.

and:

Connection

...

Gun connections

...

A Gun connection is an Erlang process that manages a socket to a remote endpoint. This Gun connection is owned by a user process that is called the owner of the connection, and is managed by the supervision tree of the gun application.

The owner process communicates with the Gun connection by calling functions from the module gun. All functions perform their respective operations asynchronously. The Gun connection will send Erlang messages to the owner process whenever needed.

虽然文档中没有特别提到,但我很确定 owner 进程 是调用 gun:open() 的进程。我的尝试还表明所有者进程必须调用 gun:ws_send()。换句话说,所有者进程既要向服务器发送消息,又要从服务器接收消息。

以下代码使用 gen_server 操作 gun,这样 gen_server 既可以向服务器发送消息,也可以从服务器接收消息。

当 gun 从 cowboy http 服务器收到消息时,gun 将消息 Pid ! Msg 发送给所有者进程。在下面的代码中,gen_serverinit/1 回调中创建了连接,这意味着 gun 将在 gen_server 从 cowboy 接收到 bang (!) 消息。 gen_server 使用 handle_info() 处理直接发送到其邮箱的邮件。

handle_cast()中,gen_server使用gun向cowboy发送请求。因为 handle_cast() 是异步的,这意味着您可以向 cowboy 发送异步消息。而且,当 gun 从 cowboy 收到消息时,gun 将消息发送 (!) 到 gen_server,然后 gen_server 的 handle_info() 函数处理该消息。在 handle_info() 中,调用 gen_server:reply/2 将消息中继到 gen_server 客户端。因此,只要 gen_server 客户端想要检查从 gun 发送的服务器消息,它就可以跳转到接收子句。

-module(client).
-behavior(gen_server).
-export([start_server/0, send_sync/1, send_async/1, get_message/2, go/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]).  %%% client functions
-export([sender/1]).

%%% client functions
%%%

start_server() ->
    gen_server:start({local, ?MODULE}, ?MODULE, [], []).

send_sync(Requ) ->
    gen_server:call(?MODULE, Requ).

send_async(Requ) -> 
    gen_server:cast(?MODULE, {websocket_request, Requ}).

get_message(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
            io:format("Inside get_message(): Ref = ~w~n", [ClientRef]),
            io:format("Client received gun message: ~s~n", [Msg]);
        Other ->
            io:format("Client received other message: ~w~n", [Other])
    end.

receive_loop(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
            io:format("Client received Gun message: ~s~n", [Msg]);
        Other ->
            io:format("Client received other message: ~w~n", [Other])
    end,
    receive_loop(WebSocketPid, ClientRef).

go() ->
    {ok, GenServerPid} = start_server(),
    io:format("[ME]: Inside go(): GenServerPid=~w~n", [GenServerPid]),

    [{conn_pid, ConnPid}, {ref, ClientRef}] = send_sync(get_conn_pid),
    io:format("[ME]: Inside go(): ConnPid=~w~n", [ConnPid]),

    ok = send_async("ABCD"),
    get_message(ConnPid, ClientRef),

    spawn(?MODULE, sender, [1]),

    ok = send_async("XYZ"),
    get_message(ConnPid, ClientRef),

    receive_loop(ConnPid, ClientRef).

sender(Count) -> %Send messages to handle_info() every 3 secs
    send_async(lists:concat(["Hello", Count])),
    timer:sleep(3000),
    sender(Count+1).

%%%%%% gen_server callbacks
%%%

init(_Arg) ->
    {ok, {no_client, ws()}}.

handle_call(get_conn_pid, From={_ClientPid, ClientRef}, _State={_Client, WebSocketPid}) ->
    io:format("[ME]: Inside handle_call(): From = ~w~n", [From]),
    {reply, [{conn_pid, WebSocketPid}, {ref, ClientRef}], _NewState={From, WebSocketPid} };
handle_call(stop, _From, State) ->
    {stop, normal, shutdown_ok, State}; %Calls terminate()
handle_call(_Other, _From, State) ->
    {ok, State}.

handle_cast({websocket_request, Msg}, State={_From, WebSocketPid}) ->
    gun:ws_send(WebSocketPid, {text, Msg}), %{text, "It's raining!"}),
    {noreply, State}.

handle_info(Msg, State={From, _WebSocketPid}) ->
    io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
    gen_server:reply(From, Msg),
    {noreply, State}.

terminate(_Reason, _State={_From, WebSocketPid}) -> 
    gun:shutdown(WebSocketPid).


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%%%% private functions
%%%

ws() ->
    {ok, _} = application:ensure_all_started(gun),
    {ok, ConnPid} = gun:open("localhost", 8080),
    {ok, _Protocol} = gun:await_up(ConnPid),

    gun:ws_upgrade(ConnPid, "/please_upgrade_to_websocket"),

    receive
        {gun_ws_upgrade, ConnPid, ok, Headers} ->
            io:format("[ME]: Inside gun_ws_upgrade receive clause: ~w~n", 
                      [ConnPid]),
            upgrade_success_handler(ConnPid, Headers);
        {gun_response, ConnPid, _, _, Status, Headers} ->
            exit({ws_upgrade_failed, Status, Headers});
        {gun_error, _ConnPid, _StreamRef, Reason} ->
            exit({ws_upgrade_failed, Reason})
    after 1000 ->
        exit(timeout)
    end.


upgrade_success_handler(ConnPid, _Headers) ->
    io:format("[ME]: Inside upgrade_success_handler(): ~w~n", [ConnPid]),  
    ConnPid.

=======

哎呀,下面的回答是如何让服务端向客户端推送数据的。

好的,我明白了——在 erlang 中。这个例子有点虐。您需要做几件事:

1)需要获取进程运行的pid websocket_*函数,与请求的pid不同:

Post-upgrade initialization

Cowboy has separate processes for handling the connection and requests. Because Websocket takes over the connection, the Websocket protocol handling occurs in a different process than the request handling.

This is reflected in the different callbacks Websocket handlers have. The init/2 callback is called from the temporary request process and the websocket_ callbacks from the connection process.

This means that some initialization cannot be done from init/2. Anything that would require the current pid, or be tied to the current pid, will not work as intended. The optional websocket_init/1 can be used [to get the pid of the process running the websocket_ callbacks]:

https://ninenines.eu/docs/en/cowboy/2.6/guide/ws_handlers/

这是我使用的代码:

init(Req, State) ->
    {cowboy_websocket, Req, State}.  %Perform websocket setup

websocket_init(State) ->
    io:format("[ME]: Inside websocket_init"),
    spawn(?MODULE, push, [self(), "Hi, there"]),
    {ok, State}.

push(WebSocketHandleProcess, Greeting) ->
    timer:sleep(4000),
    WebSocketHandleProcess ! {text, Greeting}.

websocket_handle({text, Msg}, State) ->
    timer:sleep(10000), %Don't respond to client request just yet.
    {
     reply, 
     {text, io_lib:format("Server received: ~s", [Msg]) },
     State
    };
websocket_handle(_Other, State) ->  %Ignore
    {ok, State}.

这将在客户端等待对客户端先前发送到服务器的请求的回复时向客户端推送消息。

2) 如果您向 运行 进程发送消息,则 websocket_* 函数:

Pid ! {text, Msg}

然后该消息将由 websocket_info() 函数处理——而不是 websocket_handle() 函数:

websocket_info({text, Text}, State) ->
    {reply, {text, Text}, State};
websocket_info(_Other, State) ->
    {ok, State}.

websocket_info() 函数的 return 值就像 websocket_handle() 函数的 return 值一样工作。

因为你的gun客户端现在接收多条消息,gun客户端需要循环接收:

upgrade_success_handler(ConnPid, Headers) ->
    io:format("Upgraded ~w. Success!~nHeaders:~n~p~n", 
              [ConnPid, Headers]),

    gun:ws_send(ConnPid, {text, "It's raining!"}),

    get_messages(ConnPid).  %Move the receive clause into a recursive function

get_messages(ConnPid) ->
    receive
        {gun_ws, ConnPid, {text, "Greeting: " ++ Greeting} } ->
            io:format("~s~n", [Greeting]),
            get_messages(ConnPid);

        {gun_ws, ConnPid, {text, Msg} } ->
            io:format("~s~n", [Msg]),
            get_messages(ConnPid)
    end.

感谢 7stud 的示例代码和下面反映的编辑:

这是我的 Elixir 解释,为 gun 提供了一个基本的 WebSocket 客户端:

defmodule WebsocketTester.Application do

  use Application

  def start(_type, _args) do

    path = '/ws/app/1'

    port = 5000

    host = 'localhost'

    args = %{path: path, port: port, host: host}

    children = [
      { WebSocket.Client, args }
    ]
    Supervisor.start_link(children, strategy: :one_for_one, name: WebsocketTester.Supervisor)
  end
end

defmodule WebSocket.Client do

  use GenServer

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  # GenServer callbacks

  def init(args) do
    # Set up the websocket connection
    # get > upgrade
    # Initial state with gun_pid and stream_ref
    # %{gun_pid: gun_pid, stream_ref: stream_ref} = ws(args)
    {:ok, init_ws(args)}
  end

  # Give back gun_pid from state
  def handle_call(:get_conn, from, %{gun_pid: gun_pid, stream_ref: stream_ref}) do
    IO.inspect(gun_pid, label: "handle call gun pid")
    {:reply, %{gun_pid: gun_pid, stream_ref: stream_ref}, %{from: from, gun_pid: gun_pid} }
  end
  # Everything else
  def handle_call(other, from, state) do
    IO.inspect(other, label: "other call")
    IO.inspect(from, label: "from")
    {:ok, state}
  end
  # Client sends message to server.
  def handle_cast({:websocket_request, message}, %{gun_pid: gun_pid} = state) do
    IO.puts message
    IO.inspect(gun_pid, label: "gun_pid")
    :gun.ws_send(gun_pid, {:text, message})
    {:noreply, state}
  end

  def handle_info(message, %{from: from} = state) do
    IO.inspect(message, label: "Inside handle_info(): ")
    GenServer.reply(from, message)
    {:noreply, state}
  end

  def terminate(reason, _state) do
    IO.puts "Terminated due to #{reason}."
    :ok
  end


  def code_change(_old_version, state, _extra) do
    {:ok, state}
  end

  ## Client functions
  # Used for getting gun_pid from state
  def send_sync(request) do
    GenServer.call(__MODULE__, request)
  end

  # Send a message async
  def send_async(request) do
    GenServer.cast(__MODULE__, {:websocket_request, request})
  end

  # Receive a single message
  def get_message(stream_ref, gun_pid) do
      receive do
          {^stream_ref, {:gun_ws, ^gun_pid, {:text, message} }} ->
              IO.puts("Client received gun message: #{message}")
          other ->
            IO.inspect(other, label: "Client received other message")
      end
  end

  # Receive all messages recursively
  def receive_loop(stream_ref, gun_pid) do
    IO.puts "Listening"
      get_message(stream_ref, gun_pid)
      receive_loop(stream_ref, gun_pid)
  end

  def go() do
    # Get the gun_pid from state
    %{gun_pid: gun_pid, stream_ref: stream_ref} = send_sync(:get_gun_pid)
    IO.inspect(gun_pid, label: "Inside go(): gun_pid=")
    # Send messages manually
    :ok = send_async(Jason.encode!(%{type: "info", greet: "yo"}))
    # Or to send just text
    # :ok = send_async("yo")

    # Receive messages manually
    get_message(stream_ref, gun_pid)

    # Start sending loop
    spawn sender 1

    # Start listening
    receive_loop(stream_ref, gun_pid)
  end

  # Send messages to handle_info() every 3 secs
  def sender(count) do
      send_async("count is #{count}")
      :timer.sleep(3000)
      sender(count+1)
  end

  ## End of client functions

  # Initialize the websocket connection
  def init_ws(args) do

    %{ path: path, port: port, host: host} = args

    {:ok, _} = :application.ensure_all_started(:gun)
    connect_opts = %{
      connect_timeout: :timer.minutes(1),
      retry: 10,
      retry_timeout: 100
    }

    {:ok, gun_pid} = :gun.open(host, port, connect_opts)
    {:ok, _protocol} = :gun.await_up(gun_pid)
    # Set custom header with cookie for device id - set_headers can be left out if you don't want custom headers
    stream_ref = :gun.ws_upgrade(gun_pid, path, set_headers("I like cookies"))
    receive do
      {:gun_upgrade, ^gun_pid, ^stream_ref, ["websocket"], headers} ->
            upgrade_success(gun_pid, headers, stream_ref)
      {:gun_response, ^gun_pid, _, _, status, headers} ->
              exit({:ws_upgrade_failed, status, headers})
      {:gun_error, _gun_pid, _stream_ref, reason} ->
              exit({:ws_upgrade_failed, reason})
      whatever ->
        IO.inspect(whatever, label: "Whatever")
      # More clauses here as needed.
    after 5000 ->
        IO.puts "Took too long!"
        :erlang.exit("barf!")
    end
    # stop(gun_pid)
  end


  def set_headers(cookie_value) do
    [{"cookie", "my_cookie=#{cookie_value}"}]
  end

  # This just returns the gun_pid for further reference which gets stored in the GenServer state.
  def upgrade_success(gun_pid, headers, stream_ref) do
    IO.puts("Upgraded #{inspect(gun_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")
    %{stream_ref: stream_ref, gun_pid: gun_pid}
  end

  # To stop gun
  def stop(gun_pid) do
    :gun.shutdown(gun_pid)
  end

end

要使用这个:

iex -S mix
iex> WebSocket.Client.go