How to ensure a function can't be called twice?
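In a Phoenix web app, I have a controller that calls a function to start some processing. This processing might take a significant amount of time.
Currently, if I call the controller twice, the process is killed and restarted (undesired).
How can I ensure that subsequent calls to the function don't kill or restart the process and are simply ignored?
Googling seems to suggest using an Agent, but I couldn't get it to work. Although I can set the state of a flag to prevent it from being re-run, the process still dies when the controller is called a second time.
A demo of the problem: https://github.com/corynorris/singleprocess
The button on the left works as expected; it uses JavaScript simply to POST to the /start endpoint.
The button on the right restarts the process every time it is pressed.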
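Update: It appears that a POST request cancels the previous request and kills the long-running process. A GET request doesn't do this at all, because it blocks any subsequent requests. The solution here is to run the process separately from the main thread (asynchronously), so that it doesn't die when the POST request is cancelled.
To do this, I modified the code to use a GenServer. I'm not sure this is the right approach, but I have a working solution with the following changes.
1) I modified my_process.ex to store the process state and to start the work function asynchronously (via Task.async):
    def handle_call(:start, _from, process_map) do
      case Map.get(process_map, :process_running) do
        true ->
          {:reply, process_map, process_map}
        _other ->
          Task.async(&do_work/0)
          updated_process_map = Map.put(process_map, :process_running, true)
          {:reply, updated_process_map, updated_process_map}
      end
    end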
2) I implemented handle_info to update the state when the Task.async completes:
    def handle_info(_, process_map) do
      updated_process_map = Map.put(process_map, :process_running, false)
      {:noreply, updated_process_map}
    end
It broadcasts the status through the channel API:
    SingleProcessWeb.Endpoint.broadcast!("room:notification", "new_msg", %{
      uid: 1,
      body: status
    })
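3) I updated application.ex to start the process with:
    children = [
      SingleProcessWeb.Endpoint,
      worker(SingleProcess.MyProcess, [[name: :my_process]])
    ]
I'm not sure this is the best approach, but it does work, so my next step is to turn it into a more generic and abstract process implementation.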
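One thing to watch in the code above: the catch-all handle_info clears the flag on any message that reaches the GenServer, not only on the task's completion. Below is a minimal sketch of a stricter variant. It assumes a hypothetical module named SingleFlight (not part of the demo repository), starts the job with Task.Supervisor.async_nolink, and clears the flag only when a message carrying that task's reference arrives.

```elixir
defmodule SingleFlight do
  # Hypothetical module: a GenServer that runs at most one job at a time.
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  # Returns :started or :already_running; it never waits for the job itself.
  def start(server, fun) when is_function(fun, 0) do
    GenServer.call(server, {:start, fun})
  end

  @impl true
  def init(:ok) do
    # Tasks run under their own supervisor, so a crashing job
    # does not take this GenServer down with it.
    {:ok, task_sup} = Task.Supervisor.start_link()
    {:ok, %{task_sup: task_sup, ref: nil}}
  end

  @impl true
  def handle_call({:start, fun}, _from, %{ref: nil} = state) do
    task = Task.Supervisor.async_nolink(state.task_sup, fun)
    {:reply, :started, %{state | ref: task.ref}}
  end

  def handle_call({:start, _fun}, _from, state) do
    # A job is already in flight: ignore the request.
    {:reply, :already_running, state}
  end

  @impl true
  # The job finished normally: drop the monitor and clear the flag.
  def handle_info({ref, _result}, %{ref: ref} = state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | ref: nil}}
  end

  # The job crashed: clear the flag so a new job can be started.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
    {:noreply, %{state | ref: nil}}
  end

  # Anything else is unrelated and leaves the state untouched.
  def handle_info(_other, state), do: {:noreply, state}
end

{:ok, pid} = SingleFlight.start_link([])
IO.inspect(SingleFlight.start(pid, fn -> Process.sleep(200) end)) # => :started
IO.inspect(SingleFlight.start(pid, fn -> Process.sleep(200) end)) # => :already_running
Process.sleep(400)
IO.inspect(SingleFlight.start(pid, fn -> :ok end))                # => :started
```

Because the controller only makes a short GenServer.call, a cancelled request should not affect the running job under these assumptions.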
How can I ensure that subsequent calls to the function don't kill or
restart the process and are simply ignored?
How about:
    defmodule MyProcess do
      use Agent
      def start_link(_args) do
        Agent.start_link(fn -> %{} end, name: :flags)
      end
      def start() do
        # spawn() long running process here
      end
      def running? do
        Agent.get(
          :flags,
          fn(map) -> Map.get(map, :process_running) end
        )
      end
      def set_running_flag do
        Agent.update(
          :flags,
          fn(map) -> Map.put(map, :process_running, true) end
        )
      end
    end
Then in your action:
    def your_action(conn, _params) do
      if MyProcess.running?() do
        render(this)
      else
        MyProcess.start()
        MyProcess.set_running_flag()
        render(that)
      end
    end
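Note that the action above checks the flag and sets it in two separate Agent calls, so two requests arriving at nearly the same time could both see the flag unset. A minimal sketch of doing the check and the update in a single call with Agent.get_and_update/3 follows; RunFlag, try_acquire and release are hypothetical names used only for illustration.

```elixir
defmodule RunFlag do
  # Hypothetical module: a named Agent holding a single boolean flag.
  use Agent

  def start_link(_args) do
    Agent.start_link(fn -> false end, name: __MODULE__)
  end

  # Checks and sets the flag inside one Agent call, so only one caller
  # can ever receive :ok while the flag is set.
  def try_acquire do
    Agent.get_and_update(__MODULE__, fn
      true -> {:already_running, true}
      false -> {:ok, true}
    end)
  end

  # Clears the flag so that a later request may start the work again.
  def release do
    Agent.update(__MODULE__, fn _ -> false end)
  end
end

{:ok, _pid} = RunFlag.start_link([])
IO.inspect(RunFlag.try_acquire()) # => :ok
IO.inspect(RunFlag.try_acquire()) # => :already_running
RunFlag.release()
IO.inspect(RunFlag.try_acquire()) # => :ok
```

In the action, the long-running work would then be started only when try_acquire returns :ok.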
Although I can set the state of a flag to prevent it from being
re-run, the process still dies when the controller is called a second
time.
Don't link the process.
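To illustrate the difference, here is a small sketch that is independent of Phoenix. It assumes the request process is what gets killed when a request is cancelled; the `run` helper and the `parent` process standing in for that request process are made up for the demonstration.

```elixir
# `parent` stands in for a request process that is killed mid-request.
# `spawn_fun` is either &spawn_link/1 or &spawn/1.
run = fn spawn_fun ->
  test = self()

  parent =
    spawn(fn ->
      worker = spawn_fun.(fn -> Process.sleep(:infinity) end)
      send(test, {:worker, worker})
      Process.sleep(:infinity)
    end)

  worker =
    receive do
      {:worker, pid} -> pid
    end

  # Watch the worker, then kill the parent and see what happens.
  ref = Process.monitor(worker)
  Process.exit(parent, :kill)

  receive do
    {:DOWN, ^ref, :process, _pid, _reason} -> :died
  after
    200 ->
      # Clean up the surviving worker before reporting.
      Process.exit(worker, :kill)
      :survived
  end
end

IO.inspect(run.(&spawn_link/1)) # => :died
IO.inspect(run.(&spawn/1))      # => :survived
```

The linked worker receives the parent's exit signal and dies with it, while the unlinked worker keeps running.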
This processing might take a significant amount of time
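Yes, but do you need some reply back from the process, or can you start the process without any further contact with it?
Response to comment:
Here is what I did:
(I modified this some more so that the Agent never gets killed, and so that a new counter can be started once any previous counter has finished. However, while a counter is broadcasting, no request is allowed to start another counter.)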
lib/hello/counter.ex:
    defmodule Hello.Counter do
      def start(count) do
        set_counting_flag(true)
        spawn(__MODULE__, :publish_count, [count])
      end
      def publish_count(0) do
        set_counting_flag(false)
      end
      def publish_count(count) do
        Process.sleep 1_000
        HelloWeb.CountChannel.broadcast_count(count)
        publish_count(count-1)
      end
      def is_counting? do
        Agent.get(:my_agent,
          fn map -> Map.get(map, :counter_running) end
        )
      end
      def set_counting_flag(bool) do
        Agent.update(:my_agent,
          fn map ->
            Map.update(map,
              :counter_running,
              bool,
              fn _ -> bool end
            )
          end
        )
      end
    end
lib/hello/my_agent.ex:
    defmodule Hello.MyAgent do
      use Agent
      def start_link(_args) do
        Agent.start_link(fn -> %{} end, name: :my_agent)
      end
    end
lib/hello_web/channels/count_channel.ex:
    defmodule HelloWeb.CountChannel do
      use Phoenix.Channel
      # auth
      def join("count:lobby", _msg, socket) do
        {:ok, socket}
      end
      def join("count:" <> _other, _params, _socket) do
        {:error, %{reason: "unauthorized"}}
      end
      def handle_in("new_msg", %{"body" => body}, socket) do
        broadcast!(socket, "new_msg", %{body: body})
        {:noreply, socket}
      end
      # You can use a Phoenix function to broadcast directly to an Endpoint:
      def broadcast_count(n) do
        HelloWeb.Endpoint.broadcast!("count:lobby", "new_msg", %{body: "#{n}"})
      end
    end
lib/hello_web/channels/user_socket.ex:
    defmodule HelloWeb.UserSocket do
      use Phoenix.Socket
      ## Channels
      channel "count:*", HelloWeb.CountChannel
      # Socket params are passed from the client and can
      # be used to verify and authenticate a user. After
      # verification, you can put default assigns into
      # the socket that will be set for all channels, ie
      #
      #     {:ok, assign(socket, :user_id, verified_user_id)}
      #
      # To deny connection, return `:error`.
      #
      # See `Phoenix.Token` documentation for examples in
      # performing token verification on connect.
      def connect(_params, socket, _connect_info) do
        {:ok, socket}
      end
      # Socket id's are topics that allow you to identify all sockets for a given user:
      #
      #     def id(socket), do: "user_socket:#{socket.assigns.user_id}"
      #
      # Would allow you to broadcast a "disconnect" event and terminate
      # all active sockets and channels for a given user:
      #
      #     HelloWeb.Endpoint.broadcast("user_socket:#{user.id}", "disconnect", %{})
      #
      # Returning `nil` makes this socket anonymous.
      def id(_socket), do: nil
    end
lib/hello_web/router.ex:
    ...
    ...
      scope "/", HelloWeb do
        pipe_through :browser
        get "/", PageController, :index
        get "/count/:count", PageController, :counter
      end
      # Other scopes may use custom stacks.
      # scope "/api", HelloWeb do
      #   pipe_through :api
      # end
    end
lib/hello_web/controllers/page_controller.ex:
    defmodule HelloWeb.PageController do
      use HelloWeb, :controller
      def index(conn, _params) do
        render(conn, "index.html")
      end
      def counter(conn, %{"count" => count}) do
        if ! Hello.Counter.is_counting? do
          {int_part, _rest} = Integer.parse(count)
          Hello.Counter.start(int_part)
        end
        render(conn, "index.html")
      end
    end
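One caveat about the controller above: Integer.parse/1 returns :error for non-numeric input, so the pattern match in counter/2 raises a MatchError for a path such as /count/abc. A small sketch of stricter parsing follows; parse_count is a hypothetical helper, not part of the code above.

```elixir
# Accepts only a positive integer with no trailing characters.
parse_count = fn raw ->
  case Integer.parse(raw) do
    {n, ""} when n > 0 -> {:ok, n}
    _ -> :error
  end
end

IO.inspect(parse_count.("10"))    # => {:ok, 10}
IO.inspect(parse_count.("10abc")) # => :error
IO.inspect(parse_count.("abc"))   # => :error
```

The action could then start the counter only on {:ok, n} and render the page unchanged otherwise.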
hello/assets/js/socket.js:
    // NOTE: The contents of this file will only be executed if
    // you uncomment its entry in "assets/js/app.js".
    // To use Phoenix channels, the first step is to import Socket,
    // and connect at the socket path in "lib/web/endpoint.ex".
    //
    // Pass the token on params as below. Or remove it
    // from the params if you are not using authentication.
    import {Socket} from "phoenix"
    let socket = new Socket("/socket", {params: {token: window.userToken}})
    // When you connect, you'll often need to authenticate the client.
    // For example, imagine you have an authentication plug, `MyAuth`,
    // which authenticates the session and assigns a `:current_user`.
    // If the current user exists you can assign the user's token in
    // the connection for use in the layout.
    //
    // In your "lib/web/router.ex":
    //
    //     pipeline :browser do
    //       ...
    //       plug MyAuth
    //       plug :put_user_token
    //     end
    //
    //     defp put_user_token(conn, _) do
    //       if current_user = conn.assigns[:current_user] do
    //         token = Phoenix.Token.sign(conn, "user socket", current_user.id)
    //         assign(conn, :user_token, token)
    //       else
    //         conn
    //       end
    //     end
    //
    // Now you need to pass this token to JavaScript. You can do so
    // inside a script tag in "lib/web/templates/layout/app.html.eex":
    //
    //     <script>window.userToken = "<%= assigns[:user_token] %>";</script>
    //
    // You will need to verify the user token in the "connect/3" function
    // in "lib/web/channels/user_socket.ex":
    //
    //     def connect(%{"token" => token}, socket, _connect_info) do
    //       # max_age: 1209600 is equivalent to two weeks in seconds
    //       case Phoenix.Token.verify(socket, "user socket", token, max_age: 1209600) do
    //         {:ok, user_id} ->
    //           {:ok, assign(socket, :user, user_id)}
    //         {:error, reason} ->
    //           :error
    //       end
    //     end
    //
    // Finally, connect to the socket:
    socket.connect()
    // Now that you are connected, you can join channels with a topic:
    let channel = socket.channel("count:lobby", {})
    channel.join()
      .receive("ok", resp => { console.log("Joined successfully", resp) })
      .receive("error", resp => { console.log("Unable to join", resp) })
    let text_input_box = document.querySelector("#msg_to_send")
    let msg_div = document.querySelector("#received_messages")
    text_input_box.addEventListener("keypress", event => {
      let return_key = 13
      if (event.keyCode == return_key) {
        channel.push("new_msg", {body: text_input_box.value})
        text_input_box.value = ""
      }
    })
    channel.on("new_msg", payload => {
      let new_msg_div = document.createElement('div')
      new_msg_div.innerText = `[${Date()}]: ${payload.body}`
      msg_div.appendChild(new_msg_div)
    })
    export default socket
lib/hello_web/templates/page/index.html:
    <div id="received_messages"></div>
    <input id="msg_to_send" type="text">
lib/hello/application.ex:
    defmodule Hello.Application do
      # See https://hexdocs.pm/elixir/Application.html
      # for more information on OTP Applications
      @moduledoc false
      use Application
      def start(_type, _args) do
        # List all child processes to be supervised
        children = [
          # Start the Ecto repository
          Hello.Repo,
          # Start the endpoint when the application starts
          HelloWeb.Endpoint,
          # Starts a worker by calling: Hello.Worker.start_link(arg)
          # {Hello.Worker, arg},
          Hello.MyAgent # calls Hello.MyAgent.start_link([])
        ]
        # See https://hexdocs.pm/elixir/Supervisor.html
        # for other strategies and supported options
        opts = [strategy: :one_for_one, name: Hello.Supervisor]
        Supervisor.start_link(children, opts)
      end
      # Tell Phoenix to update the endpoint configuration
      # whenever the application is updated.
      def config_change(changed, _new, removed) do
        HelloWeb.Endpoint.config_change(changed, removed)
        :ok
      end
    end