Spring 集成 DSL 错误处理程序线程 ID

Spring Integration DSL Error Handler Thread ID

目前我正在跟踪正在处理的活动线程,因为在我没有任何处理线程之前不会让系统关闭

例如

package com.example.demo.flow;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.stereotype.Component;

import java.io.File;
import java.util.concurrent.Executors;

/**
 * Created by on 03/01/2020.
 */
@Component
@Slf4j
public class TestFlow {

    @Bean
    public StandardIntegrationFlow errorChannelHandler() {

        return IntegrationFlows.from("testChannel")
                .handle(o -> {

                    log.info("Handling error....{}", o);
                }).get();
    }

    @Bean
    public IntegrationFlow testFile() {


        IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
                e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(5)
                        .errorChannel("testChannel")))
                .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
                .transform(o -> {

                    throw new RuntimeException("Failing on purpose");

                }).handle(o -> {
                });

        return testChannel.get();


    }


}

我已经为集成流程启用了多个文件,但在错误处理程序中线程不同 我怎么知道它来自哪个线程?

有没有我能找到的,因为这很关键

根据您当前的配置,testChannel 是一个 DrectChannel,因此您发送给它的任何内容都将在您发送的线程上进行处理。 所以Thread.currentThread()就够你判断了。

对于更通用的解决方案,考虑将 MessagePublishingErrorHandler 作为一个 bean,并使用 ChannelUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME 覆盖默认值。此 MessagePublishingErrorHandler 可以与自定义 ErrorMessageStrategy 一起提供。在那里,当您创建 ErrorMessage 时,您可以添加具有相同 Thread.currentThread() 信息的自定义 header 以继续该错误通道处理,即使它是在单独的线程中完成的。

您也可以只抛出该信息的异常!