kafka connect - 如何从有效负载中过滤模式元数据
kafka connect - How to filter schema metadata from payload
我正在尝试从负载中删除模式,这里是配置
connector.properties
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/employee_db?user=root&password=root
table.whitelist=testemp
mode=incrementing
incrementing.column.name=employee_id
topic.prefix=test-mysql-jdbc-
以下是我的worker.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=C:\Users\name\Desktop\kafka\libs
输出:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"employee_id"},{"type":"string","optional":false,"field":"first_name"}],"optional":false,"name":"testemp"},"payload":{"employee_id":2,"first_name":"test"}}
异常输出:
{"payload":{"employee_id":2,"first_name":"test"}}
我尝试按照 here 中的建议在 worker 中禁用 value.converter.schemas.enable= false 仍然没有效果
我是不是漏掉了什么?
有两个选项可以修复它:
- 从您的连接器配置中删除
value.converter
属性(您使用相同的 value.converter
)
- 在连接器配置中设置
value.converter.schemas.enable=false
。
架构已添加到消息中,因为您已经覆盖了值转换器并且没有禁用架构(默认情况下 JsonConverter
架构已启用)。从 Kafka Connect 的角度来看,您使用了全新的转换器(它不会使用全局配置中的属性)
如果您将禁用架构,您的消息将如下所示:
{
"employee_id": 2,
"first_name":"test"
}
我正在尝试从负载中删除模式,这里是配置
connector.properties
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/employee_db?user=root&password=root
table.whitelist=testemp
mode=incrementing
incrementing.column.name=employee_id
topic.prefix=test-mysql-jdbc-
以下是我的worker.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=C:\Users\name\Desktop\kafka\libs
输出:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"employee_id"},{"type":"string","optional":false,"field":"first_name"}],"optional":false,"name":"testemp"},"payload":{"employee_id":2,"first_name":"test"}}
异常输出:
{"payload":{"employee_id":2,"first_name":"test"}}
我尝试按照 here 中的建议在 worker 中禁用 value.converter.schemas.enable= false 仍然没有效果
我是不是漏掉了什么?
有两个选项可以修复它:
- 从您的连接器配置中删除
value.converter
属性(您使用相同的value.converter
) - 在连接器配置中设置
value.converter.schemas.enable=false
。
架构已添加到消息中,因为您已经覆盖了值转换器并且没有禁用架构(默认情况下 JsonConverter
架构已启用)。从 Kafka Connect 的角度来看,您使用了全新的转换器(它不会使用全局配置中的属性)
如果您将禁用架构,您的消息将如下所示:
{
"employee_id": 2,
"first_name":"test"
}