当 future 包装被删除时,我如何停止 运行 同步代码?
How can I stop running synchronous code when the future wrapping it is dropped?
我有异步代码调用需要一段时间的同步代码 运行,所以我遵循了 中列出的建议。但是,我的异步代码有一个超时,超时之后我就不再对同步计算的结果感兴趣了:
use std::{thread, time::Duration};
use tokio::{task, time}; // 0.2.10
// This takes 1 second
fn long_running_complicated_calculation() -> i32 {
let mut sum = 0;
for i in 0..10 {
thread::sleep(Duration::from_millis(100));
eprintln!("{}", i);
sum += i;
// Interruption point
}
sum
}
#[tokio::main]
async fn main() {
let handle = task::spawn_blocking(long_running_complicated_calculation);
let guarded = time::timeout(Duration::from_millis(250), handle);
match guarded.await {
Ok(s) => panic!("Sum was calculated: {:?}", s),
Err(_) => eprintln!("Sum timed out (expected)"),
}
}
运行 此代码显示超时触发,但同步代码 also 继续 运行:
0
1
Sum timed out (expected)
2
3
4
5
6
7
8
9
当 future 包装被删除时,如何停止 运行ning 同步代码?
我没想到编译器会神奇地停止我的同步代码。我用 "interruption point" 注释了一行,在那里我需要手动进行某种检查以提前退出我的函数,但我不知道如何轻松获得 [= 结果的通知13=](或 ThreadPool::spawn_with_handle
,对于纯基于期货的代码)已被删除。
您可以传递一个原子布尔值,然后使用它来将任务标记为需要取消。 (我不确定我是否为 load
/store
调用使用了合适的 Ordering
,这可能需要更多考虑)
这是您的代码的修改版本,输出
0
1
Sum timed out (expected)
2
Interrupted...
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time::Duration};
use tokio::{task, time}; // 0.2.10
// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
let mut sum = 0;
for i in 0..10 {
thread::sleep(Duration::from_millis(100));
eprintln!("{}", i);
sum += i;
// Interruption point
if !flag.load(Ordering::Relaxed) {
eprintln!("Interrupted...");
break;
}
}
sum
}
#[tokio::main]
async fn main() {
let some_bool = Arc::new(AtomicBool::new(true));
let some_bool_clone = some_bool.clone();
let handle =
task::spawn_blocking(move || long_running_complicated_calculation(&some_bool_clone));
let guarded = time::timeout(Duration::from_millis(250), handle);
match guarded.await {
Ok(s) => panic!("Sum was calculated: {:?}", s),
Err(_) => {
eprintln!("Sum timed out (expected)");
some_bool.store(false, Ordering::Relaxed);
}
}
}
在当前的 Tokio 中,在期货 / handles 下跌时自动发生这种情况是不可能的。 http://github.com/tokio-rs/tokio/issues/1830 and http://github.com/tokio-rs/tokio/issues/1879.
正在为此开展一些工作
但是,您可以通过将期货包装在自定义类型中来获得类似的东西。
这是一个看起来与原始代码几乎相同的示例,但在模块中添加了一个简单的包装器类型。如果我在包装器类型上实现 Future<T>
只是转发到包装句柄,那将更加符合人体工程学,但事实证明这很烦人。
mod blocking_cancelable_task {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::task;
pub struct BlockingCancelableTask<T> {
pub h: Option<tokio::task::JoinHandle<T>>,
flag: Arc<AtomicBool>,
}
impl<T> Drop for BlockingCancelableTask<T> {
fn drop(&mut self) {
eprintln!("Dropping...");
self.flag.store(false, Ordering::Relaxed);
}
}
impl<T> BlockingCancelableTask<T>
where
T: Send + 'static,
{
pub fn new<F>(f: F) -> BlockingCancelableTask<T>
where
F: FnOnce(&AtomicBool) -> T + Send + 'static,
{
let flag = Arc::new(AtomicBool::new(true));
let flag_clone = flag.clone();
let h = task::spawn_blocking(move || f(&flag_clone));
BlockingCancelableTask { h: Some(h), flag }
}
}
pub fn spawn<F, T>(f: F) -> BlockingCancelableTask<T>
where
T: Send + 'static,
F: FnOnce(&AtomicBool) -> T + Send + 'static,
{
BlockingCancelableTask::new(f)
}
}
use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time::Duration};
use tokio::time; // 0.2.10
// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
let mut sum = 0;
for i in 0..10 {
thread::sleep(Duration::from_millis(100));
eprintln!("{}", i);
sum += i;
// Interruption point
if !flag.load(Ordering::Relaxed) {
eprintln!("Interrupted...");
break;
}
}
sum
}
#[tokio::main]
async fn main() {
let mut h = blocking_cancelable_task::spawn(long_running_complicated_calculation);
let guarded = time::timeout(Duration::from_millis(250), h.h.take().unwrap());
match guarded.await {
Ok(s) => panic!("Sum was calculated: {:?}", s),
Err(_) => {
eprintln!("Sum timed out (expected)");
}
}
}
我有异步代码调用需要一段时间的同步代码 运行,所以我遵循了
use std::{thread, time::Duration};
use tokio::{task, time}; // 0.2.10
// This takes 1 second
fn long_running_complicated_calculation() -> i32 {
let mut sum = 0;
for i in 0..10 {
thread::sleep(Duration::from_millis(100));
eprintln!("{}", i);
sum += i;
// Interruption point
}
sum
}
#[tokio::main]
async fn main() {
let handle = task::spawn_blocking(long_running_complicated_calculation);
let guarded = time::timeout(Duration::from_millis(250), handle);
match guarded.await {
Ok(s) => panic!("Sum was calculated: {:?}", s),
Err(_) => eprintln!("Sum timed out (expected)"),
}
}
运行 此代码显示超时触发,但同步代码 also 继续 运行:
0
1
Sum timed out (expected)
2
3
4
5
6
7
8
9
当 future 包装被删除时,如何停止 运行ning 同步代码?
我没想到编译器会神奇地停止我的同步代码。我用 "interruption point" 注释了一行,在那里我需要手动进行某种检查以提前退出我的函数,但我不知道如何轻松获得 [= 结果的通知13=](或 ThreadPool::spawn_with_handle
,对于纯基于期货的代码)已被删除。
您可以传递一个原子布尔值,然后使用它来将任务标记为需要取消。 (我不确定我是否为 load
/store
调用使用了合适的 Ordering
,这可能需要更多考虑)
这是您的代码的修改版本,输出
0
1
Sum timed out (expected)
2
Interrupted...
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time::Duration};
use tokio::{task, time}; // 0.2.10
// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
let mut sum = 0;
for i in 0..10 {
thread::sleep(Duration::from_millis(100));
eprintln!("{}", i);
sum += i;
// Interruption point
if !flag.load(Ordering::Relaxed) {
eprintln!("Interrupted...");
break;
}
}
sum
}
#[tokio::main]
async fn main() {
let some_bool = Arc::new(AtomicBool::new(true));
let some_bool_clone = some_bool.clone();
let handle =
task::spawn_blocking(move || long_running_complicated_calculation(&some_bool_clone));
let guarded = time::timeout(Duration::from_millis(250), handle);
match guarded.await {
Ok(s) => panic!("Sum was calculated: {:?}", s),
Err(_) => {
eprintln!("Sum timed out (expected)");
some_bool.store(false, Ordering::Relaxed);
}
}
}
在当前的 Tokio 中,在期货 / handles 下跌时自动发生这种情况是不可能的。 http://github.com/tokio-rs/tokio/issues/1830 and http://github.com/tokio-rs/tokio/issues/1879.
正在为此开展一些工作但是,您可以通过将期货包装在自定义类型中来获得类似的东西。
这是一个看起来与原始代码几乎相同的示例,但在模块中添加了一个简单的包装器类型。如果我在包装器类型上实现 Future<T>
只是转发到包装句柄,那将更加符合人体工程学,但事实证明这很烦人。
mod blocking_cancelable_task {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::task;
pub struct BlockingCancelableTask<T> {
pub h: Option<tokio::task::JoinHandle<T>>,
flag: Arc<AtomicBool>,
}
impl<T> Drop for BlockingCancelableTask<T> {
fn drop(&mut self) {
eprintln!("Dropping...");
self.flag.store(false, Ordering::Relaxed);
}
}
impl<T> BlockingCancelableTask<T>
where
T: Send + 'static,
{
pub fn new<F>(f: F) -> BlockingCancelableTask<T>
where
F: FnOnce(&AtomicBool) -> T + Send + 'static,
{
let flag = Arc::new(AtomicBool::new(true));
let flag_clone = flag.clone();
let h = task::spawn_blocking(move || f(&flag_clone));
BlockingCancelableTask { h: Some(h), flag }
}
}
pub fn spawn<F, T>(f: F) -> BlockingCancelableTask<T>
where
T: Send + 'static,
F: FnOnce(&AtomicBool) -> T + Send + 'static,
{
BlockingCancelableTask::new(f)
}
}
use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time::Duration};
use tokio::time; // 0.2.10
// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
let mut sum = 0;
for i in 0..10 {
thread::sleep(Duration::from_millis(100));
eprintln!("{}", i);
sum += i;
// Interruption point
if !flag.load(Ordering::Relaxed) {
eprintln!("Interrupted...");
break;
}
}
sum
}
#[tokio::main]
async fn main() {
let mut h = blocking_cancelable_task::spawn(long_running_complicated_calculation);
let guarded = time::timeout(Duration::from_millis(250), h.h.take().unwrap());
match guarded.await {
Ok(s) => panic!("Sum was calculated: {:?}", s),
Err(_) => {
eprintln!("Sum timed out (expected)");
}
}
}