# Moose / Streaming / Schema Registry Documentation – Python ## Included Files 1. moose/streaming/schema-registry/schema-registry.mdx ## Schema Registry Source: moose/streaming/schema-registry/schema-registry.mdx Use Confluent Schema Registry with Moose streams (JSON Schema first) # Schema Registry Integration The first supported encoding is JSON Schema. Avro and Protobuf are planned. ## Overview Moose can publish and consume Kafka/Redpanda messages using Confluent Schema Registry. The first supported encoding is JSON Schema; Avro and Protobuf are planned. ## Configure Schema Registry URL Set the Schema Registry URL in `moose.config.toml` under `redpanda_config` (aliased as `kafka_config`). You can also override with environment variables. ```toml filename="moose.config.toml" copy [redpanda_config] broker = "localhost:19092" schema_registry_url = "http://localhost:8081" ``` Environment overrides (either key works): ```bash filename="Terminal" copy export MOOSE_REDPANDA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081 # or export MOOSE_KAFKA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081 ``` ## Referencing Schemas You can attach a Schema Registry reference to any `Stream` via `schemaConfig`. Use one of: - Subject latest: `{ subjectLatest: string }` - Subject and version: `{ subject: string, version: number }` - Schema id: `{ id: number }` ```py filename="sr_stream.py" copy {13-21} from moose_lib import Stream, StreamConfig from moose_lib.dmv2.stream import KafkaSchemaConfig, SubjectLatest from pydantic import BaseModel class Event(BaseModel): id: str value: int schema_config = KafkaSchemaConfig( kind="JSON", reference=SubjectLatest(name="event-value"), ) events = Stream[Event]( "events", StreamConfig(schema_config=schema_config), ) events.send(Event(id="e1", value=42)) ``` ## Consuming SR JSON in Runners Moose streaming runners automatically detect the Confluent JSON envelope when consuming and strip the header before parsing the JSON. Your transformation code continues to work unchanged. ## Ingestion APIs and SR When an Ingest API routes to a topic that has a `schemaConfig` of kind JSON, Moose resolves the schema id and publishes requests using the Schema Registry envelope. You can also set the reference to a fixed `id` to skip lookups. ## Discover existing topics and schemas Use the CLI to pull external topics and optionally fetch JSON Schemas from Schema Registry to emit typed models. ```bash filename="Terminal" copy moose kafka pull \ --schema-registry http://localhost:8081 \ --path app/external-topics \ --include "*" \ --exclude "{__consumer_offsets,_schemas}" ``` This writes external topic declarations under the provided path based on language (default path is inferred). ## Current limitations - JSON Schema only (Avro/Protobuf planned) - Ingest API schema declared in code may not match the actual schema in registry.