
Creating Streams

Overview

Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they provide a way to implement real-time pipelines for ingesting and processing incoming data.

Working with streams

Type-safe transport layer

The schema of your Redpanda topic is derived from the data model you define

Buffer data from APIs

Data is buffered in the stream to protect against data loss during high load or service disruptions

Chain transformations on the fly

Add transformations between streams to process, reshape, and enrich data in-flight before landing in your database tables

Sync data to destination tables

Moose automatically syncs data from streams to your database tables when a destination is specified in your stream configuration

Creating Streams

You can create streams in two ways:

  • High-level: Using the IngestPipeline class (recommended)
  • Low-level: Manually configuring the Stream component (see the sketch below)
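If you need finer-grained control, you can configure a Stream directly and point it at a destination table; as described above, specifying a destination in the stream configuration tells Moose to sync records into that table. The snippet below is a minimal sketch of this low-level approach, assuming a simple pydantic model and an OlapTable destination (the file name and model are illustrative; see the API Reference for the full set of StreamConfig options):

LowLevelStream.py
from moose_lib import Key, OlapTable, Stream, StreamConfig
from pydantic import BaseModel

class RawData(BaseModel):
    id: Key[str]
    value: int

# Destination OLAP table the stream will sync into
raw_table = OlapTable[RawData]("raw_data")

# Standalone stream; the destination tells Moose to sync records into the table
raw_stream = Stream[RawData]("raw_data", StreamConfig(destination=raw_table))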

Streams for Ingestion

The IngestPipeline class provides a convenient way to set up streams with ingestion APIs and tables. This is the recommended way to create streams for ingestion:

IngestionStream.py
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel

class RawData(BaseModel):
    id: Key[str]
    value: int

raw_ingestion_stream = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api = True,  # Creates an ingestion API endpoint at `POST /ingest/raw_data`
    stream = True,      # Buffers data between the ingestion API and the database table
    table = True,       # Creates an OLAP table named `raw_data`
))

Explanation

Ingest API Writes to Stream

Data sent to the `POST /ingest/raw_data` endpoint is buffered in the `raw_data` stream

Stream to Table Sync

Moose automatically executes a process to sync the data from the `raw_data` stream to the `raw_data` OLAP table

Destination Table

Data is stored in the `raw_data` OLAP table
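To see this flow end to end, you can post a record to the generated endpoint. The snippet below is a minimal sketch; it assumes the Moose dev server is running locally and reachable at http://localhost:4000 (adjust the host and port to your environment):

ingest_example.py
import requests

# Send a record matching the RawData model to the generated ingestion endpoint.
# The base URL is an assumption for a local dev server; adjust as needed.
response = requests.post(
    "http://localhost:4000/ingest/raw_data",
    json={"id": "abc-123", "value": 42},
)
print(response.status_code)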

Streams for Transformations

If the raw data needs to be transformed before landing in the database, you can define a destination stream and a transformation function to process the data:

Single Stream Transformation

TransformDestinationStream.ts
import { IngestPipeline, Key } from "@514labs/moose-lib";

interface RawData {
  id: Key<string>;
  value: number;
}

interface TransformedData {
  id: Key<string>;
  transformedValue: number;
  transformedAt: Date;
}

// Configure components for raw data ingestion & buffering
const rawData = new IngestPipeline<RawData>("raw_data", {
  ingestApi: true,
  stream: true, // Buffers data between the ingestion API and the database table
  table: false  // Don't create a table for the raw data
});

// Create a table for the transformed data
const transformedData = new IngestPipeline<TransformedData>("transformed_data", {
  ingestApi: false, // Don't create an ingestion API for the transformed data
  stream: true,     // Create destination stream for the transformed data
  table: true       // Create a table for the transformed data
});

rawData.stream.addTransform(transformedData.stream, (record) => ({
  id: record.id,
  transformedValue: record.value * 2,
  transformedAt: new Date()
}));
TransformDestinationStream.py
# Import required libraries
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel
from datetime import datetime

# Define schema for raw incoming data
class RawData(BaseModel):
    id: Key[str]  # Primary key
    value: int    # Value to be transformed

# Define schema for transformed data
class TransformedData(BaseModel):
    id: Key[str]             # Primary key (preserved from raw data)
    transformedValue: int    # Transformed value
    transformedAt: datetime  # Timestamp of transformation

# Create pipeline for raw data - only for ingestion and streaming
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api = True,  # Enable API endpoint
    stream = True,      # Create stream for buffering
    table = False       # No table needed for raw data
))

# Create pipeline for transformed data - for storage only
transformed_data = IngestPipeline[TransformedData]("transformed_data", IngestPipelineConfig(
    ingest_api = False,  # No direct API endpoint
    stream = True,       # Create stream to receive transformed data
    table = True         # Store transformed data in table
))

# Define a named transformation function
def transform_function(record: RawData) -> TransformedData:
    return TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )

# Connect the streams with the transformation function
raw_data.get_stream().add_transform(
    destination=transformed_data.get_stream(),  # Use get_stream() to get the stream object from the IngestPipeline
    transformation=transform_function           # Can also be defined as a lambda function
)
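As noted in the final comment above, the transformation can also be supplied inline as a lambda instead of a named function. A minimal sketch of the same wiring, reusing the raw_data and transformed_data pipelines defined above:

# Same transformation expressed with an inline lambda
raw_data.get_stream().add_transform(
    destination=transformed_data.get_stream(),
    transformation=lambda record: TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now(),
    ),
)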

Chaining Transformations

For more complex pipelines, you can chain multiple transformations together. This is where a standalone Stream is useful for the intermediate stages of your pipeline:

ChainedTransformations.py
from moose_lib import IngestPipeline, Key, Stream, IngestPipelineConfig
from pydantic import BaseModel
from datetime import datetime

# Define the schema for raw input data
class RawData(BaseModel):
    id: Key[str]
    value: int

# Define the schema for intermediate transformed data
class IntermediateData(BaseModel):
    id: Key[str]
    transformedValue: int
    transformedAt: datetime

# Define the schema for final transformed data
class FinalData(BaseModel):
    id: Key[str]
    transformedValue: int
    anotherTransformedValue: int
    transformedAt: datetime

# Create the first pipeline for raw data ingestion
# Only create an API and a stream (no table) since we're ingesting the raw data
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=False
))

# Create an intermediate stream to hold data between transformations (no API or table needed)
intermediate_stream = Stream[IntermediateData]("intermediate_stream")

# First transformation: double the value and add a timestamp
raw_data.get_stream().add_transform(
    destination=intermediate_stream,
    transformation=lambda record: IntermediateData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )
)

# Create the final pipeline that will store the fully transformed data
final_data = IngestPipeline[FinalData]("final_stream", IngestPipelineConfig(
    ingest_api=False,
    stream=True,
    table=True
))

# Second transformation: further transform the intermediate data
intermediate_stream.add_transform(
    destination=final_data.get_stream(),
    transformation=lambda record: FinalData(
        id=record.id,
        transformedValue=record.transformedValue * 2,
        anotherTransformedValue=record.transformedValue * 3,
        transformedAt=datetime.now()
    )
)

Stream Configurations

Parallelism and Retention

StreamConfig.py
from moose_lib import Stream, StreamConfig, Key
from pydantic import BaseModel

# Example data model for the stream (included so the snippet is self-contained)
class Data(BaseModel):
    id: Key[str]
    value: int

high_throughput_stream = Stream[Data]("high_throughput", StreamConfig(
    parallelism=4,           # Process 4 records simultaneously
    retention_period=86400,  # Keep data for 1 day
))

LifeCycle Management

Control how Moose manages your stream resources when your code changes. See the LifeCycle Management guide for detailed information.

LifeCycleStreamConfig.py
from moose_lib import Stream, StreamConfig, LifeCycle, Key
from pydantic import BaseModel

# Example data model for the streams (included so the snippet is self-contained)
class Data(BaseModel):
    id: Key[str]
    value: int

# Production stream with external management
prod_stream = Stream[Data]("prod_stream", StreamConfig(
    life_cycle=LifeCycle.EXTERNALLY_MANAGED
))

# Development stream with full management
dev_stream = Stream[Data]("dev_stream", StreamConfig(
    life_cycle=LifeCycle.FULLY_MANAGED
))

See the API Reference for complete configuration options.

Use getters to access the stream object

Use the get_stream() method to retrieve the stream object from an IngestPipeline; this avoids errors when referencing the stream directly.
