从线程池工作线程使用时 GetQueuedCompletionStatus 的奇怪行为

Strange behaviour of GetQueuedCompletionStatus when used from thread pool worker threads

我一直在测试将 IO 完成端口与线程池中的工作线程相结合,但偶然发现了一种我无法解释的行为。特别是以下代码:

  int data;
  for (int i = 0; i < NUM; ++i)
      PostQueuedCompletionStatus(cp, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data));

  {
      std::thread t([&] ()
      {
            LPOVERLAPPED aux;
            DWORD        cmd;
            ULONG_PTR    key;

            for (int i = 0; i < NUM; ++i)
            {
              if (!GetQueuedCompletionStatus(cp, &cmd, &key, &aux, 0))
                break;
              ++count;
            }
      });

      t.join();
   }

工作正常并接收 NUM 状态通知(NUM 是一个大数字,100000 或更多),使用线程池工作对象的类似代码读取每个工作项目的一个状态通知并在阅读后重新发布工作项目它在阅读了数百个状态通知后失败了。具有以下全局变量(请不要介意名称):

HANDLE cport;
PTP_POOL pool;
TP_CALLBACK_ENVIRON env;
PTP_WORK work;
std::size_t num_calls;
std::mutex mutex;
std::condition_variable cv; 
bool job_done;

和回调函数:

static VOID CALLBACK callback(PTP_CALLBACK_INSTANCE instance_, PVOID pv_, PTP_WORK work_)
{
  LPOVERLAPPED aux;
  DWORD        cmd;
  ULONG_PTR    key;

  if (GetQueuedCompletionStatus(cport, &cmd, &key, &aux, 0))
  {
    ++num_calls;
    SubmitThreadpoolWork(work);
  }
  else
  {
    std::unique_lock<std::mutex> l(mutex);
    std::cout << "No work after " << num_calls << " calls.\n";
    job_done = true;
    cv.notify_one();
  }
}

以下代码:

{
  job_done = false;
  std::unique_lock<std::mutex> l(mutex);

  num_calls = 0;
  cport = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);

  pool = CreateThreadpool(nullptr);
  InitializeThreadpoolEnvironment(&env);
  SetThreadpoolCallbackPool(&env, pool);

  work = CreateThreadpoolWork(callback, nullptr, &env);

  for (int i = 0; i < NUM; ++i)
      PostQueuedCompletionStatus(cport, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data));

  SubmitThreadpoolWork(work);
  cv.wait_for(l, std::chrono::milliseconds(10000), [] { return job_done; } );
}
尽管 NUM 设置为 1000000,但

会在 250 次左右调用 GetQueuedCompletionStatus 后报告 "No more work after ..."。更奇怪的是,将等待时间从 0 设置为 10 毫秒会增加成功调用的次数到几十万,偶尔会阅读所有 1000000 条通知。我不太明白,因为所有状态通知都是在第一次提交工作对象之前发布的。

有没有可能是完成端口和线程池的组合真的有问题,还是我的代码有问题?请不要深入探讨我为什么要这样做 - 我正在调查各种可能性并偶然发现了这一点。在我看来,它应该可以工作,但不知道出了什么问题。谢谢。

我试过 运行 这段代码,问题似乎是提供给 CreateIoCompletionPortNumberOfConcurrentThreads 参数。传递 1 意味着执行 callback 的第一个池线程与 io 完成端口相关联,但由于线程池可能使用不同的线程 GetQueuedCompletionStatus 执行 callback,因此发生这种情况时将失败。 From documentation:

The most important property of an I/O completion port to consider carefully is the concurrency value. The concurrency value of a completion port is specified when it is created with CreateIoCompletionPort via the NumberOfConcurrentThreads parameter. This value limits the number of runnable threads associated with the completion port. When the total number of runnable threads associated with the completion port reaches the concurrency value, the system blocks the execution of any subsequent threads associated with that completion port until the number of runnable threads drops below the concurrency value.

Although any number of threads can call GetQueuedCompletionStatus for a specified I/O completion port, when a specified thread calls GetQueuedCompletionStatus the first time, it becomes associated with the specified I/O completion port until one of three things occurs: The thread exits, specifies a different I/O completion port, or closes the I/O completion port. In other words, a single thread can be associated with, at most, one I/O completion port.

因此,要将 io 完成与线程池一起使用,您需要将并发线程数设置为线程池的大小(您可以使用 SetThreadpoolThreadMaximum 进行设置)。

::DWORD const threads_count{1};

cport = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, threads_count);
...
pool = ::CreateThreadpool(nullptr);
::SetThreadpoolThreadMaximum(pool, threads_count);