Spring Boot Batch - 停止并重新启动使用 CompositeItemWriter 的多线程步骤

Spring Boot Batch - Stop and Start a multithreaded step with CompositeItemWriter

我正在尝试通过调度程序停止和启动多线程步骤。但是我遇到了如下异常:

Caused by: org.springframework.dao.InvalidDataAccessResourceUsageException: Unexpected cursor position change.

如果我理解正确,多线程步骤是无法重新启动(restart)的。但我并没有重新启动它:我在 ChunkListener 中通过 stepExecution.setTerminateOnly() 停止作业,然后在调度程序中尝试通过 jobLauncher.run() 再次启动它。这是我的代码:

public class BatchConfiguration {
    
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
        
    @Autowired
    public DataSource dataSource;
    
        @Bean
        public TaskExecutor taskExecutor(){
            SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("user_purge");
            asyncTaskExecutor.setConcurrencyLimit(5);
            return asyncTaskExecutor;
        }
    
        @Bean
        public Job userPurgeJob() {
            return jobBuilderFactory.get("userPurgeJob")
                    .start(userPurgeStep())
                    .listener(new JobLoggerListener())
                    .build();
        }   
            
        @Bean
        public Step userPurgeStep() {
            return stepBuilderFactory.get("userPurgeStep")
                    .<UserInfo, UserInfo> chunk(10)
                    .reader(userPurgeReader())
                    .writer(compositePurgeWriter())
                    .listener(new StopListener())
                    .taskExecutor(taskExecutor())
                    .build();
        }
        
        
        @Bean
        @StepScope
        public JdbcCursorItemReader<UserInfo> userPurgeReader(){
            JdbcCursorItemReader<UserInfo> reader = new JdbcCursorItemReader<UserInfo>();
            reader.setDataSource(dataSource);
            reader.setSql("SELECT user_id, user_status "
                    + "FROM db3.user_purge "
                    + "WHERE user_status = 'I' "
                    + "AND purge_status = 'N'");
            reader.setRowMapper(new SoftDeleteMapper());
      
            return reader;
        }
        
        @Bean
        public CompositeItemWriter<UserInfo> compositePurgeWriter() {
            CompositeItemWriter<UserInfo> compositeItemWriter = new CompositeItemWriter<>();
            compositeItemWriter.setDelegates(Arrays.asList(delMasterWriter(), delTableWriter()));
            return compositeItemWriter;
        }
        
            
        @Bean
        @StepScope
        public JdbcBatchItemWriter<UserInfo> delMasterWriter() {
            JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
            writer.setSql("UPDATE db3.userinfo "
                    + "SET user_status = :userStatus, "
                        + "updated = NOW() "
                        + "WHERE user_id = :userId");
            writer.setDataSource(dataSource);
            
            return writer;
        }
        
        @Bean
        @StepScope
        public JdbcBatchItemWriter<UserInfo> delTableWriter() {
            JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
            writer.setSql("UPDATE db3.user_purge SET purge_status = 'S', del_date = NOW() WHERE user_id = :userId");
            writer.setDataSource(dataSource);
            return writer;
        }
}

StopListener.java:这个 ChunkListener 实现类用于在晚上 10 点到早上 6 点之外的任何时间终止执行。
/**
 * Chunk listener that asks the running step to stop once the current time is
 * outside the nightly processing window (22:00 inclusive to 06:00 exclusive).
 *
 * <p>The step execution is taken from the {@link ChunkContext} passed to the
 * callback, so the listener holds no per-step state.
 */
public class StopListener implements ChunkListener{

    // Not referenced by any method of this class.
    @Autowired
    AppUtils appUtils;

    /** No action is required before a chunk starts. */
    @Override
    public void beforeChunk(ChunkContext context) {
        // intentionally empty
    }

    /**
     * Flags the current step execution for termination when the chunk finished
     * outside the processing window.
     *
     * @param context context of the chunk that has just completed
     */
    @Override
    public void afterChunk(ChunkContext context) {
        if (terminateJob()) {
            context.getStepContext().getStepExecution().setTerminateOnly();
        }
    }

    /** No action is required when a chunk fails. */
    @Override
    public void afterChunkError(ChunkContext context) {
        // intentionally empty
    }

    /**
     * Checks whether the current hour of day is outside the 22:00-06:00 window.
     *
     * @return {@code true} when the hour is between 6 (inclusive) and 22
     *         (exclusive), i.e. when the job has to be terminated
     */
    private boolean terminateJob() {
        int hourOfDay = Calendar.getInstance().get(Calendar.HOUR_OF_DAY);
        return hourOfDay >= 6 && hourOfDay < 22;
    }

}

最后是我的应用程序类中的调度程序方法。我使用 CommandLineRunner 来接收参数。

@SpringBootApplication
@EnableScheduling
public class UserPurgeBatchApplication implements CommandLineRunner{
    static final Logger LOG = LogManager.getLogger(UserPurgeBatchApplication.class);
    
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private ApplicationContext context;
    
    @Autowired
    private JobRepository jobRepository;
    
