Kafka Connect SourceTask 的轮询间隔

Poll Interval for Kafka Connect SourceTask

我正在使用 Kafka-Connect API 实现自定义源连接器,可用于轮询 REST-APIs 并将 JSON 响应下沉到 Kafka主题。

现在我想知道如何实现 SourceTask 的轮询间隔,JDBC 连接器如何提供轮询间隔。我必须在某处将线程设置为睡眠,但我必须在哪里执行此操作?

使用max.poll.interval.ms.

请参考这个link:https://kafka.apache.org/documentation/

我在 SourceTask 实现中解决了这个用例,方法是添加类型为 long 的私有字段来存储时间戳。在第一次 poll() 调用时,该字段尚未初始化,因此会轮询配置的 REST-API。在第一次调用时,提到的 long 字段会使用当前时间戳进行初始化。在随后的所有 poll() 次调用中,将检查上一次调用的此时间戳。如果自前一个 poll() 以来经过的毫秒数小于两次轮询之间的配置间隔,我将线程发送到睡眠状态,因为配置的毫秒数已经过去。