Saving file information in Spring Batch MultiResourceItemReader
I have a directory containing text files. I want to process the files and write the data into a database. I did this using a MultiResourceItemReader.
I have a scenario where, whenever a file arrives, the first step is to save the file information, such as the file name and the number of records in the file, into a log table (a custom table).
Since I used MultiResourceItemReader, which loads all the files at once, the code I wrote gets executed only once at server startup. I tried the getCurrentResource() method, but it returns null.
Please refer to the code below.
NetFileProcessController.java
@Slf4j
@RestController
@RequestMapping("/netProcess")
public class NetFileProcessController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    @Qualifier("netFileParseJob")
    private Job job;

    @GetMapping(path = "/process")
    public @ResponseBody StatusResponse process() throws ServiceException {
        try {
            Map<String, JobParameter> parameters = new HashMap<>();
            parameters.put("date", new JobParameter(new Date()));
            jobLauncher.run(job, new JobParameters(parameters));
            return new StatusResponse(true);
        } catch (Exception e) {
            log.error("Exception", e);
            Throwable rootException = ExceptionUtils.getRootCause(e);
            String errMessage = rootException.getMessage();
            log.info("Root cause is instance of JobInstanceAlreadyCompleteException --> " + (rootException instanceof JobInstanceAlreadyCompleteException));
            if (rootException instanceof JobInstanceAlreadyCompleteException) {
                log.info(errMessage);
                return new StatusResponse(false, "This job has been completed already!");
            } else {
                throw new ServiceException(errMessage);
            }
        }
    }
}
BatchConfig.java
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    public void setJobBuilderFactory(JobBuilderFactory jobBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
    }

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Value("file:${input.files.location}${input.file.pattern}")
    private Resource[] netFileInputs;

    @Value("${net.file.column.names}")
    private String netFilecolumnNames;

    @Value("${net.file.column.lengths}")
    private String netFileColumnLengths;

    @Autowired
    NetFileInfoTasklet netFileInfoTasklet;

    @Autowired
    NetFlatFileProcessor netFlatFileProcessor;

    @Autowired
    NetFlatFileWriter netFlatFileWriter;

    @Bean
    public Job netFileParseJob() {
        return jobBuilderFactory.get("netFileParseJob")
                .incrementer(new RunIdIncrementer())
                .start(netFileStep())
                .build();
    }

    public Step netFileStep() {
        return stepBuilderFactory.get("netFileStep")
                .<NetDetailsDTO, NetDetailsDTO>chunk(1)
                .reader(new NetFlatFileReader(netFileInputs, netFilecolumnNames, netFileColumnLengths))
                .processor(netFlatFileProcessor)
                .writer(netFlatFileWriter)
                .build();
    }
}
NetFlatFileReader.java
@Slf4j
public class NetFlatFileReader extends MultiResourceItemReader<NetDetailsDTO> {

    public NetFlatFileReader(Resource[] netFileInputs, String netFilecolumnNames, String netFileColumnLengths) {
        setResources(netFileInputs);
        setDelegate(reader(netFilecolumnNames, netFileColumnLengths));
    }

    private FlatFileItemReader<NetDetailsDTO> reader(String netFilecolumnNames, String netFileColumnLengths) {
        FlatFileItemReader<NetDetailsDTO> flatFileItemReader = new FlatFileItemReader<>();
        FixedLengthTokenizer tokenizer = CommonUtil.fixedLengthTokenizer(netFilecolumnNames, netFileColumnLengths);
        FieldSetMapper<NetDetailsDTO> mapper = createMapper();
        DefaultLineMapper<NetDetailsDTO> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(mapper);
        flatFileItemReader.setLineMapper(lineMapper);
        return flatFileItemReader;
    }

    /*
     * Mapping column data to DTO
     */
    private FieldSetMapper<NetDetailsDTO> createMapper() {
        BeanWrapperFieldSetMapper<NetDetailsDTO> mapper = new BeanWrapperFieldSetMapper<>();
        try {
            mapper.setTargetType(NetDetailsDTO.class);
        } catch (Exception e) {
            log.error("Exception in mapping column data to dto ", e);
        }
        return mapper;
    }
}
I am stuck in this situation; any help is appreciated.
I think MultiResourceItemReader is not appropriate for your case. I would run a job per file, for all the reasons of making one thing do one thing and do it well:
- Your preparatory step will work as designed
- It would be easier to run multiple jobs in parallel and improve the throughput of your file ingestion
- In case of failure, you would only restart the job for the failed file
EDIT: Adding an example
Resource[] netFileInputs = ... // same code that looks for files as currently in your reader
for (Resource netFileInput : netFileInputs) {
    Map<String, JobParameter> parameters = new HashMap<>();
    parameters.put("netFileInput", new JobParameter(netFileInput.getFilename()));
    jobLauncher.run(job, new JobParameters(parameters));
}