生产者如何与注册表通信以及它向注册表发送什么
how does the producer communicate with the registry and what does it send to the registry
我试图通过阅读文档来理解这一点,但也许因为我不是高级程序员,所以我并不真正理解它。
我在文档中,例如在这个例子中:
https://docs.confluent.io/current/schema-registry/serdes-develop/serdes-protobuf.html#protobuf-serializer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://127.0.0.1:8081");
Producer<String, MyRecord> producer = new KafkaProducer<String, MyRecord>(props);
String topic = "testproto";
String key = "testkey";
OtherRecord otherRecord = OtherRecord.newBuilder()
.setOtherId(123).build();
MyRecord myrecord = MyRecord.newBuilder()
.setF1("value1").setF2(otherRecord).build();
ProducerRecord<String, MyRecord> record
= new ProducerRecord<String, MyRecord>(topic, key, myrecord);
producer.send(record).get();
producer.close();
我在这里看到您定义了架构注册表 url 然后生产者会以某种方式知道它会向注册表发送联系人以向注册表提供有关消息的一些元数据。
现在我想更好地了解这实际上是如何工作的,以及生产者和注册中心之间交换了什么(或者是与注册中心联系的卡夫卡)?
无论如何,我的问题是假设我有一个 protobuf 格式的记录。
我正在将那个 protobuf 放入某个主题的 kafka 中。
现在我想激活模式注册表,那么生产者是否只需将原型定义发送到模式注册表中?
生产者是否直接从记录中获取元数据定义?
它会尝试将任何新消息更新到队列中吗?将数据推送到 kafka 时,这不会增加一点延迟吗?
抱歉,如果这都是非常基本的问题,但我只是想了解大局,我想念这种平静。
感谢您提供任何信息,如果文档中已经明确了这一点,我们深表歉意。
(我需要这个以便我可以使用 ksql 反序列化我在 kafka 中的消息)
此致,
would the producer just send the proto definition into the schema registry
Serializer 执行,而不是 Producer 直接执行。
MyRecord
序列化为二进制,schema通过HTTP发送到Registry,其中returns一个ID,然后发送的消息包含0x0 + ID + binary-protobuf-value
would it try to update in any new message into the queue?
在发送任何消息之前先发送架构。现有消息未受影响
would this not increase a bit the latency when pushing the data to kafka?
仅针对架构被缓存后的第一条消息
我试图通过阅读文档来理解这一点,但也许因为我不是高级程序员,所以我并不真正理解它。
我在文档中,例如在这个例子中: https://docs.confluent.io/current/schema-registry/serdes-develop/serdes-protobuf.html#protobuf-serializer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://127.0.0.1:8081");
Producer<String, MyRecord> producer = new KafkaProducer<String, MyRecord>(props);
String topic = "testproto";
String key = "testkey";
OtherRecord otherRecord = OtherRecord.newBuilder()
.setOtherId(123).build();
MyRecord myrecord = MyRecord.newBuilder()
.setF1("value1").setF2(otherRecord).build();
ProducerRecord<String, MyRecord> record
= new ProducerRecord<String, MyRecord>(topic, key, myrecord);
producer.send(record).get();
producer.close();
我在这里看到您定义了架构注册表 url 然后生产者会以某种方式知道它会向注册表发送联系人以向注册表提供有关消息的一些元数据。
现在我想更好地了解这实际上是如何工作的,以及生产者和注册中心之间交换了什么(或者是与注册中心联系的卡夫卡)?
无论如何,我的问题是假设我有一个 protobuf 格式的记录。 我正在将那个 protobuf 放入某个主题的 kafka 中。 现在我想激活模式注册表,那么生产者是否只需将原型定义发送到模式注册表中? 生产者是否直接从记录中获取元数据定义? 它会尝试将任何新消息更新到队列中吗?将数据推送到 kafka 时,这不会增加一点延迟吗?
抱歉,如果这都是非常基本的问题,但我只是想了解大局,我想念这种平静。
感谢您提供任何信息,如果文档中已经明确了这一点,我们深表歉意。 (我需要这个以便我可以使用 ksql 反序列化我在 kafka 中的消息)
此致,
would the producer just send the proto definition into the schema registry
Serializer 执行,而不是 Producer 直接执行。
MyRecord
序列化为二进制,schema通过HTTP发送到Registry,其中returns一个ID,然后发送的消息包含0x0 + ID + binary-protobuf-value
would it try to update in any new message into the queue?
在发送任何消息之前先发送架构。现有消息未受影响
would this not increase a bit the latency when pushing the data to kafka?
仅针对架构被缓存后的第一条消息