Difference in boost::asio::io_service behaviour between GCC and MSVC: cannot cancel posted jobs

While implementing a basic thread pool with boost::asio::io_service, I noticed some differences in how queued tasks are handled when the io_service is stopped.

On MSVC 14 (MS Visual Studio 2015), for some reason, queued tasks that have not started yet are not dropped when the io_service is stopped; they still run. When I run this on Ubuntu 16.04 (GCC 5.4.0), those tasks are dropped.

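I have simplified and cleaned up the original tests and put them in a single file (also listed below) which only depends on boost and uses some sleeps to demonstrate the problem. You can build it with the CMakeLists.txt (also listed below) if you like, or use the online compilers linked below.
Note that the thread pool uses only one worker thread, so the jobs run sequentially.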

The output on GCC is as expected (here on an online compiler):

checkAllWorkIsProcessedBeforeDestruction
     passed.
     passed.
     passed.
checkWorkCanBeCancelled
     passed.
     passed.
     passed.
checkWorkCanBeInterrupted
     passed.
     passed.
     passed.
checkUninterruptableWorkIsNotInterruptedButCanBeDropped
     passed.
     passed.
     passed.
     passed.

This is the output on MSVC 14 (Visual Studio 2015) (here on an online VC++ compiler):

checkAllWorkIsProcessedBeforeDestruction
     passed.
     passed.
     passed.
checkWorkCanBeCancelled
     Error: functor 1 call expected: false current: true
     Error: functor 2 call expected: false current: true
     Error: running time expected: 150 current: 402
checkWorkCanBeInterrupted
     passed.
     passed.
     passed.
checkUninterruptableWorkIsNotInterruptedButCanBeDropped
     passed.
     Error: functor 2 call expected: false current: true
     passed.
     Error: running time expected: 250 current: 404
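Since each Functor sleeps for 200 ms and the pool has a single worker, the running-time checks are effectively a proxy for how many jobs ran: roughly 400 ms means both functors ran, which is what the MSVC failures report. The sketch below just spells out the bounds the checks encode; the names (TimeBounds, within, approxJobsRun) are hypothetical and not part of the test file.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Each Functor sleeps for 200 ms and the pool has one worker thread, so the
// total running time is roughly (number of jobs that ran) * 200 ms.
constexpr std::int64_t kJobMs = 200;

// Inclusive bounds on the running time, mirroring the question's checks
// (lessEq(a, t) means a <= t; greaterEq(a, t) means a >= t).
struct TimeBounds { std::int64_t minMs, maxMs; };
constexpr TimeBounds kAllWork   {350, std::numeric_limits<std::int64_t>::max()}; // both jobs ran
constexpr TimeBounds kCancelled {0, 150};   // no job ran at all
constexpr TimeBounds kFirstOnly {150, 250}; // exactly one job ran

constexpr bool within(TimeBounds b, std::int64_t t) { return b.minMs <= t && t <= b.maxMs; }

// Rough estimate of how many 200 ms jobs fit in the measured time (rounded to nearest).
constexpr std::int64_t approxJobsRun(std::int64_t elapsedMs) { return (elapsedMs + kJobMs / 2) / kJobMs; }
```

Applied to the MSVC numbers, 402 ms and 404 ms both correspond to two jobs having run, which is why the "cancelled" (at most 150 ms) and "first job only" (150 to 250 ms) checks fail.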

Am I doing something wrong?

I also filed a bug report with Boost but have had no response so far: #13317


Source code: ThreadPoolTests.cpp

// Copyright (c) 2017 Diego Barrios Romero <eldruin@gmail.com>

#include <iostream>
#include <string>
#include <memory>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/chrono.hpp>

class ThreadPool
{
public:
  ThreadPool(const size_t threadCount = boost::thread::hardware_concurrency())
    : work(new boost::asio::io_service::work(service))
  {
    for (size_t i = 0; i < threadCount; ++i)
    {
      threads.create_thread(boost::bind(&boost::asio::io_service::run, &service));
    }
  }
  template<typename FunctionType>
  void post(FunctionType f)
  {
    service.post(f);
  }

  void interrupt()
  {
    threads.interrupt_all();
  }

  void cancel()
  {
    work.reset();
    service.stop();
  }

  ~ThreadPool()
  {
    work.reset();
    threads.join_all();
  }
private:
  boost::asio::io_service service;
  boost::thread_group threads;
  std::unique_ptr<boost::asio::io_service::work> work;
};


struct Functor
{
  void operator()()
  {
    boost::this_thread::sleep(boost::posix_time::milliseconds(200));
    boost::lock_guard<boost::mutex> lock(mutex);
    wasCalled_ = true;
  }
  bool wasCalled() const
  {
    boost::lock_guard<boost::mutex> lock(mutex);
    return wasCalled_;
  }

private:
  bool wasCalled_ = false;
  mutable boost::mutex mutex;
};

struct UninterruptableFunctor : public Functor
{
  void operator()()
  {
    boost::this_thread::disable_interruption disableInterruptions;
    Functor::operator()();
  }
};

template<typename F, typename T1, typename T2>
void check(F compare, T1 expected, T2 current, const std::string& msg)
{
  if (compare(expected, current))
  {
    std::cout << "\tpassed." << std::endl;
  }
  else
  {
    std::cout << std::boolalpha
              << "\tError: " << msg << " expected: " << expected
              << " current: " << current << std::endl;
  }
}

struct ThreadPoolTest
{
  boost::int_least64_t getRunningTimeInMS() const
  {
    auto executionTime = boost::chrono::high_resolution_clock::now() - start;
    return boost::chrono::duration_cast<boost::chrono::milliseconds>(executionTime).count();    
  }

  template<typename FunctorType, typename F>
  void runTest(F f, bool shouldFunctor1BeCalled, bool shouldFunctor2BeCalled)
  {
    FunctorType functor1, functor2;
    {
      ThreadPool pool(1);
      pool.post(boost::bind(&FunctorType::operator(), &functor1));
      pool.post(boost::bind(&FunctorType::operator(), &functor2));
      f(pool);
    }

    auto eq = [](bool a, bool b) { return a == b; };
    check(eq, shouldFunctor1BeCalled, functor1.wasCalled(), "functor 1 call");
    check(eq, shouldFunctor2BeCalled, functor2.wasCalled(), "functor 2 call");
  }

private:
  boost::chrono::high_resolution_clock::time_point start = boost::chrono::high_resolution_clock::now();
};

void doNothing(ThreadPool&) { }
void cancel(ThreadPool& pool)
{
  pool.cancel();
}
void waitForStartThenInterruptThenCancel(ThreadPool& pool)
{
  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  pool.interrupt();
  pool.cancel();
}

bool lessEq (const boost::int_least64_t a, const boost::int_least64_t b) { return a <= b; }
bool greaterEq (const boost::int_least64_t a, const boost::int_least64_t b) { return a >= b; }

void checkAllWorkIsProcessedBeforeDestruction()
{
  ThreadPoolTest test;
  std::cout << "checkAllWorkIsProcessedBeforeDestruction\n";
  test.runTest<Functor>(doNothing, true, true);
  check(lessEq, 350, test.getRunningTimeInMS(), "running time");
}

void checkWorkCanBeCancelled()
{
  ThreadPoolTest test;
  std::cout << "checkWorkCanBeCancelled\n";
  test.runTest<Functor>(cancel, false, false);
  check(greaterEq, 150, test.getRunningTimeInMS(), "running time");
}

void checkWorkCanBeInterrupted()
{
  ThreadPoolTest test;
  std::cout << "checkWorkCanBeInterrupted\n";
  test.runTest<Functor>(waitForStartThenInterruptThenCancel, false, false);
  check(greaterEq, 150, test.getRunningTimeInMS(), "running time");
}

void checkUninterruptableWorkIsNotInterruptedButCanBeDropped()
{
  ThreadPoolTest test;
  std::cout << "checkUninterruptableWorkIsNotInterruptedButCanBeDropped\n";
  test.runTest<UninterruptableFunctor>(waitForStartThenInterruptThenCancel, true, false);
  check(lessEq, 150, test.getRunningTimeInMS(), "running time");
  check(greaterEq, 250, test.getRunningTimeInMS(), "running time");
}

int main(int, char*[])
{
  checkAllWorkIsProcessedBeforeDestruction();
  checkWorkCanBeCancelled();
  checkWorkCanBeInterrupted();
  checkUninterruptableWorkIsNotInterruptedButCanBeDropped();
}

Here is the CMakeLists.txt, for ease of compilation.

cmake_minimum_required (VERSION 2.8.11)
project (ThreadPoolTests)

set(CMAKE_CXX_STANDARD 11)

find_package(Boost COMPONENTS thread)
if (Boost_FOUND)
  include_directories(${Boost_INCLUDE_DIR})
else()
  message(FATAL_ERROR "No Boost found")
endif()

add_executable (ThreadPoolTests ThreadPoolTests.cpp)
target_link_libraries(ThreadPoolTests ${Boost_LIBRARIES})

Back to the question: am I doing something wrong?

To begin with, I see a problem with your tests.

Let's start with that.

Cancel: the racy test

ThreadPool::cancel does two things:

  • resets work (allowing io_service::run() to complete)
  • stops the service

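The problem is that resetting work doesn't affect any work in progress. Neither does stop. So any work that has already been posted will still be completed.

The only situation in which you get the "expected" behaviour is when you stop the io_service before the first thread in the pool has even started running one of the posted tasks.

The fact that you seem to get this behaviour consistently on GCC is a fluke. In reality it's a race, and that's easily demonstrated by adding the tiniest delay before cancel().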
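One way to see why this is a race is a simplified timeline: a single worker starts dequeuing at workerStartMs and runs taskMs-long tasks back to back, and stop() is assumed to take effect only between tasks (a running task completes, later ones never start). This is a hypothetical model, not Asio code, and tasksStarted is an illustrative name; it shows the outcome flipping from 0 tasks to 1 task as soon as stop() lands even 1 ms after the worker starts.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical timeline model of the race (not Boost.Asio code).
// Task k starts at workerStartMs + k * taskMs. stop() at stopMs is assumed to be
// honoured only between tasks: a started task runs to completion, later ones never start.
std::int64_t tasksStarted(std::int64_t taskCount, std::int64_t workerStartMs,
                          std::int64_t taskMs, std::int64_t stopMs) {
    if (stopMs <= workerStartMs)
        return 0; // stop() won the race: nothing was dequeued
    // Count task start times strictly before stopMs, i.e. ceil((stopMs - workerStartMs) / taskMs).
    std::int64_t started = (stopMs - workerStartMs + taskMs - 1) / taskMs;
    return std::min(started, taskCount);
}
```

With the question's numbers (two 200 ms tasks), a stop at the same instant the worker starts drops both tasks, while a stop 1 ms later lets functor1 run, which is exactly the difference between a passing and a failing checkWorkCanBeCancelled.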

In fact, even with the code as-is, repeated runs show spurious failures, e.g.:

checkWorkCanBeCancelled
    Error: functor 1 call expected: false actual: true
    passed (functor 2 call).
    Error: running time expected: 150 actual: 200

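Enabling handler tracking (BOOST_ASIO_ENABLE_HANDLER_TRACKING) confirms that only one handler was abandoned.

Even just enabling AddressSanitizer makes the failure reproducible almost every time.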

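Digging deeper: io_service::stop

If this is a race, and the test is limited to a single service thread, surely the subtle race window can't be large enough to let the second task run even though io_service::stop() has been called?

I struggled with this, so I instrumented the Functor with more timing samples. Let's also record the invocation of a task, so we can distinguish tasks that never started from tasks that merely didn't complete: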

struct Functor {
    void operator()() {
        {
            boost::lock_guard<boost::mutex> lock(mutex);
            state_.invocation = Clock::now();
        }
        boost::this_thread::sleep(boost::posix_time::milliseconds(200));
        {
            boost::lock_guard<boost::mutex> lock(mutex);
            state_.completion = Clock::now();
        }
    }

    struct State {
        TP start, invocation, completion;

        friend std::ostream& operator<<(std::ostream& os, State const& s) {
            return os << "[" << relative(s.invocation, s.start) << "," << relative(s.completion, s.start) << "]";
        }
    };

    State sample() const {
        boost::lock_guard<boost::mutex> lock(mutex);
        return state_;
    }

  private:
    State state_ = { TP::min(), TP::min(), TP::min() };
    mutable boost::mutex mutex;
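};

Alternatively, a simpler variant records only flags instead of timestamps (the timestamped version above is the one used in the rest of this answer):
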
struct Functor {
    void operator()() {
        {
            boost::lock_guard<boost::mutex> lock(mutex);
            state_.wasInvoked = true;
        }
        boost::this_thread::sleep(boost::posix_time::milliseconds(200));
        {
            boost::lock_guard<boost::mutex> lock(mutex);
            state_.wasCompleted = true;
        }
    }

    struct State {
        bool wasInvoked, wasCompleted;

        friend std::ostream& operator<<(std::ostream& os, State const& s) {
            if (s.wasInvoked && s.wasCompleted) return os << "[invoked,completed]";
            if (s.wasInvoked) return os << "[invoked]";
            assert(!s.wasCompleted);
            return os << "[]";
        }
    };

    State sample() const {
        boost::lock_guard<boost::mutex> lock(mutex);
        return state_;
    }

  private:
    State state_ = { false, false };
    mutable boost::mutex mutex;
};

Now runTest can be extended to keep the timings of all tasks, as well as the timings of the action and of the pool shutdown:

struct ThreadPoolTest {
    boost::int_least64_t getRunningTimeInMS() const { return relative(Clock::now(), start); }

    template <typename FunctorType, typename ScenarioAction>
    void runTest(ScenarioAction action, bool shouldFunctor1BeCalled, bool shouldFunctor2BeCalled) {
        struct Task {
            std::string name;
            Task(std::string name) : name(name) {}

            FunctorType functor;
            Functor::State before, after, final_;
        } tasks[] = { {"functor1"}, {"functor2"} };

        TP before_action, after_action, pool_final;

        {
            ThreadPool pool(1);
            for (auto& task : tasks) pool.post(std::ref(task.functor));

            for (auto& task : tasks) task.before = task.functor.sample();

            before_action = Clock::now();
            action(pool);
            after_action = Clock::now();

            for (auto& task : tasks) task.after = task.functor.sample();
        }

        pool_final = Clock::now();
        for (auto& task : tasks) task.final_ = task.functor.sample();

        // aids in pretty printing
        for (auto& task : tasks) for (auto sample : { &Task::before, &Task::after, &Task::final_ }) {
            (task.*sample).start = start;
        }

        for (auto& task : tasks)
            std::cout << "DEBUG: " << task.name << " before:" << task.before << " after:" << task.after << " final:" << task.final_ << "\n";
        std::cout << "DEBUG: pool/action before:" << relative(before_action, start) << " after:" << relative(after_action, start) << " final:" << relative(pool_final, start) << "\n";

        check(std::equal_to<>{}, shouldFunctor1BeCalled, is_set(tasks[0].final_.completion), "functor 1 call");
        check(std::equal_to<>{}, shouldFunctor2BeCalled, is_set(tasks[1].final_.completion), "functor 2 call");
    }

  private:
    TP start = Clock::now();
};
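The snippets above use Clock, TP, relative and is_set without showing them. A plausible reconstruction follows; it is an assumption, not the original code: it uses std::chrono rather than boost::chrono, treats TP::min() as "not recorded" (matching the Functor's initial State), and returns -1 for unrecorded samples, which is consistent with the [-1,-1] entries in the debug output.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical reconstructions of helpers used (but not shown) above.
using Clock = std::chrono::high_resolution_clock;
using TP = Clock::time_point;

// An unrecorded sample is represented by TP::min(), as in the Functor's initial State.
bool is_set(TP tp) { return tp != TP::min(); }

// Milliseconds elapsed from `start` to `tp`, or -1 when `tp` was never recorded.
std::int64_t relative(TP tp, TP start) {
    if (!is_set(tp)) return -1; // avoids overflowing TP::min() - start
    return std::chrono::duration_cast<std::chrono::milliseconds>(tp - start).count();
}
```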

Our GCC run prints:

checkWorkCanBeCancelled
DEBUG: functor1 before:[-1,-1] after:[-1,-1] final:[0,200]
DEBUG: functor2 before:[-1,-1] after:[-1,-1] final:[-1,-1]
DEBUG: pool/action before:0 after:0 final:200
    Error: functor 1 call expected: false actual: true
    passed (functor 2 call).
    Error: running time expected: 150 actual: 200

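This shows that the action function (cancel, in this case) was called at the same time as our service thread invoked functor1. functor2 was never invoked.

On MSVC, the same test prints: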

checkWorkCanBeCancelled
DEBUG: functor1 before:[-1,-1] after:[-1,-1] final:[2,198]
DEBUG: functor2 before:[-1,-1] after:[-1,-1] final:[198,401]
DEBUG: pool/action before:0 after:0 final:404
    Error: functor 1 call expected: false actual: true
    Error: functor 2 call expected: false actual: true
    Error: running time expected: 150 actual: 405

As with GCC, the cancel action ran at time 0 ms, but strangely both tasks completed, even though they were invoked after that action ran.

This indicates that on Windows, Asio will not prevent existing tasks from being dispatched to threads if io_service::stop() is invoked. Increasing the load to 9 tasks shows consistent results:

DEBUG: functor1 before:[-1,-1] after:[-1,-1] final:[2,195]
DEBUG: functor2 before:[-1,-1] after:[-1,-1] final:[195,398]
DEBUG: functor3 before:[-1,-1] after:[-1,-1] final:[399,602]
DEBUG: functor4 before:[-1,-1] after:[-1,-1] final:[602,821]
DEBUG: functor5 before:[-1,-1] after:[-1,-1] final:[821,1024]
DEBUG: functor6 before:[-1,-1] after:[-1,-1] final:[1024,1228]
DEBUG: functor7 before:[-1,-1] after:[-1,-1] final:[1228,1431]
DEBUG: functor8 before:[-1,-1] after:[-1,-1] final:[1431,1634]
DEBUG: functor9 before:[-1,-1] after:[-1,-1] final:[1634,1837]
DEBUG: pool/action before:0 after:0 final:1838
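To make the pattern in these samples explicit, here is a small hypothetical analysis helper (Sample, invokedAfterStop and ranSequentially are illustrative names, not part of the test code). Fed with the numbers above, it shows that on MSVC every handler was invoked after the action (and therefore stop()) had returned at 0 ms, one after another on the single worker, whereas in the GCC run no handler was invoked strictly after stop().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One task's [invocation, completion] pair in ms relative to the test start; -1 means "never".
struct Sample { std::int64_t invocation, completion; };

// Counts tasks that were invoked strictly after stop() had returned at stopMs.
// Under the documented io_service::stop() contract this should be 0.
int invokedAfterStop(const std::vector<Sample>& samples, std::int64_t stopMs) {
    int n = 0;
    for (const auto& s : samples)
        if (s.invocation >= 0 && s.invocation > stopMs) ++n;
    return n;
}

// True if each task started within slackMs of the previous task's completion,
// i.e. the single worker kept dequeuing handlers back to back.
bool ranSequentially(const std::vector<Sample>& samples, std::int64_t slackMs) {
    for (std::size_t i = 1; i < samples.size(); ++i) {
        std::int64_t gap = samples[i].invocation - samples[i - 1].completion;
        if (gap < 0 || gap > slackMs) return false;
    }
    return true;
}
```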

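Interrupt

Interruption works fine on both Linux/GCC and MSVC.

Uninterruptable

The last scenario is effectively the same as the second one (because the tasks are immune to interruption).

Summary: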

On Windows, the behaviour of io_service::stop() contradicts the documentation, so this would be a bug.

Here is a simpler reproducer:
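For reference, the Boost.Asio documentation for io_service::stop() says that it signals the io_service to stop, and that all invocations of its run() or run_one() member functions should return as soon as possible. A toy, single-threaded model of that contract is sketched below; ToyService is a hypothetical class, not Asio, and it models neither work objects nor multiple threads. It captures what the GCC run shows and the MSVC run doesn't: a handler that is already executing is not interrupted, but queued handlers are not started once stop() has been called.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of the documented stop() contract (not Boost.Asio).
class ToyService {
public:
    void post(std::function<void()> handler) { queue_.push_back(std::move(handler)); }
    void stop() { stopped_ = true; }

    // Runs queued handlers in FIFO order and returns how many ran. The stop flag is
    // checked before each dequeue, so a running handler always completes, but nothing
    // new starts after stop(). Unlike io_service::run(), it also returns when the queue
    // is empty, because there is no work object in this model.
    std::size_t run() {
        std::size_t executed = 0;
        while (!stopped_ && !queue_.empty()) {
            auto handler = std::move(queue_.front());
            queue_.pop_front();
            handler();
            ++executed;
        }
        return executed;
    }

    std::size_t pending() const { return queue_.size(); }

private:
    std::deque<std::function<void()>> queue_;
    bool stopped_ = false;
};
```

Calling stop() from inside the first handler stands in for "stop() is called while the first handler is running": run() returns after one handler, and the other two remain queued, which corresponds to the single "1" printed on GCC.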

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;

int main() {
    boost::asio::io_service s;

    // Queue three handlers (~5 ms each) before any thread runs the service.
    s.post([] { std::this_thread::sleep_for(5ms); std::cout << "1\n"; });
    s.post([] { std::this_thread::sleep_for(5ms); std::cout << "2\n"; });
    s.post([] { std::this_thread::sleep_for(5ms); std::cout << "3\n"; });

    std::thread th([&] { s.run(); });

    // Stop while the first handler is (most likely) still sleeping.
    std::this_thread::sleep_for(1ms);
    s.stop();

    th.join();
}

On GCC it prints 1; on MSVC it prints 1 2 3.