Wednesday, March 2, 2022

Azure Function - Schema Registry

 import os

from azure.schemaregistry import SchemaRegistryClient
from azure.identity import ClientSecretCredential


FULLY_QUALIFIED_NAMESPACE = "xxxxxx.servicebus.windows.net"
EVENT_HUB_NAME = "xxxxxx"
EVENT_HUB_POLICY_NAME = "xxxxxx"
EVENT_HUB_POLICY_PRIMARY_KEY = "xxxxxx"
SCHEMA_REGISTRY_GROUP_NAME = "xxxxxx"
AZURE_TENANT_ID = "xxxxxx"
AZURE_CLIENT_ID = "xxxxxx"
AZURE_CLIENT_SECRET = "xxxxxx"
format = "avro"

definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}]
}"""

token_credential = ClientSecretCredential(tenant_id=AZURE_TENANT_ID,
client_id= AZURE_CLIENT_ID, client_secret=AZURE_CLIENT_SECRET)

schema_registry_client = SchemaRegistryClient(FULLY_QUALIFIED_NAMESPACE, token_credential)
with schema_registry_client:
schema_registry_client.register_schema(SCHEMA_REGISTRY_GROUP_NAME, "TEST",
definition, format)
schema_properties = schema_registry_client.get_schema_properties(
group_name=SCHEMA_REGISTRY_GROUP_NAME,
name="TEST",
definition=definition,
format="Avro"
)
schema_id = schema_properties.id


print(f'schema_id: {schema_id}')
print(f'schema_format: {schema_properties.format}')

print('schema: ', schema_registry_client.get_schema(schema_id).definition)
print('SCHEMA_REGISTRY_GROUP_NAME: ', SCHEMA_REGISTRY_GROUP_NAME)

No comments:

Post a Comment