如何在 NodeJS 12 Lambda 函数中使用 async/await 将 S3 JSON 文件流式读取到 postgreSQL?

How to stream read an S3 JSON file to postgreSQL using async/await in a NodeJS 12 Lambda function?

我没有意识到这么简单的任务会有多危险。 我们正在尝试流式读取存储在 S3 中的 JSON 文件——我认为我们已经完成了这部分工作。 我们的 .on('data') 回调被调用,但 Node 挑选并选择它想要 运行 的位 - 似乎是随机的。

我们设置了一个流reader。

stream.on('data', async x => { 
  await saveToDb(x);  // This doesn't await.  It processes saveToDb up until it awaits.
});

有时 db 调用会到达 db -- 但大多数时候不会。 我得出的结论是 EventEmitter 在 async/await 事件处理程序方面存在问题。 只要您的代码是同步的,它似乎就会与您的异步方法一起运行。但是,在您等待的时候,它会随机决定是否实际执行或不执行。

它流式传输各种块,我们可以 console.log 将它们导出并查看数据。但是,一旦我们尝试发起 await/async 呼叫,我们就不再看到可靠的消息。

我运行在 AWS Lambda 中使用它,我被告知有特殊注意事项,因为显然它们在某些情况下会停止处理?

我尝试在 IFFY 中围绕 await 调用,但这也没有用。

我错过了什么? 有没有办法告诉 JavaScript--“好吧,我需要你同步地 运行 这个异步任务。我是认真的——也不要去触发任何更多的事件通知。只是坐在这里等着。"?

TL;DR:

  • 使用异步迭代器从流管道的末尾拉取!
  • 不要在任何流代码中使用异步函数!

详情:

关于 async/await 和流的生命之谜似乎包含在 Async Iterators 中!

简而言之,我将一些流通过管道连接在一起,最后,我创建了一个异步迭代器来从末尾提取内容,以便我可以异步调用 db。 ChunkStream 为我做的唯一一件事是排队最多 1,000 个来调用数据库,而不是为每个项目调用。我是队列的新手,所以可能已经有更好的方法了。

// ...
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const JSONbigint = require('json-bigint');
JSON.parse = JSONbigint.parse; // Let there be proper bigint handling!
JSON.stringify = JSONbigint.stringify;
const stream = require('stream');
const JSONStream = require('JSONStream');

exports.handler = async (event, context) => {
    // ...
    let bucket, key;
    try {
        bucket = event.Records[0].s3.bucket.name;
        key = event.Records[0].s3.object.key;
        console.log(`Fetching S3 file: Bucket: ${bucket}, Key: ${key}`);
        const parser = JSONStream.parse('*'); // Converts file to JSON objects
        let chunkStream = new ChunkStream(1000); // Give the db a chunk of work instead of one item at a time
        let endStream = s3.getObject({ Bucket: bucket, Key: key }).createReadStream().pipe(parser).pipe(chunkStream);
        
        let totalProcessed = 0;
        async function processChunk(chunk) {
            let chunkString = JSON.stringify(chunk);
            console.log(`Upserting ${chunk.length} items (starting with index ${totalProcessed}) items to the db.`);
            await updateDb(chunkString, pool, 1000); // updateDb and pool are part of missing code
            totalProcessed += chunk.length;
        }
        
        // Async iterator
        for await (const batch of endStream) {
            // console.log(`Processing batch (${batch.length})`, batch);
            await processChunk(batch);
        }
    } catch (ex) {
        context.fail("stream S3 file failed");
        throw ex;
    }
};

class ChunkStream extends stream.Transform {
    constructor(maxItems, options = {}) {
        options.objectMode = true;
        super(options);
        this.maxItems = maxItems;
        this.batch = [];
    }
    _transform(item, enc, cb) {
        this.batch.push(item);
        if (this.batch.length >= this.maxItems) {
            // console.log(`ChunkStream: Chunk ready (${this.batch.length} items)`);
            this.push(this.batch);
            // console.log('_transform - Restarting the batch');
            this.batch = [];
        }
        cb();
    }
    _flush(cb) {
        // console.log(`ChunkStream: Flushing stream (${this.batch.length} items)`);
        if (this.batch.length > 0) {
            this.push(this.batch);
            this.batch = [];
        }
        cb();
    }
}