KafkaProducer and KafkaTransactionManager: no rollback on exception
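Objective
Consume a message from a (source) MQ queue and publish it to
a) another (destination) MQ queue and
b) a Kafka topic, in a single transaction, so that the message is not removed from the source MQ queue if publication to MQ or Kafka fails.
Frameworks used
Spring Boot - 2.1.5
Spring JMS - 5.1.7
Spring Kafka - 2.2.6
Confluent Kafka - 5.3
MQ - 9
Kafka configuration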
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
@Slf4j
@Getter
@Setter
@ToString
@EnableTransactionManagement
public class KafkaConfig {
/** injected local properties */
public Map<String, Object> producerConfigs() throws IOException{
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
props.put(ProducerConfig.ACKS_CONFIG, acks);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
log.info("Value of transaction id 0 {}",transactionIdPrefix);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,transactionIdPrefix);
sslCommonConfigs(props);
return props;
}
public Map<String, Object> consumerConfigs() throws IOException{
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutocommit);
sslCommonConfigs(props);
return props;
}
public Map<String, Object> sslCommonConfigs(Map<String, Object> props) throws IOException {
log.info("kafka config {}",this);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, specificAvroReader);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, FileUtil.decodeCertFile(trustStoreValue, "kafka_truststore.jks"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePw);
props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE); //"JKS"
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, FileUtil.decodeCertFile(keyStoreValue, "kafka_keystore.jks"));
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePw);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyStorePw);
return props;
}
@Bean
public ProducerFactory producerFactory() throws IOException {
DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
producerFactory.setTransactionIdPrefix(this.transactionIdPrefix);
return producerFactory;
}
@Bean
public KafkaTemplate<String, RawPage> ddaKafkaTemplate() throws IOException {
return new KafkaTemplate<String, RawPage>(producerFactory());
}
@Bean
public KafkaTransactionManager<String,RawPage> kafkaTransactionManager(ProducerFactory<String, RawPage> producerFactory) {
log.info("producerFactory.transactionCapable() {}",producerFactory.transactionCapable());
KafkaTransactionManager transactionManager = new KafkaTransactionManager(producerFactory);
transactionManager.setNestedTransactionAllowed(true);
transactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
return transactionManager;
}
Application config class
@Slf4j
@Configuration
public class ApplicationConfig {
@Bean
public JmsListenerContainerFactory<?> myMessageFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer,
ChainedTransactionManager chainedTransactionManager) {
log.debug("Connection factory instance as received {} {}",connectionFactory,configurer);
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setTransactionManager(chainedTransactionManager);
factory.setSessionTransacted(true);
configurer.configure(factory, connectionFactory);
log.debug("Returning the myMessageFactory factory instance as {}",factory);
return factory;
}
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory){
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
@Bean
public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(KafkaTransactionManager kafkaTransactionManager, JmsTransactionManager jmsTransactionManager){
return new ChainedKafkaTransactionManager(jmsTransactionManager,kafkaTransactionManager );
}
}
Actual consumer and publishing code
@Service
@Slf4j
@Setter
@Getter
public class MyMessageProcessor {
@Autowired
private KafkaTemplate<String, Event> kafkaTemplate;
@Autowired
private JmsTemplate jmsTemplate;
@JmsListener(destination = "desintationQueue"
,containerFactory = "myMessageFactory")
public void receiveMessage(TextMessage message){
try {
log.info("Received message {}",message.getText());
send(destinationQueueName,message.getText());
// build avro event
publish(event);
// only acknowledge if the message is successfully processed till kafka publication
message.acknowledge();
}catch (JMSException|CustomKafkaPublicationException e){
log.error("Error in consuming the message from sourceSystem {}", ExceptionUtils.getStackTrace(e));
}
}
public void send(String queueName,final String msg) throws RawPagePublicationException{
if(StringUtils.isEmpty(msg)|| StringUtils.isEmpty(queueName)){
String errorMessage = String.format("Incorrect message and queue details msg %s queueName %s.",msg,queueName);
log.error(errorMessage);
throw new CustomKafkaPublicationException(errorMessage);
}
log.info("Publishing the message to destination queue {} at time in millis {}",queueName,System.currentTimeMillis());
jmsTemplate.convertAndSend(queueName, msg);
log.info("Published the message to queue {} at time in millis {}",queueName,System.currentTimeMillis());
}
Main Spring Boot class
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
@EnableJms
@Slf4j
@EnableTransactionManagement
@EnableRetry
@EnableAutoConfiguration(exclude =
{JmsHealthIndicatorAutoConfiguration.class, KafkaAutoConfiguration.class})
public class MyConsumerApplication {
Error logs
- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:49.270 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-12-08 19:39:49.332 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Created JMS transaction on Session [com.ibm.mq.jms.MQSession@1a7d298f] from Connection [com.ibm.mq.jms.MQConnection@5d644e4a]
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.333 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state READY to IN_TRANSACTION
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.848 DEBUG 37524 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Received message of type [class com.ibm.jms.JMSTextMessage] from consumer [com.ibm.mq.jms.MQQueueReceiver@2826f379] of transactional session [com.ibm.mq.jms.MQSession@1a7d298f]
2019-12-08 19:39:49.849 DEBUG 37524 --- [enerContainer-1] .s.j.l.a.MessagingMessageListenerAdapter : Processing [org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@53b59c04]
2019-12-08 19:39:49.849 INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Received message 1221222112#
2019-12-08 19:39:49.859 INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Publishing the message to destination queue DEST_QUEUE at time in millis 1575833989859
2019-12-08 19:39:49.861 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.861 DEBUG 37524 --- [enerContainer-1] o.springframework.jms.core.JmsTemplate : Executing callback on JMS Session: com.ibm.mq.jms.MQSession@1a7d298f
2019-12-08 19:39:49.875 DEBUG 37524 --- [enerContainer-1] o.springframework.jms.core.JmsTemplate : Sending created message:
JMSMessage class: jms_text
JMSType: null
JMSDeliveryMode: 2
JMSDeliveryDelay: 0
JMSDeliveryTime: 0
JMSExpiration: 0
JMSPriority: 4
JMSMessageID: null
JMSTimestamp: 0
JMSCorrelationID: null
JMSDestination: null
JMSReplyTo: null
JMSRedelivered: false
1221222112#
2019-12-08 19:39:49.902 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.905 INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Published the message to queue DEST_QUEUE at time in millis 1575833989905
2019-12-08 19:39:49.997 DEBUG 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Publishing message with key 106e096a-4633-49c8-abaa-a8d0bade84d2: value {"content": "1221222112#", "sourceType": "MQ", "sourceLocation": "MINT", "msgType": null, "correlationId": "106e096a-4633-49c8-abaa-a8d0bade84d2", "receivedTs": 1575833989997}
2019-12-08 19:39:49.999 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.999 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.007 TRACE 37524 --- [enerContainer-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Requesting metadata update for topic TOPIC-SAMP-DATA.
2019-12-08 19:39:50.170 INFO 37524 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: O5fhv74bT9KIkV17ia8snQ
2019-12-08 19:39:50.309 ERROR 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Error publishing raw page with exception org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.AvroTypeException: Not an enum: null for schema: {"type":"enum","name":"MsgType","namespace":"com.mysample.avro","symbols":["TYPE"]}
at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:218)
at org.apache.avro.specific.SpecificDatumWriter.writeEnum(SpecificDatumWriter.java:61)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:133)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199)
at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:106)
at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
.
2019-12-08 19:39:50.310 ERROR 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Error in consuming the message from sourceSystem com.mysample.consumer.exception.CustomKafkaPublicationException: Error publishing raw page with exception org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.AvroTypeException: Not an enum: null for schema: {"type":"enum","name":"MsgType","namespace":"com.mysample.avro","symbols":["TYPE"]}
at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:218)
at org.apache.avro.specific.SpecificDatumWriter.writeEnum(SpecificDatumWriter.java:61)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:133)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199)
at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:106)
at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
.
at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:111)
at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
2019-12-08 19:39:50.312 TRACE 37524 --- [enerContainer-1] .s.j.l.a.MessagingMessageListenerAdapter : No result object given - no result to handle
2019-12-08 19:39:50.312 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.316 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Enqueuing transactional request (type=EndTxnRequest, transactionalId=SAMP-CON-0, producerId=435000, producerEpoch=39, result=COMMIT)
2019-12-08 19:39:50.317 DEBUG 37524 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Not sending EndTxn for completed transaction since no partitions or offsets were successfully added
2019-12-08 19:39:50.317 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:50.317 DEBUG 37524 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state COMMITTING_TRANSACTION to READY
2019-12-08 19:39:50.317 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] from thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Initiating transaction commit
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Committing JMS transaction on Session [com.ibm.mq.jms.MQSession@1a7d298f]
2019-12-08 19:39:50.396 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] from thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.411 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Resuming suspended transaction after completion of inner transaction
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:50.412 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
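Problem
The Kafka producer does not roll back the transaction, so
- the destination queue has data that should not be there
- the source queue no longer has the message
- the Kafka topic has no message.
Already tried
Using Throwable vs. Exception for rollback
Moving the code that publishes to the queue and to the topic into separate classes
How can we test whether the rollback scenario works for the Kafka producer?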
at com.mysample.consumer.MyMessageProcessor$$FastClassBySpringCGLIB$$a0eef45f.invoke(<generated>)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
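I don't see a transaction interceptor in the stack trace, which means @Transactional is not working. You need @EnableTransactionManagement on a @Configuration class.
However, you really don't need @Transactional; just inject the ChainedTransactionManager into the listener container factory so that both transactions are started there.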
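One more point that seems consistent with the posted code and logs: receiveMessage catches JMSException and CustomKafkaPublicationException and only logs them, and the log shows the Kafka and JMS transactions being committed right after the error. A container can only roll back if the exception actually reaches it. The following is a minimal plain-Java sketch of that behavior, not Spring code. FakeTransaction, Listener, deliver and the two listener factories are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerRollbackSketch {

    /** Hypothetical stand-in for a transactional resource (JMS session, Kafka producer). */
    static final class FakeTransaction {
        final List<String> events = new ArrayList<>();
        void begin()    { events.add("begin"); }
        void commit()   { events.add("commit"); }
        void rollback() { events.add("rollback"); }
    }

    /** Hypothetical stand-in for the listener method body. */
    interface Listener {
        void onMessage(String payload);
    }

    /** Mimics what a transactional listener container does around one delivery. */
    static List<String> deliver(Listener listener, String payload) {
        FakeTransaction tx = new FakeTransaction();
        tx.begin();
        try {
            listener.onMessage(payload);
            tx.commit();   // normal return: nothing tells the container to roll back
        } catch (RuntimeException e) {
            tx.rollback(); // the exception reached the container: undo the work
        }
        return tx.events;
    }

    /** Simulated publication that always fails, like the Avro error in the log. */
    static void publish(String payload) {
        throw new IllegalStateException("simulated serialization failure for " + payload);
    }

    /** Listener that logs and swallows the failure, like the posted try/catch. */
    static Listener swallowingListener() {
        return payload -> {
            try {
                publish(payload);
            } catch (IllegalStateException e) {
                System.out.println("logged only: " + e.getMessage());
            }
        };
    }

    /** Listener that lets the failure propagate to the container. */
    static Listener rethrowingListener() {
        return payload -> publish(payload);
    }

    public static void main(String[] args) {
        System.out.println("swallowing: " + deliver(swallowingListener(), "m1"));
        System.out.println("rethrowing: " + deliver(rethrowingListener(), "m1"));
    }
}
```

In this model the swallowing listener ends with a commit and the rethrowing listener ends with a rollback. The analogous change in the posted listener would be to let the exception propagate out of receiveMessage, for example by rethrowing it wrapped in an unchecked exception after logging, since JMSException is checked.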
Try using ChainedKafkaTransactionManager and wire all the transaction managers you use into it:
@Bean
public ChainedKafkaTransactionManager<String, String> chainedTransactionManager(DataSourceTransactionManager dataSourceTransactionManager,
KafkaTransactionManager<String, String> kafkaTransactionManager) {
return new ChainedKafkaTransactionManager<String, String>(kafkaTransactionManager, dataSourceTransactionManager);
}
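In my case, the DataSourceTransactionManager had been replaced by the KafkaTransactionManager. As a result, @Transactional stopped working: RuntimeExceptions were no longer rolled back.
The ChainedTransactionManager works as expected. Keep in mind that this is not an XA transaction but a simple chained one-phase commit.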
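To illustrate what "chained one-phase commit" implies, here is a small plain-Java model, not the Spring implementation. It assumes the documented ordering of a chained manager: transactions start in the order given and are committed or rolled back in reverse order. FakeManager and run are hypothetical names, and rolling back the not-yet-completed managers after a failed commit is a simplification chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainedCommitSketch {

    /** Hypothetical stand-in for one transaction manager in the chain. */
    static final class FakeManager {
        final String name;
        final boolean failOnCommit;

        FakeManager(String name, boolean failOnCommit) {
            this.name = name;
            this.failOnCommit = failOnCommit;
        }
    }

    /** Begins in the given order, then completes in reverse order. */
    static List<String> run(List<FakeManager> chain, boolean workFails) {
        List<String> log = new ArrayList<>();
        for (FakeManager m : chain) {
            log.add("begin " + m.name);
        }
        boolean undo = workFails; // a failure in the business code rolls everything back
        for (int i = chain.size() - 1; i >= 0; i--) {
            FakeManager m = chain.get(i);
            if (undo) {
                log.add("rollback " + m.name);
            } else if (m.failOnCommit) {
                log.add("commit FAILED " + m.name);
                undo = true; // managers not yet completed are rolled back;
                             // commits already made stay committed (no XA)
            } else {
                log.add("commit " + m.name);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<FakeManager> healthy = Arrays.asList(
                new FakeManager("jms", false), new FakeManager("kafka", false));
        List<FakeManager> jmsCommitFails = Arrays.asList(
                new FakeManager("jms", true), new FakeManager("kafka", false));

        System.out.println(run(healthy, false));
        System.out.println(run(healthy, true));
        System.out.println(run(jmsCommitFails, false));
    }
}
```

The last scenario is the one to keep in mind: with the order (jms, kafka), the Kafka commit happens first, so if the JMS commit then fails, the Kafka record stays published while the source message is left for redelivery. Consumers of the topic may therefore need to tolerate duplicates.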
Objective-
从(源)MQ 队列消费消息并发布到
a) 另一个(目标)MQ 队列和
b) 事务中的 Kafka 主题,从而避免在 MQ 或 Kafka 发布失败的情况下从源 MQ 中删除消息。
使用的框架
Spring 引导版本 - 2.1.5
Spring JMS -5.1.7
Spring Kafka- 2.2.6
融合 Kafka- 5.3
MQ-9
卡夫卡配置
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
@Slf4j
@Getter
@Setter
@ToString
@EnableTransactionManagement
public class KafkaConfig {
/** injected local properties */
public Map<String, Object> producerConfigs() throws IOException{
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
props.put(ProducerConfig.ACKS_CONFIG, acks);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
log.info("Value of transaction id 0 {}",transactionIdPrefix);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,transactionIdPrefix);
sslCommonConfigs(props);
return props;
}
public Map<String, Object> consumerConfigs() throws IOException{
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutocommit);
sslCommonConfigs(props);
return props;
}
public Map<String, Object> sslCommonConfigs(Map<String, Object> props) throws IOException {
log.info("kafka config {}",this);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, specificAvroReader);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, FileUtil.decodeCertFile(trustStoreValue, "kafka_truststore.jks"));
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePw);
props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE); //"JKS"
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, FileUtil.decodeCertFile(keyStoreValue, "kafka_keystore.jks"));
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePw);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyStorePw);
return props;
}
@Bean
public ProducerFactory producerFactory() throws IOException {
DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
producerFactory.setTransactionIdPrefix(this.transactionIdPrefix);
return producerFactory;
}
@Bean
public KafkaTemplate<String, RawPage> ddaKafkaTemplate() throws IOException {
return new KafkaTemplate<String, RawPage>(producerFactory());
}
@Bean
public KafkaTransactionManager<String,RawPage> kafkaTransactionManager(ProducerFactory<String, RawPage> producerFactory) {
log.info("producerFactory.transactionCapable() {}",producerFactory.transactionCapable());
KafkaTransactionManager transactionManager = new KafkaTransactionManager(producerFactory);
transactionManager.setNestedTransactionAllowed(true);
transactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
return transactionManager;
}
**应用配置Class*
@Slf4j
@Configuration
public class ApplicationConfig {
@Bean
public JmsListenerContainerFactory<?> myMessageFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer,
ChainedTransactionManager chainedTransactionManager) {
log.debug("Connection factory instance as received {} {}",connectionFactory,configurer);
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setTransactionManager(chainedTransactionManager);
factory.setSessionTransacted(true);
configurer.configure(factory, connectionFactory);
log.debug("Returning the myMessageFactory factory instance as {}",factory);
return factory;
}
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory){
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
@Bean
public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(KafkaTransactionManager kafkaTransactionManager, JmsTransactionManager jmsTransactionManager){
return new ChainedKafkaTransactionManager(jmsTransactionManager,kafkaTransactionManager );
}
}
实际消费者和发布代码
@Service
@Slf4j
@Setter
@Getter
public class MyMessageProcessor {
@Autowired
private KafkaTemplate<String, Event> kafkaTemplate;
@Autowired
private JmsTemplate jmsTemplate;
@JmsListener(destination = "desintationQueue"
,containerFactory = "myMessageFactory")
public void receiveMessage(TextMessage message){
try {
log.info("Received message {}",message.getText());
send(destinationQueueName,message.getText());
// build avro event
publish(evnet);
// only acknowledge if the message is successfully processed till kafka publication
message.acknowledge();
}catch (JMSException|CustomKafkaPublicationException e){
log.error("Error in consuming the message from sourceSystem {}", ExceptionUtils.getStackTrace(e));
}
}
public void send(String queueName,final String msg) throws RawPagePublicationException{
if(StringUtils.isEmpty(msg)|| StringUtils.isEmpty(queueName)){
String errorMessage = String.format("Incorrect message and queue details msg %s queueName %s.",msg,queueName);
log.error(errorMessage);
throw new CustomKafkaPublicationException(errorMessage);
}
log.info("Publishing the message to destination queue {} at time in millis {}",queueName,System.currentTimeMillis());
jmsTemplate.convertAndSend(queueName, msg);
log.info("Published the message to queue {} at time in millis {}",queueName,System.currentTimeMillis());
}
Main Spring Boot class
@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
@EnableJms
@Slf4j
@EnableTransactionManagement
@EnableRetry
@EnableAutoConfiguration(exclude =
{JmsHealthIndicatorAutoConfiguration.class, KafkaAutoConfiguration.class})
public class MyConsumerApplication {
Error log
- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:49.270 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-12-08 19:39:49.332 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Created JMS transaction on Session [com.ibm.mq.jms.MQSession@1a7d298f] from Connection [com.ibm.mq.jms.MQConnection@5d644e4a]
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.333 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state READY to IN_TRANSACTION
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.848 DEBUG 37524 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Received message of type [class com.ibm.jms.JMSTextMessage] from consumer [com.ibm.mq.jms.MQQueueReceiver@2826f379] of transactional session [com.ibm.mq.jms.MQSession@1a7d298f]
2019-12-08 19:39:49.849 DEBUG 37524 --- [enerContainer-1] .s.j.l.a.MessagingMessageListenerAdapter : Processing [org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@53b59c04]
2019-12-08 19:39:49.849 INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Received message 1221222112#
2019-12-08 19:39:49.859 INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Publishing the message to destination queue DEST_QUEUE at time in millis 1575833989859
2019-12-08 19:39:49.861 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.861 DEBUG 37524 --- [enerContainer-1] o.springframework.jms.core.JmsTemplate : Executing callback on JMS Session: com.ibm.mq.jms.MQSession@1a7d298f
2019-12-08 19:39:49.875 DEBUG 37524 --- [enerContainer-1] o.springframework.jms.core.JmsTemplate : Sending created message:
JMSMessage class: jms_text
JMSType: null
JMSDeliveryMode: 2
JMSDeliveryDelay: 0
JMSDeliveryTime: 0
JMSExpiration: 0
JMSPriority: 4
JMSMessageID: null
JMSTimestamp: 0
JMSCorrelationID: null
JMSDestination: null
JMSReplyTo: null
JMSRedelivered: false
1221222112#
2019-12-08 19:39:49.902 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.905 INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Published the message to queue DEST_QUEUE at time in millis 1575833989905
2019-12-08 19:39:49.997 DEBUG 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Publishing message with key 106e096a-4633-49c8-abaa-a8d0bade84d2: value {"content": "1221222112#", "sourceType": "MQ", "sourceLocation": "MINT", "msgType": null, "correlationId": "106e096a-4633-49c8-abaa-a8d0bade84d2", "receivedTs": 1575833989997}
2019-12-08 19:39:49.999 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.999 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.007 TRACE 37524 --- [enerContainer-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Requesting metadata update for topic TOPIC-SAMP-DATA.
2019-12-08 19:39:50.170 INFO 37524 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: O5fhv74bT9KIkV17ia8snQ
2019-12-08 19:39:50.309 ERROR 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Error publishing raw page with exception org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.AvroTypeException: Not an enum: null for schema: {"type":"enum","name":"MsgType","namespace":"com.mysample.avro","symbols":["TYPE"]}
at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:218)
at org.apache.avro.specific.SpecificDatumWriter.writeEnum(SpecificDatumWriter.java:61)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:133)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199)
at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:106)
at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
.
2019-12-08 19:39:50.310 ERROR 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor : Error in consuming the message from sourceSystem com.mysample.consumer.exception.CustomKafkaPublicationException: Error publishing raw page with exception org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.AvroTypeException: Not an enum: null for schema: {"type":"enum","name":"MsgType","namespace":"com.mysample.avro","symbols":["TYPE"]}
at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:218)
at org.apache.avro.specific.SpecificDatumWriter.writeEnum(SpecificDatumWriter.java:61)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:133)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199)
at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:106)
at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
.
at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:111)
at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
2019-12-08 19:39:50.312 TRACE 37524 --- [enerContainer-1] .s.j.l.a.MessagingMessageListenerAdapter : No result object given - no result to handle
2019-12-08 19:39:50.312 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.316 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Enqueuing transactional request (type=EndTxnRequest, transactionalId=SAMP-CON-0, producerId=435000, producerEpoch=39, result=COMMIT)
2019-12-08 19:39:50.317 DEBUG 37524 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Not sending EndTxn for completed transaction since no partitions or offsets were successfully added
2019-12-08 19:39:50.317 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:50.317 DEBUG 37524 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1, transactionalId=SAMP-CON-0] Transition from state COMMITTING_TRANSACTION to READY
2019-12-08 19:39:50.317 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] from thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Initiating transaction commit
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Committing JMS transaction on Session [com.ibm.mq.jms.MQSession@1a7d298f]
2019-12-08 19:39:50.396 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] from thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.411 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Resuming suspended transaction after completion of inner transaction
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-12-08 19:39:50.412 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionManager : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
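Issue
The Kafka producer does not roll back the transaction, and as a result:
- the destination queue has data that should not be there
- the source queue no longer has the message
- the Kafka topic has no message.
Already tried
Using Throwable vs. Exception for rollback
Separating the code that publishes to the queue and the code that publishes to the topic into separate classes
How can we test whether the rollback scenario works for the Kafka producer?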
at com.mysample.consumer.MyMessageProcessor$$FastClassBySpringCGLIB$$a0eef45f.invoke(<generated>)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
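I don't see a transaction interceptor in the stack trace, which implies that @Transactional is not working - you need @EnableTransactionManagement on a @Configuration class.
However, you really don't need @Transactional; just inject the ChainedTransactionManager into the listener container factory so that both transactions are started there.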
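To make "started there" concrete, here is a minimal plain-Java sketch of how a listener container decides between commit and rollback: it commits when the listener returns normally and rolls back only when an exception reaches it. `FakeTransaction`, `Listener`, and `deliver` are hypothetical names invented for this illustration; they are not Spring classes. Under that assumption, a listener that catches the publication exception and only logs it (as `receiveMessage` in the question does) looks like a success to the container, which would be consistent with the COMMITTING_TRANSACTION transition in the log.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerRollbackSketch {

    /** Hypothetical stand-in for a transactional resource (JMS session, Kafka producer). */
    static class FakeTransaction {
        final List<String> events = new ArrayList<>();
        void begin()    { events.add("begin"); }
        void commit()   { events.add("commit"); }
        void rollback() { events.add("rollback"); }
    }

    /** Hypothetical listener contract: throwing signals a failed delivery. */
    public interface Listener {
        void onMessage(String payload) throws Exception;
    }

    /** Mimics the container: commit on normal return, roll back when the listener throws. */
    public static List<String> deliver(Listener listener, String payload) {
        FakeTransaction tx = new FakeTransaction();
        tx.begin();
        try {
            listener.onMessage(payload);
            tx.commit();
        } catch (Exception e) {
            tx.rollback();
        }
        return tx.events;
    }

    /** Simulated publish step that always fails. */
    public static void failingPublish(String payload) throws Exception {
        throw new Exception("simulated serialization failure for " + payload);
    }

    public static void main(String[] args) {
        // Catches and only logs: the container never sees the failure.
        Listener swallowing = payload -> {
            try {
                failingPublish(payload);
            } catch (Exception e) {
                System.out.println("logged: " + e.getMessage());
            }
        };
        // Lets the failure propagate to the container.
        Listener propagating = ListenerRollbackSketch::failingPublish;

        System.out.println(deliver(swallowing, "m1"));   // [begin, commit]
        System.out.println(deliver(propagating, "m1"));  // [begin, rollback]
    }
}
```

In the sketch, rethrowing from the catch block (or not catching at all) is what turns the outcome into a rollback.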
Try using ChainedKafkaTransactionManager and wire all the transaction managers you use into it:
@Bean
public ChainedKafkaTransactionManager<String, String> chainedTransactionManager(DataSourceTransactionManager dataSourceTransactionManager,
KafkaTransactionManager<String, String> kafkaTransactionManager) {
return new ChainedKafkaTransactionManager<String, String>(kafkaTransactionManager, dataSourceTransactionManager);
}
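In my case, the DataSourceTransactionManager had been replaced by the KafkaTransactionManager. As a result, @Transactional stopped working: RuntimeExceptions were no longer rolled back.
The ChainedTransactionManager works as expected. Keep in mind that this is not an XA transaction, just a simple chained one-phase commit.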
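A small plain-Java sketch of what "chained one-phase commit" means in practice. It assumes the chain begins transactions in the order given and commits them in reverse order, with no prepare phase; `Resource` and `runChained` are hypothetical names for illustration and are not part of Spring. The point it tries to show is that a resource that has already committed stays committed if a later commit in the chain fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainedCommitSketch {

    /** Hypothetical transactional resource that records its final state. */
    public static class Resource {
        public final String name;
        public final boolean failOnCommit;
        public String state = "idle";

        public Resource(String name, boolean failOnCommit) {
            this.name = name;
            this.failOnCommit = failOnCommit;
        }

        void begin() { state = "active"; }

        void commit() {
            if (failOnCommit) {
                state = "failed";
                throw new IllegalStateException(name + " commit failed");
            }
            state = "committed";
        }

        void rollback() { state = "rolled back"; }
    }

    /** Begins in list order, commits in reverse order; there is no prepare phase. */
    public static List<String> runChained(List<Resource> chain) {
        List<String> log = new ArrayList<>();
        for (Resource r : chain) {
            r.begin();
            log.add("begin " + r.name);
        }
        boolean commit = true;
        for (int i = chain.size() - 1; i >= 0; i--) {
            Resource r = chain.get(i);
            if (commit) {
                try {
                    r.commit();
                    log.add("commit " + r.name);
                } catch (IllegalStateException e) {
                    // Resources committed earlier in this loop cannot be undone.
                    commit = false;
                    log.add("commit failed " + r.name);
                }
            } else {
                // Anything not yet committed is rolled back after a failure.
                r.rollback();
                log.add("rollback " + r.name);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        Resource jms = new Resource("jms", true);     // simulated commit failure
        Resource kafka = new Resource("kafka", false);
        System.out.println(runChained(Arrays.asList(jms, kafka)));
        System.out.println("kafka=" + kafka.state + ", jms=" + jms.state);
    }
}
```

With the order used in the question's configuration (JMS first, Kafka second), the sketch commits Kafka before JMS, so a failure of the JMS commit would leave the Kafka records committed while the source message is redelivered; consumers then need to tolerate duplicates.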