将数据传递给未来的步骤 - Spring 批处理
Passing Data to Future Steps - Spring Batch
正如我们在 Spring 批处理官方文档中看到的 "Passing Data to Future Steps - Spring Batch" 是可能的,但我们大多数人一直在努力解决它,因为在官方文档中他们提到了两种可能性。 阶梯级别和工作级别。问题是如何在步骤级别检索数据?
我的解决方案与官方文档中的相同,但没有用。所以我决定执行以下操作:
1- 为促销侦听器创建 bean:
<beans:bean id="promotionListener"
class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
<beans:property name="keys" value="someValues"/>
</beans:bean>
2- 在您的步骤中设置侦听器(您希望保存数据的步骤)
<listeners>
<listener ref="promotionListener"/>
</listeners>
3- 在您的编写器中的同一步骤(将保存数据的步骤)中,您保存数据。
private StepExecution stepExecution;
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
ExecutionContext executionContext = stepExecution.getExecutionContext();
Map<String, String> fieldsMap= new HashMap<>();
executionContext.put("keys", someValues);
}
@Override
public void write(List<? extends Map<String, String>> items) throws Exception {
LOGGER.info(items.toString());
Map<String, String> fieldsMap= new ConcurrentHashMap<>();
items.forEach(item -> item.forEach(fieldsMap::put));
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("keys", fieldsMap);
}
你可以看到,在我的例子中,我将数据保存在一个 Map (ConcurrentHashMap) 中。
4- 重要提示:在下一步中,您要检索我们在上一步中保存的数据。按照这个顺序,我们必须做:
声明将保存我们将检索的值的对象:
private Map fieldsMap;
关注JobExecution
@BeforeStep
public void retrieveInterStepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();
for (StepExecution steps : stepExecutions) {
ExecutionContext executionContext = steps.getExecutionContext();
if (executionContext.containsKey("keys")) {
this.nationalityMap = (Map<String, String>) executionContext.get("keys");
}
}
}
就是这样!
您可能想知道为什么我没有按照官方文档中的写法进行操作?原因是我在同一份工作中使用 Steps。他们共享相同的作业执行。现在看看我调试模式的图片。
如有其他方法请多多指教
注意:请不要只是复制和粘贴官方文档中的代码,提供您自己的答案或实现。
spring批处理文档的link与此问题相关的如下
enter link description here
您将要从步骤执行上下文提升到作业执行上下文的键与数据本身混淆了。这种混淆来自两个地方:
<beans:property name="keys" value="someValues"/>
: someValues
应该是 someKeys
- 在
@BeforeStep
中调用 executionContext.put("keys", someValues);
是不正确的
让我说得更清楚一点。想象一下,您的工作有两个步骤:
- 第 1 步:reads/writes 一些项目并在步骤执行上下文中写入项目计数
- 第 2 步:需要访问项目计数并将其打印到控制台。
在这种情况下,您可以使用提升侦听器将键"count"从步骤1的步骤执行上下文提升到作业执行上下文,以便步骤2可以访问它。这是一个例子:
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
}
@Bean
public ItemWriter<Integer> itemWriter() {
return new ItemWriter<Integer>() {
private StepExecution stepExecution;
@Override
public void write(List<? extends Integer> items) {
for (Integer item : items) {
System.out.println("item = " + item);
}
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
int count = stepContext.containsKey("count") ? stepContext.getInt("count") : 0;
stepContext.put("count", count + items.size());
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
};
}
@Bean
public Step step1() {
return steps.get("step1")
.<Integer, Integer>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.listener(promotionListener())
.build();
}
@Bean
public Step step2() {
return steps.get("step2")
.tasklet((contribution, chunkContext) -> {
// retrieve the key from the job execution context
Integer count = (Integer) chunkContext.getStepContext().getJobExecutionContext().get("count");
System.out.println("In step 2: step 1 wrote " + count + " items");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"count"});
return listener;
}
@Bean
public Job job() {
return jobs.get("job")
.start(step1())
.next(step2())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
这会打印:
item = 1
item = 2
item = 3
item = 4
In step 2: step 1 wrote 4 items
希望对您有所帮助。
正如我们在 Spring 批处理官方文档中看到的 "Passing Data to Future Steps - Spring Batch" 是可能的,但我们大多数人一直在努力解决它,因为在官方文档中他们提到了两种可能性。 阶梯级别和工作级别。问题是如何在步骤级别检索数据?
我的解决方案与官方文档中的相同,但没有用。所以我决定执行以下操作:
1- 为促销侦听器创建 bean:
<beans:bean id="promotionListener"
class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
<beans:property name="keys" value="someValues"/>
</beans:bean>
2- 在您的步骤中设置侦听器(您希望保存数据的步骤)
<listeners>
<listener ref="promotionListener"/>
</listeners>
3- 在您的编写器中的同一步骤(将保存数据的步骤)中,您保存数据。
private StepExecution stepExecution;
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
ExecutionContext executionContext = stepExecution.getExecutionContext();
Map<String, String> fieldsMap= new HashMap<>();
executionContext.put("keys", someValues);
}
@Override
public void write(List<? extends Map<String, String>> items) throws Exception {
LOGGER.info(items.toString());
Map<String, String> fieldsMap= new ConcurrentHashMap<>();
items.forEach(item -> item.forEach(fieldsMap::put));
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("keys", fieldsMap);
}
你可以看到,在我的例子中,我将数据保存在一个 Map (ConcurrentHashMap) 中。
4- 重要提示:在下一步中,您要检索我们在上一步中保存的数据。按照这个顺序,我们必须做:
声明将保存我们将检索的值的对象:
private Map fieldsMap;
关注JobExecution
@BeforeStep
public void retrieveInterStepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();
for (StepExecution steps : stepExecutions) {
ExecutionContext executionContext = steps.getExecutionContext();
if (executionContext.containsKey("keys")) {
this.nationalityMap = (Map<String, String>) executionContext.get("keys");
}
}
}
就是这样! 您可能想知道为什么我没有按照官方文档中的写法进行操作?原因是我在同一份工作中使用 Steps。他们共享相同的作业执行。现在看看我调试模式的图片。
如有其他方法请多多指教
注意:请不要只是复制和粘贴官方文档中的代码,提供您自己的答案或实现。
spring批处理文档的link与此问题相关的如下 enter link description here
您将要从步骤执行上下文提升到作业执行上下文的键与数据本身混淆了。这种混淆来自两个地方:
<beans:property name="keys" value="someValues"/>
:someValues
应该是someKeys
- 在
@BeforeStep
中调用executionContext.put("keys", someValues);
是不正确的
让我说得更清楚一点。想象一下,您的工作有两个步骤:
- 第 1 步:reads/writes 一些项目并在步骤执行上下文中写入项目计数
- 第 2 步:需要访问项目计数并将其打印到控制台。
在这种情况下,您可以使用提升侦听器将键"count"从步骤1的步骤执行上下文提升到作业执行上下文,以便步骤2可以访问它。这是一个例子:
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
}
@Bean
public ItemWriter<Integer> itemWriter() {
return new ItemWriter<Integer>() {
private StepExecution stepExecution;
@Override
public void write(List<? extends Integer> items) {
for (Integer item : items) {
System.out.println("item = " + item);
}
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
int count = stepContext.containsKey("count") ? stepContext.getInt("count") : 0;
stepContext.put("count", count + items.size());
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
};
}
@Bean
public Step step1() {
return steps.get("step1")
.<Integer, Integer>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.listener(promotionListener())
.build();
}
@Bean
public Step step2() {
return steps.get("step2")
.tasklet((contribution, chunkContext) -> {
// retrieve the key from the job execution context
Integer count = (Integer) chunkContext.getStepContext().getJobExecutionContext().get("count");
System.out.println("In step 2: step 1 wrote " + count + " items");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"count"});
return listener;
}
@Bean
public Job job() {
return jobs.get("job")
.start(step1())
.next(step2())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
这会打印:
item = 1
item = 2
item = 3
item = 4
In step 2: step 1 wrote 4 items
希望对您有所帮助。