SQS FIFO 使用 MessageGroupId 接收消息

SQS FIFO Using MessageGroupId to receive message

如何使用 messagegroupid 参数仅接收标有我需要的 ID 的队列消息?

我一直在尝试使用下面的行来检索,但它也总是会收到来自其他组 ID 的所有队列消息。

List<Message> messages = sqs.receiveMessage(receiveMessageRequest.withAttributeNames("MessageGroupId")).getMessages();

正确的做法应该是什么?

ReceiveMessageRequest is not used for filtering based on message attributes. If you look at the docs for ReceiveMessageRequest.html.withAttributeNames() 它说:

A list of attributes that need to be returned along with each message.

一般来说,您无法过滤从 SQS 返回的消息。可以限制数量但不能说,例如"give me all the messages that match this pattern".

我的解决方案是利用 ChangeMes​​sageVisibilityBatchRequest (see docs),它实质上将消息发送回队列以进行重新处理。

我的 lambda 有一个基于时间的触发器。每次打开时,我都会收到消息,直到没有消息为止。对于每个批次,我按 MessageGroupId 对消息进行分组,处理并删除第一组,然后将剩余的组消息发送回队列,以便在下一次迭代中提取。

这是我的代码的要点:

(注意 _awsSqsService.SendBackToQueueAsync(groupMessages) 方法最终通过 AWS SQS ChangeMessageVisibilityBatchRequest 将消息发送回队列)

public async Task Run()
{
    var batchContainsMessages = true;
    while (batchContainsMessages)
    {
        var messageBatch = await _awsSqsService.GetMessageBatchAsync();
        if(messageBatch.Messages.Count > 0 && messageBatch.HttpStatusCode == HttpStatusCode.OK)
        {
            await ProcessMessageBatchAsync(messageBatch.Messages);
        }
        else
        {
            batchContainsMessages = false;
        }
    }
}

private async Task ProcessMessageBatchAsync(List<Message> messages)
{
    // SQS fifo queues will often return a batch of messages with different MessageGroupIds.
    // Because of this, we need to group them ourselves, process one group (the first group), 
    // and send the rest back to the queue to be processed in the next iteration. 
    // This ensures that we process as many messages as possible per group in a single batch (max is 10)
    var messageGroups = GetMessageGroups(messages);

    var isFirstGroup = true;
    foreach (var group in messageGroups)
    {
        var groupId = Int32.Parse(group.Key);
        var groupMessages = group.Value;
        if (isFirstGroup)
        {
            isFirstGroup = false;
            await ProcessMessagesAsync(groupId, groupMessages);
            await _awsSqsService.DeleteMessagesAsync(groupMessages);
        }
        else
        {
            await _awsSqsService.SendBackToQueueAsync(groupMessages);
        }
    }
}