AWS 集成 spring:延长可见性超时

Aws integration spring: Extend Visibility Timeout

是否可以延长正在发送的消息的可见时间。

参见:

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html

部分:更改消息的可见性超时。

http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/sqs/AmazonSQSClient.html#changeMessageVisibility-com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest-

总而言之,我希望能够为正在发送的给定消息延长第一个设置的可见性超时。

例如,如果 15 秒已经过去,我想将超时再延长 20 秒。上面 java 文档中的更好示例。

根据我对以上链接的理解,您可以在亚马逊端进行此操作。

以下是我当前的设置;

  SqsMessageDrivenChannelAdapter adapter =
  new SqsMessageDrivenChannelAdapter(queue);
  adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
  adapter.setMaxNumberOfMessages(1);
  adapter.setSendTimeout(2000);
  adapter.setVisibilityTimeout(200);
  adapter.setWaitTimeOut(20);

是否可以延长此超时时间?

好的。看来我明白你的意思了。

我们可以使用 API:

更改特定消息的可见性
AmazonSQS.changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout)

为此目的,在下游流程中,您必须访问(注入)AmazonSQS bean 并从 Message:

中提取特殊的 headers
@Autowired
AmazonSQS amazonSqs;

@Autowired
ResourceIdResolver resourceIdResolver;
...


MessageHeaders headers = message.getHeaders();

DestinationResolver destinationResolver = new DynamicQueueUrlDestinationResolver(this.amazonSqs, this.resourceIdResolver);

String queueUrl = destinationResolver.resolveDestination(headers.get(AwsHeaders.QUEUE));

String receiptHandle = headers.get(AwsHeaders.RECEIPT_HANDLE);

amazonSqs.changeMessageVisibility(queueUrl, receiptHandle, YOUR_DESIRED_VISIBILITY_TIMEOUT);

但是,呃,我同意我们应该提供一些关于此事的内容作为 out-of-the-box 功能。这甚至可能类似于 QueueMessageAcknowledgment 作为新的 header。或者甚至只是这个方法的另一种 changeMessageVisibility 方法。

请就此 SO 主题 link 为 Spring Cloud AWS 项目提出 GH 问题。

Spring Cloud AWS 从版本 2.0 开始支持此功能。在 SQS 侦听器方法中注入一个 Visiblity 参数就可以了:

  @SqsListener(value = "my-sqs-queue")
  void onMessageReceived(@Payload String payload, Visibility visibility) {
    ...
    var extension = visibility.extend(20);
    ...
  }

请注意,extend 将异步工作,并且 return 将成为 Future。因此,如果您想进一步确定处理过程,消息的可见性确实在 AWS 方面得到了扩展,可以使用 extension.get() 阻止 Future 或使用 extension.isDone()[ 查询 Future =15=]