SQS 最大消息数

SQS maxNumberOfMessages

使用 Java 客户端应用程序,我在 SQS 队列中查询消息。该队列有 12,000 条消息作为测试设置。我正在使用 openJDK 和最新的 aws-java-sdk (software.amazon.awssdk 2.10.62) pom.xml 进一步显示。

我看到的问题是,尽管设置了 maxNumberOfMessages(10),但我只得到 3 个。我知道这是最大值,不能保证消息数量然而,返回的消息数量没有变化。它总是 3。

AWS Documentation: MaxNumberOfMessages The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 1. Type: Integer Required: No

Consuming Messages Using Short Polling

When you consume messages from a queue using short polling, Amazon SQS samples a subset of its servers (based on a weighted random distribution) and returns messages from only those servers. Thus, a particular ReceiveMessage request might not return all of your messages. However, if you have fewer than 1,000 messages in your queue, a subsequent request will return your messages. If you keep consuming from your queues, Amazon SQS samples all of its servers, and you receive all of your messages.

因此,我们在 java 中测试了两个客户端,同时使用较旧的 aws sdk 和较新的 aws sdk,结果相同。总是只返回 3 条消息。

有趣的是,如果不是 运行 在外部(在我强大的桌面上)将应用程序 运行 作为 AWS Lambda,您会收到 10 条消息。这个 lambda 测试是由一位同事使用 JavaScript 完成的。

所以问题仍然存在,为什么我们每次请求只能收到 3 条消息,而在 lambda 中似乎可以收到 10 条消息。

鉴于每个请求都有成本是基于亚马逊利润的加权随机分配=))

SQS 测试方法:

public void SQStart()
{
    AwsBasicCredentials awsCreds = AwsBasicCredentials.create("accessKeyID", "secretKeyID");
    AwsCredentialsProvider creds = StaticCredentialsProvider.create(awsCreds);
    SqsClient sqs = SqsClient.builder().credentialsProvider(creds).region(Region.EU_WEST_1).build();
    GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder()
            .queueName(QUEUE_NAME)
            .build();
    String queueUrl = sqs.getQueueUrl(getQueueRequest).queueUrl();

    for (int x =1; x < 100; x++) {
        ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .maxNumberOfMessages(10)
                .build();


        List<Message> messages = sqs.receiveMessage(receiveMessageRequest).messages();
        if (messages.size() > 3 ) {
            System.out.println("YEY More than 3 Messages: "+ messages.size());
        }
    }
}

POM.XML:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>SQSTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>


    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.10.62</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>

            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sqs</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.9</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.10</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.11.720</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.1</version>
        </dependency>
    </dependencies>
</project>

Given there is a cost per request is the weighted random distribution based on amazon profit =))

很明显,您的目标是降低成本,无论是通过向 SQS 发送更少的请求,还是通过强制 SQS 传送最大数量的可用消息。

正如您在问题中所述,SQS 没有义务传送最大数量的可用消息。但是,假设您还不知道,我想通知您一些事情。


长轮询

亚马逊简单队列服务的 Developer Guide 指出:

The process of consuming messages from a queue depends on whether you use short or long polling. By default, Amazon SQS uses short polling, querying only a subset of its servers (based on a weighted random distribution) to determine whether any messages are available for a response. You can use long polling to reduce your costs while allowing your consumers to receive messages as soon as they arrive in the queue.

您发送到 SQS 的消息可能都存储在不同的服务器上。如文档所述,如果您的队列设置为使用 short polling,则只能查询一部分服务器。我的猜测是你在调用 receiveMessage 时不走运,每次只返回 3

如果我们在同一文档页面上查看 长轮询 的好处,它指出:

Long polling offers the following benefits:

  • Eliminate empty responses by allowing Amazon SQS to wait until a message is available in a queue before sending a response. Unless the connection times out, the response to the ReceiveMessage request contains at least one of the available messages, up to the maximum number of messages specified in the ReceiveMessage action.

  • Eliminate false empty responses by querying all—rather than a subset of—Amazon SQS servers.

第二个项目符号在这里非常重要。即使您没有看到空响应,也可能存在更多消息存储在未被查询的服务器上。如果您启用长轮询,假设总共有超过 3 个服务器,您应该会看到返回的消息数量增加。

因此,我的建议是在您的队列上启用长轮询。为此,请参阅 Setting Up Long Polling 页面。


正如 DevilCode 在他下面的 中提到的,他能够通过使用 FIFO 队列而不是标准队列并启用长轮询来解决他的问题。

我认为 this 与 Jacob 指出的 question.As 类似,长轮询似乎是解决问题的方法。

长轮询:

        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
              .withWaitTimeSeconds(10)     // long poll: wait 10 seconds, max is 20 seconds
              .withMaxNumberOfMessages(10);