Kafka Stream中幂等性和exactly-once的区别

Difference between idempotence and exactly-once in Kafka Stream

我正在查看文档,据我所知,我们可以通过启用 idempotence=true

实现恰好一次交易

idempotence: The Idempotent producer enables exactly once for a producer against a single topic. Basically each single message send has stonger guarantees and will not be duplicated in case there's an error

那么,如果我们已经拥有幂等性,那么为什么我们在 Kafka Stream 中还需要另一个 属性 exactly-once?幂等性和恰好一次

有什么区别

为什么 exactly-once 属性 在普通 Kafka Producer 中不可用?

Kafka 流从 端到端 的角度提供了恰好一次的语义(从一个主题消费,处理该消息,然后产生到另一个主题)。但是,您只提到了 生产者的 幂等属性。这只是全部图片的一小部分。

让我改一下问题:

Why do we need the exactly-once delivery semantic at the consumer side while we already have guaranteed the exactly-once delivery semantic at the producer side?

答案: 因为恰好一次交付语义不仅在生产步骤,而且在整个处理流程。要实现语义上的exactly-once delivery,生产和消费必须满足一些条件。

这是一般情况:进程 A 向主题 T 生成消息。同时,进程 B 尝试使用来自主题 T 的消息。我们要确保进程 B 永远不会处理一条消息两次。

生产者部分:我们必须确保生产者永远不会两次生产一条消息。我们可以使用 Kafka Idempotent Producer

消费者部分: 以下是消费者的基本工作流程:

  • 第一步:消费者从Kafka的topic中拉取消息M成功
  • 第 2 步:消费者尝试执行作业,作业 returns 成功。
  • 第 3 步:消费者将消息的偏移量提交给 Kafka 代理。

以上步骤只是一条快乐之路。现实中出现的问题很多。

  • 场景一:步骤二的作业执行成功,但是消费者崩溃了。由于这种意外情况,消费者尚未提交消息的偏移量。当消费者重启时,消息会被消费两次。
  • 场景2:当消费者在第3步提交偏移量时,由于硬件故障(例如:CPU,内存冲突,......)而崩溃,重新启动时,消费者无法知道是否已成功提交偏移量。

因为可能会发生很多问题,作业的执行和提交偏移量必须是原子的,以保证消费者端的exactly-once delivery语义。这并不意味着我们不能,但需要付出很多努力才能确保恰好一次交付语义。 Kafka Stream 支持工程师的工作。

注意到: Kafka Stream 提供 "exactly-once stream processing"。它指的是从一个主题中消费,在 Kafka 主题中物化中间状态并产生一个。如果我们的应用程序依赖于其他一些外部服务(数据库,服务...),我们必须确保我们的外部依赖项在这些情况下可以保证精确一次。

TL,DR:全流exactly-once需要生产者和消费者的配合

参考文献:

在分布式环境中,故障是一种非常常见的情况,随时都可能发生。在Kafka环境下,broker可能会崩溃、网络故障、处理失败、发布消息失败或消费消息失败等。 这些不同的场景引入了不同类型的数据丢失和重复。

故障场景

A(确认失败): 生产者成功发布消息,重试>1,但由于失败而无法收到确认。在这种情况下,生产者将重试可能引入重复的相同消息。

B(Producer process failed in batch messages): Producer 发送了一批消息失败,几乎没有发布成功。在这种情况下,一旦生产者重新启动,它将再次重新发布批次中的所有消息,这将在 Kafka 中引入重复项。

C(即发即弃失败) 生产者发布了重试=0(即发即弃)的消息。如果发布失败,将不会意识到并发送下一条消息,这将导致消息丢失。

D(Consumer failed in batch message) 消费者从 Kafka 收到一批消息并手动提交它们的偏移量 (enable.auto.commit=false)。如果消费者在提交给 Kafka 之前失败,下次消费者将再次消费相同的记录,从而在消费者端复制重复。

Exactly-Once 语义

In this case, even if a producer tries to resend a message, it leads to the message will be published and consumed by consumers exactly once.

为了在Kafka中实现Exactly-Once语义,它使用下面3个属性

  1. enable.idempotence=true (地址 a, b & c)
  2. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5(生产者每次连接总是有一个进行中的请求)
  3. isolation.level=read_committed (地址 d )

启用幂等(enable.idempotence=true)

Idempotent delivery enables the producer to write a message to Kafka exactly once to a particular partition of a topic during the lifetime of a single producer without data loss and order per partition.

"注意开启幂等性要求MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION小于等于5,RETRIES_CONFIG大于0,ACKS_CONFIG为'all'。如果用户没有明确设置这些值,将选择合适的值。如果设置不兼容的值,将抛出 ConfigException"

为了实现幂等性,Kafka 在生成消息时使用一个称为产品 ID 或 PID 的唯一 ID 和序列号。生产者不断递增每条发布的消息的序列号,该消息映射到唯一的 PID。代理总是将当前序列号与前一个序列号进行比较,如果新序列号不比前一个序列号大 +1,则它会拒绝,这避免了重复,同时如果大于则在消息中显示丢失

在失败的情况下,代理会将序列号与前一个进行比较,如果序列号没有增加 +1 将拒绝该消息。

交易(isolation.level)

事务使我们能够以原子方式更新多个主题分区中的数据。交易中包含的所有记录都将被成功保存,或者 none 将被成功保存。它允许您在同一事务中提交您的消费者偏移量以及您已处理的数据,从而允许端到端的恰好一次语义。

生产者不等待向 Kafka 写入消息,而生产者使用 beginTransaction、commitTransaction 和 abortTransaction(以防失败) 消费者使用 isolation.level read_committed 或 read_uncommitted

  • read_committed:消费者将始终只读取已提交的数据。
  • read_uncommitted:按偏移顺序读取所有消息,无需等待 提交交易

如果 isolation.level=read_committed 的消费者收到尚未完成的事务的控制消息,它将不会从该分区发送更多消息,直到生产者提交或中止事务或者发生事务超时。事务超时由生产者使用配置 transaction.timeout.ms(默认 1 分钟)确定。

生产者和消费者中的 Exactly-Once

在我们有独立的生产者和消费者的正常情况下。生产者必须是幂等的,同时管理事务,所以消费者可以使用 isolation.level 到只读 read_committed 使整个过程成为一个原子操作。 这保证了生产者将始终与源系统同步。即使生产者崩溃或事务中止,它始终是一致的,并且将一条消息或一批消息作为一个单元发布一次。

同一消费者将收到一条消息或一批消息作为一个单元。

In Exactly-Once semantic Producer along with Consumer will appear as atomic operation which will operate as one unit. Either publish and get consumed once at all or aborted.

Kafka Stream 中恰好一次

Kafka Stream 使用来自主题 A 的消息,处理消息并将其发布到主题 B,一旦发布使用提交(主要提交 运行 卧底)将所有状态存储数据刷新到磁盘。

Kafka Stream 中的Exactly-once 是一种读取-处理-写入模式,可保证此操作将被视为原子操作。由于 Kafka Stream 同时满足生产者、消费者和事务的需要,Kafka Stream 带有特殊参数 processing.guarantee,可以 exactly_once 或 at_least_once,这让生活变得轻松,无需单独处理所有参数。

Kafka Streams atomically updates consumer offsets, local state stores, state store changelog topics, and production to output topics all together. If anyone of these steps fails, all of the changes are rolled back.

processing.guarantee: exactly_once 自动提供以下参数,您无需显式设置

  1. isolation.level=read_committed
  2. enable.idempotence=真
  3. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5