Spring 集成 + Spring 批处理:作业不会停止

Spring Integration + Spring Batch: the job doesn`t stop

我想从ftp服务器读取文件,然后将它保存到本地存储库并从服务器删除,运行读取文件的工作,在数据库上找到一条记录,更改一个参数和保存它。

出了什么问题:作业没有完成;增加工资并多次节省员工。

Spring 集成配置:

    @Bean
    public FtpInboundFileSynchronizer ftpInboundFileSynchronizer(DefaultFtpSessionFactory sessionFactory) {
        FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(sessionFactory);
        fileSynchronizer.setRemoteDirectory(remoteDirectory);
        fileSynchronizer.setDeleteRemoteFiles(true);
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(cron = "*/5 * * * * ?"))
    public FtpInboundFileSynchronizingMessageSource ftpInboundFileSynchronizingMessageSource(FtpInboundFileSynchronizer fileSynchronizer) throws Exception {
        FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(fileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new File(localDirectory));
        messageSource.setLocalFilter(new AcceptOnceFileListFilter<>());
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "fileInputChannel")
    public FileWritingMessageHandler fileWritingMessageHandler() {
        FileWritingMessageHandler messageHandler = new FileWritingMessageHandler(new File(localDirectory));
        messageHandler.setOutputChannelName("jobLaunchRequestChannel");
        return messageHandler;
    }

    @ServiceActivator(inputChannel = "jobLaunchRequestChannel", outputChannel = "jobLaunchingGatewayChannel")
    public JobLaunchRequest jobLaunchRequest(File file) throws IOException {
        String[] content = FileUtils.readFileToString(file, "UTF-8").split("\s+");
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("filename", file.getAbsolutePath())
                .addString("id", content[0]).addString("salary", content[1])
//                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        return new JobLaunchRequest(increaseSalaryJob, jobParameters);
    }

    @Bean
    @ServiceActivator(inputChannel = "jobLaunchingGatewayChannel")
    public JobLaunchingGateway jobLaunchingGateway(SimpleJobLauncher jobLauncher) {
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
        jobLaunchingGateway.setOutputChannelName("finish");
        return jobLaunchingGateway;
    }

    @ServiceActivator(inputChannel = "finish")
    public void finish() {
        System.out.println("FINISH");
    }
}

Spring批量配置:

 @Bean
    public Job increaseSalaryJob(CustomJobListener listener, Step step1) {
        return jobBuilderFactory.get("increaseSalaryJob")
                .preventRestart()
                .listener(listener)
                .start(step1)
                .build();
    }

    @Bean
    public Step step1(ItemReader<Employee> reader) {
        return stepBuilderFactory.get("step1")
                .transactionManager(transactionManager)
                .<Employee, Employee> chunk(1)
                .reader(reader)
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public ItemReader<Employee> reader(@Value("#{jobParameters[id]}") Integer id) {
        log.info("reader");
        return () -> employeeService.get(id);
    }

    @Bean
    @StepScope
    public ItemProcessor<Employee, Employee> processor() {
        log.info("processor");
        return employee -> {
            log.info(employee.getName() + " had salary " + employee.getSalary());
            Integer salary = employee.getSalary() + 1;
            employee.setSalary(salary);
            log.info(employee.getName() + " have salary " + employee.getSalary());

            return employee;
        };
    }

    @Bean
    @StepScope
    public ItemWriter<Employee> writer() {
        log.info("writer");
        return employees -> {
            for (Employee employee : employees) {
                try {
                    employeeService.update(employee);
                    log.info(employee.getName() + " updated with salary " + employee.getSalary());
                } catch (ValidationException e) {
                    e.printStackTrace();
                }
            }
        };
    }

    @Bean
    public MapJobRepositoryFactoryBean jobRepositoryFactoryBean(PlatformTransactionManager transactionManager) {
        return new MapJobRepositoryFactoryBean(transactionManager);
    }

    @Bean
    public JobRepository jobRepository(MapJobRepositoryFactoryBean jobRepositoryFactoryBean) throws Exception {
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }

我很乐意提供任何帮助。

您需要确保您的 reader returns null 在某些时候。这就是该步骤解释没有更多数据要处理并退出的方式(如果没有更多步骤 运行,这反过来将停止周围的作业)。

就是说,我看到您面向块的步骤的输入是单个 id。对于这个用例,一个简单的 tasklet 就足够了,不需要具有单个输入记录和 chunkSize=1.

的面向块的 tasklet