使用 Cowboy Websocket 客户端对 Elixir 进行测试
Using Cowboy Websocket Client for Testing with Elixir
首先,Cowboy 的文档确实比较欠缺,尤其是 Websockets 部分,不过一旦弄明白了,总体上还是很好用的。之后还要把这些内容从 Erlang 转换成 Elixir,这又是一道工序。多亏了现有的示例,我才得到了一个可以正常运行、用于测试的 websocket 客户端,但我无法让它在监听的同时有选择地发送消息。我认为这是因为 receive 阻塞了需要用来发送的进程,而且该进程本质上与 websocket 连接绑定,所以在等待接收时无法发送。也许这种理解有误,欢迎指正。我尝试过用 spawn 另起进程,但无济于事,这也是我认为 receive 阻塞了 websocket 进程的原因。
# Connects to the WebSocket endpoint at localhost:5000/ws/app/1 through gun and
# waits up to five seconds for the outcome of the upgrade request.
#
# Exits with `{:ws_upgrade_failed, ...}` when a plain HTTP response or a gun
# error arrives, and with "barf!" on timeout. Any other message is only
# printed. Returns `:ok` once the receive completes.
def ws do
  {:ok, _} = :application.ensure_all_started(:gun)

  gun_opts = %{
    connect_timeout: :timer.minutes(1),
    retry: 10,
    retry_timeout: 100
  }

  {:ok, conn_pid} = :gun.open('localhost', 5000, gun_opts)
  IO.inspect(conn_pid, label: "conn_pid")

  {:ok, protocol} = :gun.await_up(conn_pid)
  IO.inspect(protocol, label: "protocol")

  # The device id travels in a custom cookie header.
  cookie_headers = [{"cookie", "device_id=1235"}]
  stream_ref = :gun.ws_upgrade(conn_pid, '/ws/app/1', cookie_headers)
  IO.inspect(stream_ref, label: "stream_ref")

  receive do
    {:gun_upgrade, ^conn_pid, ^stream_ref, ["websocket"], headers} ->
      upgrade_success(conn_pid, headers, stream_ref)

    {:gun_response, ^conn_pid, _, _, status, headers} ->
      exit({:ws_upgrade_failed, status, headers})

    {:gun_error, _conn_pid, _stream_ref, reason} ->
      exit({:ws_upgrade_failed, reason})

    unexpected ->
      IO.inspect(unexpected, label: "Whatever")
  after
    5000 ->
      IO.puts("Took too long!")
      :erlang.exit("barf!")
  end

  :ok
end
# Runs after a successful WebSocket upgrade: prints the connection details,
# sends a test message, starts the listener, then sends a second test message.
def upgrade_success(conn_pid, headers, stream_ref) do
  IO.puts("Upgraded #{inspect(conn_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")

  current = self()
  IO.inspect(current, label: "upgrade self")

  # First send, issued before the listener is started.
  run_test(conn_pid)

  # Intended to hand listening over to another process.
  listen(conn_pid, stream_ref)

  # Second send, issued once `listen/2` has returned.
  run_test(conn_pid)
end
# Starts a separate process that runs `receive_messages/2` and returns the new
# pid immediately.
#
# The call must be wrapped in an anonymous function: `spawn receive_messages(...)`
# evaluates `receive_messages/2` in the *calling* process first, and because
# that function loops forever the caller blocks and nothing is ever spawned.
#
# NOTE(review): the gun documentation says WebSocket frames are sent to the
# connection's owner process, so the spawned listener presumably receives no
# frames unless it owns the connection — confirm before relying on it.
def listen(conn_pid, stream_ref) do
  spawn(fn -> receive_messages(conn_pid, stream_ref) end)
end
# Endless receive loop: prints text frames that match this connection and
# stream, prints any other message as "Other messages", and reports a timeout
# after five seconds without a message before looping again.
def receive_messages(conn_pid, stream_ref) do
  IO.inspect(conn_pid, label: "conn_pid!")
  IO.inspect(stream_ref, label: "stream_ref!")
  IO.inspect(self(), label: "self pid")

  receive do
    {:gun_ws, ^conn_pid, ^stream_ref, {:text, payload}} ->
      IO.inspect(payload, label: "Message from websocket server:")

    unexpected ->
      IO.inspect(unexpected, label: "Other messages")
  after
    5000 -> IO.puts("Receive timed out")
  end

  receive_messages(conn_pid, stream_ref)
end
# Sends `message` as a text frame over the gun connection `conn_pid`.
def send_message(message, conn_pid) do
  text_frame = {:text, message}
  :gun.ws_send(conn_pid, text_frame)
end
# Prints a marker and sends a fixed JSON "init" message over `conn_pid`.
def run_test(conn_pid) do
  IO.puts("Running test")
  send_message("{\"type\":\"init\",\"body\":{\"device_id\":1234}}", conn_pid)
end
# Shuts down the gun connection `conn_pid`.
def stop(conn_pid), do: :gun.shutdown(conn_pid)
来自gun docs:
Receiving data
Gun sends an Erlang message to the owner process for every Websocket
message it receives.
and:
Connection
...
Gun connections
...
A Gun connection is an Erlang process that manages a socket to a
remote endpoint. This Gun connection is owned by a user process that
is called the owner of the connection, and is managed by the
supervision tree of the gun application.
The owner process communicates with the Gun connection by calling
functions from the module gun. All functions perform their respective
operations asynchronously. The Gun connection will send Erlang
messages to the owner process whenever needed.
虽然文档中没有特别提到,但我很确定所有者(owner)进程就是调用 gun:open() 的进程。我的尝试还表明,必须由所有者进程调用 gun:ws_send()。换句话说,所有者进程既要向服务器发送消息,又要从服务器接收消息。
以下代码使用 gen_server
操作 gun,这样 gen_server 既可以向服务器发送消息,也可以从服务器接收消息。
当 gun 从 cowboy http 服务器收到消息时,gun 会用 Pid ! Msg 把消息发送给所有者进程。在下面的代码中,gen_server 在 init/1 回调中创建了连接,这意味着 gun 会把从 cowboy 收到的消息用 bang (!) 发送给 gen_server。gen_server 使用 handle_info() 处理直接发送到其邮箱的消息。
在handle_cast()
中,gen_server
使用gun向cowboy发送请求。因为 handle_cast()
是异步的,这意味着您可以向 cowboy 发送异步消息。而且,当 gun 从 cowboy 收到消息时,gun 将消息发送 (!) 到 gen_server,然后 gen_server 的 handle_info()
函数处理该消息。在 handle_info()
中,调用 gen_server:reply/2
将消息中继到 gen_server
客户端。因此,只要 gen_server
客户端想要检查从 gun 发送的服务器消息,它就可以跳转到接收子句。
-module(client).
-behavior(gen_server).
-export([start_server/0, send_sync/1, send_async/1, get_message/2, go/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]). %%% client functions
-export([sender/1]).
%%% client functions
%%%
%% Starts the gen_server (unlinked) and registers it locally under the module name.
start_server() ->
    ServerName = {local, ?MODULE},
    gen_server:start(ServerName, ?MODULE, [], []).
