从 node.js 流管道产生的对象中创建 RxJS Observable 的最佳方法是什么?
What's the best way to create a RxJS Observable out of objects that are yielded from a node.js stream pipeline?
我能够实现这个,但我无法解释为什么如果我没有另一个异步函数 (processAfterDeclaration
) 总是试图从可观察,recordObservable
.
设置为运行源文件
npm init -y
npm i rxjs@7.4.0 byline
执行我想要的操作的源文件,但方式令人困惑
// node.js 14
const fs = require('fs');
const pipeline = require('util').promisify(require('stream').pipeline);
const byline = require('byline');
const { Observable } = require('rxjs');
const { take } = require('rxjs/operators');
const sleep = ms => new Promise(r => setTimeout(r, ms));
let recordObservable;
(async () => {
const inputFilePath = 'temp.csv';
try {
const data = 'a,b,c\n' +
'1,2,3\n' +
'10,20,30\n' +
'100,200,300';
fs.writeFileSync(inputFilePath, data);
console.log('starting pipeline');
// remove this line, and the `await pipeline` resolves, but process exits early?
processAfterDeclaration().catch(console.error);
await pipeline(
fs.createReadStream(inputFilePath),
byline.createStream(),
async function* (sourceStream) {
console.log('making observable', inputFilePath);
recordObservable = new Observable(async subscriber => {
for await (const lineBuffer of sourceStream) {
subscriber.next(lineBuffer.toString());
}
subscriber.complete();
});
console.log('made observable', recordObservable);
}
);
console.log('pipeline done', recordObservable);
} catch (error) {
console.error(error);
} finally {
fs.unlinkSync(inputFilePath);
}
})();
async function processAfterDeclaration() {
while (!recordObservable) {
await sleep(100);
}
console.log('can process');
await recordObservable
.pipe(take(2))
.subscribe(console.log)
}
编辑:最好放弃 node.js stream.pipeline。我认为使用管道是最好的,因为它应该是最有效的并且提供背压,但我想测试 RxJS 提供的一些东西。
edit2:能够放弃 stream.pipeline 的更多原因是我仍然可以使用 pipe
方法并将任何可读流作为参数提供给 from
函数。然后我可以使用 subscribe
方法来 write/append 从可观察到我的输出流的每一件事,然后在我的订阅上调用 add
来添加拆卸逻辑,特别是用于关闭我的写入流。我希望 RxJS from
能够帮助确定何时关闭作为输入给出的读取流。最后,我可能会推荐 await lastValueFrom(myObservable)
或 firstValueFrom
。
RxJS from
运算符
RxJS from
运算符会将异步迭代器(如节点流)转换为可观察对象!
我不能 run/test 你的代码,但这个大概的东西应该可以工作。
const fs = require('fs');
const byline = require('byline');
const { from } = require('rxjs');
const { map, take, finalize} = require('rxjs/operators');
const inputFilePath = 'temp.csv';
(async () => {
const data = 'a,b,c\n' +
'1,2,3\n' +
'10,20,30\n' +
'100,200,300';
fs.writeFileSync(inputFilePath, data);
console.log('starting pipeline');
from(byline(fs.createReadStream(inputFilePath)))
.pipe(
map(lineBuffer => lineBuffer.toString()),
take(2),
finalize(() => fs.unlinkSync(inputFilePath))
)
.subscribe(console.log);
})();
你的第二个异步函数
I'm not able to explain why the process will exit with code 0 if I don't have another async function (processAfterDeclaration) always trying to pull from the Observable
如果您定义了一个函数并且从不调用它,那么该函数将永远不会计算任何东西。
如果您定义了一个可观察对象但从不订阅它,那么该可观察对象也永远不会做任何事情。这与在定义它们的那一刻开始的承诺不同。你只需要订阅那个 observable,它不需要一个单独的函数。
这应该是一样的:
recordObservable = new Observable(async subscriber => {
for await (const lineBuffer of sourceStream) {
subscriber.next(lineBuffer.toString());
}
subscriber.complete();
});
recordObservable.pipe(
take(2)
).subscribe(console.log)
第二个异步函数
I'm not able to explain why the process will exit with code 0 if I
don't have another async function (processAfterDeclaration) always
trying to pull from the Observable
逻辑错误是 await pipeline
永远不会解决或拒绝,因为管道中的第 3 步永远不会产生任何东西,因为没有任何东西会从 recordObservable
订阅和拉取。是不小心写的死锁
我能够实现这个,但我无法解释为什么如果我没有另一个异步函数 (processAfterDeclaration
) 总是试图从可观察,recordObservable
.
设置为运行源文件
npm init -y
npm i rxjs@7.4.0 byline
执行我想要的操作的源文件,但方式令人困惑
// node.js 14
const fs = require('fs');
const pipeline = require('util').promisify(require('stream').pipeline);
const byline = require('byline');
const { Observable } = require('rxjs');
const { take } = require('rxjs/operators');
const sleep = ms => new Promise(r => setTimeout(r, ms));
let recordObservable;
(async () => {
const inputFilePath = 'temp.csv';
try {
const data = 'a,b,c\n' +
'1,2,3\n' +
'10,20,30\n' +
'100,200,300';
fs.writeFileSync(inputFilePath, data);
console.log('starting pipeline');
// remove this line, and the `await pipeline` resolves, but process exits early?
processAfterDeclaration().catch(console.error);
await pipeline(
fs.createReadStream(inputFilePath),
byline.createStream(),
async function* (sourceStream) {
console.log('making observable', inputFilePath);
recordObservable = new Observable(async subscriber => {
for await (const lineBuffer of sourceStream) {
subscriber.next(lineBuffer.toString());
}
subscriber.complete();
});
console.log('made observable', recordObservable);
}
);
console.log('pipeline done', recordObservable);
} catch (error) {
console.error(error);
} finally {
fs.unlinkSync(inputFilePath);
}
})();
async function processAfterDeclaration() {
while (!recordObservable) {
await sleep(100);
}
console.log('can process');
await recordObservable
.pipe(take(2))
.subscribe(console.log)
}
编辑:最好放弃 node.js stream.pipeline。我认为使用管道是最好的,因为它应该是最有效的并且提供背压,但我想测试 RxJS 提供的一些东西。
edit2:能够放弃 stream.pipeline 的更多原因是我仍然可以使用 pipe
方法并将任何可读流作为参数提供给 from
函数。然后我可以使用 subscribe
方法来 write/append 从可观察到我的输出流的每一件事,然后在我的订阅上调用 add
来添加拆卸逻辑,特别是用于关闭我的写入流。我希望 RxJS from
能够帮助确定何时关闭作为输入给出的读取流。最后,我可能会推荐 await lastValueFrom(myObservable)
或 firstValueFrom
。
RxJS from
运算符
RxJS from
运算符会将异步迭代器(如节点流)转换为可观察对象!
我不能 run/test 你的代码,但这个大概的东西应该可以工作。
const fs = require('fs');
const byline = require('byline');
const { from } = require('rxjs');
const { map, take, finalize} = require('rxjs/operators');
const inputFilePath = 'temp.csv';
(async () => {
const data = 'a,b,c\n' +
'1,2,3\n' +
'10,20,30\n' +
'100,200,300';
fs.writeFileSync(inputFilePath, data);
console.log('starting pipeline');
from(byline(fs.createReadStream(inputFilePath)))
.pipe(
map(lineBuffer => lineBuffer.toString()),
take(2),
finalize(() => fs.unlinkSync(inputFilePath))
)
.subscribe(console.log);
})();
你的第二个异步函数
I'm not able to explain why the process will exit with code 0 if I don't have another async function (processAfterDeclaration) always trying to pull from the Observable
如果您定义了一个函数并且从不调用它,那么该函数将永远不会计算任何东西。
如果您定义了一个可观察对象但从不订阅它,那么该可观察对象也永远不会做任何事情。这与在定义它们的那一刻开始的承诺不同。你只需要订阅那个 observable,它不需要一个单独的函数。
这应该是一样的:
recordObservable = new Observable(async subscriber => {
for await (const lineBuffer of sourceStream) {
subscriber.next(lineBuffer.toString());
}
subscriber.complete();
});
recordObservable.pipe(
take(2)
).subscribe(console.log)
第二个异步函数
I'm not able to explain why the process will exit with code 0 if I don't have another async function (processAfterDeclaration) always trying to pull from the Observable
逻辑错误是 await pipeline
永远不会解决或拒绝,因为管道中的第 3 步永远不会产生任何东西,因为没有任何东西会从 recordObservable
订阅和拉取。是不小心写的死锁