Publishing Data to Streams
Overview
Publishing data to streams allows you to write data from various sources into your Kafka/Redpanda topics. This is the first step in building real-time data pipelines.
Publishing Methods
Using REST APIs
The most common way to publish data is through Moose's built-in ingestion APIs. These are configured to automatically sit in front of your streams and publish data to them whenever a request is made to the endpoint:
// When you create an IngestPipeline with ingestApi: true, Moose automatically creates an API endpointconst rawData = new IngestPipeline<RawData>("raw_data", { ingestApi: true, // Creates POST /ingest/raw_data endpoint stream: true, table: true}); // You can then publish data via HTTP POST requestsconst response = await fetch('/ingest/raw_data', { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ id: '123', value: 42 })});Using OpenAPI
See the OpenAPI documentation to learn more about how to generate type-safe client SDKs in your language of choice for all of your Moose APIs.
Direct Stream Publishing
You can publish directly to a stream from your Moose code using the stream's send method.
This is useful when emitting events from workflows or other backend logic.
send accepts a single record or an array of records.
Using Schema Registry JSON
If your Stream is configured with schemaConfig.kind = "JSON",
Moose produces using the Confluent envelope automatically (0x00 + schema id + JSON).
No code changes are needed beyond setting schemaConfig. See the Schema Registry guide.
import { Stream, Key } from "@514labs/moose-lib"; interface UserEvent { id: Key<string>; userId: string; timestamp: Date; eventType: string;} // Create a stream (optionally configure destination to sync to a table)const events = new Stream<UserEvent>("user-events"); // Publish a single recordawait events.send({ id: "evt_1", userId: "user_123", timestamp: new Date(), eventType: "click",}); // Publish multiple recordsawait events.send([ { id: "evt_2", userId: "user_456", timestamp: new Date(), eventType: "view" }, { id: "evt_3", userId: "user_789", timestamp: new Date(), eventType: "signup" },]);Topic naming and versions
Moose builds the Kafka topic name from your stream name,
optional namespace, and optional version (dots become underscores).
For example, a stream named events with version 1.2.0 becomes events_1_2_0
(or my_ns.events_1_2_0 when the namespace is "my_ns").
Using the Kafka/Redpanda Client from External Applications
You can also publish to streams from external applications using Kafka/Redpanda clients:
See the Kafka.js documentation for more information on how to use the Kafka.js client to publish to streams.
import { KafkaJS } from '@confluentinc/kafka-javascript';const { Kafka } = KafkaJS; const kafka = new Kafka({ kafkaJS: { clientId: 'my-app', brokers: ['localhost:19092'] }}); const producer = kafka.producer(); await producer.connect(); // Publish to the stream topicawait producer.send({ topic: 'user-events', // Stream name becomes the topic name messages: [ { key: 'event-123', value: JSON.stringify({ id: 'event-123', userId: 'user-456', timestamp: new Date().toISOString(), eventType: 'page_view' }) } ]});Locating Redpanda Connection Details
When running your Moose backend within your local dev environment, you can find the connection details for your Redpanda cluster in the moose.config.toml file in the root of your project:
[redpanda_config]broker = "localhost:19092"message_timeout_ms = 1000retention_ms = 30000replication_factor = 1