当记录从多个源插入源主题时,Kafka 源连接器未按预期提取记录

Kafka source connector is not pulling the record as expected when records are inserted in source topic from multiple sources

在我的一个用例中,我正在尝试创建一个管道

每当我从自定义分区发送消息时,我都会使用 LONG 数据类型发送以毫秒为单位的时间戳,因为在模式中,时间戳列已定义为 long。

我之前在自定义分区中的代码:

Date date = new Date();
long timeMilli = date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);

发送记录前显示结果:

date = Tue Mar 26 22:02:04 EDT 2019 , time in millis = 1553652124063

在表 2 的时间戳列中插入的值:

3/27/2019 2:02:04.063000 AM

由于它采用英国时区(我相信),我临时修复了从当前时间戳中减去 4 小时的时间,以便我可以与美国 EST 时间戳相匹配。

Date date = new Date();
Date adj_date = DateUtils.addHours(date,-4);
long timeMilli = adj_date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);

显示结果:

date = Tue Mar 26 22:04:43 EDT 2019 , time in millis = 1553637883826

在表 2 的时间戳列中插入的值:

3/26/2019 10:04:43.826000 PM

如果我遗漏了什么请告诉我,因为我不确定为什么当我从自定义分区发送消息时会发生这种情况。

引擎盖下Jdbc源连接器使用以下查询:

SELECT * FROM someTable
WHERE
someTimestampColumn < $endTimetampValue
AND (
    (someTimestampColumn = $beginTimetampValue AND someIncrementalColumn > $lastIncrementedValue)
    OR someTimestampColumn > $beginTimetampValue)
ORDER BY someTimestampColumn, someIncrementalColumn ASC

汇总:如果时间戳列的值是较早当前时间戳并且较晚[,则查询检索行=40=] 比上次检查。

以上参数为:

  1. beginTimetampValue - 最后导入记录的时间戳列值
  2. endTimetampValue - 根据数据库的当前时间戳
  3. lastIncrementedValue - 最后导入记录的增量列值

我认为在你的情况下 Producer 时间戳 的表记录比你稍后插入 手动 (使用查询)。

当Jdbc连接器检查要导入到Kafka的新记录时,它会跳过它们(因为它们不满足someTimestampColumn < $endTimetampValue 时间戳条件

您也可以将日志级别更改为 DEBUG 并查看日志中发生了什么