Publishing data to streams allows you to write data from various sources into your Kafka/Redpanda topics. This is the first step in building real-time data pipelines.
The most common way to publish data is through Moose's built-in ingestion APIs. These are configured to automatically sit in front of your streams and publish data to them whenever a request is made to the endpoint:
    // When you create an IngestPipeline with ingestApi: true, Moose automatically creates an API endpoint
    const rawData = new IngestPipeline<RawData>("raw_data", {
      ingestApi: true, // Creates POST /ingest/raw_data endpoint
      stream: true,
      table: true
    });

    // You can then publish data via HTTP POST requests
    const response = await fetch('/ingest/raw_data', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ id: '123', value: 42 })
    });

See the OpenAPI documentation to learn more about how to generate type-safe client SDKs in your language of choice for all of your Moose APIs.
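Outside a browser, fetch needs an absolute URL, and a failed request does not throw on its own. The sketch below shows one way to wrap the POST so that both points are handled. The helper names buildIngestRequest and publish are hypothetical, and the base URL http://localhost:4000 is an assumption about a local dev server, so substitute the address your Moose instance actually listens on.

```typescript
// Shape of the records accepted by the raw_data pipeline in the example above.
interface RawData {
  id: string;
  value: number;
}

// Hypothetical helper: builds the URL and fetch options for an ingest call.
function buildIngestRequest<T>(
  baseUrl: string,
  pipeline: string,
  record: T
): { url: string; init: RequestInit } {
  return {
    // Resolve /ingest/<pipeline> against the base URL to get an absolute URL.
    url: new URL(`/ingest/${pipeline}`, baseUrl).toString(),
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(record),
    },
  };
}

// Hypothetical helper: sends one record and fails loudly on a non-2xx response.
async function publish<T>(baseUrl: string, pipeline: string, record: T): Promise<void> {
  const { url, init } = buildIngestRequest(baseUrl, pipeline, record);
  const response = await fetch(url, init);
  if (!response.ok) {
    throw new Error(`Ingest request failed with status ${response.status}`);
  }
}

// Usage (assumed base URL):
// await publish<RawData>("http://localhost:4000", "raw_data", { id: "123", value: 42 });
```

Keeping request construction separate from the network call makes the URL and payload easy to inspect in tests without a running server.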
You can publish directly to a stream from your Moose code using the stream's send method.
This is useful when emitting events from workflows or other backend logic.
send accepts a single record or an array of records.
If your Stream is configured with schemaConfig.kind = "JSON",
Moose automatically produces messages using the Confluent envelope (0x00 + schema id + JSON).
No code changes are needed beyond setting schemaConfig. See the Schema Registry guide.
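To make the envelope concrete, here is a minimal sketch of the framing, assuming the standard Confluent wire format in which the schema id is a 4-byte big-endian integer. Moose does this for you, so frameConfluentJson is a hypothetical helper for illustration only and is not part of moose-lib.

```typescript
// Illustration only: Moose applies this framing itself when schemaConfig.kind = "JSON".
// frameConfluentJson is a hypothetical name, not a moose-lib function.
function frameConfluentJson(schemaId: number, record: unknown): Uint8Array {
  // JSON payload encoded as UTF-8 bytes.
  const payload = new TextEncoder().encode(JSON.stringify(record));

  // 1 magic byte + 4 bytes of schema id + payload.
  const framed = new Uint8Array(5 + payload.length);
  const view = new DataView(framed.buffer);

  view.setUint8(0, 0x00);             // magic byte
  view.setUint32(1, schemaId, false); // schema id, big-endian (assumed wire format)
  framed.set(payload, 5);             // JSON body follows the 5-byte header

  return framed;
}

const framed = frameConfluentJson(42, { id: "evt_1", eventType: "click" });
console.log(Array.from(framed.subarray(0, 5)));           // header bytes
console.log(new TextDecoder().decode(framed.subarray(5))); // JSON body
```

A consumer that reads the topic with a plain JSON parser would need to skip the first five bytes before parsing.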
    import { Stream, Key } from "@514labs/moose-lib";

    interface UserEvent {
      id: Key<string>;
      userId: string;
      timestamp: Date;
      eventType: string;
    }

    // Create a stream (optionally configure destination to sync to a table)
    const events = new Stream<UserEvent>("user-events");

    // Publish a single record
    await events.send({
      id: "evt_1",
      userId: "user_123",
      timestamp: new Date(),
      eventType: "click",
    });

    // Publish multiple records
    await events.send([
      { id: "evt_2", userId: "user_456", timestamp: new Date(), eventType: "view" },
      { id: "evt_3", userId: "user_789", timestamp: new Date(), eventType: "signup" },
    ]);

Moose builds the Kafka topic name from your stream name,
optional namespace, and optional version (dots become underscores).
For example, a stream named events with version 1.2.0 becomes events_1_2_0
(or my_ns.events_1_2_0 when the namespace is "my_ns").
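The naming rule can be sketched as a small function, which is handy when an external producer or consumer needs to compute the topic name. buildTopicName is a hypothetical helper that mirrors only the rule described here; it is not Moose's implementation, and any additional normalization Moose may apply is not covered.

```typescript
// Hypothetical helper mirroring the documented rule:
// [namespace.]name[_version with dots replaced by underscores]
interface TopicNameParts {
  name: string;
  version?: string;
  namespace?: string;
}

function buildTopicName({ name, version, namespace }: TopicNameParts): string {
  // Append the version, if any, with dots converted to underscores.
  const versioned = version ? `${name}_${version.replace(/\./g, "_")}` : name;
  // Prefix the namespace, if any, separated by a dot.
  return namespace ? `${namespace}.${versioned}` : versioned;
}

console.log(buildTopicName({ name: "events", version: "1.2.0" }));
// events_1_2_0
console.log(buildTopicName({ name: "events", version: "1.2.0", namespace: "my_ns" }));
// my_ns.events_1_2_0
```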
You can also publish to streams from external applications using Kafka/Redpanda clients:
See the Kafka.js documentation for more information on how to use the Kafka.js client to publish to streams.
    import { KafkaJS } from '@confluentinc/kafka-javascript';
    const { Kafka } = KafkaJS;

    const kafka = new Kafka({
      kafkaJS: {
        clientId: 'my-app',
        brokers: ['localhost:19092']
      }
    });

    const producer = kafka.producer();

    await producer.connect();

    // Publish to the stream topic
    await producer.send({
      topic: 'user-events', // Stream name becomes the topic name
      messages: [
        {
          key: 'event-123',
          value: JSON.stringify({
            id: 'event-123',
            userId: 'user-456',
            timestamp: new Date().toISOString(),
            eventType: 'page_view'
          })
        }
      ]
    });

When running your Moose backend within your local dev environment, you can find the connection details for your Redpanda cluster in the moose.config.toml file in the root of your project:
    [redpanda_config]
    broker = "localhost:19092"
    message_timeout_ms = 1000
    retention_ms = 30000
    replication_factor = 1