Javascript (ES6) 可迭代流
Javascript (ES6) iterable stream
是否有使用 ES6 生成器使流可迭代的模式?
见下文 'MakeStreamIterable'。
import {createReadStream} from 'fs'
let fileName = 'largeFile.txt'
let readStream = createReadStream(fileName, {
encoding: 'utf8',
bufferSize: 1024
})
let myIterableAsyncStream = MakeStreamIterable(readStream)
for (let data of myIterableAsyncStream) {
let str = data.toString('utf8')
console.log(str)
}
我对 co 或 bluebird 的 coroutine 或使用 deasync 阻塞不感兴趣。
黄金是MakeStreamIterable应该是一个有效的函数。
Is there a pattern for making a stream iterable using ES6 generators?
不,这无法实现,因为发电机是同步的。他们必须知道他们在什么时候产出什么。异步数据源的迭代目前只能通过使用某种基于回调的实现来实现。因此,如果您的意思是 'a valid function whose result can be given to a for-of
loop'.
,则无法使 MakeStreamIterable
'a valid function'
流是异步的
流表示在可能无限的时间内异步接收的可能无限量的数据。如果我们看一下 definition of an iterator on MDN,我们可以更详细地定义它是什么使它成为 'uniterable':
的流
An object is an iterator when it knows how to access items from a collection one at a time, while keeping track of its current position within that sequence. In JavaScript an iterator is an object that provides a next() method which returns the next item in the sequence. This method returns an object with two properties: done
and value
.
(重点是我自己。)
让我们从这个定义中挑选出可迭代对象的属性。对象必须...
- 了解如何一次访问一个集合中的项目;
- 能够跟踪其在数据序列中的当前位置;
- 并提供一种方法
next
,该方法检索具有 属性 的对象,该对象包含序列中的下一个 value
或通知迭代是 done
。
流不符合上述任何条件,因为...
- 当接收数据并且无法'look into the future'找到下一个值时,它不受控制;
- 它无法知道何时或是否已收到所有数据,只有在流关闭时才知道;
- 并且它没有实现 iterable protocol,因此不会公开
for-of
可以使用的 next
方法。
______
假装(迭代)
我们不能实际上迭代从流接收的数据(绝对不使用for-of
),但是我们可以构建一个接口假装通过使用承诺(耶!)并在闭包中抽象出流的事件处理程序。
// MakeStreamIterable.js
export default function MakeStreamIterable (stream) {
let collection = []
let index = 0
let callback
let resolve, reject
stream
.on('error', err => reject && reject(err))
.on('end', () => resolve && resolve(collection))
.on('data', data => {
collection.push(data)
try {
callback && callback(data, index++)
} catch (err) {
this.end()
reject(err)
}
})
function each (cb) {
if(callback) {
return promise
}
callback = (typeof cb === 'function') ? cb : null
if (callback && !!collection) {
collection.forEach(callback)
index = collection.length
}
return promise
}
promise = new Promise((res, rej) => {
resolve = res
reject = rej
})
promise.each = each
return promise
}
我们可以这样使用它:
import {MakeStreamIterable} from './MakeStreamIterable'
let myIterableAsyncStream = MakeStreamIterable(readStream)
myIterableAsyncStream
.each((data, i) => {
let str = data.toString('utf8')
console.log(i, str)
})
.then(() => console.log('completed'))
.catch(err => console.log(err))
关于此实现的注意事项:
- 没有必要在 'iterable stream' 上立即调用
each
。
- 当调用
each
时,在其调用之前收到的所有值都将逐一传递给回调 forEach
样式。之后所有后续数据都会立即传递给回调。
- 函数 returns 一个 Promise,它在流结束时解析完整的
collection
数据,这意味着我们实际上根本不必调用 each
如果方法each
提供的迭代并不令人满意。
- 我培养了将其称为迭代器的错误语义,因此我是一个糟糕的人。请向有关部门举报我。
很快您就可以使用 Async Iterators and Generators。在节点 9.8 中,您可以通过 运行 --harmony
命令行选项使用它。
async function* streamAsyncIterator(stream) {
// Get a lock on the stream
const reader = stream.getReader();
try {
while (true) {
// Read from the stream
const {done, value} = await reader.read();
// Exit if we're done
if (done) return;
// Else yield the chunk
yield value;
}
}
finally {
reader.releaseLock();
}
}
async function example() {
const response = await fetch(url);
for await (const chunk of streamAsyncIterator(response.body)) {
// …
}
}
感谢 Jake Archibald 提供上述 examples。
2020 年更新:
看起来流在未来将是“本地”可迭代的——只需要等待浏览器来实现它:
- https://streams.spec.whatwg.org/#rs-asynciterator
- https://github.com/whatwg/streams/issues/778
- https://bugs.chromium.org/p/chromium/issues/detail?id=929585
- https://bugzilla.mozilla.org/show_bug.cgi?id=1525852
- https://bugs.webkit.org/show_bug.cgi?id=194379
for await (const chunk of stream) {
...
}
是否有使用 ES6 生成器使流可迭代的模式?
见下文 'MakeStreamIterable'。
import {createReadStream} from 'fs'
let fileName = 'largeFile.txt'
let readStream = createReadStream(fileName, {
encoding: 'utf8',
bufferSize: 1024
})
let myIterableAsyncStream = MakeStreamIterable(readStream)
for (let data of myIterableAsyncStream) {
let str = data.toString('utf8')
console.log(str)
}
我对 co 或 bluebird 的 coroutine 或使用 deasync 阻塞不感兴趣。
黄金是MakeStreamIterable应该是一个有效的函数。
Is there a pattern for making a stream iterable using ES6 generators?
不,这无法实现,因为发电机是同步的。他们必须知道他们在什么时候产出什么。异步数据源的迭代目前只能通过使用某种基于回调的实现来实现。因此,如果您的意思是 'a valid function whose result can be given to a for-of
loop'.
MakeStreamIterable
'a valid function'
流是异步的
流表示在可能无限的时间内异步接收的可能无限量的数据。如果我们看一下 definition of an iterator on MDN,我们可以更详细地定义它是什么使它成为 'uniterable':
的流An object is an iterator when it knows how to access items from a collection one at a time, while keeping track of its current position within that sequence. In JavaScript an iterator is an object that provides a next() method which returns the next item in the sequence. This method returns an object with two properties:
done
andvalue
.
(重点是我自己。)
让我们从这个定义中挑选出可迭代对象的属性。对象必须...
- 了解如何一次访问一个集合中的项目;
- 能够跟踪其在数据序列中的当前位置;
- 并提供一种方法
next
,该方法检索具有 属性 的对象,该对象包含序列中的下一个value
或通知迭代是done
。
流不符合上述任何条件,因为...
- 当接收数据并且无法'look into the future'找到下一个值时,它不受控制;
- 它无法知道何时或是否已收到所有数据,只有在流关闭时才知道;
- 并且它没有实现 iterable protocol,因此不会公开
for-of
可以使用的next
方法。
______
假装(迭代)
我们不能实际上迭代从流接收的数据(绝对不使用for-of
),但是我们可以构建一个接口假装通过使用承诺(耶!)并在闭包中抽象出流的事件处理程序。
// MakeStreamIterable.js
export default function MakeStreamIterable (stream) {
let collection = []
let index = 0
let callback
let resolve, reject
stream
.on('error', err => reject && reject(err))
.on('end', () => resolve && resolve(collection))
.on('data', data => {
collection.push(data)
try {
callback && callback(data, index++)
} catch (err) {
this.end()
reject(err)
}
})
function each (cb) {
if(callback) {
return promise
}
callback = (typeof cb === 'function') ? cb : null
if (callback && !!collection) {
collection.forEach(callback)
index = collection.length
}
return promise
}
promise = new Promise((res, rej) => {
resolve = res
reject = rej
})
promise.each = each
return promise
}
我们可以这样使用它:
import {MakeStreamIterable} from './MakeStreamIterable'
let myIterableAsyncStream = MakeStreamIterable(readStream)
myIterableAsyncStream
.each((data, i) => {
let str = data.toString('utf8')
console.log(i, str)
})
.then(() => console.log('completed'))
.catch(err => console.log(err))
关于此实现的注意事项:
- 没有必要在 'iterable stream' 上立即调用
each
。 - 当调用
each
时,在其调用之前收到的所有值都将逐一传递给回调forEach
样式。之后所有后续数据都会立即传递给回调。 - 函数 returns 一个 Promise,它在流结束时解析完整的
collection
数据,这意味着我们实际上根本不必调用each
如果方法each
提供的迭代并不令人满意。 - 我培养了将其称为迭代器的错误语义,因此我是一个糟糕的人。请向有关部门举报我。
很快您就可以使用 Async Iterators and Generators。在节点 9.8 中,您可以通过 运行 --harmony
命令行选项使用它。
async function* streamAsyncIterator(stream) {
// Get a lock on the stream
const reader = stream.getReader();
try {
while (true) {
// Read from the stream
const {done, value} = await reader.read();
// Exit if we're done
if (done) return;
// Else yield the chunk
yield value;
}
}
finally {
reader.releaseLock();
}
}
async function example() {
const response = await fetch(url);
for await (const chunk of streamAsyncIterator(response.body)) {
// …
}
}
感谢 Jake Archibald 提供上述 examples。
2020 年更新:
看起来流在未来将是“本地”可迭代的——只需要等待浏览器来实现它:
- https://streams.spec.whatwg.org/#rs-asynciterator
- https://github.com/whatwg/streams/issues/778
- https://bugs.chromium.org/p/chromium/issues/detail?id=929585
- https://bugzilla.mozilla.org/show_bug.cgi?id=1525852
- https://bugs.webkit.org/show_bug.cgi?id=194379
for await (const chunk of stream) {
...
}