Dart Stream firstWhere 不会立即解析

Dart Stream firstWhere does not immediately resolve

例如,我有这个简单的代码:

//create stream with numbers from 1 to 100, delayed by 10sec duration
Stream<int> countStream() async* {
  for (int i = 1; i <= 100; i++) {
    yield i;      
    sleep(Duration(seconds: 10));
  }
}

void main() async {
  var x = await countStream().firstWhere((element) => element == 1); //here Im waiting for number 1
  print(x);
}

问题是firstWhere不是在yield 1之后就退出,而是在yield 2之后, 并打印 10 秒。

为什么?在我的现实生活应用程序中,我有转换为消息流的 websocket 流,并等待特定的消息。但是因为 websocket 流没有产生另一条消息,firstWhere 挂起。

这是我的原始代码:

  Stream<Message> lines() async* {
    var partial = '';

    await for (String chunk in ws!) { //ws is WebSocket
      var lines = chunk.split('\n');
      lines[0] = partial + lines[0];
      partial = lines.removeLast();
      for (final line in lines) {
        var msg = Message.parse(line); //Message.parse returns CodeMessage object
        if (msg != null) yield msg;
      }
    }
  }

//at some place in code this hangs because last arrived message is CodeMessage
var msg = await lines().firstWhere((obj) => obj is CodeMessage);
print(msg);

有没有其他方法可以做到这一点或者我哪里错了?

这里有两件事出错了。

  1. 首先,在你的例子中:

    这里使用sleep是不合适的。 考虑此功能的文档:

    Use this with care, as no asynchronous operations can be processed in an isolate while it is blocked in a sleep call.

    您的 countStream 函数尽管是异步的,但会阻止 睡觉时整个隔离。

    试试这个:

    await Future<void>.delayed(const Duration(seconds: 10));
    
  2. 现在,谈谈 firstWhere 没有立即解决的真正原因:

    让我们来做个简单的实验:

    Stream<int> testStream() async* {
      for (var i = 0; i < 100; ++i) {
        print(i);
        yield i;
      }
    }
    
    void main() {
      late final StreamSubscription streamSubscription;
      streamSubscription = testStream().listen((value) {
        if (value == 1) streamSubscription.cancel();
      }); 
    }
    

    输出:

    0
    1
    2
    

    这是怎么回事?订阅在 i == 1 取消 - 为什么循环一直持续到 2?

    答案很简单。在到达另一个 yield 语句之前,异步生成器函数不会停止(因此流不会关闭)。这是由于 event loop works:

    的方式

    Once a Dart function starts executing, it continues executing until it exits. In other words, Dart functions can’t be interrupted by other Dart code.

    该函数在再次让步之前没有机会停止,因为取消流订阅无法立即停止它。

    firstWhere 使用 Dart 内部的 _cancelAndValue 函数来完成一个值。在流关闭之前它不会完成,并且在到达下一个 yield 之前流不会关闭 - 在您的情况下,这可能会延迟甚至永远不会发生。

    在使用异步生成器函数时解决此问题的唯一方法是在下一次延迟之前添加另一个 yieldreturn 语句。