如何以编程方式从 Python 中的汇合模式注册表中获取模式
How to programmatically get schema from confluent schema registry in Python
截至目前,我正在做类似读取 avsc 文件以获取架构的操作
value_schema = avro.load('client.avsc')
我可以做些什么来使用主题名称从融合模式注册表中获取模式吗?
我找到了一种方法,但不知道如何使用它。
https://github.com/marcosschroh/python-schema-registry-client
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
sr = CachedSchemaRegistryClient({
'url': 'http://localhost:8081',
'ssl.certificate.location': '/path/to/cert', # optional
'ssl.key.location': '/path/to/key' # optional
})
value_schema = sr.get_latest_schema("orders-value")[1]
key_schema= sr.get_latest_schema("orders-key")[1]
按主题名称获取架构
from schema_registry.client import SchemaRegistryClient
sr = SchemaRegistryClient('localhost:8081')
my_schema = sr.get_schema(subject='mySubject', version='latest')
通过 ID 获取架构
from schema_registry.client import SchemaRegistryClient
sr = SchemaRegistryClient('localhost:8081')
my_schema = sr.get_by_id(schema_id=1)
我喜欢它,它对我有用
import requests
import os
SCHEMA_REGISTRY_URL = os.getenv('SCHEMA_REGISTRY_URL');
print("SCHEMA_REGISTRY_URL: ", SCHEMA_REGISTRY_URL)
URL = SCHEMA_REGISTRY_URL + '/subjects/' + topic + '/versions/latest/schema'
r = requests.get(url=URL)
schema = r.json()
print("Schema From Schema Registry ==========================>>")
print("Schema: ", schema)
您可以使用 get_latest_version
函数获取架构信息
from confluent_kafka.schema_registry import SchemaRegistryClient
sr = SchemaRegistryClient({"url": 'http://localhost:8081'})
subjects = sr.get_subjects()
for subject in subjects:
schema = sr.get_latest_version(subject)
print(schema.version)
print(schema.schema_id)
print(schema.schema.schema_str)
截至目前,我正在做类似读取 avsc 文件以获取架构的操作
value_schema = avro.load('client.avsc')
我可以做些什么来使用主题名称从融合模式注册表中获取模式吗?
我找到了一种方法,但不知道如何使用它。
https://github.com/marcosschroh/python-schema-registry-client
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
sr = CachedSchemaRegistryClient({
'url': 'http://localhost:8081',
'ssl.certificate.location': '/path/to/cert', # optional
'ssl.key.location': '/path/to/key' # optional
})
value_schema = sr.get_latest_schema("orders-value")[1]
key_schema= sr.get_latest_schema("orders-key")[1]
按主题名称获取架构
from schema_registry.client import SchemaRegistryClient
sr = SchemaRegistryClient('localhost:8081')
my_schema = sr.get_schema(subject='mySubject', version='latest')
通过 ID 获取架构
from schema_registry.client import SchemaRegistryClient
sr = SchemaRegistryClient('localhost:8081')
my_schema = sr.get_by_id(schema_id=1)
我喜欢它,它对我有用
import requests
import os
SCHEMA_REGISTRY_URL = os.getenv('SCHEMA_REGISTRY_URL');
print("SCHEMA_REGISTRY_URL: ", SCHEMA_REGISTRY_URL)
URL = SCHEMA_REGISTRY_URL + '/subjects/' + topic + '/versions/latest/schema'
r = requests.get(url=URL)
schema = r.json()
print("Schema From Schema Registry ==========================>>")
print("Schema: ", schema)
您可以使用 get_latest_version
函数获取架构信息
from confluent_kafka.schema_registry import SchemaRegistryClient
sr = SchemaRegistryClient({"url": 'http://localhost:8081'})
subjects = sr.get_subjects()
for subject in subjects:
schema = sr.get_latest_version(subject)
print(schema.version)
print(schema.schema_id)
print(schema.schema.schema_str)