%% Sends Request to the registered server with a synchronous call and returns its reply.
send_sync(Request) ->
    Reply = gen_server:call(?MODULE, Request),
    Reply.
%% Casts Request to the registered server wrapped as {websocket_request, Request}.
send_async(Request) ->
    Cast = {websocket_request, Request},
    gen_server:cast(?MODULE, Cast).
%% Waits for one message. A text frame relayed under ClientRef for
%% WebSocketPid is printed as a gun message; anything else is printed as
%% "other message".
get_message(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Text}}} ->
            io:format("Inside get_message(): Ref = ~w~n", [ClientRef]),
            io:format("Client received gun message: ~s~n", [Text]);
        Unexpected ->
            io:format("Client received other message: ~w~n", [Unexpected])
    end.
%% Endless loop that prints every message received: relayed text frames as
%% Gun messages, everything else as "other message".
receive_loop(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Text}}} ->
            io:format("Client received Gun message: ~s~n", [Text]);
        Unexpected ->
            io:format("Client received other message: ~w~n", [Unexpected])
    end,
    receive_loop(WebSocketPid, ClientRef).
%% Demo driver: starts the server, fetches the connection pid and the client
%% reference, sends two requests (reading one message after each), starts the
%% periodic sender in between, then loops on incoming messages forever.
go() ->
    {ok, ServerPid} = start_server(),
    io:format("[ME]: Inside go(): GenServerPid=~w~n", [ServerPid]),
    [{conn_pid, GunPid}, {ref, Ref}] = send_sync(get_conn_pid),
    io:format("[ME]: Inside go(): ConnPid=~w~n", [GunPid]),

    ok = send_async("ABCD"),
    get_message(GunPid, Ref),

    spawn(?MODULE, sender, [1]),

    ok = send_async("XYZ"),
    get_message(GunPid, Ref),

    receive_loop(GunPid, Ref).
%% Sends "Hello<Count>" through send_async/1, sleeps three seconds, and
%% repeats with the next count; never returns.
sender(Count) ->
    Text = lists:concat(["Hello", Count]),
    send_async(Text),
    timer:sleep(3000),
    sender(Count + 1).
%%%%%% gen_server callbacks
%%%
%% Opens the websocket connection via ws/0. The initial state is
%% {no_client, ConnPid}: no caller has registered for relayed messages yet.
init(_Arg) ->
    WebSocketPid = ws(),
    {ok, {no_client, WebSocketPid}}.
%% Synchronous requests.
%%   get_conn_pid  -> replies with the gun connection pid and the caller's
%%                    reference, and stores the caller (From) in the state so
%%                    handle_info/2 can relay messages to it.
%%   stop          -> replies shutdown_ok and stops the server normally.
%%   anything else -> replies {error, unknown_request}. The earlier
%%                    {ok, State} is not a valid handle_call/3 return value
%%                    and made the server crash on any unknown request.
handle_call(get_conn_pid, From = {_ClientPid, ClientRef}, {_Client, WebSocketPid}) ->
    io:format("[ME]: Inside handle_call(): From = ~w~n", [From]),
    Reply = [{conn_pid, WebSocketPid}, {ref, ClientRef}],
    {reply, Reply, {From, WebSocketPid}};
handle_call(stop, _From, State) ->
    {stop, normal, shutdown_ok, State};  % gen_server then calls terminate/2
handle_call(_Other, _From, State) ->
    {reply, {error, unknown_request}, State}.
%% Sends Msg to the server as a text frame over the gun connection held in
%% the state; the state is left unchanged.
handle_cast({websocket_request, Msg}, State = {_From, WebSocketPid}) ->
    Frame = {text, Msg},
    gun:ws_send(WebSocketPid, Frame),
    {noreply, State}.
%% Handles messages sent directly to the server's mailbox. They are logged and
%% relayed to the client stored in the state with gen_server:reply/2.
%%
%% Until a client has registered via get_conn_pid the state still holds the
%% no_client placeholder; gen_server:reply(no_client, Msg) would crash the
%% server, so such messages are only logged.
handle_info(Msg, State = {no_client, _WebSocketPid}) ->
    io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
    {noreply, State};
handle_info(Msg, State = {From, _WebSocketPid}) ->
    io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
    gen_server:reply(From, Msg),
    {noreply, State}.
%% Shuts down the gun connection held in the state when the server terminates.
terminate(_Reason, {_From, WebSocketPid}) ->
    gun:shutdown(WebSocketPid).
%% Code upgrade callback: the state is carried over unchanged.
code_change(_FromVersion, CurrentState, _Extra) ->
    {ok, CurrentState}.
