如何在列表中存储自删除期货

How to store self-removing futures in a list

我有一些任务需要异步执行,服务器无法在还有任务时关闭运行。所以我试图将 std::async 返回的期货存储在一个列表中,但我也不想得到一个无限增长的列表。所以我想在完成时删除期货。

以下是我正在尝试做的事情:

// this is a member of the server class
std::list<std::future<void>> pending;

std::list<std::future<void>>::iterator iter = ???;

pending.push_back( std::async( std::launch::async, [iter]()
{
    doSomething();
    pending.remove( iter );
} );

这里,iter需要指向新插入的元素,但是我无法在插入元素之前(没有迭代器)和之后(因为它传递给了lambda按价值)。我可以制作一个 shared_ptr 来存储迭代器,但这似乎太过分了。

有更好的模式吗?

更新: 这似乎还有另一个问题。当 future 试图将自己从列表中删除时,它实际上是在等待自己完成,这会锁定所有内容。糟糕!

最重要的是,列表析构函数在调用元素析构函数之前清空列表。

您似乎可以将默认值 std::future 添加到列表中,获取一个迭代器,然后 将您的未来 移入。

请注意,非互斥保护 remove(iter) 看起来非常危险。

这是一种方法。我认为这个不需要期货:

#include <unordered_set>
#include <condition_variable>
#include <mutex>
#include <thread>

struct server
{
    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    std::unordered_set<unsigned> pending;
    unsigned next_id =  0;

    void add_task()
    {
        auto lock = std::unique_lock(pending_mutex);
        auto id = next_id++;
        auto t = std::thread([this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        t.detach(); // or we could store it somewhere. e.g. pending could be a map
        pending.insert(id);
    }

    void doSomething();

    void notify_complete(unsigned id)
    {
        auto lock = std::unique_lock(pending_mutex);
        pending.erase(id);
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };

        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};


int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}

这是关于期货的,以防万一:

#include <unordered_map>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <future>

struct server
{
    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    std::unordered_map<unsigned, std::future<void>> pending;
    unsigned next_id =  0;

    void add_task()
    {
        auto lock = std::unique_lock(pending_mutex);
        auto id = next_id++;
        auto f = std::async(std::launch::async, [this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        pending.emplace(id, std::move(f));
    }

    void doSomething();

    void notify_complete(unsigned id)
    {
        auto lock = std::unique_lock(pending_mutex);
        pending.erase(id);
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };

        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};


int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}

这是列表版本:

#include <list>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <future>

struct server
{
    using pending_list = std::list<std::future<void>>;
    using id_type = pending_list::const_iterator;

    std::mutex pending_mutex;
    std::condition_variable pending_condition;
    pending_list pending;

    void add_task()
    {
        auto lock = std::unique_lock(pending_mutex);

        // redundant construction
        auto id = pending.emplace(pending.end());
        auto f = std::async(std::launch::async, [this, id]{
            this->doSomething();
            this->notify_complete(id);
        });
        *id = std::move(f);
    }

    void doSomething();

    void notify_complete(id_type id)
    {
        auto lock = std::unique_lock(pending_mutex);
        pending.erase(id);
        if (pending.empty())
            pending_condition.notify_all();
    }

    void wait_all_complete()
    {
        auto none_left = [&] { return pending.empty(); };

        auto lock = std::unique_lock(pending_mutex);
        pending_condition.wait(lock, none_left);
    }
};


int main()
{
    auto s = server();
    s.add_task();
    s.add_task();
    s.add_task();
    s.wait_all_complete();
}