覆盖在@MessagingGateway 中配置的 errorChannel
Overriding errorChannel configured in @MessagingGateway
我已如下配置 @MessagingGateway 以使用错误通道,它按预期工作。
@MessagingGateway(errorChannel = "DefaultInboundErrorHandlerChannel")
public interface InboundMessagingGateway {
@Gateway(requestChannel = "InboundEntryChannel")
void receive(XferRes response);
}
在流程中,我将对象传递给转换器,如下所示:
第 1 步:
@Transformer(inputChannel = "InboundEntryChannel", outputChannel = "TransmissionLogChannel")
public CassandraEntity createEntity(
org.springframework.messaging.Message<XferRes> message) throws ParseException {
XferRes response = message.getPayload();
CassandraEntity entity = new CassandraEntity();
// ... getters & setter ommitted for brevity
return entity;
}
接下来,我更新实体如下:
第 2 步:
@ServiceActivator(inputChannel = "TransmissionLogChannel", outputChannel="PublishChannel")
public XferRes updateCassandraEntity(
org.springframework.messaging.Message<XferRes> message) {
XferRes response = message.getPayload();
this.cassandraServiceImpl.update(response);
return response;
}
最后,我 post 一个 Kafka 主题如下:
第 3 步:
@ServiceActivator(inputChannel = "PublishChannel")
public void publish(org.springframework.messaging.Message<XferRes> message){
XferRes response = message.getPayload();
publisher.post(response);
}
如果出现错误,我 post 向服务发布错误对象以记录摄取的消息:
@ServiceActivator(inputChannel="defaultInboundErrorHandlerChannel")
public void handleInvalidRequest(org.springframework.messaging.Message<MessageHandlingException> message) throws ParseException {
XferRes originalRequest = (XferRes) message.getPayload().getFailedMessage().getPayload();
this.postToErrorBoard(originalRequest)
}
如果更新数据库时步骤2:发生错误,那么我也想调用步骤3。一个简单的方法是删除 Step 2 并从 Step 1 调用更新数据库。
在 Spring 集成中是否有任何其他方法可以调用 步骤 3 而不管是否发生错误。
这个技巧叫做PublishSubscribeChannel
。因为我看到你在第二步重用一个有效载荷发送到第三步,所以它绝对是 PublishSubscribeChannel
和它的两个顺序订阅者的用例。
我的意思是你创建了一个 PublishSubscribeChannel
@Bean
并且那些 @ServiceActivator
正在使用这个频道的名称。
更多信息在 Reference Manual 中。注意 ignoreFailures
属性:
/**
* Specify whether failures for one or more of the handlers should be
* ignored. By default this is <code>false</code> meaning that an Exception
* will be thrown whenever a handler fails. To override this and suppress
* Exceptions, set the value to <code>true</code>.
* @param ignoreFailures true if failures should be ignored.
*/
public void setIgnoreFailures(boolean ignoreFailures) {
我已如下配置 @MessagingGateway 以使用错误通道,它按预期工作。
@MessagingGateway(errorChannel = "DefaultInboundErrorHandlerChannel")
public interface InboundMessagingGateway {
@Gateway(requestChannel = "InboundEntryChannel")
void receive(XferRes response);
}
在流程中,我将对象传递给转换器,如下所示:
第 1 步:
@Transformer(inputChannel = "InboundEntryChannel", outputChannel = "TransmissionLogChannel")
public CassandraEntity createEntity(
org.springframework.messaging.Message<XferRes> message) throws ParseException {
XferRes response = message.getPayload();
CassandraEntity entity = new CassandraEntity();
// ... getters & setter ommitted for brevity
return entity;
}
接下来,我更新实体如下: 第 2 步:
@ServiceActivator(inputChannel = "TransmissionLogChannel", outputChannel="PublishChannel")
public XferRes updateCassandraEntity(
org.springframework.messaging.Message<XferRes> message) {
XferRes response = message.getPayload();
this.cassandraServiceImpl.update(response);
return response;
}
最后,我 post 一个 Kafka 主题如下: 第 3 步:
@ServiceActivator(inputChannel = "PublishChannel")
public void publish(org.springframework.messaging.Message<XferRes> message){
XferRes response = message.getPayload();
publisher.post(response);
}
如果出现错误,我 post 向服务发布错误对象以记录摄取的消息:
@ServiceActivator(inputChannel="defaultInboundErrorHandlerChannel")
public void handleInvalidRequest(org.springframework.messaging.Message<MessageHandlingException> message) throws ParseException {
XferRes originalRequest = (XferRes) message.getPayload().getFailedMessage().getPayload();
this.postToErrorBoard(originalRequest)
}
如果更新数据库时步骤2:发生错误,那么我也想调用步骤3。一个简单的方法是删除 Step 2 并从 Step 1 调用更新数据库。
在 Spring 集成中是否有任何其他方法可以调用 步骤 3 而不管是否发生错误。
这个技巧叫做PublishSubscribeChannel
。因为我看到你在第二步重用一个有效载荷发送到第三步,所以它绝对是 PublishSubscribeChannel
和它的两个顺序订阅者的用例。
我的意思是你创建了一个 PublishSubscribeChannel
@Bean
并且那些 @ServiceActivator
正在使用这个频道的名称。
更多信息在 Reference Manual 中。注意 ignoreFailures
属性:
/**
* Specify whether failures for one or more of the handlers should be
* ignored. By default this is <code>false</code> meaning that an Exception
* will be thrown whenever a handler fails. To override this and suppress
* Exceptions, set the value to <code>true</code>.
* @param ignoreFailures true if failures should be ignored.
*/
public void setIgnoreFailures(boolean ignoreFailures) {