收到回复消息但接收线程已经收到回复

Reply message received but the receiving thread has already received a reply

我有一个流程:网关把消息发送到一个发布-订阅(pub/sub)通道,由 2 个订阅者分别将消息发送到 2 个 SQS 队列。在这两者之间,我用链(chain)来进行消息转换。聚合器用于汇总每次网关调用的结果报告。正常路径(happy path)工作正常,但是当我的转换器抛出错误时,我会收到消息 Reply message received but the receiving thread has already received a reply,并且聚合器虽然被调用了,Runner 中的 future.get 却永远不会返回。示例配置和测试代码如下:

配置

<!-- Gateway to Publish Data to SQS.
     NOTE(review): this is the configuration that reproduces the
     "Reply message received but the receiving thread has already received a reply" warning. -->
    <!-- dataChannel fans each request out to both chains below through dataExecutor.
         apply-sequence="true" is what supplies the correlationId, sequenceSize and sequenceNumber
         headers that the aggregator at the bottom relies on. -->
    <task:executor id="dataExecutor" pool-size="10"/>
    <int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" apply-sequence="true"/>
    <int:publish-subscribe-channel id="sqsResultChannel"/>
    <int:publish-subscribe-channel id="gatewayErrorChannel"/>
    <int:publish-subscribe-channel id="gatewayReplyChannel"/>

    <!-- Gateway proxy: the first method argument becomes the payload sent to dataChannel;
         the reply is expected on gatewayReplyChannel and errors are directed to gatewayErrorChannel. -->
    <int:gateway service-interface="com.abc.DataPublishGateway" id="dataPublishGateway"
                 error-channel="gatewayErrorChannel">
        <int:method name="publishToDataService"
                    payload-expression="#args[0]"
                    request-channel="dataChannel"
                    reply-channel="gatewayReplyChannel">
        </int:method>
    </int:gateway>

    <!-- Error path: copy the correlation and sequence headers from the failed message onto the
         error message, then forward it to sqsResultChannel so the aggregator can group it with
         the results of the same gateway call.
         NOTE(review): the replyChannel header of the failed message is not copied here. -->
    <int:chain input-channel="gatewayErrorChannel" output-channel="sqsResultChannel">
        <int:header-enricher>
            <int:correlation-id expression="payload.failedMessage.headers.correlationId" />
            <int:header name="sequenceSize" expression="payload.failedMessage.headers.sequenceSize" />
            <int:header name="sequenceNumber" expression="payload.failedMessage.headers.sequenceNumber" />
        </int:header-enricher>
    </int:chain>


    <!-- Route to system-a SQS: tag the message with targetSystem=system-a, then transform it.
         NOTE(review): no errorChannel header is set in this chain; in the logged messages the
         errorChannel header refers to the gateway's TemporaryReplyChannel. -->
    <int:channel id="sqsSystemAPublishChannel" />
    <int:chain input-channel="dataChannel" output-channel="sqsSystemAPublishChannel">
        <int:header-enricher>
            <int:header name="targetSystem" value="system-a" />
        </int:header-enricher>
        <int:transformer expression="payload/2"/> <!-- Simulate transformer error -->
    </int:chain>

    <!-- Publishes to a-queue; successful sends are reported on sqsResultChannel and
         failed sends on gatewayErrorChannel. -->
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="a-queue"
                                          channel="sqsSystemAPublishChannel"
                                          success-channel="sqsResultChannel"
                                          failure-channel="gatewayErrorChannel"/>

    <!-- Route to system-b SQS: tag the message with targetSystem=system-b and lower-case the payload. -->
    <int:channel id="sqsSystemBPublishChannel" />
    <int:chain input-channel="dataChannel" output-channel="sqsSystemBPublishChannel">
        <int:header-enricher>
            <int:header name="targetSystem" value="system-b" />
        </int:header-enricher>
        <int:transformer expression="payload.toLowerCase() "/>
    </int:chain>

    <!-- Publishes to b-queue; successful sends are reported on sqsResultChannel and
         failed sends on gatewayErrorChannel. -->
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="b-queue"
                                          channel="sqsSystemBPublishChannel"
                                          success-channel="sqsResultChannel"
                                          failure-channel="gatewayErrorChannel"/>


    <!-- Collects the success and error messages arriving on sqsResultChannel, delegates to
         DataResponseAggregator, and sends the combined report to gatewayReplyChannel. -->
    <bean class="com.abc.DataResponseAggregator" id="responseAggregator" />
    <int:aggregator input-channel="sqsResultChannel" output-channel="gatewayReplyChannel" ref="responseAggregator"/>

    <!-- Generic Error Channel Logger: logs full messages from errorChannel at ERROR level
         under the logger name errorLogger. -->
    <int:logging-channel-adapter log-full-message="true"
                                 logger-name="errorLogger"
                                 level="ERROR"
                                 channel="errorChannel"
                                 id="globalErrorLoggingAdapter"/>

