Apache camel 和 SQS 扩展客户端库

Apache camel and SQS Extended Client Library

所以。如您所知,AWS Simple Queue Service 对消息的大小有一些限制。还有一个 library called Extended Client Library that help us to avoid this limitation by using S3 bucket storage. And I also use Apache Camel 来管理我的队列。

我想我只是误解了这个概念,但我在使 Apache Camel 和扩展 SQS 库一起工作时遇到了问题。

所以首先我定义了一条新路线

from("aws-sqs://queue?amazonSQSClient=#sqsClient" +
            "&maxMessagesPerPoll=10" +
            "&deleteAfterRead=false" +
            "&concurrentConsumers=10" +
            "&attributeNames=All" +
            "&messageAttributeNames=All"
        ).log(body());

sqsClient 定义在 camel-context.xml

    <bean name="sqsClient" class="com.amazonaws.services.sqs.AmazonSQSAsyncClient">
    <constructor-arg>
        <bean class="com.amazonaws.auth.BasicAWSCredentials">
            <constructor-arg value="${access.key}"/>
            <constructor-arg value="${secret.key}"/>
        </bean>
    </constructor-arg>
    <property name="region" ref="awsRegion"/>
</bean>

而且这点处理最大256kb的消息是没有问题的。

让我们更进一步。 Extended SQS Library 检测消息是否大于 256 kb 并引用 S3 存储中的文件。然后简单地获取它。

["com.amazon.sqs.javamessaging.MessageS3Pointer",{"s3BucketName":"test-bucket-ascelhem","s3Key":"b022dfd8-ed40-4213-a41e-5a0e82090ef4"}]

在客户端能够使用它之前有一些预配置:

首先,我们设置启用大负载支持的 SQS 扩展客户端配置。

ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration().withLargePayloadSupportEnabled(s3, s3BucketName);

最后

AmazonSQS sqsExtended = new AmazonSQSExtendedClient(new AmazonSQSClient(credentials), extendedClientConfig);
        Region sqsRegion = Region.getRegion(Regions.US_WEST_1);
        sqsExtended.setRegion(sqsRegion);

那么我可以通过 Spring DSL 配置它还是应该获取消息并在处理器内部获取消息正文?

接下来就是帮我们获取消息正文的方法了。

    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
    List<Message> messages = sqsExtended.receiveMessage(receiveMessageRequest).getMessages();

Apache camel 是否以某种方式覆盖了 getMessage?因为在应用程序内部我们可以通过调用方法body()来获取消息体。

如果有机会以这种方式配置 Apache Camel 以便它可以处理非常大的消息,new ExtendedClientConfiguration() .withLargePayloadSupportEnabled(s3, s3BucketName); 行让我很困惑,因为我不知道如何调用静态方法在 xml 文件中。

最后一行

<bean name="sqsClient" class="com.amazonaws.services.sqs.AmazonSQSAsyncClient">

这里我们使用AmazonSQSAsyncClient,但是正如我前面提到的,Extended Client Library 通过AmazonSQS...接口初始化?所以,我完全被困住了。

我还找到了使用这个库的 example。这可能对你有帮助。

谢谢。

所以我找到的最佳解决方案是创建一个进程,该进程首先从 s3 对象获取消息正文并对其进行处理。