安全地取消 boost asio 截止时间计时器

Cancelling boost asio deadline timer safely

我正在尝试安全地取消 boost::asio::basic_waitable_timer<std::chrono::steady_clock>

根据这个 ,这段代码应该可以完成这项工作:

timer.get_io_service().post([&]{timer.cancel();})

恐怕它对我不起作用。
我做错了什么吗?
这是我的代码:

#include <iostream>
#include "boost/asio.hpp"
#include <chrono>
#include <thread>
#include <random>

boost::asio::io_service io_service;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer(io_service);
std::atomic<bool> started;

void handle_timeout(const boost::system::error_code& ec)
{
    if (!ec) {
        started = true;
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout\n";
        timer.expires_from_now(std::chrono::milliseconds(10));
        timer.async_wait(&handle_timeout);
    } else if (ec == boost::asio::error::operation_aborted) {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout aborted\n";
    } else {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout another error\n";
    }
}

int main() {

    std::cout << "tid: " << std::this_thread::get_id() << ", Hello, World!" << std::endl;
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(1, 100);

    for (auto i = 0; i < 1000; i++) {

        started = false;
        std::thread t([&](){

            timer.expires_from_now(std::chrono::milliseconds(0));
            timer.async_wait(&handle_timeout);

            io_service.run();
        });

        while (!started) {};
        auto sleep = dis(gen);
        std::cout << "tid: " << std::this_thread::get_id() << ", i: " << i << ", sleeps for " << sleep << " [ms]" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
        timer.get_io_service().post([](){
            std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post\n";
            timer.cancel();
        });
//      timer.cancel();
        std::cout << "tid: " << std::this_thread::get_id() << ", i: " << i << ", waiting for thread to join()" << std::endl;
        t.join();
        io_service.reset();
    }

    return 0;
}

这是输出:

...
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737353967488, i: 2, waiting for thread to join()
tid: 140737335076608, cancelling in post
tid: 140737335076608, handle_timeout aborted
tid: 140737353967488, i: 3, sleeps for 21 [ms]
tid: 140737335076608, handle_timeout
tid: 140737353967488, i: 3, waiting for thread to join()
tid: 140737335076608, handle_timeout
tid: 140737335076608, cancelling in post
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
...
continue forever...

如您所见,timer.cancel() 正在从适当的线程调用:

tid: 140737335076608, cancelling in post

但是没有

tid: 140737335076608, handle_timeout aborted

之后。

Main 永远等待。

取消 安全。

只是不够稳健。当计时器未挂起时,您没有考虑到这种情况。然后,您取消它一次,但一旦调用完成处理程序,它只会启动一个新的异步等待。

以下是我如何跟踪问题的详细步骤。

SUMMARY TL;DR

Cancelling a time only cancels asynchronous operations in flight.

If you want to shutdown an asynchronous call chain, you'll have to use additional logic for that. An example is given below.

处理程序跟踪

启用

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1

这会产生可以用 boost/libs/asio/tools/handlerviz.pl:

可视化的输出

成功跟踪

如您所见,取消发生时 async_wait 正在飞行中。

一条"bad" 轨迹

(t运行cated 因为它会 运行 无限)

请注意完成处理程序如何看待 cc=system:0,而不是 cc=system:125(对于 operation_aborted)。这是发布的取消实际上没有 "take" 这一事实的症状。唯一合乎逻辑的解释(在图中不可见)是计时器在调用取消之前已经过期。

让我们比较一下原始痕迹¹

¹ 消除噪声差异

正在检测

所以,我们有领先优势。我们能检测到吗?

    timer.get_io_service().post([](){
        std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post\n";
        if (timer.expires_from_now() >= std::chrono::steady_clock::duration(0)) {
            timer.cancel();
        } else {
            std::cout << "PANIC\n";
            timer.cancel();
        }
    });

打印:

tid: 140113177143232, i: 0, waiting for thread to join()
tid: 140113177143232, i: 1, waiting for thread to join()
tid: 140113177143232, i: 2, waiting for thread to join()
tid: 140113177143232, i: 3, waiting for thread to join()
tid: 140113177143232, i: 4, waiting for thread to join()
tid: 140113177143232, i: 5, waiting for thread to join()
tid: 140113177143232, i: 6, waiting for thread to join()
tid: 140113177143232, i: 7, waiting for thread to join()
tid: 140113177143232, i: 8, waiting for thread to join()
tid: 140113177143232, i: 9, waiting for thread to join()
tid: 140113177143232, i: 10, waiting for thread to join()
tid: 140113177143232, i: 11, waiting for thread to join()
tid: 140113177143232, i: 12, waiting for thread to join()
tid: 140113177143232, i: 13, waiting for thread to join()
tid: 140113177143232, i: 14, waiting for thread to join()
tid: 140113177143232, i: 15, waiting for thread to join()
tid: 140113177143232, i: 16, waiting for thread to join()
tid: 140113177143232, i: 17, waiting for thread to join()
tid: 140113177143232, i: 18, waiting for thread to join()
tid: 140113177143232, i: 19, waiting for thread to join()
tid: 140113177143232, i: 20, waiting for thread to join()
tid: 140113177143232, i: 21, waiting for thread to join()
tid: 140113177143232, i: 22, waiting for thread to join()
tid: 140113177143232, i: 23, waiting for thread to join()
tid: 140113177143232, i: 24, waiting for thread to join()
tid: 140113177143232, i: 25, waiting for thread to join()
tid: 140113177143232, i: 26, waiting for thread to join()
PANIC

我们可以用另一种更清晰的方式传达 "super-cancellation" 吗?当然,我们只有 timer 对象可以使用:

信号关闭

timer 对象没有太多可用的属性。没有 close() 或类似的东西,比如在套接字上,可用于将计时器置于某种无效状态。

但是有过期时间点,我们可以使用一个特殊的域名 为我们的应用程序发出 "invalid" 信号的值:

timer.get_io_service().post([](){
    std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post\n";
    // also cancels:
    timer.expires_at(Timer::clock_type::time_point::min());
});

这个"special value"在完成处理器中很容易处理:

void handle_timeout(const boost::system::error_code& ec)
{
    if (!ec) {
        started = true;
        if (timer.expires_at() != Timer::time_point::min()) {
            timer.expires_from_now(std::chrono::milliseconds(10));
            timer.async_wait(&handle_timeout);
        } else {
            std::cerr << "handle_timeout: detected shutdown\n";
        }
    } 
    else if (ec != boost::asio::error::operation_aborted) {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout error " << ec.message() << "\n";
    }
}