Spring 用于 Apache Kafka:KafkaTemplate 行为与异步请求、批处理和 Max In-flight 为 1

Spring for Apache Kafka: KafkaTemplate Behavior with Async Requests, Batching, and Max In-flight of 1

Scenario/Use 案例: 我有一个 Spring 启动应用程序,使用 Spring 让 Kafka 向 Kafka 主题发送消息。完成特定事件(由 http 请求触发)后,将创建一个新线程(通过 Spring @Async),该线程调用 kafkatemplate.send() 并为 ListenableFuture 回调 return秒。处理 http 请求的原始线程 return 是对调用客户端的响应并被释放。

正常行为: 在正常的应用程序负载下,我已经验证了各个消息都已按需要发布到主题(回调成功或失败时的应用程序日志条目以及在 kafka 集群上查看主题中的消息)。如果我关闭所有 kafka 代理 3-5 分钟,然后将集群重新联机,应用程序的发布者立即重新建立与 kafka 的连接并继续发布消息。

问题行为: 但是,在执行负载测试时,如果我关闭所有 kafka 代理 3-5 分钟,然后将集群重新联机,Spring 应用程序的发布者继续显示所有发布尝试失败。这将持续大约 7 个小时,此时发布者能够再次成功地重新建立与 kafka 的通信(通常这之前会发生管道异常,但并非总是如此)。

当前调查结果: 在执行负载测试时,大约。 10 分钟后,我使用 JConsole 连接到应用程序并监视通过 kafka.producer 公开的 producer metrics。在第一个大约。重载30秒,buffer-available-bytes持续减少,直到达到0,并保持在0。waiting-threads保持在6和10(每次我点击刷新时交替)和缓冲区可用字节数保持在 0 大约。 6.5小时。在那之后,缓冲区可用字节显示所有最初分配的内存都已恢复,但 kafka 发布尝试继续失败大约。又过了 30 分钟,最终恢复了所需的行为。

当前生产者配置

request.timeout.ms=3000
max.retry.count=2
max.inflight.requests=1
max.block.ms=10000
retry.backoff.ms=3000

所有其他属性都在使用 their default values

问题:

  1. 鉴于我的用例会改变 batch.sizelinger.ms 在消除方面有任何积极影响重负载时遇到的问题?
  2. 鉴于我有单独的线程,所有线程都使用单独的消息和回调调用 kafkatemplate.send(),并且我将max.in.flight.requests.per.connection 设置为 1,batch.size 和 linger.ms 除了限制每条消息的大小之外是否被忽略?我的理解是,在这种情况下实际上没有发生批处理,并且每条消息都作为单独的请求发送。
  3. 鉴于我已将 max.block.ms 设置为 10 秒,为什么缓冲区内存保持使用了这么长时间,为什么所有消息在这么多小时内仍然无法发布。我的理解是,在 10 秒后,每个新的发布尝试都应该失败,并且 return 失败回调反过来释放关联的线程

更新: 尝试澄清线程的使用。我正在使用 JavaDocs 中推荐的单一生产者实例。有诸如 https-jsse-nio-22443-exec-* 之类的线程正在处理传入的 https 请求。当请求进入某个处理时,一旦所有非 kafka 相关逻辑完成,就会调用另一个用 @Async 修饰的 class 中的方法。此方法调用 kafkatemplate.send()。在执行发布到 kafka 之前,返回给客户端的响应显示在日志中(这就是我如何验证它是通过单独的线程执行的,因为服务不会在 return 响应之前等待发布)。 有似乎正在处理来自 kafkatemplate.send() 的回调的 task-scheduler-* 线程。我的猜测是单个 kafka-producer-network-thread 处理所有发布。

我的应用程序正在发出一个 http 请求,并在每次 kafka 发布失败时将每条消息发送到数据库平台上的死信 table。为执行向 kafka 的发布而启动的相同线程被重新用于对数据库的调用。我将数据库调用逻辑移到了另一个 class 中,并用它自己的 @Async 和自定义 TaskExecutor 对其进行了装饰。这样做之后,我监视了 JConsole 并且可以看到对 Kafka 的调用似乎正在重新使用相同的 10 个线程 (TaskExecutor: core Pool size - 10, QueueCapacity - 0, and MaxPoolSize - 80) 并且对数据库服务的调用现在正在使用一个单独的线程池 (TaskExecutor: core Pool size - 10, QueueCapacity - 0, and MaxPoolSize - 80) 不断关闭和打开新线程,但保持相对恒定的线程数。通过这种新行为,缓冲区可用字节数保持在健康的恒定水平,并且应用程序的 kafka 发布者在代理重新联机后成功重新建立连接。