在多线程步骤中使用 FlatFileItem 编写器

Using FlatFileItem writer in multithreaded step

我在阅读有关 FlatFileItemWriter 的 spring 文档时有点困惑,他们说它不是线程安全的,所以我正在考虑将它包装到 SynchronizedItemStreamWriter 中,但在这个 link https://docs.spring.io/spring-batch/docs/current-SNAPSHOT/api/org/springframework/batch/item/support/SynchronizedItemStreamWriter.html 他们说:例如,在多线程步骤中使用 FlatFileItemWriter 不需要同步写入。 有什么解释吗?

Spring 批处理文件编写器中的线程安全与两个方面有关:更新执行上下文和写入输出文件。这是两件不同的事情,在多线程步骤中使用这些编写器时应仔细考虑。我会分别解释这两个方面。

1。关于执行上下文更新的线程安全

出于可重启性的原因,文件编写器(平面文件、json、xml 等)使用名为 current.count 的键更新执行上下文。此键用于重新启动场景,以了解在之前的 运行 中写入了多少项,并将文件 运行 分类为最后已知的正确偏移量。如果您在多线程上下文中使用这样的编写器,则此键可能会被使用共享编写器的不同线程覆盖,这是有问题的。这就是为什么建议在这种情况下使用 saveState 标志关闭状态管理。 Multi-threaded Step 部分中有更多详细信息。

2。关于写入输出文件的线程安全性

将数据写入结构化 JSON 或 XML 文件与写入平面文件不同。如果您在多线程步骤中使用非同步 JsonItemWriterStaxEventItemWriter,一个线程的输出可能会覆盖另一个线程的输出,您最终会得到一个不正确的输出文件。这是 StaxEventItemWriter:

的简单示例
import javax.sql.DataSource;
import javax.xml.bind.annotation.XmlRootElement;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.item.support.SynchronizedItemStreamWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.batch.item.xml.builder.StaxEventItemWriterBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    @Bean
    public SynchronizedItemStreamReader<Person> itemReader() {
        String sql = "select * from person";
        JdbcCursorItemReader<Person> personItemReader = new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql(sql)
                .beanRowMapper(Person.class)
                .build();
        SynchronizedItemStreamReader<Person> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
        synchronizedItemStreamReader.setDelegate(personItemReader);
        return synchronizedItemStreamReader;
    }

//  @Bean
//  public SynchronizedItemStreamWriter<Person> itemWriter() {
//      Jaxb2Marshaller marchaller = new Jaxb2Marshaller();
//      marchaller.setClassesToBeBound(Person.class);
//      StaxEventItemWriter<Person> personStaxEventItemWriter = new StaxEventItemWriterBuilder<Person>()
//              .name("personItemWriter")
//              .resource(new FileSystemResource("persons.xml"))
//              .marshaller(marchaller)
//              .rootTagName("persons")
//              .build();
//      SynchronizedItemStreamWriter<Person> synchronizedItemStreamWriter = new SynchronizedItemStreamWriter<>();
//      synchronizedItemStreamWriter.setDelegate(personStaxEventItemWriter);
//      return synchronizedItemStreamWriter;
//  }

    @Bean
    public StaxEventItemWriter<Person> itemWriter() {
        Jaxb2Marshaller marchaller = new Jaxb2Marshaller();
        marchaller.setClassesToBeBound(Person.class);
        return new StaxEventItemWriterBuilder<Person>()
                .name("personItemWriter")
                .resource(new FileSystemResource("persons.xml"))
                .marshaller(marchaller)
                .rootTagName("persons")
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Person, Person>chunk(5)
                        .reader(itemReader())
                        .writer(itemWriter())
                        .taskExecutor(new SimpleAsyncTaskExecutor())
                        .build())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    @Bean
    public DataSource dataSource() {
        EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.HSQL)
                .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
                .build();
        JdbcTemplate jdbcTemplate = new JdbcTemplate(embeddedDatabase);
        jdbcTemplate.execute("create table person (id int primary key, name varchar(20));");
        for (int i = 1; i <= 10; i++) {
            jdbcTemplate.execute(String.format("insert into person values (%s, 'foo%s');", i, i));
        }
        return embeddedDatabase;
    }

    @XmlRootElement
    static class Person {
        private int id;
        private String name;

        public Person() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String toString() {
            return "Person{id=" + id + ", name='" + name + '\'' + '}';
        }
    }

}

此示例从数据库 table 中读取项目并将它们写入文件。使用非同步编写器,示例生成以下 persons.xml 文件:

<?xml version='1.0' encoding='UTF-8'?>
<persons>
    <name>foo7</name>
</person><name>foo5</name><id/></id>><idson<name>foo7</name></person><name>
foo5
</name><id/></id><name>foo7</name></person><name>foo9</name></person><name>
foo7
</name></person><name>foo5
</name><id/></id>e></id></person></persons></person>><id>6</id><person>
<id>9</id>
<name>foo6</name>
</name><person>
<id>10</id>
<name>foo10</name>
</person></persons>

这显然是不正确的。使用同步编写器,可以正确生成输出文件。即使 FlatFileItemWriter 未同步,也不会发生此问题,因为平面文件不包含额外的 XML 元素到 open/close 标签或 JSON 元素,如 [=20] =] 和 , 来分隔数组和对象。所以在这种情况下不需要同步写入。这就是 SynchronizedItemStreamWriter:

的 Javadoc 中描述的内容
This decorator is useful when using a non thread-safe item writer in a multi-threaded
step. Typical delegate examples are the JsonFileItemWriter and StaxEventItemWriter.

It should be noted that synchronizing writes might introduce some performance
degradation, so this decorator should be used wisely and only when necessary.
For example, using a FlatFileItemWriter in a multi-threaded step does NOT
require synchronizing writes, so using this decorator in such use case might
be counter productive.