可以在 dart 中将值异步插入流中吗?
Possible to insert values into stream asynchronously in dart?
我有一个功能可以下载 RSS Feed 并对其进行解析。之后,我遍历每个团队并对其进行一些处理。现在,这个处理是异步的,每当它完成时我都会放弃它,但这意味着每个项目都将被线性处理。有没有办法一次性处理所有项目并在处理时产生它们?
高级代码:
Stream<String> processRSSFeedItems() async* {
RSSFeed rssFeed = RSSFeed("some URL");
for(int itemIndex = 0; itemIndex < rssFeed.items.lengthl itemIndex++){
String processedItem = await processRssItem(rssFeed.items[itemIndex]);
yield processedItem;
}
}
void refreshRssFeed(){
var stream = processRSSFeedItems();
stream.listen((item) => setState(() => something(item));
}
有没有什么方法可以为所有项目调用 processRssItem() 而无需等待前一个过程完成并在它们到达时产生它们?
你可以做到
List<Future<String>> futures = <Future<String>>[];
for(int itemIndex = 0; itemIndex < rssFeed.items.length; itemIndex++){
futures.add(processRssItem(rssFeed.items[itemIndex]));
}
// Waits for multiple futures to complete and collects their results.
final result = await Future.wait<String>(futures);
for(var i = 0 ; i < result.length; i++) {
yield result[i];
}
您可以在元素到达时将它们添加到 StreamController 中,然后 return 流。
streams should wait for subscribers before starting their work 被认为是最佳实践。这是通过使用 StreamController
.
的 onListen
回调来实现的
import 'dart:async';
...
Stream<String> processRSSFeedItems() {
late StreamController<String> controller;
void startProcessRssFeedItems() {
RSSFeed rssFeed = RSSFeed("some URL");
int workers = rssFeed.items.length;
for (int itemIndex = 0; itemIndex < rssFeed.items.length; itemIndex++) {
processRssItem(rssFeed.items[itemIndex]).then((processedItem) {
controller.add(processedItem);
if (--workers == 0) {
controller.close();
}
});
}
}
controller = StreamController<String>(
onListen: startProcessRssFeedItems,
);
return controller.stream;
}
根据你的问题:
"For now, this processing is asynchronous and whenever it completes I
yield it but that means each item will be processed linearly. Is there
a way to process all the items once and yield them as they are
processed?"
“yield”在此上下文中等同于“return”。所以 Stream 不适合你的 use-case 因为 Streams 被设计为迭代地监听值,而不是“所有项目一次”。对于“所有项目一次”,您应该使用 Future
和 return
值,而不是 yield
。在 List<Future>
上使用 while
循环而不是使用 for
循环将处理它们 one-by-one:
Future getAllWhenProcessed()async{
var items = rssFeed.items;
var count = items.length;
var results = [];
while(count > 0){
results.add(await processRssItem(items[--count]));
}
return results;
}
然后
getAllWhenProcessed().then((value){ updateUi(); });
我有一个功能可以下载 RSS Feed 并对其进行解析。之后,我遍历每个团队并对其进行一些处理。现在,这个处理是异步的,每当它完成时我都会放弃它,但这意味着每个项目都将被线性处理。有没有办法一次性处理所有项目并在处理时产生它们?
高级代码:
Stream<String> processRSSFeedItems() async* {
RSSFeed rssFeed = RSSFeed("some URL");
for(int itemIndex = 0; itemIndex < rssFeed.items.lengthl itemIndex++){
String processedItem = await processRssItem(rssFeed.items[itemIndex]);
yield processedItem;
}
}
void refreshRssFeed(){
var stream = processRSSFeedItems();
stream.listen((item) => setState(() => something(item));
}
有没有什么方法可以为所有项目调用 processRssItem() 而无需等待前一个过程完成并在它们到达时产生它们?
你可以做到
List<Future<String>> futures = <Future<String>>[];
for(int itemIndex = 0; itemIndex < rssFeed.items.length; itemIndex++){
futures.add(processRssItem(rssFeed.items[itemIndex]));
}
// Waits for multiple futures to complete and collects their results.
final result = await Future.wait<String>(futures);
for(var i = 0 ; i < result.length; i++) {
yield result[i];
}
您可以在元素到达时将它们添加到 StreamController 中,然后 return 流。
streams should wait for subscribers before starting their work 被认为是最佳实践。这是通过使用 StreamController
.
onListen
回调来实现的
import 'dart:async';
...
Stream<String> processRSSFeedItems() {
late StreamController<String> controller;
void startProcessRssFeedItems() {
RSSFeed rssFeed = RSSFeed("some URL");
int workers = rssFeed.items.length;
for (int itemIndex = 0; itemIndex < rssFeed.items.length; itemIndex++) {
processRssItem(rssFeed.items[itemIndex]).then((processedItem) {
controller.add(processedItem);
if (--workers == 0) {
controller.close();
}
});
}
}
controller = StreamController<String>(
onListen: startProcessRssFeedItems,
);
return controller.stream;
}
根据你的问题:
"For now, this processing is asynchronous and whenever it completes I yield it but that means each item will be processed linearly. Is there a way to process all the items once and yield them as they are processed?"
“yield”在此上下文中等同于“return”。所以 Stream 不适合你的 use-case 因为 Streams 被设计为迭代地监听值,而不是“所有项目一次”。对于“所有项目一次”,您应该使用 Future
和 return
值,而不是 yield
。在 List<Future>
上使用 while
循环而不是使用 for
循环将处理它们 one-by-one:
Future getAllWhenProcessed()async{
var items = rssFeed.items;
var count = items.length;
var results = [];
while(count > 0){
results.add(await processRssItem(items[--count]));
}
return results;
}
然后
getAllWhenProcessed().then((value){ updateUi(); });