Spring 启动批处理 - 使用 CompositeItemWriter 停止和启动多线程步骤
Spring Boot Batch - Stop and Start a multithreaded step with CompositeItemWriter
我正在尝试通过调度程序停止和启动多线程步骤。但是我遇到了异常,因为
Caused by: org.springframework.dao.InvalidDataAccessResourceUsageException: Unexpected cursor position change.
如果我理解正确,我们将无法重新启动多线程步骤。但我没有重新开始。我通过 stepExecution.setTerminateOnly() 通过 ChunkListener() 停止工作并尝试通过 [=40 开始=]() 在调度程序中。这是我的代码;
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
public DataSource dataSource;
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("user_purge");
asyncTaskExecutor.setConcurrencyLimit(5);
return asyncTaskExecutor;
}
@Bean
public Job userPurgeJob() {
return jobBuilderFactory.get("userPurgeJob")
.start(userPurgeStep())
.listener(new JobLoggerListener())
.build();
}
@Bean
public Step userPurgeStep() {
return stepBuilderFactory.get("userPurgeStep")
.<UserInfo, UserInfo> chunk(10)
.reader(userPurgeReader())
.writer(compositePurgeWriter())
.listener(new StopListener())
.taskExecutor(taskExecutor())
.build();
}
@Bean
@StepScope
public JdbcCursorItemReader<UserInfo> userPurgeReader(){
JdbcCursorItemReader<UserInfo> reader = new JdbcCursorItemReader<UserInfo>();
reader.setDataSource(dataSource);
reader.setSql("SELECT user_id, user_status "
+ "FROM db3.user_purge "
+ "WHERE user_status = 'I' "
+ "AND purge_status = 'N'");
reader.setRowMapper(new SoftDeleteMapper());
return reader;
}
@Bean
public CompositeItemWriter<UserInfo> compositePurgeWriter() {
CompositeItemWriter<UserInfo> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(delMasterWriter(), delTableWriter()));
return compositeItemWriter;
}
@Bean
@StepScope
public JdbcBatchItemWriter<UserInfo> delMasterWriter() {
JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("UPDATE db3.userinfo "
+ "SET user_status = :userStatus, "
+ "updated = NOW() "
+ "WHERE user_id = :userId");
writer.setDataSource(dataSource);
return writer;
}
@Bean
@StepScope
public JdbcBatchItemWriter<UserInfo> delTableWriter() {
JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("UPDATE db3.user_purge SET purge_status = 'S', del_date = NOW() WHERE user_id = :userId");
writer.setDataSource(dataSource);
return writer;
}
}
StopListener.java
此 ChunkListner class 实现用于在晚上 10 点到早上 6 点
之外的任何时间终止执行
public class StopListener implements ChunkListener{
private StepExecution stepExecution;
@Autowired
AppUtils appUtils;
@Override
public void beforeChunk(ChunkContext context) {
}
@Override
public void afterChunk(ChunkContext context) {
if (stopJob()) {
this.stepExecution.setTerminateOnly();
}
}
@Override
public void afterChunkError(ChunkContext context) {
}
//Check the time between 10pm and 6am
private boolean terminateJob() {
Date date = new Date();
Calendar calendar = GregorianCalendar.getInstance();
calendar.setTime(date);
calendar.get(Calendar.HOUR_OF_DAY);
if(calendar.get(Calendar.HOUR_OF_DAY) >= 6
&& calendar.get(Calendar.HOUR_OF_DAY) < 22) {
return true;
}else {
return false;
}
}
}
最后是我在应用程序中的调度程序方法 class。我正在使用 CommandLneRunner 接受参数。
@SpringBootApplication
@EnableScheduling
public class UserPurgeBatchApplication implements CommandLineRunner{
static final Logger LOG = LogManager.getLogger(UserPurgeBatchApplication.class);
@Autowired
private JobLauncher jobLauncher;
@Autowired
private ApplicationContext context;
@Autowired
private JobRepository jobRepository;
@Autowired
private JobOperator jobOperator;
private String jobName;
private JobParameters jobParameters;
private String inputFile;
private String usertype;
private boolean jobStatus = false;
private String completionStatus;
public static void main(String[] args) throws Exception{
SpringApplication.run(UserPurgeBatchApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
this.jobName = args[0];
this.jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
LOG.info("||| Launching the JOB: " + jobName);
this.completionStatus = jobSelector(jobName, jobParameters).getExitCode();
LOG.info(">>> JOB completed with status: " + this.completionStatus);
}
public ExitStatus jobSelector(String jobName, JobParameters jobParameters) {
Job job = this.context.getBean(jobName, Job.class);
try {
return this.jobLauncher.run(job, jobParameters).getExitStatus();
} catch (JobExecutionAlreadyRunningException |
JobRestartException |
JobInstanceAlreadyCompleteException |
JobParametersInvalidException e) {
e.printStackTrace();
}
return new ExitStatus("FAILED");
}
@Scheduled(cron = "0 0/30 22-23,23,0-6 * * *")
public void batchStartScheduler() {
LOG.info("---Beginning of batchScheduler()---");
Long lastExecutionID = jobRepository.getLastJobExecution(jobName, jobParameters).getId();
String jobStatus = jobRepository.getLastJobExecution(jobName, jobParameters).getStatus().toString();
Job job = this.context.getBean(jobName, Job.class);
if(!jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_COMPLETED)) {
if(jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_STOPPED)
|| jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_FAILED)) {
try {
LOG.info("|||Starting the Job...");
this.jobParameters = new JobParametersBuilder(jobParameters)
.addLong("time", System.currentTimeMillis())
.toJobParameters();
this.jobLauncher.run(job, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
e.printStackTrace();
}
}
}else {
LOG.info("Scheduler not executed!!");
}
LOG.info("---End of batchScheduler()---");
}
}
有一些困惑。
运行 方法是否会在失败时总是尝试重新启动之前的执行?因为我可以看到它仍在重新启动,这可能就是造成这种情况的原因。我试图提供新的 JobParameter,希望它能再次启动它。我希望我从 ChunkListener 停止的方法没问题。但不知何故,我想从 Scheduler 再次开始这项工作,我肯定需要一个多线程步骤。我也希望多线程步骤中的 CompositeWriter 也可以。将不胜感激。提前致谢!
更新
: 最后我可以通过添加 reader.setVerifyCursorPosition(false) 使其工作。但我认为我需要按照 Mahmoud Ben Hassine 的建议使用线程安全 Reader。所以我正在尝试使用 JdbcPagingItemReader 但出现错误“必须指定 sortKey”。我想我已经指定了它但不确定它是否正确。这是我的 JdbcPagingItemReader
@Bean
public JdbcPagingItemReader<UserInfo> jdbcPagingItemReader() {
JdbcPagingItemReader<UserInfo> pagingItemReader = new JdbcPagingItemReader<>();
pagingItemReader.setDataSource(dataSource);
pagingItemReader.setFetchSize(3);
pagingItemReader.setRowMapper(new SoftDeleteMapper());
MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
mySqlPagingQueryProvider.setSelectClause("SELECT user_id, user_status");
mySqlPagingQueryProvider.setFromClause("FROM db3.user_purge");
mySqlPagingQueryProvider.setWhereClause( "WHERE user_status = 'I' "
+ "AND purge_status = 'N'");
Map<String, Order> orderByKeys = new HashMap<>();
orderByKeys.put("user_id", Order.ASCENDING);
mySqlPagingQueryProvider.setSortKeys(orderByKeys);
pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);
return pagingItemReader;
}
我更新的步骤
@Bean
public Step userPurgeStep() {
return stepBuilderFactory.get("userPurgeStep")
.<UserInfo, UserInfo> chunk(10)
.reader(jdbcPagingItemReader())
.writer(compositeSoftDelWriter())
.listener(new StopListener())
.taskExecutor(taskExecutor())
.build();
}
多线程与重启不兼容。如 Javadoc 中所述,如果在多线程步骤中使用 JdbcCursorItemReader
,则应将 saveState
设置为 false。
此外,JdbcCursorItemReader
不是线程安全的,因为它包装了一个非线程安全的 ResultSet
对象,还因为它继承自 AbstractItemCountingItemStreamItemReader
,即 not thread safe 两者都不。所以在多线程步骤中使用它是不正确的。这实际上是您的问题 Unexpected cursor position change
的原因。并发线程无意中修改了光标位置。
您需要通过将 reader 包装在 SynchronizedIteamStreamReader
中来同步对 reader 的访问,或者使用 is thread safe.
的 JdbcPagingItemReader
编辑: 添加示例 JdbcPagingItemReader
这是一个独立的基于 docker 的示例:
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import com.mysql.cj.jdbc.MysqlDataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.utility.DockerImageName;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@ContextConfiguration
public class SO67614305 {
private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.0.24");
@ClassRule
public static MySQLContainer<?> mysql = new MySQLContainer<>(MYSQL_IMAGE);
@Autowired
private DataSource dataSource;
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
@Before
public void setUp() {
String schema = "/org/springframework/batch/core/schema-mysql.sql";
String data = // the script is inline here to have a self contained example
"create table person (ID int not null primary key, name varchar(20));" +
"insert into person values (1, 'foo1'); insert into person values (2, 'foo2');" +
"insert into person values (3, 'foo3'); insert into person values (4, 'foo4');";
ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
databasePopulator.addScript(new ClassPathResource(schema));
databasePopulator.addScript(new ByteArrayResource(data.getBytes()));
databasePopulator.execute(this.dataSource);
}
@Test
public void testJob() throws Exception {
// given
JobParameters jobParameters = new JobParameters();
// when
JobExecution jobExecution = this.jobLauncher.run(this.job, jobParameters);
// then
Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
@Configuration
@EnableBatchProcessing
static class TestConfiguration {
@Bean
public DataSource dataSource() throws Exception {
MysqlDataSource datasource = new MysqlDataSource();
datasource.setURL(mysql.getJdbcUrl());
datasource.setUser(mysql.getUsername());
datasource.setPassword(mysql.getPassword());
datasource.setUseSSL(false);
return datasource;
}
@Bean
public JdbcPagingItemReader<Person> jdbcPagingItemReader() throws Exception {
MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
mySqlPagingQueryProvider.setSelectClause("SELECT id, name");
mySqlPagingQueryProvider.setFromClause("FROM person");
Map<String, Order> orderByKeys = new HashMap<>();
orderByKeys.put("id", Order.DESCENDING);
mySqlPagingQueryProvider.setSortKeys(orderByKeys);
JdbcPagingItemReader<Person> pagingItemReader = new JdbcPagingItemReader<>();
pagingItemReader.setDataSource(dataSource());
pagingItemReader.setFetchSize(2);
pagingItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);
return pagingItemReader;
}
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) throws Exception {
return jobs.get("job")
.start(steps.get("step").chunk(2)
.reader(jdbcPagingItemReader())
.writer(items -> items.forEach(System.out::println))
.build())
.build();
}
static class Person {
int id;
String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
}
}
这会按预期按降序打印项目,而不会抱怨缺少排序键:
Person{id=4, name='foo4'}
Person{id=3, name='foo3'}
Person{id=2, name='foo2'}
Person{id=1, name='foo1'}
我正在尝试通过调度程序停止和启动多线程步骤。但是我遇到了异常,因为
Caused by: org.springframework.dao.InvalidDataAccessResourceUsageException: Unexpected cursor position change.
如果我理解正确,我们将无法重新启动多线程步骤。但我没有重新开始。我通过 stepExecution.setTerminateOnly() 通过 ChunkListener() 停止工作并尝试通过 [=40 开始=]() 在调度程序中。这是我的代码;
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
public DataSource dataSource;
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("user_purge");
asyncTaskExecutor.setConcurrencyLimit(5);
return asyncTaskExecutor;
}
@Bean
public Job userPurgeJob() {
return jobBuilderFactory.get("userPurgeJob")
.start(userPurgeStep())
.listener(new JobLoggerListener())
.build();
}
@Bean
public Step userPurgeStep() {
return stepBuilderFactory.get("userPurgeStep")
.<UserInfo, UserInfo> chunk(10)
.reader(userPurgeReader())
.writer(compositePurgeWriter())
.listener(new StopListener())
.taskExecutor(taskExecutor())
.build();
}
@Bean
@StepScope
public JdbcCursorItemReader<UserInfo> userPurgeReader(){
JdbcCursorItemReader<UserInfo> reader = new JdbcCursorItemReader<UserInfo>();
reader.setDataSource(dataSource);
reader.setSql("SELECT user_id, user_status "
+ "FROM db3.user_purge "
+ "WHERE user_status = 'I' "
+ "AND purge_status = 'N'");
reader.setRowMapper(new SoftDeleteMapper());
return reader;
}
@Bean
public CompositeItemWriter<UserInfo> compositePurgeWriter() {
CompositeItemWriter<UserInfo> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(delMasterWriter(), delTableWriter()));
return compositeItemWriter;
}
@Bean
@StepScope
public JdbcBatchItemWriter<UserInfo> delMasterWriter() {
JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("UPDATE db3.userinfo "
+ "SET user_status = :userStatus, "
+ "updated = NOW() "
+ "WHERE user_id = :userId");
writer.setDataSource(dataSource);
return writer;
}
@Bean
@StepScope
public JdbcBatchItemWriter<UserInfo> delTableWriter() {
JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("UPDATE db3.user_purge SET purge_status = 'S', del_date = NOW() WHERE user_id = :userId");
writer.setDataSource(dataSource);
return writer;
}
}
StopListener.java 此 ChunkListner class 实现用于在晚上 10 点到早上 6 点
之外的任何时间终止执行public class StopListener implements ChunkListener{
private StepExecution stepExecution;
@Autowired
AppUtils appUtils;
@Override
public void beforeChunk(ChunkContext context) {
}
@Override
public void afterChunk(ChunkContext context) {
if (stopJob()) {
this.stepExecution.setTerminateOnly();
}
}
@Override
public void afterChunkError(ChunkContext context) {
}
//Check the time between 10pm and 6am
private boolean terminateJob() {
Date date = new Date();
Calendar calendar = GregorianCalendar.getInstance();
calendar.setTime(date);
calendar.get(Calendar.HOUR_OF_DAY);
if(calendar.get(Calendar.HOUR_OF_DAY) >= 6
&& calendar.get(Calendar.HOUR_OF_DAY) < 22) {
return true;
}else {
return false;
}
}
}
最后是我在应用程序中的调度程序方法 class。我正在使用 CommandLneRunner 接受参数。
@SpringBootApplication
@EnableScheduling
public class UserPurgeBatchApplication implements CommandLineRunner{
static final Logger LOG = LogManager.getLogger(UserPurgeBatchApplication.class);
@Autowired
private JobLauncher jobLauncher;
@Autowired
private ApplicationContext context;
@Autowired
private JobRepository jobRepository;
@Autowired
private JobOperator jobOperator;
private String jobName;
private JobParameters jobParameters;
private String inputFile;
private String usertype;
private boolean jobStatus = false;
private String completionStatus;
public static void main(String[] args) throws Exception{
SpringApplication.run(UserPurgeBatchApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
this.jobName = args[0];
this.jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
LOG.info("||| Launching the JOB: " + jobName);
this.completionStatus = jobSelector(jobName, jobParameters).getExitCode();
LOG.info(">>> JOB completed with status: " + this.completionStatus);
}
public ExitStatus jobSelector(String jobName, JobParameters jobParameters) {
Job job = this.context.getBean(jobName, Job.class);
try {
return this.jobLauncher.run(job, jobParameters).getExitStatus();
} catch (JobExecutionAlreadyRunningException |
JobRestartException |
JobInstanceAlreadyCompleteException |
JobParametersInvalidException e) {
e.printStackTrace();
}
return new ExitStatus("FAILED");
}
@Scheduled(cron = "0 0/30 22-23,23,0-6 * * *")
public void batchStartScheduler() {
LOG.info("---Beginning of batchScheduler()---");
Long lastExecutionID = jobRepository.getLastJobExecution(jobName, jobParameters).getId();
String jobStatus = jobRepository.getLastJobExecution(jobName, jobParameters).getStatus().toString();
Job job = this.context.getBean(jobName, Job.class);
if(!jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_COMPLETED)) {
if(jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_STOPPED)
|| jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_FAILED)) {
try {
LOG.info("|||Starting the Job...");
this.jobParameters = new JobParametersBuilder(jobParameters)
.addLong("time", System.currentTimeMillis())
.toJobParameters();
this.jobLauncher.run(job, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
e.printStackTrace();
}
}
}else {
LOG.info("Scheduler not executed!!");
}
LOG.info("---End of batchScheduler()---");
}
}
有一些困惑。 运行 方法是否会在失败时总是尝试重新启动之前的执行?因为我可以看到它仍在重新启动,这可能就是造成这种情况的原因。我试图提供新的 JobParameter,希望它能再次启动它。我希望我从 ChunkListener 停止的方法没问题。但不知何故,我想从 Scheduler 再次开始这项工作,我肯定需要一个多线程步骤。我也希望多线程步骤中的 CompositeWriter 也可以。将不胜感激。提前致谢!
更新 : 最后我可以通过添加 reader.setVerifyCursorPosition(false) 使其工作。但我认为我需要按照 Mahmoud Ben Hassine 的建议使用线程安全 Reader。所以我正在尝试使用 JdbcPagingItemReader 但出现错误“必须指定 sortKey”。我想我已经指定了它但不确定它是否正确。这是我的 JdbcPagingItemReader
@Bean
public JdbcPagingItemReader<UserInfo> jdbcPagingItemReader() {
JdbcPagingItemReader<UserInfo> pagingItemReader = new JdbcPagingItemReader<>();
pagingItemReader.setDataSource(dataSource);
pagingItemReader.setFetchSize(3);
pagingItemReader.setRowMapper(new SoftDeleteMapper());
MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
mySqlPagingQueryProvider.setSelectClause("SELECT user_id, user_status");
mySqlPagingQueryProvider.setFromClause("FROM db3.user_purge");
mySqlPagingQueryProvider.setWhereClause( "WHERE user_status = 'I' "
+ "AND purge_status = 'N'");
Map<String, Order> orderByKeys = new HashMap<>();
orderByKeys.put("user_id", Order.ASCENDING);
mySqlPagingQueryProvider.setSortKeys(orderByKeys);
pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);
return pagingItemReader;
}
我更新的步骤
@Bean
public Step userPurgeStep() {
return stepBuilderFactory.get("userPurgeStep")
.<UserInfo, UserInfo> chunk(10)
.reader(jdbcPagingItemReader())
.writer(compositeSoftDelWriter())
.listener(new StopListener())
.taskExecutor(taskExecutor())
.build();
}
多线程与重启不兼容。如 Javadoc 中所述,如果在多线程步骤中使用 JdbcCursorItemReader
,则应将 saveState
设置为 false。
此外,JdbcCursorItemReader
不是线程安全的,因为它包装了一个非线程安全的 ResultSet
对象,还因为它继承自 AbstractItemCountingItemStreamItemReader
,即 not thread safe 两者都不。所以在多线程步骤中使用它是不正确的。这实际上是您的问题 Unexpected cursor position change
的原因。并发线程无意中修改了光标位置。
您需要通过将 reader 包装在 SynchronizedIteamStreamReader
中来同步对 reader 的访问,或者使用 is thread safe.
JdbcPagingItemReader
编辑: 添加示例 JdbcPagingItemReader
这是一个独立的基于 docker 的示例:
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import com.mysql.cj.jdbc.MysqlDataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.utility.DockerImageName;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@ContextConfiguration
public class SO67614305 {
private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.0.24");
@ClassRule
public static MySQLContainer<?> mysql = new MySQLContainer<>(MYSQL_IMAGE);
@Autowired
private DataSource dataSource;
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
@Before
public void setUp() {
String schema = "/org/springframework/batch/core/schema-mysql.sql";
String data = // the script is inline here to have a self contained example
"create table person (ID int not null primary key, name varchar(20));" +
"insert into person values (1, 'foo1'); insert into person values (2, 'foo2');" +
"insert into person values (3, 'foo3'); insert into person values (4, 'foo4');";
ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
databasePopulator.addScript(new ClassPathResource(schema));
databasePopulator.addScript(new ByteArrayResource(data.getBytes()));
databasePopulator.execute(this.dataSource);
}
@Test
public void testJob() throws Exception {
// given
JobParameters jobParameters = new JobParameters();
// when
JobExecution jobExecution = this.jobLauncher.run(this.job, jobParameters);
// then
Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
@Configuration
@EnableBatchProcessing
static class TestConfiguration {
@Bean
public DataSource dataSource() throws Exception {
MysqlDataSource datasource = new MysqlDataSource();
datasource.setURL(mysql.getJdbcUrl());
datasource.setUser(mysql.getUsername());
datasource.setPassword(mysql.getPassword());
datasource.setUseSSL(false);
return datasource;
}
@Bean
public JdbcPagingItemReader<Person> jdbcPagingItemReader() throws Exception {
MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
mySqlPagingQueryProvider.setSelectClause("SELECT id, name");
mySqlPagingQueryProvider.setFromClause("FROM person");
Map<String, Order> orderByKeys = new HashMap<>();
orderByKeys.put("id", Order.DESCENDING);
mySqlPagingQueryProvider.setSortKeys(orderByKeys);
JdbcPagingItemReader<Person> pagingItemReader = new JdbcPagingItemReader<>();
pagingItemReader.setDataSource(dataSource());
pagingItemReader.setFetchSize(2);
pagingItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);
return pagingItemReader;
}
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) throws Exception {
return jobs.get("job")
.start(steps.get("step").chunk(2)
.reader(jdbcPagingItemReader())
.writer(items -> items.forEach(System.out::println))
.build())
.build();
}
static class Person {
int id;
String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
}
}
这会按预期按降序打印项目,而不会抱怨缺少排序键:
Person{id=4, name='foo4'}
Person{id=3, name='foo3'}
Person{id=2, name='foo2'}
Person{id=1, name='foo1'}