    @Autowired
    private JobOperator jobOperator;
    
    private String jobName;
    private JobParameters jobParameters;
    private String inputFile;
    private String usertype;
    private boolean jobStatus = false;
    private String completionStatus;    

    public static void main(String[] args) throws Exception{
        SpringApplication.run(UserPurgeBatchApplication.class, args);           
        
    }
    
    @Override
    public void run(String... args) throws Exception {
        this.jobName = args[0];
        
        this.jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
                
        
        LOG.info("||| Launching the JOB: " + jobName);  
        this.completionStatus  = jobSelector(jobName, jobParameters).getExitCode();
        LOG.info(">>> JOB completed with status: " + this.completionStatus);
    }


    public ExitStatus jobSelector(String jobName, JobParameters jobParameters) {        
        Job job = this.context.getBean(jobName, Job.class);
        
        try {
            return this.jobLauncher.run(job,  jobParameters).getExitStatus();
        } catch (JobExecutionAlreadyRunningException | 
                JobRestartException | 
                JobInstanceAlreadyCompleteException | 
                JobParametersInvalidException e) {
            
            e.printStackTrace();
        }
        
        return new ExitStatus("FAILED");
    }
    
    
    @Scheduled(cron = "0 0/30 22-23,23,0-6 * * *")
    public void batchStartScheduler() {
        LOG.info("---Beginning of batchScheduler()---");
        
        Long lastExecutionID = jobRepository.getLastJobExecution(jobName, jobParameters).getId();
        String jobStatus = jobRepository.getLastJobExecution(jobName, jobParameters).getStatus().toString();
        Job job = this.context.getBean(jobName, Job.class);
        
        if(!jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_COMPLETED)) {
            if(jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_STOPPED) 
                    || jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_FAILED)) {
                    
                try {
                LOG.info("|||Starting the Job...");
                    this.jobParameters = new JobParametersBuilder(jobParameters)
                            .addLong("time", System.currentTimeMillis())
                            .toJobParameters();
                    
                    this.jobLauncher.run(job,  jobParameters);
                } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                        | JobParametersInvalidException e) {
                    e.printStackTrace();
                }
            }
        }else {
            LOG.info("Scheduler not executed!!");           
        }
                
        LOG.info("---End of batchScheduler()---");
    }
    
}

我有一些困惑。jobLauncher.run() 方法在失败时是否总是会尝试重新启动(restart)之前的执行?因为我看到它仍然在重新启动,这可能就是造成该异常的原因。我尝试提供新的 JobParameters,希望它能作为一次新的执行重新开始。我希望我通过 ChunkListener 停止作业的方式没有问题。但无论如何,我需要从调度程序再次启动这个作业,而且我确实需要多线程步骤。我也希望在多线程步骤中使用 CompositeItemWriter 没有问题。如能提供帮助,将不胜感激。提前致谢!

更新 : 最后我可以通过添加 reader.setVerifyCursorPosition(false) 使其工作。但我认为我需要按照 Mahmoud Ben Hassine 的建议使用线程安全 Reader。所以我正在尝试使用 JdbcPagingItemReader 但出现错误“必须指定 sortKey”。我想我已经指定了它但不确定它是否正确。这是我的 JdbcPagingItemReader

/**
 * Paging reader selecting the rows of {@code db3.user_purge} with
 * {@code user_status = 'I'} and {@code purge_status = 'N'}, ordered by
 * {@code user_id} ascending.
 *
 * <p>The reader is consumed by a multi-threaded step, so state saving is
 * disabled: the position stored for restart is not meaningful when several
 * threads read concurrently.
 *
 * @return the configured paging reader
 */
@Bean
public JdbcPagingItemReader<UserInfo> jdbcPagingItemReader() {
    JdbcPagingItemReader<UserInfo> pagingItemReader = new JdbcPagingItemReader<>();

    pagingItemReader.setDataSource(dataSource);
    pagingItemReader.setFetchSize(3);
    pagingItemReader.setRowMapper(new SoftDeleteMapper());
    // Required when the reader is shared by the threads of a multi-threaded step.
    pagingItemReader.setSaveState(false);

    MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
    mySqlPagingQueryProvider.setSelectClause("SELECT user_id, user_status");
    mySqlPagingQueryProvider.setFromClause("FROM db3.user_purge");
    mySqlPagingQueryProvider.setWhereClause( "WHERE user_status = 'I' "
                                                + "AND purge_status = 'N'");

    // The sort key drives the paging queries.
    Map<String, Order> orderByKeys = new HashMap<>();
    orderByKeys.put("user_id", Order.ASCENDING);

    mySqlPagingQueryProvider.setSortKeys(orderByKeys);
    pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);

    return pagingItemReader;
}

我更新的步骤

/**
 * Builds the "userPurgeStep" step: chunks of 10 items are read with the paging
 * reader, written with the composite soft-delete writer, observed by a
 * {@code StopListener} and executed on the configured task executor.
 *
 * @return the assembled step
 */
