如何在 NodeJS 12 Lambda 函数中使用 async/await 将 S3 JSON 文件流式读取到 postgreSQL?
How to stream read an S3 JSON file to postgreSQL using async/await in a NodeJS 12 Lambda function?
我没有意识到这么简单的任务会有多危险。
我们正在尝试流式读取存储在 S3 中的 JSON 文件——我认为我们已经完成了这部分工作。
我们的 .on('data')
回调被调用,但 Node 挑选并选择它想要 运行 的位 - 似乎是随机的。
我们设置了一个流reader。
stream.on('data', async x => {
await saveToDb(x); // This doesn't await. It processes saveToDb up until it awaits.
});
有时 db 调用会到达 db -- 但大多数时候不会。
我得出的结论是 EventEmitter 在 async/await 事件处理程序方面存在问题。
只要您的代码是同步的,它似乎就会与您的异步方法一起运行。但是,在您等待的时候,它会随机决定是否实际执行或不执行。
它流式传输各种块,我们可以 console.log
将它们导出并查看数据。但是,一旦我们尝试发起 await/async 呼叫,我们就不再看到可靠的消息。
我运行在 AWS Lambda 中使用它,我被告知有特殊注意事项,因为显然它们在某些情况下会停止处理?
我尝试在 IFFY 中围绕 await 调用,但这也没有用。
我错过了什么?
有没有办法告诉 JavaScript--“好吧,我需要你同步地 运行 这个异步任务。我是认真的——也不要去触发任何更多的事件通知。只是坐在这里等着。"?
TL;DR:
- 使用异步迭代器从流管道的末尾拉取!
- 不要在任何流代码中使用异步函数!
详情:
关于 async/await
和流的生命之谜似乎包含在 Async Iterators
中!
简而言之,我将一些流通过管道连接在一起,最后,我创建了一个异步迭代器来从末尾提取内容,以便我可以异步调用 db。
ChunkStream 为我做的唯一一件事是排队最多 1,000 个来调用数据库,而不是为每个项目调用。我是队列的新手,所以可能已经有更好的方法了。
// ...
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const JSONbigint = require('json-bigint');
JSON.parse = JSONbigint.parse; // Let there be proper bigint handling!
JSON.stringify = JSONbigint.stringify;
const stream = require('stream');
const JSONStream = require('JSONStream');
exports.handler = async (event, context) => {
// ...
let bucket, key;
try {
bucket = event.Records[0].s3.bucket.name;
key = event.Records[0].s3.object.key;
console.log(`Fetching S3 file: Bucket: ${bucket}, Key: ${key}`);
const parser = JSONStream.parse('*'); // Converts file to JSON objects
let chunkStream = new ChunkStream(1000); // Give the db a chunk of work instead of one item at a time
let endStream = s3.getObject({ Bucket: bucket, Key: key }).createReadStream().pipe(parser).pipe(chunkStream);
let totalProcessed = 0;
async function processChunk(chunk) {
let chunkString = JSON.stringify(chunk);
console.log(`Upserting ${chunk.length} items (starting with index ${totalProcessed}) items to the db.`);
await updateDb(chunkString, pool, 1000); // updateDb and pool are part of missing code
totalProcessed += chunk.length;
}
// Async iterator
for await (const batch of endStream) {
// console.log(`Processing batch (${batch.length})`, batch);
await processChunk(batch);
}
} catch (ex) {
context.fail("stream S3 file failed");
throw ex;
}
};
class ChunkStream extends stream.Transform {
constructor(maxItems, options = {}) {
options.objectMode = true;
super(options);
this.maxItems = maxItems;
this.batch = [];
}
_transform(item, enc, cb) {
this.batch.push(item);
if (this.batch.length >= this.maxItems) {
// console.log(`ChunkStream: Chunk ready (${this.batch.length} items)`);
this.push(this.batch);
// console.log('_transform - Restarting the batch');
this.batch = [];
}
cb();
}
_flush(cb) {
// console.log(`ChunkStream: Flushing stream (${this.batch.length} items)`);
if (this.batch.length > 0) {
this.push(this.batch);
this.batch = [];
}
cb();
}
}
我没有意识到这么简单的任务会有多危险。
我们正在尝试流式读取存储在 S3 中的 JSON 文件——我认为我们已经完成了这部分工作。
我们的 .on('data')
回调被调用,但 Node 挑选并选择它想要 运行 的位 - 似乎是随机的。
我们设置了一个流reader。
stream.on('data', async x => {
await saveToDb(x); // This doesn't await. It processes saveToDb up until it awaits.
});
有时 db 调用会到达 db -- 但大多数时候不会。 我得出的结论是 EventEmitter 在 async/await 事件处理程序方面存在问题。 只要您的代码是同步的,它似乎就会与您的异步方法一起运行。但是,在您等待的时候,它会随机决定是否实际执行或不执行。
它流式传输各种块,我们可以 console.log
将它们导出并查看数据。但是,一旦我们尝试发起 await/async 呼叫,我们就不再看到可靠的消息。
我运行在 AWS Lambda 中使用它,我被告知有特殊注意事项,因为显然它们在某些情况下会停止处理?
我尝试在 IFFY 中围绕 await 调用,但这也没有用。
我错过了什么? 有没有办法告诉 JavaScript--“好吧,我需要你同步地 运行 这个异步任务。我是认真的——也不要去触发任何更多的事件通知。只是坐在这里等着。"?
TL;DR:
- 使用异步迭代器从流管道的末尾拉取!
- 不要在任何流代码中使用异步函数!
详情:
关于 async/await
和流的生命之谜似乎包含在 Async Iterators
中!
简而言之,我将一些流通过管道连接在一起,最后,我创建了一个异步迭代器来从末尾提取内容,以便我可以异步调用 db。 ChunkStream 为我做的唯一一件事是排队最多 1,000 个来调用数据库,而不是为每个项目调用。我是队列的新手,所以可能已经有更好的方法了。
// ...
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const JSONbigint = require('json-bigint');
JSON.parse = JSONbigint.parse; // Let there be proper bigint handling!
JSON.stringify = JSONbigint.stringify;
const stream = require('stream');
const JSONStream = require('JSONStream');
exports.handler = async (event, context) => {
// ...
let bucket, key;
try {
bucket = event.Records[0].s3.bucket.name;
key = event.Records[0].s3.object.key;
console.log(`Fetching S3 file: Bucket: ${bucket}, Key: ${key}`);
const parser = JSONStream.parse('*'); // Converts file to JSON objects
let chunkStream = new ChunkStream(1000); // Give the db a chunk of work instead of one item at a time
let endStream = s3.getObject({ Bucket: bucket, Key: key }).createReadStream().pipe(parser).pipe(chunkStream);
let totalProcessed = 0;
async function processChunk(chunk) {
let chunkString = JSON.stringify(chunk);
console.log(`Upserting ${chunk.length} items (starting with index ${totalProcessed}) items to the db.`);
await updateDb(chunkString, pool, 1000); // updateDb and pool are part of missing code
totalProcessed += chunk.length;
}
// Async iterator
for await (const batch of endStream) {
// console.log(`Processing batch (${batch.length})`, batch);
await processChunk(batch);
}
} catch (ex) {
context.fail("stream S3 file failed");
throw ex;
}
};
class ChunkStream extends stream.Transform {
constructor(maxItems, options = {}) {
options.objectMode = true;
super(options);
this.maxItems = maxItems;
this.batch = [];
}
_transform(item, enc, cb) {
this.batch.push(item);
if (this.batch.length >= this.maxItems) {
// console.log(`ChunkStream: Chunk ready (${this.batch.length} items)`);
this.push(this.batch);
// console.log('_transform - Restarting the batch');
this.batch = [];
}
cb();
}
_flush(cb) {
// console.log(`ChunkStream: Flushing stream (${this.batch.length} items)`);
if (this.batch.length > 0) {
this.push(this.batch);
this.batch = [];
}
cb();
}
}