Spring http 客户端 xd 问题

Spring xd issue with http-client

我有 spring xd 模块,使用 rabbitmq 作为传输。我的模块有调用 rest url http://x.y.z/test

的 http 源 http 客户端处理器
stream create --name cycletest4 --definition "http |  http-client --url='''https://x.y.z/test''' --httpMethod=GET | log" 

http post --data '{ "messageAttribute": { "channelType" : "EML", "contentKey" : "20020", "messageFormat" : "1", "contentSubscriber" : "dmttts", "languageCode" : "en-ca" }, "substitutionKeyValueData" : { "SvcgLOBCd": "CA", "User": "user", "phone": "yyyy, "accountLast": "tttt", "userName": "LP", "Company": "bbbb", "firstName": "Ryan" } }'

现在,当我的休息客户端抛出任何异常,如 404 或连接超时异常,并且消息返回 http|http-client 之间的兔子队列时

我的理解是只有连接超时异常会被放回队列,任何其他异常或 200 都会将消息移动到下一个组件,它是 http-client| log.But 当我尝试它时,所有异常都被放回到 http|http-client 之间的队列中。

现在我的用例是我想重试所有套接字时间/连接超时异常。我想写入日志或文件接收器的任何其他系统异常 50x 错误?如何根据异常实现 this.Basically我想路由重试和非重试异常。

只有 2xx 个结果会转到 log

4xx5xx 被认为是错误。

您将需要一个自定义 http-client 模块来捕获您认为 'ok' 的异常并将它们转发到 output 频道。像...

<int:service-activator input-channel="input" ref="gw" />

<int:gateway request-channel="toHttp" error-channel="errors" />

<int:chain input-channel="errors" output-channel="output">
    <!-- examine payload.cause (http status code etc) and decide whether 
         to throw an exception or return the status code for sending to output -->
</int:chain>

我们尝试按照上面的建议实施。它现在正在过滤 4XX 错误并将有效负载发送到下一个模块。但是,消息在内部 rabbit mq 中未被确认。更改后 2XX 的情况下也不会发生确认。在更改之前,2XX 确认正常进行。 5XX 重试继续发生就好了。请让我们知道如何克服这个问题。

这里是上下文xml

<service-activator input-channel="inputX" ref="gw" />

<gateway id="gw" default-request-channel="toHttp" error-channel="errors" />

<beans:bean id="responseInterceptor" class="com.batch.httpclient.ResponseInterceptor">
</beans:bean>

<chain input-channel="errors" output-channel="output">
<transformer ref="responseInterceptor"  />
</chain>


<int-http:outbound-gateway id='batch-http'  header-mapper="headerMapper"
                           request-channel='toHttp' url-expression="${url}" http-method="${httpMethod}"
                           expected-response-type='java.lang.String' charset='${charset}'
                           reply-timeout='${replyTimeout}' reply-channel='output'>
</int-http:outbound-gateway>

<beans:bean id="headerMapper" class="org.springframework.integration.http.support.DefaultHttpHeaderMapper"
            factory-method="outboundMapper">
    <beans:property name="outboundHeaderNames" value="*"/>
    <beans:property name="userDefinedHeaderPrefix" value=""/>
</beans:bean>

<channel id="output" />
<channel id="input" />
<channel id="inputX" />
<channel id="toHttp" />

ResponseInterceptor 的 Java 代码如下所示 -

public Message<String> transform(ErrorMessage errorMessage) {


        if(null != errorMessage && null != errorMessage.getPayload() && null != errorMessage.getPayload().getCause()
                && null != errorMessage.getPayload().getCause().getMessage()){
            String rootCause = errorMessage.getPayload().getCause().getMessage();
            //check if the error message contains 400 or 404 http code.
            if(rootCause.contains("400") || rootCause.contains("404")){
                return MessageBuilder.withPayload(errorMessage.getPayload().getCause().getMessage())
                        .copyHeaders(errorMessage.getHeaders())
                        .removeHeader("errorChannel")
                        .removeHeader("replyChannel")
                        .setReplyChannelName("output").setErrorChannelName(null).build();
            }

        }
        return null;
    }

}