Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they provide a way to implement real-time pipelines for ingesting and processing incoming data.
- The schema of your Redpanda topic is derived from the data model you define.
- Data is buffered in the stream to protect against data loss during high load or service disruptions.
- Transformations can be added between streams to process, reshape, and enrich data in flight before it lands in your database tables.
- Moose automatically syncs data from a stream to your database table when a destination is specified in the stream configuration (see the sketch just after this list).
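For example, a standalone stream can declare its destination table directly. This is a minimal sketch, assuming the `OlapTable` class and the stream's `destination` config option; the `Event` model and names are illustrative:

```ts
import { Key, OlapTable, Stream } from "@514labs/moose-lib";

interface Event {
  id: Key<string>;
  value: number;
}

// Destination OLAP table for the stream (illustrative name)
export const eventsTable = new OlapTable<Event>("events");

// Records written to this stream are synced by Moose into `eventsTable`
export const eventsStream = new Stream<Event>("events", {
  destination: eventsTable
});
```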
You can create streams in two ways:
- `IngestPipeline` class (recommended)
- Standalone `Stream` component

The `IngestPipeline` class provides a convenient way to set up streams together with ingestion APIs and tables. This is the recommended way to create streams for ingestion:
```ts
import { IngestPipeline, Key } from "@514labs/moose-lib";

interface RawData {
  id: Key<string>;
  value: number;
}

export const rawIngestionStream = new IngestPipeline<RawData>("raw_data", {
  ingestApi: true, // Creates an ingestion API endpoint at `POST /ingest/raw_data`
  stream: true,    // Buffers data between the ingestion API and the database table
  table: true      // Creates an OLAP table named `raw_data`
});
```

1. Data sent to the `POST /ingest/raw_data` endpoint is buffered in the `raw_data` stream.
2. Moose automatically runs a process that syncs data from the `raw_data` stream to the `raw_data` OLAP table.
3. The data is stored in the `raw_data` OLAP table.
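To exercise this pipeline you can POST JSON records to the generated endpoint. A minimal client-side sketch, assuming a local dev server on the default port (adjust the URL for your environment):

```ts
// Send one record to the ingestion endpoint created by the pipeline above.
// The host/port is an assumption for local development; change it to match your setup.
const response = await fetch("http://localhost:4000/ingest/raw_data", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ id: "abc-123", value: 42 })
});

console.log(response.status); // expect a success status once the record is buffered
```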
If the raw data needs to be transformed before landing in the database, you can define a destination stream and a transform function to process the data:
```ts
import { IngestPipeline, Key } from "@514labs/moose-lib";

interface RawData {
  id: Key<string>;
  value: number;
}

interface TransformedData {
  id: Key<string>;
  transformedValue: number;
  transformedAt: Date;
}

// Configure components for raw data ingestion & buffering
const rawData = new IngestPipeline<RawData>("raw_data", {
  ingestApi: true,
  stream: true, // Buffers data between the ingestion API and the database table
  table: false  // Don't create a table for the raw data
});

// Create a table for the transformed data
const transformedData = new IngestPipeline<TransformedData>("transformed_data", {
  ingestApi: false, // Don't create an ingestion API for the transformed data
  stream: true,     // Create a destination stream for the transformed data
  table: true       // Create a table for the transformed data
});

rawData.stream.addTransform(transformedData.stream, (record) => ({
  id: record.id,
  transformedValue: record.value * 2,
  transformedAt: new Date()
}));
```

For more complex pipelines, you can chain multiple transformations together. This is a case where a standalone `Stream` for the intermediate stages of your pipeline may be useful:
```ts
import { IngestPipeline, Key, Stream } from "@514labs/moose-lib";

// Define the schema for raw input data
interface RawData {
  id: Key<string>;
  value: number;
}

// Define the schema for intermediate transformed data
interface IntermediateData {
  id: Key<string>;
  transformedValue: number;
  transformedAt: Date;
}

// Define the schema for final transformed data
interface FinalData {
  id: Key<string>;
  transformedValue: number;
  anotherTransformedValue: number;
  transformedAt: Date;
}

// Create the first pipeline for raw data ingestion
// Only create an API and a stream (no table) since we're ingesting the raw data
const rawData = new IngestPipeline<RawData>("raw_data", {
  ingestApi: true, // Enable HTTP ingestion endpoint
  stream: true,    // Create a stream to buffer data
  table: false     // Don't store raw data in a table
});

// Create an intermediate stream to hold data between transformations (no API or table needed)
export const intermediateStream = new Stream<IntermediateData>("intermediate_stream");

// First transformation: double the value and add a timestamp
rawData.stream.addTransform(intermediateStream, (record) => ({
  id: record.id,
  transformedValue: record.value * 2, // Double the original value
  transformedAt: new Date()           // Add current timestamp
}));

// Create the final pipeline that will store the fully transformed data
const finalData = new IngestPipeline<FinalData>("final_stream", {
  ingestApi: false, // No direct ingestion to this pipeline
  stream: true,     // Create a stream for processing
  table: true       // Store final results in a table
});

// Second transformation: further transform the intermediate data
intermediateStream.addTransform(finalData.stream, (record) => ({
  id: record.id,
  transformedValue: record.transformedValue * 2,        // Double the intermediate value
  anotherTransformedValue: record.transformedValue * 3, // Triple the intermediate value
  transformedAt: new Date()                             // Update the timestamp
}));
```

Streams also accept configuration options such as `parallelism` and `retentionPeriod`:

```ts
export const highThroughputStream = new Stream<Data>("high_throughput", {
  parallelism: 4,        // Process 4 records simultaneously
  retentionPeriod: 86400 // Keep data for 1 day (in seconds)
});
```

You can also control how Moose manages your stream resources when your code changes. See the LifeCycle Management guide for detailed information.
```ts
import { Stream, LifeCycle } from "@514labs/moose-lib";

// Production stream with external management
export const prodStream = new Stream<Data>("prod_stream", {
  lifeCycle: LifeCycle.EXTERNALLY_MANAGED
});

// Development stream with full management
export const devStream = new Stream<Data>("dev_stream", {
  lifeCycle: LifeCycle.FULLY_MANAGED
});
```

See the API Reference for complete configuration options.