DynamoDb put 在 lambda 测试或 cron 中不 write/save 到 table 但在无服务器中工作

DynamoDb put does not write/save to table in lambda test or cron but works in Serverless

我正在抓取一堆 API 并将数据保存到 dynamodb table。

当 运行 在本地 serverless invoke local -f runAggregator 时一切正常。

但是,在我设置 cron 之后,我注意到东西没有保存到 Dynamodb table。

这是我的函数:

module.exports.runAggregator = async (event) => {
  await runModules({ saveJobs: true });

  return {
    statusCode: 200,
    body: JSON.stringify(
      {
        message: "Aggregate",
        input: event,
      },
      null,
      2
    ),
  };
};

runModules函数:

module.exports = async ({ saveJobs }) => {
  if (saveJobs) {
    const flushDb = await flushDynamoDbTable();
    console.log("Flushing Database: Complete");
    console.log(flushDb);
  }

  // pseudo code
  const allJobs = myLongArrayOfJobsFromApis

  const goodJobs = allJobs.filter((job) => {
    if (job.category) {
      if (!job.category.includes("Missing map for")) return job;
    }
  });

  // This runs absolutely fine locally...
  if (saveJobs) goodJobs.forEach(saveJob); // see below for function

  const badJobs = allJobs.filter((job) => {
    if (!job.category) return job; // no role found from API
    if (job.category.includes("Missing map for")) return job;
  });

  console.log("Total Jobs", allJobs.length);
  console.log("Good Jobs", goodJobs.length);
  console.log("Malformed Jobs", badJobs.length);

  return uniqBy(badJobs, "category");
};

saveJob 函数

// saveJob.js
module.exports = (job) => {
  validateJob(job);

  dynamoDb
    .put({
      TableName: tableName,
      Item: job,
    })
    .promise();
};

我很困惑为什么这在本地工作正常,而不是当我在 lambda 控制台中 运行 a 'test' 时。我只是因为 table 在 cron 有 运行.

之后才发现

saveJob 执行异步操作 (ddb.put().promise()) 但您既不等待其完成也不返回承诺。

由于 runModules 函数中的 forEach 也不会等待任何东西,该函数甚至在调用 dynamodb 之前完成(因为 promises 与同步代码的工作方式)并且进程在lambda 的执行。

在本地你不是 运行 lambda,而是看起来像它的东西。存在细微差别,功能完成后发生的事情就是其中之一。所以它可能在本地工作,但它不会在实际的 lambda 上工作。

您需要做的是确保等待您对 dynamodb 的调用。类似于:

// saveJob.js
module.exports = (job) => {
  validateJob(job);

  return dynamoDb
    .put({
      TableName: tableName,
      Item: job,
    })
    .promise();
};

在你的主要功能中:

...
if (saveJobs) await Promise.all(...goodJobs.map(job => saveJob(job)))

// or with a Promise lib such as bluebird:
if (saveJobs) await Promise.map(goodJobs, job => saveJob(job))
// (or Promise.each(...) if you need to make sure this happens in sequence and not in parallel)

注意:您 could/should 调用一次(或至少更少次)batchWriteItem 操作,而不是多次调用 dynamodb.put,一次最多可以写入 25 个项目呼叫,在此过程中节省了相当多的延迟。