DynamoDB javascript 除非我增加写入容量,否则 SDK batchWriteItem 不会完成
DynamoDB javascript SDK batchWriteItem doesn't complete unless I increase write capacity
我正在 运行 进行一系列单元测试(node.js 4.x、aws-sdk、mocha),在每次测试前将数据加载到 table然后在测试后清除table。
我有两个测试失败,因为 ConditionExpression
触发了 ConditionCheckFailedException
。但是,如果我增加 read/write 容量,他们的测试就会通过。
据我了解,SDK 会处理限制异常并为您重试它们,所以为什么我的测试不会 运行 变慢并通过?相反,似乎测试无法完成 scan
-> batchWriteItem
过程,因此当新测试开始时,table 中仍有记录。
团队成员告诉我,他们遇到了类似的问题,他们只是提高了吞吐量来解决问题。这不适合我。要么我做错了什么,我的测试存在竞争条件,要么应该有一个我可以实施的模式来确保我的操作在受到限制时完成?我应该能够使用节流指标来通知我何时需要增加吞吐量,但我仍然应该能够继续重试,直到 运行 内存不足。
有其他人 运行 参与其中吗?您做了什么来处理这个问题?
经过一些调试后,我注意到 UnprocessedItems
响应元素。在查找 UnprocessedItems
in the docs 之后,我意识到我应该更仔细地阅读。下面的代码将 运行 一个带有延迟(指数退避)的重试循环:
var clearEventTable = function (tableName, client, cleared) {
var exclusiveStartKey = null;
var retryCount = 0;
var read = function(query, callback) {
client.scan(query, function (err, page) {
if(err) {
console.log(err);
return callback(err);
}
retryCount = 0;
exclusiveStartKey = page.LastEvaluatedKey || null;
if(page.Count == 0) {
return callback(null, {});
}
if(page.Count < 25 && exclusiveStartKey) {
console.log("read capacity limit reached: " + JSON.stringify(page, null, 2));
}
var keys = _.map(page.Items, function(n) {
return { DeleteRequest: { Key: n } };
});
var batch = {
RequestItems: {},
ReturnConsumedCapacity: "INDEXES",
ReturnItemCollectionMetrics: "SIZE"
};
batch.RequestItems[tableName] = keys;
callback(null, batch);
});
};
var write = function(batch, callback) {
if(batch && batch.RequestItems){
client.batchWriteItem(batch, function(err, result) {
if(err) {
console.log(err);
return callback(err);
}
if(Object.keys(result.UnprocessedItems).length !== 0) {
console.log("Retry batchWriteItem: " + JSON.stringify(result, null, 2));
retryCount++;
var retry = {
RequestItems: result.UnprocessedItems,
ReturnConsumedCapacity: "INDEXES",
ReturnItemCollectionMetrics: "SIZE"
};
// retry with exponential backoff
var delay = retryCount > 0 ? (50 * Math.pow(2, retryCount - 1)) : 0;
setTimeout(write(retry, callback), delay);
return;
}
callback(null, result);
});
} else {
callback(null);
}
};
var params = {
TableName: tableName,
ProjectionExpression: "aggregateId,id",
Limit: 25, // max 25 per batchWriteItem
ConsistentRead: false,
ReturnConsumedCapacity: "TOTAL"
};
async.doWhilst(function (next) {
// retrieve entities
if (exclusiveStartKey)
params.ExclusiveStartKey = exclusiveStartKey;
async.compose(write, read)(params, function (err, result) {
if (err) next(err);
else next(null, result);
});
}, function () {
// test if we need to load more
return exclusiveStartKey !== null;
}, function (err, r) {
// return results
if (err) {
console.log(err);
return cleared(err);
}
return cleared(null);;
});
};
另请查看为 Lambda 配置的内存量。可能太低,达到最大值会导致不可预测的结果 IMX。
我正在 运行 进行一系列单元测试(node.js 4.x、aws-sdk、mocha),在每次测试前将数据加载到 table然后在测试后清除table。
我有两个测试失败,因为 ConditionExpression
触发了 ConditionCheckFailedException
。但是,如果我增加 read/write 容量,他们的测试就会通过。
据我了解,SDK 会处理限制异常并为您重试它们,所以为什么我的测试不会 运行 变慢并通过?相反,似乎测试无法完成 scan
-> batchWriteItem
过程,因此当新测试开始时,table 中仍有记录。
团队成员告诉我,他们遇到了类似的问题,他们只是提高了吞吐量来解决问题。这不适合我。要么我做错了什么,我的测试存在竞争条件,要么应该有一个我可以实施的模式来确保我的操作在受到限制时完成?我应该能够使用节流指标来通知我何时需要增加吞吐量,但我仍然应该能够继续重试,直到 运行 内存不足。
有其他人 运行 参与其中吗?您做了什么来处理这个问题?
经过一些调试后,我注意到 UnprocessedItems
响应元素。在查找 UnprocessedItems
in the docs 之后,我意识到我应该更仔细地阅读。下面的代码将 运行 一个带有延迟(指数退避)的重试循环:
var clearEventTable = function (tableName, client, cleared) {
var exclusiveStartKey = null;
var retryCount = 0;
var read = function(query, callback) {
client.scan(query, function (err, page) {
if(err) {
console.log(err);
return callback(err);
}
retryCount = 0;
exclusiveStartKey = page.LastEvaluatedKey || null;
if(page.Count == 0) {
return callback(null, {});
}
if(page.Count < 25 && exclusiveStartKey) {
console.log("read capacity limit reached: " + JSON.stringify(page, null, 2));
}
var keys = _.map(page.Items, function(n) {
return { DeleteRequest: { Key: n } };
});
var batch = {
RequestItems: {},
ReturnConsumedCapacity: "INDEXES",
ReturnItemCollectionMetrics: "SIZE"
};
batch.RequestItems[tableName] = keys;
callback(null, batch);
});
};
var write = function(batch, callback) {
if(batch && batch.RequestItems){
client.batchWriteItem(batch, function(err, result) {
if(err) {
console.log(err);
return callback(err);
}
if(Object.keys(result.UnprocessedItems).length !== 0) {
console.log("Retry batchWriteItem: " + JSON.stringify(result, null, 2));
retryCount++;
var retry = {
RequestItems: result.UnprocessedItems,
ReturnConsumedCapacity: "INDEXES",
ReturnItemCollectionMetrics: "SIZE"
};
// retry with exponential backoff
var delay = retryCount > 0 ? (50 * Math.pow(2, retryCount - 1)) : 0;
setTimeout(write(retry, callback), delay);
return;
}
callback(null, result);
});
} else {
callback(null);
}
};
var params = {
TableName: tableName,
ProjectionExpression: "aggregateId,id",
Limit: 25, // max 25 per batchWriteItem
ConsistentRead: false,
ReturnConsumedCapacity: "TOTAL"
};
async.doWhilst(function (next) {
// retrieve entities
if (exclusiveStartKey)
params.ExclusiveStartKey = exclusiveStartKey;
async.compose(write, read)(params, function (err, result) {
if (err) next(err);
else next(null, result);
});
}, function () {
// test if we need to load more
return exclusiveStartKey !== null;
}, function (err, r) {
// return results
if (err) {
console.log(err);
return cleared(err);
}
return cleared(null);;
});
};
另请查看为 Lambda 配置的内存量。可能太低,达到最大值会导致不可预测的结果 IMX。