调用多个 AWS Lambdas 不会产生并行进程
Invoking multiple AWS Lambdas doesn't make paralel processes
我正在尝试从另一个 lambda 函数调用多个 lambda 函数(一个 lambda 函数,它将 运行 分离并行进程)。第一个 运行s 作为 cron lambda,它只从 db 查询文档,然后使用文档的参数调用另一个 lambda。这个 cron lambda 运行s 每五分钟一次并正确查询文档。我正在用两个文档测试第二个 lambda。问题是每次调用第二个 lambda 时它只处理一个文档——每次它处理另一个它在上一次调用时没有处理的文档:
例如:
- 文档 1
- 文档 2
首先,调用第二个 lambda -> process doc 1
其次,调用第二个 lambda -> process doc 2
第三,调用第二个lambda -> process doc 1
第四次调用第二个 lambda -> 处理文档 2
等...
第一个 (cron) lambda 代码:
aws.config.update({
region : env.lambdaRegion,
accessKeyId: env.lambdaAccessKeyId,
secretAccessKey: env.lambdaSecretAccessKey,
});
const lambda = new aws.Lambda({
region: env.lambdaRegion,
});
exports.handler = async (event: any, context: any) => {
context.callbackWaitsForEmptyEventLoop = false;
return new Promise(async (resolve, reject) => {
for (let i = 0; i < 100; i++) {
const doc = await mongo.db.collection('docs').
findOneAndUpdate(
{
status: 1,
lambdaProcessing: null,
},
{ $set: { lambdaProcessing: new Date() } },
{
sort: { processedAt: 1 },
returnNewDocument: true,
},
);
if (doc.value && doc.value._id) {
const params = {
FunctionName: env.lambdaName,
InvocationType: 'Event',
Payload: JSON.stringify({ docId: doc.value._id }),
};
lambda.invoke(params);
} else {
if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
break;
}
}
}
resolve();
});
};
第二个 lambda 函数:
exports.handler = async (event: any, ctx: any) => {
ctx.callbackWaitsForEmptyEventLoop = false;
if (event && event.docId) {
const doc = await mongo.db.collection('docs').findById(event.docId);
return await processDoc(doc);
} else {
throw new Error('doc ID is not present.');
}
};
对于 运行 并行的多个 lambda 没有 "ugly" cronjob 解决方案,我建议使用类型为 Parallel
的 AWS 步骤函数。您可以在 serverless.yml
中设置逻辑,函数调用本身就是 lambda 函数。您可以通过 callback
的第二个参数传递数据。如果数据大于 32kb,我建议使用 S3 bucket/database。
例子serverless.yml
stepFunctions:
stateMachines:
test:
name: 'test'
definition:
Comment: "Testing tips-like state structure"
StartAt: GatherData
States:
GatherData:
Type: Parallel
Branches:
-
StartAt: GatherDataA
States:
GatherDataA:
Type: Task
Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstA"
TimeoutSeconds: 15
End: true
-
StartAt: GatherDataB
States:
GatherDataB:
Type: Task
Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstB"
TimeoutSeconds: 15
End: true
Next: ResolveData
ResolveData:
Type: Task
Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-resolveAB"
TimeoutSeconds: 15
End: true
示例处理程序
module.exports.firstA = (event, context, callback) => {
const data = {
id: 3,
somethingElse: ['Hello', 'World'],
};
callback(null, data);
};
module.exports.firstB = (event, context, callback) => {
const data = {
id: 12,
somethingElse: ['olleH', 'dlroW'],
};
callback(null, data);
};
module.exports.resolveAB = (event, context, callback) => {
console.log("resolving data from a and b: ", event);
const [dataFromA, dataFromB] = event;
callback(null, event);
};
更多信息见
关键是为我们要调用的每个 lambda 创建新的单独 aws.Lambda()
实例,然后我们必须解析并等待我们调用的每个 lambda(promieses 数组)。如果调用的 lambda 不需要等待,这是可以的,因此我们不会在 AWS 上浪费处理时间 - 因此调用的 lambda 开始处理,然后在不等待其响应的情况下解析,以便主(cron)lambda 可以解析。
固定的 (cron) lambda 处理程序:
aws.config.update({
region : env.lambdaRegion,
accessKeyId: env.lambdaAccessKeyId,
secretAccessKey: env.lambdaSecretAccessKey,
});
exports.handler = async (event: any, context: any) => {
context.callbackWaitsForEmptyEventLoop = false;
return new Promise(async (resolve, reject) => {
const promises: any = [];
for (let i = 0; i < 100; i++) {
const doc = await global['mongo'].db.collection('docs').
findOneAndUpdate(
{
status: 1,
lambdaProcessing: null,
},
{ $set: { lambdaProcessing: new Date() } },
{
sort: { processedAt: 1 },
returnNewDocument: true,
},
);
if (doc.value && doc.value._id) {
const params = {
FunctionName: env.lambdaName,
InvocationType: 'Event',
Payload: JSON.stringify({ docId: doc.value._id }),
};
const lambda = new aws.Lambda({
region: env.lambdaRegion,
maxRetries: 0,
});
promises.push(
new Promise((invokeResolve, invokeReject) => {
lambda.invoke(params, (error, data) => {
if (error) { console.error('ERROR: ', error); }
if (data) { console.log('SUCCESS:', data); }
// Resolve invoke promise in any case.
invokeResolve();
});
}),
);
} else {
if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
break;
}
}
}
await Promise.all(promises);
resolve();
});
};
第二个(处理)lambda:
exports.handler = async (event: any, ctx: any) => {
ctx.callbackWaitsForEmptyEventLoop = false;
if (event && event.docId) {
const doc = await mongo.db.collection('docs').findById(event.docId);
processDoc(doc);
return ctx.succeed('Completed.');
} else {
throw new Error('Doc ID is not present.');
}
};
我不知道是否有更好的方法来使用严格的 lambda 函数来实现这一点,但这可行。
我正在尝试从另一个 lambda 函数调用多个 lambda 函数(一个 lambda 函数,它将 运行 分离并行进程)。第一个 运行s 作为 cron lambda,它只从 db 查询文档,然后使用文档的参数调用另一个 lambda。这个 cron lambda 运行s 每五分钟一次并正确查询文档。我正在用两个文档测试第二个 lambda。问题是每次调用第二个 lambda 时它只处理一个文档——每次它处理另一个它在上一次调用时没有处理的文档:
例如:
- 文档 1
- 文档 2
首先,调用第二个 lambda -> process doc 1
其次,调用第二个 lambda -> process doc 2
第三,调用第二个lambda -> process doc 1
第四次调用第二个 lambda -> 处理文档 2
等...
第一个 (cron) lambda 代码:
aws.config.update({
region : env.lambdaRegion,
accessKeyId: env.lambdaAccessKeyId,
secretAccessKey: env.lambdaSecretAccessKey,
});
const lambda = new aws.Lambda({
region: env.lambdaRegion,
});
exports.handler = async (event: any, context: any) => {
context.callbackWaitsForEmptyEventLoop = false;
return new Promise(async (resolve, reject) => {
for (let i = 0; i < 100; i++) {
const doc = await mongo.db.collection('docs').
findOneAndUpdate(
{
status: 1,
lambdaProcessing: null,
},
{ $set: { lambdaProcessing: new Date() } },
{
sort: { processedAt: 1 },
returnNewDocument: true,
},
);
if (doc.value && doc.value._id) {
const params = {
FunctionName: env.lambdaName,
InvocationType: 'Event',
Payload: JSON.stringify({ docId: doc.value._id }),
};
lambda.invoke(params);
} else {
if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
break;
}
}
}
resolve();
});
};
第二个 lambda 函数:
exports.handler = async (event: any, ctx: any) => {
ctx.callbackWaitsForEmptyEventLoop = false;
if (event && event.docId) {
const doc = await mongo.db.collection('docs').findById(event.docId);
return await processDoc(doc);
} else {
throw new Error('doc ID is not present.');
}
};
对于 运行 并行的多个 lambda 没有 "ugly" cronjob 解决方案,我建议使用类型为 Parallel
的 AWS 步骤函数。您可以在 serverless.yml
中设置逻辑,函数调用本身就是 lambda 函数。您可以通过 callback
的第二个参数传递数据。如果数据大于 32kb,我建议使用 S3 bucket/database。
例子serverless.yml
stepFunctions:
stateMachines:
test:
name: 'test'
definition:
Comment: "Testing tips-like state structure"
StartAt: GatherData
States:
GatherData:
Type: Parallel
Branches:
-
StartAt: GatherDataA
States:
GatherDataA:
Type: Task
Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstA"
TimeoutSeconds: 15
End: true
-
StartAt: GatherDataB
States:
GatherDataB:
Type: Task
Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstB"
TimeoutSeconds: 15
End: true
Next: ResolveData
ResolveData:
Type: Task
Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-resolveAB"
TimeoutSeconds: 15
End: true
示例处理程序
module.exports.firstA = (event, context, callback) => {
const data = {
id: 3,
somethingElse: ['Hello', 'World'],
};
callback(null, data);
};
module.exports.firstB = (event, context, callback) => {
const data = {
id: 12,
somethingElse: ['olleH', 'dlroW'],
};
callback(null, data);
};
module.exports.resolveAB = (event, context, callback) => {
console.log("resolving data from a and b: ", event);
const [dataFromA, dataFromB] = event;
callback(null, event);
};
更多信息见
关键是为我们要调用的每个 lambda 创建新的单独 aws.Lambda()
实例,然后我们必须解析并等待我们调用的每个 lambda(promieses 数组)。如果调用的 lambda 不需要等待,这是可以的,因此我们不会在 AWS 上浪费处理时间 - 因此调用的 lambda 开始处理,然后在不等待其响应的情况下解析,以便主(cron)lambda 可以解析。
固定的 (cron) lambda 处理程序:
aws.config.update({
region : env.lambdaRegion,
accessKeyId: env.lambdaAccessKeyId,
secretAccessKey: env.lambdaSecretAccessKey,
});
exports.handler = async (event: any, context: any) => {
context.callbackWaitsForEmptyEventLoop = false;
return new Promise(async (resolve, reject) => {
const promises: any = [];
for (let i = 0; i < 100; i++) {
const doc = await global['mongo'].db.collection('docs').
findOneAndUpdate(
{
status: 1,
lambdaProcessing: null,
},
{ $set: { lambdaProcessing: new Date() } },
{
sort: { processedAt: 1 },
returnNewDocument: true,
},
);
if (doc.value && doc.value._id) {
const params = {
FunctionName: env.lambdaName,
InvocationType: 'Event',
Payload: JSON.stringify({ docId: doc.value._id }),
};
const lambda = new aws.Lambda({
region: env.lambdaRegion,
maxRetries: 0,
});
promises.push(
new Promise((invokeResolve, invokeReject) => {
lambda.invoke(params, (error, data) => {
if (error) { console.error('ERROR: ', error); }
if (data) { console.log('SUCCESS:', data); }
// Resolve invoke promise in any case.
invokeResolve();
});
}),
);
} else {
if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
break;
}
}
}
await Promise.all(promises);
resolve();
});
};
第二个(处理)lambda:
exports.handler = async (event: any, ctx: any) => {
ctx.callbackWaitsForEmptyEventLoop = false;
if (event && event.docId) {
const doc = await mongo.db.collection('docs').findById(event.docId);
processDoc(doc);
return ctx.succeed('Completed.');
} else {
throw new Error('Doc ID is not present.');
}
};
我不知道是否有更好的方法来使用严格的 lambda 函数来实现这一点,但这可行。