如何将节点可读流转换为 RX 可观察流
How to convert node readable stream to RX observable
如果我有一个 Node js 流,例如来自 process.stdin
或 fs.createReadStream
,我如何使用 RxJs5 将其转换为 RxJs Observable 流?
我看到 RxJs-Node 有一个 fromReadableStream
方法,但是好像快一年没更新了。
RxJs-Node 实现基于 RxJs4,但无需太多工作即可移植到 RxJs5 https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
以下内容适用于 v4 和 v5(免责声明 未经测试):
fromStream: function (stream, finishEventName, dataEventName) {
stream.pause();
finishEventName || (finishEventName = 'end');
dataEventName || (dataEventName = 'data');
return Observable.create(function (observer) {
// This is the "next" event
const data$ = Observable.fromEvent(stream, dataEventName);
// Map this into an error event
const error$ = Observable.fromEvent(stream, 'error')
.flatMap(err => Observable.throw(err));
// Shut down the stream
const complete$ = Observable.fromEvent(stream, finishEventName);
// Put it all together and subscribe
const sub = data$
.merge(error$)
.takeUntil(complete$)
.subscribe(observer);
// Start the underlying node stream
stream.resume();
// Return a handle to destroy the stream
return sub;
})
// Avoid recreating the stream on duplicate subscriptions
.share();
},
对于寻找此内容的任何人,根据 Mark 的建议,我为 rxjs5 改编了 rx-node fromStream
实现。
import { Observable } from 'rxjs';
// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
stream.pause();
return new Observable((observer) => {
function dataHandler(data) {
observer.next(data);
}
function errorHandler(err) {
observer.error(err);
}
function endHandler() {
observer.complete();
}
stream.addListener(dataEventName, dataHandler);
stream.addListener('error', errorHandler);
stream.addListener(finishEventName, endHandler);
stream.resume();
return () => {
stream.removeListener(dataEventName, dataHandler);
stream.removeListener('error', errorHandler);
stream.removeListener(finishEventName, endHandler);
};
}).share();
}
请注意,它本质上破坏了流的所有背压功能。 Observables 是一种推送技术。所有输入块都将被读取并尽快推送给观察者。根据您的情况,这可能不是最佳解决方案。
上面的答案会起作用,但不支持背压。如果您尝试使用 createReadStream 读取大文件,它们将读取内存中的整个文件。
这是我的背压支持实现:
rxjs-stream
因为 Node v11.14.0 流支持 for await
https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator
这意味着您可以将 stream 传递给 from()
运算符。
Under hood rxjs(v7.x.x) 将调用 fromAsyncIterable()
将 return Observable
如果我有一个 Node js 流,例如来自 process.stdin
或 fs.createReadStream
,我如何使用 RxJs5 将其转换为 RxJs Observable 流?
我看到 RxJs-Node 有一个 fromReadableStream
方法,但是好像快一年没更新了。
RxJs-Node 实现基于 RxJs4,但无需太多工作即可移植到 RxJs5 https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
以下内容适用于 v4 和 v5(免责声明 未经测试):
fromStream: function (stream, finishEventName, dataEventName) {
stream.pause();
finishEventName || (finishEventName = 'end');
dataEventName || (dataEventName = 'data');
return Observable.create(function (observer) {
// This is the "next" event
const data$ = Observable.fromEvent(stream, dataEventName);
// Map this into an error event
const error$ = Observable.fromEvent(stream, 'error')
.flatMap(err => Observable.throw(err));
// Shut down the stream
const complete$ = Observable.fromEvent(stream, finishEventName);
// Put it all together and subscribe
const sub = data$
.merge(error$)
.takeUntil(complete$)
.subscribe(observer);
// Start the underlying node stream
stream.resume();
// Return a handle to destroy the stream
return sub;
})
// Avoid recreating the stream on duplicate subscriptions
.share();
},
对于寻找此内容的任何人,根据 Mark 的建议,我为 rxjs5 改编了 rx-node fromStream
实现。
import { Observable } from 'rxjs';
// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
stream.pause();
return new Observable((observer) => {
function dataHandler(data) {
observer.next(data);
}
function errorHandler(err) {
observer.error(err);
}
function endHandler() {
observer.complete();
}
stream.addListener(dataEventName, dataHandler);
stream.addListener('error', errorHandler);
stream.addListener(finishEventName, endHandler);
stream.resume();
return () => {
stream.removeListener(dataEventName, dataHandler);
stream.removeListener('error', errorHandler);
stream.removeListener(finishEventName, endHandler);
};
}).share();
}
请注意,它本质上破坏了流的所有背压功能。 Observables 是一种推送技术。所有输入块都将被读取并尽快推送给观察者。根据您的情况,这可能不是最佳解决方案。
上面的答案会起作用,但不支持背压。如果您尝试使用 createReadStream 读取大文件,它们将读取内存中的整个文件。
这是我的背压支持实现: rxjs-stream
因为 Node v11.14.0 流支持 for await
https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator
这意味着您可以将 stream 传递给 from()
运算符。
Under hood rxjs(v7.x.x) 将调用 fromAsyncIterable()
将 return Observable