Write to two Kafka topics in a single transaction using Spring Kafka
I'm trying to figure out if there is a way to write to two topics in one transaction, using Kafka's transaction feature.
I know the typical scenario for Kafka transactions is the consumer-producer pattern, and that seems well documented.
What I've tried:
- Created a KafkaTransactionManager for each topic
- Configured each ProducerFactory to use its respective transaction manager
- Created a ChainedTransactionManager from the two KafkaTransactionManager instances
- Created a KafkaTemplate for each topic
Then I used the @Transactional(transactionManager = "chainedTx") annotation on a method that does the following:
template1.send("topic1", "example payload");
template2.send("topic2", "example payload");
This doesn't work. The KafkaTemplate is transactional, but when the send() method is called there is no transaction in progress and I get an IllegalStateException.
I was going to try the KafkaTemplate.executeInTransaction() method, but the Javadoc states that it is only for local transactions, so it doesn't appear to fit my needs.
My next step is to try using Kafka's Producer API directly to see whether this pattern works, but I'd appreciate it if someone could tell me I'm wasting my time and that Kafka doesn't support transactional writes to multiple topics.
I did find this statement in Confluent's blog on Kafka transaction support:
Transactions enable atomic writes to multiple Kafka topics and partitions...
But I haven't found any examples that demonstrate it.
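For reference, this is the sort of thing I was planning to try with the plain Producer API (just a sketch; the broker address, transactional.id, and topic names are placeholders):
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-tx-id");
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    producer.initTransactions();
    try {
        producer.beginTransaction();
        // both records belong to the same transaction
        producer.send(new ProducerRecord<>("topic1", "example payload"));
        producer.send(new ProducerRecord<>("topic2", "example payload"));
        producer.commitTransaction();
    }
    catch (KafkaException e) {
        // abort so that neither record becomes visible to read_committed consumers
        producer.abortTransaction();
        throw e;
    }
}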
Configuration for the first producer
@Configuration
public class ControlProducerConfig {
@Bean("controlTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
return new KafkaTransactionManager<>(factory());
}
@Bean("controlTemplate")
public KafkaTemplate<String, String> template() {
return new KafkaTemplate<>(factory());
}
private ProducerFactory<String, String> factory() {
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
factory.setTransactionIdPrefix("abcd");
return factory;
}
private Map<String, Object> config() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// you can't set idempotence without setting max in flight requests to <= 5
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
return props;
}
}
Configuration for the second producer
@Configuration
public class PayloadProducerConfig {
@Bean("payloadTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
return new KafkaTransactionManager<>(factory());
}
@Bean("payloadTemplate")
public KafkaTemplate<String, String> template() {
return new KafkaTemplate<>(factory());
}
private ProducerFactory<String, String> factory() {
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
factory.setTransactionIdPrefix("abcd");
return factory;
}
private Map<String, Object> config() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// you can't set idempotence without setting max in flight requests to <= 5
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
return props;
}
}
Main class
@EnableTransactionManagement
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
@Bean("chainedTx")
public ChainedTransactionManager chained(
@Qualifier("controlTransactionManager") KafkaTransactionManager controlTransactionManager,
@Qualifier("payloadTransactionManager") KafkaTransactionManager payloadTransactionManager) {
return new ChainedTransactionManager(controlTransactionManager, payloadTransactionManager);
}
@Bean
public OnStart onStart(PostTwoMessages postTwoMessages) {
return new OnStart(postTwoMessages);
}
@Bean
public PostTwoMessages postTwoMessages(
@Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
@Qualifier("controlTemplate") KafkaTemplate<String, String> payloadTemplate) {
return new PostTwoMessages(controlTemplate, payloadTemplate);
}
}
On application startup
public class OnStart implements ApplicationListener<ApplicationReadyEvent> {
private PostTwoMessages postTwoMessages;
public OnStart(PostTwoMessages postTwoMessages) {
this.postTwoMessages = postTwoMessages;
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
postTwoMessages.run();
}
}
Posting two messages
public class PostTwoMessages {
private final KafkaTemplate<String, String> controlTemplate;
private final KafkaTemplate<String, String> payloadTemplate;
public PostTwoMessages(
@Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
@Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {
this.controlTemplate = controlTemplate;
this.payloadTemplate = payloadTemplate;
}
@Transactional(transactionManager = "chainedTx")
public void run() {
UUID uuid = UUID.randomUUID();
controlTemplate.send("private.s0869y.trx.model3a", "control: " + uuid);
payloadTemplate.send("private.s0869y.trx.model3b", "payload: " + uuid);
}
}
It should work; do you have @EnableTransactionManagement?
However, the transaction can't span 2 different producers; you have to do both sends using the same template. Otherwise they are 2 different transactions.
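That is, something along these lines (a minimal sketch only; it assumes a single transactional template whose producer factory has a transaction id prefix set, and the topic names are placeholders):
@Transactional(transactionManager = "kafkaTransactionManager")
public void run() {
    UUID uuid = UUID.randomUUID();
    // one transactional producer, so both sends commit (or abort) together
    template.send("topic1", "control: " + uuid);
    template.send("topic2", "payload: " + uuid);
}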
EDIT
Here is an example with a Spring Boot application:
EDIT2
Updated the example to also show using a local transaction via executeInTransaction.
@SpringBootApplication
public class So54865968Application {
public static void main(String[] args) {
SpringApplication.run(So54865968Application.class, args);
}
@Bean
public ApplicationRunner runner(Foo foo) {
return args -> {
foo.runInTx();
System.out.println("Committed 1");
foo.runInLocalTx();
System.out.println("Committed 2");
};
}
@Bean
public Foo foo(KafkaTemplate<String, Object> template) {
return new Foo(template);
}
@Bean
public Bar bar() {
return new Bar();
}
@Bean
public NewTopic topic1() {
return new NewTopic("so54865968-1", 1, (short) 1);
}
@Bean
public NewTopic topic2() {
return new NewTopic("so54865968-2", 1, (short) 1);
}
public static class Foo {
private final KafkaTemplate<String, Object> template;
public Foo(KafkaTemplate<String, Object> template) {
this.template = template;
}
@Transactional(transactionManager = "kafkaTransactionManager")
public void runInTx() throws InterruptedException {
this.template.send("so54865968-1", 42);
this.template.send("so54865968-2", "texttest");
System.out.println("Sent 2; waiting a few seconds to commit");
Thread.sleep(5_000);
}
public void runInLocalTx() throws InterruptedException {
this.template.executeInTransaction(t -> {
t.send("so54865968-1", 43);
t.send("so54865968-2", "texttest2");
System.out.println("Sent 2; waiting a few seconds to commit");
try {
Thread.sleep(5_000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return true;
});
}
}
public static class Bar {
@KafkaListener(id = "foo", topics = { "so54865968-1", "so54865968-2" })
public void handler(byte[] bytes) {
if (bytes.length == 4) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
System.out.println("Received int " + bb.getInt());
}
else {
System.out.println("Received string " + new String(bytes));
}
}
}
}
and
spring.kafka.producer.transaction-id-prefix=tx-id
spring.kafka.producer.properties.value.serializer=com.example.CompositeSerializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.properties.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
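(Setting spring.kafka.producer.transaction-id-prefix is what makes Boot's auto-configured producer factory transactional, and it also causes Boot to define the kafkaTransactionManager bean that Foo references.)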
and
public class CompositeSerializer implements Serializer<Object> {
private final StringSerializer stringSerializer = new StringSerializer();
private final IntegerSerializer intSerializer = new IntegerSerializer();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Object data) {
return data instanceof Integer ? intSerializer.serialize(topic, (Integer) data)
: stringSerializer.serialize(topic, (String) data);
}
@Override
public void close() {
}
}
and
Received int 42
Received string texttest
Both show up after the 5 second pauses (the read_committed consumer doesn't see the records until the transactions commit).