异步似乎不使用多线程 C++
async appears to not use multiple threads C++
我的异步调用在串行执行中似乎 运行.. 让我相信我在代码中做的事情不正确或者.. 可能不理解某些事情
在高层次上,我接收了 100,000 个左右的字符串,使用它们执行的查询速率限制为每个连接每秒 1 个。我从 4 个服务连接开始,并希望同时使用所有 4 个。 .等待他们所有的答案都回来然后合并结果
Main.cpp
struct DataFetcher {
DataFetcher(cpool::ConnectionPool& tws_conn_pool): conn_pool(&conn_pool){
std::cout << "DataFetcher() this->conn_pool == "<< this->conn_pool << std::endl;
}
std::vector<std::string> process_data(std::vector<std::string> sub_batch_vector){
std::for_each(sub_batch_vector.begin(), sub_batch_vector.end(),[](std::string query_str){
std::cout << query_str << ";";
std::this_thread::sleep_for (std::chrono::seconds(1));
});
cpool::ConnectionPool::ConnectionProxy proxy_conn = this->my_conn_pool->get_connection();
proxy_conn->is_healthy();
// will do more stuff later here
return sub_batch_vector;
}
cpool::ConnectionPool * const tws_conn_pool;
};
在循环中使用此异步调用的正确方法是什么。将有可变数量的可用 ServiceConnections,所以我想做这样的事情
std::vector<std::future<std::vector<std::string>>> results_vector;
for (int i=0 ; i < this->tws_conn_pool->size()-1; i++) {
std::vector<std::string> subvector = {queries_list.begin() + (batch_size*i), queries_list.begin() + (batch_size*(i+1))};
DataFetcher fetcher = DataFetcher(*this->conn_pool);
auto fut = std::async(std::launch::async, &DataFetcher::process_data, &fetcher, subvector);
results_vector.push_back(fut);
}
但这看起来很迟钝,实际上无法编译..
from /Server/cpp_server/src/_server.cpp:1:
/usr/include/c++/9/ext/new_allocator.h: In instantiation of ‘void __gnu_cxx::new_allocator<_Tp>::construct(_Up*, _Args&& ...) [with _Up = std::future<std::vector<std::__cxx11::basic_string<char> > >; _Args = {const std::future<std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > >&}; _Tp = std::future<std::vector<std::__cxx11::basic_string<char> > >]’:
/usr/include/c++/9/bits/alloc_traits.h:482:2: required from ‘static void std::allocator_traits<std::allocator<_CharT> >::construct(std::allocator_traits<std::allocator<_CharT> >::allocator_type&, _Up*, _Args&& ...) [with _Up = std::future<std::vector<std::__cxx11::basic_string<char> > >; _Args = {const std::future<std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > >&}; _Tp = std::future<std::vector<std::__cxx11::basic_string<char> > >; std::allocator_traits<std::allocator<_CharT> >::allocator_type = std::allocator<std::future<std::vector<std::__cxx11::basic_string<char> > > >]’
/usr/include/c++/9/bits/stl_vector.h:1189:30: required from ‘void std::vector<_Tp, _Alloc>::push_back(const value_type&) [with _Tp = std::future<std::vector<std::__cxx11::basic_string<char> > >; _Alloc = std::allocator<std::future<std::vector<std::__cxx11::basic_string<char> > > >; std::vector<_Tp, _Alloc>::value_type = std::future<std::vector<std::__cxx11::basic_string<char> > >]’
/Server/cpp_server/src/_server.cpp:175:35: required from here
/usr/include/c++/9/ext/new_allocator.h:145:20: error: use of deleted function ‘std::future<_Res>::future(const std::future<_Res>&) [with _Res = std::vector<std::__cxx11::basic_string<char> >]’
145 | noexcept(noexcept(::new((void *)__p)
| ^~~~~~~~~~~~~~~~~~
146 | _Up(std::forward<_Args>(__args)...)))
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In file included from /Server/cpp_server/src/_server.cpp:4:
/usr/include/c++/9/future:782:7: note: declared here
782 | future(const future&) = delete;
| ^~~~~~
make[2]: *** [CMakeFiles/_server.dir/build.make:63: CMakeFiles/_server.dir/src/_server.cpp.o] Error 1
make[1]: *** [CMakeFiles/Makefile2:76: CMakeFiles/_server.dir/all] Error 2
让数据获取器通过指针获取其池。获取引用并立即获取其地址是愚蠢的。
将 fut
移动到向量中。期货不可复制,只能移动。
将fetcher
传递给按值异步。 Pointers/references 局部变量不应传递给异步。
将 subvector
移动到 async
。效率。
销毁异步任务时异步块产生的未来(或移动到目的地)。
我的异步调用在串行执行中似乎 运行.. 让我相信我在代码中做的事情不正确或者.. 可能不理解某些事情
在高层次上,我接收了 100,000 个左右的字符串,使用它们执行的查询速率限制为每个连接每秒 1 个。我从 4 个服务连接开始,并希望同时使用所有 4 个。 .等待他们所有的答案都回来然后合并结果
Main.cpp
struct DataFetcher {
DataFetcher(cpool::ConnectionPool& tws_conn_pool): conn_pool(&conn_pool){
std::cout << "DataFetcher() this->conn_pool == "<< this->conn_pool << std::endl;
}
std::vector<std::string> process_data(std::vector<std::string> sub_batch_vector){
std::for_each(sub_batch_vector.begin(), sub_batch_vector.end(),[](std::string query_str){
std::cout << query_str << ";";
std::this_thread::sleep_for (std::chrono::seconds(1));
});
cpool::ConnectionPool::ConnectionProxy proxy_conn = this->my_conn_pool->get_connection();
proxy_conn->is_healthy();
// will do more stuff later here
return sub_batch_vector;
}
cpool::ConnectionPool * const tws_conn_pool;
};
在循环中使用此异步调用的正确方法是什么。将有可变数量的可用 ServiceConnections,所以我想做这样的事情
std::vector<std::future<std::vector<std::string>>> results_vector;
for (int i=0 ; i < this->tws_conn_pool->size()-1; i++) {
std::vector<std::string> subvector = {queries_list.begin() + (batch_size*i), queries_list.begin() + (batch_size*(i+1))};
DataFetcher fetcher = DataFetcher(*this->conn_pool);
auto fut = std::async(std::launch::async, &DataFetcher::process_data, &fetcher, subvector);
results_vector.push_back(fut);
}
但这看起来很迟钝,实际上无法编译..
from /Server/cpp_server/src/_server.cpp:1:
/usr/include/c++/9/ext/new_allocator.h: In instantiation of ‘void __gnu_cxx::new_allocator<_Tp>::construct(_Up*, _Args&& ...) [with _Up = std::future<std::vector<std::__cxx11::basic_string<char> > >; _Args = {const std::future<std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > >&}; _Tp = std::future<std::vector<std::__cxx11::basic_string<char> > >]’:
/usr/include/c++/9/bits/alloc_traits.h:482:2: required from ‘static void std::allocator_traits<std::allocator<_CharT> >::construct(std::allocator_traits<std::allocator<_CharT> >::allocator_type&, _Up*, _Args&& ...) [with _Up = std::future<std::vector<std::__cxx11::basic_string<char> > >; _Args = {const std::future<std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > >&}; _Tp = std::future<std::vector<std::__cxx11::basic_string<char> > >; std::allocator_traits<std::allocator<_CharT> >::allocator_type = std::allocator<std::future<std::vector<std::__cxx11::basic_string<char> > > >]’
/usr/include/c++/9/bits/stl_vector.h:1189:30: required from ‘void std::vector<_Tp, _Alloc>::push_back(const value_type&) [with _Tp = std::future<std::vector<std::__cxx11::basic_string<char> > >; _Alloc = std::allocator<std::future<std::vector<std::__cxx11::basic_string<char> > > >; std::vector<_Tp, _Alloc>::value_type = std::future<std::vector<std::__cxx11::basic_string<char> > >]’
/Server/cpp_server/src/_server.cpp:175:35: required from here
/usr/include/c++/9/ext/new_allocator.h:145:20: error: use of deleted function ‘std::future<_Res>::future(const std::future<_Res>&) [with _Res = std::vector<std::__cxx11::basic_string<char> >]’
145 | noexcept(noexcept(::new((void *)__p)
| ^~~~~~~~~~~~~~~~~~
146 | _Up(std::forward<_Args>(__args)...)))
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In file included from /Server/cpp_server/src/_server.cpp:4:
/usr/include/c++/9/future:782:7: note: declared here
782 | future(const future&) = delete;
| ^~~~~~
make[2]: *** [CMakeFiles/_server.dir/build.make:63: CMakeFiles/_server.dir/src/_server.cpp.o] Error 1
make[1]: *** [CMakeFiles/Makefile2:76: CMakeFiles/_server.dir/all] Error 2
让数据获取器通过指针获取其池。获取引用并立即获取其地址是愚蠢的。
将 fut
移动到向量中。期货不可复制,只能移动。
将fetcher
传递给按值异步。 Pointers/references 局部变量不应传递给异步。
将 subvector
移动到 async
。效率。
销毁异步任务时异步块产生的未来(或移动到目的地)。