RXCPP:阻塞函数超时
RXCPP: Timeout on blocking function
考虑一个阻塞函数:this_thread::sleep_for(milliseconds(3000));
我正在尝试获得以下行为:
Trigger Blocking Function
|---------------------------------------------X
我想触发拦截功能,如果时间太长(超过两秒),应该会超时。
我做了以下事情:
my_connection = observable<>::create<int>([](subscriber<int> s) {
auto s2 = observable<>::just(1, observe_on_new_thread()) |
subscribe<int>([&](auto x) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
});
}) |
timeout(seconds(2), observe_on_new_thread());
我无法让它工作。对于初学者,我认为 s 不能 on_next 来自不同的线程。
所以我的问题是,正确的反应方式是什么?如何在 rxcpp 中包装阻塞函数并为其添加超时?
随后,我想获得一个行为如下的 RX 流:
Trigger Cleanup
|------------------------X
(Delay) Trigger Cleanup
|-----------------X
好问题!以上已经很接近了。
这里有一个如何让阻塞操作适配 rxcpp 的例子。它 libcurl polling 发出 http 请求。
以下应该符合您的预期。
auto sharedThreads = observe_on_event_loop();
auto my_connection = observable<>::create<int>([](subscriber<int> s) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
s.on_completed();
}) |
subscribe_on(observe_on_new_thread()) |
//start_with(0) | // workaround bug in timeout
timeout(seconds(2), sharedThreads);
//skip(1); // workaround bug in timeout
my_connection.as_blocking().subscribe(
[](int){},
[](exception_ptr ep){cout << "timed out" << endl;}
);
subscribe_on
将在专用线程上 运行 create
,因此 create
可以阻塞该线程。
timeout
将 运行 计时器放在不同的线程上,可以与其他人共享,并传输所有 on_next
/on_error
/on_completed
调用同一个线程。
as_blocking
将确保 subscribe
在完成之前不会 return。这仅用于防止 main()
退出 - 通常在测试或示例程序中。
编辑:为 timeout
中的错误添加了解决方法。目前,它不会安排第一个超时,直到第一个值到达。
EDIT-2:timeout
错误已修复,不再需要解决方法。
考虑一个阻塞函数:this_thread::sleep_for(milliseconds(3000));
我正在尝试获得以下行为:
Trigger Blocking Function
|---------------------------------------------X
我想触发拦截功能,如果时间太长(超过两秒),应该会超时。
我做了以下事情:
my_connection = observable<>::create<int>([](subscriber<int> s) {
auto s2 = observable<>::just(1, observe_on_new_thread()) |
subscribe<int>([&](auto x) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
});
}) |
timeout(seconds(2), observe_on_new_thread());
我无法让它工作。对于初学者,我认为 s 不能 on_next 来自不同的线程。
所以我的问题是,正确的反应方式是什么?如何在 rxcpp 中包装阻塞函数并为其添加超时?
随后,我想获得一个行为如下的 RX 流:
Trigger Cleanup
|------------------------X
(Delay) Trigger Cleanup
|-----------------X
好问题!以上已经很接近了。
这里有一个如何让阻塞操作适配 rxcpp 的例子。它 libcurl polling 发出 http 请求。
以下应该符合您的预期。
auto sharedThreads = observe_on_event_loop();
auto my_connection = observable<>::create<int>([](subscriber<int> s) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
s.on_completed();
}) |
subscribe_on(observe_on_new_thread()) |
//start_with(0) | // workaround bug in timeout
timeout(seconds(2), sharedThreads);
//skip(1); // workaround bug in timeout
my_connection.as_blocking().subscribe(
[](int){},
[](exception_ptr ep){cout << "timed out" << endl;}
);
subscribe_on
将在专用线程上 运行create
,因此create
可以阻塞该线程。timeout
将 运行 计时器放在不同的线程上,可以与其他人共享,并传输所有on_next
/on_error
/on_completed
调用同一个线程。as_blocking
将确保subscribe
在完成之前不会 return。这仅用于防止main()
退出 - 通常在测试或示例程序中。
编辑:为 timeout
中的错误添加了解决方法。目前,它不会安排第一个超时,直到第一个值到达。
EDIT-2:timeout
错误已修复,不再需要解决方法。