Spring http 客户端 xd 问题
Spring xd issue with http-client
我有 spring xd 模块,使用 rabbitmq 作为传输。我的模块有调用 rest url http://x.y.z/test
的 http 源 http 客户端处理器
stream create --name cycletest4 --definition "http | http-client --url='''https://x.y.z/test''' --httpMethod=GET | log"
http post --data '{ "messageAttribute": { "channelType" : "EML", "contentKey" : "20020", "messageFormat" : "1", "contentSubscriber" : "dmttts", "languageCode" : "en-ca" }, "substitutionKeyValueData" : { "SvcgLOBCd": "CA", "User": "user", "phone": "yyyy, "accountLast": "tttt", "userName": "LP", "Company": "bbbb", "firstName": "Ryan" } }'
现在,当我的休息客户端抛出任何异常,如 404 或连接超时异常,并且消息返回 http|http-client 之间的兔子队列时
我的理解是只有连接超时异常会被放回队列,任何其他异常或 200 都会将消息移动到下一个组件,它是 http-client| log.But 当我尝试它时,所有异常都被放回到 http|http-client 之间的队列中。
现在我的用例是我想重试所有套接字时间/连接超时异常。我想写入日志或文件接收器的任何其他系统异常 50x 错误?如何根据异常实现 this.Basically我想路由重试和非重试异常。
只有 2xx 个结果会转到 log
。
4xx
和 5xx
被认为是错误。
您将需要一个自定义 http-client
模块来捕获您认为 'ok' 的异常并将它们转发到 output
频道。像...
<int:service-activator input-channel="input" ref="gw" />
<int:gateway request-channel="toHttp" error-channel="errors" />
<int:chain input-channel="errors" output-channel="output">
<!-- examine payload.cause (http status code etc) and decide whether
to throw an exception or return the status code for sending to output -->
</int:chain>
我们尝试按照上面的建议实施。它现在正在过滤 4XX 错误并将有效负载发送到下一个模块。但是,消息在内部 rabbit mq 中未被确认。更改后 2XX 的情况下也不会发生确认。在更改之前,2XX 确认正常进行。 5XX 重试继续发生就好了。请让我们知道如何克服这个问题。
这里是上下文xml
<service-activator input-channel="inputX" ref="gw" />
<gateway id="gw" default-request-channel="toHttp" error-channel="errors" />
<beans:bean id="responseInterceptor" class="com.batch.httpclient.ResponseInterceptor">
</beans:bean>
<chain input-channel="errors" output-channel="output">
<transformer ref="responseInterceptor" />
</chain>
<int-http:outbound-gateway id='batch-http' header-mapper="headerMapper"
request-channel='toHttp' url-expression="${url}" http-method="${httpMethod}"
expected-response-type='java.lang.String' charset='${charset}'
reply-timeout='${replyTimeout}' reply-channel='output'>
</int-http:outbound-gateway>
<beans:bean id="headerMapper" class="org.springframework.integration.http.support.DefaultHttpHeaderMapper"
factory-method="outboundMapper">
<beans:property name="outboundHeaderNames" value="*"/>
<beans:property name="userDefinedHeaderPrefix" value=""/>
</beans:bean>
<channel id="output" />
<channel id="input" />
<channel id="inputX" />
<channel id="toHttp" />
ResponseInterceptor 的 Java 代码如下所示 -
public Message<String> transform(ErrorMessage errorMessage) {
if(null != errorMessage && null != errorMessage.getPayload() && null != errorMessage.getPayload().getCause()
&& null != errorMessage.getPayload().getCause().getMessage()){
String rootCause = errorMessage.getPayload().getCause().getMessage();
//check if the error message contains 400 or 404 http code.
if(rootCause.contains("400") || rootCause.contains("404")){
return MessageBuilder.withPayload(errorMessage.getPayload().getCause().getMessage())
.copyHeaders(errorMessage.getHeaders())
.removeHeader("errorChannel")
.removeHeader("replyChannel")
.setReplyChannelName("output").setErrorChannelName(null).build();
}
}
return null;
}
}
我有 spring xd 模块,使用 rabbitmq 作为传输。我的模块有调用 rest url http://x.y.z/test
的 http 源 http 客户端处理器stream create --name cycletest4 --definition "http | http-client --url='''https://x.y.z/test''' --httpMethod=GET | log"
http post --data '{ "messageAttribute": { "channelType" : "EML", "contentKey" : "20020", "messageFormat" : "1", "contentSubscriber" : "dmttts", "languageCode" : "en-ca" }, "substitutionKeyValueData" : { "SvcgLOBCd": "CA", "User": "user", "phone": "yyyy, "accountLast": "tttt", "userName": "LP", "Company": "bbbb", "firstName": "Ryan" } }'
现在,当我的休息客户端抛出任何异常,如 404 或连接超时异常,并且消息返回 http|http-client 之间的兔子队列时
我的理解是只有连接超时异常会被放回队列,任何其他异常或 200 都会将消息移动到下一个组件,它是 http-client| log.But 当我尝试它时,所有异常都被放回到 http|http-client 之间的队列中。
现在我的用例是我想重试所有套接字时间/连接超时异常。我想写入日志或文件接收器的任何其他系统异常 50x 错误?如何根据异常实现 this.Basically我想路由重试和非重试异常。
只有 2xx 个结果会转到 log
。
4xx
和 5xx
被认为是错误。
您将需要一个自定义 http-client
模块来捕获您认为 'ok' 的异常并将它们转发到 output
频道。像...
<int:service-activator input-channel="input" ref="gw" />
<int:gateway request-channel="toHttp" error-channel="errors" />
<int:chain input-channel="errors" output-channel="output">
<!-- examine payload.cause (http status code etc) and decide whether
to throw an exception or return the status code for sending to output -->
</int:chain>
我们尝试按照上面的建议实施。它现在正在过滤 4XX 错误并将有效负载发送到下一个模块。但是,消息在内部 rabbit mq 中未被确认。更改后 2XX 的情况下也不会发生确认。在更改之前,2XX 确认正常进行。 5XX 重试继续发生就好了。请让我们知道如何克服这个问题。
这里是上下文xml
<service-activator input-channel="inputX" ref="gw" />
<gateway id="gw" default-request-channel="toHttp" error-channel="errors" />
<beans:bean id="responseInterceptor" class="com.batch.httpclient.ResponseInterceptor">
</beans:bean>
<chain input-channel="errors" output-channel="output">
<transformer ref="responseInterceptor" />
</chain>
<int-http:outbound-gateway id='batch-http' header-mapper="headerMapper"
request-channel='toHttp' url-expression="${url}" http-method="${httpMethod}"
expected-response-type='java.lang.String' charset='${charset}'
reply-timeout='${replyTimeout}' reply-channel='output'>
</int-http:outbound-gateway>
<beans:bean id="headerMapper" class="org.springframework.integration.http.support.DefaultHttpHeaderMapper"
factory-method="outboundMapper">
<beans:property name="outboundHeaderNames" value="*"/>
<beans:property name="userDefinedHeaderPrefix" value=""/>
</beans:bean>
<channel id="output" />
<channel id="input" />
<channel id="inputX" />
<channel id="toHttp" />
ResponseInterceptor 的 Java 代码如下所示 -
public Message<String> transform(ErrorMessage errorMessage) {
if(null != errorMessage && null != errorMessage.getPayload() && null != errorMessage.getPayload().getCause()
&& null != errorMessage.getPayload().getCause().getMessage()){
String rootCause = errorMessage.getPayload().getCause().getMessage();
//check if the error message contains 400 or 404 http code.
if(rootCause.contains("400") || rootCause.contains("404")){
return MessageBuilder.withPayload(errorMessage.getPayload().getCause().getMessage())
.copyHeaders(errorMessage.getHeaders())
.removeHeader("errorChannel")
.removeHeader("replyChannel")
.setReplyChannelName("output").setErrorChannelName(null).build();
}
}
return null;
}
}