%%%% private functions
%%%
%% Starts gun, connects to localhost:8080 and requests a websocket upgrade.
%% Returns whatever upgrade_success_handler/2 returns on success; exits with
%% {ws_upgrade_failed, ...} on an HTTP response or gun error, and with
%% timeout when nothing matching arrives within one second.
ws() ->
    {ok, _} = application:ensure_all_started(gun),
    {ok, GunPid} = gun:open("localhost", 8080),
    {ok, _Protocol} = gun:await_up(GunPid),
    gun:ws_upgrade(GunPid, "/please_upgrade_to_websocket"),
    receive
        {gun_ws_upgrade, GunPid, ok, RespHeaders} ->
            io:format("[ME]: Inside gun_ws_upgrade receive clause: ~w~n",
                      [GunPid]),
            upgrade_success_handler(GunPid, RespHeaders);
        {gun_response, GunPid, _, _, Status, RespHeaders} ->
            exit({ws_upgrade_failed, Status, RespHeaders});
        {gun_error, _AnyPid, _StreamRef, Reason} ->
            exit({ws_upgrade_failed, Reason})
    after 1000 ->
        exit(timeout)
    end.
%% Logs the upgraded connection and returns its pid.
upgrade_success_handler(GunPid, _Headers) ->
    io:format("[ME]: Inside upgrade_success_handler(): ~w~n", [GunPid]),
    GunPid.
=======
哎呀,下面的回答是如何让服务端向客户端推送数据的。
好的,我明白了——在 erlang 中。这个例子有点虐。您需要做几件事:
1)需要获取运行 websocket_* 函数的进程的 pid,它与请求进程的 pid 不同:
Post-upgrade initialization
Cowboy has separate processes for handling the connection and
requests. Because Websocket takes over the connection, the Websocket
protocol handling occurs in a different process than the request
handling.
This is reflected in the different callbacks Websocket handlers have.
The init/2 callback is called from the temporary request process and
the websocket_ callbacks from the connection process.
This means that some initialization cannot be done from init/2.
Anything that would require the current pid, or be tied to the current
pid, will not work as intended. The optional websocket_init/1 can be
used [to get the pid of the process running the websocket_ callbacks]:
https://ninenines.eu/docs/en/cowboy/2.6/guide/ws_handlers/
这是我使用的代码:
%% Returns cowboy_websocket so that cowboy performs the websocket setup.
init(Request, HandlerState) ->
    {cowboy_websocket, Request, HandlerState}.
%% Logs the call and spawns push/2 with this process's pid, so a greeting can
%% be sent back to this process later.
websocket_init(State) ->
    io:format("[ME]: Inside websocket_init"),
    HandlerPid = self(),
    spawn(?MODULE, push, [HandlerPid, "Hi, there"]),
    {ok, State}.
%% Waits four seconds, then sends {text, Greeting} to the given process.
push(WebSocketHandleProcess, Greeting) ->
    timer:sleep(4000),
    Message = {text, Greeting},
    WebSocketHandleProcess ! Message.
%% Text frames are answered with "Server received: ..." after a deliberate
%% ten second delay; every other frame is ignored.
websocket_handle({text, Msg}, State) ->
    timer:sleep(10000),  % hold back the response to the client's request
    Answer = {text, io_lib:format("Server received: ~s", [Msg])},
    {reply, Answer, State};
websocket_handle(_Other, State) ->
    {ok, State}.
这将在客户端等待对客户端先前发送到服务器的请求的回复时向客户端推送消息。
2) 如果您向运行 websocket_* 函数的进程发送消息:
Pid ! {text, Msg}
然后该消息将由 websocket_info()
函数处理——而不是 websocket_handle()
函数:
%% A {text, Text} message sent to this process is returned as a text frame
%% reply; any other message is ignored.
websocket_info({text, Payload}, State) ->
    Frame = {text, Payload},
    {reply, Frame, State};
websocket_info(_Ignored, State) ->
    {ok, State}.
websocket_info()
函数的 return 值就像 websocket_handle()
函数的 return 值一样工作。
因为你的gun客户端现在接收多条消息,gun客户端需要循环接收:
%% Logs the upgrade, sends one text frame, then enters the recursive receive
%% loop get_messages/1.
upgrade_success_handler(ConnPid, Headers) ->
    io:format("Upgraded ~w. Success!~nHeaders:~n~p~n", [ConnPid, Headers]),
    Frame = {text, "It's raining!"},
    gun:ws_send(ConnPid, Frame),
    get_messages(ConnPid).
%% Endless loop printing each text frame received for ConnPid; a leading
%% "Greeting: " prefix is stripped before printing.
get_messages(ConnPid) ->
    Text =
        receive
            {gun_ws, ConnPid, {text, "Greeting: " ++ Greeting}} -> Greeting;
            {gun_ws, ConnPid, {text, Msg}} -> Msg
        end,
    io:format("~s~n", [Text]),
    get_messages(ConnPid).
感谢 7stud 的示例代码和下面反映的编辑:
这是我的 Elixir 版本,基于 gun 实现了一个基本的 WebSocket 客户端:
defmodule WebsocketTester.Application do
  @moduledoc """
  Application entry point: supervises a single `WebSocket.Client` configured
  for host `localhost`, port 5000 and path `/ws/app/1`.
  """

  use Application

  # Starts the one_for_one supervisor with the WebSocket client as its only child.
  def start(_type, _args) do
    client_args = %{path: '/ws/app/1', port: 5000, host: 'localhost'}

    Supervisor.start_link(
      [{WebSocket.Client, client_args}],
      strategy: :one_for_one,
      name: WebsocketTester.Supervisor
    )
  end
