KStreams 正在尝试删除重新分区主题
KStreams is trying to delete a repartition topic
我们构建了一个从主题读取并在不同字段上执行 groupBy
的管道。
input
.groupBy(
(key, value) -> value.getFieldA(),
Grouped.with("TopicName", Serdes.String(), Serdes.Integer()))
.windowedBy(SessionWindows.with(ofMinutes(5)).grace(Duration.ZERO))
此步骤创建一个中间 app-TopicName-repartition
主题。但是,KStream 不断向 Kafka 发送 Delete
请求。我们可以在 Kafka 端看到日志:
INFO [DENY] Auth request Delete on Topic:app-TopicName-repartition by User test_user (cached) (io.aiven.kafka.auth.AivenAclAuthorizer)
我们的代码中没有 streams.cleanUp()
或通过管理员手动删除的过程 API。删除请求仅适用于重新分区主题,不适用于其他中间主题。该应用程序运行良好。它只是不断发送后台删除请求,因为我已将 retries
设置为 Integer.MAX_VALUE
。我未能调试该问题。为什么 KStream 发出重新分区主题的删除请求?
[更新]
据我对 KStreams 源代码的追踪,它在 TaskManager
中调用了 KafkaAdminClient.deleteRecords()
。这是我在日志文件中看到 Delete
的原因吗? KStreams 源代码中没有其他明确删除主题的调用。
没错。 Kafka Streams 从不尝试删除主题。但是,您需要允许它从重新分区主题中清除数据。请注意,重新分区主题默认配置有无限保留时间,如果 Kafka Streams 无法清除主题,它将无限增长。
有关您需要哪些 ACL 的更多详细信息,请查看文档:https://docs.confluent.io/current/streams/developer-guide/security.html#required-acl-setting-for-secure-ak-clusters
我们构建了一个从主题读取并在不同字段上执行 groupBy
的管道。
input
.groupBy(
(key, value) -> value.getFieldA(),
Grouped.with("TopicName", Serdes.String(), Serdes.Integer()))
.windowedBy(SessionWindows.with(ofMinutes(5)).grace(Duration.ZERO))
此步骤创建一个中间 app-TopicName-repartition
主题。但是,KStream 不断向 Kafka 发送 Delete
请求。我们可以在 Kafka 端看到日志:
INFO [DENY] Auth request Delete on Topic:app-TopicName-repartition by User test_user (cached) (io.aiven.kafka.auth.AivenAclAuthorizer)
我们的代码中没有 streams.cleanUp()
或通过管理员手动删除的过程 API。删除请求仅适用于重新分区主题,不适用于其他中间主题。该应用程序运行良好。它只是不断发送后台删除请求,因为我已将 retries
设置为 Integer.MAX_VALUE
。我未能调试该问题。为什么 KStream 发出重新分区主题的删除请求?
[更新]
据我对 KStreams 源代码的追踪,它在 TaskManager
中调用了 KafkaAdminClient.deleteRecords()
。这是我在日志文件中看到 Delete
的原因吗? KStreams 源代码中没有其他明确删除主题的调用。
没错。 Kafka Streams 从不尝试删除主题。但是,您需要允许它从重新分区主题中清除数据。请注意,重新分区主题默认配置有无限保留时间,如果 Kafka Streams 无法清除主题,它将无限增长。
有关您需要哪些 ACL 的更多详细信息,请查看文档:https://docs.confluent.io/current/streams/developer-guide/security.html#required-acl-setting-for-secure-ak-clusters