Flink Kafka:在没有收到消息的时间间隔后,优雅地关闭来自kafka源的flink消费消息
Flink Kafka: Gracefully close flink consuming messages from kafka source after a time interval when no messages are received
我已将 flinkkafkaconsumer 作为源添加到我的流执行环境中。当在特定时间(类似于 kafka 轮询时间)没有收到新消息时,我想 close/stop 停止使用数据。
目前它是 运行 无限期地阻止执行移动到下一步(验证消息)。
请建议是否有任何解决方法。
注意:我尝试使用来自反序列化的 endofstream,但它不起作用,因为流实际上是不确定的。
提前致谢。
如果这是为了测试,那么一种方法是创建您自己的 "wraps" FlinkKafkaConsumer
自定义源。您的来源的 run()
方法将从线程调用 Kafka 来源的 run()
方法,传入一个包装真实收集器的收集器,并在收集到任何内容时更新 "last collected time" 。在源的 run()
方法中,您将对此进行轮询,并在时间过长时调用 Kakfa 源的 cancel()
方法,然后也退出。
话虽如此,通常对于单元测试,您会希望使用模拟源来让您准确控制生成的内容以及生成的时间,而不是启动 Kafka 系统。
我已将 flinkkafkaconsumer 作为源添加到我的流执行环境中。当在特定时间(类似于 kafka 轮询时间)没有收到新消息时,我想 close/stop 停止使用数据。 目前它是 运行 无限期地阻止执行移动到下一步(验证消息)。 请建议是否有任何解决方法。
注意:我尝试使用来自反序列化的 endofstream,但它不起作用,因为流实际上是不确定的。
提前致谢。
如果这是为了测试,那么一种方法是创建您自己的 "wraps" FlinkKafkaConsumer
自定义源。您的来源的 run()
方法将从线程调用 Kafka 来源的 run()
方法,传入一个包装真实收集器的收集器,并在收集到任何内容时更新 "last collected time" 。在源的 run()
方法中,您将对此进行轮询,并在时间过长时调用 Kakfa 源的 cancel()
方法,然后也退出。
话虽如此,通常对于单元测试,您会希望使用模拟源来让您准确控制生成的内容以及生成的时间,而不是启动 Kafka 系统。