使用 confluent_kafka 为 python 向主题注册架构 ID
Registering Schema ID with Topic using confluent_kafka for python
到目前为止我得到的唯一答案是,您必须为架构和主题指定相同的名称,然后这应该 link 它们在一起。但是在注册了名称为 test_topic
的架构后,例如:
{
"type": "record",
"name": "test_topic",
"namespace": "com.test",
"doc": "My test schema",
"fields": [
{
"name": "name",
"type": "string"
}
]
}
和运行使用以下命令,插入没有问题。
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"name": "My first name"}}]}' "http://localhost/topics/test_topic"
但是当我 运行 下面的命令也插入时没有给出任何错误(注意,我更改了 属性 名称)
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"test": "My first name"}}]}' "http://localhost/topics/test_topic"
我怀疑是一条错误消息说我的数据与该主题的架构不匹配...
我的架构 ID 是 10,所以我知道它正在运行并已注册,但目前不是很有用。
Python代码:
from confluent_kafka import Producer
import socket
import json
conf = {'bootstrap.servers': 'localhost:9092', 'client.id': socket.gethostname()}
producer = Producer(conf)
def acked(err, msg):
if err is not None:
print(f'Failed to deliver message: {str(msg)}, {str(err)}')
else:
print(f'Message produced: {str(msg)}')
producer.produce("test_topic", key="key", value=json.dumps({"test": name}).encode('ascii') , callback=acked)
producer.poll(5)
you have to give the schema and the topic the same name, and then this should link them together
Schema Registry 的工作原理并非如此。
每条kafka记录都有一个键和一个值。
注册表有主题,这些主题没有严格映射到主题。
但是,Kafka 客户端(反)序列化器实现将使用 topic-key
和 topic-value
主题名称到注册表中的 register/extract 模式。
客户端无法告诉注册表将架构放在哪个 ID 上。该逻辑是计算服务器端
我不确定我是否理解您的 post 与 REST 代理有什么关系,但是您 posting plain JSON 而不是告诉它数据应该是 Avro(你使用的是不正确的 header)
If Avro is used, the content type will be application/vnd.kafka.avro.v2+json
到目前为止我得到的唯一答案是,您必须为架构和主题指定相同的名称,然后这应该 link 它们在一起。但是在注册了名称为 test_topic
的架构后,例如:
{
"type": "record",
"name": "test_topic",
"namespace": "com.test",
"doc": "My test schema",
"fields": [
{
"name": "name",
"type": "string"
}
]
}
和运行使用以下命令,插入没有问题。
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"name": "My first name"}}]}' "http://localhost/topics/test_topic"
但是当我 运行 下面的命令也插入时没有给出任何错误(注意,我更改了 属性 名称)
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"test": "My first name"}}]}' "http://localhost/topics/test_topic"
我怀疑是一条错误消息说我的数据与该主题的架构不匹配...
我的架构 ID 是 10,所以我知道它正在运行并已注册,但目前不是很有用。
Python代码:
from confluent_kafka import Producer
import socket
import json
conf = {'bootstrap.servers': 'localhost:9092', 'client.id': socket.gethostname()}
producer = Producer(conf)
def acked(err, msg):
if err is not None:
print(f'Failed to deliver message: {str(msg)}, {str(err)}')
else:
print(f'Message produced: {str(msg)}')
producer.produce("test_topic", key="key", value=json.dumps({"test": name}).encode('ascii') , callback=acked)
producer.poll(5)
you have to give the schema and the topic the same name, and then this should link them together
Schema Registry 的工作原理并非如此。
每条kafka记录都有一个键和一个值。
注册表有主题,这些主题没有严格映射到主题。
但是,Kafka 客户端(反)序列化器实现将使用 topic-key
和 topic-value
主题名称到注册表中的 register/extract 模式。
客户端无法告诉注册表将架构放在哪个 ID 上。该逻辑是计算服务器端
我不确定我是否理解您的 post 与 REST 代理有什么关系,但是您 posting plain JSON 而不是告诉它数据应该是 Avro(你使用的是不正确的 header)
If Avro is used, the content type will be
application/vnd.kafka.avro.v2+json