处理集成流程中可能发生的异常
Handling exception that might occur in an integration flow
我有一个 REST 控制器,它调用一个用 @MessagingGateway(errorChannel = ERROR_CHANNEL)
注释的网关
这样,无论下游发生什么错误,由网关启动的集成流都会流入一个错误通道,该通道将由另一个集成流处理,这是按预期工作的。
现在,还有另一种情况,其中集成流从 Kafka 读取消息,将这些消息路由到另一个通道,另一个集成流处理这些消息,另一个流将 HTTP 请求发送到远程服务。
public IntegrationFlowBuilder attachmentEventTenantRouter(String tenantId) {
return attachmentEventBaseFlow(".*")
.filter(Message.class, m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY) != null && m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY, String.class).equalsIgnoreCase(tenantId));
}
private IntegrationFlowBuilder attachmentEventBaseFlow(String eventRegex) {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory.createContainer(topic)).errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME))
.log(LoggingHandler.Level.DEBUG, "Inside Kafka Consumer")
.filter(Message.class, m -> filter(m, eventRegex))
.transform(messageToEventTransformer);
}
@Bean
public IntegrationFlow kafkaConsumerFlow() {
return fromKafkaFlowHelper.attachmentEventTenantRouter(TENANT_ID)
.route(Message.class, m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.EVENT_TYPE_KEY, String.class), p -> p
.resolutionRequired(false)
.channelMapping("eventType", "transformMessagesFromKafkaAndPublishAnotherEvent")
.defaultOutputChannel("nullChannel"))
.get();
}
@Bean
public IntegrationFlow transformMessagesFromKafkaAndPublishAnotherEvent() {
return flow -> flow
.transform(transformer)
.handle( getKafkaHandler() );
}
@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
return flow -> flow
.transform(transformer)
.handle(gatewayCall, e -> e.advice(expressionAdvice()));
}
如何处理上述流程中可能出现的异常情况?
如您所见,我正在使用 ExpressionEvaluatingRequestHandlerAdvice,它为 handle 方法工作,但不确定如何处理转换器中可能发生的异常?
配置了错误通道的消息网关在rest controller调用网关时起作用,但是当流程由Kafka消费者发起时,我不知道如何实现这一点。
谢谢。
在 Artem 回应添加澄清后编辑:
这是向远程服务发布请求的集成流配置,其异常似乎没有被捕获并路由到没有 ExpressionEvaluatingRequestHandlerAdvice 的 errorChannel:
@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
return flow -> flow
.transform(transformer)
.handle(getOAuth2Handler(HttpMethod.PUT, "remote url"), e -> e.advice(expressionEvaluatingRequestHandlerAdvice));
}
private OAuth2RequestHandler getOAuth2Handler(HttpMethod httpMethod, String url) {
return new OAuth2RequestHandler(oAuth2RestTemplate, httpMethod, url);
}
和class OAuth2RequestHandler 实现了一个 MessageHandler
@Override
public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
String requestBody = (String) message.getPayload();
ResponseEntity<String> response = oAuth2RestTemplate.exchange(url, httpMethod, new HttpEntity<>(requestBody), String.class);
}
我看到您已经在 Kafka message-driven 通道适配器上使用了 errorChannel()
。那么,问题是什么?
这部分流程与提到的 @MesaagingGateway
及其 errorChannel
配置完全相同。
我有一个 REST 控制器,它调用一个用 @MessagingGateway(errorChannel = ERROR_CHANNEL)
这样,无论下游发生什么错误,由网关启动的集成流都会流入一个错误通道,该通道将由另一个集成流处理,这是按预期工作的。
现在,还有另一种情况,其中集成流从 Kafka 读取消息,将这些消息路由到另一个通道,另一个集成流处理这些消息,另一个流将 HTTP 请求发送到远程服务。
public IntegrationFlowBuilder attachmentEventTenantRouter(String tenantId) {
return attachmentEventBaseFlow(".*")
.filter(Message.class, m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY) != null && m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY, String.class).equalsIgnoreCase(tenantId));
}
private IntegrationFlowBuilder attachmentEventBaseFlow(String eventRegex) {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory.createContainer(topic)).errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME))
.log(LoggingHandler.Level.DEBUG, "Inside Kafka Consumer")
.filter(Message.class, m -> filter(m, eventRegex))
.transform(messageToEventTransformer);
}
@Bean
public IntegrationFlow kafkaConsumerFlow() {
return fromKafkaFlowHelper.attachmentEventTenantRouter(TENANT_ID)
.route(Message.class, m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.EVENT_TYPE_KEY, String.class), p -> p
.resolutionRequired(false)
.channelMapping("eventType", "transformMessagesFromKafkaAndPublishAnotherEvent")
.defaultOutputChannel("nullChannel"))
.get();
}
@Bean
public IntegrationFlow transformMessagesFromKafkaAndPublishAnotherEvent() {
return flow -> flow
.transform(transformer)
.handle( getKafkaHandler() );
}
@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
return flow -> flow
.transform(transformer)
.handle(gatewayCall, e -> e.advice(expressionAdvice()));
}
如何处理上述流程中可能出现的异常情况?
如您所见,我正在使用 ExpressionEvaluatingRequestHandlerAdvice,它为 handle 方法工作,但不确定如何处理转换器中可能发生的异常?
配置了错误通道的消息网关在rest controller调用网关时起作用,但是当流程由Kafka消费者发起时,我不知道如何实现这一点。
谢谢。
在 Artem 回应添加澄清后编辑:
这是向远程服务发布请求的集成流配置,其异常似乎没有被捕获并路由到没有 ExpressionEvaluatingRequestHandlerAdvice 的 errorChannel:
@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
return flow -> flow
.transform(transformer)
.handle(getOAuth2Handler(HttpMethod.PUT, "remote url"), e -> e.advice(expressionEvaluatingRequestHandlerAdvice));
}
private OAuth2RequestHandler getOAuth2Handler(HttpMethod httpMethod, String url) {
return new OAuth2RequestHandler(oAuth2RestTemplate, httpMethod, url);
}
和class OAuth2RequestHandler 实现了一个 MessageHandler
@Override
public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
String requestBody = (String) message.getPayload();
ResponseEntity<String> response = oAuth2RestTemplate.exchange(url, httpMethod, new HttpEntity<>(requestBody), String.class);
}
我看到您已经在 Kafka message-driven 通道适配器上使用了 errorChannel()
。那么,问题是什么?
这部分流程与提到的 @MesaagingGateway
及其 errorChannel
配置完全相同。