Spring 集成转换失败回滚 JMS 且未转发到错误通道
Spring Integration Transform Failure rolling back JMS and not forwarding to error channel
使用引导 2.2.2 和集成 5.2.2 - 当 XML 消息来自 File
并且解组失败(即它不是 XML)时消息继续正如预期 errorChannel
。但是,当消息来自 JMS,通过相同的通道路由并且解组失败时,它 not 路由到 errorChannel
并且消息回滚到 JMS。之后,我陷入了 SAXParseException
的无限循环,因为同一条消息。
我从 Proper ultimate way to migrate JMS event listening to Spring Integration with Spring Boot 中遵循了这个示例
。是否有一些我没有考虑的隐含交易控制?我如何让 Spring 集成将消息转发到 errorChannel
并从传入队列提交 'get'?
代码概要如下;
@Bean
public IntegrationFlow fileReader() {
return IntegrationFlows
.from(
Files
.inboundAdapter( ... )
...
.get(), e -> e.poller(Pollers.fixedDelay(1000))
)
.transform(new FileToStringTransformer())
.channel("backUpChannel")
.get();
}
@Bean
public IntegrationFlow getMessageFromJms(ConnectionFactory connectionFactory, @Value("${queues.myQueue}") String myQueue) {
return IntegrationFlows.from(
Jms
.messageDrivenChannelAdapter(connectionFactory)
.destination(myQueue)
)
.channel("backUpChannel")
.get();
}
@Bean
public IntegrationFlow doBackUp() {
return IntegrationFlows
.from("backUpChannel")
.<String>handle((payload, headers) -> {
String uuid = headers.get(MessageHeaders.ID).toString();
File backUpFile = new File("c:/backup/" + uuid + ".txt");
byte[] payloadContent = payload.getBytes();
try {
java.nio.file.Files.write(backUpFile.toPath(), payloadContent);
} catch (IOException e) {
e.printStackTrace();
}
return payload;
})
.channel("XXX")
.get();
}
@Bean
public Jaxb2Marshaller unmarshaller() {
Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
unmarshaller.setClassesToBeBound(MyClass.class);
return unmarshaller;
}
@Bean
public IntegrationFlow handleParseXml() {
return IntegrationFlows
.from("XXX")
.transform(new UnmarshallingTransformer(unmarshaller()))
.channel("YYY")
.get();
}
您需要将.errorChannel(...)
添加到消息驱动的通道适配器。
使用引导 2.2.2 和集成 5.2.2 - 当 XML 消息来自 File
并且解组失败(即它不是 XML)时消息继续正如预期 errorChannel
。但是,当消息来自 JMS,通过相同的通道路由并且解组失败时,它 not 路由到 errorChannel
并且消息回滚到 JMS。之后,我陷入了 SAXParseException
的无限循环,因为同一条消息。
我从 Proper ultimate way to migrate JMS event listening to Spring Integration with Spring Boot 中遵循了这个示例
。是否有一些我没有考虑的隐含交易控制?我如何让 Spring 集成将消息转发到 errorChannel
并从传入队列提交 'get'?
代码概要如下;
@Bean
public IntegrationFlow fileReader() {
return IntegrationFlows
.from(
Files
.inboundAdapter( ... )
...
.get(), e -> e.poller(Pollers.fixedDelay(1000))
)
.transform(new FileToStringTransformer())
.channel("backUpChannel")
.get();
}
@Bean
public IntegrationFlow getMessageFromJms(ConnectionFactory connectionFactory, @Value("${queues.myQueue}") String myQueue) {
return IntegrationFlows.from(
Jms
.messageDrivenChannelAdapter(connectionFactory)
.destination(myQueue)
)
.channel("backUpChannel")
.get();
}
@Bean
public IntegrationFlow doBackUp() {
return IntegrationFlows
.from("backUpChannel")
.<String>handle((payload, headers) -> {
String uuid = headers.get(MessageHeaders.ID).toString();
File backUpFile = new File("c:/backup/" + uuid + ".txt");
byte[] payloadContent = payload.getBytes();
try {
java.nio.file.Files.write(backUpFile.toPath(), payloadContent);
} catch (IOException e) {
e.printStackTrace();
}
return payload;
})
.channel("XXX")
.get();
}
@Bean
public Jaxb2Marshaller unmarshaller() {
Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
unmarshaller.setClassesToBeBound(MyClass.class);
return unmarshaller;
}
@Bean
public IntegrationFlow handleParseXml() {
return IntegrationFlows
.from("XXX")
.transform(new UnmarshallingTransformer(unmarshaller()))
.channel("YYY")
.get();
}
您需要将.errorChannel(...)
添加到消息驱动的通道适配器。