Kafka Connect SourceTask 的轮询间隔
Poll Interval for Kafka Connect SourceTask
我正在使用 Kafka-Connect API 实现自定义源连接器,可用于轮询 REST-APIs 并将 JSON 响应下沉到 Kafka主题。
现在我想知道如何实现 SourceTask 的轮询间隔,JDBC 连接器如何提供轮询间隔。我必须在某处将线程设置为睡眠,但我必须在哪里执行此操作?
使用max.poll.interval.ms
.
请参考这个link:https://kafka.apache.org/documentation/
我在 SourceTask
实现中解决了这个用例,方法是添加类型为 long
的私有字段来存储时间戳。在第一次 poll()
调用时,该字段尚未初始化,因此会轮询配置的 REST-API。在第一次调用时,提到的 long
字段会使用当前时间戳进行初始化。在随后的所有 poll()
次调用中,将检查上一次调用的此时间戳。如果自前一个 poll()
以来经过的毫秒数小于两次轮询之间的配置间隔,我将线程发送到睡眠状态,因为配置的毫秒数已经过去。
我正在使用 Kafka-Connect API 实现自定义源连接器,可用于轮询 REST-APIs 并将 JSON 响应下沉到 Kafka主题。
现在我想知道如何实现 SourceTask 的轮询间隔,JDBC 连接器如何提供轮询间隔。我必须在某处将线程设置为睡眠,但我必须在哪里执行此操作?
使用max.poll.interval.ms
.
请参考这个link:https://kafka.apache.org/documentation/
我在 SourceTask
实现中解决了这个用例,方法是添加类型为 long
的私有字段来存储时间戳。在第一次 poll()
调用时,该字段尚未初始化,因此会轮询配置的 REST-API。在第一次调用时,提到的 long
字段会使用当前时间戳进行初始化。在随后的所有 poll()
次调用中,将检查上一次调用的此时间戳。如果自前一个 poll()
以来经过的毫秒数小于两次轮询之间的配置间隔,我将线程发送到睡眠状态,因为配置的毫秒数已经过去。