为什么 KafkaTransactionManager 没有应用到这个 Spring Cloud Stream Kafka Producer?

Why isn't the KafkaTransactionManager being applied to this Spring Cloud Stream Kafka Producer?

我已经配置了一个 Spring Cloud Stream Kafka 应用程序来使用事务(full source code available on Github):

spring:
  application:
    name: message-relay-service
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: message-relay-tx-
            producer:
              configuration:
                retries: 1
                acks: all
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer

      bindings:
        output:
          destination: transfer
          contentType: application/*+avro
      schema-registry-client:
        endpoint: http://localhost:8081
      schema:
        avro:
          subjectNamingStrategy: org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
  datasource:
    url: jdbc:h2:tcp://localhost:9090/mem:mydb
    driver-class-name: org.h2.Driver
    username: sa
    password:
  jpa:
    hibernate:
      ddl-auto: create
    database-platform: org.hibernate.dialect.H2Dialect

server:
  port: 8085

此应用程序有一个计划任务,使用 @Scheduled 任务定期检查要发送到数据库中的记录。此方法用 @Transactional 注释,主要 class 定义 @EnableTransactionManagement.

然而,在调试代码时,我发现 KafkaTransactionManager 没有被执行,也就是说,没有 Kafka 事务。有什么问题吗?

@EnableTransactionManagement
@EnableBinding(Source::class)
@EnableScheduling
@SpringBootApplication
class MessageRelayServiceApplication

fun main(args: Array<String>) {
    runApplication<MessageRelayServiceApplication>(*args)
}

---

@Component
class MessageRelay(private val outboxService: OutboxService,
                   private val source: Source) {

    @Transactional
    @Scheduled(fixedDelay = 10000)
    fun checkOutbox() {
        val pending = outboxService.getPending()
        pending.forEach {
            val message = MessageBuilder.withPayload(it.payload)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, it.messageKey)
                    .setHeader(MessageHeaders.CONTENT_TYPE, it.contentType)
                    .build()
            source.output().send(message)
            outboxService.markAsProcessed(it.id)
        }
    }

}

我在 account-service 中没有看到 @EnableTransactionManagement,仅在 message-relay-service 中看到 @EnableTransactionManagement

无论如何,目前不支持您的场景;事务绑定器是为消费者启动事务的处理器设计的,在消费者线程上发送的任何记录都参与该事务,消费者将偏移量发送到事务然后提交事务。

它不是为 producer-only 绑定而设计的;请针对活页夹打开一个 GitHub 问题,因为它应该得到支持。

我不确定为什么您没有看到交易开始,但即使看到了,问题是 @Transactional 将使用 Boot 的 auto-configured KTM(和生产者工厂)和绑定使用不同的生产者工厂(来自您的配置的工厂)。

即使交易正在进行中,生产者也不会参与。