AsyncListItemWriter: end of step detected too early
I have written an asynchronous version of ItemWriter so that my items are written asynchronously:
public class AsyncListItemWriter<T> implements ItemStreamWriter<T>, InitializingBean {

    private ItemWriter<T> delegate;
    private TaskExecutor taskExecutor = new SyncTaskExecutor();

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(delegate, "A delegate ItemWriter must be provided.");
    }

    public void setDelegate(ItemWriter<T> delegate) {
        this.delegate = delegate;
    }

    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).open(executionContext);
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).update(executionContext);
        }
    }

    @Override
    public void close() throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).close();
        }
    }

    @Override
    public void write(List<? extends T> items) {
        StepExecution stepExecution = getStepExecution();
        taskExecutor.execute(() -> {
            if (stepExecution != null) {
                StepSynchronizationManager.register(stepExecution);
            }
            try {
                delegate.write(items);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (stepExecution != null) {
                    StepSynchronizationManager.close();
                }
            }
        });
    }

    private StepExecution getStepExecution() {
        StepContext context = StepSynchronizationManager.getContext();
        if (context == null) {
            return null;
        }
        StepExecution stepExecution = context.getStepExecution();
        return stepExecution;
    }
}
Configuration:
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(64);
    executor.setMaxPoolSize(64);
    executor.setQueueCapacity(64);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("MultiThreaded-");
    return executor;
}

@Bean
public ItemWriter<STModel> writer() {
    return items -> {
        Thread.sleep(1000);
        System.out.println("Writing...");
        for (STModel c : items) {
            System.out.println("######### Writer : ------> " + c + " inside size : " + c.relation.size() + ", On : " + Thread.currentThread().getName());
        }
    };
}

@Bean
public AsyncListItemWriter<STModel> asyncWriter() throws Exception {
    AsyncListItemWriter<STModel> asyncItemWriter = new AsyncListItemWriter<>();
    asyncItemWriter.setDelegate(writer());
    asyncItemWriter.setTaskExecutor(taskExecutor());
    asyncItemWriter.afterPropertiesSet();
    return asyncItemWriter;
}

@Bean
public Step sampleStep() throws Exception {
    return stepBuilderFactory.get("processingStep")
            .<STModel, STModel>chunk(10)
            .reader(itpReader())
            .writer(asyncWriter())
            .build();
}

@Bean
public Job job() throws Exception {
    return jobBuilderFactory.get("job")
            .start(sampleStep())
            .build();
}
After all the files have been read, I get this log (the files are read on the main thread):
While Same Read ---->STModel(tripId=109138356-1_459178)
While Same Read ---->STModel(tripId=109138356-1_459178)
While Same Read ---->null
Read ---->null, On : main
2021-03-16 10:38:48.409 INFO 17042 --- [ main] o.s.batch.core.step.AbstractStep : Step: [processingStep] executed in 337ms
2021-03-16 10:38:48.412 INFO 17042 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 349ms
Writing...
######### Writer : ------> STModel(tripId=109138355-1_459164) inside size : 2, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=109138355-1_459165) inside size : 1, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=109138355-1_459166) inside size : 1, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=109138355-1_459167) inside size : 1, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=109138355-1_459168) inside size : 2, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=113507833-1_38959) inside size : 23, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=113507835-1_38960) inside size : 23, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=113507852-1_38961) inside size : 23, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=113507863-1_38962) inside size : 23, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=113507871-1_38963) inside size : 23, On : MultiThreaded-1
Writing...
######### Writer : ------> STModel(tripId=113507882-1_38964) inside size : 23, On : MultiThreaded-2
######### Writer : ------> STModel(tripId=113507890-1_38965) inside size : 23, On : MultiThreaded-2
######### Writer : ------> STModel(tripId=113507900-1_38966) inside size : 23, On : MultiThreaded-2
######### Writer : ------> STModel(tripId=113507911-1_38967) inside size : 23, On : MultiThreaded-2
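As you can see, Spring Batch detects the end of the step too early, right after the read operation.
How can I tell Spring Batch that the step ends only after all write tasks have finished?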
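"As you can see, Spring Batch detects the end of the step too early,"
This is what you should expect, because your writer writes items asynchronously in the background without waiting for them to finish. As soon as the writer's write method returns, the step goes on to read the next chunk of items, and it may complete if there are no more items to read. That can happen before the previous chunk has been written in the background by your asynchronous task executor.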
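To make the timing concrete, here is a minimal plain-JDK sketch (no Spring involved) of the same fire-and-forget shape. The class name FireAndForgetDemo, the latch that stands in for the slow delegate, and the counter are all illustrative assumptions, not part of the question's code. It only shows that a call built on execute hands control back before any item has been written.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class FireAndForgetDemo {

    // Returns how many items had been written at the moment the "write" call handed control back.
    static int writtenWhenWriteReturns(List<String> items) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger written = new AtomicInteger();
        CountDownLatch slowWrite = new CountDownLatch(1); // stands in for a slow delegate
        try {
            // Same shape as taskExecutor.execute(...): hand the work off and return at once.
            pool.execute(() -> {
                try {
                    slowWrite.await(); // the background write is still "in progress"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                written.addAndGet(items.size());
            });
            // The caller (the step, in the question) carries on from here.
            return written.get();
        } finally {
            slowWrite.countDown(); // let the background write finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(writtenWhenWriteReturns(Arrays.asList("a", "b", "c"))); // prints 0
    }
}
```

The caller observes zero written items, which is the situation the log in the question shows: the step is reported as finished while the writer threads are still working.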
"How can I tell Spring Batch that the end of the step comes after all writing tasks have finished?"
In your writer, you need to submit tasks (instead of executing them), get their Future handles, and wait for them to complete with Future.get (with a timeout if needed).
Edit: added an example
@Override
public void write(List<? extends T> items) {
    // Note: submit() is declared on AsyncTaskExecutor, not on TaskExecutor,
    // so the taskExecutor field must be typed accordingly (ThreadPoolTaskExecutor qualifies).
    List<Future<?>> futures = new ArrayList<>(items.size());
    // write items in parallel (note the singleton list passed to the delegate)
    for (T item : items) {
        Future<?> future = taskExecutor.submit(() -> {
            try {
                delegate.write(Collections.singletonList(item));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        futures.add(future);
    }
    // wait for futures to finish
    futures.forEach(future -> {
        try {
            future.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
}
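A side effect of waiting on the futures is that a failure in the delegate now reaches the calling thread, whereas the writer in the question only printed the stack trace. The sketch below shows that mechanism with the plain JDK: Future.get rethrows the task's exception wrapped in an ExecutionException. The class name FutureFailureDemo and the "write failed" message are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureFailureDemo {

    // Submits a task that fails and returns the failure message as seen by the caller.
    static String failureSeenByCaller() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Hypothetical delegate failure, thrown on the worker thread.
        Runnable failingWrite = () -> {
            throw new IllegalStateException("write failed");
        };
        try {
            Future<?> future = pool.submit(failingWrite);
            future.get(10, TimeUnit.SECONDS); // blocks, then rethrows the worker's failure
            return "no failure";
        } catch (ExecutionException e) {
            // The original exception is available as the cause.
            return e.getCause().getMessage();
        } catch (InterruptedException | TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureSeenByCaller()); // prints "write failed"
    }
}
```

In a writer, rethrowing from write in this way is what lets the surrounding step notice the failure instead of completing as if nothing had happened.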
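The only advantage I see here is that the items will be written in parallel. This can speed things up in your use case, where you send an email for each item. However, I see the following in your comment:
"writing becomes concurrent with reading, so it speeds up processing!"
This is not correct: reading the next chunk still waits for the current write operation to finish. So, as mentioned above, you need to wait for the write operations to complete inside your writer, otherwise your job may appear to be finished while items are still being written in the background.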
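As a rough model of that ordering, the sketch below imitates a read-then-write loop in plain Java. It is a simplification and not Spring Batch's actual step implementation; the class name ChunkLoopDemo, the in-memory sink, and the event strings are assumptions made for the example. Items inside one chunk are written in parallel, yet the next read starts only after the current write has returned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ChunkLoopDemo {

    // Runs a simplified read/write loop and returns the order of events seen by the loop thread.
    static List<String> run(List<Integer> source, int chunkSize) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Queue<Integer> sink = new ConcurrentLinkedQueue<>(); // stands in for the real destination
        List<String> events = new ArrayList<>();
        try {
            for (int start = 0; start < source.size(); start += chunkSize) {
                int end = Math.min(start + chunkSize, source.size());
                List<Integer> chunk = source.subList(start, end);
                events.add("read " + chunk);

                // "write": one task per item, submitted in parallel
                List<Future<?>> futures = new ArrayList<>();
                for (Integer item : chunk) {
                    futures.add(pool.submit(() -> {
                        sink.add(item);
                    }));
                }
                // wait for the whole chunk before the loop reads again
                for (Future<?> future : futures) {
                    future.get(10, TimeUnit.SECONDS);
                }
                events.add("written, total=" + sink.size());
            }
            return events;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints [read [1, 2], written, total=2, read [3], written, total=3]
        System.out.println(run(Arrays.asList(1, 2, 3), 2));
    }
}
```

Reads and writes alternate strictly in the event list, so the gain comes from parallelism within a chunk, not from overlapping reading with writing.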
If you really want both reading and writing to be done concurrently by multiple threads, you need to use a multi-threaded step:
@Bean
public Step sampleStep() throws Exception {
    return stepBuilderFactory.get("processingStep")
            .<STModel, STModel>chunk(10)
            .reader(itpReader())
            .writer(writer())
            .taskExecutor(taskExecutor())
            .build();
}
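One thing to keep in mind with a multi-threaded step is that several threads call the same reader and writer, so both need to be thread-safe. Whether itpReader() already is cannot be told from the question. The sketch below illustrates the general idea of serializing read calls in plain Java; SynchronizedReader is a hypothetical stand-in written for this example and is not a Spring Batch class (Spring Batch ships its own SynchronizedItemStreamReader for wrapping a stream reader).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedReaderDemo {

    // Hypothetical stand-in for a reader: returns null once the input is exhausted.
    static final class SynchronizedReader<T> {
        private final Iterator<T> source;

        SynchronizedReader(Iterator<T> source) {
            this.source = source;
        }

        // Only one thread at a time may advance the underlying iterator.
        synchronized T read() {
            return source.hasNext() ? source.next() : null;
        }
    }

    // Drains the reader from several threads; returns the number of items read,
    // or -1 if any item was handed out more than once (items must be distinct).
    static int drainConcurrently(List<Integer> items, int threads) {
        SynchronizedReader<Integer> reader = new SynchronizedReader<>(items.iterator());
        AtomicInteger reads = new AtomicInteger();
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                futures.add(pool.submit(() -> {
                    Integer item;
                    while ((item = reader.read()) != null) {
                        reads.incrementAndGet();
                        seen.add(item);
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(10, TimeUnit.SECONDS);
            }
            return reads.get() == seen.size() ? reads.get() : -1;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling drainConcurrently with 1000 distinct integers and 8 threads should report 1000 reads, each item handed out exactly once.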
Another option is to use concurrent steps, as described in this issue: https://github.com/spring-projects/spring-batch/issues/2044. I have a PoC that uses a BlockingQueue as a staging area, if you are interested.
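For readers unfamiliar with the staging-area idea, here is a generic plain-JDK sketch of it. It is not the PoC mentioned above, and every name in it (StagingQueueDemo, copyThroughQueue, the Optional.empty() end marker) is an assumption made for illustration. A reading thread puts items on a bounded BlockingQueue while a writing thread takes them off, so reading and writing genuinely overlap, and the caller returns only after both have finished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class StagingQueueDemo {

    // Copies the source to a "written" list through a bounded queue shared by two threads.
    static List<String> copyThroughQueue(List<String> source, int capacity) {
        BlockingQueue<Optional<String>> staging = new ArrayBlockingQueue<>(capacity);
        List<String> written = new ArrayList<>(); // touched only by the writer thread

        Thread reader = new Thread(() -> {
            try {
                for (String item : source) {
                    staging.put(Optional.of(item)); // blocks while the queue is full
                }
                staging.put(Optional.empty()); // end-of-input marker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread writer = new Thread(() -> {
            try {
                Optional<String> next;
                while ((next = staging.take()).isPresent()) { // blocks while the queue is empty
                    written.add(next.get());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start();
        writer.start();
        try {
            // Completion is declared only once both sides are done.
            reader.join();
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(copyThroughQueue(Arrays.asList("a", "b", "c"), 2)); // prints [a, b, c]
    }
}
```

The bounded capacity gives back-pressure: a fast reader cannot run arbitrarily far ahead of a slow writer.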