Spring 集成 DSL 错误处理程序线程 ID
Spring Integration DSL Error Handler Thread ID
目前我正在跟踪正在处理的活动线程,因为在我没有任何处理线程之前不会让系统关闭
例如
package com.example.demo.flow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.stereotype.Component;
import java.io.File;
import java.util.concurrent.Executors;
/**
* Created by on 03/01/2020.
*/
@Component
@Slf4j
public class TestFlow {
@Bean
public StandardIntegrationFlow errorChannelHandler() {
return IntegrationFlows.from("testChannel")
.handle(o -> {
log.info("Handling error....{}", o);
}).get();
}
@Bean
public IntegrationFlow testFile() {
IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(5)
.errorChannel("testChannel")))
.channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
.transform(o -> {
throw new RuntimeException("Failing on purpose");
}).handle(o -> {
});
return testChannel.get();
}
}
我已经为集成流程启用了多个文件,但在错误处理程序中线程不同
我怎么知道它来自哪个线程?
有没有我能找到的,因为这很关键
根据您当前的配置,testChannel
是一个 DrectChannel
,因此您发送给它的任何内容都将在您发送的线程上进行处理。
所以Thread.currentThread()
就够你判断了。
对于更通用的解决方案,考虑将 MessagePublishingErrorHandler
作为一个 bean,并使用 ChannelUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME
覆盖默认值。此 MessagePublishingErrorHandler
可以与自定义 ErrorMessageStrategy
一起提供。在那里,当您创建 ErrorMessage
时,您可以添加具有相同 Thread.currentThread()
信息的自定义 header 以继续该错误通道处理,即使它是在单独的线程中完成的。
您也可以只抛出该信息的异常!
目前我正在跟踪正在处理的活动线程,因为在我没有任何处理线程之前不会让系统关闭
例如
package com.example.demo.flow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.stereotype.Component;
import java.io.File;
import java.util.concurrent.Executors;
/**
* Created by on 03/01/2020.
*/
@Component
@Slf4j
public class TestFlow {
@Bean
public StandardIntegrationFlow errorChannelHandler() {
return IntegrationFlows.from("testChannel")
.handle(o -> {
log.info("Handling error....{}", o);
}).get();
}
@Bean
public IntegrationFlow testFile() {
IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(5)
.errorChannel("testChannel")))
.channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
.transform(o -> {
throw new RuntimeException("Failing on purpose");
}).handle(o -> {
});
return testChannel.get();
}
}
我已经为集成流程启用了多个文件,但在错误处理程序中线程不同 我怎么知道它来自哪个线程?
有没有我能找到的,因为这很关键
根据您当前的配置,testChannel
是一个 DrectChannel
,因此您发送给它的任何内容都将在您发送的线程上进行处理。
所以Thread.currentThread()
就够你判断了。
对于更通用的解决方案,考虑将 MessagePublishingErrorHandler
作为一个 bean,并使用 ChannelUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME
覆盖默认值。此 MessagePublishingErrorHandler
可以与自定义 ErrorMessageStrategy
一起提供。在那里,当您创建 ErrorMessage
时,您可以添加具有相同 Thread.currentThread()
信息的自定义 header 以继续该错误通道处理,即使它是在单独的线程中完成的。
您也可以只抛出该信息的异常!