停止 Kafka Streams 应用程序
Stop a Kafka Streams app
是否可以有一个 Kafka Streams 应用程序运行一个主题中的所有数据然后退出?
示例我正在根据日期将数据生成到主题中。消费者被 cron 启动,遍历所有可用数据,然后.. 做什么?我不想让它坐下来等待更多数据。假设一切都在那里,然后优雅地退出。
可能吗?
您可以创建一个 consumer
,然后一旦它停止提取数据,您就可以调用 consumer.close()
。或者,如果您以后想再次投票,只需调用 consumer.pause()
并稍后调用 .resume
。
执行此操作的一种方法是在消费者轮询块中。比如
data = consumer.poll()
if (!data.next()) {
consumer.close()
}
记住poll
returnsConsumerRecord<K,V>
并符合Iterable
接口。
在 Kafka Streams(至于其他流处理解决方案)中,没有 "end of data" 因为它首先是流处理,而不是批处理。
尽管如此,您可以观察 Kafka Streams 应用程序的 "lag" 并在没有滞后时将其关闭(滞后,是尚未消费的消息数)。
例如,您可以使用bin/kafka-consumer-groups.sh
来检查您的Streams 应用程序的延迟(应用程序ID 用作消费者组ID)。如果您想将此嵌入到您的 Streams 应用程序中,您可以使用 kafka.admin.AdminClient
获取消费者组信息。
是否可以有一个 Kafka Streams 应用程序运行一个主题中的所有数据然后退出?
示例我正在根据日期将数据生成到主题中。消费者被 cron 启动,遍历所有可用数据,然后.. 做什么?我不想让它坐下来等待更多数据。假设一切都在那里,然后优雅地退出。
可能吗?
您可以创建一个 consumer
,然后一旦它停止提取数据,您就可以调用 consumer.close()
。或者,如果您以后想再次投票,只需调用 consumer.pause()
并稍后调用 .resume
。
执行此操作的一种方法是在消费者轮询块中。比如
data = consumer.poll()
if (!data.next()) {
consumer.close()
}
记住poll
returnsConsumerRecord<K,V>
并符合Iterable
接口。
在 Kafka Streams(至于其他流处理解决方案)中,没有 "end of data" 因为它首先是流处理,而不是批处理。
尽管如此,您可以观察 Kafka Streams 应用程序的 "lag" 并在没有滞后时将其关闭(滞后,是尚未消费的消息数)。
例如,您可以使用bin/kafka-consumer-groups.sh
来检查您的Streams 应用程序的延迟(应用程序ID 用作消费者组ID)。如果您想将此嵌入到您的 Streams 应用程序中,您可以使用 kafka.admin.AdminClient
获取消费者组信息。