Kafka Exactly-Once with Transactional Producer
I am trying to fully understand Kafka exactly-once semantics using a transactional producer/consumer. I came across the example below, but I am still having trouble understanding exactly-once. Is this code correct?
producer.sendOffsetsToTransaction - what does this call do? Should it be done against the same target topic?
What if the system crashes before consumer.commitSync()? Will the same messages be read again and produce duplicate messages?
public class ExactlyOnceLowLevel {

    public void runConsumer() throws Exception {
        final KafkaConsumer<byte[], byte[]> consumer = createConsumer();
        final Producer<Long, String> producer = createProducer();
        producer.initTransactions();
        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            try {
                final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
                producer.beginTransaction();
                for (final ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    final ProducerRecord<Long, String> producerRecord =
                            new ProducerRecord<>(TOPIC_1, new BigInteger(record.key()).longValue(), record.value().toString());
                    // send returns Future
                    final RecordMetadata metadata = producer.send(producerRecord).get();
                    currentOffsets.put(new TopicPartition(TOPIC_1, record.partition()), new OffsetAndMetadata(record.offset()));
                }
                producer.sendOffsetsToTransaction(currentOffsets, "my-transactional-consumer-group"); // a bit annoying here to reference group id twice
                producer.commitTransaction();
                consumer.commitSync();
                currentOffsets.clear();
                // EXACTLY ONCE!
            }
            catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                e.printStackTrace();
                // We can't recover from these exceptions, so our only option is to close the producer and exit.
                producer.close();
            }
            catch (final KafkaException e) {
                e.printStackTrace();
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }
            finally {
                producer.flush();
                producer.close();
            }
        }
    }

    private static KafkaConsumer<byte[], byte[]> createConsumer() {
        final Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // records are consumed as byte[], so both deserializers must be ByteArrayDeserializer
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // this has to be read_committed
        return new KafkaConsumer<>(consumerConfig);
    }

    private static Producer<Long, String> createProducer() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // required before initTransactions(); example value
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // this is now safe !!!!
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // this has to be all
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // this has to be 1
        return new KafkaProducer<>(props);
    }

    public static void main(final String... args) throws Exception {
        final ExactlyOnceLowLevel example = new ExactlyOnceLowLevel();
        example.runConsumer();
    }
}
When using the read/process/write pattern with Kafka transactions, you should not try to commit offsets through the consumer: that commits them outside the transaction and, as you suspected, can cause problems.
In this use case the offsets need to be added to the transaction, and you should use only sendOffsetsToTransaction() to do that. That method ensures the offsets are committed only if the full transaction succeeds. See the Javadoc:
Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern. Thus, the specified consumerGroupId should be the same as config parameter group.id of the used consumer. Note, that the consumer should have enable.auto.commit=false and should also not commit offsets manually (via sync or async commits).
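Putting that together, here is a minimal corrected sketch of the loop, reusing the TOPIC, TOPIC_1, consumer and producer from your question (it assumes a transactional.id is set on the producer, java.nio.charset.StandardCharsets is imported, and note that the group id passed to sendOffsetsToTransaction() must match the consumer's group.id, "my-group" in your config, not "my-transactional-consumer-group"). Offsets are keyed by the source topic/partition, set to record.offset() + 1, and committed only through the transaction; consumer.commitSync() is gone entirely:

producer.initTransactions();
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) {
        continue; // avoid opening empty transactions
    }
    producer.beginTransaction();
    try {
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            producer.send(new ProducerRecord<>(TOPIC_1,
                    new BigInteger(record.key()).longValue(),
                    new String(record.value(), StandardCharsets.UTF_8)));
            // Track the SOURCE topic/partition, and commit offset + 1:
            // the next message the application should consume.
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        // The offsets become part of the transaction: they are committed if and
        // only if commitTransaction() succeeds. No consumer.commitSync() anywhere.
        producer.sendOffsetsToTransaction(offsets, "my-group"); // must equal the consumer's group.id
        producer.commitTransaction();
    }
    catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        producer.close(); // unrecoverable: close and exit
        throw e;
    }
    catch (final KafkaException e) {
        producer.abortTransaction(); // discards the output records AND the offsets; the batch is re-polled
    }
}

This structure also answers your crash question: if the application dies before commitTransaction(), the transaction is aborted, so neither the output records nor the offsets are committed. After restart the same batch is polled and reprocessed, but consumers reading TOPIC_1 with isolation.level=read_committed never see the aborted attempt, which is what keeps the end-to-end result exactly-once.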