@Bean
public Step userPurgeStep() {
    final int itemsPerChunk = 10;
    final StopListener windowGuard = new StopListener();

    return stepBuilderFactory
            .get("userPurgeStep")
            .<UserInfo, UserInfo> chunk(itemsPerChunk)
            .reader(jdbcPagingItemReader())
            .writer(compositeSoftDelWriter())
            .listener(windowGuard)
            .taskExecutor(taskExecutor())
            .build();
}

多线程与重启不兼容。如 Javadoc 中所述,如果在多线程步骤中使用 JdbcCursorItemReader,则应将 saveState 设置为 false。

此外,JdbcCursorItemReader 不是线程安全的:它包装了一个非线程安全的 ResultSet 对象,并且它继承自 AbstractItemCountingItemStreamItemReader,而后者同样不是线程安全的。所以在多线程步骤中直接使用它是不正确的。这实际上就是您遇到 Unexpected cursor position change 问题的原因:并发线程无意中修改了游标位置。

您需要通过将 reader 包装在 SynchronizedItemStreamReader 中来同步对 reader 的访问,或者改用线程安全的 JdbcPagingItemReader。

编辑: 添加示例 JdbcPagingItemReader

这是一个独立的基于 docker 的示例:

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import com.mysql.cj.jdbc.MysqlDataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.utility.DockerImageName;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Self-contained example: starts a MySQL container, creates the Spring Batch
 * schema plus a small {@code person} table, and runs a single-step job that
 * reads the table with a {@link JdbcPagingItemReader} sorted by {@code id}
 * descending and prints every item.
 */
@RunWith(SpringRunner.class)
@ContextConfiguration
public class SO67614305 {

    private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.0.24");

    @ClassRule
    public static MySQLContainer<?> mysql = new MySQLContainer<>(MYSQL_IMAGE);

    @Autowired
    private DataSource dataSource;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job job;

    /** Creates the batch meta-data tables and the sample {@code person} rows. */
    @Before
    public void setUp() {
        // the script is inline here to have a self contained example
        String personScript =
                "create table person (ID int not null primary key, name varchar(20));" +
                "insert into person values (1, 'foo1'); insert into person values (2, 'foo2');" +
                "insert into person values (3, 'foo3'); insert into person values (4, 'foo4');";
        String batchSchemaLocation = "/org/springframework/batch/core/schema-mysql.sql";

        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(new ClassPathResource(batchSchemaLocation));
        populator.addScript(new ByteArrayResource(personScript.getBytes()));
        populator.execute(this.dataSource);
    }

    /** Runs the job with empty parameters and expects it to complete. */
    @Test
    public void testJob() throws Exception {
        // when
        JobExecution execution = this.jobLauncher.run(this.job, new JobParameters());

        // then
        Assert.assertEquals(ExitStatus.COMPLETED, execution.getExitStatus());
    }

    /** Batch infrastructure, data source, reader and job used by the test. */
    @Configuration
    @EnableBatchProcessing
    static class TestConfiguration {

        /** Data source pointing at the MySQL container started by the class rule. */
        @Bean
        public DataSource dataSource() throws Exception {
            MysqlDataSource containerDataSource = new MysqlDataSource();
            containerDataSource.setURL(mysql.getJdbcUrl());
            containerDataSource.setUser(mysql.getUsername());
            containerDataSource.setPassword(mysql.getPassword());
            containerDataSource.setUseSSL(false);
            return containerDataSource;
        }

        /** Paging reader over {@code person}, sorted by {@code id} descending. */
        @Bean
        public JdbcPagingItemReader<Person> jdbcPagingItemReader() throws Exception {
            Map<String, Order> sortKeys = new HashMap<>();
            sortKeys.put("id", Order.DESCENDING);

            MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
            queryProvider.setSelectClause("SELECT id, name");
            queryProvider.setFromClause("FROM person");
            queryProvider.setSortKeys(sortKeys);

            JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
            reader.setDataSource(dataSource());
            reader.setFetchSize(2);
            reader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
            reader.setQueryProvider(queryProvider);
            return reader;
        }

        /** Single-step job: chunks of 2, each item printed to standard output. */
        @Bean
        public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) throws Exception {
            return jobs.get("job")
                    .start(steps.get("step").chunk(2)
                            .reader(jdbcPagingItemReader())
                            .writer(items -> items.forEach(System.out::println))
                            .build())
                    .build();
        }

        /** Bean mapped from one row of the {@code person} table. */
        static class Person {
            int id;
            String name;

            public int getId() {
                return this.id;
            }

            public void setId(int id) {
                this.id = id;
            }

            public String getName() {
                return this.name;
            }

            public void setName(String name) {
                this.name = name;
            }

            @Override
            public String toString() {
                return "Person{" + "id=" + id + ", name='" + name + "'}";
            }
        }

    }
}

这会按预期按降序打印项目,而不会抱怨缺少排序键:

Person{id=4, name='foo4'}
Person{id=3, name='foo3'}
Person{id=2, name='foo2'}
Person{id=1, name='foo1'}