从 stdout 中删除一个流以便添加另一个流的正确方法是什么?

What is the proper way to remove a stream from stdout so that another stream may be added?

我正在 Dart 中启动一个进程,将其 stdout 流附加到 stdout,以便可以将结果打印到终端,如下所示:

Process.start(executable, ['list','of','args']).then((proc) {
  stdout.addStream(proc.stdout);
  stderr.addStream(proc.stderr);
  return proc.exitCode;
});

但是,一旦完成,我想启动一个新进程并再次开始(这个函数将被调用多次)。有时,我会收到错误消息:

Uncaught Error: Bad State: StreamSink is already bound to a stream

查看 dart 文档,看起来我可能需要做一些类似 stdout.close()stdout.flush() 的事情,但这些似乎并不能解决问题。处理将多个流按顺序绑定到 streamsink 的正确方法是什么?

您也可以使用

StreamSubscription subscr = proc.stdout.listen(io.stdout.add);
...
subscr.cancel();

或使用可配置的处理程序

class StdOutHandler { 
  IOSink sink; 
  call(data) => sink.add(); 
}

void main() {
  var proc = await Process.start(...);
  var stdoutHandler = new StdOutHandler()..sink = stdout;
  var subscription = proc.stdout.listen(stdoutHandler);
  ....
  stdoutHandler.sink = ...
} 

addStream returns 指示流添加何时完成的 Future。应该只有一个流 addStream 同时到 StreamSink.

根据你want/need要做的事情,你现在有 2 个选择:

  • 将进程的输出多路复用到标准输出中。
  • 等待 addStream 完成。

后者更简单:

Process.start(executable, ['list','of','args']).then((proc) async {
  await stdout.addStream(proc.stdout);  // Edit: don't do this.
  await stdout.addStream(proc.stderr);
  return proc.exitCode;
});

注意正文中的 async 修饰符,以及正文中的两个 await

编辑:不立即收听 stderr 是错误的。 (您的程序可能会阻塞它)。

如果您的程序输出足够小,您可以切换到 Process.run:

Process.run(executable, ['list','of','args']).then((procResult) {
  stdout.write(procResult.stdout);
  stdout.write(procResult.stderr);
  return procResult.exitCode;
});

不过,它不会交错 stdout 和 stderr。

您不能在同一个接收器上一次多次调用 addStream。 接收器处于 "manual" 模式或 "automatic" 模式,后者通过添加流触发。在该流添加完成之前,暂停将路由到正在添加的流而不是控制器,并且在完成之前不允许您手动添加事件。就好像添加的流接管了接收器,直到它完成。

要将两个(或更多)流添加到同一个交错的接收器,您必须手动执行。一种方法是使用一种通用的方法来交错流,如以下代码。另一种选择是自己监听两个流并将事件添加到接收器:

Stream interleave(Iterable<Stream> streams) {
  List subscriptions = [];
  StreamController controller;
  controller = new StreamController(
    onListen: () {
      int active = 0;
      void done() {
        active--;
        if (active <= 0) controller.close();
      }
      for (var stream in streams) {
        active++;
        var sub = stream.listen(controller.add, 
                                onError: controller.addError,
                                onDone: done);
        subscriptions.add(sub);             
      }
    },
    onPause: () {
      for (var sub in subscriptions) { sub.pause(); }
    }, 
    onResume: () {
      for (var sub in subscriptions) { sub.resume(); }
    },
    onCancel: () {
      for (var sub in subscriptions) { sub.cancel(); }  
    }
  );
  return controller.stream;
}

(未彻底测试!)