如何以编程方式从 Python 中的汇合模式注册表中获取模式

How to programmatically get schema from confluent schema registry in Python

截至目前,我正在做类似读取 avsc 文件以获取架构的操作

value_schema = avro.load('client.avsc')

我可以做些什么来使用主题名称从融合模式注册表中获取模式吗?

我找到了一种方法,但不知道如何使用它。

https://github.com/marcosschroh/python-schema-registry-client

使用confluent-kafka-python

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

sr = CachedSchemaRegistryClient({
    'url': 'http://localhost:8081',
    'ssl.certificate.location': '/path/to/cert',  # optional
    'ssl.key.location': '/path/to/key'  # optional
})

value_schema = sr.get_latest_schema("orders-value")[1]
key_schema= sr.get_latest_schema("orders-key")[1]

使用SchemaRegistryClient

按主题名称获取架构

from schema_registry.client import SchemaRegistryClient


sr = SchemaRegistryClient('localhost:8081')
my_schema = sr.get_schema(subject='mySubject', version='latest')

通过 ID 获取架构

from schema_registry.client import SchemaRegistryClient


sr = SchemaRegistryClient('localhost:8081')
my_schema = sr.get_by_id(schema_id=1)

我喜欢它,它对我有用

     import requests
     import os

     SCHEMA_REGISTRY_URL = os.getenv('SCHEMA_REGISTRY_URL');
     print("SCHEMA_REGISTRY_URL: ", SCHEMA_REGISTRY_URL)
     URL = SCHEMA_REGISTRY_URL + '/subjects/' + topic + '/versions/latest/schema'
     r = requests.get(url=URL)
     schema = r.json()


     print("Schema From Schema Registry ==========================>>")
     print("Schema: ", schema)

您可以使用 get_latest_version 函数获取架构信息

from confluent_kafka.schema_registry import SchemaRegistryClient

sr = SchemaRegistryClient({"url": 'http://localhost:8081'})
subjects = sr.get_subjects()
for subject in subjects:
    schema = sr.get_latest_version(subject)
    print(schema.version)
    print(schema.schema_id)
    print(schema.schema.schema_str)