Spring 集成 + Spring 批处理:作业不会停止
Spring Integration + Spring Batch: the job doesn`t stop
我想从ftp服务器读取文件,然后将它保存到本地存储库并从服务器删除,运行读取文件的工作,在数据库上找到一条记录,更改一个参数和保存它。
出了什么问题:作业没有完成;增加工资并多次节省员工。
Spring 集成配置:
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer(DefaultFtpSessionFactory sessionFactory) {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(sessionFactory);
fileSynchronizer.setRemoteDirectory(remoteDirectory);
fileSynchronizer.setDeleteRemoteFiles(true);
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(cron = "*/5 * * * * ?"))
public FtpInboundFileSynchronizingMessageSource ftpInboundFileSynchronizingMessageSource(FtpInboundFileSynchronizer fileSynchronizer) throws Exception {
FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(fileSynchronizer);
messageSource.setAutoCreateLocalDirectory(true);
messageSource.setLocalDirectory(new File(localDirectory));
messageSource.setLocalFilter(new AcceptOnceFileListFilter<>());
return messageSource;
}
@Bean
@ServiceActivator(inputChannel = "fileInputChannel")
public FileWritingMessageHandler fileWritingMessageHandler() {
FileWritingMessageHandler messageHandler = new FileWritingMessageHandler(new File(localDirectory));
messageHandler.setOutputChannelName("jobLaunchRequestChannel");
return messageHandler;
}
@ServiceActivator(inputChannel = "jobLaunchRequestChannel", outputChannel = "jobLaunchingGatewayChannel")
public JobLaunchRequest jobLaunchRequest(File file) throws IOException {
String[] content = FileUtils.readFileToString(file, "UTF-8").split("\s+");
JobParameters jobParameters = new JobParametersBuilder()
.addString("filename", file.getAbsolutePath())
.addString("id", content[0]).addString("salary", content[1])
// .addLong("time", System.currentTimeMillis())
.toJobParameters();
return new JobLaunchRequest(increaseSalaryJob, jobParameters);
}
@Bean
@ServiceActivator(inputChannel = "jobLaunchingGatewayChannel")
public JobLaunchingGateway jobLaunchingGateway(SimpleJobLauncher jobLauncher) {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
jobLaunchingGateway.setOutputChannelName("finish");
return jobLaunchingGateway;
}
@ServiceActivator(inputChannel = "finish")
public void finish() {
System.out.println("FINISH");
}
}
Spring批量配置:
@Bean
public Job increaseSalaryJob(CustomJobListener listener, Step step1) {
return jobBuilderFactory.get("increaseSalaryJob")
.preventRestart()
.listener(listener)
.start(step1)
.build();
}
@Bean
public Step step1(ItemReader<Employee> reader) {
return stepBuilderFactory.get("step1")
.transactionManager(transactionManager)
.<Employee, Employee> chunk(1)
.reader(reader)
.processor(processor())
.writer(writer())
.build();
}
@Bean
@StepScope
public ItemReader<Employee> reader(@Value("#{jobParameters[id]}") Integer id) {
log.info("reader");
return () -> employeeService.get(id);
}
@Bean
@StepScope
public ItemProcessor<Employee, Employee> processor() {
log.info("processor");
return employee -> {
log.info(employee.getName() + " had salary " + employee.getSalary());
Integer salary = employee.getSalary() + 1;
employee.setSalary(salary);
log.info(employee.getName() + " have salary " + employee.getSalary());
return employee;
};
}
@Bean
@StepScope
public ItemWriter<Employee> writer() {
log.info("writer");
return employees -> {
for (Employee employee : employees) {
try {
employeeService.update(employee);
log.info(employee.getName() + " updated with salary " + employee.getSalary());
} catch (ValidationException e) {
e.printStackTrace();
}
}
};
}
@Bean
public MapJobRepositoryFactoryBean jobRepositoryFactoryBean(PlatformTransactionManager transactionManager) {
return new MapJobRepositoryFactoryBean(transactionManager);
}
@Bean
public JobRepository jobRepository(MapJobRepositoryFactoryBean jobRepositoryFactoryBean) throws Exception {
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
return jobRepositoryFactoryBean.getObject();
}
@Bean
public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
return jobLauncher;
}
我很乐意提供任何帮助。
您需要确保您的 reader returns null
在某些时候。这就是该步骤解释没有更多数据要处理并退出的方式(如果没有更多步骤 运行,这反过来将停止周围的作业)。
就是说,我看到您面向块的步骤的输入是单个 id
。对于这个用例,一个简单的 tasklet 就足够了,不需要具有单个输入记录和 chunkSize=1
.
的面向块的 tasklet
我想从ftp服务器读取文件,然后将它保存到本地存储库并从服务器删除,运行读取文件的工作,在数据库上找到一条记录,更改一个参数和保存它。
出了什么问题:作业没有完成;增加工资并多次节省员工。
Spring 集成配置:
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer(DefaultFtpSessionFactory sessionFactory) {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(sessionFactory);
fileSynchronizer.setRemoteDirectory(remoteDirectory);
fileSynchronizer.setDeleteRemoteFiles(true);
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(cron = "*/5 * * * * ?"))
public FtpInboundFileSynchronizingMessageSource ftpInboundFileSynchronizingMessageSource(FtpInboundFileSynchronizer fileSynchronizer) throws Exception {
FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(fileSynchronizer);
messageSource.setAutoCreateLocalDirectory(true);
messageSource.setLocalDirectory(new File(localDirectory));
messageSource.setLocalFilter(new AcceptOnceFileListFilter<>());
return messageSource;
}
@Bean
@ServiceActivator(inputChannel = "fileInputChannel")
public FileWritingMessageHandler fileWritingMessageHandler() {
FileWritingMessageHandler messageHandler = new FileWritingMessageHandler(new File(localDirectory));
messageHandler.setOutputChannelName("jobLaunchRequestChannel");
return messageHandler;
}
@ServiceActivator(inputChannel = "jobLaunchRequestChannel", outputChannel = "jobLaunchingGatewayChannel")
public JobLaunchRequest jobLaunchRequest(File file) throws IOException {
String[] content = FileUtils.readFileToString(file, "UTF-8").split("\s+");
JobParameters jobParameters = new JobParametersBuilder()
.addString("filename", file.getAbsolutePath())
.addString("id", content[0]).addString("salary", content[1])
// .addLong("time", System.currentTimeMillis())
.toJobParameters();
return new JobLaunchRequest(increaseSalaryJob, jobParameters);
}
@Bean
@ServiceActivator(inputChannel = "jobLaunchingGatewayChannel")
public JobLaunchingGateway jobLaunchingGateway(SimpleJobLauncher jobLauncher) {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
jobLaunchingGateway.setOutputChannelName("finish");
return jobLaunchingGateway;
}
@ServiceActivator(inputChannel = "finish")
public void finish() {
System.out.println("FINISH");
}
}
Spring批量配置:
@Bean
public Job increaseSalaryJob(CustomJobListener listener, Step step1) {
return jobBuilderFactory.get("increaseSalaryJob")
.preventRestart()
.listener(listener)
.start(step1)
.build();
}
@Bean
public Step step1(ItemReader<Employee> reader) {
return stepBuilderFactory.get("step1")
.transactionManager(transactionManager)
.<Employee, Employee> chunk(1)
.reader(reader)
.processor(processor())
.writer(writer())
.build();
}
@Bean
@StepScope
public ItemReader<Employee> reader(@Value("#{jobParameters[id]}") Integer id) {
log.info("reader");
return () -> employeeService.get(id);
}
@Bean
@StepScope
public ItemProcessor<Employee, Employee> processor() {
log.info("processor");
return employee -> {
log.info(employee.getName() + " had salary " + employee.getSalary());
Integer salary = employee.getSalary() + 1;
employee.setSalary(salary);
log.info(employee.getName() + " have salary " + employee.getSalary());
return employee;
};
}
@Bean
@StepScope
public ItemWriter<Employee> writer() {
log.info("writer");
return employees -> {
for (Employee employee : employees) {
try {
employeeService.update(employee);
log.info(employee.getName() + " updated with salary " + employee.getSalary());
} catch (ValidationException e) {
e.printStackTrace();
}
}
};
}
@Bean
public MapJobRepositoryFactoryBean jobRepositoryFactoryBean(PlatformTransactionManager transactionManager) {
return new MapJobRepositoryFactoryBean(transactionManager);
}
@Bean
public JobRepository jobRepository(MapJobRepositoryFactoryBean jobRepositoryFactoryBean) throws Exception {
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
return jobRepositoryFactoryBean.getObject();
}
@Bean
public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
return jobLauncher;
}
我很乐意提供任何帮助。
您需要确保您的 reader returns null
在某些时候。这就是该步骤解释没有更多数据要处理并退出的方式(如果没有更多步骤 运行,这反过来将停止周围的作业)。
就是说,我看到您面向块的步骤的输入是单个 id
。对于这个用例,一个简单的 tasklet 就足够了,不需要具有单个输入记录和 chunkSize=1
.