Schema Registry Integration
Overview
Moose can publish and consume Kafka/Redpanda messages using Confluent Schema Registry. The first supported encoding is JSON Schema; Avro and Protobuf are planned.
What you get
- Typed payloads: use your Moose data models with JSON Schema in Schema Registry.
- Standards-compliant envelope: messages use the Confluent wire format: 0x00 magic byte + 4-byte big-endian schema id + payload.
- Works across TypeScript and Python: both SDKs support producing with JSON Schema, and runners can consume SR JSON automatically.
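The envelope layout above is small enough to sketch directly. The following is an illustration of the Confluent wire format using Python's struct module, not Moose's internal implementation:

```python
import json
import struct

MAGIC_BYTE = 0x00  # Confluent wire-format magic byte

def encode_envelope(schema_id: int, payload: dict) -> bytes:
    """Prefix a JSON payload with the Confluent wire-format header:
    one magic byte (0x00) followed by the 4-byte big-endian schema id."""
    header = struct.pack(">bI", MAGIC_BYTE, schema_id)
    return header + json.dumps(payload).encode("utf-8")

def decode_envelope(message: bytes) -> tuple[int, dict]:
    """Split a wire-format message back into (schema_id, payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a Confluent wire-format message")
    return schema_id, json.loads(message[5:])

# Round trip: 5-byte header, then the JSON body
msg = encode_envelope(42, {"id": "e1", "value": 42})
assert decode_envelope(msg) == (42, {"id": "e1", "value": 42})
```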
Configure Schema Registry URL
Set the Schema Registry URL in moose.config.toml under redpanda_config (aliased as kafka_config). You can also override it with environment variables.
[redpanda_config]
broker = "localhost:19092"
schema_registry_url = "http://localhost:8081"
Environment overrides (either key works):
export MOOSE_REDPANDA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081
# or
export MOOSE_KAFKA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081
Referencing Schemas
You can attach a Schema Registry reference to any Stream via schemaConfig. Use one of:
- Subject latest: { subjectLatest: string }
- Subject and version: { subject: string, version: number }
- Schema id: { id: number }
import { Stream, type KafkaSchemaConfig } from "@514labs/moose-lib";

interface Event {
  id: string;
  value: number;
}

const schemaConfig: KafkaSchemaConfig = {
  kind: "JSON",
  reference: { subjectLatest: "event-value" },
};

export const events = new Stream<Event>("events", {
  schemaConfig,
});

// Producing uses the Schema Registry envelope automatically
await events.send({ id: "e1", value: 42 });
from moose_lib import Stream, StreamConfig
from moose_lib.dmv2.stream import KafkaSchemaConfig, SubjectLatest
from pydantic import BaseModel

class Event(BaseModel):
    id: str
    value: int

schema_config = KafkaSchemaConfig(
    kind="JSON",
    reference=SubjectLatest(name="event-value"),
)

events = Stream[Event](
    "events",
    StreamConfig(schema_config=schema_config),
)

events.send(Event(id="e1", value=42))
Consuming SR JSON in Runners
Moose streaming runners automatically detect the Confluent JSON envelope when consuming and strip the header before parsing the JSON. Your transformation code continues to work unchanged.
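The detection is simple because plain JSON text never begins with a 0x00 byte. A minimal sketch of the behavior (illustrative only, not the runner's actual code): if the first byte is the magic byte, strip the 5-byte header before parsing; otherwise parse the bytes as plain JSON.

```python
import json

def parse_record(raw: bytes) -> dict:
    """Parse a Kafka record value that may or may not carry the
    Confluent JSON envelope (0x00 + 4-byte schema id + JSON payload)."""
    if len(raw) > 5 and raw[0] == 0x00:
        raw = raw[5:]  # strip magic byte + schema id
    return json.loads(raw)

# Works for both enveloped and plain-JSON records:
plain = json.dumps({"id": "e1"}).encode()
enveloped = b"\x00\x00\x00\x00\x2a" + plain  # header with schema id 42
assert parse_record(plain) == parse_record(enveloped) == {"id": "e1"}
```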
Ingestion APIs and SR
When an Ingest API routes to a topic that has a schemaConfig of kind "JSON", Moose resolves the schema id and publishes requests using the Schema Registry envelope. You can also set the reference to a fixed id to skip lookups.
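Resolving a subject's latest schema id amounts to one call against the Schema Registry REST API (GET /subjects/&lt;subject&gt;/versions/latest). A minimal sketch of that lookup using only the standard library; Moose performs this internally, so this is purely illustrative:

```python
import json
from urllib.request import urlopen

def version_url(registry_url: str, subject: str, version: str = "latest") -> str:
    """Build the Schema Registry REST URL for a subject version."""
    return f"{registry_url.rstrip('/')}/subjects/{subject}/versions/{version}"

def parse_schema_id(body: bytes) -> int:
    # The registry responds with {"subject": ..., "version": ..., "id": ..., "schema": ...}
    return json.loads(body)["id"]

def latest_schema_id(registry_url: str, subject: str) -> int:
    """Fetch the latest registered schema id for a subject."""
    with urlopen(version_url(registry_url, subject)) as resp:
        return parse_schema_id(resp.read())

# e.g. latest_schema_id("http://localhost:8081", "event-value")
```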
Discover existing topics and schemas
Use the CLI to pull external topics and optionally fetch JSON Schemas from Schema Registry to emit typed models.
moose kafka pull <bootstrap> \
--schema-registry http://localhost:8081 \
--path app/external-topics \
--include "*" \
--exclude "{__consumer_offsets,_schemas}"
This writes external topic declarations under the provided path, laid out according to the project language (if --path is omitted, a default path is inferred).
Current limitations
- JSON Schema only (Avro/Protobuf planned)
- The schema declared in code for an Ingest API may not match the actual schema in the registry.