Creating Streams
Viewing:
Overview
Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they provide a way to implement real-time pipelines for ingesting and processing incoming data.
Working with streams
Type safe transport layer
The schema of your Redpanda topic is derived from the data model you define
Buffer data from APIs
Data is buffered in the stream to protect against data loss during high load or service disruptions
Chain transformations on the fly
Add transformations between streams to process, reshape, and enrich data in-flight before landing in your database tables
Sync data to destination tables
Moose automatically syncs data from streams to your database tables when a destination is specified in your stream configuration
Creating Streams
You can create streams in two ways:
- High-level: Using the
IngestPipeline
class (recommended) - Low-level: Manually configuring the
Stream
component
Streams for Ingestion
The IngestPipeline
class provides a convenient way to set up streams with ingestion APIs and tables. This is the recommended way to create streams for ingestion:
import { IngestPipeline, Key } from "@514labs/moose-lib";
interface RawData {
id: Key<string>;
value: number;
}
const rawIngestionStream = new IngestPipeline<RawData>("raw_data", {
ingest: true, // Creates an ingestion API endpoint at `POST /ingest/raw_data`
stream: true, // Buffers data between the ingestion API and the database table
table: true // Creates an OLAP table named `raw_data`
});
from moose_lib import IngestPipeline, Key
from pydantic import BaseModel
class RawData(BaseModel):
id: Key[str]
value: int
raw_ingestion_stream = IngestPipeline[RawData]("raw_data", {
ingest: True, # Creates an ingestion API endpoint at `POST /ingest/raw_data`
stream: True, # Buffers data between the ingestion API and the database table
table: True, # Creates an OLAP table named `raw_data`
})
Explanation
Ingest API Writes to Stream
Data sent to the `POST /ingest/raw_data` endpoint is buffered in the `raw_data` stream
Stream to Table Sync
Moose automatically executes a process to sync the data from the `raw_data` stream to the `raw_data` OLAP table
Destination Table
Data is stored in the `raw_data` OLAP table
Streams for Transformations
If the raw data needs to be transformed before landing in the database, you can define a transform destination stream and a transform function to process the data:
Single Stream Transformation
import { IngestPipeline, Key } from "@514labs/moose-lib";
interface RawData {
id: Key<string>;
value: number;
}
interface TransformedData {
id: Key<string>;
transformedValue: number;
transformedAt: Date;
}
// Configure components for raw data ingestion & buffering
const rawData = new IngestPipeline<RawData>("raw_data", {
ingest: true,
stream: true, // Buffers data between the ingestion API and the database table
table: false // Don't create a table for the raw data
});
// Create a table for the transformed data
const transformedData = new IngestPipeline<TransformedData>("transformed_data", {
ingest: false, // Don't create an ingestion API for the transformed data
stream: true, // Create destination stream for the transformed data
table: true // Create a table for the transformed data
});
rawData.stream.addTransform(transformedData.stream, (record) => ({
id: record.id,
transformedValue: record.value * 2,
transformedAt: new Date()
}));
# Import required libraries
from moose_lib import IngestPipeline, Key
from pydantic import BaseModel
# Define schema for raw incoming data
class RawData(BaseModel):
id: Key[str] # Primary key
value: int # Value to be transformed
# Define schema for transformed data
class TransformedData(BaseModel):
id: Key[str] # Primary key (preserved from raw data)
transformedValue: int # Transformed value
transformedAt: Date # Timestamp of transformation
# Create pipeline for raw data - only for ingestion and streaming
raw_data = IngestPipeline[RawData]("raw_data", {
ingest: True, # Enable API endpoint
stream: True, # Create stream for buffering
table: False # No table needed for raw data
})
# Create pipeline for transformed data - for storage only
transformed_data = IngestPipeline[TransformedData]("transformed_data", {
ingest: False, # No direct API endpoint
stream: True, # Create stream to receive transformed data
table: True # Store transformed data in table
})
# Define a named transformation function
def transform_function(record: RawData) -> TransformedData:
return TransformedData(
id=record.id,
transformedValue=record.value * 2,
transformedAt=datetime.now()
)
# Connect the streams with the transformation function
raw_data.get_stream().add_transform(
destination=transformed_data.get_stream(), # Use the get_stream() method to get the stream object from the IngestPipeline
transformation=transform_function # Can also define a lambda function
)
Use getters to get the stream object
Use the get_stream()
method to get the stream object from the IngestPipeline to avoid errors when referencing the stream object.
Chaining Transformations
For more complex transformations, you can chain multiple transformations together. This is a use case where using a standalone Stream for intermediate stages of your pipeline may be useful:
import { IngestPipeline, Key } from "@514labs/moose-lib";
// Define the schema for raw input data
interface RawData {
id: Key<string>;
value: number;
}
// Define the schema for intermediate transformed data
interface IntermediateData {
id: Key<string>;
transformedValue: number;
transformedAt: Date;
}
// Define the schema for final transformed data
interface FinalData {
id: Key<string>;
transformedValue: number;
anotherTransformedValue: number;
transformedAt: Date;
}
// Create the first pipeline for raw data ingestion
// Only create an API and a stream (no table) since we're ingesting the raw data
const rawData = new IngestPipeline<RawData>("raw_data", {
ingest: true, // Enable HTTP ingestion endpoint
stream: true, // Create a stream to buffer data
table: false // Don't store raw data in a table
});
// Create an intermediate stream to hold data between transformations (no api or table needed)
const intermediateStream = new Stream<IntermediateData>("intermediate_stream");
// First transformation: double the value and add timestamp
rawData.stream.addTransform(intermediateStream, (record) => ({
id: record.id,
transformedValue: record.value * 2, // Double the original value
transformedAt: new Date() // Add current timestamp
}));
// Create the final pipeline that will store the fully transformed data
const finalData = new IngestPipeline<FinalData>("final_stream", {
ingest: false, // No direct ingestion to this pipeline
stream: true, // Create a stream for processing
table: true // Store final results in a table
});
// Second transformation: further transform the intermediate data
intermediateStream.addTransform(finalData.stream, (record) => ({
id: record.id,
transformedValue: record.transformedValue * 2, // Double the intermediate value
anotherTransformedValue: record.transformedValue * 3, // Triple the intermediate value
transformedAt: new Date() // Update timestamp
}));
from moose_lib import IngestPipeline, Key, Stream, IngestPipelineConfig
# Define the schema for raw input data
class RawData(BaseModel):
id: Key[str]
value: int
# Define the schema for intermediate transformed data
class IntermediateData(BaseModel):
id: Key[str]
transformedValue: int
transformedAt: Date
# Define the schema for final transformed data
class FinalData(BaseModel):
id: Key[str]
transformedValue: int
anotherTransformedValue: int
transformedAt: Date
# Create the first pipeline for raw data ingestion
# Only create an API and a stream (no table) since we're ingesting the raw data
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
ingest=True,
stream=True,
table=False
))
# Create an intermediate stream to hold data between transformations (no api or table needed)
intermediate_stream = Stream[IntermediateData]("intermediate_stream")
# First transformation: double the value and add timestamp
raw_data.get_stream().add_transform(destination=intermediate_stream, transformation=lambda record: IntermediateData(
id=record.id,
transformedValue=record.value * 2,
transformedAt=datetime.now()
))
# Create the final pipeline that will store the fully transformed data
final_data = IngestPipeline[FinalData]("final_stream", IngestPipelineConfig(
ingest=False,
stream=True,
table=True
))
# Second transformation: further transform the intermediate data
intermediate_stream.add_transform(destination=final_data.get_stream(), transformation=lambda record: FinalData(
id=record.id,
transformedValue=record.transformedValue * 2,
anotherTransformedValue=record.transformedValue * 3,
transformedAt=datetime.now()
))
Stream Configurations
Parallelism and Retention
const highThroughputStream = new Stream<Data>("high_throughput", {
parallelism: 4, // Process 4 records simultaneously
retentionPeriod: 86400 // Keep data for 1 day
});
from moose_lib import Stream, StreamConfig
high_throughput_stream = Stream[Data]("high_throughput", StreamConfig(
parallelism=4, # Process 4 records simultaneously
retention_period=86400, # Keep data for 1 day
))
LifeCycle Management
Control how Moose manages your stream resources when your code changes. See the LifeCycle Management guide for detailed information.
import { Stream, LifeCycle } from "@514labs/moose-lib";
// Production stream with external management
const prodStream = new Stream<Data>("prod_stream", {
lifeCycle: LifeCycle.EXTERNALLY_MANAGED
});
// Development stream with full management
const devStream = new Stream<Data>("dev_stream", {
lifeCycle: LifeCycle.FULLY_MANAGED
});
from moose_lib import Stream, StreamConfig, LifeCycle
# Production stream with external management
prod_stream = Stream[Data]("prod_stream", StreamConfig(
life_cycle=LifeCycle.EXTERNALLY_MANAGED
))
# Development stream with full management
dev_stream = Stream[Data]("dev_stream", StreamConfig(
life_cycle=LifeCycle.FULLY_MANAGED
))
See the API Reference for complete configuration options.