Publishing Data to Streams
Overview
Publishing data to streams allows you to write data from various sources into your Kafka/Redpanda topics. This is the first step in building real-time data pipelines.
Publishing Methods
Using REST APIs
The most common way to publish data is through Moose’s built-in ingestion APIs. These automatically sit in front of your streams and publish incoming data to them whenever a request is made to the endpoint:
// When you create an IngestPipeline with ingestApi: true, Moose automatically creates an API endpoint
import { IngestPipeline, Key } from "@514labs/moose-lib";

interface RawData {
  id: Key<string>;
  value: number;
}

const rawData = new IngestPipeline<RawData>("raw_data", {
  ingestApi: true, // Creates POST /ingest/raw_data endpoint
  stream: true,
  table: true,
});

// You can then publish data via HTTP POST requests
// (Moose's local dev server listens on port 4000 by default; adjust for your deployment)
const response = await fetch('http://localhost:4000/ingest/raw_data', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    id: '123',
    value: 42,
  }),
});

import requests
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel

# The model mirrors the RawData interface from the TypeScript example
class RawData(BaseModel):
    id: Key[str]
    value: int

# When you create an IngestPipeline with ingest_api=True, Moose automatically creates an API endpoint
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api=True,  # Creates POST /ingest/raw_data endpoint
    stream=True,
    table=True,
))

# You can then publish data via HTTP POST requests
# (Moose's local dev server listens on port 4000 by default; adjust for your deployment)
response = requests.post('http://localhost:4000/ingest/raw_data', json={
    'id': '123',
    'value': 42,
})

Using OpenAPI
See the OpenAPI documentation to learn more about how to generate type-safe client SDKs in your language of choice for all of your Moose APIs.
Direct Stream Publishing
You can publish directly to a stream from your Moose code using the stream’s send method. This is useful when emitting events from workflows or other backend logic. send accepts a single record or an array of records.
Using Schema Registry JSON
If your Stream is configured with schemaConfig.kind = "JSON", Moose produces using the Confluent envelope automatically (0x00 + schema id + JSON). No code changes are needed beyond setting schemaConfig. See the Schema Registry guide.
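As a rough sketch, opting a stream in looks like the following (reusing the UserEvent type from the example below; any schemaConfig fields beyond kind are assumptions here, so check the Schema Registry guide for the exact shape):

// Sketch: a stream that produces Schema Registry JSON.
// Only `kind: "JSON"` is confirmed above; other schemaConfig fields are assumptions.
const registeredEvents = new Stream<UserEvent>("user-events", {
  schemaConfig: { kind: "JSON" },
});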
import { Stream, Key } from "@514labs/moose-lib";

interface UserEvent {
  id: Key<string>;
  userId: string;
  timestamp: Date;
  eventType: string;
}

// Create a stream (optionally configure destination to sync to a table)
const events = new Stream<UserEvent>("user-events");

// Publish a single record
await events.send({
  id: "evt_1",
  userId: "user_123",
  timestamp: new Date(),
  eventType: "click",
});

// Publish multiple records
await events.send([
  { id: "evt_2", userId: "user_456", timestamp: new Date(), eventType: "view" },
  { id: "evt_3", userId: "user_789", timestamp: new Date(), eventType: "signup" },
]);

Topic naming and versions
Moose builds the Kafka topic name from your stream name, optional namespace, and optional version (dots become underscores). For example, a stream named events with version 1.2.0 becomes events_1_2_0 (or my_ns.events_1_2_0 when the namespace is "my_ns").
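For illustration, a versioned stream declaration might look like this (a minimal sketch; the version option below is an assumption based on the naming rules above, so confirm the field name for your moose-lib version):

// Sketch: declaring a versioned stream, assuming the config accepts a `version` field.
const versionedEvents = new Stream<UserEvent>("events", { version: "1.2.0" });
// Expected topic name per the rules above: events_1_2_0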
Using the Kafka/Redpanda Client from External Applications
You can also publish to streams from external applications using Kafka/Redpanda clients:
See the KafkaJS documentation for more on configuring the client; the Confluent JavaScript client used below exposes a KafkaJS-compatible API.
import { KafkaJS } from '@confluentinc/kafka-javascript';
const { Kafka } = KafkaJS;

const kafka = new Kafka({
  kafkaJS: {
    clientId: 'my-app',
    brokers: ['localhost:19092']
  }
});

const producer = kafka.producer();
await producer.connect();

// Publish to the stream topic
await producer.send({
  topic: 'user-events', // Stream name becomes the topic name
  messages: [
    {
      key: 'event-123',
      value: JSON.stringify({
        id: 'event-123',
        userId: 'user-456',
        timestamp: new Date().toISOString(),
        eventType: 'page_view'
      })
    }
  ]
});

from kafka import KafkaProducer
import json
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=['localhost:19092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Publish to the stream topic
producer.send('user-events', {  # Stream name becomes the topic name
    'id': 'event-123',
    'user_id': 'user-456',
    'timestamp': datetime.now().isoformat(),
    'event_type': 'page_view'
})

# Block until buffered messages are delivered before the script exits
producer.flush()

Locating Redpanda Connection Details
When running your Moose backend within your local dev environment, you can find the connection details for your Redpanda cluster in the moose.config.toml file in the root of your project:
[redpanda_config]
broker = "localhost:19092"
message_timeout_ms = 1000
retention_ms = 30000
replication_factor = 1
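If an external producer should reuse the same broker address, you can read it from moose.config.toml at startup. A minimal sketch, assuming the toml npm package (any TOML parser works):

import { readFileSync } from 'fs';
import { parse } from 'toml'; // assumes: npm install toml

// Read the broker address Moose uses for local dev from the project config
const config = parse(readFileSync('moose.config.toml', 'utf-8'));
const broker: string = config.redpanda_config.broker; // e.g. "localhost:19092"

You can then pass broker into the brokers array of the Kafka client shown above.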