end
defmodule WebSocket.Client do
  @moduledoc """
  Minimal WebSocket test client built on gun.

  The GenServer opens the gun connection in `init/1`. Messages that arrive in
  its mailbox are handled by `handle_info/2`, which relays them to the last
  process that called `send_sync(:get_conn)`. Outgoing text frames are sent
  with `send_async/1`.
  """

  use GenServer

  # Child specification used when the client is started under a supervisor.
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  # Starts the client registered under the module name.
  # `args` is a map with `:host`, `:port` and `:path`.
  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  ## GenServer callbacks

  # Sets up the websocket connection; the state is the map returned by
  # `init_ws/1` (`%{gun_pid: ..., stream_ref: ...}`).
  @impl true
  def init(args) do
    {:ok, init_ws(args)}
  end

  # Hands the connection details to the caller and remembers the caller as the
  # relay target for `handle_info/2`. `:get_gun_pid` is accepted as well since
  # `go/0` used to send that atom. The whole state is kept (only `:from` is
  # added) so `stream_ref` is still available on later calls.
  @impl true
  def handle_call(request, from, %{gun_pid: gun_pid, stream_ref: stream_ref} = state)
      when request in [:get_conn, :get_gun_pid] do
    IO.inspect(gun_pid, label: "handle call gun pid")
    {:reply, %{gun_pid: gun_pid, stream_ref: stream_ref}, Map.put(state, :from, from)}
  end

  # Everything else: log it and answer with an error. `{:ok, state}` is not a
  # valid `handle_call/3` return value and crashed the server.
  def handle_call(other, from, state) do
    IO.inspect(other, label: "other call")
    IO.inspect(from, label: "from")
    {:reply, {:error, :unknown_request}, state}
  end

  # Client sends a text message to the server.
  @impl true
  def handle_cast({:websocket_request, message}, %{gun_pid: gun_pid} = state) do
    IO.puts(message)
    IO.inspect(gun_pid, label: "gun_pid")
    :gun.ws_send(gun_pid, {:text, message})
    {:noreply, state}
  end

  # Relays any message that lands in the mailbox to the registered client.
  @impl true
  def handle_info(message, %{from: from} = state) do
    IO.inspect(message, label: "Inside handle_info(): ")
    GenServer.reply(from, message)
    {:noreply, state}
  end

  # No client has registered yet: log the message instead of crashing with a
  # FunctionClauseError.
  def handle_info(message, state) do
    IO.inspect(message, label: "Inside handle_info(): ")
    {:noreply, state}
  end

  # `reason` may be any term (e.g. `{:shutdown, _}`), so it has to be
  # inspected; plain interpolation raises for terms without String.Chars.
  @impl true
  def terminate(reason, _state) do
    IO.puts("Terminated due to #{inspect(reason)}.")
    :ok
  end

  @impl true
  def code_change(_old_version, state, _extra) do
    {:ok, state}
  end

  ## Client functions

  # Synchronous request; `:get_conn` returns `%{gun_pid: ..., stream_ref: ...}`.
  def send_sync(request) do
    GenServer.call(__MODULE__, request)
  end

  # Sends `request` to the websocket server asynchronously as a text frame.
  def send_async(request) do
    GenServer.cast(__MODULE__, {:websocket_request, request})
  end

  # Receives a single message. A relayed message arrives as `{tag, message}`
  # where `tag` belongs to the earlier `GenServer.call`, not to gun's stream,
  # so the tag is not matched. The inner frame uses the same
  # `{:gun_ws, pid, stream_ref, frame}` shape that the upgrade handling in
  # this module implies (4-tuple carrying the stream reference).
  def get_message(stream_ref, gun_pid) do
    receive do
      {_tag, {:gun_ws, ^gun_pid, ^stream_ref, {:text, message}}} ->
        IO.puts("Client received gun message: #{message}")

      other ->
        IO.inspect(other, label: "Client received other message")
    end
  end

  # Receives all messages, one after another, forever.
  def receive_loop(stream_ref, gun_pid) do
    IO.puts("Listening")
    get_message(stream_ref, gun_pid)
    receive_loop(stream_ref, gun_pid)
  end

  # Demo driver: registers the caller with the server, sends one message,
  # reads one message, starts the periodic sender and then listens forever.
  def go() do
    %{gun_pid: gun_pid, stream_ref: stream_ref} = send_sync(:get_conn)
    IO.inspect(gun_pid, label: "Inside go(): gun_pid=")

    # Send messages manually (plain text works too: send_async("yo")).
    :ok = send_async(Jason.encode!(%{type: "info", greet: "yo"}))

    # Receive messages manually.
    get_message(stream_ref, gun_pid)

    # `sender/1` never returns, so it must run inside the spawned function;
    # `spawn sender 1` would run it in this process and block here.
    spawn(fn -> sender(1) end)

    receive_loop(stream_ref, gun_pid)
  end

  # Sends a message every 3 seconds; never returns.
  def sender(count) do
    send_async("count is #{count}")
    :timer.sleep(3000)
    sender(count + 1)
  end

  ## End of client functions

  # Opens the gun connection described by `args`, requests the websocket
  # upgrade and returns `%{stream_ref: ..., gun_pid: ...}` on success.
  # Exits when the upgrade fails or no answer arrives within five seconds.
  def init_ws(args) do
    %{path: path, port: port, host: host} = args
    {:ok, _} = :application.ensure_all_started(:gun)

    connect_opts = %{
      connect_timeout: :timer.minutes(1),
      retry: 10,
      retry_timeout: 100
    }

    {:ok, gun_pid} = :gun.open(host, port, connect_opts)
    {:ok, _protocol} = :gun.await_up(gun_pid)

    # Custom cookie header; `set_headers/1` can be dropped if none is wanted.
    stream_ref = :gun.ws_upgrade(gun_pid, path, set_headers("I like cookies"))
    await_upgrade(gun_pid, stream_ref)
  end

  # Waits for the outcome of the upgrade. Unrelated messages are logged and
  # the wait continues; returning the stray message (as before) would have
  # made it the GenServer state.
  defp await_upgrade(gun_pid, stream_ref) do
    receive do
      {:gun_upgrade, ^gun_pid, ^stream_ref, ["websocket"], headers} ->
        upgrade_success(gun_pid, headers, stream_ref)

      {:gun_response, ^gun_pid, _, _, status, headers} ->
        exit({:ws_upgrade_failed, status, headers})

      {:gun_error, _gun_pid, _stream_ref, reason} ->
        exit({:ws_upgrade_failed, reason})

      whatever ->
        IO.inspect(whatever, label: "Whatever")
        await_upgrade(gun_pid, stream_ref)
    after
      5000 ->
        IO.puts("Took too long!")
        :erlang.exit("barf!")
    end
  end

  # Builds the request headers carrying the cookie.
  def set_headers(cookie_value) do
    [{"cookie", "my_cookie=#{cookie_value}"}]
  end

  # Logs the upgrade and returns the map that becomes the GenServer state.
  def upgrade_success(gun_pid, headers, stream_ref) do
    IO.puts("Upgraded #{inspect(gun_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")
    %{stream_ref: stream_ref, gun_pid: gun_pid}
  end

  # To stop gun
  def stop(gun_pid) do
    :gun.shutdown(gun_pid)
  end
