我可以在 kafka 主题上定义模式吗?
Can I define schema on kafka topic?
我像这样将数据和值模式发送到 kafka 主题:
./bin/kafka-avro-console-producer \
--broker-list 10.0.0.0:9092 --topic orders \
--property parse.key="true" \
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
--property key.separator="$" \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type": ["null","int"],"default": null}, {"name":"price","type": ["null","int"],"default": null}]}' \
--property schema.registry.url=http://10.0.0.0:8081
然后我从 kafka 获取这个接收器属性的数据:
{
"name": "jdbc-oracle",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "orders",
"connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
"connection.user": "[redact]",
"connection.password": "[redact]",
"auto.create": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "id",
"insert.mode": "upsert",
"name": "jdbc-oracle"
},
"tasks": [
{
"connector": "jdbc-oracle",
"task": 0
}
],
"type": "sink"
}
但我只想从 kafka 获得 json 而没有 value.schema
。如果我把 kafka 主题放在这个 json 数据
{"id":9}${"id": {"int":9}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":61}}
如何从 kafka 获取这些数据并将 oracle 与 confluent jdbc sink 放在一起。
我想在 Kafka Connect 端创建模式?
另一件事是我可以从一个 kafka 主题中获取两种不同类型的数据,并且它在 oracle 端有两种不同的 table 和 jdbc 接收器。
如果您的源主题包含 JSON 数据但未声明架构,则您 必须 添加该架构才能使用 JDBC 接收器.
选项包括:
- ksqlDB,如下所示:https://www.youtube.com/watch?v=b-3qN_tlYR4&t=981s
- Kafka Connect 的单一消息转换 功能。 Apache Kafka 没有 SMT 可以做到这一点,但是 prototypes out there 可以做到这一点。
- 其他流处理,例如卡夫卡流
编辑:
I mean can I define two different jdbc sink to different oracle tables from one kafka topic
是的,每个主题可以被多个接收器消费。 table.name.format
配置选项可用于根据需要将主题路由到不同的 table 名称。
我像这样将数据和值模式发送到 kafka 主题:
./bin/kafka-avro-console-producer \
--broker-list 10.0.0.0:9092 --topic orders \
--property parse.key="true" \
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
--property key.separator="$" \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type": ["null","int"],"default": null}, {"name":"price","type": ["null","int"],"default": null}]}' \
--property schema.registry.url=http://10.0.0.0:8081
然后我从 kafka 获取这个接收器属性的数据:
{
"name": "jdbc-oracle",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "orders",
"connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
"connection.user": "[redact]",
"connection.password": "[redact]",
"auto.create": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "id",
"insert.mode": "upsert",
"name": "jdbc-oracle"
},
"tasks": [
{
"connector": "jdbc-oracle",
"task": 0
}
],
"type": "sink"
}
但我只想从 kafka 获得 json 而没有 value.schema
。如果我把 kafka 主题放在这个 json 数据
{"id":9}${"id": {"int":9}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":61}}
如何从 kafka 获取这些数据并将 oracle 与 confluent jdbc sink 放在一起。
我想在 Kafka Connect 端创建模式?
另一件事是我可以从一个 kafka 主题中获取两种不同类型的数据,并且它在 oracle 端有两种不同的 table 和 jdbc 接收器。
如果您的源主题包含 JSON 数据但未声明架构,则您 必须 添加该架构才能使用 JDBC 接收器.
选项包括:
- ksqlDB,如下所示:https://www.youtube.com/watch?v=b-3qN_tlYR4&t=981s
- Kafka Connect 的单一消息转换 功能。 Apache Kafka 没有 SMT 可以做到这一点,但是 prototypes out there 可以做到这一点。
- 其他流处理,例如卡夫卡流
编辑:
I mean can I define two different jdbc sink to different oracle tables from one kafka topic
是的,每个主题可以被多个接收器消费。 table.name.format
配置选项可用于根据需要将主题路由到不同的 table 名称。