如何将没有模式的数据发送到 kafka - 汇合 jdbc - 接收器使用情况?

How can I send data without schema to kafka - confluent jdbc - sink usage?

我使用 conluent jdbc-sink 将我的数据从 kafka 加载到 oracle。

但是我用数据来写我的架构值。

我不想用数据编写模式,如何在 kafka 主题上编写模式然后我只想从我的客户端发送数据?

提前致谢

json数据

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "field": 'ID',
                "type": "int32",
                "optional": False
            },
            {
                "field": 'PRODUCT',
                "type": "string",
                "optional": True
            },
            {
                "field": 'QUANTITY',
                "type": "int32",
                "optional": True
            },
            {
                "field": 'PRICE',
                "type": "int32",
                "optional": True
            }
        ],
        "optional": True,
        "name": "myrecord"
    },
    "payload": {
        "ID": 1071,
        "PRODUCT": 'ersin',
        "QUANTITIY": 1071,
        "PRICE": 1453
   }

python代码:

producer.send(topic, key=b'1071'
              , value=json.dumps(v, default=json_util.default).encode('utf-8'))

我该如何解决这个问题?

提前致谢

如果您想使用 JDBC 接收器连接器,您必须提供架构。这可以通过三种方式实现:

  • 使用 JSON 并启用模式
  • 使用 Avro 和 Schema Registry
  • 将JSON 模式与模式注册表一起使用

您目前正在使用 JSON 并启用了模式,需要将模式与实际负载一起发送。实现您的要求的唯一方法是使用 Avro 和 Confluent Schema Registry,以便您的模式在模式注册表中注册。这样,您就不需要每次都发送负载模式。

另一种选择是将 JSON 与架构注册表 (#1289) 一起使用。对于 Kafka Connect,您可以使用 JsonSchemaConverter,对于 Java 消费者和生产者,您可以使用 KafkaJsonSchemaSerializerKafkaJsonSchemaDeserializer.