如何在循环中生成异步方法?
How can I spawn asynchronous methods in a loop?
我有一个包含 resolve()
方法的对象向量,该方法使用 reqwest
查询外部网站 API。在每个对象上调用 resolve()
方法后,我想打印每个请求的结果。
这是我的半异步编译和工作代码(但不是真正异步的):
for mut item in items {
item.resolve().await;
item.print_result();
}
我尝试使用 tokio::join!
来生成所有异步调用并等待它们完成,但我可能做错了什么:
tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
这是我遇到的错误:
error[E0308]: mismatched types
--> src\main.rs:25:51
|
25 | tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
| ^^^^^^^^^^^^^^ expected `()`, found opaque type
|
::: src\redirect_definition.rs:32:37
|
32 | pub async fn resolve(&mut self) {
| - the `Output` of this `async fn`'s found opaque type
|
= note: expected unit type `()`
found opaque type `impl std::future::Future`
如何一次为所有实例调用 resolve()
方法?
这段代码反映了答案——现在我正在处理我不太理解的借用检查器错误——我应该用 'static
注释我的一些变量吗?
let mut items = get_from_csv(path);
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
for task in tasks {
task.await;
}
for item in items {
item.print_result();
}
error[E0597]: `items` does not live long enough
--> src\main.rs:18:25
|
18 | let tasks: Vec<_> = items
| -^^^^
| |
| _________________________borrowed value does not live long enough
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
31 | }
| - `items` dropped here while still borrowed
error[E0505]: cannot move out of `items` because it is borrowed
--> src\main.rs:27:17
|
18 | let tasks: Vec<_> = items
| -----
| |
| _________________________borrow of `items` occurs here
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
27 | for item in items {
| ^^^^^ move out of `items` occurs here
由于您想并行等待 futures,您可以 spawn 将它们合并到 运行 并行的单独任务中。由于它们 运行 彼此独立且独立于生成它们的线程,您可以按任何顺序等待它们的句柄。
理想情况下,您应该这样写:
// spawn tasks that run in parallel
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
// now await them to get the resolve's to complete
for task in tasks {
task.await.unwrap();
}
// and we're done
for item in &items {
item.print_result();
}
但这将被借用检查器拒绝,因为 item.resolve()
返回的未来持有对 item
的借用引用。引用被传递给 tokio::spawn()
,后者将它交给另一个线程,编译器无法证明 item
会比该线程长寿。 (想send reference to local data to a thread的时候遇到同样的问题。)
对此有几种可能的解决方案;我发现最优雅的方法是 将 项目移到传递给 tokio::spawn()
的异步闭包中,并让任务在完成后将它们交还给您。基本上,您使用 items
向量来创建任务并立即从等待的结果中重构它:
// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
.into_iter()
.map(|mut item| {
tokio::spawn(async {
item.resolve().await;
item
})
})
.collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
item.print_result();
}
playground 中的可运行代码。
请注意,futures
crate 包含一个 join_all
函数,它类似于您需要的函数,除了它会轮询各个 futures 而不确保它们 运行 并行。我们可以编写一个通用的 join_parallel
,它使用 join_all
,但也使用 tokio::spawn
来获得并行执行:
async fn join_parallel<T: Send + 'static>(
futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
// unwrap the Result because it is introduced by tokio::spawn()
// and isn't something our caller can handle
futures::future::join_all(tasks)
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
使用此函数,回答问题所需的代码归结为:
let items = join_parallel(items.into_iter().map(|mut item| async {
item.resolve().await;
item
})).await;
for item in &items {
item.print_result();
}
同样,运行playground中的可用代码。
我有一个包含 resolve()
方法的对象向量,该方法使用 reqwest
查询外部网站 API。在每个对象上调用 resolve()
方法后,我想打印每个请求的结果。
这是我的半异步编译和工作代码(但不是真正异步的):
for mut item in items {
item.resolve().await;
item.print_result();
}
我尝试使用 tokio::join!
来生成所有异步调用并等待它们完成,但我可能做错了什么:
tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
这是我遇到的错误:
error[E0308]: mismatched types
--> src\main.rs:25:51
|
25 | tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
| ^^^^^^^^^^^^^^ expected `()`, found opaque type
|
::: src\redirect_definition.rs:32:37
|
32 | pub async fn resolve(&mut self) {
| - the `Output` of this `async fn`'s found opaque type
|
= note: expected unit type `()`
found opaque type `impl std::future::Future`
如何一次为所有实例调用 resolve()
方法?
这段代码反映了答案——现在我正在处理我不太理解的借用检查器错误——我应该用 'static
注释我的一些变量吗?
let mut items = get_from_csv(path);
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
for task in tasks {
task.await;
}
for item in items {
item.print_result();
}
error[E0597]: `items` does not live long enough
--> src\main.rs:18:25
|
18 | let tasks: Vec<_> = items
| -^^^^
| |
| _________________________borrowed value does not live long enough
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
31 | }
| - `items` dropped here while still borrowed
error[E0505]: cannot move out of `items` because it is borrowed
--> src\main.rs:27:17
|
18 | let tasks: Vec<_> = items
| -----
| |
| _________________________borrow of `items` occurs here
| |
19 | | .iter_mut()
| |___________________- argument requires that `items` is borrowed for `'static`
...
27 | for item in items {
| ^^^^^ move out of `items` occurs here
由于您想并行等待 futures,您可以 spawn 将它们合并到 运行 并行的单独任务中。由于它们 运行 彼此独立且独立于生成它们的线程,您可以按任何顺序等待它们的句柄。
理想情况下,您应该这样写:
// spawn tasks that run in parallel
let tasks: Vec<_> = items
.iter_mut()
.map(|item| tokio::spawn(item.resolve()))
.collect();
// now await them to get the resolve's to complete
for task in tasks {
task.await.unwrap();
}
// and we're done
for item in &items {
item.print_result();
}
但这将被借用检查器拒绝,因为 item.resolve()
返回的未来持有对 item
的借用引用。引用被传递给 tokio::spawn()
,后者将它交给另一个线程,编译器无法证明 item
会比该线程长寿。 (想send reference to local data to a thread的时候遇到同样的问题。)
对此有几种可能的解决方案;我发现最优雅的方法是 将 项目移到传递给 tokio::spawn()
的异步闭包中,并让任务在完成后将它们交还给您。基本上,您使用 items
向量来创建任务并立即从等待的结果中重构它:
// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
.into_iter()
.map(|mut item| {
tokio::spawn(async {
item.resolve().await;
item
})
})
.collect();
// await the tasks for resolve's to complete and give back our items
let mut items = vec![];
for task in tasks {
items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
item.print_result();
}
playground 中的可运行代码。
请注意,futures
crate 包含一个 join_all
函数,它类似于您需要的函数,除了它会轮询各个 futures 而不确保它们 运行 并行。我们可以编写一个通用的 join_parallel
,它使用 join_all
,但也使用 tokio::spawn
来获得并行执行:
async fn join_parallel<T: Send + 'static>(
futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
// unwrap the Result because it is introduced by tokio::spawn()
// and isn't something our caller can handle
futures::future::join_all(tasks)
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
使用此函数,回答问题所需的代码归结为:
let items = join_parallel(items.into_iter().map(|mut item| async {
item.resolve().await;
item
})).await;
for item in &items {
item.print_result();
}
同样,运行playground中的可用代码。