SQS FIFO 使用 MessageGroupId 接收消息
SQS FIFO Using MessageGroupId to receive message
如何使用 messagegroupid 参数仅接收标有我需要的 ID 的队列消息?
我一直在尝试使用下面的行来检索,但它也总是会收到来自其他组 ID 的所有队列消息。
List<Message> messages = sqs.receiveMessage(receiveMessageRequest.withAttributeNames("MessageGroupId")).getMessages();
正确的做法应该是什么?
ReceiveMessageRequest is not used for filtering based on message attributes. If you look at the docs for ReceiveMessageRequest.html.withAttributeNames() 它说:
A list of attributes that need to be returned along with each message.
一般来说,您无法过滤从 SQS 返回的消息。可以限制数量但不能说,例如"give me all the messages that match this pattern".
我的解决方案是利用 ChangeMessageVisibilityBatchRequest (see docs),它实质上将消息发送回队列以进行重新处理。
我的 lambda 有一个基于时间的触发器。每次打开时,我都会收到消息,直到没有消息为止。对于每个批次,我按 MessageGroupId 对消息进行分组,处理并删除第一组,然后将剩余的组消息发送回队列,以便在下一次迭代中提取。
这是我的代码的要点:
(注意 _awsSqsService.SendBackToQueueAsync(groupMessages)
方法最终通过 AWS SQS ChangeMessageVisibilityBatchRequest
将消息发送回队列)
public async Task Run()
{
var batchContainsMessages = true;
while (batchContainsMessages)
{
var messageBatch = await _awsSqsService.GetMessageBatchAsync();
if(messageBatch.Messages.Count > 0 && messageBatch.HttpStatusCode == HttpStatusCode.OK)
{
await ProcessMessageBatchAsync(messageBatch.Messages);
}
else
{
batchContainsMessages = false;
}
}
}
private async Task ProcessMessageBatchAsync(List<Message> messages)
{
// SQS fifo queues will often return a batch of messages with different MessageGroupIds.
// Because of this, we need to group them ourselves, process one group (the first group),
// and send the rest back to the queue to be processed in the next iteration.
// This ensures that we process as many messages as possible per group in a single batch (max is 10)
var messageGroups = GetMessageGroups(messages);
var isFirstGroup = true;
foreach (var group in messageGroups)
{
var groupId = Int32.Parse(group.Key);
var groupMessages = group.Value;
if (isFirstGroup)
{
isFirstGroup = false;
await ProcessMessagesAsync(groupId, groupMessages);
await _awsSqsService.DeleteMessagesAsync(groupMessages);
}
else
{
await _awsSqsService.SendBackToQueueAsync(groupMessages);
}
}
}
如何使用 messagegroupid 参数仅接收标有我需要的 ID 的队列消息?
我一直在尝试使用下面的行来检索,但它也总是会收到来自其他组 ID 的所有队列消息。
List<Message> messages = sqs.receiveMessage(receiveMessageRequest.withAttributeNames("MessageGroupId")).getMessages();
正确的做法应该是什么?
ReceiveMessageRequest is not used for filtering based on message attributes. If you look at the docs for ReceiveMessageRequest.html.withAttributeNames() 它说:
A list of attributes that need to be returned along with each message.
一般来说,您无法过滤从 SQS 返回的消息。可以限制数量但不能说,例如"give me all the messages that match this pattern".
我的解决方案是利用 ChangeMessageVisibilityBatchRequest (see docs),它实质上将消息发送回队列以进行重新处理。
我的 lambda 有一个基于时间的触发器。每次打开时,我都会收到消息,直到没有消息为止。对于每个批次,我按 MessageGroupId 对消息进行分组,处理并删除第一组,然后将剩余的组消息发送回队列,以便在下一次迭代中提取。
这是我的代码的要点:
(注意 _awsSqsService.SendBackToQueueAsync(groupMessages)
方法最终通过 AWS SQS ChangeMessageVisibilityBatchRequest
将消息发送回队列)
public async Task Run()
{
var batchContainsMessages = true;
while (batchContainsMessages)
{
var messageBatch = await _awsSqsService.GetMessageBatchAsync();
if(messageBatch.Messages.Count > 0 && messageBatch.HttpStatusCode == HttpStatusCode.OK)
{
await ProcessMessageBatchAsync(messageBatch.Messages);
}
else
{
batchContainsMessages = false;
}
}
}
private async Task ProcessMessageBatchAsync(List<Message> messages)
{
// SQS fifo queues will often return a batch of messages with different MessageGroupIds.
// Because of this, we need to group them ourselves, process one group (the first group),
// and send the rest back to the queue to be processed in the next iteration.
// This ensures that we process as many messages as possible per group in a single batch (max is 10)
var messageGroups = GetMessageGroups(messages);
var isFirstGroup = true;
foreach (var group in messageGroups)
{
var groupId = Int32.Parse(group.Key);
var groupMessages = group.Value;
if (isFirstGroup)
{
isFirstGroup = false;
await ProcessMessagesAsync(groupId, groupMessages);
await _awsSqsService.DeleteMessagesAsync(groupMessages);
}
else
{
await _awsSqsService.SendBackToQueueAsync(groupMessages);
}
}
}