如何设置 JDBC 源连接器(kafka)的密钥?
How to set the key of the JDBC source connector (kafka)?
我正在使用 Kafka Source JDBC connector
从 mysql 数据库 table 读取数据并将其发布到主题 test-mysql-petai
。
数据库 table 有 2 个字段,其中 Id
是主键:
+---------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(20) | YES | | NULL | |
+---------+-------------+------+-----+---------+----------------+
我需要 id
字段的值作为主题的 Key。我尝试向 jdbc 连接器属性添加转换。
JDBCConnector.properties:
name=jdbc-source-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/test?user=dins&password=pw&serverTimezone=UTC
table.whitelist=petai
mode=incrementing
incrementing.column.name=id
schema.pattern=""
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
topic.prefix=test-mysql-jdbc-
但是,当我使用消费者读取键和值时,我得到以下信息:
Key = {"schema":{"type":"int32","optional":false},"payload":61}
Value ={"id":61,"name":"ttt"}
我需要得到以下信息:
Key = 61
Value ={"id":61,"name":"ttt"}
我做错了什么?感谢您的帮助。
谢谢。
如果您不想将模式包含到键中,您可以通过设置 key.converter.schemas.enable=false
来告知 Kafka Connect。
详细解释见Kafka Connect Deep Dive – Converters and Serialization Explained by Robin Moffatt。
我正在使用 Kafka Source JDBC connector
从 mysql 数据库 table 读取数据并将其发布到主题 test-mysql-petai
。
数据库 table 有 2 个字段,其中 Id
是主键:
+---------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(20) | YES | | NULL | |
+---------+-------------+------+-----+---------+----------------+
我需要 id
字段的值作为主题的 Key。我尝试向 jdbc 连接器属性添加转换。
JDBCConnector.properties:
name=jdbc-source-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/test?user=dins&password=pw&serverTimezone=UTC
table.whitelist=petai
mode=incrementing
incrementing.column.name=id
schema.pattern=""
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
topic.prefix=test-mysql-jdbc-
但是,当我使用消费者读取键和值时,我得到以下信息:
Key = {"schema":{"type":"int32","optional":false},"payload":61}
Value ={"id":61,"name":"ttt"}
我需要得到以下信息:
Key = 61
Value ={"id":61,"name":"ttt"}
我做错了什么?感谢您的帮助。
谢谢。
如果您不想将模式包含到键中,您可以通过设置 key.converter.schemas.enable=false
来告知 Kafka Connect。
详细解释见Kafka Connect Deep Dive – Converters and Serialization Explained by Robin Moffatt。