Publishing Data to Streams

Viewing:

Overview

Publishing data to streams allows you to write data from various sources into your Kafka/Redpanda topics. This is the first step in building real-time data pipelines.

Publishing Methods

Using REST APIs

The most common way to publish data is through Moose’s built-in ingestion APIs. These are configured to automatically sit in front of your streams and publish data to them whenever a request is made to the endpoint:

PublishViaAPI.ts
// When you create an IngestPipeline with ingestApi: true, Moose automatically creates an API endpoint
const rawData = new IngestPipeline<RawData>("raw_data", {
  ingestApi: true, // Creates POST /ingest/raw_data endpoint
  stream: true,
  table: true
});
 
// You can then publish data via HTTP POST requests
const response = await fetch('/ingest/raw_data', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    id: '123',
    value: 42
  })
});

Using OpenAPI

See the OpenAPI documentation to learn more about how to generate type-safe client SDKs in your language of choice for all of your Moose APIs.

Direct Stream Publishing

You can publish directly to a stream from your Moose code using the stream’s send method. This is useful when emitting events from workflows or other backend logic. send accepts a single record or an array of records.

Using Schema Registry JSON

If your Stream is configured with schemaConfig.kind = "JSON", Moose produces using the Confluent envelope automatically (0x00 + schema id + JSON). No code changes are needed beyond setting schemaConfig. See the Schema Registry guide.

DirectPublish.ts
import { Stream, Key } from "@514labs/moose-lib";
 
interface UserEvent {
  id: Key<string>;
  userId: string;
  timestamp: Date;
  eventType: string;
}
 
// Create a stream (optionally configure destination to sync to a table)
const events = new Stream<UserEvent>("user-events");
 
// Publish a single record
await events.send({
  id: "evt_1",
  userId: "user_123",
  timestamp: new Date(),
  eventType: "click",
});
 
// Publish multiple records
await events.send([
  { id: "evt_2", userId: "user_456", timestamp: new Date(), eventType: "view" },
  { id: "evt_3", userId: "user_789", timestamp: new Date(), eventType: "signup" },
]);

Topic naming and versions

Moose builds the Kafka topic name from your stream name, optional namespace, and optional version (dots become underscores). For example, a stream named events with version 1.2.0 becomes events_1_2_0 (or my_ns.events_1_2_0 when the namespace is "my_ns").

Using the Kafka/Redpanda Client from External Applications

You can also publish to streams from external applications using Kafka/Redpanda clients:

See the Kafka.js documentation for more information on how to use the Kafka.js client to publish to streams.

ExternalPublish.ts
import { KafkaJS } from '@confluentinc/kafka-javascript';
const { Kafka } = KafkaJS;
 
const kafka = new Kafka({
  kafkaJS: {
    clientId: 'my-app',
    brokers: ['localhost:19092']
  }
});
 
const producer = kafka.producer();
 
await producer.connect();
 
// Publish to the stream topic
await producer.send({
  topic: 'user-events', // Stream name becomes the topic name
  messages: [
    {
      key: 'event-123',
      value: JSON.stringify({
        id: 'event-123',
        userId: 'user-456',
        timestamp: new Date().toISOString(),
        eventType: 'page_view'
      })
    }
  ]
});

Locating Redpanda Connection Details

When running your Moose backend within your local dev environment, you can find the connection details for your Redpanda cluster in the moose.config.toml file in the root of your project:

moose.config.toml
[redpanda_config]
broker = "localhost:19092"
message_timeout_ms = 1000
retention_ms = 30000
replication_factor = 1