如何将节点可读流转换为 RX 可观察流

How to convert node readable stream to RX observable

如果我有一个 Node js 流,例如来自 process.stdinfs.createReadStream,我如何使用 RxJs5 将其转换为 RxJs Observable 流?

我看到 RxJs-Node 有一个 fromReadableStream 方法,但是好像快一年没更新了。

RxJs-Node 实现基于 RxJs4,但无需太多工作即可移植到 RxJs5 https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52

以下内容适用于 v4 和 v5(免责声明 未经测试):

fromStream: function (stream, finishEventName, dataEventName) {
    stream.pause();

    finishEventName || (finishEventName = 'end');
    dataEventName || (dataEventName = 'data');

    return Observable.create(function (observer) {

      // This is the "next" event
      const data$ = Observable.fromEvent(stream, dataEventName);

      // Map this into an error event
      const error$ = Observable.fromEvent(stream, 'error')
        .flatMap(err => Observable.throw(err));

      // Shut down the stream
      const complete$ = Observable.fromEvent(stream, finishEventName);

      // Put it all together and subscribe
      const sub = data$
        .merge(error$)
        .takeUntil(complete$)
        .subscribe(observer);

      // Start the underlying node stream
      stream.resume();

      // Return a handle to destroy the stream
      return sub;
    })

    // Avoid recreating the stream on duplicate subscriptions
    .share();
  },

对于寻找此内容的任何人,根据 Mark 的建议,我为 rxjs5 改编了 rx-node fromStream 实现。

import { Observable } from 'rxjs';

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
  stream.pause();

  return new Observable((observer) => {
    function dataHandler(data) {
      observer.next(data);
    }

    function errorHandler(err) {
      observer.error(err);
    }

    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener('error', errorHandler);
    stream.addListener(finishEventName, endHandler);

    stream.resume();

    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener('error', errorHandler);
      stream.removeListener(finishEventName, endHandler);
    };
  }).share();
}

请注意,它本质上破坏了流的所有背压功能。 Observables 是一种推送技术。所有输入块都将被读取并尽快推送给观察者。根据您的情况,这可能不是最佳解决方案。

上面的答案会起作用,但不支持背压。如果您尝试使用 createReadStream 读取大文件,它们将读取内存中的整个文件。

这是我的背压支持实现: rxjs-stream

因为 Node v11.14.0 流支持 for await https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator

这意味着您可以将 stream 传递给 from() 运算符。

Under hood rxjs(v7.x.x) 将调用 fromAsyncIterable() 将 return Observable