Exactly-once processing in Kafka with replay
I'm using Kafka for event logging/processing. I'm looking for (as close as possible to) exactly-once processing, while also supporting a "replay" during partition (re)assignment: the event handler is notified that it is replaying so it can rebuild its state.
Here is my code:
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private final BiFunction<String, Boolean, String> eventHandler;
private final long[] startingCommitOffsets;

public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // Remember the committed position, then rewind to replay from the start.
    partitions.forEach(p -> startingCommitOffsets[p.partition()] = consumer.position(p));
    consumer.seekToBeginning(partitions);
}

public void run() {
    while (true) {
        var records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        var commitRecords = new HashMap<TopicPartition, OffsetAndMetadata>();

        producer.beginTransaction();
        records.forEach(r -> {
            var isReplay = r.offset() < startingCommitOffsets[r.partition()];
            var resultEvent = eventHandler.apply(r.value(), isReplay);
            producer.send(new ProducerRecord<>(r.topic(), r.key(), resultEvent));

            if (!isReplay) {
                commitRecords.put(new TopicPartition(r.topic(), r.partition()),
                        new OffsetAndMetadata(r.offset()));
            }
        });
        producer.commitTransaction();

        if (!commitRecords.isEmpty()) {
            consumer.commitSync(commitRecords);
        }
    }
}
My questions:
- When a partition is assigned, I save the current position and seek to the beginning. This doesn't change the committed position, does it? (The docs aren't clear on this.)
- producer.commitTransaction() and consumer.commitSync() are two separate operations. If the latter fails, we will already have committed some new events, and they will be duplicated the next time the events are processed. Is there any way to combine these into one operation?
When the partition is assigned, I save the current position and seek to the beginning. This doesn't change the committed position, does it?
The committed position does not change until you explicitly call commitSync() or commitAsync(), or have enable.auto.commit=true.
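You can verify this yourself, since position() and committed() track the two independently. A minimal check (assuming a consumer that already has the partition assigned; the topic name and partition number are illustrative):

    TopicPartition tp = new TopicPartition("inputTopic", 0); // illustrative
    consumer.seekToBeginning(Collections.singleton(tp));

    // position() reflects the seek once it resolves...
    long position = consumer.position(tp);
    // ...while committed() still returns the last committed offset (or null if none).
    OffsetAndMetadata committed = consumer.committed(Collections.singleton(tp)).get(tp);
    System.out.printf("position=%d, committed=%s%n", position, committed);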
producer.commitTransaction() and consumer.commitSync() are two separate operations. If the latter fails, we would have already committed some new events, which will be duplicated the next time the events are processed. Is there any way to combine these into one operation?
producer.sendOffsetsToTransaction() is probably the method you are looking for to achieve exactly-once processing.
Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
More importantly:
Note, that the consumer should have enable.auto.commit=false and should also not commit offsets manually (via sync or async commits).
You can derive the TopicPartition and the offset from each ConsumerRecord you get back from poll(). Just store them (new TopicPartition(record.topic(), record.partition()) and new OffsetAndMetadata(record.offset() + 1), following the lastProcessedMessageOffset + 1 rule quoted above) in a map and pass it when you want to commit.
The following snippet should give you an idea (adapted from the reference):
KafkaProducer producer = createKafkaProducer(
    "bootstrap.servers", "localhost:9092",
    "transactional.id", "my-transactional-id");

producer.initTransactions();

KafkaConsumer consumer = createKafkaConsumer(
    "bootstrap.servers", "localhost:9092",
    "group.id", "my-group-id",
    "enable.auto.commit", "false",
    "isolation.level", "read_committed");

consumer.subscribe(singleton("inputTopic"));

while (true) {
    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> map = new LinkedHashMap<>();
    for (ConsumerRecord record : records) {
        producer.send(producerRecord("outputTopic", record));
        // Commit the *next* offset to consume, hence the + 1.
        map.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    producer.sendOffsetsToTransaction(map, "my-group-id");
    producer.commitTransaction();
}
After sending the offsets, we commit them atomically with the produced records.
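Applied to your code, here is a sketch of what the run() loop could look like with the consumer commit folded into the transaction (assumptions: your fields from the question, Kafka clients 2.5+ for the consumer.groupMetadata() overload, and simplified error handling; on fatal errors such as ProducerFencedException you should close the producer rather than abort):

    public void run() {
        while (true) {
            var records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            var commitRecords = new HashMap<TopicPartition, OffsetAndMetadata>();

            producer.beginTransaction();
            try {
                records.forEach(r -> {
                    var isReplay = r.offset() < startingCommitOffsets[r.partition()];
                    var resultEvent = eventHandler.apply(r.value(), isReplay);
                    producer.send(new ProducerRecord<>(r.topic(), r.key(), resultEvent));

                    if (!isReplay) {
                        // Next offset to consume, hence the + 1.
                        commitRecords.put(new TopicPartition(r.topic(), r.partition()),
                                new OffsetAndMetadata(r.offset() + 1));
                    }
                });

                // The offsets are now part of the transaction: they and the produced
                // events become visible only if commitTransaction() succeeds.
                if (!commitRecords.isEmpty()) {
                    producer.sendOffsetsToTransaction(commitRecords, consumer.groupMetadata());
                }
                producer.commitTransaction();
            } catch (KafkaException e) {
                producer.abortTransaction(); // neither events nor offsets become visible
                throw e;
            }
        }
    }

Since the offsets ride in the same transaction, the separate consumer.commitSync() call, and with it the failure window between the two commits, disappears.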