end
要使用这个:
iex -S mix
iex> WebSocket.Client.go
首先,确实缺乏 Cowboy 的文档,尤其是 Websockets,但一般来说,一旦它被破译就可以很好地使用。然后将信息从 Erlang 传输到 Elixir 是另一个步骤。多亏了
# Connects to the WebSocket endpoint at localhost:5000/ws/app/1 through gun and
# waits up to five seconds for the outcome of the upgrade request.
#
# Exits with `{:ws_upgrade_failed, ...}` when a plain HTTP response or a gun
# error arrives, and with "barf!" on timeout. Any other message is only
# printed. Returns `:ok` once the receive completes.
def ws do
  {:ok, _} = :application.ensure_all_started(:gun)

  gun_opts = %{
    connect_timeout: :timer.minutes(1),
    retry: 10,
    retry_timeout: 100
  }

  {:ok, conn_pid} = :gun.open('localhost', 5000, gun_opts)
  IO.inspect(conn_pid, label: "conn_pid")

  {:ok, protocol} = :gun.await_up(conn_pid)
  IO.inspect(protocol, label: "protocol")

  # The device id travels in a custom cookie header.
  cookie_headers = [{"cookie", "device_id=1235"}]
  stream_ref = :gun.ws_upgrade(conn_pid, '/ws/app/1', cookie_headers)
  IO.inspect(stream_ref, label: "stream_ref")

  receive do
    {:gun_upgrade, ^conn_pid, ^stream_ref, ["websocket"], headers} ->
      upgrade_success(conn_pid, headers, stream_ref)

    {:gun_response, ^conn_pid, _, _, status, headers} ->
      exit({:ws_upgrade_failed, status, headers})

    {:gun_error, _conn_pid, _stream_ref, reason} ->
      exit({:ws_upgrade_failed, reason})

    unexpected ->
      IO.inspect(unexpected, label: "Whatever")
  after
    5000 ->
      IO.puts("Took too long!")
      :erlang.exit("barf!")
  end

  :ok
end
# Runs after a successful WebSocket upgrade: prints the connection details,
# sends a test message, starts the listener, then sends a second test message.
def upgrade_success(conn_pid, headers, stream_ref) do
  IO.puts("Upgraded #{inspect(conn_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")

  current = self()
  IO.inspect(current, label: "upgrade self")

  # First send, issued before the listener is started.
  run_test(conn_pid)

  # Intended to hand listening over to another process.
  listen(conn_pid, stream_ref)

  # Second send, issued once `listen/2` has returned.
  run_test(conn_pid)
end
# Starts a separate process that runs `receive_messages/2` and returns the new
# pid immediately.
#
# The call must be wrapped in an anonymous function: `spawn receive_messages(...)`
# evaluates `receive_messages/2` in the *calling* process first, and because
# that function loops forever the caller blocks and nothing is ever spawned.
#
# NOTE(review): the gun documentation says WebSocket frames are sent to the
# connection's owner process, so the spawned listener presumably receives no
# frames unless it owns the connection — confirm before relying on it.
def listen(conn_pid, stream_ref) do
  spawn(fn -> receive_messages(conn_pid, stream_ref) end)
end
# Endless receive loop: prints text frames that match this connection and
# stream, prints any other message as "Other messages", and reports a timeout
# after five seconds without a message before looping again.
def receive_messages(conn_pid, stream_ref) do
  IO.inspect(conn_pid, label: "conn_pid!")
  IO.inspect(stream_ref, label: "stream_ref!")
  IO.inspect(self(), label: "self pid")

  receive do
    {:gun_ws, ^conn_pid, ^stream_ref, {:text, payload}} ->
      IO.inspect(payload, label: "Message from websocket server:")

    unexpected ->
      IO.inspect(unexpected, label: "Other messages")
  after
    5000 -> IO.puts("Receive timed out")
  end

  receive_messages(conn_pid, stream_ref)
end
# Sends `message` as a text frame over the gun connection `conn_pid`.
def send_message(message, conn_pid) do
  text_frame = {:text, message}
  :gun.ws_send(conn_pid, text_frame)
end
# Prints a marker and sends a fixed JSON "init" message over `conn_pid`.
def run_test(conn_pid) do
  IO.puts("Running test")
  send_message("{\"type\":\"init\",\"body\":{\"device_id\":1234}}", conn_pid)
end
# Shuts down the gun connection `conn_pid`.
def stop(conn_pid), do: :gun.shutdown(conn_pid)
来自gun docs:
Receiving data
Gun sends an Erlang message to the owner process for every Websocket message it receives.
and:
Connection
...
Gun connections
...
A Gun connection is an Erlang process that manages a socket to a remote endpoint. This Gun connection is owned by a user process that is called the owner of the connection, and is managed by the supervision tree of the gun application.
The owner process communicates with the Gun connection by calling functions from the module gun. All functions perform their respective operations asynchronously. The Gun connection will send Erlang messages to the owner process whenever needed.
虽然文档中没有特别提到,但我很确定所有者(owner)进程就是调用 gun:open() 的进程。我的尝试还表明,必须由所有者进程调用 gun:ws_send()。换句话说,所有者进程既要向服务器发送消息,又要从服务器接收消息。
以下代码使用 gen_server
操作 gun,这样 gen_server 既可以向服务器发送消息,也可以从服务器接收消息。
当 gun 从 cowboy http 服务器收到消息时,gun 会用 Pid ! Msg 把消息发送给所有者进程。在下面的代码中,gen_server 在 init/1 回调中创建了连接,这意味着 gun 会把从 cowboy 收到的消息用 bang (!) 发送给 gen_server。gen_server 使用 handle_info() 处理直接发送到其邮箱的消息。
在handle_cast()
中,gen_server
使用gun向cowboy发送请求。因为 handle_cast()
是异步的,这意味着您可以向 cowboy 发送异步消息。而且,当 gun 从 cowboy 收到消息时,gun 将消息发送 (!) 到 gen_server,然后 gen_server 的 handle_info()
函数处理该消息。在 handle_info()
中,调用 gen_server:reply/2
将消息中继到 gen_server
客户端。因此,只要 gen_server
客户端想要检查从 gun 发送的服务器消息,它就可以跳转到接收子句。
-module(client).
-behavior(gen_server).
-export([start_server/0, send_sync/1, send_async/1, get_message/2, go/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]). %%% client functions
-export([sender/1]).
%%% client functions
%%%
%% Starts the gen_server (unlinked) and registers it locally under the module name.
start_server() ->
    ServerName = {local, ?MODULE},
    gen_server:start(ServerName, ?MODULE, [], []).