聚合器

/**
 * Builds the per-target-system report for one gateway invocation.
 *
 * <p>Each incoming message is either an {@link ErrorMessage} describing a failed branch or a
 * regular message describing a successful send. The report maps the value of the
 * {@code targetSystem} header to either the failure description or {@code "Ack -> <aws_messageId>"}.
 *
 * <p>All header and exception lookups are null-safe: an exception thrown from inside the
 * aggregator would prevent any reply from being produced, leaving the caller waiting.
 */
public class DataResponseAggregator {

    /** Report key used when no {@code targetSystem} header can be found for a message. */
    private static final String UNKNOWN_TARGET = "unknown";

    /**
     * Aggregates the grouped messages into a single report.
     *
     * @param responses the messages released as one group; may be {@code null} or empty
     * @return a mutable map from target system to outcome description; never {@code null}
     */
    public Map<String, String> aggregate(List<Message> responses) {

        Map<String, String> resultMap = new HashMap<>();
        if (responses == null) {
            return resultMap;
        }

        for (Message<?> message : responses) {
            if (message == null) {
                continue;
            }
            if (message instanceof ErrorMessage) {
                Throwable failure = ((ErrorMessage) message).getPayload();
                resultMap.put(resolveFailedTarget(failure, message), describeFailure(failure));
            }
            else {
                // String concatenation renders a missing aws_messageId as "null" instead of throwing.
                resultMap.put(targetSystemOf(message, UNKNOWN_TARGET),
                        "Ack -> " + message.getHeaders().get("aws_messageId"));
            }
        }
        return resultMap;
    }

    /**
     * Finds the target system of a failed branch. The header lives on the message that failed,
     * so that message is consulted first; the error message's own headers are the fallback.
     */
    private static String resolveFailedTarget(Throwable failure, Message<?> errorMessage) {
        if (failure instanceof MessagingException) {
            Message<?> failed = ((MessagingException) failure).getFailedMessage();
            if (failed != null) {
                String target = targetSystemOf(failed, null);
                if (target != null) {
                    return target;
                }
            }
        }
        return targetSystemOf(errorMessage, UNKNOWN_TARGET);
    }

    /** Returns the {@code targetSystem} header as a string, or {@code fallback} when it is absent. */
    private static String targetSystemOf(Message<?> message, String fallback) {
        Object target = message.getHeaders().get("targetSystem");
        return target != null ? target.toString() : fallback;
    }

    /**
     * Describes a failure using the message of its cause, as before; falls back to the failure
     * itself when there is no cause, and to {@code toString()} when there is no message.
     */
    private static String describeFailure(Throwable failure) {
        if (failure == null) {
            return "Unknown error";
        }
        Throwable source = failure.getCause() != null ? failure.getCause() : failure;
        String text = source.getMessage();
        return text != null ? text : source.toString();
    }
}

网关

/**
 * Messaging gateway used to publish a message to the downstream systems.
 *
 * <p>In the accompanying XML configuration this interface is exposed as the
 * {@code dataPublishGateway} proxy, with {@code publishToDataService} mapped to the
 * {@code dataChannel} request channel and the {@code gatewayReplyChannel} reply channel.
 */
public interface DataPublishGateway {

    /**
     * Publishes the given message; the configuration uses the first argument as the payload.
     *
     * @param message the text to publish
     * @return a future for the aggregated report; with the shown {@code DataResponseAggregator}
     *         this maps each target system to its outcome description
     */
    Future<Map<String, String>> publishToDataService(String message);
}

Runner(运行器)

