Spring Kafka/Spring Cloud Stream 如何保证涉及数据库和 Kafka 的事务性/原子性?

How does Spring Kafka/Spring Cloud Stream guarantee the transactionality / atomicity involving a Database and Kafka?

Spring Kafka, and thus Spring Cloud Stream, allow us to create transactional Producers and Processors. We can see that functionality in action in one of the sample projects: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples:

@Transactional
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public PersonEvent process(PersonEvent data) {
        logger.info("Received event={}", data);
        Person person = new Person();
        person.setName(data.getName());

        if(shouldFail.get()) {
            shouldFail.set(false);
            throw new RuntimeException("Simulated network error");
        } else {
            //We fail every other request as a test
            shouldFail.set(true);
        }
        logger.info("Saving person={}", person);

        Person savedPerson = repository.save(person);

        PersonEvent event = new PersonEvent();
        event.setName(savedPerson.getName());
        event.setType("PersonSaved");
        logger.info("Sent event={}", event);
        return event;
    }

在此摘录中,有一个 从 Kafka 主题读取,一个写入数据库,另一个写入另一个 Kafka 主题,所有这些都是事务性的。

我想知道并希望得到回答的是,技术上是如何实现和实施的。

由于数据源和 Kafka 不参与 XA 事务(2 阶段提交),实现如何保证本地事务可以从 Kafka 读取、提交到数据库并写入 Kafka 所有这些事务?

没有保证,只能在 Kafka 自身内部。

Spring 提供事务 synchronization 因此提交非常接近,但是 DB 有可能提交而 Kafka 没有。所以你必须处理重复的可能性。

正确的做法是,当直接使用 spring-kafka 时,不是使用 @Transactional,而是在侦听器容器中使用 ChainedKafkaTransactionManager

参见 Transaction Synchronization

另请参阅 Distributed transactions in Spring, with and without XA 和“Best Efforts 1PC 模式”作为背景。

但是,对于 Stream,不支持链式事务管理器,因此需要 @Transactional(使用 DB 事务管理器)。这将提供与链式 tx 管理器类似的结果,数据库首先提交,就在 Kafka 之前。