AsyncListItemWriter step end detected early

I have written an asynchronous version of ItemWriter to write my items asynchronously:

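import java.util.List;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.StepContext;
import org.springframework.batch.core.scope.context.StepSynchronizationManager;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.util.Assert;

public class AsyncListItemWriter<T> implements ItemStreamWriter<T>, InitializingBean {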

    private ItemWriter<T> delegate;

    private TaskExecutor taskExecutor = new SyncTaskExecutor();

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(delegate, "A delegate ItemWriter must be provided.");
    }

    public void setDelegate(ItemWriter<T> delegate) {
        this.delegate = delegate;
    }

    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).open(executionContext);
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).update(executionContext);
        }
    }

    @Override
    public void close() throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).close();
        }
    }

    @Override
    public void write(List<? extends T> items)  {
        StepExecution stepExecution = getStepExecution();
        // Hands the write to the executor and returns immediately, without waiting for it to finish
        taskExecutor.execute(() -> {
            if (stepExecution != null) {
                StepSynchronizationManager.register(stepExecution);
            }
            try {
                delegate.write(items);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (stepExecution != null) {
                    StepSynchronizationManager.close();
                }
            }
        });
    }

    private StepExecution getStepExecution() {
        StepContext context = StepSynchronizationManager.getContext();
        if (context == null) {
            return null;
        }
        StepExecution stepExecution = context.getStepExecution();
        return stepExecution;
    }
}

Configuration:

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(64);
        executor.setMaxPoolSize(64);
        executor.setQueueCapacity(64);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("MultiThreaded-");
        return executor;
    }

    @Bean
    public ItemWriter<STModel> writer(){
        return items -> {
            Thread.sleep(1000);
            System.out.println("Writing...");
            for(STModel c : items) {
                System.out.println("######### Writer : ------> " + c + " inside size : " + c.relation.size() + ", On : " +Thread.currentThread().getName());
            }
        };
    }

    @Bean
    public AsyncListItemWriter<STModel> asyncWriter() throws Exception {
        AsyncListItemWriter<STModel> asyncItemWriter = new AsyncListItemWriter<>();
        asyncItemWriter.setDelegate(writer());
        asyncItemWriter.setTaskExecutor(taskExecutor());
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

    @Bean
    public Step sampleStep() throws Exception{
        return stepBuilderFactory.get("processingStep")
                .<STModel, STModel>chunk(10)
                .reader(itpReader())
                .writer(asyncWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception{
        return jobBuilderFactory.get("job")
                .start(sampleStep())
                .build();
    }

After all the files have been read, I get this log (the files are read on the main thread):

While Same Read ---->STModel(tripId=109138356-1_459178)
While Same Read ---->STModel(tripId=109138356-1_459178)
While Same Read ---->null
Read ---->null, On : main
2021-03-16 10:38:48.409 INFO 17042 --- [ main] o.s.batch.core.step.AbstractStep : Step: [processingStep] executed in 337ms
2021-03-16 10:38:48.412 INFO 17042 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 349ms
Writing...
######### Writer : ------> STModel(tripId=109138355-1_459164) inside size : 2, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=109138355-1_459165) inside size : 1, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=109138355-1_459166) inside size : 1, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=109138355-1_459167) inside size : 1, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=109138355-1_459168) inside size : 2, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=113507833-1_38959) inside size : 23, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=113507835-1_38960) inside size : 23, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=113507852-1_38961) inside size : 23, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=113507863-1_38962) inside size : 23, On : MultiThreaded-1
######### Writer : ------> STModel(tripId=113507871-1_38963) inside size : 23, On : MultiThreaded-1
Writing...
######### Writer : ------> STModel(tripId=113507882-1_38964) inside size : 23, On : MultiThreaded-2
######### Writer : ------> STModel(tripId=113507890-1_38965) inside size : 23, On : MultiThreaded-2
######### Writer : ------> STModel(tripId=113507900-1_38966) inside size : 23, On : MultiThreaded-2
######### Writer : ------> STModel(tripId=113507911-1_38967) inside size : 23, On : MultiThreaded-2


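As you can see, Spring Batch detects the end of the step too early, right after the read operations.

How can I tell Spring Batch that the step should end only after all the writing tasks have finished?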

As you can see, Spring Batch detects the end of the step too early,

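This is expected, because your writer writes items asynchronously in the background without waiting for them to complete. As soon as the writer's write method returns, the step moves on to read the next chunk of items, and it may complete if there are no more items to read. This can happen before the previous chunks have been written in the background by your asynchronous task executor.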
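The timing problem can be reproduced without Spring. The following stdlib-only sketch is a simplified model and does not reproduce Spring Batch's real step code. The class `FireAndForgetDemo` and its methods are hypothetical. A loop "reads" chunks and passes each one to a `write` method that, like the question's writer, only schedules the work on an executor. A 200 ms sleep stands in for a slow write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the question's setup (not Spring Batch internals).
class FireAndForgetDemo {

    // Number of items the background tasks have actually finished "writing".
    private final AtomicInteger written = new AtomicInteger();
    private final ExecutorService executor;

    FireAndForgetDemo(ExecutorService executor) {
        this.executor = executor;
    }

    // Like the question's write(): schedules the work and returns immediately.
    void write(List<Integer> chunk) {
        executor.execute(() -> {
            try {
                Thread.sleep(200); // simulate a slow write, e.g. sending e-mails
                written.addAndGet(chunk.size());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Reads and "writes" chunks, then reports how many items were written when the step ended.
    int runStep(int totalItems, int chunkSize) {
        for (int start = 0; start < totalItems; start += chunkSize) {
            List<Integer> chunk = new ArrayList<>();
            for (int i = start; i < Math.min(start + chunkSize, totalItems); i++) {
                chunk.add(i);
            }
            write(chunk); // returns before the chunk is actually written
        }
        return written.get(); // the step is considered complete at this point
    }

    // Waits for the background writes and returns the final count.
    int awaitBackgroundWrites() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written.get();
    }
}
```

With 30 items in chunks of 10, `runStep` typically returns long before any of the sleeps finish. `awaitBackgroundWrites` then reports all 30 items. This matches the log above, where "COMPLETED" appears before the "Writing..." lines.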

How can I tell Spring Batch that the step should end only after all the writing tasks have finished?

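In your writer, you need to submit the tasks (instead of executing them), keep their Future handles, and wait for them to complete with Future.get (with a timeout if needed). Note that submit is declared on AsyncTaskExecutor, which ThreadPoolTaskExecutor implements, and not on the plain TaskExecutor interface, so the writer's taskExecutor field needs to be declared with that type.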

Edit: added an example

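@Override
public void write(List<? extends T> items) {
    // Requires the taskExecutor field to be an AsyncTaskExecutor (e.g. ThreadPoolTaskExecutor):
    // the plain TaskExecutor interface only provides execute(), not submit().
    List<Future<?>> futures = new ArrayList<>(items.size());
    // write items in parallel (note the singleton list passed to the delegate)
    for (T item : items) {
        Future<?> future = taskExecutor.submit(() -> {
            try {
                delegate.write(Collections.singletonList(item));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        futures.add(future);
    }
    // wait for every future (10-second timeout each) before returning,
    // so the step only moves on once all items have been written
    futures.forEach(future -> {
        try {
            future.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    });
}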
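The submit-then-wait pattern can also be tried without Spring on the classpath. The following is a stdlib-only sketch under a few substitutions. `WaitingParallelWriter` is a hypothetical class. `java.util.concurrent.ExecutorService` stands in for Spring's AsyncTaskExecutor, and a `Consumer<List<T>>` stands in for the ItemWriter delegate. The point it shows is that `write` returns only after every item has been handed to the delegate.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical stdlib-only version of the answer's writer.
class WaitingParallelWriter<T> {
    private final Consumer<List<T>> delegate; // stand-in for the ItemWriter delegate
    private final ExecutorService executor;   // stand-in for AsyncTaskExecutor
    private final long timeoutSeconds;

    WaitingParallelWriter(Consumer<List<T>> delegate, ExecutorService executor, long timeoutSeconds) {
        this.delegate = delegate;
        this.executor = executor;
        this.timeoutSeconds = timeoutSeconds;
    }

    void write(List<? extends T> items) {
        List<Future<?>> futures = new ArrayList<>(items.size());
        // Submit one task per item so the items are written in parallel.
        for (T item : items) {
            futures.add(executor.submit(() -> delegate.accept(Collections.singletonList(item))));
        }
        // Block until every task is done (or fails / times out) before returning.
        for (Future<?> future : futures) {
            try {
                future.get(timeoutSeconds, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for writes", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("Write failed or timed out", e);
            }
        }
    }
}
```

Because `write` blocks on each future, a caller that checks its output right after `write` returns sees every item. Their order is not guaranteed, since the tasks run in parallel.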

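The only advantage I see here is that items will be written in parallel. This can speed up your use case of sending an e-mail for each item. However, I see the following in your comment: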

writing becomes concurrent with reading, so it speeds up processing!

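This is not correct: the step reads the next chunk only after the current call to write returns, so once your writer waits for its tasks, reading the next chunk still waits for the current write to finish. So, as mentioned above, you need to wait for the write operations to complete in your writer; otherwise your job may appear to be completed while items are still being written in the background.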
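The ordering claim can be made concrete with a small stdlib-only model of a single-threaded chunk loop. The class `ChunkOrderDemo` is hypothetical and is not Spring Batch's real implementation. Each chunk is "written" on another thread, but the loop waits for that write, as the fixed writer does. The log therefore shows that the next read never starts before the previous write is done.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model of a single-threaded chunk-oriented step with a waiting writer.
class ChunkOrderDemo {

    static List<String> run(int chunks) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            for (int c = 1; c <= chunks; c++) {
                final int chunk = c;
                log.add("read " + chunk);
                // Write on another thread, but wait for it before "write" returns.
                Future<?> future = executor.submit(() -> { log.add("written " + chunk); });
                future.get();
                log.add("write returned " + chunk);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return log;
    }
}
```

For two chunks, the log is read 1, written 1, write returned 1, read 2, written 2, write returned 2. Reading and writing alternate; they do not overlap.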

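If you really want reading and writing to be done concurrently by multiple threads, you need to use a multi-threaded step (note that the reader must then be thread-safe):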

@Bean
public Step sampleStep() throws Exception{
    return stepBuilderFactory.get("processingStep")
                .<STModel, STModel>chunk(10)
                .reader(itpReader())
                .writer(writer())
                .taskExecutor(taskExecutor())
                .build();
}
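To illustrate why the reader must be thread-safe, here is a rough stdlib-only model of a multi-threaded step. The names `MultiThreadedStepDemo` and `SynchronizedReader` are hypothetical, and this is an assumption-level model, not Spring Batch's internals. Several worker threads each read a chunk item by item from a shared reader and then "write" it. The shared reader is synchronized so that no item is handed out twice. In Spring Batch itself, wrapping a non-thread-safe reader in `SynchronizedItemStreamReader` serves a similar purpose.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a multi-threaded step: each worker reads a chunk, then writes it.
class MultiThreadedStepDemo {

    static final class SynchronizedReader<T> {
        private final Iterator<T> source;

        SynchronizedReader(Iterator<T> source) {
            this.source = source;
        }

        // Returns null when exhausted, like ItemReader.read(); synchronized so threads never share an item.
        synchronized T read() {
            return source.hasNext() ? source.next() : null;
        }
    }

    static <T> List<T> run(List<T> items, int threads, int chunkSize) {
        SynchronizedReader<T> reader = new SynchronizedReader<>(items.iterator());
        ConcurrentLinkedQueue<T> written = new ConcurrentLinkedQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.execute(() -> {
                while (true) {
                    // Build a chunk by calling the shared reader repeatedly.
                    List<T> chunk = new ArrayList<>(chunkSize);
                    T item;
                    while (chunk.size() < chunkSize && (item = reader.read()) != null) {
                        chunk.add(item);
                    }
                    if (chunk.isEmpty()) {
                        return; // nothing left to read
                    }
                    written.addAll(chunk); // stand-in for ItemWriter.write(chunk)
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(written);
    }
}
```

Every input item is written exactly once, but the output order depends on thread scheduling. That is also why ordering is not preserved in a multi-threaded step.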

Another option is to use concurrent steps, as discussed in this issue: https://github.com/spring-projects/spring-batch/issues/2044. I have a PoC that uses a BlockingQueue as a staging area here, if you are interested.
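The linked PoC is not reproduced here. As an independent, hypothetical illustration of the staging-area idea, here is a plain-Java producer/consumer sketch. The class `StagingQueueDemo` is hypothetical. A reader thread puts items into a bounded `BlockingQueue` while a writer thread takes them out concurrently, and a sentinel object marks the end of input. How this maps onto Spring Batch steps running in parallel is an assumption and is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch (not the linked PoC): reading and writing overlap through a bounded queue.
class StagingQueueDemo {

    // End-of-input marker; compared by identity, so it never clashes with real items.
    private static final Object END = new Object();

    static List<String> run(List<String> input, int capacity) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
        List<String> written = new ArrayList<>(); // only touched by the writer thread until join()

        Thread reader = new Thread(() -> {
            try {
                for (String item : input) {
                    queue.put(item); // blocks while the staging area is full
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reader");

        Thread writer = new Thread(() -> {
            try {
                while (true) {
                    Object next = queue.take(); // blocks until an item is available
                    if (next == END) {
                        return;
                    }
                    written.add((String) next);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "writer");

        reader.start();
        writer.start();
        try {
            reader.join();
            writer.join(); // like a well-behaved step: finish only after all writes are done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return written;
    }
}
```

With a single writer thread, items come out in the order they were read. The bounded capacity keeps the reader from running arbitrarily far ahead of the writer.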