What's the best way to create an RxJS Observable out of objects that are yielded from a node.js stream pipeline?

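I was able to accomplish this, but I'm not able to explain why the process will exit with code 0 if I don't have another async function (processAfterDeclaration) always trying to pull from the Observable, recordObservable.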

Setup to run the source file

npm init -y
npm i rxjs@7.4.0 byline

Source file that does what I want, but in a confusing way

// node.js 14
const fs = require('fs');
const pipeline = require('util').promisify(require('stream').pipeline);

const byline = require('byline');
const { Observable } = require('rxjs');
const { take } = require('rxjs/operators');

const sleep = ms => new Promise(r => setTimeout(r, ms));
let recordObservable;

(async () => {
  const inputFilePath = 'temp.csv';

  try {
    const data = 'a,b,c\n' +
      '1,2,3\n' +
      '10,20,30\n' +
      '100,200,300';

    fs.writeFileSync(inputFilePath, data);

    console.log('starting pipeline');
    // remove this line, and the `await pipeline` resolves, but process exits early?
    processAfterDeclaration().catch(console.error);
    
    await pipeline(
      fs.createReadStream(inputFilePath),
      byline.createStream(),
      async function* (sourceStream) {
        console.log('making observable', inputFilePath);

        recordObservable = new Observable(async subscriber => {
          for await (const lineBuffer of sourceStream) {
            subscriber.next(lineBuffer.toString());
          }
          subscriber.complete();
        });

        console.log('made observable', recordObservable);
      }
    );

    console.log('pipeline done', recordObservable);
  } catch (error) {
    console.error(error);
  } finally {
    fs.unlinkSync(inputFilePath);
  }
})();

async function processAfterDeclaration() {
  while (!recordObservable) {
    await sleep(100);
  }

  console.log('can process');
  await recordObservable
    .pipe(take(2))
    .subscribe(console.log)
}

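Edit: It may be best to forgo node.js stream.pipeline. I thought using pipeline would be best because it should be the most efficient and it provides backpressure, but I want to test some of the things RxJS offers.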

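Edit 2: Another reason I can forgo stream.pipeline is that I can still use the pipe method and pass any readable stream as an argument to the from function. I can then use the subscribe method to write/append each item from the observable to my output stream, and then call add on my subscription to register teardown logic, specifically for closing my write stream. I would hope that RxJS from helps determine when to close the read stream given as input. Finally, I would probably recommend await lastValueFrom(myObservable) or firstValueFrom.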

RxJS from operator

The RxJS from operator will turn async iterators (like node streams) into observables!

I can't run/test your code, but something roughly like this should work.

const fs = require('fs');

const byline = require('byline');
const { from } = require('rxjs');
const { map, take, finalize } = require('rxjs/operators');

const inputFilePath = 'temp.csv';

(async () => {
  const data = 'a,b,c\n' +
    '1,2,3\n' +
    '10,20,30\n' +
    '100,200,300';

  fs.writeFileSync(inputFilePath, data);

  console.log('starting pipeline');

  from(byline(fs.createReadStream(inputFilePath)))
    .pipe(
      map(lineBuffer => lineBuffer.toString()),
      take(2),
      finalize(() => fs.unlinkSync(inputFilePath))
    )
    .subscribe(console.log);
})();

Your second async function

I'm not able to explain why the process will exit with code 0 if I don't have another async function (processAfterDeclaration) always trying to pull from the Observable

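If you define a function and never call it, that function will never compute anything.

If you define an observable and never subscribe to it, that observable will never do anything either. This is different from promises, which start the moment they are defined. You just need to subscribe to that observable; it doesn't need a separate function.

This should work the same: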

recordObservable = new Observable(async subscriber => {
  for await (const lineBuffer of sourceStream) {
    subscriber.next(lineBuffer.toString());
  }
  subscriber.complete();
});

recordObservable.pipe(
  take(2)
).subscribe(console.log)

The second async function

I'm not able to explain why the process will exit with code 0 if I don't have another async function (processAfterDeclaration) always trying to pull from the Observable

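The logic error is that await pipeline never resolves or rejects, because the third step in the pipeline never yields anything, since nothing ever subscribes to and pulls from recordObservable. It is an accidentally written deadlock.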
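A likely missing piece in this explanation is why the process exits cleanly instead of hanging: in Node.js, a promise that never settles does not by itself keep the event loop alive. The sketch below (the helper name runNeverSettling is made up for illustration) spawns a child process whose only work is awaiting such a promise, then reports how that child exited.

```javascript
const { spawnSync } = require('child_process');

// Runs a child Node.js process that awaits a promise which never settles,
// and returns the child's exit status and captured stdout.
function runNeverSettling() {
  const script = `
    (async () => {
      console.log('before await');
      await new Promise(() => {}); // never resolves or rejects
      console.log('after await');  // never reached
    })();
  `;
  const result = spawnSync(process.execPath, ['-e', script], { encoding: 'utf8' });
  return { status: result.status, stdout: result.stdout };
}
```

Calling runNeverSettling() should report status 0 with only 'before await' on stdout. That mirrors the question: once await pipeline(...) is stuck and no timer or I/O remains scheduled, node exits with code 0. With processAfterDeclaration in place, its sleep timer keeps the process alive until the subscription starts pulling from the stream.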