AWS Elastic Beanstalk 工作进程在运行中留下 SQS 消息
AWS Elastic Beanstalk worker process leaves SQS message in flight
我有一个 NodeJS Elastic Beanstalk 工作层环境,它从 SQS 队列获取消息并将它们发布到外部 url。下面的代码按预期工作,但消息在 SQS 中仍然是 "in flight"。我看不到任何有关如何通知 SQS 消息已成功处理的文档?
var http = require('http'),
request = require("request"),
fs = require('fs');
http.createServer(function (req, res) {
req.on('data', function (data) {
var jsonObj = JSON.parse(data);
var jsonString = JSON.stringify(jsonObj);
log("Processing " + jsonString);
request.post('http://example.com', {
json: true,
body: jsonString,
headers: {
"content-type": "application/json",
}
}, (error, res, body) => {
if (error) {
log(`Error ${error}`);
return;
}
returnResponse(res, JSON.stringify(body));
})
});
}).listen(process.env.PORT || 3000);
function returnResponse(httpResponse, message) {
log(`Response status code ${httpResponse.statusCode} - ${message}`);
httpResponse.writeHead(httpResponse.statusCode);
httpResponse.write(message);
httpResponse.end();
}
var log = function (entry) {
fs.appendFileSync('/tmp/output.log', new Date().toISOString() + ' - ' + entry + '\n');
};
消息 正在飞行 意味着它已被消费者使用,但尚未从队列中删除。如果在队列 可见性超时 期间未被消费者删除,SQS 将再次将其放回队列中。
如果您成功处理了消息,则应将其从队列中删除。如果您熟悉 AMQP 0-9-1,此操作类似于 确认消息。
因为您使用的是Node.js,this是官方SDK中的方法,您应该使用它来实现您的目标。
这是固定码:
var http = require('http'),
request = require("request"),
fs = require('fs');
http.createServer(function (req, res) {
req.on('data', function (data) {
var jsonObj = JSON.parse(data);
var jsonString = JSON.stringify(jsonObj);
log("Processing " + jsonString);
request.post('http://example.com', {
json: true,
body: jsonString,
headers: {
"content-type": "application/json",
}
}, (error, response, body) => {
if (error) {
log(`Error ${error}`);
return;
}
returnResponse(res, response.statusCode, JSON.stringify(body));
})
});
}).listen(process.env.PORT || 3000);
function returnResponse(res, statusCode, message) {
log(`Response status code ${statusCode} - ${message}`);
res.writeHead(statusCode, {'Content-Type': 'text/plain'});
res.write('Complete');
res.end();
}
var log = function (entry) {
fs.appendFileSync('/tmp/sample-app.log', new Date().toISOString() + ' - ' + entry + '\n');
};
来自 https://www.edureka.co/blog/aws-elastic-beanstalk/
The daemon pulls requests sent from an Amazon SQS queue. Based on the queue’s priority, SQS will send the message via a POSTrequest to the HTTP Path of the Worker Environment. The worker on receiving the message executes the tasks and sends an HTTP response once the operation is done. SQS on receiving response message deletes the message in the queue. If it fails to receive a response, it will continuously retry sending the messages.
问题是我之前的代码没有正确返回 HTTP 状态代码。
我有一个 NodeJS Elastic Beanstalk 工作层环境,它从 SQS 队列获取消息并将它们发布到外部 url。下面的代码按预期工作,但消息在 SQS 中仍然是 "in flight"。我看不到任何有关如何通知 SQS 消息已成功处理的文档?
var http = require('http'),
request = require("request"),
fs = require('fs');
http.createServer(function (req, res) {
req.on('data', function (data) {
var jsonObj = JSON.parse(data);
var jsonString = JSON.stringify(jsonObj);
log("Processing " + jsonString);
request.post('http://example.com', {
json: true,
body: jsonString,
headers: {
"content-type": "application/json",
}
}, (error, res, body) => {
if (error) {
log(`Error ${error}`);
return;
}
returnResponse(res, JSON.stringify(body));
})
});
}).listen(process.env.PORT || 3000);
function returnResponse(httpResponse, message) {
log(`Response status code ${httpResponse.statusCode} - ${message}`);
httpResponse.writeHead(httpResponse.statusCode);
httpResponse.write(message);
httpResponse.end();
}
var log = function (entry) {
fs.appendFileSync('/tmp/output.log', new Date().toISOString() + ' - ' + entry + '\n');
};
消息 正在飞行 意味着它已被消费者使用,但尚未从队列中删除。如果在队列 可见性超时 期间未被消费者删除,SQS 将再次将其放回队列中。
如果您成功处理了消息,则应将其从队列中删除。如果您熟悉 AMQP 0-9-1,此操作类似于 确认消息。
因为您使用的是Node.js,this是官方SDK中的方法,您应该使用它来实现您的目标。
这是固定码:
var http = require('http'),
request = require("request"),
fs = require('fs');
http.createServer(function (req, res) {
req.on('data', function (data) {
var jsonObj = JSON.parse(data);
var jsonString = JSON.stringify(jsonObj);
log("Processing " + jsonString);
request.post('http://example.com', {
json: true,
body: jsonString,
headers: {
"content-type": "application/json",
}
}, (error, response, body) => {
if (error) {
log(`Error ${error}`);
return;
}
returnResponse(res, response.statusCode, JSON.stringify(body));
})
});
}).listen(process.env.PORT || 3000);
function returnResponse(res, statusCode, message) {
log(`Response status code ${statusCode} - ${message}`);
res.writeHead(statusCode, {'Content-Type': 'text/plain'});
res.write('Complete');
res.end();
}
var log = function (entry) {
fs.appendFileSync('/tmp/sample-app.log', new Date().toISOString() + ' - ' + entry + '\n');
};
来自 https://www.edureka.co/blog/aws-elastic-beanstalk/
The daemon pulls requests sent from an Amazon SQS queue. Based on the queue’s priority, SQS will send the message via a POSTrequest to the HTTP Path of the Worker Environment. The worker on receiving the message executes the tasks and sends an HTTP response once the operation is done. SQS on receiving response message deletes the message in the queue. If it fails to receive a response, it will continuously retry sending the messages.
问题是我之前的代码没有正确返回 HTTP 状态代码。