Usage of stream.Transform.from

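I want to quickly declare a transform stream without any additional libraries. Turning an async generator into a transform stream via stream.Transform.from looks like a good option.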

someReadable.pipe(
  stream.Transform.from(async function* (source, writable) {
    for await (const chunk of source) {
      yield JSON.stringify(chunk, null, 2) + "\n\n";
    }
  })
)

Why doesn't the above work?

TypeScript reports:

Error:(8, 9) TS2345: Argument of type 'Readable' is not assignable to parameter of type 'WritableStream'.
  Type 'Readable' is missing the following properties from type 'WritableStream': writable, write, end
Error:(8, 31) TS2345: Argument of type '(source: any, writable: any) => AsyncGenerator<string, void, unknown>' is not assignable to parameter of type 'Iterable<any> | AsyncIterable<any>'.
  Property '[Symbol.asyncIterator]' is missing in type '(source: any, writable: any) => AsyncGenerator<string, void, unknown>' but required in type 'AsyncIterable<any>'.
Error:(14, 8) TS2339: Property 'pipe' does not exist on type 'WritableStream'.

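The from function doesn't actually come from the Transform class. Transform is a child class of Duplex, which in turn is a child of Readable and Writable. The only .from function in the stream module (at least as of the Node.js v12 release used below) is Readable.from, so that is what you are actually calling.

You can verify this yourself: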

$ node
Welcome to Node.js v12.14.0.
Type ".help" for more information.

> const stream = require('stream')
> stream.Readable.from === stream.Transform.from
true
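To see where the static from lookup ends up, the constructor chain can be walked by hand. The following is only a sketch: ownerOfStatic is a hypothetical helper written for this illustration, not part of the stream module, and the class it reports may differ between Node.js releases (on the v12 REPL above it would be Readable).

```javascript
const { Readable, Writable, Duplex, Transform } = require('stream');

// Hypothetical helper: walk up the constructor chain and return the name of
// the first class that defines `prop` as its own static property.
function ownerOfStatic(ctor, prop) {
  for (let c = ctor; typeof c === 'function'; c = Object.getPrototypeOf(c)) {
    if (Object.prototype.hasOwnProperty.call(c, prop)) return c.name;
  }
  return undefined;
}

// Transform does not define `from` itself; the lookup falls through to a parent.
console.log(Object.prototype.hasOwnProperty.call(Transform, 'from')); // false
console.log(ownerOfStatic(Transform, 'from'));

// A Transform instance is still a Duplex, a Readable, and a Writable.
const t = new Transform();
console.log(t instanceof Duplex, t instanceof Readable, t instanceof Writable);
```

Because from is inherited rather than defined on Transform, calling stream.Transform.from runs a parent class's implementation and does not produce a Transform.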

Unfortunately, the stream module does not appear to have a Transform.from or a Writable.from.

From https://nodejs.org/api/stream.html#stream_types_of_streams:

Additionally, this module includes the utility functions stream.pipeline(), stream.finished() and stream.Readable.from().
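Since Readable.from accepts any async iterable, one possible workaround is to pass the source stream into the async generator yourself and wrap the result. This is a minimal sketch, not an official pattern: prettyPrint and collect are hypothetical names, and the in-memory someReadable is only a stand-in for the stream in the question.

```javascript
const { Readable } = require('stream');

// Hypothetical transform: pretty-print each chunk, as in the question.
async function* prettyPrint(source) {
  for await (const chunk of source) {
    yield JSON.stringify(chunk, null, 2) + '\n\n';
  }
}

// Hypothetical helper that concatenates everything a stream emits.
async function collect(readable) {
  let out = '';
  for await (const chunk of readable) out += chunk;
  return out;
}

// Stand-in for `someReadable`; any readable (or async iterable) would do.
const someReadable = Readable.from([{ a: 1 }, { b: 2 }]);

// Call the generator with the source, then wrap the resulting async iterable.
const transformed = Readable.from(prettyPrint(someReadable));

collect(transformed).then((text) => process.stdout.write(text));
```

The result is a plain Readable rather than a Duplex, so it cannot be used as the target of someReadable.pipe(...); the source has to be handed to the generator instead. It can still be piped onward, for example with transformed.pipe(process.stdout).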

With the new experimental stream.compose API, you can convert async iterables, generators, and functions into streams. An example from the docs:

import { compose } from 'stream';
import { finished } from 'stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
  yield 'Hello';
  yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
  for await (const chunk of source) {
    res += chunk;
  }
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD'
