Timestamp in Avro schema produces incompatible value validation in Kafka Connect JDBC
Error produced by the JDBC sink connector:
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.util.Date for field: "some_timestamp_field"
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:151)
at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:107)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Avro schema registered by the source JDBC connector (MySQL):
{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "fields": [
    ...
    {
      "name": "some_timestamp_field",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    },
    ...
  ]
}
So in the Avro schema the timestamp field is registered as an INT64 with the correct (timestamp) logical type. But Connect reads the schema type as plain INT64 and compares it against the value type java.util.Date, which fails. Is this a bug, or is there a workaround? Maybe I'm missing something, because this looks like a completely standard Connect model.
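For what it's worth, the mismatch can be reproduced with the plain Connect data API. Below is a minimal sketch (the class name is mine); the first put() succeeds because the logical name is present, while the last one throws exactly the exception above:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import java.util.Date;

public class TimestampValidationSketch {
    public static void main(String[] args) {
        // Connect's Timestamp is a logical type: INT64 on the wire, but the
        // in-memory Java value is a java.util.Date. Validation keys off the
        // schema name "org.apache.kafka.connect.data.Timestamp".
        Schema withLogicalName = SchemaBuilder.struct()
                .field("some_timestamp_field", Timestamp.SCHEMA)
                .build();
        new Struct(withLogicalName).put("some_timestamp_field", new Date()); // OK

        // If a transform rebuilds the field as a plain INT64, dropping the
        // logical name, the same value no longer validates:
        Schema plainInt64 = SchemaBuilder.struct()
                .field("some_timestamp_field", Schema.INT64_SCHEMA)
                .build();
        new Struct(plainInt64).put("some_timestamp_field", new Date());
        // -> DataException: Invalid Java object for schema type INT64: class java.util.Date
    }
}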
Thanks in advance.
UPDATE
Sink connector configuration:
{
  "name": "sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "topic",
    "connection.url": "jdbc:postgresql://host:port/db",
    "connection.user": "user",
    "connection.password": "password",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://host:port",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://host:port",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "id"
  }
}
Deserialized data in Kafka:
{
  "id": 678148,
  "some_timestamp_field": 1543806057000,
  ...
}
We have since worked around the issue. Our goal was to cast id from BIGINT to STRING (TEXT/VARCHAR) and save the records in the downstream database. But due to a bug (probably https://issues.apache.org/jira/browse/KAFKA-5891), casting the id field did not work: Kafka tried to validate the timestamp field in the cast transformation chain as well, but read the schema type/name incorrectly and hit a type mismatch (see the record body and error log above).
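For reference, the failed attempt was a value-side cast along these lines (a reconstruction, since the original config is not shown above; the transform alias is hypothetical):

"transforms": "castValueToString",
"transforms.castValueToString.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castValueToString.spec": "id:string"

Even though the spec names only id, the buggy Cast rebuilds every field's schema from its raw type, dropping the Timestamp logical name, and validation of the untouched timestamp field then fails.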
So we worked around it as follows:
extract only the id field as the key
-> execute the cast transform on the key
-> it works because the key does not contain the timestamp field.
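Concretely, with the sample record above, the chain transforms the key roughly like this:

{ "id": 678148 }     key after ValueToKey (schema type INT64)
{ "id": "678148" }   key after Cast$Key (schema type STRING)

Since the key struct never contains some_timestamp_field, the whole-struct validation in Cast has nothing to trip over.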
Here is the workaround configuration:
{
  "name": "sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "topic",
    "connection.url": "jdbc:postgresql://host:port/db",
    "connection.user": "user",
    "connection.password": "password",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://host:port",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://host:port",
    "transforms": "createKey,castKeyToString",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "transforms.castKeyToString.type": "org.apache.kafka.connect.transforms.Cast$Key",
    "transforms.castKeyToString.spec": "id:string",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id"
  }
}
Disclaimer: this is not a proper solution, just a workaround. The bug in the Cast transform should be fixed. In my opinion, the Cast transform should only touch the fields specified in the cast spec, not every other field in the message.
Have a nice day.