%% Sends Request to the registered server with a synchronous call and returns its reply.
send_sync(Request) ->
    Reply = gen_server:call(?MODULE, Request),
    Reply.
%% Casts Request to the registered server wrapped as {websocket_request, Request}.
send_async(Request) ->
    Cast = {websocket_request, Request},
    gen_server:cast(?MODULE, Cast).
%% Waits for one message. A text frame relayed under ClientRef for
%% WebSocketPid is printed as a gun message; anything else is printed as
%% "other message".
get_message(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Text}}} ->
            io:format("Inside get_message(): Ref = ~w~n", [ClientRef]),
            io:format("Client received gun message: ~s~n", [Text]);
        Unexpected ->
            io:format("Client received other message: ~w~n", [Unexpected])
    end.
%% Endless loop that prints every message received: relayed text frames as
%% Gun messages, everything else as "other message".
receive_loop(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Text}}} ->
            io:format("Client received Gun message: ~s~n", [Text]);
        Unexpected ->
            io:format("Client received other message: ~w~n", [Unexpected])
    end,
    receive_loop(WebSocketPid, ClientRef).
%% Demo driver: starts the server, fetches the connection pid and the client
%% reference, sends two requests (reading one message after each), starts the
%% periodic sender in between, then loops on incoming messages forever.
go() ->
    {ok, ServerPid} = start_server(),
    io:format("[ME]: Inside go(): GenServerPid=~w~n", [ServerPid]),
    [{conn_pid, GunPid}, {ref, Ref}] = send_sync(get_conn_pid),
    io:format("[ME]: Inside go(): ConnPid=~w~n", [GunPid]),

    ok = send_async("ABCD"),
    get_message(GunPid, Ref),

    spawn(?MODULE, sender, [1]),

    ok = send_async("XYZ"),
    get_message(GunPid, Ref),

    receive_loop(GunPid, Ref).
%% Sends "Hello<Count>" through send_async/1, sleeps three seconds, and
%% repeats with the next count; never returns.
sender(Count) ->
    Text = lists:concat(["Hello", Count]),
    send_async(Text),
    timer:sleep(3000),
    sender(Count + 1).
%%%%%% gen_server callbacks
%%%
%% Opens the websocket connection via ws/0. The initial state is
%% {no_client, ConnPid}: no caller has registered for relayed messages yet.
init(_Arg) ->
    WebSocketPid = ws(),
    {ok, {no_client, WebSocketPid}}.
%% Synchronous requests.
%%   get_conn_pid  -> replies with the gun connection pid and the caller's
%%                    reference, and stores the caller (From) in the state so
%%                    handle_info/2 can relay messages to it.
%%   stop          -> replies shutdown_ok and stops the server normally.
%%   anything else -> replies {error, unknown_request}. The earlier
%%                    {ok, State} is not a valid handle_call/3 return value
%%                    and made the server crash on any unknown request.
handle_call(get_conn_pid, From = {_ClientPid, ClientRef}, {_Client, WebSocketPid}) ->
    io:format("[ME]: Inside handle_call(): From = ~w~n", [From]),
    Reply = [{conn_pid, WebSocketPid}, {ref, ClientRef}],
    {reply, Reply, {From, WebSocketPid}};
handle_call(stop, _From, State) ->
    {stop, normal, shutdown_ok, State};  % gen_server then calls terminate/2
handle_call(_Other, _From, State) ->
    {reply, {error, unknown_request}, State}.
%% Sends Msg to the server as a text frame over the gun connection held in
%% the state; the state is left unchanged.
handle_cast({websocket_request, Msg}, State = {_From, WebSocketPid}) ->
    Frame = {text, Msg},
    gun:ws_send(WebSocketPid, Frame),
    {noreply, State}.
%% Handles messages sent directly to the server's mailbox. They are logged and
%% relayed to the client stored in the state with gen_server:reply/2.
%%
%% Until a client has registered via get_conn_pid the state still holds the
%% no_client placeholder; gen_server:reply(no_client, Msg) would crash the
%% server, so such messages are only logged.
handle_info(Msg, State = {no_client, _WebSocketPid}) ->
    io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
    {noreply, State};
handle_info(Msg, State = {From, _WebSocketPid}) ->
    io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
    gen_server:reply(From, Msg),
    {noreply, State}.
%% Shuts down the gun connection held in the state when the server terminates.
terminate(_Reason, {_From, WebSocketPid}) ->
    gun:shutdown(WebSocketPid).
%% Code upgrade callback: the state is carried over unchanged.
code_change(_FromVersion, CurrentState, _Extra) ->
    {ok, CurrentState}.
%%%% private functions
%%%
%% Starts gun, connects to localhost:8080 and requests a websocket upgrade.
%% Returns whatever upgrade_success_handler/2 returns on success; exits with
%% {ws_upgrade_failed, ...} on an HTTP response or gun error, and with
%% timeout when nothing matching arrives within one second.
ws() ->
    {ok, _} = application:ensure_all_started(gun),
    {ok, GunPid} = gun:open("localhost", 8080),
    {ok, _Protocol} = gun:await_up(GunPid),
    gun:ws_upgrade(GunPid, "/please_upgrade_to_websocket"),
    receive
        {gun_ws_upgrade, GunPid, ok, RespHeaders} ->
            io:format("[ME]: Inside gun_ws_upgrade receive clause: ~w~n",
                      [GunPid]),
            upgrade_success_handler(GunPid, RespHeaders);
        {gun_response, GunPid, _, _, Status, RespHeaders} ->
            exit({ws_upgrade_failed, Status, RespHeaders});
        {gun_error, _AnyPid, _StreamRef, Reason} ->
            exit({ws_upgrade_failed, Reason})
    after 1000 ->
        exit(timeout)
    end.
