Boost ASIO:向所有连接的客户端发送消息
Boost ASIO: Send message to all connected clients
我正在从事一个涉及 boost::beast
websocket/http 混合服务器的项目,该服务器在 boost::asio
之上运行。我的项目主要基于 advanced_server.cpp
示例源。
它工作正常,但现在我正在尝试添加一项功能,需要向 所有 连接的客户端发送消息。
我对 boost::asio
不是很熟悉,但现在我看不出有什么方法可以产生类似 "broadcast" 的事件(如果这是正确的术语)。
我天真的方法是看看我是否可以让 websocket_session()
的构造附加事件侦听器之类的东西,然后析构函数分离侦听器。那时,我可以触发事件,并让所有当前有效的 websocket 会话(websocket_session()
的生命周期范围内)执行回调。
有 ,它通过(ab)使用 boost::asio::steady_timer
或多或少地完成了我想要的,但这似乎是一种可怕的 hack 来完成本应非常简单的事情.
基本上,给定一个有状态的 boost::asio
服务器,我该如何对多个连接进行操作?
首先:您可以广播 UDP,但不能广播给连接的客户端。那只是...UDP。
其次,link 展示了如何在 Asio 中拥有类似 condition-variable(事件)的界面。那只是你问题的一小部分。您忘记了大局:您需要以某种方式了解打开的连接集:
- 例如将会话指针 (
weak_ptr
) 的容器保存到每个连接
- 每个连接订阅一个信号槽(例如 Boost Signals)。
选项 1. 非常适合性能,选项 2. 更适合灵活性(将事件源与订阅者解耦,使异构订阅者成为可能,例如不来自连接)。
因为我认为选项 1. 对线程处理 w.r.t 更简单,更好 w.r.t。效率(例如,您可以在不复制的情况下从一个缓冲区为所有客户端提供服务)并且您可能不需要双重解耦 signal/slots,让我参考一个答案,我已经展示了纯 Asio(没有 Beast) :
它展示了 "connection pool" 的概念 - 它本质上是一个 thread-safe 容器 weak_ptr<connection>
具有一些垃圾收集逻辑的对象。
演示:介绍回声服务器
在 chatting about things 之后我想花时间实际演示这两种方法,所以我在说什么完全清楚。
首先让我们展示一个简单的,run-of-the 具有
的 mill 异步 TCP 服务器
- 有多个并发连接
- 每个连接的会话都从客户端读取 line-by-line,并将相同的内容回显给客户端
- 3秒后停止接受,并在最后一个客户端断开连接后退出
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
private:
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
session->start();
if (!ec)
accept_loop();
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(3s);
s.stop(); // active connections will continue
th.join();
}
方法 1. 添加广播消息
因此,让我们添加 "broadcast messages" 以同时发送到所有活动连接。我们添加两个:
- 每个新连接一个(说 "Player ## has entered the game")
一个模拟全局的 "server event",就像你在问题中描述的那样)。它从 main:
中触发
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active connections\n";
注意我们是如何通过注册一个弱指针到每个已接受的连接并在每个连接上进行操作来做到这一点的:
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
broadcast
也直接从 main
使用,简单地说:
size_t broadcast(std::string const& msg) {
return for_each_active([msg](connection& c) { c.send(msg, true); });
}
using-asio-post
branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
size_t broadcast(std::string const& msg) {
return for_each_active([msg](connection& c) { c.send(msg, true); });
}
private:
using connptr = std::shared_ptr<connection>;
using weakptr = std::weak_ptr<connection>;
std::mutex _mx;
std::vector<weakptr> _registered;
size_t reg_connection(weakptr wp) {
std::lock_guard<std::mutex> lk(_mx);
_registered.push_back(wp);
return _registered.size();
}
template <typename F>
size_t for_each_active(F f) {
std::vector<connptr> active;
{
std::lock_guard<std::mutex> lk(_mx);
for (auto& w : _registered)
if (auto c = w.lock())
active.push_back(c);
}
for (auto& c : active) {
std::cout << "(running action for " << c->_s.remote_endpoint() << ")" << std::endl;
f(*c);
}
return active.size();
}
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active connections\n";
std::this_thread::sleep_for(2s);
s.stop(); // active connections will continue
th.join();
}
方法 2:广播但带有增强信号2
信号方法是 Dependency Inversion 的一个很好的例子。
最突出的注意事项:
- 信号槽在调用它的线程上被调用 ("raising the event")
- 那里有
scoped_connection
,因此当 connection
被销毁时,订阅会 *自动 删除
- 从 "reached # active connections" 到 "reached # active subscribers" 有 subtle difference in the wording of the console message。
The difference is key to understanding the added flexibility: the signal owner/invoker does not know anything about the subscribers. That's the decoupling/dependency inversion we're talking about
using-signals2
branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
#include <boost/signals2.hpp>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
boost::signals2::scoped_connection _subscription;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
size_t broadcast(std::string const& msg) {
_broadcast_event(msg);
return _broadcast_event.num_slots();
}
private:
boost::signals2::signal<void(std::string const& msg)> _broadcast_event;
size_t reg_connection(connection& c) {
c._subscription = _broadcast_event.connect(
[&c](std::string msg){ c.send(msg, true); }
);
return _broadcast_event.num_slots();
}
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(*session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active subscribers\n";
std::this_thread::sleep_for(2s);
s.stop(); // active connections will continue
th.join();
}
See the diff between Approach 1. and 2.: Compare View on github
运行 3 个并发客户端的输出示例:
(for a in {1..3}; do netcat localhost 6767 < /etc/dictionaries-common/words > echoed.$a& sleep .1; done; time wait)
@sehe 的回答太棒了,所以我会简短一点。一般来说,要实现对所有活动连接进行操作的算法,您必须执行以下操作:
维护活动连接列表。如果这个列表被多个线程访问,它将需要同步(std::mutex
)。应将新连接插入列表,当连接被破坏或变为非活动状态时,应将其从列表中删除。
要迭代列表,如果列表被多个线程访问(即不止一个线程调用 asio::io_context::run
,或者如果列表也被多个线程访问,则需要同步不调用 asio::io_context::run
)
在迭代期间,如果算法需要检查或修改任何连接的状态,并且该状态可以被其他线程更改,则需要额外的同步。这包括连接对象存储的任何内部 "queue" 消息。
同步连接对象的一种简单方法是使用boost::asio::post
提交一个函数以在连接对象的上下文中执行,这将是一个显式链(boost::asio::strand
,如在高级服务器示例中一样)或隐式链(当只有一个线程调用 io_context::run
时得到的结果)。 @sehe 提供的方法 1 使用 post
以这种方式进行同步。
另一种同步connection对象的方法是"stop the world." 也就是调用io_context::stop
,等待所有线程退出,然后保证没有其他线程正在访问连接列表。然后你可以随心所欲地读写连接对象状态。完成连接列表后,调用 io_context::restart
并再次启动调用 io_context::run
的线程。停止 io_context
不会停止网络 activity,内核和网络驱动程序仍然从内部缓冲区发送和接收数据。 TCP/IP 流量控制将处理事情,因此应用程序仍然可以平稳运行,即使它在 "stop the world." 期间变得短暂无响应这种方法可以简化事情,但根据您的特定应用程序,您必须评估它是否是适合你。
希望对您有所帮助!
感谢@sehe 的精彩回答。不过,我认为方法 2 中存在一个小而严重的错误。恕我直言 reg_connection
应该如下所示:
size_t reg_connection(std::shared_ptr<connection> c) {
c->_subscription = _broadcast_event.connect(
[weak_c = std::weak_ptr<connection>(c)](std::string msg){
if(auto c = weak_c.lock())
c->send(msg, true);
}
);
return _broadcast_event.num_slots();
}
否则,您可能会遇到导致服务器崩溃的竞争条件。如果在调用 lambda 期间连接实例被破坏,引用将变得无效。
同样 connection#send()
应该看起来像这样,否则 this
可能会在调用 lambda 时死掉:
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(),
[self=shared_from_this(), msg=std::move(msg), at_front] {
if (self->enqueue(std::move(msg), at_front))
self->write_loop();
});
}
PS:我本来可以将此作为对@sehe 的回答的评论发布的,但不幸的是我没有足够的声誉。
我正在从事一个涉及 boost::beast
websocket/http 混合服务器的项目,该服务器在 boost::asio
之上运行。我的项目主要基于 advanced_server.cpp
示例源。
它工作正常,但现在我正在尝试添加一项功能,需要向 所有 连接的客户端发送消息。
我对 boost::asio
不是很熟悉,但现在我看不出有什么方法可以产生类似 "broadcast" 的事件(如果这是正确的术语)。
我天真的方法是看看我是否可以让 websocket_session()
的构造附加事件侦听器之类的东西,然后析构函数分离侦听器。那时,我可以触发事件,并让所有当前有效的 websocket 会话(websocket_session()
的生命周期范围内)执行回调。
有 ,它通过(ab)使用 boost::asio::steady_timer
或多或少地完成了我想要的,但这似乎是一种可怕的 hack 来完成本应非常简单的事情.
基本上,给定一个有状态的 boost::asio
服务器,我该如何对多个连接进行操作?
首先:您可以广播 UDP,但不能广播给连接的客户端。那只是...UDP。
其次,link 展示了如何在 Asio 中拥有类似 condition-variable(事件)的界面。那只是你问题的一小部分。您忘记了大局:您需要以某种方式了解打开的连接集:
- 例如将会话指针 (
weak_ptr
) 的容器保存到每个连接 - 每个连接订阅一个信号槽(例如 Boost Signals)。
选项 1. 非常适合性能,选项 2. 更适合灵活性(将事件源与订阅者解耦,使异构订阅者成为可能,例如不来自连接)。
因为我认为选项 1. 对线程处理 w.r.t 更简单,更好 w.r.t。效率(例如,您可以在不复制的情况下从一个缓冲区为所有客户端提供服务)并且您可能不需要双重解耦 signal/slots,让我参考一个答案,我已经展示了纯 Asio(没有 Beast) :
它展示了 "connection pool" 的概念 - 它本质上是一个 thread-safe 容器 weak_ptr<connection>
具有一些垃圾收集逻辑的对象。
演示:介绍回声服务器
在 chatting about things 之后我想花时间实际演示这两种方法,所以我在说什么完全清楚。
首先让我们展示一个简单的,run-of-the 具有
的 mill 异步 TCP 服务器- 有多个并发连接
- 每个连接的会话都从客户端读取 line-by-line,并将相同的内容回显给客户端
- 3秒后停止接受,并在最后一个客户端断开连接后退出
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
private:
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
session->start();
if (!ec)
accept_loop();
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(3s);
s.stop(); // active connections will continue
th.join();
}
方法 1. 添加广播消息
因此,让我们添加 "broadcast messages" 以同时发送到所有活动连接。我们添加两个:
- 每个新连接一个(说 "Player ## has entered the game")
一个模拟全局的 "server event",就像你在问题中描述的那样)。它从 main:
中触发std::this_thread::sleep_for(1s); auto n = s.broadcast("random global event broadcast\n"); std::cout << "Global event broadcast reached " << n << " active connections\n";
注意我们是如何通过注册一个弱指针到每个已接受的连接并在每个连接上进行操作来做到这一点的:
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
broadcast
也直接从 main
使用,简单地说:
size_t broadcast(std::string const& msg) {
return for_each_active([msg](connection& c) { c.send(msg, true); });
}
using-asio-post
branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
size_t broadcast(std::string const& msg) {
return for_each_active([msg](connection& c) { c.send(msg, true); });
}
private:
using connptr = std::shared_ptr<connection>;
using weakptr = std::weak_ptr<connection>;
std::mutex _mx;
std::vector<weakptr> _registered;
size_t reg_connection(weakptr wp) {
std::lock_guard<std::mutex> lk(_mx);
_registered.push_back(wp);
return _registered.size();
}
template <typename F>
size_t for_each_active(F f) {
std::vector<connptr> active;
{
std::lock_guard<std::mutex> lk(_mx);
for (auto& w : _registered)
if (auto c = w.lock())
active.push_back(c);
}
for (auto& c : active) {
std::cout << "(running action for " << c->_s.remote_endpoint() << ")" << std::endl;
f(*c);
}
return active.size();
}
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active connections\n";
std::this_thread::sleep_for(2s);
s.stop(); // active connections will continue
th.join();
}
方法 2:广播但带有增强信号2
信号方法是 Dependency Inversion 的一个很好的例子。
最突出的注意事项:
- 信号槽在调用它的线程上被调用 ("raising the event")
- 那里有
scoped_connection
,因此当connection
被销毁时,订阅会 *自动 删除 - 从 "reached # active connections" 到 "reached # active subscribers" 有 subtle difference in the wording of the console message。
The difference is key to understanding the added flexibility: the signal owner/invoker does not know anything about the subscribers. That's the decoupling/dependency inversion we're talking about
using-signals2
branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
#include <boost/signals2.hpp>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
boost::signals2::scoped_connection _subscription;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
size_t broadcast(std::string const& msg) {
_broadcast_event(msg);
return _broadcast_event.num_slots();
}
private:
boost::signals2::signal<void(std::string const& msg)> _broadcast_event;
size_t reg_connection(connection& c) {
c._subscription = _broadcast_event.connect(
[&c](std::string msg){ c.send(msg, true); }
);
return _broadcast_event.num_slots();
}
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(*session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active subscribers\n";
std::this_thread::sleep_for(2s);
s.stop(); // active connections will continue
th.join();
}
See the diff between Approach 1. and 2.: Compare View on github
运行 3 个并发客户端的输出示例:
(for a in {1..3}; do netcat localhost 6767 < /etc/dictionaries-common/words > echoed.$a& sleep .1; done; time wait)
@sehe 的回答太棒了,所以我会简短一点。一般来说,要实现对所有活动连接进行操作的算法,您必须执行以下操作:
维护活动连接列表。如果这个列表被多个线程访问,它将需要同步(
std::mutex
)。应将新连接插入列表,当连接被破坏或变为非活动状态时,应将其从列表中删除。要迭代列表,如果列表被多个线程访问(即不止一个线程调用
asio::io_context::run
,或者如果列表也被多个线程访问,则需要同步不调用asio::io_context::run
)在迭代期间,如果算法需要检查或修改任何连接的状态,并且该状态可以被其他线程更改,则需要额外的同步。这包括连接对象存储的任何内部 "queue" 消息。
同步连接对象的一种简单方法是使用
boost::asio::post
提交一个函数以在连接对象的上下文中执行,这将是一个显式链(boost::asio::strand
,如在高级服务器示例中一样)或隐式链(当只有一个线程调用io_context::run
时得到的结果)。 @sehe 提供的方法 1 使用post
以这种方式进行同步。另一种同步connection对象的方法是"stop the world." 也就是调用
io_context::stop
,等待所有线程退出,然后保证没有其他线程正在访问连接列表。然后你可以随心所欲地读写连接对象状态。完成连接列表后,调用io_context::restart
并再次启动调用io_context::run
的线程。停止io_context
不会停止网络 activity,内核和网络驱动程序仍然从内部缓冲区发送和接收数据。 TCP/IP 流量控制将处理事情,因此应用程序仍然可以平稳运行,即使它在 "stop the world." 期间变得短暂无响应这种方法可以简化事情,但根据您的特定应用程序,您必须评估它是否是适合你。
希望对您有所帮助!
感谢@sehe 的精彩回答。不过,我认为方法 2 中存在一个小而严重的错误。恕我直言 reg_connection
应该如下所示:
size_t reg_connection(std::shared_ptr<connection> c) {
c->_subscription = _broadcast_event.connect(
[weak_c = std::weak_ptr<connection>(c)](std::string msg){
if(auto c = weak_c.lock())
c->send(msg, true);
}
);
return _broadcast_event.num_slots();
}
否则,您可能会遇到导致服务器崩溃的竞争条件。如果在调用 lambda 期间连接实例被破坏,引用将变得无效。
同样 connection#send()
应该看起来像这样,否则 this
可能会在调用 lambda 时死掉:
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(),
[self=shared_from_this(), msg=std::move(msg), at_front] {
if (self->enqueue(std::move(msg), at_front))
self->write_loop();
});
}
PS:我本来可以将此作为对@sehe 的回答的评论发布的,但不幸的是我没有足够的声誉。