Kafka Connect - Caused by: org.apache.kafka.connect.errors.ConnectException: PK mode for table is RECORD_KEY, but record key schema is missing
I have a jdbc-sink connector for transferring data from Kafka to an Oracle database.
My connector fails with this error:
Caused by: org.apache.kafka.connect.errors.ConnectException: PK mode for table 'orders' is RECORD_KEY, but record key schema is missing
My sink connector properties:
{
"name": "jdbc-oracle",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "orders",
"connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
"connection.user": "ersin",
"connection.password": "ersin!",
"auto.create": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "MESSAGE_KEY",
"insert.mode": "update ",
"plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
"name": "jdbc-oracle"
},
"tasks": [
{
"connector": "jdbc-oracle",
"task": 0
}
],
"type": "sink"
}
My connect-avro-distributed.properties:
bootstrap.servers=10.0.0.0:9092
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
I send the data like this:
./bin/kafka-avro-console-producer \
--broker-list 10.0.0.0:9092 --topic orders \
--property parse.key="true" \
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
--property key.separator="$" \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"}, {"name":"quantity", "type": "int"}, {"name":"price","type": "int"}]}' \
--property schema.registry.url=http://10.0.0.0:8081
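For reference, with parse.key enabled and "$" as the key separator, each line fed to this producer carries the JSON-encoded Avro key and value separated by "$". A record could look like the following (the field values here are invented purely for illustration):

{"id": 999}${"id": 999, "product": "foo", "quantity": 100, "price": 50}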
How can I solve this problem?
Thanks in advance.
The problem seems to be with your payload and the configuration "pk.mode": "record_key".
pk.mode defines the primary key mode, and you have the following configuration options:
none: No keys are utilized.
kafka: Kafka coordinates (topic, partition, offset) are used as the PK.
record_key: Field(s) from the record key are used; the key may be a primitive or a struct.
record_value: Field(s) from the record value are used; the value must be a struct.
In your configuration you are using record_key, which means that Kafka Connect will take the field(s) from the key of the message and use them as the primary key in the target Oracle table.
Although you haven't shared your Kafka Connect worker's configuration, my guess is that you are missing some configuration parameters there.
According to the documentation,
The sink connector requires knowledge of schemas, so you should use a suitable converter e.g. the Avro converter that comes with the schema registry, or the JSON converter with schemas enabled. Kafka record keys if present can be primitive types or a Connect struct, and the record value must be a Connect struct. Fields being selected from Connect structs must be of primitive types. If the data in the topic is not of a compatible format, implementing a custom Converter may be necessary.
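A quick way to confirm that a key schema has actually been registered for the topic (assuming the default TopicNameStrategy, which names the subject orders-key) is to query the Schema Registry directly:

curl http://10.0.0.0:8081/subjects/orders-key/versions/latest

If this returns a "subject not found" error, the records were produced without an Avro key schema and the sink connector has nothing from which to derive the primary key.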
Now your problem seems to be that "pk.fields" is currently set to "pk.fields": "MESSAGE_KEY". In your key schema the field is defined as id. Therefore, the following should do the trick:
"pk.fields": "id"