%% Logs the upgraded connection and returns its pid.
upgrade_success_handler(GunPid, _Headers) ->
    io:format("[ME]: Inside upgrade_success_handler(): ~w~n", [GunPid]),
    GunPid.
=======
哎呀,下面的回答是如何让服务端向客户端推送数据的。
好的,我明白了——在 erlang 中。这个例子有点虐。您需要做几件事:
1)需要获取运行 websocket_* 函数的进程的 pid,它与请求进程的 pid 不同:
Post-upgrade initialization
Cowboy has separate processes for handling the connection and requests. Because Websocket takes over the connection, the Websocket protocol handling occurs in a different process than the request handling.
This is reflected in the different callbacks Websocket handlers have. The init/2 callback is called from the temporary request process and the websocket_ callbacks from the connection process.
This means that some initialization cannot be done from init/2. Anything that would require the current pid, or be tied to the current pid, will not work as intended. The optional websocket_init/1 can be used [to get the pid of the process running the websocket_ callbacks]:
https://ninenines.eu/docs/en/cowboy/2.6/guide/ws_handlers/
这是我使用的代码:
%% Returns cowboy_websocket so that cowboy performs the websocket setup.
init(Request, HandlerState) ->
    {cowboy_websocket, Request, HandlerState}.
%% Logs the call and spawns push/2 with this process's pid, so a greeting can
%% be sent back to this process later.
websocket_init(State) ->
    io:format("[ME]: Inside websocket_init"),
    HandlerPid = self(),
    spawn(?MODULE, push, [HandlerPid, "Hi, there"]),
    {ok, State}.
%% Waits four seconds, then sends {text, Greeting} to the given process.
push(WebSocketHandleProcess, Greeting) ->
    timer:sleep(4000),
    Message = {text, Greeting},
    WebSocketHandleProcess ! Message.
%% Text frames are answered with "Server received: ..." after a deliberate
%% ten second delay; every other frame is ignored.
websocket_handle({text, Msg}, State) ->
    timer:sleep(10000),  % hold back the response to the client's request
    Answer = {text, io_lib:format("Server received: ~s", [Msg])},
    {reply, Answer, State};
websocket_handle(_Other, State) ->
    {ok, State}.
这将在客户端等待对客户端先前发送到服务器的请求的回复时向客户端推送消息。
2) 如果您向运行 websocket_* 函数的进程发送消息:
Pid ! {text, Msg}
然后该消息将由 websocket_info()
函数处理——而不是 websocket_handle()
函数:
%% A {text, Text} message sent to this process is returned as a text frame
%% reply; any other message is ignored.
websocket_info({text, Payload}, State) ->
    Frame = {text, Payload},
    {reply, Frame, State};
websocket_info(_Ignored, State) ->
    {ok, State}.
websocket_info()
函数的 return 值就像 websocket_handle()
函数的 return 值一样工作。
因为你的gun客户端现在接收多条消息,gun客户端需要循环接收:
%% Logs the upgrade, sends one text frame, then enters the recursive receive
%% loop get_messages/1.
upgrade_success_handler(ConnPid, Headers) ->
    io:format("Upgraded ~w. Success!~nHeaders:~n~p~n", [ConnPid, Headers]),
    Frame = {text, "It's raining!"},
    gun:ws_send(ConnPid, Frame),
    get_messages(ConnPid).
%% Endless loop printing each text frame received for ConnPid; a leading
%% "Greeting: " prefix is stripped before printing.
get_messages(ConnPid) ->
    Text =
        receive
            {gun_ws, ConnPid, {text, "Greeting: " ++ Greeting}} -> Greeting;
            {gun_ws, ConnPid, {text, Msg}} -> Msg
        end,
    io:format("~s~n", [Text]),
    get_messages(ConnPid).
感谢 7stud 的示例代码和下面反映的编辑:
这是我的 Elixir 版本,基于 gun 实现了一个基本的 WebSocket 客户端:
defmodule WebsocketTester.Application do
  @moduledoc """
  Application entry point: supervises a single `WebSocket.Client` configured
  for host `localhost`, port 5000 and path `/ws/app/1`.
  """

  use Application

  # Starts the one_for_one supervisor with the WebSocket client as its only child.
  def start(_type, _args) do
    client_args = %{path: '/ws/app/1', port: 5000, host: 'localhost'}

    Supervisor.start_link(
      [{WebSocket.Client, client_args}],
      strategy: :one_for_one,
      name: WebsocketTester.Supervisor
    )
  end
