可以在 dart 中将值异步插入流中吗?

Possible to insert values into stream asynchronously in dart?

我有一个功能可以下载 RSS Feed 并对其进行解析。之后,我遍历每个团队并对其进行一些处理。现在,这个处理是异步的,每当它完成时我都会放弃它,但这意味着每个项目都将被线性处理。有没有办法一次性处理所有项目并在处理时产生它们?

高级代码:

Stream<String> processRSSFeedItems() async* {
    RSSFeed rssFeed = RSSFeed("some URL");
    for(int itemIndex = 0; itemIndex < rssFeed.items.lengthl itemIndex++){
         String processedItem = await processRssItem(rssFeed.items[itemIndex]);
         yield processedItem;
    }
}

void refreshRssFeed(){
    var stream = processRSSFeedItems();
    stream.listen((item) => setState(() => something(item));
}

有没有什么方法可以为所有项目调用 processRssItem() 而无需等待前一个过程完成并在它们到达时产生它们?

你可以做到

List<Future<String>> futures = <Future<String>>[];
for(int itemIndex = 0; itemIndex < rssFeed.items.length; itemIndex++){
     futures.add(processRssItem(rssFeed.items[itemIndex]));
}

// Waits for multiple futures to complete and collects their results.
final result = await Future.wait<String>(futures);

for(var i = 0 ; i < result.length; i++) {
    yield result[i];
}

您可以在元素到达时将它们添加到 StreamController 中,然后 return 流。

streams should wait for subscribers before starting their work 被认为是最佳实践。这是通过使用 StreamController.

onListen 回调来实现的
import 'dart:async';
...

Stream<String> processRSSFeedItems() {
  late StreamController<String> controller;

  void startProcessRssFeedItems() {
    RSSFeed rssFeed = RSSFeed("some URL");
    int workers = rssFeed.items.length;
    for (int itemIndex = 0; itemIndex < rssFeed.items.length; itemIndex++) {
      processRssItem(rssFeed.items[itemIndex]).then((processedItem) {
        controller.add(processedItem);
        if (--workers == 0) {
          controller.close();
        }
      });
    }
  }

  controller = StreamController<String>(
    onListen: startProcessRssFeedItems,
  );

  return controller.stream;
}

根据你的问题:

"For now, this processing is asynchronous and whenever it completes I yield it but that means each item will be processed linearly. Is there a way to process all the items once and yield them as they are processed?"

“yield”在此上下文中等同于“return”。所以 Stream 不适合你的 use-case 因为 Streams 被设计为迭代地监听值,而不是“所有项目一次”。对于“所有项目一次”,您应该使用 Futurereturn 值,而不是 yield。在 List<Future> 上使用 while 循环而不是使用 for 循环将处理它们 one-by-one:

    Future getAllWhenProcessed()async{
      var items = rssFeed.items;
      var count = items.length;
      var results = [];
      while(count > 0){
         results.add(await processRssItem(items[--count]));
       }
      return results;
    }

然后

getAllWhenProcessed().then((value){ updateUi(); });