如何在不轮询的情况下实现std::when_any?

How to implement std::when_any without polling?

考虑http://en.cppreference.com/w/cpp/experimental/when_any。以下只是朴素简化实现:

#include <future>

template<typename Iterator>
auto when_any(Iterator first, Iterator last)
{
    while (true)
    {
        for (auto pos = first; pos != last; ++pos)
        {
            if (pos->is_ready())
            {
                return std::move(*pos);
            }
        }
    }
}

我不满意,因为它是一个无限循环的繁忙轮询。

有没有办法避免忙轮询?

不是真的,没有延续的期货的用处非常有限。

如果您被迫这样做并使用 std::future,我建议通过 .wait_for() with increasing timeouts 进行更智能的轮询。

无轮询版本将为每个未来启动 1 个线程,并让它们设置一个条件变量,未来就绪。

然后你 "leak" 线程直到期货准备就绪,同时返回一个准备就绪的事实。

这很糟糕。但是没有轮询。

为了做得更好,您需要有一个可以设置(理想情况下可以删除)的延续的未来。然后你只要让期货在完成时通知你,然后等待。这需要修改或编写自己的未来。

这是延续和 when_any 都被提议标准化的原因之一。以后你需要它们。

现在,如果您有自己的系统,您可以将其基于线程安全队列来传递内容而不是通过条件变量实现的期货。这需要在 "future" 创建时进行合作。

struct many_waiter_t {
  std::mutex m;
  std::condition_variable cv;
  std::vector<std::size_t> index;

  std::size_t wait() {
    auto l = lock();
    cv.wait(l, [this]{
      return !index.empty();
    });
    auto r = index.back();
    index.pop_back();
    return r;
  }
  void set( std::size_t N ) {
    {
      auto l = lock();
      index.push_back(N);
    }
    cv.notify_one();
  }
};
template<class T>
std::future<T> add_waiter( std::future<T> f, std::size_t i, std::shared_ptr<many_waiter_t> waiter )
{
  return std::async([f = std::move(f), waiter, i]{
    auto r = f.get();
    waiter.set(i);
    return r;
  });
}

使用一个 futures 数组 fs,我们可以生成一个新的 futures 数组 f2s 和一个等待者,这样等待者可以非自旋锁等待,直到未来准备好,而 f2s 对应于原来的 fs.

您可以在 waiter 上重复等待,直到 f2s 都准备就绪。

I have posted an implementation of when_any over on CodeReview. 正如 Yakk 在他的回答中所说,

To do better, you need to have a future with a continuation you can set (and remove ideally). Then you just ask the futures to notify you when done, then wait. This requires modifying or writing your own future.

所以我的实现依赖于future::then(),它的要点是:

template<class... Futures>
struct when_any_shared_state {
    promise<tuple<Futures...>> m_promise;
    tuple<Futures...> m_tuple;
    std::atomic<bool> m_done;
    std::atomic<bool> m_count_to_two;

    when_any_shared_state(promise<tuple<Futures...>> p) :
        m_promise(std::move(p)), m_done(false), m_count_to_two(false) {}
};

template<class... Futures>
auto when_any(Futures... futures) -> future<tuple<Futures...>>
{
    using shared_state = detail::when_any_shared_state<Futures...>;
    using R = tuple<Futures...>;
    promise<R> p;
    future<R> result = p.get_future();

    auto sptr = make_shared<shared_state>(std::move(p));
    auto satisfy_combined_promise =
        [sptr](auto f) {
            if (sptr->m_done.exchange(true) == false) {
                if (sptr->m_count_to_two.exchange(true)) {
                    sptr->m_promise.set_value(std::move(sptr->m_tuple));
                }
            }
            return f.get();
        };
    sptr->m_tuple = tuple<Futures...>(futures.then(satisfy_combined_promise)...);
    if (sptr->m_count_to_two.exchange(true)) {
        sptr->m_promise.set_value(std::move(sptr->m_tuple));
    }
    return result;
}

您为每个传入的 future 附加一个延续(使用 then)。此延续将 shared_ptr 保存到共享状态。共享状态包含一个计数到一 (m_done) 和一个计数到二 (m_count_to_two)。执行的每个延续都会将计数递增到一;如果它是赢家,它也会将计数增加到二。在完成所有这些设置后,主线程还将计数增加到二。一旦 count-to-two 达到 2(表示主线程完成设置 并且 至少执行了一个延续),我们将调用 set_value on对应于 when_any 的 return 未来的承诺。哒哒!