How to implement the Processor API with exactly-once mode
I am working with Kafka Streams and using the Processor API to implement my use case. The code below shows the process method, which forwards a message downstream and is killed before commit is called. This causes the stream to be reprocessed and the message to be duplicated on the sink.
public void process(String key, String value) {
    context.forward(key, value);
    // ...
    // killed here, before commit is called
    context.commit();
}
The processing.guarantee setting:
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
Is there a way to apply the forward only when the commit statement is called? If not, what is the correct way to implement exactly-once mode?
Thanks.
You may want to wrap context.commit() in a finally block to make sure it is called. However, you also need to guarantee that it is only invoked after processing has actually succeeded.
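A minimal, self-contained sketch of that ordering: forward first, commit last, and let any exception propagate so the Streams runtime aborts the open transaction. The Context interface below is a hypothetical stand-in for org.apache.kafka.streams.processor.ProcessorContext, used only so the example compiles without a Kafka dependency.

```java
public class CommitOrdering {

    /** Hypothetical stand-in for ProcessorContext (illustration only). */
    public interface Context {
        void forward(String key, String value);
        void commit();
    }

    public static void process(Context context, String key, String value) {
        // Any exception thrown here (or by forward) skips commit() and
        // propagates to the runtime, which aborts the open transaction
        // under processing.guarantee=exactly_once.
        context.forward(key, value);

        // commit() is only a *request*: it asks Streams to commit as soon
        // as possible; it does not commit synchronously. Even without this
        // call, Streams commits on the commit.interval.ms schedule.
        context.commit();
    }
}
```

Note that under exactly-once, atomicity comes from the transaction managed by Streams, not from where you place commit(): if the process is killed mid-method, the forwarded record is written but never marked committed, which is exactly what read_committed consumers filter out.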
Make sure your sink is a read_committed consumer, so that it only sees committed messages. If a message is written to the output topic before the transaction aborts, then after the abort the message is still there; it is just not marked as committed. The second (retried) transaction completes, so the message and a commit marker are added to the output topic. If you read without read_committed mode, you will see all messages, including uncommitted ones, and they can appear as duplicates because you see both the aborted result and the committed one.
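A sketch of the downstream consumer configuration, using plain string keys so the snippet has no Kafka dependency; with kafka-clients on the classpath you would normally use the ConsumerConfig constants instead. The broker address and group id here are placeholder assumptions.

```java
import java.util.Properties;

public class ReadCommittedConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "sink-reader");             // hypothetical group id
        // The key setting: only return messages from committed transactions.
        // The default is "read_uncommitted", which is why aborted-then-retried
        // records can show up as apparent duplicates.
        props.put("isolation.level", "read_committed");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```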
From the 0.11 javadoc here: https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Transactions were introduced in Kafka 0.11.0 wherein applications can
write to multiple topics and partitions atomically. In order for this
to work, consumers reading from these partitions should be configured
to only read committed data. This can be achieved by setting the
isolation.level=read_committed in the consumer's configuration.
In read_committed mode, the consumer will read only those
transactional messages which have been successfully committed. It will
continue to read non-transactional messages as before. There is no
client-side buffering in read_committed mode. Instead, the end offset
of a partition for a read_committed consumer would be the offset of
the first message in the partition belonging to an open transaction.
This offset is known as the 'Last Stable Offset'(LSO).
A read_committed consumer will only read up till the LSO and filter out
any transactional messages which have been aborted. The LSO also
affects the behavior of seekToEnd(Collection) and
endOffsets(Collection) for read_committed consumers, details of which
are in each method's documentation. Finally, the fetch lag metrics are
also adjusted to be relative to the LSO for read_committed consumers.
Partitions with transactional messages will include commit or abort
markers which indicate the result of a transaction. These markers are
not returned to applications, yet have an offset in the log. As a
result, applications reading from topics with transactional messages
will see gaps in the consumed offsets. These missing messages would be
the transaction markers, and they are filtered out for consumers in
both isolation levels. Additionally, applications using read_committed
consumers may also see gaps due to aborted transactions, since those
messages would not be returned by the consumer and yet would have
valid offsets.