DynamoDb put 在 lambda 测试或 cron 中不 write/save 到 table 但在无服务器中工作
DynamoDb put does not write/save to table in lambda test or cron but works in Serverless
我正在抓取一堆 API 并将数据保存到 dynamodb table。
当 运行 在本地 serverless invoke local -f runAggregator
时一切正常。
但是,在我设置 cron 之后,我注意到东西没有保存到 Dynamodb table。
这是我的函数:
module.exports.runAggregator = async (event) => {
await runModules({ saveJobs: true });
return {
statusCode: 200,
body: JSON.stringify(
{
message: "Aggregate",
input: event,
},
null,
2
),
};
};
和runModules
函数:
module.exports = async ({ saveJobs }) => {
if (saveJobs) {
const flushDb = await flushDynamoDbTable();
console.log("Flushing Database: Complete");
console.log(flushDb);
}
// pseudo code
const allJobs = myLongArrayOfJobsFromApis
const goodJobs = allJobs.filter((job) => {
if (job.category) {
if (!job.category.includes("Missing map for")) return job;
}
});
// This runs absolutely fine locally...
if (saveJobs) goodJobs.forEach(saveJob); // see below for function
const badJobs = allJobs.filter((job) => {
if (!job.category) return job; // no role found from API
if (job.category.includes("Missing map for")) return job;
});
console.log("Total Jobs", allJobs.length);
console.log("Good Jobs", goodJobs.length);
console.log("Malformed Jobs", badJobs.length);
return uniqBy(badJobs, "category");
};
saveJob 函数
// saveJob.js
module.exports = (job) => {
validateJob(job);
dynamoDb
.put({
TableName: tableName,
Item: job,
})
.promise();
};
我很困惑为什么这在本地工作正常,而不是当我在 lambda 控制台中 运行 a 'test' 时。我只是因为 table 在 cron 有 运行.
之后才发现
saveJob
执行异步操作 (ddb.put().promise()
) 但您既不等待其完成也不返回承诺。
由于 runModules
函数中的 forEach 也不会等待任何东西,该函数甚至在调用 dynamodb 之前完成(因为 promises 与同步代码的工作方式)并且进程在lambda 的执行。
在本地你不是 运行 lambda,而是看起来像它的东西。存在细微差别,功能完成后发生的事情就是其中之一。所以它可能在本地工作,但它不会在实际的 lambda 上工作。
您需要做的是确保等待您对 dynamodb 的调用。类似于:
// saveJob.js
module.exports = (job) => {
validateJob(job);
return dynamoDb
.put({
TableName: tableName,
Item: job,
})
.promise();
};
在你的主要功能中:
...
if (saveJobs) await Promise.all(...goodJobs.map(job => saveJob(job)))
// or with a Promise lib such as bluebird:
if (saveJobs) await Promise.map(goodJobs, job => saveJob(job))
// (or Promise.each(...) if you need to make sure this happens in sequence and not in parallel)
注意:您 could/should 调用一次(或至少更少次)batchWriteItem 操作,而不是多次调用 dynamodb.put,一次最多可以写入 25 个项目呼叫,在此过程中节省了相当多的延迟。
我正在抓取一堆 API 并将数据保存到 dynamodb table。
当 运行 在本地 serverless invoke local -f runAggregator
时一切正常。
但是,在我设置 cron 之后,我注意到东西没有保存到 Dynamodb table。
这是我的函数:
module.exports.runAggregator = async (event) => {
await runModules({ saveJobs: true });
return {
statusCode: 200,
body: JSON.stringify(
{
message: "Aggregate",
input: event,
},
null,
2
),
};
};
和runModules
函数:
module.exports = async ({ saveJobs }) => {
if (saveJobs) {
const flushDb = await flushDynamoDbTable();
console.log("Flushing Database: Complete");
console.log(flushDb);
}
// pseudo code
const allJobs = myLongArrayOfJobsFromApis
const goodJobs = allJobs.filter((job) => {
if (job.category) {
if (!job.category.includes("Missing map for")) return job;
}
});
// This runs absolutely fine locally...
if (saveJobs) goodJobs.forEach(saveJob); // see below for function
const badJobs = allJobs.filter((job) => {
if (!job.category) return job; // no role found from API
if (job.category.includes("Missing map for")) return job;
});
console.log("Total Jobs", allJobs.length);
console.log("Good Jobs", goodJobs.length);
console.log("Malformed Jobs", badJobs.length);
return uniqBy(badJobs, "category");
};
saveJob 函数
// saveJob.js
module.exports = (job) => {
validateJob(job);
dynamoDb
.put({
TableName: tableName,
Item: job,
})
.promise();
};
我很困惑为什么这在本地工作正常,而不是当我在 lambda 控制台中 运行 a 'test' 时。我只是因为 table 在 cron 有 运行.
之后才发现saveJob
执行异步操作 (ddb.put().promise()
) 但您既不等待其完成也不返回承诺。
由于 runModules
函数中的 forEach 也不会等待任何东西,该函数甚至在调用 dynamodb 之前完成(因为 promises 与同步代码的工作方式)并且进程在lambda 的执行。
在本地你不是 运行 lambda,而是看起来像它的东西。存在细微差别,功能完成后发生的事情就是其中之一。所以它可能在本地工作,但它不会在实际的 lambda 上工作。
您需要做的是确保等待您对 dynamodb 的调用。类似于:
// saveJob.js
module.exports = (job) => {
validateJob(job);
return dynamoDb
.put({
TableName: tableName,
Item: job,
})
.promise();
};
在你的主要功能中:
...
if (saveJobs) await Promise.all(...goodJobs.map(job => saveJob(job)))
// or with a Promise lib such as bluebird:
if (saveJobs) await Promise.map(goodJobs, job => saveJob(job))
// (or Promise.each(...) if you need to make sure this happens in sequence and not in parallel)
注意:您 could/should 调用一次(或至少更少次)batchWriteItem 操作,而不是多次调用 dynamodb.put,一次最多可以写入 25 个项目呼叫,在此过程中节省了相当多的延迟。