Publishing data to streams allows you to write data from various sources into your Kafka/Redpanda topics. This is the first step in building real-time data pipelines.
The most common way to publish data is through Moose's built-in ingestion APIs. These sit in front of your streams and publish data to them whenever a request is made to the endpoint:
```python
import requests
from pydantic import BaseModel
from moose_lib import IngestPipeline, IngestPipelineConfig, Key

# Example data model (illustrative)
class RawData(BaseModel):
    id: Key[str]
    value: int

# When you create an IngestPipeline with ingest_api=True, Moose automatically creates an API endpoint
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api=True,  # Creates POST /ingest/raw_data endpoint
    stream=True,
    table=True
))

# You can then publish data via HTTP POST requests
response = requests.post('http://localhost:4000/ingest/raw_data', json={  # local dev server default port
    'id': '123',
    'value': 42
})
```

See the OpenAPI documentation to learn how to generate type-safe client SDKs in your language of choice for all of your Moose APIs.
You can publish directly to a stream from your Moose code using the stream's `send` method. This is useful when emitting events from workflows or other backend logic. `send` accepts a single record or an array of records.
If your `Stream` is configured with `schemaConfig.kind = "JSON"`, Moose automatically produces messages using the Confluent envelope (a `0x00` magic byte, the 4-byte schema id, then the JSON payload). No code changes are needed beyond setting `schemaConfig`. See the Schema Registry guide.
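Consumers reading these messages outside Moose need to strip the envelope before parsing. Here is a minimal decoding sketch; the helper name `decode_confluent_json` is illustrative, not part of `moose_lib`:

```python
import json
import struct

def decode_confluent_json(raw: bytes) -> tuple[int, dict]:
    # Confluent wire format: 1 magic byte (0x00), 4-byte big-endian schema id, then the payload
    magic, schema_id = struct.unpack(">BI", raw[:5])
    if magic != 0:
        raise ValueError("not a Confluent-framed message")
    return schema_id, json.loads(raw[5:])
```

Confluent's own client libraries ship deserializers that handle this envelope for you, so a hand-rolled decoder like this is only needed with plain Kafka clients.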
```typescript
import { Stream, Key } from "@514labs/moose-lib";

interface UserEvent {
  id: Key<string>;
  userId: string;
  timestamp: Date;
  eventType: string;
}

// Create a stream (optionally configure destination to sync to a table)
const events = new Stream<UserEvent>("user-events");

// Publish a single record
await events.send({
  id: "evt_1",
  userId: "user_123",
  timestamp: new Date(),
  eventType: "click",
});

// Publish multiple records
await events.send([
  { id: "evt_2", userId: "user_456", timestamp: new Date(), eventType: "view" },
  { id: "evt_3", userId: "user_789", timestamp: new Date(), eventType: "signup" },
]);
```

Moose builds the Kafka topic name from your stream name, optional namespace, and optional version (dots become underscores). For example, a stream named `events` with version `1.2.0` becomes `events_1_2_0` (or `my_ns.events_1_2_0` when the namespace is `"my_ns"`).
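To make the convention concrete, the following sketch reproduces the documented rule; it is an illustration, not Moose's internal implementation:

```python
def topic_name(stream: str, version: str | None = None, namespace: str | None = None) -> str:
    # Version suffix uses underscores in place of dots; namespace is a dotted prefix
    name = stream if version is None else f"{stream}_{version.replace('.', '_')}"
    return name if namespace is None else f"{namespace}.{name}"

assert topic_name("events", "1.2.0") == "events_1_2_0"
assert topic_name("events", "1.2.0", "my_ns") == "my_ns.events_1_2_0"
```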
You can also publish to streams from external applications using Kafka/Redpanda clients:
```python
from kafka import KafkaProducer
import json
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=['localhost:19092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Publish to the stream topic
producer.send('user-events', {  # Stream name becomes the topic name
    'id': 'event-123',
    'user_id': 'user-456',
    'timestamp': datetime.now().isoformat(),
    'event_type': 'page_view'
})

# Block until buffered messages are delivered before the script exits
producer.flush()
```

When running your Moose backend in your local dev environment, you can find the connection details for your Redpanda cluster in the `moose.config.toml` file at the root of your project:
```toml
[redpanda_config]
broker = "localhost:19092"
message_timeout_ms = 1000
retention_ms = 30000
replication_factor = 1
```
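If you'd rather not hardcode the broker address in an external producer, you can read it from this file directly. A minimal sketch, assuming Python 3.11+ for the standard-library `tomllib` module:

```python
import tomllib  # standard library in Python 3.11+

with open("moose.config.toml", "rb") as f:
    config = tomllib.load(f)

broker = config["redpanda_config"]["broker"]  # e.g. "localhost:19092"
```

You can then pass `broker` as `bootstrap_servers` in the `KafkaProducer` example above.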