Publishing Data to Streams
Overview
Publishing data to streams allows you to write data from various sources into your Kafka/Redpanda topics. This is the first step in building real-time data pipelines.
Publishing Methods
Using REST APIs
The most common way to publish data is through Moose's built-in ingestion APIs. These endpoints sit in front of your streams and publish the payload of each incoming request to the corresponding stream:
import { IngestPipeline } from "@514labs/moose-lib";

// RawData is the data model backing this pipeline, defined elsewhere in your project

// When you create an IngestPipeline with ingest: true, Moose automatically creates an API endpoint
const rawData = new IngestPipeline<RawData>("raw_data", {
  ingest: true, // Creates POST /ingest/raw_data endpoint
  stream: true,
  table: true
});

// You can then publish data via HTTP POST requests
// (the local dev server listens on port 4000 by default)
const response = await fetch('http://localhost:4000/ingest/raw_data', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    id: '123',
    value: 42
  })
});
from moose_lib import IngestPipeline, IngestPipelineConfig
import requests

# RawData is the Pydantic data model backing this pipeline, defined elsewhere in your project

# When you create an IngestPipeline with ingest=True, Moose automatically creates an API endpoint
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest=True,  # Creates POST /ingest/raw_data endpoint
    stream=True,
    table=True
))

# You can then publish data via HTTP POST requests
# (the local dev server listens on port 4000 by default)
response = requests.post('http://localhost:4000/ingest/raw_data', json={
    'id': '123',
    'value': 42
})
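Moose validates each incoming record against the pipeline's data model, so requests whose payloads do not match the schema are rejected with a non-2xx status. It is worth checking for this in client code; a minimal sketch for the TypeScript example above:

if (!response.ok) {
  // The record was not accepted; the response body typically describes the validation failure
  console.error(`Ingest failed: ${response.status} ${await response.text()}`);
}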
Using OpenAPI
See the OpenAPI documentation to learn more about how to generate type-safe client SDKs in your language of choice for all of your Moose APIs.
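As a quick sketch, an exported spec can be fed to the open-source OpenAPI Generator to produce a typed client (the spec path below is an assumption; substitute wherever your Moose project writes its OpenAPI file):

npx @openapitools/openapi-generator-cli generate \
  -i path/to/openapi.yaml \
  -g typescript-fetch \
  -o ./generated-client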
Direct Stream Publishing (Coming Soon)
Soon, you will be able to publish directly to streams using the stream’s publish method. This is an efficient way to publish data to streams from within other parts of your Moose backend—in particular, from your workflows.
Using the Kafka/Redpanda Client from External Applications
You can also publish to streams from external applications using Kafka/Redpanda clients:
See the KafkaJS documentation for more details on configuring and using the client.
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:19092']
});

const producer = kafka.producer();
await producer.connect();

// Publish to the stream topic
await producer.send({
  topic: 'user-events', // Stream name becomes the topic name
  messages: [
    {
      key: 'event-123',
      value: JSON.stringify({
        id: 'event-123',
        userId: 'user-456',
        timestamp: new Date().toISOString(),
        eventType: 'page_view'
      })
    }
  ]
});

// Release the connection once you are done producing
await producer.disconnect();
from kafka import KafkaProducer
import json
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=['localhost:19092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Publish to the stream topic (stream name becomes the topic name)
producer.send('user-events', {
    'id': 'event-123',
    'user_id': 'user-456',
    'timestamp': datetime.now().isoformat(),
    'event_type': 'page_view'
})

# send() is asynchronous; flush before exiting so buffered messages are delivered
producer.flush()
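One detail worth noting: the message key controls partitioning. Records that share a key, like 'event-123' in the KafkaJS example, always land on the same partition, which preserves per-key ordering for downstream consumers; messages sent without a key, as in the Python example, are distributed across partitions.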
Locating Redpanda Connection Details
When running your Moose backend in your local dev environment, you can find the connection details for your Redpanda cluster in the moose.config.toml file at the root of your project:
[redpanda_config]
broker = "localhost:19092"
message_timeout_ms = 1000
retention_ms = 30000
replication_factor = 1
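If you want external producer code to stay in sync with this file rather than hard-coding the broker address, you can parse it at startup. A minimal sketch, assuming the @iarna/toml package (any TOML parser would do) together with the KafkaJS client from above:

import { readFileSync } from 'fs';
import TOML from '@iarna/toml';
import { Kafka } from 'kafkajs';

// Read the broker address from moose.config.toml instead of hard-coding it
const config = TOML.parse(readFileSync('moose.config.toml', 'utf-8')) as any;
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: [config.redpanda_config.broker]
});

In deployed environments the broker address and any security settings will differ, so treat this as a local-development convenience.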