在 std::thread 中提升 asio 阻塞操作而不是使用异步方法?
Boost asio blocking operations in a std::thread instead of using async methods?
所以我一直在努力在 Boost.Asio 上创建一些抽象层。我想以原子方式处理某些批次的操作,例如 tcp::resolver::resolve()
和 asio::connect()
。如果我同时使用这两者的异步版本,代码会变得非常糟糕,因为我必须 "chain" 回调。本质上:
- 用户调用我的
Connect()
包装器方法,该方法采用主机和服务字符串,它们还提供连接完成时调用的回调。
- 使用主机和服务字符串参数调用
resolver::async_resolve()
。绑定用户回调到resolve的回调(传递连接后调用的回调)
- 从解析回调,如果成功,调用
asio::async_connect()
。同样,将用户的回调绑定到连接回调。
- 在连接回调中,如果成功,调用用户的回调
这要么是因为大量嵌套的 lambda 令人讨厌,要么是因为函数是分开的,现在我有一个 class 加载了样板。在我看来,做这样的事情会简单得多(没有编译,所以现在就把它当作伪代码):
using ConnectCallback = std::function<void(std::shared_ptr<tcp::socket>)>;
void Connect(std::string const& host, std::string const& service, ConnectCallback cb)
{
std::thread{[this, host, service, cb{std::move(cb)}]
{
std::shared_ptr<tcp::socket> socket;
try
{
tcp::resolver r{m_context};
auto endpoints = r.resolve(host, service);
socket = std::make_shared<tcp::socket>(m_context);
asio::connect(*socket, endpoints);
}
catch (std::exception const&)
{
// either resolve or connect failed / timed out
}
cb(std::move(socket));
}}.detach();
}
对我来说这要简单得多,至少对于启动连接来说是这样,因为我不必担心那么多回调。唯一的缺点是我不确定如何使用此方法处理超时情况。我在 Google 上找到的所有与超时相关的解决方案都需要使用 async_
方法。
是否推荐这样做,还是我必须坚持使用异步方法?如果是后者,我可以使用哪些技术来简化回调链样板文件?
如果编写处理程序对您来说很烦人,您可以考虑使用 coroutines。它适用于异步操作并使您能够实现超时。
struct Client2 {
Client2(asio::io_context& io)
: io(io) {}
asio::io_context& io;
asio::ip::tcp::resolver resolver{io};
asio::ip::tcp::socket sock{io};
asio::high_resolution_timer timer{io};
atomic_bool stopped{false};
void connect (const string& host, const string& service, int timeoutMs)
{
boost::asio::spawn(io,std::bind(&Client2::establishConnection,this,host,service,timeoutMs,std::placeholders::_1));
boost::asio::spawn(io,std::bind(&Client2::doTimeout,this,std::placeholders::_1));
}
void establishConnection (string host, string service, int timeoutMs,boost::asio::yield_context yield)
{
try {
timer.expires_after(std::chrono::milliseconds(timeoutMs)); // set timeout
auto res = resolver.async_resolve(host,service,yield);
// resume here when handler for resolving was called
if (stopped)
return;
asio::async_connect(sock,res,yield);
timer.cancel(); // connection is established, do sth with sock here, cancel timer
}
catch (std::exception& ex) {
}
}
void doTimeout (boost::asio::yield_context yield)
{
try {
timer.async_wait(yield); // throw exception when was canceled by startConnecting
}
catch (std::exception& ex) {
return;
}
resolver.cancel(); // timeout == timer expired, so cancel resolving and close socket
sock.close();
stopped = true;
}
};
// in main
asio::io_context io;
Client2 client{io};
client.connect("localhost","5444",200);
thread th([&](){ io.run(); }); // call run from at least 2 threads
io.run(); // establishConnection and doTimeout can be executed concurrently
th.join();
我在代码中添加了一些注释。
简而言之:使用了两个协程。在 establishConnection
中执行了两个异步操作:async_resolve
和 async_connect
。在 doTimeout
协程 timer
中启动。当计时器在建立连接之前到期时,我们取消解析并关闭套接字。如果在计时器过期之前建立了连接,我们取消计时器,我们可以用 sock
.
执行一些操作
establishConnection
和 doTimeout
的主体可以移动到 lambda 中,作为 asio::spawn
函数的参数。所以我们可以只有一个成员函数,并且没有用于执行 3 个异步操作的代码的处理程序。如果这让您满意,请开始使用协程。
所以我一直在努力在 Boost.Asio 上创建一些抽象层。我想以原子方式处理某些批次的操作,例如 tcp::resolver::resolve()
和 asio::connect()
。如果我同时使用这两者的异步版本,代码会变得非常糟糕,因为我必须 "chain" 回调。本质上:
- 用户调用我的
Connect()
包装器方法,该方法采用主机和服务字符串,它们还提供连接完成时调用的回调。 - 使用主机和服务字符串参数调用
resolver::async_resolve()
。绑定用户回调到resolve的回调(传递连接后调用的回调) - 从解析回调,如果成功,调用
asio::async_connect()
。同样,将用户的回调绑定到连接回调。 - 在连接回调中,如果成功,调用用户的回调
这要么是因为大量嵌套的 lambda 令人讨厌,要么是因为函数是分开的,现在我有一个 class 加载了样板。在我看来,做这样的事情会简单得多(没有编译,所以现在就把它当作伪代码):
using ConnectCallback = std::function<void(std::shared_ptr<tcp::socket>)>;
void Connect(std::string const& host, std::string const& service, ConnectCallback cb)
{
std::thread{[this, host, service, cb{std::move(cb)}]
{
std::shared_ptr<tcp::socket> socket;
try
{
tcp::resolver r{m_context};
auto endpoints = r.resolve(host, service);
socket = std::make_shared<tcp::socket>(m_context);
asio::connect(*socket, endpoints);
}
catch (std::exception const&)
{
// either resolve or connect failed / timed out
}
cb(std::move(socket));
}}.detach();
}
对我来说这要简单得多,至少对于启动连接来说是这样,因为我不必担心那么多回调。唯一的缺点是我不确定如何使用此方法处理超时情况。我在 Google 上找到的所有与超时相关的解决方案都需要使用 async_
方法。
是否推荐这样做,还是我必须坚持使用异步方法?如果是后者,我可以使用哪些技术来简化回调链样板文件?
如果编写处理程序对您来说很烦人,您可以考虑使用 coroutines。它适用于异步操作并使您能够实现超时。
struct Client2 {
Client2(asio::io_context& io)
: io(io) {}
asio::io_context& io;
asio::ip::tcp::resolver resolver{io};
asio::ip::tcp::socket sock{io};
asio::high_resolution_timer timer{io};
atomic_bool stopped{false};
void connect (const string& host, const string& service, int timeoutMs)
{
boost::asio::spawn(io,std::bind(&Client2::establishConnection,this,host,service,timeoutMs,std::placeholders::_1));
boost::asio::spawn(io,std::bind(&Client2::doTimeout,this,std::placeholders::_1));
}
void establishConnection (string host, string service, int timeoutMs,boost::asio::yield_context yield)
{
try {
timer.expires_after(std::chrono::milliseconds(timeoutMs)); // set timeout
auto res = resolver.async_resolve(host,service,yield);
// resume here when handler for resolving was called
if (stopped)
return;
asio::async_connect(sock,res,yield);
timer.cancel(); // connection is established, do sth with sock here, cancel timer
}
catch (std::exception& ex) {
}
}
void doTimeout (boost::asio::yield_context yield)
{
try {
timer.async_wait(yield); // throw exception when was canceled by startConnecting
}
catch (std::exception& ex) {
return;
}
resolver.cancel(); // timeout == timer expired, so cancel resolving and close socket
sock.close();
stopped = true;
}
};
// in main
asio::io_context io;
Client2 client{io};
client.connect("localhost","5444",200);
thread th([&](){ io.run(); }); // call run from at least 2 threads
io.run(); // establishConnection and doTimeout can be executed concurrently
th.join();
我在代码中添加了一些注释。
简而言之:使用了两个协程。在 establishConnection
中执行了两个异步操作:async_resolve
和 async_connect
。在 doTimeout
协程 timer
中启动。当计时器在建立连接之前到期时,我们取消解析并关闭套接字。如果在计时器过期之前建立了连接,我们取消计时器,我们可以用 sock
.
establishConnection
和 doTimeout
的主体可以移动到 lambda 中,作为 asio::spawn
函数的参数。所以我们可以只有一个成员函数,并且没有用于执行 3 个异步操作的代码的处理程序。如果这让您满意,请开始使用协程。