JDBC 的 Kafka 连接源 - 毒丸、错误消息、错误处理问题
Kafka connect source for JDBC - poison pill, bad message, error handling issues
- 我在 postgres 中存储了一些数据 table。
- 每一行都有使用 Avro 序列化的数据。
- 我创建了一个 Kafka 连接源连接器,它读取行并将它们发送到具有相同 Avro 架构的主题。该主题
confluent_value_schema_validation
为真。
- 这很好用,但是如果我在我的 postgres table 中插入一条毒丸消息,那么连接器就会拒绝继续前进。一种解决方案是手动干预以向前移动偏移量,以便跳过错误消息。另一个解决方案是将
confluent_value_schema_validation
设置为 false,这样任何消息都可以添加到我的主题中。
- 我尝试了以下配置,但没有任何反应。
"errors.tolerance": "all",
"errors.retry.timeout": 500,
"errors.retry.delay.max.ms": 100,
- 我的问题 - 如何让 Kafka Connect JDBC 源连接器忽略错误并继续处理好消息?
How can I make the Kafka Connect JDBC source connector ignore errors and move forward with the good messages?
你不能,目前(2021 年 1 月)。
Kafka Connect 中的错误处理将在转换和 serde 级别处理错误,但连接器本身从数据库读取记录的错误将不会在相同的机制下处理。
除了手动将源连接器移过问题记录之外,您还可以考虑一种方法,将来自 table 的原始提要(没有架构验证)提取到一个主题中,然后编写一个 Kafka Streams 应用程序从这个原始主题读取数据并将其写入目标主题,在途中应用架构验证并在此时处理错误消息。
- 我在 postgres 中存储了一些数据 table。
- 每一行都有使用 Avro 序列化的数据。
- 我创建了一个 Kafka 连接源连接器,它读取行并将它们发送到具有相同 Avro 架构的主题。该主题
confluent_value_schema_validation
为真。 - 这很好用,但是如果我在我的 postgres table 中插入一条毒丸消息,那么连接器就会拒绝继续前进。一种解决方案是手动干预以向前移动偏移量,以便跳过错误消息。另一个解决方案是将
confluent_value_schema_validation
设置为 false,这样任何消息都可以添加到我的主题中。 - 我尝试了以下配置,但没有任何反应。
"errors.tolerance": "all",
"errors.retry.timeout": 500,
"errors.retry.delay.max.ms": 100,
- 我的问题 - 如何让 Kafka Connect JDBC 源连接器忽略错误并继续处理好消息?
How can I make the Kafka Connect JDBC source connector ignore errors and move forward with the good messages?
你不能,目前(2021 年 1 月)。
Kafka Connect 中的错误处理将在转换和 serde 级别处理错误,但连接器本身从数据库读取记录的错误将不会在相同的机制下处理。
除了手动将源连接器移过问题记录之外,您还可以考虑一种方法,将来自 table 的原始提要(没有架构验证)提取到一个主题中,然后编写一个 Kafka Streams 应用程序从这个原始主题读取数据并将其写入目标主题,在途中应用架构验证并在此时处理错误消息。