# Moose / Streaming / Create Stream Documentation – Python
## Included Files
1. moose/streaming/create-stream/create-stream.mdx
## Create Streams
Source: moose/streaming/create-stream/create-stream.mdx
Define and create Kafka/Redpanda topics with type-safe schemas
# Creating Streams
## Overview
Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they provide a way to implement real-time pipelines for ingesting and processing incoming data.
## Creating Streams
You can create streams in two ways:
- High-level: Using the `IngestPipeline` class (recommended)
- Low-level: Manually configuring the `Stream` component
### Streams for Ingestion
The `IngestPipeline` class provides a convenient way to set up a stream together with an ingestion API and a table. This is the recommended way to create streams for ingestion:
```py filename="IngestionStream.py" copy {10}
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel
class RawData(BaseModel):
    id: Key[str]
    value: int
raw_ingestion_stream = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api = True,  # Creates an ingestion API endpoint at `POST /ingest/raw_data`
    stream = True,      # Buffers data between the ingestion API and the database table
    table = True,       # Creates an OLAP table named `raw_data`
))
```
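With `ingest_api = True`, this pipeline exposes the `POST /ingest/raw_data` endpoint. As a minimal sketch of exercising it (the `http://localhost:4000` base URL is an assumption, so substitute whatever address your `moose dev` output reports), you could send a record with the standard library:

```py filename="SendRecord.py" copy
import json
import urllib.request

# Assumed local dev base URL; replace with the address your Moose dev server reports
url = "http://localhost:4000/ingest/raw_data"

record = {"id": "abc-123", "value": 42}  # Must match the RawData schema

request = urllib.request.Request(
    url,
    data=json.dumps(record).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(request) as response:
    print(response.status, response.read().decode())
```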
While the `IngestPipeline` provides a convenient way to set up streams with ingestion APIs and tables, you can also configure these components individually for more granular control:
```py filename="StreamObject.py" copy {8-12}
from moose_lib import Stream, StreamConfig, Key, IngestApi, IngestConfig, OlapTable
from pydantic import BaseModel
class RawData(BaseModel):
    id: Key[str]
    value: int
raw_table = OlapTable[RawData]("raw_data")
raw_stream = Stream[RawData]("raw_data", StreamConfig(
    destination=raw_table  # Optional: sets up a process that syncs data from the stream into the table
))
raw_ingest_api = IngestApi[RawData]("raw_data", IngestConfig(
    destination=raw_stream  # Configure Moose to write all validated data to the stream
))
```
### Streams for Transformations
If the raw data needs to be transformed before landing in the database, you can define a transform destination stream and a transform function to process the data:
#### Single Stream Transformation
```py filename="TransformDestinationStream.py" copy
# Import required libraries
from datetime import datetime
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel

# Define schema for raw incoming data
class RawData(BaseModel):
    id: Key[str]  # Primary key
    value: int    # Value to be transformed

# Define schema for transformed data
class TransformedData(BaseModel):
    id: Key[str]              # Primary key (preserved from raw data)
    transformedValue: int     # Transformed value
    transformedAt: datetime   # Timestamp of transformation

# Create pipeline for raw data - only for ingestion and streaming
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api = True,  # Enable API endpoint
    stream = True,      # Create stream for buffering
    table = False       # No table needed for raw data
))

# Create pipeline for transformed data - for storage only
transformed_data = IngestPipeline[TransformedData]("transformed_data", IngestPipelineConfig(
    ingest_api = False,  # No direct API endpoint
    stream = True,       # Create stream to receive transformed data
    table = True         # Store transformed data in table
))

# Define a named transformation function
def transform_function(record: RawData) -> TransformedData:
    return TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )

# Connect the streams with the transformation function
raw_data.get_stream().add_transform(
    destination=transformed_data.get_stream(),  # get_stream() returns the underlying stream object from the IngestPipeline
    transformation=transform_function           # A lambda function can be used instead
)
```
Use the `get_stream()` method to retrieve the underlying stream object from an `IngestPipeline`; referencing the stream this way avoids errors when wiring up transformations.
You can use lambda functions to define transformations:
```py filename="TransformDestinationStream.py" copy
from datetime import datetime
from moose_lib import Key, IngestApi, IngestConfig, OlapTable, Stream, StreamConfig
from pydantic import BaseModel

class RawData(BaseModel):
    id: Key[str]
    value: int

class TransformedData(BaseModel):
    id: Key[str]
    transformedValue: int
    transformedAt: datetime

# Create pipeline components for raw data - only for ingestion and streaming
raw_stream = Stream[RawData]("raw_data")  # No destination table since we're not storing the raw data
raw_api = IngestApi[RawData]("raw_data", IngestConfig(
    destination=raw_stream  # Connect the ingestion API to the raw data stream
))

# Create pipeline components for transformed data - no ingestion API since we're not ingesting the transformed data
transformed_table = OlapTable[TransformedData]("transformed_data")  # Store the transformed data in a table
transformed_stream = Stream[TransformedData]("transformed_data", StreamConfig(
    destination=transformed_table  # Connect the transformed data stream to the destination table
))

# Example transformation using a lambda function
raw_stream.add_transform(
    destination=transformed_stream,
    transformation=lambda record: TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )
)
```
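Transformations are not limited to one-to-one mapping. As a sketch, continuing the example above and used in place of the lambda, you could filter while transforming. This assumes the transform runner treats a `None` return value as "drop this record"; verify that behavior against your moose_lib version:

```py filename="FilteringTransform.py" copy
from typing import Optional

# Continues the example above (RawData, TransformedData, raw_stream, transformed_stream, datetime).
# Assumption: returning None from a transform drops the record instead of forwarding it.
def transform_and_filter(record: RawData) -> Optional[TransformedData]:
    if record.value <= 0:
        return None  # Skip non-positive values (assumes None means "drop")
    return TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now(),
    )

raw_stream.add_transform(
    destination=transformed_stream,
    transformation=transform_and_filter,
)
```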
#### Chaining Transformations
For more complex pipelines, you can chain multiple transformations together. A standalone `Stream` is useful here for the intermediate stages of your pipeline:
```py filename="ChainedTransformations.py" copy
from datetime import datetime
from moose_lib import IngestPipeline, IngestPipelineConfig, Key, Stream
from pydantic import BaseModel

# Define the schema for raw input data
class RawData(BaseModel):
    id: Key[str]
    value: int

# Define the schema for intermediate transformed data
class IntermediateData(BaseModel):
    id: Key[str]
    transformedValue: int
    transformedAt: datetime

# Define the schema for final transformed data
class FinalData(BaseModel):
    id: Key[str]
    transformedValue: int
    anotherTransformedValue: int
    transformedAt: datetime

# Create the first pipeline for raw data ingestion
# Only create an API and a stream (no table) since we're ingesting the raw data
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=False
))

# Create an intermediate stream to hold data between transformations (no API or table needed)
intermediate_stream = Stream[IntermediateData]("intermediate_stream")

# First transformation: double the value and add a timestamp
raw_data.get_stream().add_transform(
    destination=intermediate_stream,
    transformation=lambda record: IntermediateData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )
)

# Create the final pipeline that will store the fully transformed data
final_data = IngestPipeline[FinalData]("final_stream", IngestPipelineConfig(
    ingest_api=False,
    stream=True,
    table=True
))

# Second transformation: further transform the intermediate data
intermediate_stream.add_transform(
    destination=final_data.get_stream(),
    transformation=lambda record: FinalData(
        id=record.id,
        transformedValue=record.transformedValue * 2,
        anotherTransformedValue=record.transformedValue * 3,
        transformedAt=datetime.now()
    )
)
```
## Stream Configurations
### Parallelism and Retention
```py filename="StreamConfig.py" copy
from moose_lib import Stream, StreamConfig, Key
from pydantic import BaseModel

class Data(BaseModel):
    id: Key[str]
    value: int

high_throughput_stream = Stream[Data]("high_throughput", StreamConfig(
    parallelism=4,           # Process 4 records simultaneously
    retention_period=86400,  # Keep data for 1 day (in seconds)
))
```
### LifeCycle Management
Control how Moose manages your stream resources when your code changes. See the [LifeCycle Management guide](./lifecycle) for detailed information.
```py filename="LifeCycleStreamConfig.py" copy
from moose_lib import Stream, StreamConfig, LifeCycle, Key
from pydantic import BaseModel

class Data(BaseModel):
    id: Key[str]
    value: int

# Production stream with external management
prod_stream = Stream[Data]("prod_stream", StreamConfig(
    life_cycle=LifeCycle.EXTERNALLY_MANAGED
))

# Development stream with full management
dev_stream = Stream[Data]("dev_stream", StreamConfig(
    life_cycle=LifeCycle.FULLY_MANAGED
))
```
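These `StreamConfig` options can be combined in a single configuration. A minimal sketch, reusing the `Data` model from the examples above:

```py filename="CombinedStreamConfig.py" copy
from moose_lib import Stream, StreamConfig, LifeCycle

# Reuses the `Data` model defined in the examples above
analytics_stream = Stream[Data]("analytics_events", StreamConfig(
    parallelism=4,                            # Process 4 records simultaneously
    retention_period=86400,                   # Keep data for 1 day (in seconds)
    life_cycle=LifeCycle.EXTERNALLY_MANAGED,  # Don't let Moose modify this topic when code changes
))
```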
See the [API Reference](/moose/reference/ts-moose-lib#stream) for complete configuration options.