为什么 KafkaTransactionManager 没有应用到这个 Spring Cloud Stream Kafka Producer?
Why isn't the KafkaTransactionManager being applied to this Spring Cloud Stream Kafka Producer?
我已经配置了一个 Spring Cloud Stream Kafka 应用程序来使用事务(full source code available on Github):
spring:
application:
name: message-relay-service
cloud:
stream:
kafka:
binder:
transaction:
transaction-id-prefix: message-relay-tx-
producer:
configuration:
retries: 1
acks: all
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
bindings:
output:
destination: transfer
contentType: application/*+avro
schema-registry-client:
endpoint: http://localhost:8081
schema:
avro:
subjectNamingStrategy: org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
datasource:
url: jdbc:h2:tcp://localhost:9090/mem:mydb
driver-class-name: org.h2.Driver
username: sa
password:
jpa:
hibernate:
ddl-auto: create
database-platform: org.hibernate.dialect.H2Dialect
server:
port: 8085
此应用程序有一个计划任务,使用 @Scheduled
任务定期检查要发送到数据库中的记录。此方法用 @Transactional
注释,主要 class 定义 @EnableTransactionManagement
.
然而,在调试代码时,我发现 KafkaTransactionManager 没有被执行,也就是说,没有 Kafka 事务。有什么问题吗?
@EnableTransactionManagement
@EnableBinding(Source::class)
@EnableScheduling
@SpringBootApplication
class MessageRelayServiceApplication
fun main(args: Array<String>) {
runApplication<MessageRelayServiceApplication>(*args)
}
---
@Component
class MessageRelay(private val outboxService: OutboxService,
private val source: Source) {
@Transactional
@Scheduled(fixedDelay = 10000)
fun checkOutbox() {
val pending = outboxService.getPending()
pending.forEach {
val message = MessageBuilder.withPayload(it.payload)
.setHeader(KafkaHeaders.MESSAGE_KEY, it.messageKey)
.setHeader(MessageHeaders.CONTENT_TYPE, it.contentType)
.build()
source.output().send(message)
outboxService.markAsProcessed(it.id)
}
}
}
我在 account-service
中没有看到 @EnableTransactionManagement
,仅在 message-relay-service
中看到 @EnableTransactionManagement
。
无论如何,目前不支持您的场景;事务绑定器是为消费者启动事务的处理器设计的,在消费者线程上发送的任何记录都参与该事务,消费者将偏移量发送到事务然后提交事务。
它不是为 producer-only 绑定而设计的;请针对活页夹打开一个 GitHub 问题,因为它应该得到支持。
我不确定为什么您没有看到交易开始,但即使看到了,问题是 @Transactional
将使用 Boot 的 auto-configured KTM(和生产者工厂)和绑定使用不同的生产者工厂(来自您的配置的工厂)。
即使交易正在进行中,生产者也不会参与。
我已经配置了一个 Spring Cloud Stream Kafka 应用程序来使用事务(full source code available on Github):
spring:
application:
name: message-relay-service
cloud:
stream:
kafka:
binder:
transaction:
transaction-id-prefix: message-relay-tx-
producer:
configuration:
retries: 1
acks: all
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
bindings:
output:
destination: transfer
contentType: application/*+avro
schema-registry-client:
endpoint: http://localhost:8081
schema:
avro:
subjectNamingStrategy: org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
datasource:
url: jdbc:h2:tcp://localhost:9090/mem:mydb
driver-class-name: org.h2.Driver
username: sa
password:
jpa:
hibernate:
ddl-auto: create
database-platform: org.hibernate.dialect.H2Dialect
server:
port: 8085
此应用程序有一个计划任务,使用 @Scheduled
任务定期检查要发送到数据库中的记录。此方法用 @Transactional
注释,主要 class 定义 @EnableTransactionManagement
.
然而,在调试代码时,我发现 KafkaTransactionManager 没有被执行,也就是说,没有 Kafka 事务。有什么问题吗?
@EnableTransactionManagement
@EnableBinding(Source::class)
@EnableScheduling
@SpringBootApplication
class MessageRelayServiceApplication
fun main(args: Array<String>) {
runApplication<MessageRelayServiceApplication>(*args)
}
---
@Component
class MessageRelay(private val outboxService: OutboxService,
private val source: Source) {
@Transactional
@Scheduled(fixedDelay = 10000)
fun checkOutbox() {
val pending = outboxService.getPending()
pending.forEach {
val message = MessageBuilder.withPayload(it.payload)
.setHeader(KafkaHeaders.MESSAGE_KEY, it.messageKey)
.setHeader(MessageHeaders.CONTENT_TYPE, it.contentType)
.build()
source.output().send(message)
outboxService.markAsProcessed(it.id)
}
}
}
我在 account-service
中没有看到 @EnableTransactionManagement
,仅在 message-relay-service
中看到 @EnableTransactionManagement
。
无论如何,目前不支持您的场景;事务绑定器是为消费者启动事务的处理器设计的,在消费者线程上发送的任何记录都参与该事务,消费者将偏移量发送到事务然后提交事务。
它不是为 producer-only 绑定而设计的;请针对活页夹打开一个 GitHub 问题,因为它应该得到支持。
我不确定为什么您没有看到交易开始,但即使看到了,问题是 @Transactional
将使用 Boot 的 auto-configured KTM(和生产者工厂)和绑定使用不同的生产者工厂(来自您的配置的工厂)。
即使交易正在进行中,生产者也不会参与。