io_service::poll_one 非确定性行为

io_service::poll_one non-deterministic behaviour

在下面的代码中,我希望输出始终为 1,因为我希望在调用 poll_one() 时只有一个处理程序到 运行。然而,大约300次,输出一次实际上是3。根据我对boost库的理解,这似乎是不正确的。非确定性行为是错误还是预期的?

#include <boost/asio.hpp>

int main() {
  boost::asio::io_service io;
  boost::asio::io_service::work io_work(io);
  boost::asio::io_service::strand strand1(io);
  boost::asio::io_service::strand strand2(io);
  int val = 0;

  strand1.post([&val, &strand2]() {
    val = 1;
    strand2.post([&val]() {
      val = 2;
    });
    boost::asio::spawn(strand2, [&val](boost::asio::yield_context yield) {
      val = 3;
    });
  });

  io.poll_one();
  std::cout << "Last executed: " << val << std::endl;

  return 0;
}

使用 boost-asio 1.60.0.6

观察到的行为已明确定义并预计会发生,但不应期望它经常发生。

Asio 的 strand 实现池有限,strand 的默认分配策略是散列。如果发生哈希冲突,两条链将使用相同的实现。当发生散列冲突时,示例简化为以下 demo:

#include <cassert>
#include <boost/asio.hpp>

int main()
{
  boost::asio::io_service io_service;
  boost::asio::io_service::strand strand1(io_service);
  // Have strand2 use the same implementation as strand1.
  boost::asio::io_service::strand strand2(strand1);

  int value = 0;
  auto handler1 = [&value, &strand1, &strand2]() {
    assert(strand1.running_in_this_thread());
    assert(strand2.running_in_this_thread());
    value = 1;

    // handler2 is queued into strand and never invoked.
    auto handler2 = [&value]() { assert(false); };
    strand2.post(handler2);

    // handler3 is immediately executed.
    auto handler3 = [&value]() { value = 3; };
    strand2.dispatch(handler3);
    assert(value == 3);
  };

  // Enqueue handler1.
  strand1.post(handler1);

  // Run the event processing loop, executing handler1.
  assert(io_service.poll_one() == 1);
}

在上面的例子中:

  • io_service.poll_one() 执行单个就绪处理程序 (handler1)
  • handler2 从未被调用
  • handler3strand2.dispatch() 中立即调用,因为 strand2.dispatch() 是从 strand2.running_in_this_thread() returns true[= 的处理程序中调用的141=]

观察到的行为有多种细节:

  • io_service::poll_one() 将 运行 io_service 的事件循环并且不会阻塞,它最多会执行一个准备好 运行 的处理程序。在 dispatch() 的上下文中立即执行的处理程序永远不会排入 io_service,并且不受 poll_one() 调用单个处理程序的限制。

  • boost::asio::spawn(strand, function) 重载通过 strand.dispatch():

    启动堆栈协程 as-if
    • if strand.running_in_this_thread() returns false 为调用者,则协程将被发布到 strand 用于延迟调用
    • if strand.running_in_this_thread() returns true 为调用者,则协程将立即执行
  • 使用相同实现的离散 strand 对象仍然保持链的保证。即,不会发生并发执行,并且 order of handler invocation is well defined. When discrete strand objects are using discrete implementations, and multiple threads are running the io_service, then one may observe the discrete strands executing concurrently. However, when discrete strand objects use the same implementation, one will not observe concurrency even if multiple threads are running the io_service. This behavior is documented:

    The implementation makes no guarantee that handlers posted or dispatched through different strand objects will be invoked concurrently.

  • Asio 的 strand 实现池有限。当前默认值为 193 并且可以通过将 BOOST_ASIO_STRAND_IMPLEMENTATIONS 定义为所需的数字来控制。此功能已在 Boost.Asio 1.48 release notes

    中注明

    Made the number of strand implementations configurable by defining BOOST_ASIO_STRAND_IMPLEMENTATIONS to the desired number.

    通过减小池大小,可以增加两个离散链使用相同实现的机会。使用原始代码,如果将池大小设置为 1,则 strand1strand2 将始终使用相同的实现,导致 val 始终为 3 (demo).

  • 分配链实现的默认策略是使用黄金比率哈希。由于使用了散列算法,因此有可能发生冲突,导致相同的实现被用于多个离散的 strand 对象。通过定义 BOOST_ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION,可以将分配策略更改为循环,防止在 BOOST_ASIO_STRAND_IMPLEMENTATIONS + 1 链分配发生之前发生冲突。 Boost.Asio 1.48 发行说明中记录了此功能:

    Added support for a new BOOST_ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION flag which switches the allocation of strand implementations to use a round-robin approach rather than hashing.

鉴于上述细节,在原始代码中观察到 1 时会发生以下情况:

  • strand1strand2 有离散的实现
  • io_service::poll_one() 执行直接发布到 strand1
  • 中的单个处理程序
  • 发布到 strand1 的处理程序将 val 设置为 1
  • 发布到 strand2 的处理程序已入队且从未被调用
  • 协程的创建被推迟,因为 strand 的调用顺序保证会阻止在发布到 strand2 的前一个处理程序执行之后创建协程:

    given a strand object s, if s.post(a) happens-before s.dispatch(b), where the latter is performed outside the strand, then asio_handler_invoke(a1, &a1) happens-before asio_handler_invoke(b1, &b1).

另一方面,当观察到 3 时:

  • strand1strand2 发生哈希冲突,导致它们使用相同的底层链实现
  • io_service::poll_one() 执行直接发布到 strand1
  • 中的单个处理程序
  • 发布到 strand1 的处理程序将 val 设置为 1
  • 发布到 strand2 的处理程序已入队且从未被调用
  • boost::asio::spawn()内立即创建并调用协程,将val设置为3,因为strand2可以安全地执行协程,同时保持非-处理程序调用的并发执行和顺序