是否可以在 SQS 中获取消息总数?
Is it possible to get total number of message in SQS?
我看到有 2 个单独的指标 ApproximateNumberOfMessagesVisible
和 ApproximateNumberOfMessagesNotVisible
。
使用可见消息数会导致处理 pods 在从队列中拾取消息后立即触发终止,因为它们不再可见。如果我使用不可见的消息数,它不会按比例放大。
我正在尝试使用水平 pod 自动缩放器和来自 SQS 的外部指标来缩放 kubernetes 服务。这是模板外部指标:
apiVersion: metrics.aws/v1alpha1
kind: ExternalMetric
metadata:
name: metric-name
spec:
name: metric-name
queries:
- id: metric_name
metricStat:
metric:
namespace: "AWS/SQS"
metricName: "ApproximateNumberOfMessagesVisible"
dimensions:
- name: QueueName
value: "queue_name"
period: 60
stat: Average
unit: Count
returnData: true
这是 HPA 模板:
kind: HorizontalPodAutoscaler
apiVersion: autoscaling/v2beta1
metadata:
name: hpa-name
spec:
scaleTargetRef:
apiVersion: apps/v1beta1
kind: Deployment
name: deployment-name
minReplicas: 1
maxReplicas: 50
metrics:
- type: External
external:
metricName: metric-name
targetAverageValue: 1
如果我可以定义另一个自定义指标,即这两个指标的总和,问题就会得到解决,否则我还能如何解决这个问题?
这似乎是一个 Thrashing 案例 - 由于所评估指标的动态特性,副本数量经常波动。
恕我直言,这里有几个选项。
您可以考虑添加 StabilizationWindow to your HPA and also probably limit the scale down rate。您必须尝试一些指标组合,看看哪种最适合您,因为您最好了解您在基础架构中看到的指标的性质(ApproximateNumberOfMessagesVisible
在这种情况下)。
我们使用 lambda 获取两个指标并发布一个自定义指标,该指标是传输中和等待消息的总和,并以您想要的任何频率使用 cloudwatch 事件触发此 lambda,https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#rules:action=create
这里是供参考的lambda代码:
const AWS = require('aws-sdk');
const cloudwatch = new AWS.CloudWatch({region: ''}); // fill region here
const sqs = new AWS.SQS();
const SQS_URL = ''; // fill queue url here
async function getSqsMetric(queueUrl) {
var params = {
QueueUrl: queueUrl,
AttributeNames: ['All']
};
return new Promise((res, rej) => {
sqs.getQueueAttributes(params, function(err, data) {
if (err) rej(err);
else res(data);
});
})
}
function buildMetric(numMessages) {
return {
Namespace: 'yourcompany-custom-metrics',
MetricData: [{
MetricName: 'mymetric',
Dimensions: [{
Name: 'env',
Value: 'prod'
}],
Timestamp: new Date(),
Unit: 'Count',
Value: numMessages
}]
}
}
async function pushMetrics(metrics) {
await new Promise((res) => cloudwatch.putMetricData(metrics, (err, data) => {
if (err) {
console.log('err', err, err.stack); // an error occurred
res(err);
} else {
console.log('response', data); // successful response
res(data);
}
}));
}
exports.handler = async (event) => {
console.log('Started');
const sqsMetrics = await getSqsMetric(SQS_URL).catch(console.error);
var queueSize = null;
if (sqsMetrics) {
console.log('Got sqsMetrics', sqsMetrics);
if (sqsMetrics.Attributes) {
queueSize = parseInt(sqsMetrics.Attributes.ApproximateNumberOfMessages) + parseInt(sqsMetrics.Attributes.ApproximateNumberOfMessagesNotVisible);
console.log('Pushing', queueSize);
await pushMetrics(buildMetric(queueSize))
}
} else {
console.log('Failed fetching sqsMetrics');
}
const response = {
statusCode: 200,
body: JSON.stringify('Pushed ' + queueSize),
};
return response;
};
我看到有 2 个单独的指标 ApproximateNumberOfMessagesVisible
和 ApproximateNumberOfMessagesNotVisible
。
使用可见消息数会导致处理 pods 在从队列中拾取消息后立即触发终止,因为它们不再可见。如果我使用不可见的消息数,它不会按比例放大。
我正在尝试使用水平 pod 自动缩放器和来自 SQS 的外部指标来缩放 kubernetes 服务。这是模板外部指标:
apiVersion: metrics.aws/v1alpha1
kind: ExternalMetric
metadata:
name: metric-name
spec:
name: metric-name
queries:
- id: metric_name
metricStat:
metric:
namespace: "AWS/SQS"
metricName: "ApproximateNumberOfMessagesVisible"
dimensions:
- name: QueueName
value: "queue_name"
period: 60
stat: Average
unit: Count
returnData: true
这是 HPA 模板:
kind: HorizontalPodAutoscaler
apiVersion: autoscaling/v2beta1
metadata:
name: hpa-name
spec:
scaleTargetRef:
apiVersion: apps/v1beta1
kind: Deployment
name: deployment-name
minReplicas: 1
maxReplicas: 50
metrics:
- type: External
external:
metricName: metric-name
targetAverageValue: 1
如果我可以定义另一个自定义指标,即这两个指标的总和,问题就会得到解决,否则我还能如何解决这个问题?
这似乎是一个 Thrashing 案例 - 由于所评估指标的动态特性,副本数量经常波动。
恕我直言,这里有几个选项。
您可以考虑添加 StabilizationWindow to your HPA and also probably limit the scale down rate。您必须尝试一些指标组合,看看哪种最适合您,因为您最好了解您在基础架构中看到的指标的性质(ApproximateNumberOfMessagesVisible
在这种情况下)。
我们使用 lambda 获取两个指标并发布一个自定义指标,该指标是传输中和等待消息的总和,并以您想要的任何频率使用 cloudwatch 事件触发此 lambda,https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#rules:action=create
这里是供参考的lambda代码:
const AWS = require('aws-sdk');
const cloudwatch = new AWS.CloudWatch({region: ''}); // fill region here
const sqs = new AWS.SQS();
const SQS_URL = ''; // fill queue url here
async function getSqsMetric(queueUrl) {
var params = {
QueueUrl: queueUrl,
AttributeNames: ['All']
};
return new Promise((res, rej) => {
sqs.getQueueAttributes(params, function(err, data) {
if (err) rej(err);
else res(data);
});
})
}
function buildMetric(numMessages) {
return {
Namespace: 'yourcompany-custom-metrics',
MetricData: [{
MetricName: 'mymetric',
Dimensions: [{
Name: 'env',
Value: 'prod'
}],
Timestamp: new Date(),
Unit: 'Count',
Value: numMessages
}]
}
}
async function pushMetrics(metrics) {
await new Promise((res) => cloudwatch.putMetricData(metrics, (err, data) => {
if (err) {
console.log('err', err, err.stack); // an error occurred
res(err);
} else {
console.log('response', data); // successful response
res(data);
}
}));
}
exports.handler = async (event) => {
console.log('Started');
const sqsMetrics = await getSqsMetric(SQS_URL).catch(console.error);
var queueSize = null;
if (sqsMetrics) {
console.log('Got sqsMetrics', sqsMetrics);
if (sqsMetrics.Attributes) {
queueSize = parseInt(sqsMetrics.Attributes.ApproximateNumberOfMessages) + parseInt(sqsMetrics.Attributes.ApproximateNumberOfMessagesNotVisible);
console.log('Pushing', queueSize);
await pushMetrics(buildMetric(queueSize))
}
} else {
console.log('Failed fetching sqsMetrics');
}
const response = {
statusCode: 200,
body: JSON.stringify('Pushed ' + queueSize),
};
return response;
};