从 Dart 中的流创建新流

Create a new stream from a stream in Dart

我怀疑我对 Dart 中的 Streams 的理解可能有一些漏洞...

我有一种情况,我希望 Dart 应用程序响应间歇性输入(这会立即建议使用 Streamss -- 或者 Futures,也许)。我可以使用 listener 函数实现我想要的行为,但我想知道如何以更好、更 Dartesque 的方式做到这一点。

作为一个简单的例子,下面的(工作)程序侦听来自用户的键盘输入,并向文档添加一个 div 元素,其中包含自上一个 space 以来键入的内容,每当space 栏被击中。

import  'dart:html';

main() {
  listenForSpaces(showInput);
}

void  listenForSpaces(void  Function(String) listener) {
  var input =  List<String>();
  document.onKeyDown.listen((keyboardEvent) {
    var key = keyboardEvent.key;
    if (key ==  " ") {
      listener(input.join());
      input.clear();
    } else {
      input.add(key.length >  1  ?  "[$key]"  : key);
    }
  });
}

void  showInput(String message) {
  document.body.children.add(DivElement()..text = message);
}

我想做的是根据我正在收听的 Stream 创建一个新的 Stream(在上面的示例中,创建一个新的 Stream 来自 onKeyDown)。换句话说,我可以将上面的程序设置为:

var myStream = ...
myStream.listen(showInput);

我怀疑有一种方法可以创建一个Stream,然后在不同的时间和地点,向其中插入元素或调用它发出一个值:感觉好像我错过了什么简单的。无论如何,我们将不胜感激对文档的任何帮助或指导。

有多种方法,并且有一个完整的包 rxdart 允许各种事情。

只有最终消费者应该使用 listen 并且只有在您需要明确想要取消订阅时才使用,否则使用 forEach

如果您想像示例中那样操作事件,请使用 map

我原本不打算回答我自己的问题,但后来我在 dartlang creating streams 文章中找到了这个问题的非常简单的答案;如果对其他人有帮助:

具体来说,如果我们想创建一个可以在代码中的任意时间和位置插入元素的流,我们可以通过 StreamController class 来实现。这个 class 的实例有一个 add 方法;我们可以简单地使用实例的 stream 属性 作为我们的流。

例如,我问题中的代码可以重写为:

import 'dart:html';
import 'dart:async';

main() async {
  // The desired implementation stated in the question:
  var myStream = listenForSpaces();
  myStream.listen(showInput);
}

Stream<String> listenForSpaces() {
  // Use the StreamController class.
  var controller = StreamController<String>();

  var input = List<String>();
  document.onKeyDown.listen((keyboardEvent) {
    var key = keyboardEvent.key;
    if (key == " ") {
      // Add items to the controller's stream.
      controller.add(input.join());
      input.clear();
    } else {
      input.add(key.length > 1 ? "[$key]" : key);
    }
  });

  // Listen to the controller's stream.
  return controller.stream;
}

void showInput(String message) {
  document.body.children.add(DivElement()..text = message);
}

(如文章中所述,如果我们想像这样从头开始设置流,我们需要小心,因为没有什么可以阻止我们将项目插入到没有关联的活动订阅者的流中; 插入的项目在这种情况下会被缓冲,这可能会导致内存泄漏。)

使用 async* 函数从现有流创建新流相当容易。 对于普通流,我会这样做:

Stream<String> listenForSpaces() async* {
  var input = <String>[];
  await for (var keyboardEvent in document.onKeyDown) {
    var key = keyboardEvent.key;
    if (key == " ") {
      yield input.join();
      input.clear();
    } else {
      input.add(key.length > 1 ? "[$key]" : key);
    }
  }
}

async* 函数会将暂停传播到基础流,并且它可能会在 yield 期间暂停源。 这可能是也可能不是您想要的,因为暂停 DOM 事件流可能会导致您错过事件。对于 DOM 流,我可能更愿意使用上面基于 StreamController 的解决方案。