Flink Kafka:在没有收到消息的时间间隔后,优雅地关闭来自kafka源的flink消费消息

Flink Kafka: Gracefully close flink consuming messages from kafka source after a time interval when no messages are received

我已将 flinkkafkaconsumer 作为源添加到我的流执行环境中。当在特定时间(类似于 kafka 轮询时间)没有收到新消息时,我想 close/stop 停止使用数据。 目前它是 运行 无限期地阻止执行移动到下一步(验证消息)。 请建议是否有任何解决方法。

注意:我尝试使用来自反序列化的 endofstream,但它不起作用,因为流实际上是不确定的。

提前致谢。

如果这是为了测试,那么一种方法是创建您自己的 "wraps" FlinkKafkaConsumer 自定义源。您的来源的 run() 方法将从线程调用 Kafka 来源的 run() 方法,传入一个包装真实收集器的收集器,并在收集到任何内容时更新 "last collected time" 。在源的 run() 方法中,您将对此进行轮询,并在时间过长时调用 Kakfa 源的 cancel() 方法,然后也退出。

话虽如此,通常对于单元测试,您会希望使用模拟源来让您准确控制生成的内容以及生成的时间,而不是启动 Kafka 系统。