Schema Registry Integration

Viewing:

JSON Schema first

The first supported encoding is JSON Schema. Avro and Protobuf are planned.

Overview

Moose can publish and consume Kafka/Redpanda messages using Confluent Schema Registry. The first supported encoding is JSON Schema; Avro and Protobuf are planned.

What you get

Typed payloads

Use your Moose data models with JSON Schema in Schema Registry

Standards-compliant envelope

Messages use Confluent wire format: 0x00 + 4-byte schema id + payload

Works across TypeScript and Python

Both SDKs support producing with JSON Schema; runners can consume SR JSON automatically

Configure Schema Registry URL

Set the Schema Registry URL in moose.config.toml under redpanda_config (aliased as kafka_config). You can also override with environment variables.

moose.config.toml
[redpanda_config]
broker = "localhost:19092"
schema_registry_url = "http://localhost:8081"

Environment overrides (either key works):

Terminal
export MOOSE_REDPANDA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081
# or
export MOOSE_KAFKA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081

Referencing Schemas

You can attach a Schema Registry reference to any Stream via schemaConfig. Use one of:

  • Subject latest: { subjectLatest: string }
  • Subject and version: { subject: string, version: number }
  • Schema id: { id: number }
sr-stream.ts
import { Stream, type KafkaSchemaConfig } from "@514labs/moose-lib";
 
interface Event {
  id: string;
  value: number;
}
 
const schemaConfig: KafkaSchemaConfig = {
  kind: "JSON",
  reference: { subjectLatest: "event-value" },
};
 
export const events = new Stream<Event>("events", {
  schemaConfig,
});
 
// Producing uses Schema Registry envelope automatically
await events.send({ id: "e1", value: 42 });

Consuming SR JSON in Runners

Moose streaming runners automatically detect the Confluent JSON envelope when consuming and strip the header before parsing the JSON. Your transformation code continues to work unchanged.

Ingestion APIs and SR

When an Ingest API routes to a topic that has a schemaConfig of kind JSON, Moose resolves the schema id and publishes requests using the Schema Registry envelope. You can also set the reference to a fixed id to skip lookups.

Discover existing topics and schemas

Use the CLI to pull external topics and optionally fetch JSON Schemas from Schema Registry to emit typed models.

Terminal
moose kafka pull <bootstrap> \
  --schema-registry http://localhost:8081 \
  --path app/external-topics \
  --include "*" \
  --exclude "{__consumer_offsets,_schemas}"

This writes external topic declarations under the provided path based on language (default path is inferred).

Current limitations

  • JSON Schema only (Avro/Protobuf planned)
  • Ingest API schema declared in code may not match the actual schema in registry.