Spring Batch Integration Start/Stop File Adapter from Controller

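I am not sure my endpoints are set up correctly. My end goal is that, once I call stop on both adapters, a file dropped into the directory polled by the inbound channel adapter is not processed. That is not what happens: the file is still polled and processed.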

Currently, when the code runs, 6 task schedulers are started right from the beginning:

2015-08-03 18:12:40,011 [task-scheduler-6] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:12:40,011 [task-scheduler-5] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:12:40,011 [task-scheduler-8] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:12:40,011 [task-scheduler-1] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:12:40,011 [task-scheduler-3] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:12:40,011 [task-scheduler-1] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'

When I issue a stop command

@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop()

on one of the adapters, I am left with 5 task schedulers. Here is the stop command being issued:

2015-08-03 18:15:28,392 [http-8080-1] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'control'
2015-08-03 18:15:28,392 [http-8080-1] DEBUG org.springframework.integration.channel.DirectChannel - preSend on channel 'control', message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop(), headers={timestamp=1438650928392, id=8107d99e-667c-1e78-31a0-dbcac2a6b03e}]
2015-08-03 18:15:28,392 [http-8080-1] DEBUG org.springframework.integration.handler.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@f275f321] received message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop(), headers={timestamp=1438650928392, id=8107d99e-667c-1e78-31a0-dbcac2a6b03e}]
2015-08-03 18:15:28,407 [http-8080-1] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'
2015-08-03 18:15:28,407 [http-8080-1] INFO  org.springframework.integration.endpoint.SourcePollingChannelAdapter - stopped mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter
2015-08-03 18:15:28,407 [http-8080-1] DEBUG org.springframework.integration.handler.ServiceActivatingHandler - handler 'ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@f275f321]' produced no reply for request Message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop(), headers={timestamp=1438650928392, id=8107d99e-667c-1e78-31a0-dbcac2a6b03e}]
2015-08-03 18:15:28,407 [http-8080-1] DEBUG org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'control', message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop(), headers={timestamp=1438650928392, id=8107d99e-667c-1e78-31a0-dbcac2a6b03e}]

These are the task schedulers logged after the first stop command; there are now only 5:

2015-08-03 18:15:30,017 [task-scheduler-7] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:15:30,017 [task-scheduler-1] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:15:30,017 [task-scheduler-6] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:15:30,017 [task-scheduler-10] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:15:30,017 [task-scheduler-2] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'

When I issue the second stop on the second adapter

@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop()

the command is logged as follows:

2015-08-03 18:21:49,652 [http-8080-1] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'control'
2015-08-03 18:21:49,668 [http-8080-1] DEBUG org.springframework.integration.channel.DirectChannel - preSend on channel 'control', message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop(), headers={timestamp=1438651309652, id=d4ba5ca8-db6f-4ee9-564e-9a2df2f6fb4d}]
2015-08-03 18:21:49,668 [http-8080-1] DEBUG org.springframework.integration.handler.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@f275f321] received message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop(), headers={timestamp=1438651309652, id=d4ba5ca8-db6f-4ee9-564e-9a2df2f6fb4d}]
2015-08-03 18:21:49,668 [http-8080-1] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'
2015-08-03 18:21:49,668 [http-8080-1] INFO  org.springframework.integration.endpoint.SourcePollingChannelAdapter - stopped mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter
2015-08-03 18:21:49,668 [http-8080-1] DEBUG org.springframework.integration.handler.ServiceActivatingHandler - handler 'ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@f275f321]' produced no reply for request Message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop(), headers={timestamp=1438651309652, id=d4ba5ca8-db6f-4ee9-564e-9a2df2f6fb4d}]
2015-08-03 18:21:49,668 [http-8080-1] DEBUG org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'control', message: GenericMessage [payload=@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop(), headers={timestamp=1438651309652, id=d4ba5ca8-db6f-4ee9-564e-9a2df2f6fb4d}]

I am left with 4 task schedulers:

2015-08-03 18:23:40,014 [task-scheduler-2] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:23:40,014 [task-scheduler-6] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:23:40,014 [task-scheduler-5] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 18:23:40,014 [task-scheduler-7] DEBUG org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'

Here is the code for the MecFilePoller class:

@Configuration
@PropertySource("classpath:mec.properties") 
@EnableIntegration
@IntegrationComponentScan
public class MecFilePoller {

private Job job;

private String fileParameterName;

@Autowired
MecProperty mecProperty;

@Bean
MessageChannel control() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel="control")
public ExpressionControlBusFactoryBean controlBus() {
    return new ExpressionControlBusFactoryBean();
}

public void setFileParameterName(String fileParameterName) {
    this.fileParameterName = fileParameterName;
}

public void setJob(Job job) {
    this.job = job;
}


@Bean
@InboundChannelAdapter(value = "ssnInboundFileChannel", poller = @Poller(cron="${mec/SSN_POLLER}"))
public MessageSource<File> fileMessageSourceSSN() {
     System.out.println("SSN POLLING...");
     FileReadingMessageSource source = initialSetUp();
     source.setDirectory(new File(mecProperty.getProperty(MecConstants.SSN_SFTP_WORKING_DIR)));
     System.out.println("enter fileMessageSource in following dir....." + mecProperty.getProperty(MecConstants.SSN_SFTP_WORKING_DIR));
     return source;
}

@Bean
@InboundChannelAdapter(value = "mdwInboundFileChannel", poller = @Poller(cron="${mec/MDW_POLLER}"))
public MessageSource<File> fileMessageSourceMDW() {
    System.out.println("MDW POLLING...");
    FileReadingMessageSource source = initialSetUp();
    source.setDirectory(new File(mecProperty.getProperty(MecConstants.MDW_SFTP_WORKING_DIR)));
    System.out.println("enter fileMessageSource in following dir....." + mecProperty.getProperty(MecConstants.MDW_SFTP_WORKING_DIR));
    return source;
}

private FileReadingMessageSource initialSetUp() {
    FileReadingMessageSource source = new FileReadingMessageSource();
    CompositeFileListFilter<File> compositeFileListFilter = new CompositeFileListFilter<File>(); 
    SimplePatternFileListFilter simplePatternFileListFilter = new SimplePatternFileListFilter("*.done");
    AcceptOnceFileListFilter<File> acceptOnceFileListFilter = new AcceptOnceFileListFilter<File>();

    compositeFileListFilter.addFilter(simplePatternFileListFilter);
    compositeFileListFilter.addFilter(acceptOnceFileListFilter);

    source.setFilter(compositeFileListFilter);
    return source;
}

@Transformer(inputChannel="mdwInboundFileChannel",outputChannel="mdwOutboundJobRequestChannel")
public JobLaunchRequest toRequestMDW(Message<File> message) {
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addString(INPUT_FILE_NAME, message.getPayload().getAbsolutePath());
    return new JobLaunchRequest((Job) appContext.getBean("mecmdwJob"), jobParametersBuilder.toJobParameters());
}    

@Transformer(inputChannel="ssnInboundFileChannel",outputChannel="ssnOutboundJobRequestChannel")
public JobLaunchRequest toRequestSSN(Message<File> message) {
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addString(INPUT_FILE_NAME, message.getPayload().getAbsolutePath());
    return new JobLaunchRequest((Job) appContext.getBean("mecssnJob"), jobParametersBuilder.toJobParameters());
}    

public void commandTypePollerRequest(String command, String type) {
    if(command.equals("stop") && type.equals("mdw")){
        control().send(new GenericMessage<String>(
                "@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.stop()"));
    }
    else if(command.equals("stop") && type.equals("ssn")){
        control().send(new GenericMessage<String>(
                "@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.stop()"));
    }
    else if(command.equals("start") && type.equals("mdw")){
        control().send(new GenericMessage<String>(
                "@'mecFilePoller.fileMessageSourceMDW.inboundChannelAdapter'.start()"));
    }
    else if(command.equals("start") && type.equals("ssn")){
        control().send(new GenericMessage<String>(
                "@'mecFilePoller.fileMessageSourceSSN.inboundChannelAdapter'.start()"));
    }
}

}

Here is the MECPollerController class:

@Controller
public class MECPollerController {

@Autowired
MecFilePoller mecFilePoller;

@Autowired
private ApplicationContext appContext;

@Autowired
MessageChannel ssnOutboundJobRequestChannel;

@Autowired
MessageChannel mdwOutboundJobRequestChannel;

@RequestMapping(value = "ui/stopMdwPoller.action", method = RequestMethod.GET)
public void stopMdwPollerRequest() {
    mecFilePoller.commandTypePollerRequest("stop","mdw");
}

@RequestMapping(value = "ui/stopSsnPoller.action", method = RequestMethod.GET)
public void stopSsnPollerRequest() {
    mecFilePoller.commandTypePollerRequest("stop","ssn");
}

@RequestMapping(value = "ui/startMdwPoller.action", method = RequestMethod.GET)
public void startMdwPollerRequest() {
    mecFilePoller.commandTypePollerRequest("start","mdw");
}

@RequestMapping(value = "ui/startSsnPoller.action", method = RequestMethod.GET)
public void startSsnPollerRequest() {
    mecFilePoller.commandTypePollerRequest("start","ssn");
}

}

Here is the XML configuration for one of the Spring Batch jobs, job-mecmdw.xml:

<int:channel id="mdwInboundFileChannel" />
<int:channel id="mdwOutboundJobRequestChannel" />
<int:channel id="mdwJobLaunchReplyChannel" />
<int:annotation-config />       
<bean class="org.springframework.integration.core.MessagingTemplate"></bean>

<!--    THIS WAS MOVED TO MecFilePoller
<int:transformer input-channel="mdwInboundFileChannel" output-channel="mdwOutboundJobRequestChannel">
    <bean
        class="org.batch.poller.MecFilePoller">
        <property name="job" ref="mecmdwJob" />
        <property name="fileParameterName" value="input.file.name" />
    </bean>
</int:transformer>
-->
<batch-int:job-launching-gateway request-channel="mdwOutboundJobRequestChannel" reply-channel="mdwJobLaunchReplyChannel" />
<int:logging-channel-adapter channel="mdwJobLaunchReplyChannel" />

Here is the XML configuration for the other Spring Batch job, job-mecssn.xml:

<int:annotation-config />           
<int:channel id="ssnInboundFileChannel" />
<int:channel id="ssnOutboundJobRequestChannel" />
<int:channel id="ssnJobLaunchReplyChannel" />   

<!--    THIS WAS MOVED TO MecFilePoller
<int:transformer input-channel="ssnInboundFileChannel"
    output-channel="ssnOutboundJobRequestChannel">
    <bean
        class="org.batch.poller.MecFilePoller">
        <property name="job" ref="mecssnJob" />
        <property name="fileParameterName" value="input.file.name" />
    </bean>
</int:transformer>
-->

<batch-int:job-launching-gateway request-channel="ssnOutboundJobRequestChannel" reply-channel="ssnJobLaunchReplyChannel" />
<int:logging-channel-adapter channel="ssnJobLaunchReplyChannel" />  

An inbound channel adapter consists of (at least) two beans; in this case, the MessageSource and a SourcePollingChannelAdapter.

In your example, the message source gets the specified bean name (filePoller). The channel adapter name is generated and will be

filePoller.fileMessageSource.inboundChannelAdapter

(@Configuration class bean name + method name + type)

We have an open JIRA issue to document these generated bean names.

Because the name contains dots, it has to be quoted when you use it with the control bus:

@'filePoller.fileMessageSource.inboundChannelAdapter'.stop()
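The naming rule above can be sketched in plain Java. This is only an illustration: the class ControlBusCommands and its two methods are hypothetical helpers, not part of Spring Integration, and the sketch assumes the @Configuration class is registered under Spring's default bean name (the decapitalized simple class name).

```java
import java.beans.Introspector;

// Hypothetical helper, not a Spring Integration API: it only builds strings.
final class ControlBusCommands {

    private ControlBusCommands() {
    }

    // Assumes the default bean name of the @Configuration class, i.e. the
    // decapitalized simple class name, followed by the @Bean method name
    // and the fixed ".inboundChannelAdapter" suffix.
    static String adapterBeanName(String configClassSimpleName, String beanMethodName) {
        String configBeanName = Introspector.decapitalize(configClassSimpleName);
        return configBeanName + "." + beanMethodName + ".inboundChannelAdapter";
    }

    // Wraps the dotted bean name in single quotes so the control bus
    // expression treats it as one bean reference.
    static String command(String beanName, String operation) {
        return "@'" + beanName + "'." + operation + "()";
    }
}
```

For the MecFilePoller class in the question, command(adapterBeanName("MecFilePoller", "fileMessageSourceMDW"), "stop") builds the same string that the question sends to the control channel.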

EDIT

I just tested this and it works fine...

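import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.ExpressionControlBusFactoryBean;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@Configuration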
@EnableIntegration
public class Foo {

    @Bean
    QueueChannel inboundFileChannel() {
        return new QueueChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "inboundFileChannel", poller = @Poller(fixedDelay="1000"))
    public MessageSource<?> fileMessageSource() {
        MessageSource<?> source = mock(MessageSource.class);
        return source;
    }

    @Bean
    MessageChannel control() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel="control")
    public ExpressionControlBusFactoryBean controlBus() {
        return new ExpressionControlBusFactoryBean();
    }

    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(Foo.class);
        Thread.sleep(2000);
        ctx.getBean("control", MessageChannel.class).send(
                new GenericMessage<String>("@'foo.fileMessageSource.inboundChannelAdapter'.stop()"));
        assertFalse(ctx.getBean(SourcePollingChannelAdapter.class).isRunning());
        Thread.sleep(2000);
        ctx.close();
    }
}

EDIT2 :

I added some sleeps to the example above and see this...

2015-08-03 12:23:34,528 [task-scheduler-1] DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 12:23:35,530 [task-scheduler-2] DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
2015-08-03 12:23:35,615 [main] DEBUG: org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'control'
2015-08-03 12:23:35,616 [main] DEBUG: org.springframework.integration.channel.DirectChannel - preSend on channel 'control', message: GenericMessage [payload=@'foo.fileMessageSource.inboundChannelAdapter'.stop(), headers={id=d0049777-025c-5439-6ee7-d66bce34868a, timestamp=1438619015616}]
2015-08-03 12:23:35,616 [main] DEBUG: org.springframework.integration.handler.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@1e0b4072] received message: GenericMessage [payload=@'foo.fileMessageSource.inboundChannelAdapter'.stop(), headers={id=d0049777-025c-5439-6ee7-d66bce34868a, timestamp=1438619015616}]
2015-08-03 12:23:35,617 [main] DEBUG: org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'foo.fileMessageSource.inboundChannelAdapter'
2015-08-03 12:23:35,618 [main] INFO : org.springframework.integration.endpoint.SourcePollingChannelAdapter - stopped foo.fileMessageSource.inboundChannelAdapter
2015-08-03 12:23:35,618 [main] DEBUG: org.springframework.integration.handler.ServiceActivatingHandler - handler 'ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@1e0b4072]' produced no reply for request Message: GenericMessage [payload=@'foo.fileMessageSource.inboundChannelAdapter'.stop(), headers={id=d0049777-025c-5439-6ee7-d66bce34868a, timestamp=1438619015616}]
2015-08-03 12:23:35,618 [main] DEBUG: org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'control', message: GenericMessage [payload=@'foo.fileMessageSource.inboundChannelAdapter'.stop(), headers={id=d0049777-025c-5439-6ee7-d66bce34868a, timestamp=1438619015616}]
2015-08-03 12:23:35,618 [main] DEBUG: org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'foo.fileMessageSource.inboundChannelAdapter'

(no polls after the stop).
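To make the "no polls after the stop" behaviour concrete, here is a minimal sketch using only java.util.concurrent. It is an analogy under stated assumptions, not Spring Integration's implementation: StoppablePoller and all of its methods are hypothetical names, and the "poll" just increments a counter. The point is that stopping cancels one scheduled task while the thread pool keeps running.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a polling endpoint; not a Spring class.
final class StoppablePoller {

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private final AtomicInteger polls = new AtomicInteger();
    private ScheduledFuture<?> task;

    // Schedules the poll task; a second start() while running is ignored.
    synchronized void start(long delayMillis) {
        if (task == null) {
            task = scheduler.scheduleWithFixedDelay(
                    polls::incrementAndGet, 0, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Cancels only this poller's task; the pool threads stay alive.
    synchronized void stop() {
        if (task != null) {
            task.cancel(false);
            task = null;
        }
    }

    synchronized boolean isRunning() {
        return task != null;
    }

    int pollCount() {
        return polls.get();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    // Polls briefly, stops, then returns {count shortly after stop, count later}.
    static int[] demo() {
        StoppablePoller poller = new StoppablePoller();
        try {
            poller.start(10);
            Thread.sleep(100);
            poller.stop();
            Thread.sleep(50); // let any in-flight poll finish
            int atStop = poller.pollCount();
            Thread.sleep(100);
            return new int[] {atStop, poller.pollCount()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            poller.shutdown();
        }
    }
}
```

Calling StoppablePoller.demo() returns two counts that should be equal, because no further polls run once the task has been cancelled.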