Spring Cloud Stream Kafka consumer/producer API exactly-once semantics (transactional)
I'm running into a problem with Spring Cloud Stream Kafka when transactions and dynamic destinations are enabled. I have two different services:
- The first service listens on a Solace queue and produces the messages to kafka topic-1 (where transactions are enabled).
- The second service listens on kafka topic-1 above and writes the messages to another kafka topic-2 (there is no manual commit; transactions are enabled for producing to the other topic; auto-commit of offsets is false and isolation.level is set to read_committed). We determine the topic name dynamically, which is why we use a dynamic destination resolver.
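The consumer side of the second service boils down to a couple of Kafka client properties. As a minimal stand-alone sketch of the settings described above (string keys only, so no kafka-clients dependency is needed to compile it):

```java
import java.util.Properties;

public class TxnConsumerProps {

    // Consumer settings described above: read only committed records, and let
    // the binder commit offsets inside the transaction instead of auto-committing.
    public static Properties consumerProps() {
        Properties p = new Properties();
        p.put("isolation.level", "read_committed");
        p.put("enable.auto.commit", "false");
        p.put("group.id", "test_group");
        return p;
    }
}
```

With read_committed, records from aborted producer transactions on topic-1 are never seen by the second service.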
The second service is now failing. If it simply runs as a service with @StreamListener and @SendTo, it works fine, as expected. But as soon as I start using dynamic destinations, I get the following error:
Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:810) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.0.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:423) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:351) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) [spring-integration-core-5.1.0.RELEASE.jar:5.1.0.RELEASE]
I tried both approaches from the dynamic destination resolver documentation.
The Spring Cloud Stream Kafka yml:
spring:
  cloud.stream:
    bindings:
      input:
        destination: test_input
        content-type: application/json
        group: test_group
      output:
        destination: test_output
        content-type: application/json
    kafka.binder:
      configuration:
        isolation.level: read_committed
        security.protocol: SASL_SSL
        sasl.mechanism: GSSAPI
        sasl.kerberos.service.name: kafka
        ssl.truststore.location: jks
        ssl.truststore.password:
        ssl.endpoint.identification.algorithm: null
      brokers: broker1:9092,broker2:9092,broker3:9092
      auto-create-topics: false
      transaction:
        transaction-id-prefix: trans-2
        producer:
          configuration:
            retries: 2000
            acks: all
            security.protocol: SASL_SSL
            sasl.mechanism: GSSAPI
            sasl.kerberos.service.name: kafka
            ssl.truststore.location: jks
            ssl.truststore.password:
            ssl.endpoint.identification.algorithm: null
That is the background of this question.
Updated code:
@Autowired
private BinderAwareChannelResolver resolver;

@StreamListener(target = Processor.INPUT)
public void consumer(@Payload Object inMessage, @Headers Map headers) {
    String topicName = null;
    String itemType = null;
    try {
        TransactionSynchronizationManager.setActualTransactionActive(true);
        itemType = msgService.itemTypeExtract((String) inMessage);
        topicName = msgService.getTopicName(itemType, (String) inMessage);
        Map<String, Object> headersMap = new HashMap<>();
        headersMap.put(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
        resolver.resolveDestination("destination_topic")
                .send(MessageBuilder.createMessage(inMessage, new MessageHeaders(headersMap)), 10000);
    } catch (Exception e) {
        LOGGER.error("error " + e.getMessage());
    }
}
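For context, the routing step itself (what msgService.getTopicName is doing) is not where the failure occurs; a stand-alone stand-in for it, with hypothetical item types and topic names, would look like this:

```java
import java.util.Map;

// Hypothetical stand-in for msgService.getTopicName: map an item type,
// extracted from the incoming message, to a destination topic name.
public class TopicRouter {

    private static final Map<String, String> ROUTES = Map.of(
            "order", "orders_topic",
            "invoice", "invoices_topic");

    public static String topicFor(String itemType) {
        // Unknown item types fall back to a catch-all topic.
        return ROUTES.getOrDefault(itemType, "unrouted_topic");
    }
}
```

The exception is thrown only when the resolved destination is used for the transactional send, which points at the binder's producer handling rather than at the routing logic.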
There is a bug in the binder; I opened an issue to get it fixed.