如何从流本身中取消无限流?
How to cancel an infinite stream from within the stream itself?
我正在尝试在清空队列后取消间隔 (interval_timer
),但不确定什么是正确的策略。
let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let interval_timer = tokio_timer::Timer::default();
let timer = interval_timer
.interval(Duration::from_millis(1000))
.map_err(|_| {
println!("Errored out");
});
let s = timer.for_each(move |_| {
println!("Woke up");
let item = some_vars.pop().unwrap();
let f = futures::future::ok(item).map(|x| {
println!("{:?}", x);
});
tokio::spawn(f)
});
tokio::run(s);
我按照 gitter 中的建议尝试了 drop
,但最终出现错误:
let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut interval_timer = tokio_timer::Timer::default();
let timer = interval_timer
.interval(Duration::from_millis(1000))
.map_err(|_| {
println!("Errored out");
});
let s = timer.for_each(move |_| {
println!("Woke up");
if some_vars.len() == 1 {
drop(interval_timer);
}
let item = some_vars.pop().unwrap();
let f = futures::future::ok(item).map(|x| {
println!("{:?}", x);
});
tokio::spawn(f)
});
tokio::run(s);
错误:
error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:72:22
|
60 | let mut interval_timer = tokio_timer::Timer::default();
| ------------------ captured outer variable
...
72 | drop(interval_timer);
| ^^^^^^^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
tokio_timer::Interval
实现了futures::Stream
,所以尝试使用take_while
方法:
let s = timer
.take_while(|()|
future::ok(is_net_completed()))
.for_each(move |_| {
println!("Woke up");
// ...
})
我创建了 Tokio 的 Interval
结构的副本,添加了对我的应用程序方法的引用以指示何时提前中断。
就我而言,我想中断 Interval
关机。
我的间隔轮询方法如下所示:
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.session.read().unwrap().shutdown {
return Ok(Async::Ready(Some(Instant::now())));
}
// Wait for the delay to be done
let _ = match self.delay.poll() {
然后你需要保持对任务的句柄(在超时任务中运行时调用task = futures::task::current()
)。
然后您可以随时调用 task.notify()
来启动间隔并使用您的突破代码,提前中断 Interval
。
在Interval
里面有一个可以修改的Delay
结构,你可以创建一个Interval
,你可以中断和改变超时,这样你就可以中断一次并然后继续。
如果您想从流的 外部 取消流,请参阅 stream-cancel。
对于您的具体情况,最简单的方法是将您的集合转换为流并将其与间隔计时器一起压缩。这样,当集合为空时,生成的流自然停止:
use futures::{future, stream, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11
fn main() {
tokio::run({
let some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let timer =
Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));
let some_vars = stream::iter_ok(some_vars.into_iter().rev());
let combined = timer.zip(some_vars);
combined.for_each(move |(_, item)| {
eprintln!("Woke up");
tokio::spawn(future::lazy(move || {
println!("{:?}", item);
Ok(())
}));
Ok(())
})
});
}
否则,您可以通过使用 and_then
从集合中删除值并控制流是否应继续来停止流:
use futures::{future, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11
fn main() {
tokio::run({
let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let timer =
Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));
let limited = timer.and_then(move |_| {
if some_vars.len() <= 4 {
Err(())
} else {
some_vars.pop().ok_or(())
}
});
limited.for_each(move |item| {
eprintln!("Woke up");
tokio::spawn(future::lazy(move || {
println!("{:?}", item);
Ok(())
}));
Ok(())
})
});
}
我正在尝试在清空队列后取消间隔 (interval_timer
),但不确定什么是正确的策略。
let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let interval_timer = tokio_timer::Timer::default();
let timer = interval_timer
.interval(Duration::from_millis(1000))
.map_err(|_| {
println!("Errored out");
});
let s = timer.for_each(move |_| {
println!("Woke up");
let item = some_vars.pop().unwrap();
let f = futures::future::ok(item).map(|x| {
println!("{:?}", x);
});
tokio::spawn(f)
});
tokio::run(s);
我按照 gitter 中的建议尝试了 drop
,但最终出现错误:
let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut interval_timer = tokio_timer::Timer::default();
let timer = interval_timer
.interval(Duration::from_millis(1000))
.map_err(|_| {
println!("Errored out");
});
let s = timer.for_each(move |_| {
println!("Woke up");
if some_vars.len() == 1 {
drop(interval_timer);
}
let item = some_vars.pop().unwrap();
let f = futures::future::ok(item).map(|x| {
println!("{:?}", x);
});
tokio::spawn(f)
});
tokio::run(s);
错误:
error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:72:22
|
60 | let mut interval_timer = tokio_timer::Timer::default();
| ------------------ captured outer variable
...
72 | drop(interval_timer);
| ^^^^^^^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
tokio_timer::Interval
实现了futures::Stream
,所以尝试使用take_while
方法:
let s = timer
.take_while(|()|
future::ok(is_net_completed()))
.for_each(move |_| {
println!("Woke up");
// ...
})
我创建了 Tokio 的 Interval
结构的副本,添加了对我的应用程序方法的引用以指示何时提前中断。
就我而言,我想中断 Interval
关机。
我的间隔轮询方法如下所示:
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.session.read().unwrap().shutdown {
return Ok(Async::Ready(Some(Instant::now())));
}
// Wait for the delay to be done
let _ = match self.delay.poll() {
然后你需要保持对任务的句柄(在超时任务中运行时调用task = futures::task::current()
)。
然后您可以随时调用 task.notify()
来启动间隔并使用您的突破代码,提前中断 Interval
。
在Interval
里面有一个可以修改的Delay
结构,你可以创建一个Interval
,你可以中断和改变超时,这样你就可以中断一次并然后继续。
如果您想从流的 外部 取消流,请参阅 stream-cancel。
对于您的具体情况,最简单的方法是将您的集合转换为流并将其与间隔计时器一起压缩。这样,当集合为空时,生成的流自然停止:
use futures::{future, stream, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11
fn main() {
tokio::run({
let some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let timer =
Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));
let some_vars = stream::iter_ok(some_vars.into_iter().rev());
let combined = timer.zip(some_vars);
combined.for_each(move |(_, item)| {
eprintln!("Woke up");
tokio::spawn(future::lazy(move || {
println!("{:?}", item);
Ok(())
}));
Ok(())
})
});
}
否则,您可以通过使用 and_then
从集合中删除值并控制流是否应继续来停止流:
use futures::{future, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11
fn main() {
tokio::run({
let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let timer =
Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));
let limited = timer.and_then(move |_| {
if some_vars.len() <= 4 {
Err(())
} else {
some_vars.pop().ok_or(())
}
});
limited.for_each(move |item| {
eprintln!("Woke up");
tokio::spawn(future::lazy(move || {
println!("{:?}", item);
Ok(())
}));
Ok(())
})
});
}