@Bean
    CommandLineRunner runner(DataPublishGateway dataPublishGateway) {
        // Startup smoke test: submit every message through the gateway first, then wait for
        // and print each aggregated report in submission order.
        return args -> {
            List<Future<Map<String, String>>> pendingReports = new ArrayList<>();
            for (String text : new String[]{"Message 1", "Message 2"}) {
                pendingReports.add(dataPublishGateway.publishToDataService(text));
            }

            System.out.println("Processing Futures and Printing Results...");
            for (Future<Map<String, String>> pendingReport : pendingReports) {
                try {
                    // Blocks until the gateway delivers the report for this submission.
                    Map<String, String> report = pendingReport.get();
                    for (Map.Entry<String, String> outcome : report.entrySet()) {
                        System.out.println(outcome.getKey() + " - " + outcome.getValue());
                    }
                } catch (Exception e) {
                    // A failure for one submission must not stop the remaining ones from printing.
                    e.printStackTrace();
                }
            }
        };
    }

日志

14:47:21.650  INFO 91449 --- [pool-1-thread-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-09-07 14:47:21.651  INFO 91449 --- [pool-1-thread-2] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-09-07 14:47:21.655  WARN 91449 --- [pool-1-thread-2] cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply: GenericMessage [payload={system-a=Expression evaluation failed: payload/2; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1030E: The operator 'DIVIDE' is not supported between objects of type 'java.lang.String' and 'java.lang.Integer', system-b=Ack -> 90b62dff-f3c3-4288-b5e3-8178e410f60d}, headers={aws_messageId=90b62dff-f3c3-4288-b5e3-8178e410f60d, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b519afe, sequenceNumber=2, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b519afe, sequenceSize=2, correlationId=dc914588-f9a9-537f-0623-94938f84cec4, aws_serviceResult={MD5OfMessageBody: 83b2330607fe8f817ce6d24249dea373,MD5OfMessageAttributes: 5f1f442c363809afbd334ff00232c834,MessageId: 90b62dff-f3c3-4288-b5e3-8178e410f60d,}, id=9a6ad209-5291-6216-100b-502b4c37eb01, targetSystem=system-b, timestamp=1599482841654}]
2020-09-07 14:47:21.655  WARN 91449 --- [pool-1-thread-1] cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply: GenericMessage [payload={system-a=Expression evaluation failed: payload/2; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1030E: The operator 'DIVIDE' is not supported between objects of type 'java.lang.String' and 'java.lang.Integer', system-b=Ack -> 3c8a4479-22e1-48d8-aca6-a86c252e90c1}, headers={aws_messageId=3c8a4479-22e1-48d8-aca6-a86c252e90c1, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@f5a99c3, sequenceNumber=2, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@f5a99c3, sequenceSize=2, correlationId=74ba49e0-fef8-02e6-bc4b-b2ccdd7cd13b, aws_serviceResult={MD5OfMessageBody: 1db65a6a0a818fd39655b95e33ada11d,MD5OfMessageAttributes: cd4eae624b7c115f948034aafaba01bc,MessageId: 3c8a4479-22e1-48d8-aca6-a86c252e90c1,}, id=3e3de3f3-e8b0-470e-e62f-31815e662857, targetSystem=system-b, timestamp=1599482841654}]

我错过了什么?我希望转换器的错误遵循以下流程:gatewayErrorChannel -> sqsResultChannel -> aggregator -> gatewayReplyChannel。由于聚合器的输出出现在 WARN 消息中,说明它确实走了这条流程。但为什么 future.get 永远不返回,而且 gatewayReplyChannel 似乎收到了两次聚合器的输出?

通过添加一些日志记录,更重要的是弄清楚了网关消息上设置的错误通道和回复通道,我解决了这个问题。

这是什么问题?

  1. 未在进入链的消息上设置 errorChannel。
  2. 如果两条链都抛出异常,则到达聚合器的错误消息上没有设置回复通道(replyChannel),因此聚合后的消息无法返回到网关的回复通道。最终配置如下所示:
<!-- Gateway to Publish Data to SQS (final, working configuration). -->
    <!-- dataChannel fans each request out to both chains below through dataExecutor.
         apply-sequence="true" is what supplies the correlationId, sequenceSize and sequenceNumber
         headers that the aggregator at the bottom relies on. -->
    <task:executor id="dataExecutor" pool-size="10"/>
    <int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" apply-sequence="true"/>
    <int:publish-subscribe-channel id="sqsResultChannel"/>
    <int:publish-subscribe-channel id="gatewayErrorChannel"/>
    <int:publish-subscribe-channel id="gatewayReplyChannel"/>

    <!-- Gateway proxy: the first method argument becomes the payload sent to dataChannel;
         the reply is expected on gatewayReplyChannel and errors are directed to gatewayErrorChannel. -->
    <int:gateway service-interface="com.abc.DataPublishGateway" id="dataPublishGateway"
                 error-channel="gatewayErrorChannel">
        <int:method name="publishToDataService"
                    payload-expression="#args[0]"
                    request-channel="dataChannel"
                    reply-channel="gatewayReplyChannel">
        </int:method>
    </int:gateway>

    <!-- Error path: copy the correlation and sequence headers from the failed message onto the
         error message, then forward it to sqsResultChannel so the aggregator can group it with
         the results of the same gateway call. -->
    <int:chain input-channel="gatewayErrorChannel" output-channel="sqsResultChannel">
        <int:header-enricher>
            <int:correlation-id expression="payload.failedMessage.headers.correlationId" />
            <int:header name="sequenceSize" expression="payload.failedMessage.headers.sequenceSize" />
            <int:header name="sequenceNumber" expression="payload.failedMessage.headers.sequenceNumber" />
            <!-- Also carry over the failed message's replyChannel header; this is intended to let
                 the aggregated report reach the gateway even when every branch has failed. -->
            <int:reply-channel expression="payload.failedMessage.headers.replyChannel" />
        </int:header-enricher>
    </int:chain>


    <!-- Route to system-a SQS: tag the message with targetSystem=system-a, then transform it. -->
    <int:channel id="sqsSystemAPublishChannel" />
    <int:chain input-channel="dataChannel" output-channel="sqsSystemAPublishChannel">
        <int:header-enricher>
            <int:header name="targetSystem" value="system-a" />
            <!-- Force the errorChannel header to gatewayErrorChannel (overwriting any existing
                 value) so that failures in this chain take the error path defined above. -->
            <int:error-channel ref="gatewayErrorChannel" overwrite="true" />
        </int:header-enricher>
        <int:transformer expression="payload/2"/> <!-- Simulate transformer error -->
    </int:chain>

    <!-- Publishes to a-queue; successful sends are reported on sqsResultChannel and
         failed sends on gatewayErrorChannel. -->
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="a-queue"
                                          channel="sqsSystemAPublishChannel"
                                          success-channel="sqsResultChannel"
                                          failure-channel="gatewayErrorChannel"/>

    <!-- Route to system-b SQS: tag the message with targetSystem=system-b and lower-case the payload. -->
    <int:channel id="sqsSystemBPublishChannel" />
    <int:chain input-channel="dataChannel" output-channel="sqsSystemBPublishChannel">
        <int:header-enricher>
            <int:header name="targetSystem" value="system-b" />
            <!-- Same errorChannel override as in the system-a chain. -->
            <int:error-channel ref="gatewayErrorChannel" overwrite="true" />
        </int:header-enricher>
        <int:transformer expression="payload.toLowerCase() "/>
    </int:chain>

    <!-- Publishes to b-queue; successful sends are reported on sqsResultChannel and
         failed sends on gatewayErrorChannel. -->
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="b-queue"
                                          channel="sqsSystemBPublishChannel"
                                          success-channel="sqsResultChannel"
                                          failure-channel="gatewayErrorChannel"/>


    <!-- Collects the success and error messages arriving on sqsResultChannel, delegates to
         DataResponseAggregator, and sends the combined report to gatewayReplyChannel. -->
    <bean class="com.abc.DataResponseAggregator" id="responseAggregator" />
    <int:aggregator input-channel="sqsResultChannel" output-channel="gatewayReplyChannel" ref="responseAggregator"/>

    <!-- Generic Error Channel Logger: logs full messages from errorChannel at ERROR level
         under the logger name errorLogger. -->
    <int:logging-channel-adapter log-full-message="true"
                                 logger-name="errorLogger"
                                 level="ERROR"
                                 channel="errorChannel"
                                 id="globalErrorLoggingAdapter"/>