没有架构的 Kafka Connect,只有 JSON
Kafka Connect without schema, only JSON
我想将 JDBC 接收器连接器与 JSON 一起使用,但没有架构。
他们写 (source):
If you need to use JSON without Schema Registry for Connect data, you
can use the JsonConverter supported with Kafka. The example below
shows the JsonConverter key and value properties that are added to the
configuration:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
When the properties key.converter.schemas.enable and
value.converter.schemas.enable are set to true, the key or value is
not treated as plain JSON, but rather as a composite JSON object
containing both an internal schema and the data. When these are
enabled for a source connector, both the schema and data are in the
composite JSON object. When these are enabled for a sink connector,
the schema and data are extracted from the composite JSON object.
Note that this implementation never uses Schema Registry.
When the properties key.converter.schemas.enable and
value.converter.schemas.enable are set to false (the default), only
the data is passed along, without the schema. This reduces the payload
overhead for applications that do not need a schema.
我配置了连接器:
{
"name": "noschemajustjson",
"config": {
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"schemas.enable": "false",
"name": "noschemajustjson",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"config.action.reload": "restart",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": "testconnect2",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "postgres",
"connection.password": "********",
"dialect.name": "PostgreSqlDatabaseDialect",
"table.name.format": "utp",
"auto.create": "false",
"auto.evolve": "false"
}
}
但我仍然得到错误:
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink
connector 'noschemajustjson2' is configured with
'delete.enabled=false' and 'pk.mode=none' and therefore requires
records with a non-null Struct value and non-null Struct schema, but
found record at
(topic='testconnect2',partition=0,offset=0,timestamp=1626416739697)
with a HashMap value and null value schema.
那么我应该怎么做才能强制 Connect 在完全没有模式的情况下工作(只有普通 JSON)?
I want to use JDBC sink connector with JSON and without schema
您不能这样做 - JDBC 接收器连接器流式传输到关系数据库,并且关系数据库具有模式 :-D JDBC 接收器连接器因此 需要 为数据呈现的模式。
根据数据的来源,您有不同的选择。
- 如果它是从 Kafka Connect 中提取的,请使用支持模式(Avro、Protobuf、JSON 模式)的转换器
- 如果它是由您可以控制的应用程序生成的,请让该应用程序使用模式(Avro、Protobuf、JSON 模式)序列化该数据
- 如果它来自某个你无法控制的地方,那么你需要预处理主题以添加显式模式并将其写入一个新主题,然后由 JDBC 水槽连接器。
参考资料和资源:
我想将 JDBC 接收器连接器与 JSON 一起使用,但没有架构。 他们写 (source):
If you need to use JSON without Schema Registry for Connect data, you can use the JsonConverter supported with Kafka. The example below shows the JsonConverter key and value properties that are added to the configuration:
key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false
When the properties key.converter.schemas.enable and value.converter.schemas.enable are set to true, the key or value is not treated as plain JSON, but rather as a composite JSON object containing both an internal schema and the data. When these are enabled for a source connector, both the schema and data are in the composite JSON object. When these are enabled for a sink connector, the schema and data are extracted from the composite JSON object. Note that this implementation never uses Schema Registry.
When the properties key.converter.schemas.enable and value.converter.schemas.enable are set to false (the default), only the data is passed along, without the schema. This reduces the payload overhead for applications that do not need a schema.
我配置了连接器:
{
"name": "noschemajustjson",
"config": {
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"schemas.enable": "false",
"name": "noschemajustjson",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"config.action.reload": "restart",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": "testconnect2",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "postgres",
"connection.password": "********",
"dialect.name": "PostgreSqlDatabaseDialect",
"table.name.format": "utp",
"auto.create": "false",
"auto.evolve": "false"
}
}
但我仍然得到错误:
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'noschemajustjson2' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='testconnect2',partition=0,offset=0,timestamp=1626416739697) with a HashMap value and null value schema.
那么我应该怎么做才能强制 Connect 在完全没有模式的情况下工作(只有普通 JSON)?
I want to use JDBC sink connector with JSON and without schema
您不能这样做 - JDBC 接收器连接器流式传输到关系数据库,并且关系数据库具有模式 :-D JDBC 接收器连接器因此 需要 为数据呈现的模式。
根据数据的来源,您有不同的选择。
- 如果它是从 Kafka Connect 中提取的,请使用支持模式(Avro、Protobuf、JSON 模式)的转换器
- 如果它是由您可以控制的应用程序生成的,请让该应用程序使用模式(Avro、Protobuf、JSON 模式)序列化该数据
- 如果它来自某个你无法控制的地方,那么你需要预处理主题以添加显式模式并将其写入一个新主题,然后由 JDBC 水槽连接器。
参考资料和资源: