Exactly-once processing in Kafka with replay

I'm using Kafka for the log/processing of my events. I'm looking for (as close as possible to) exactly-once processing, while supporting "replay" during partition (re)assignment: the event handler is told which events are replays so it can rebuild its state.

Here is my code:

private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private final BiFunction<String, Boolean, String> eventHandler;
private final long[] startingCommitOffsets;

public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  partitions.forEach(p -> startingCommitOffsets[p.partition()] = consumer.position(p));
  consumer.seekToBeginning(partitions);
}

public void run() {
  while (true) {
    var records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
    var commitRecords = new HashMap<TopicPartition, OffsetAndMetadata>();

    producer.beginTransaction();

    records.forEach(r -> {
      var isReplay = r.offset() < startingCommitOffsets[r.partition()];
      var resultEvent = eventHandler.apply(r.value(), isReplay);
      producer.send(new ProducerRecord<>(r.topic(), r.key(), resultEvent));

      if (!isReplay) {
        commitRecords.put(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset()));
      }
    });

    producer.commitTransaction();

    if (!commitRecords.isEmpty()) {
      consumer.commitSync(commitRecords);
    }
  }
}
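
For context, onPartitionsAssigned is my ConsumerRebalanceListener callback. A minimal sketch of how it is wired up, assuming the enclosing class implements ConsumerRebalanceListener and "events" stands in for my topic:

// "events" is a placeholder topic name; onPartitionsRevoked must be implemented too
consumer.subscribe(List.of("events"), this);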

My questions:

  1. When partitions are assigned, I save the current position and seek to the beginning. This doesn't change the committed position, does it? (The documentation isn't clear on this.)
  2. producer.commitTransaction() and consumer.commitSync() are two separate operations. If the latter fails, we will already have committed some new events, which will then be duplicated the next time the events are processed. Is there any way to combine these into one operation?

When the partition is assigned, I save the current position and seek to the beginning. This doesn't change the committed position, does it?

No, it doesn't. The committed position does not change until you explicitly call commitSync() or commitAsync(), or have enable.auto.commit=true (in which case offsets are committed automatically in the background). Seeking only moves the position the consumer reads from.
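
You can see this by comparing consumer.position() (where the next poll() will read from) with consumer.committed() (the group's committed offset). A minimal sketch, assuming a topic named "events":

// seeking moves position(), but committed() is untouched until an actual commit
TopicPartition tp = new TopicPartition("events", 0);
OffsetAndMetadata before = consumer.committed(tp);

consumer.seekToBeginning(List.of(tp));
long position = consumer.position(tp);   // now the log start offset

OffsetAndMetadata after = consumer.committed(tp);
// before and after are equal: seekToBeginning() committed nothing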

producer.commitTransaction() and consumer.commitSync() are two separate operations. If the latter fails, we would have already committed some new events, which will be duplicated the next time the events are processed. Is there any way to combine these into one operation?

producer.sendOffsetsToTransaction()

This method is probably what you are looking for to achieve exactly-once processing.

From the documentation:

Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset+1.

More importantly,

Note, that the consumer should have enable.auto.commit=false and should also not commit offsets manually (via sync or async commits).
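
Putting those requirements together, the relevant client configuration looks roughly like this (values are illustrative; serializer/deserializer settings omitted):

// Consumer: no auto-commit, and read only committed (transactional) data
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group-id");
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");

// Producer: setting a transactional.id enables transactions (and idempotence)
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("transactional.id", "my-transactional-id");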


You can infer the TopicPartition and offset from each ConsumerRecord you get as a result of poll().

Just store them (new TopicPartition(record.topic(), record.partition()) and new OffsetAndMetadata(record.offset() + 1), the +1 matching the documentation quoted above) in a map, and pass it to producer.sendOffsetsToTransaction() when you want to commit.

The following snippet should give you an idea (reference):

KafkaProducer producer = createKafkaProducer(
  "bootstrap.servers", "localhost:9092",
  "transactional.id", "my-transactional-id");

producer.initTransactions();

KafkaConsumer consumer = createKafkaConsumer(
  "bootstrap.servers", "localhost:9092",
  "group.id", "my-group-id",
  "isolation.level", "read_committed");

consumer.subscribe(singleton("inputTopic"));

while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  producer.beginTransaction();
  Map<TopicPartition, OffsetAndMetadata> offsetMap = new LinkedHashMap<>();
  for (ConsumerRecord record : records) {
    producer.send(producerRecord("outputTopic", record));
    // commit the offset of the *next* record to consume, hence the +1
    offsetMap.put(new TopicPartition(record.topic(), record.partition()),
                  new OffsetAndMetadata(record.offset() + 1));
  }
  producer.sendOffsetsToTransaction(offsetMap, "my-group-id");
  producer.commitTransaction();
}

After sending the offsets we commit the transaction. Because the offsets are part of the transaction, they only become committed if the transaction commits successfully, so the produced records and the consumed offsets commit (or abort) atomically.
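
Applied to your loop, consumer.commitSync() goes away entirely and the offsets ride inside the transaction; replayed records are simply left out of the offset map, exactly as you do now. A sketch based on your code, not tested; "my-group-id" stands in for your consumer group id:

public void run() {
  producer.initTransactions();
  while (true) {
    var records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
    var offsets = new HashMap<TopicPartition, OffsetAndMetadata>();

    producer.beginTransaction();
    try {
      records.forEach(r -> {
        var isReplay = r.offset() < startingCommitOffsets[r.partition()];
        var resultEvent = eventHandler.apply(r.value(), isReplay);
        producer.send(new ProducerRecord<>(r.topic(), r.key(), resultEvent));

        if (!isReplay) {
          // committed offset = next record to consume, hence the +1
          offsets.put(new TopicPartition(r.topic(), r.partition()),
                      new OffsetAndMetadata(r.offset() + 1));
        }
      });

      if (!offsets.isEmpty()) {
        // commits atomically with the produced records; on newer clients,
        // pass consumer.groupMetadata() instead of the group id string
        producer.sendOffsetsToTransaction(offsets, "my-group-id");
      }
      producer.commitTransaction();
    } catch (KafkaException e) {
      producer.abortTransaction();
      throw e;
    }
  }
}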