Stream Processing
Overview
Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they provide a way to implement real-time pipelines for ingesting and processing incoming data.
Working with streams
Type-safe transport layer
The schema of your Redpanda topic is derived from the data model you define
Buffer data from APIs
Data is buffered in the stream to protect against data loss during high load or service disruptions
Chain transformations on the fly
Add transformations between streams to process, reshape, and enrich data in-flight before landing in your database tables
Sync data to destination tables
Moose automatically syncs data from streams to your database tables when a destination is specified in your stream configuration
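As a quick illustration of the stream-to-table sync, a stream can be pointed at an OLAP table via its destination option. A minimal sketch (the OrderEvent model and names here are illustrative):
import { Stream, OlapTable, Key } from "@514labs/moose-lib";

interface OrderEvent {
  id: Key<string>;
  amount: number;
}

// Destination table for the synced records
const orderTable = new OlapTable<OrderEvent>("order_events");

// Records written to this stream are automatically synced to the table
const orderStream = new Stream<OrderEvent>("order_events", {
  destination: orderTable
});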
Creating Streams
You can create streams in two ways:
- High-level: Using the IngestPipeline class (recommended)
- Low-level: Manually configuring the Stream component
Streams for Ingestion
The IngestPipeline class provides a convenient way to set up streams with ingestion APIs and tables. This is the recommended way to create streams for ingestion:
import { IngestPipeline, Key } from "@514labs/moose-lib";
interface RawData {
id: Key<string>;
value: number;
}
const rawIngestionStream = new IngestPipeline<RawData>("raw_data", {
ingest: true, // Creates an ingestion API endpoint at `POST /ingest/raw_data`
stream: true, // Buffers data between the ingestion API and the database table
table: true // Creates an OLAP table named `raw_data`
});
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel

class RawData(BaseModel):
    id: Key[str]
    value: int

raw_ingestion_stream = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest=True,  # Creates an ingestion API endpoint at `POST /ingest/raw_data`
    stream=True,  # Buffers data between the ingestion API and the database table
    table=True,   # Creates an OLAP table named `raw_data`
))
Explanation
Ingest API Writes to Stream
Data sent to the `POST /ingest/raw_data` endpoint is buffered in the `raw_data` stream
Stream to Table Sync
Moose automatically executes a process to sync the data from the `raw_data` stream to the `raw_data` OLAP table
Destination Table
Data is stored in the `raw_data` OLAP table
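To exercise this flow end to end, you can send a record to the generated endpoint and let Moose handle buffering and syncing. A minimal sketch, assuming the default local dev server at http://localhost:4000:
// Post a RawData record to the ingestion API; Moose buffers it in the
// `raw_data` stream and syncs it into the `raw_data` OLAP table
await fetch("http://localhost:4000/ingest/raw_data", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ id: "123", value: 42 })
});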
Streams for Transformations
If the raw data needs to be transformed before landing in the database, you can define a transform destination stream and a transform function to process the data:
Single Stream Transformation
import { IngestPipeline, Key } from "@514labs/moose-lib";
interface RawData {
id: Key<string>;
value: number;
}
interface TransformedData {
id: Key<string>;
transformedValue: number;
transformedAt: Date;
}
// Configure components for raw data ingestion & buffering
const rawData = new IngestPipeline<RawData>("raw_data", {
ingest: true,
stream: true, // Buffers data between the ingestion API and the database table
table: false // Don't create a table for the raw data
});
// Create a table for the transformed data
const transformedData = new IngestPipeline<TransformedData>("transformed_data", {
ingest: false, // Don't create an ingestion API for the transformed data
stream: true, // Create destination stream for the transformed data
table: true // Create a table for the transformed data
});
rawData.stream.addTransform(transformedData.stream, (record) => ({
id: record.id,
transformedValue: record.value * 2,
transformedAt: new Date()
}));
# Import required libraries
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel
from datetime import datetime

# Define schema for raw incoming data
class RawData(BaseModel):
    id: Key[str]  # Primary key
    value: int    # Value to be transformed

# Define schema for transformed data
class TransformedData(BaseModel):
    id: Key[str]             # Primary key (preserved from raw data)
    transformedValue: int    # Transformed value
    transformedAt: datetime  # Timestamp of transformation

# Create pipeline for raw data - only for ingestion and streaming
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest=True,   # Enable API endpoint
    stream=True,   # Create stream for buffering
    table=False,   # No table needed for raw data
))

# Create pipeline for transformed data - for storage only
transformed_data = IngestPipeline[TransformedData]("transformed_data", IngestPipelineConfig(
    ingest=False,  # No direct API endpoint
    stream=True,   # Create stream to receive transformed data
    table=True,    # Store transformed data in table
))

# Define a named transformation function
def transform_function(record: RawData) -> TransformedData:
    return TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )

# Connect the streams with the transformation function
raw_data.get_stream().add_transform(
    destination=transformed_data.get_stream(),  # get_stream() returns the stream object from the IngestPipeline
    transformation=transform_function  # Can also be a lambda
)
Use getters to get the stream object
Use the get_stream() method to get the stream object from the IngestPipeline to avoid errors when referencing the stream object.
Chaining Transformations
For more complex transformations, you can chain multiple transformations together. This is a use case where using a standalone Stream for intermediate stages of your pipeline may be useful:
import { IngestPipeline, Stream, Key } from "@514labs/moose-lib";
// Define the schema for raw input data
interface RawData {
id: Key<string>;
value: number;
}
// Define the schema for intermediate transformed data
interface IntermediateData {
id: Key<string>;
transformedValue: number;
transformedAt: Date;
}
// Define the schema for final transformed data
interface FinalData {
id: Key<string>;
transformedValue: number;
anotherTransformedValue: number;
transformedAt: Date;
}
// Create the first pipeline for raw data ingestion
// Only create an API and a stream (no table) since we're ingesting the raw data
const rawData = new IngestPipeline<RawData>("raw_data", {
ingest: true, // Enable HTTP ingestion endpoint
stream: true, // Create a stream to buffer data
table: false // Don't store raw data in a table
});
// Create an intermediate stream to hold data between transformations (no api or table needed)
const intermediateStream = new Stream<IntermediateData>("intermediate_stream");
// First transformation: double the value and add timestamp
rawData.stream.addTransform(intermediateStream, (record) => ({
id: record.id,
transformedValue: record.value * 2, // Double the original value
transformedAt: new Date() // Add current timestamp
}));
// Create the final pipeline that will store the fully transformed data
const finalData = new IngestPipeline<FinalData>("final_stream", {
ingest: false, // No direct ingestion to this pipeline
stream: true, // Create a stream for processing
table: true // Store final results in a table
});
// Second transformation: further transform the intermediate data
intermediateStream.addTransform(finalData.stream, (record) => ({
id: record.id,
transformedValue: record.transformedValue * 2, // Double the intermediate value
anotherTransformedValue: record.transformedValue * 3, // Triple the intermediate value
transformedAt: new Date() // Update timestamp
}));
from moose_lib import IngestPipeline, IngestPipelineConfig, Key, Stream
from pydantic import BaseModel
from datetime import datetime

# Define the schema for raw input data
class RawData(BaseModel):
    id: Key[str]
    value: int

# Define the schema for intermediate transformed data
class IntermediateData(BaseModel):
    id: Key[str]
    transformedValue: int
    transformedAt: datetime

# Define the schema for final transformed data
class FinalData(BaseModel):
    id: Key[str]
    transformedValue: int
    anotherTransformedValue: int
    transformedAt: datetime

# Create the first pipeline for raw data ingestion
# Only create an API and a stream (no table) since we're ingesting the raw data
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest=True,
    stream=True,
    table=False
))

# Create an intermediate stream to hold data between transformations (no API or table needed)
intermediate_stream = Stream[IntermediateData]("intermediate_stream")

# First transformation: double the value and add timestamp
raw_data.get_stream().add_transform(destination=intermediate_stream, transformation=lambda record: IntermediateData(
    id=record.id,
    transformedValue=record.value * 2,
    transformedAt=datetime.now()
))

# Create the final pipeline that will store the fully transformed data
final_data = IngestPipeline[FinalData]("final_stream", IngestPipelineConfig(
    ingest=False,
    stream=True,
    table=True
))

# Second transformation: further transform the intermediate data
intermediate_stream.add_transform(destination=final_data.get_stream(), transformation=lambda record: FinalData(
    id=record.id,
    transformedValue=record.transformedValue * 2,
    anotherTransformedValue=record.transformedValue * 3,
    transformedAt=datetime.now()
))
Implementing Transformations
Reshape and Enrich Data
Transform data shape or enrich records:
import { Stream, Key } from "@514labs/moose-lib";
interface RawEvent {
id: Key<string>;
timestamp: string;
data: {
user_id: string;
platform: string;
app_version: string;
ip_address: string;
}
}
interface EnrichedEvent {
eventId: Key<string>;
timestamp: Date;
userId: Key<string>;
properties: {
platform: string;
version: string;
country: string;
};
metadata: {
originalTimestamp: string;
processedAt: Date;
}
}
const rawStream = new Stream<RawEvent>("raw_events");
const enrichedStream = new Stream<EnrichedEvent>("enriched_events");
// Reshape and enrich data
rawStream.addTransform(enrichedStream, async (record: RawEvent) => ({
eventId: record.id,
timestamp: new Date(record.timestamp),
userId: record.data.user_id,
properties: {
platform: record.data.platform || 'unknown',
version: record.data.app_version,
country: await lookupCountry(record.data.ip_address)
},
metadata: {
originalTimestamp: record.timestamp,
processedAt: new Date()
}
}));
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class EventProperties(BaseModel):
    user_id: str
    platform: str
    app_version: str
    ip_address: str

class RawEvent(BaseModel):
    id: Key[str]
    timestamp: str
    data: EventProperties

class EnrichedEventProperties(BaseModel):
    platform: str
    version: str
    country: str

class EnrichedEventMetadata(BaseModel):
    originalTimestamp: str
    processedAt: datetime

class EnrichedEvent(BaseModel):
    eventId: Key[str]
    timestamp: datetime
    userId: Key[str]
    properties: EnrichedEventProperties
    metadata: EnrichedEventMetadata

raw_stream = Stream[RawEvent]("raw_events")
enriched_stream = Stream[EnrichedEvent]("enriched_events")

# Reshape and enrich data
# (lookup_country is a helper, e.g. a GeoIP lookup, assumed to be defined elsewhere)
def enrich(record: RawEvent) -> EnrichedEvent:
    return EnrichedEvent(
        eventId=record.id,
        timestamp=record.timestamp,
        userId=record.data.user_id,
        properties=EnrichedEventProperties(
            platform=record.data.platform,
            version=record.data.app_version,
            country=lookup_country(record.data.ip_address)
        ),
        metadata=EnrichedEventMetadata(
            originalTimestamp=record.timestamp,
            processedAt=datetime.now()
        )
    )

raw_stream.add_transform(destination=enriched_stream, transformation=enrich)
Filtering
Remove or filter records based on conditions:
import { Stream } from "@514labs/moose-lib";

interface MetricRecord {
id: string;
name: string;
value: number;
timestamp: Date;
}
const inputStream = new Stream<MetricRecord>("input_metrics");
const validMetrics = new Stream<MetricRecord>("valid_metrics");
// Multiple filtering conditions
inputStream.addTransform(validMetrics, (record) => {
// Filter out records with invalid values
if (isNaN(record.value) || record.value < 0) {
return undefined;
}
// Filter out old records
if (record.timestamp < getStartOfDay()) {
return undefined;
}
// Filter out specific metrics
if (record.name.startsWith('debug_')) {
return undefined;
}
return record;
});
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class MetricRecord(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

class ValidMetrics(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

input_stream = Stream[MetricRecord]("input_metrics")
valid_metrics = Stream[ValidMetrics]("valid_metrics")

# Keep only positive, recent, non-debug metrics
# (getStartOfDay() is a helper assumed to be defined elsewhere)
def filter_function(record: MetricRecord) -> ValidMetrics | None:
    if record.value > 0 and record.timestamp > getStartOfDay() and not record.name.startswith('debug_'):
        return ValidMetrics(
            id=record.id,
            name=record.name,
            value=record.value,
            timestamp=record.timestamp
        )
    return None

input_stream.add_transform(destination=valid_metrics, transformation=filter_function)
Fan Out (1:N)
Send data to multiple downstream processors:
import { Stream } from "@514labs/moose-lib";

interface Order {
orderId: string;
userId: string;
amount: number;
items: string[];
}
interface HighPriorityOrder extends Order {
priority: 'high';
}
interface ArchivedOrder extends Order {
archivedAt: Date;
}
// Define destination streams
const analyticsStream = new Stream<Order>("order_analytics");
const notificationStream = new Stream<HighPriorityOrder>("order_notifications");
const archiveStream = new Stream<ArchivedOrder>("order_archive");
// Source stream
const orderStream = new Stream<Order>("orders");
// Send all orders to analytics
orderStream.addTransform(analyticsStream, (order) => order);
// Send large orders to notifications
orderStream.addTransform(notificationStream, (order) => {
if (order.amount > 1000) {
return {
...order,
priority: 'high'
};
}
return undefined; // Skip small orders
});
// Archive all orders
orderStream.addTransform(archiveStream, (order) => ({
...order,
archivedAt: new Date()
}));
from moose_lib import Stream, Key
from pydantic import BaseModel
from typing import List
from datetime import datetime

# Define data models
class Order(BaseModel):
    orderId: Key[str]
    userId: Key[str]
    amount: float
    items: List[str]

class HighPriorityOrder(Order):
    priority: str = 'high'

class ArchivedOrder(Order):
    archivedAt: datetime

# Create source and destination streams
order_stream = Stream[Order]("orders")
analytics_stream = Stream[Order]("order_analytics")
notification_stream = Stream[HighPriorityOrder]("order_notifications")
archive_stream = Stream[ArchivedOrder]("order_archive")

# Send all orders to analytics
def analytics_transform(order: Order) -> Order:
    return order

order_stream.add_transform(destination=analytics_stream, transformation=analytics_transform)

# Send large orders to notifications
def high_priority_transform(order: Order) -> HighPriorityOrder | None:
    if order.amount > 1000:
        return HighPriorityOrder(
            orderId=order.orderId,
            userId=order.userId,
            amount=order.amount,
            items=order.items,
            priority='high'
        )
    return None  # Skip small orders

order_stream.add_transform(destination=notification_stream, transformation=high_priority_transform)

# Archive all orders with timestamp
def archive_transform(order: Order) -> ArchivedOrder:
    return ArchivedOrder(
        orderId=order.orderId,
        userId=order.userId,
        amount=order.amount,
        items=order.items,
        archivedAt=datetime.now()
    )

order_stream.add_transform(destination=archive_stream, transformation=archive_transform)
Fan In (N:1)
Combine data from multiple sources:
import { Stream, OlapTable, Key } from "@514labs/moose-lib";
interface UserEvent {
userId: Key<string>;
eventType: string;
timestamp: Date;
source: string;
}
// Source streams
const webEvents = new Stream<UserEvent>("web_events");
const mobileEvents = new Stream<UserEvent>("mobile_events");
const apiEvents = new Stream<UserEvent>("api_events");
// Create a stream and table for the combined events
const eventsTable = new OlapTable<UserEvent>("all_events");
const allEvents = new Stream<UserEvent>("all_events", {
destination: eventsTable
});
// Fan in from web
webEvents.addTransform(allEvents, (event) => ({
...event,
source: 'web',
timestamp: new Date()
}));
// Fan in from mobile
mobileEvents.addTransform(allEvents, (event) => ({
...event,
source: 'mobile',
timestamp: new Date()
}));
// Fan in from API
apiEvents.addTransform(allEvents, (event) => ({
...event,
source: 'api',
timestamp: new Date()
}));
from moose_lib import Stream, OlapTable, Key, StreamConfig
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    userId: Key[str]
    eventType: str
    timestamp: datetime
    source: str

# Create source streams
web_events = Stream[UserEvent]("web_events")
mobile_events = Stream[UserEvent]("mobile_events")
api_events = Stream[UserEvent]("api_events")

# Create a stream and table for the combined events
events_table = OlapTable[UserEvent]("all_events")
all_events = Stream[UserEvent]("all_events", StreamConfig(
    destination=events_table
))

# Fan in from web
def web_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='web'
    )

web_events.add_transform(destination=all_events, transformation=web_transform)

# Fan in from mobile
def mobile_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='mobile'
    )

mobile_events.add_transform(destination=all_events, transformation=mobile_transform)

# Fan in from API
def api_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='api'
    )

api_events.add_transform(destination=all_events, transformation=api_transform)
Unnesting
Flatten nested records:
import { Stream, Key } from "@514labs/moose-lib";
interface NestedRecord {
id: Key<string>;
nested: {
value: number;
}[];
}
interface FlattenedRecord {
id: Key<string>;
value: number;
}
const nestedStream = new Stream<NestedRecord>("nested_records");
const flattenedStream = new Stream<FlattenedRecord>("flattened_records");
nestedStream.addTransform(flattenedStream, (record) => record.nested.map((n) => ({
id: record.id,
value: n.value
})));
from moose_lib import Stream, Key
from pydantic import BaseModel
from typing import List

class NestedValue(BaseModel):
    value: int

class NestedRecord(BaseModel):
    id: Key[str]
    nested: List[NestedValue]

class FlattenedRecord(BaseModel):
    id: Key[str]
    value: int

nested_stream = Stream[NestedRecord]("nested_records")
flattened_stream = Stream[FlattenedRecord]("flattened_records")

# Emit one flattened record per nested entry
def unnest_transform(record: NestedRecord) -> List[FlattenedRecord]:
    result = []
    for nested in record.nested:
        result.append(FlattenedRecord(
            id=record.id,
            value=nested.value
        ))
    return result

nested_stream.add_transform(destination=flattened_stream, transformation=unnest_transform)
See the API Reference for all available transform options.
Stream Configurations
Parallelism and Retention
import { Stream } from "@514labs/moose-lib";

// `Data` is your data model type
const highThroughputStream = new Stream<Data>("high_throughput", {
  parallelism: 4, // Process 4 records simultaneously
  retentionPeriod: 86400 // Keep data for 1 day (in seconds)
});
from moose_lib import Stream, StreamConfig

# `Data` is your data model (a Pydantic BaseModel)
high_throughput_stream = Stream[Data]("high_throughput", StreamConfig(
    parallelism=4,  # Process 4 records simultaneously
    retention_period=86400,  # Keep data for 1 day (in seconds)
))
See the API Reference for complete configuration options.