What is the correct way to upload a large koa request body to AWS S3?
I am building an application backend. The client POSTs a file as the request body to the server, and the server then uploads the file to AWS S3. The server runs on Node.js with the koa web framework.
If I use raw-body to read the POST body into a buffer, the buffer becomes huge when the file is large and causes an out-of-memory error.
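For reference, the buffering approach looked roughly like this. This is a minimal sketch, not my actual code; the bucket and key names are placeholders.

    const Koa = require('koa');
    const getRawBody = require('raw-body');
    const AWS = require('aws-sdk');

    const app = new Koa();
    const s3 = new AWS.S3();

    app.use(async (ctx) => {
        // Buffers the entire request body in memory — this is what runs
        // out of memory for large files.
        const bodyBuffer = await getRawBody(ctx.req);
        await s3.putObject({
            Bucket: 'my-bucket',          // placeholder bucket/key
            Key: 'uploads/example',
            Body: bodyBuffer
        }).promise();
        ctx.status = 200;
    });

    app.listen(3000);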
If I pass ctx.req (an IncomingMessage object) directly to S3.putObject, the AWS SDK throws the error Cannot determine length of [object Object]; it looks like the AWS SDK tries to determine the length of the stream before starting a multipart upload.
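The failing call, for illustration, reusing the same app and s3 objects from the sketch above (again, bucket and key are placeholders):

    app.use(async (ctx) => {
        // Passing the request stream directly: the SDK cannot determine a
        // Content-Length for the IncomingMessage, so this call throws
        // "Cannot determine length of [object Object]".
        await s3.putObject({
            Bucket: 'my-bucket',      // placeholder bucket/key
            Key: 'uploads/example',
            Body: ctx.req             // the raw IncomingMessage
        }).promise();
    });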
AWS SDK version 2.383.0 (latest at the time of writing)
Node 10.14.2
For now I have written a function that reads from the IncomingMessage as a stream, waits for data events until a large buffer (16 MB) fills up, and then uploads that part to S3. This solves the problem well, but I am still looking for a better solution.
After running it in production for several months, I consider my final solution stable and reliable.
The main idea is to collect data from the IncomingMessage stream into a buffer; once the buffer reaches a certain size, put the current part to S3, then keep reading the stream until it ends.
const Crypto = require('crypto');
const Log4js = require('log4js');

const uploaderLogger = Log4js.getLogger('customUploader');

// Default part size: 16 MB, as described above.
const uploadBufferMB = 16;

function customMultiPartUpload(s3, bucket, key, incomingMessage, partSizeInByte) {
    return new Promise((resolve) => {
        partSizeInByte = partSizeInByte || uploadBufferMB * 1024 * 1024;
        uploaderLogger.debug(`part size is ${partSizeInByte}`);

        let uploadId = null;
        let partNumber = 0;
        let parts = [];
        let fileSize = 0;
        let reserveBuffer = Buffer.alloc(0);
        const sendBuffer = Buffer.alloc(partSizeInByte);
        const md5Hash = Crypto.createHash('md5');

        // Uploads one part; lazily initializes the multipart upload on the first call.
        const doUpload = async (uploadBuffer) => {
            if (!uploadId) {
                uploaderLogger.debug('multipart upload not initialized');
                const createData = await s3.createMultipartUpload({
                    Bucket: bucket,
                    Key: key
                }).promise();
                uploadId = createData.UploadId;
                uploaderLogger.debug(`uploadId ${uploadId}`);
                partNumber = 0;
            }
            fileSize += uploadBuffer.length;
            uploaderLogger.debug(`buffer length ${uploadBuffer.length}, total ${fileSize}`);
            partNumber += 1;
            uploaderLogger.debug(`part number ${partNumber}`);
            md5Hash.update(uploadBuffer);
            const partData = await s3.uploadPart({
                Bucket: bucket,
                Key: key,
                PartNumber: partNumber,
                UploadId: uploadId,
                Body: uploadBuffer
            }).promise();
            parts.push({
                PartNumber: partNumber,
                ETag: partData.ETag
            });
            uploaderLogger.debug(`etag ${partData.ETag}`);
        };

        // Accumulate incoming chunks; pause the stream while parts are uploading
        // so the reserve buffer does not keep growing.
        incomingMessage.on('data', async (chunkBuffer) => {
            incomingMessage.pause();
            reserveBuffer = Buffer.concat([ reserveBuffer, chunkBuffer ]);
            if (reserveBuffer.length > partSizeInByte) {
                do {
                    reserveBuffer.copy(sendBuffer, 0, 0, partSizeInByte);
                    reserveBuffer = reserveBuffer.slice(partSizeInByte);
                    await doUpload(sendBuffer);
                } while (reserveBuffer.length > partSizeInByte);
            }
            incomingMessage.resume();
        });

        // Flush whatever is left (the final, possibly smaller, part) and complete the upload.
        incomingMessage.on('end', async () => {
            uploaderLogger.debug('stream end');
            if (reserveBuffer.length > 0) {
                await doUpload(reserveBuffer);
            }
            if (uploadId) {
                uploaderLogger.debug('uploadId not null');
                await s3.completeMultipartUpload({
                    Bucket: bucket,
                    Key: key,
                    UploadId: uploadId,
                    MultipartUpload: {
                        Parts: parts
                    }
                }).promise();
                uploaderLogger.debug('multipart upload complete');
            }
            const hash = md5Hash.digest('hex');
            resolve({
                size: fileSize,
                hash: hash
            });
            uploaderLogger.debug(`return file size ${fileSize}, hash ${hash}`);
        });
    });
}
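A minimal usage sketch from a koa handler; the s3 client, bucket name, and key below are my assumptions for illustration, not part of the function itself:

    const AWS = require('aws-sdk');
    const Koa = require('koa');

    const app = new Koa();
    const s3 = new AWS.S3();

    app.use(async (ctx) => {
        // Stream the raw request (ctx.req is the Node IncomingMessage) straight to S3.
        const result = await customMultiPartUpload(s3, 'my-bucket', `uploads/${Date.now()}`, ctx.req);
        ctx.body = { size: result.size, md5: result.hash };
    });

    app.listen(3000);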
Adjust partSizeInByte to suit your server's memory budget: a part size that is too large can cause OOM when the server handles many concurrent requests, while a part size that is too small may fall below the S3 minimum part size (5 MB for every part except the last).
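If you want to guard against picking an invalid value, a small helper like this can clamp the choice. It is my own addition, not part of the original function, based on S3's documented multipart limits.

    const MIN_PART_SIZE = 5 * 1024 * 1024;         // S3: every part except the last must be at least 5 MB
    const MAX_PART_SIZE = 5 * 1024 * 1024 * 1024;  // S3: a single part may not exceed 5 GB

    function clampPartSize(requestedBytes) {
        return Math.min(Math.max(requestedBytes, MIN_PART_SIZE), MAX_PART_SIZE);
    }

    // e.g. customMultiPartUpload(s3, bucket, key, ctx.req, clampPartSize(16 * 1024 * 1024));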