end
defmodule WebSocket.Client do
  @moduledoc """
  Minimal WebSocket test client built on gun.

  The GenServer opens the gun connection in `init/1`. Messages that arrive in
  its mailbox are handled by `handle_info/2`, which relays them to the last
  process that called `send_sync(:get_conn)`. Outgoing text frames are sent
  with `send_async/1`.
  """

  use GenServer

  # Child specification used when the client is started under a supervisor.
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  # Starts the client registered under the module name.
  # `args` is a map with `:host`, `:port` and `:path`.
  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  ## GenServer callbacks

  # Sets up the websocket connection; the state is the map returned by
  # `init_ws/1` (`%{gun_pid: ..., stream_ref: ...}`).
  @impl true
  def init(args) do
    {:ok, init_ws(args)}
  end

  # Hands the connection details to the caller and remembers the caller as the
  # relay target for `handle_info/2`. `:get_gun_pid` is accepted as well since
  # `go/0` used to send that atom. The whole state is kept (only `:from` is
  # added) so `stream_ref` is still available on later calls.
  @impl true
  def handle_call(request, from, %{gun_pid: gun_pid, stream_ref: stream_ref} = state)
      when request in [:get_conn, :get_gun_pid] do
    IO.inspect(gun_pid, label: "handle call gun pid")
    {:reply, %{gun_pid: gun_pid, stream_ref: stream_ref}, Map.put(state, :from, from)}
  end

  # Everything else: log it and answer with an error. `{:ok, state}` is not a
  # valid `handle_call/3` return value and crashed the server.
  def handle_call(other, from, state) do
    IO.inspect(other, label: "other call")
    IO.inspect(from, label: "from")
    {:reply, {:error, :unknown_request}, state}
  end

  # Client sends a text message to the server.
  @impl true
  def handle_cast({:websocket_request, message}, %{gun_pid: gun_pid} = state) do
    IO.puts(message)
    IO.inspect(gun_pid, label: "gun_pid")
    :gun.ws_send(gun_pid, {:text, message})
    {:noreply, state}
  end

  # Relays any message that lands in the mailbox to the registered client.
  @impl true
  def handle_info(message, %{from: from} = state) do
    IO.inspect(message, label: "Inside handle_info(): ")
    GenServer.reply(from, message)
    {:noreply, state}
  end

  # No client has registered yet: log the message instead of crashing with a
  # FunctionClauseError.
  def handle_info(message, state) do
    IO.inspect(message, label: "Inside handle_info(): ")
    {:noreply, state}
  end

  # `reason` may be any term (e.g. `{:shutdown, _}`), so it has to be
  # inspected; plain interpolation raises for terms without String.Chars.
  @impl true
  def terminate(reason, _state) do
    IO.puts("Terminated due to #{inspect(reason)}.")
    :ok
  end

  @impl true
  def code_change(_old_version, state, _extra) do
    {:ok, state}
  end

  ## Client functions

  # Synchronous request; `:get_conn` returns `%{gun_pid: ..., stream_ref: ...}`.
  def send_sync(request) do
    GenServer.call(__MODULE__, request)
  end

  # Sends `request` to the websocket server asynchronously as a text frame.
  def send_async(request) do
    GenServer.cast(__MODULE__, {:websocket_request, request})
  end

  # Receives a single message. A relayed message arrives as `{tag, message}`
  # where `tag` belongs to the earlier `GenServer.call`, not to gun's stream,
  # so the tag is not matched. The inner frame uses the same
  # `{:gun_ws, pid, stream_ref, frame}` shape that the upgrade handling in
  # this module implies (4-tuple carrying the stream reference).
  def get_message(stream_ref, gun_pid) do
    receive do
      {_tag, {:gun_ws, ^gun_pid, ^stream_ref, {:text, message}}} ->
        IO.puts("Client received gun message: #{message}")

      other ->
        IO.inspect(other, label: "Client received other message")
    end
  end

  # Receives all messages, one after another, forever.
  def receive_loop(stream_ref, gun_pid) do
    IO.puts("Listening")
    get_message(stream_ref, gun_pid)
    receive_loop(stream_ref, gun_pid)
  end

  # Demo driver: registers the caller with the server, sends one message,
  # reads one message, starts the periodic sender and then listens forever.
  def go() do
    %{gun_pid: gun_pid, stream_ref: stream_ref} = send_sync(:get_conn)
    IO.inspect(gun_pid, label: "Inside go(): gun_pid=")

    # Send messages manually (plain text works too: send_async("yo")).
    :ok = send_async(Jason.encode!(%{type: "info", greet: "yo"}))

    # Receive messages manually.
    get_message(stream_ref, gun_pid)

    # `sender/1` never returns, so it must run inside the spawned function;
    # `spawn sender 1` would run it in this process and block here.
    spawn(fn -> sender(1) end)

    receive_loop(stream_ref, gun_pid)
  end

  # Sends a message every 3 seconds; never returns.
  def sender(count) do
    send_async("count is #{count}")
    :timer.sleep(3000)
    sender(count + 1)
  end

  ## End of client functions

  # Opens the gun connection described by `args`, requests the websocket
  # upgrade and returns `%{stream_ref: ..., gun_pid: ...}` on success.
  # Exits when the upgrade fails or no answer arrives within five seconds.
  def init_ws(args) do
    %{path: path, port: port, host: host} = args
    {:ok, _} = :application.ensure_all_started(:gun)

    connect_opts = %{
      connect_timeout: :timer.minutes(1),
      retry: 10,
      retry_timeout: 100
    }

    {:ok, gun_pid} = :gun.open(host, port, connect_opts)
    {:ok, _protocol} = :gun.await_up(gun_pid)

    # Custom cookie header; `set_headers/1` can be dropped if none is wanted.
    stream_ref = :gun.ws_upgrade(gun_pid, path, set_headers("I like cookies"))
    await_upgrade(gun_pid, stream_ref)
  end

  # Waits for the outcome of the upgrade. Unrelated messages are logged and
  # the wait continues; returning the stray message (as before) would have
  # made it the GenServer state.
  defp await_upgrade(gun_pid, stream_ref) do
    receive do
      {:gun_upgrade, ^gun_pid, ^stream_ref, ["websocket"], headers} ->
        upgrade_success(gun_pid, headers, stream_ref)

      {:gun_response, ^gun_pid, _, _, status, headers} ->
        exit({:ws_upgrade_failed, status, headers})

      {:gun_error, _gun_pid, _stream_ref, reason} ->
        exit({:ws_upgrade_failed, reason})

      whatever ->
        IO.inspect(whatever, label: "Whatever")
        await_upgrade(gun_pid, stream_ref)
    after
      5000 ->
        IO.puts("Took too long!")
        :erlang.exit("barf!")
    end
  end

  # Builds the request headers carrying the cookie.
  def set_headers(cookie_value) do
    [{"cookie", "my_cookie=#{cookie_value}"}]
  end

  # Logs the upgrade and returns the map that becomes the GenServer state.
  def upgrade_success(gun_pid, headers, stream_ref) do
    IO.puts("Upgraded #{inspect(gun_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")
    %{stream_ref: stream_ref, gun_pid: gun_pid}
  end

  # To stop gun
  def stop(gun_pid) do
    :gun.shutdown(gun_pid)
  end
end
要使用这个:
iex -S mix
iex> WebSocket.Client.go