# Moose / Streaming / Create Stream Documentation – Python

## Included Files

1. moose/streaming/create-stream/create-stream.mdx

## Create Streams

Source: moose/streaming/create-stream/create-stream.mdx

Define and create Kafka/Redpanda topics with type-safe schemas

# Creating Streams

## Overview

Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they provide a way to implement real-time pipelines for ingesting and processing incoming data.

## Creating Streams

You can create streams in two ways:

- High-level: Using the `IngestPipeline` class (recommended)
- Low-level: Manually configuring the `Stream` component

### Streams for Ingestion

The `IngestPipeline` class provides a convenient way to set up streams with ingestion APIs and tables. This is the recommended way to create streams for ingestion:

```py filename="IngestionStream.py" copy {10}
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel

class RawData(BaseModel):
    id: Key[str]
    value: int

raw_ingestion_stream = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api = True,  # Creates an ingestion API endpoint at `POST /ingest/raw_data`
    stream = True,      # Buffers data between the ingestion API and the database table
    table = True,       # Creates an OLAP table named `raw_data`
))
```

While the `IngestPipeline` provides a convenient way to set up streams with ingestion APIs and tables, you can also configure these components individually for more granular control:

```py filename="StreamObject.py" copy {8-12}
from moose_lib import Stream, StreamConfig, Key, IngestApi, IngestConfig, OlapTable
from pydantic import BaseModel

class RawData(BaseModel):
    id: Key[str]
    value: int

raw_table = OlapTable[RawData]("raw_data")

raw_stream = Stream[RawData]("raw_data", StreamConfig(
    destination=raw_table  # Optional: specify a destination table; sets up a process to sync data from the stream to the table
))

raw_ingest_api = IngestApi[RawData]("raw_data", IngestConfig(
    destination=raw_stream  # Configure Moose to write all validated data to the stream
))
```
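Once your dev server is running, you can exercise the generated ingestion endpoint directly. The sketch below is illustrative only: it assumes the Moose dev server is listening on its default local address (`http://localhost:4000`) and uses the `requests` library, neither of which is part of the definitions above.

```py filename="IngestExample.py" copy
# Illustrative sketch: assumes `moose dev` is running locally and that the
# ingest base URL is http://localhost:4000 (adjust for your setup).
import requests

response = requests.post(
    "http://localhost:4000/ingest/raw_data",  # Endpoint created by `ingest_api = True`
    json={"id": "abc-123", "value": 42},      # Payload must match the RawData schema
)
response.raise_for_status()
print(response.status_code)
```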
### Streams for Transformations

If the raw data needs to be transformed before landing in the database, you can define a transform destination stream and a transform function to process the data:

#### Single Stream Transformation

```py filename="TransformDestinationStream.py" copy
# Import required libraries
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel
from datetime import datetime

# Define schema for raw incoming data
class RawData(BaseModel):
    id: Key[str]  # Primary key
    value: int    # Value to be transformed

# Define schema for transformed data
class TransformedData(BaseModel):
    id: Key[str]             # Primary key (preserved from raw data)
    transformedValue: int    # Transformed value
    transformedAt: datetime  # Timestamp of transformation

# Create pipeline for raw data - only for ingestion and streaming
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api = True,  # Enable API endpoint
    stream = True,      # Create stream for buffering
    table = False       # No table needed for raw data
))

# Create pipeline for transformed data - for storage only
transformed_data = IngestPipeline[TransformedData]("transformed_data", IngestPipelineConfig(
    ingest_api = False,  # No direct API endpoint
    stream = True,       # Create stream to receive transformed data
    table = True         # Store transformed data in table
))

# Define a named transformation function
def transform_function(record: RawData) -> TransformedData:
    return TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )

# Connect the streams with the transformation function
raw_data.get_stream().add_transform(
    destination=transformed_data.get_stream(),  # Use get_stream() to get the stream object from the IngestPipeline
    transformation=transform_function           # Can also be a lambda function
)
```

Use the `get_stream()` method to obtain the stream object owned by an `IngestPipeline`; referencing the pipeline object directly when wiring transformations will cause errors.

You can use lambda functions to define transformations:

```py filename="TransformDestinationStream.py" copy
from moose_lib import Key, IngestApi, IngestConfig, OlapTable, Stream, StreamConfig
from pydantic import BaseModel
from datetime import datetime

class RawData(BaseModel):
    id: Key[str]
    value: int

class TransformedData(BaseModel):
    id: Key[str]
    transformedValue: int
    transformedAt: datetime

# Create pipeline components for raw data - only for ingestion and streaming
raw_stream = Stream[RawData]("raw_data")  # No destination table since we're not storing the raw data
raw_api = IngestApi[RawData]("raw_data", IngestConfig(
    destination=raw_stream  # Connect the ingestion API to the raw data stream
))

# Create pipeline components for transformed data - no ingestion API since we're not ingesting the transformed data
transformed_table = OlapTable[TransformedData]("transformed_data")  # Store the transformed data in a table
transformed_stream = Stream[TransformedData]("transformed_data", StreamConfig(
    destination=transformed_table  # Connect the transformed data stream to the destination table
))

# Example transformation using a lambda function
raw_stream.add_transform(
    destination=transformed_stream,
    transformation=lambda record: TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )
)
```
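Transformation functions do not have to emit a record for every input. The sketch below is a variant of the lambda example above; it assumes your version of `moose_lib` supports dropping records by returning `None` from a transform, and it reuses the `RawData`, `TransformedData`, `raw_stream`, and `transformed_stream` definitions from that example.

```py filename="FilteringTransform.py" copy
# Illustrative sketch: assumes returning None from a transform drops the record,
# and reuses RawData, TransformedData, raw_stream, and transformed_stream from above.
from datetime import datetime
from typing import Optional

def transform_and_filter(record: RawData) -> Optional[TransformedData]:
    if record.value <= 0:
        return None  # Drop records with non-positive values
    return TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )

# Register the filtering transform instead of the lambda shown above
raw_stream.add_transform(
    destination=transformed_stream,
    transformation=transform_and_filter
)
```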
#### Chaining Transformations

For more complex transformations, you can chain multiple transformations together. This is a use case where a standalone `Stream` for intermediate stages of your pipeline may be useful:

```py filename="ChainedTransformations.py" copy
from moose_lib import IngestPipeline, IngestPipelineConfig, Key, Stream
from pydantic import BaseModel
from datetime import datetime

# Define the schema for raw input data
class RawData(BaseModel):
    id: Key[str]
    value: int

# Define the schema for intermediate transformed data
class IntermediateData(BaseModel):
    id: Key[str]
    transformedValue: int
    transformedAt: datetime

# Define the schema for final transformed data
class FinalData(BaseModel):
    id: Key[str]
    transformedValue: int
    anotherTransformedValue: int
    transformedAt: datetime

# Create the first pipeline for raw data ingestion
# Only create an API and a stream (no table) since we're ingesting the raw data
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=False
))

# Create an intermediate stream to hold data between transformations (no API or table needed)
intermediate_stream = Stream[IntermediateData]("intermediate_stream")

# First transformation: double the value and add a timestamp
raw_data.get_stream().add_transform(
    destination=intermediate_stream,
    transformation=lambda record: IntermediateData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )
)

# Create the final pipeline that will store the fully transformed data
final_data = IngestPipeline[FinalData]("final_stream", IngestPipelineConfig(
    ingest_api=False,
    stream=True,
    table=True
))

# Second transformation: further transform the intermediate data
intermediate_stream.add_transform(
    destination=final_data.get_stream(),
    transformation=lambda record: FinalData(
        id=record.id,
        transformedValue=record.transformedValue * 2,
        anotherTransformedValue=record.transformedValue * 3,
        transformedAt=datetime.now()
    )
)
```

## Stream Configurations

### Parallelism and Retention

```py filename="StreamConfig.py" copy
from moose_lib import Stream, StreamConfig, Key
from pydantic import BaseModel

class Data(BaseModel):
    id: Key[str]
    value: int

high_throughput_stream = Stream[Data]("high_throughput", StreamConfig(
    parallelism=4,           # Process 4 records simultaneously
    retention_period=86400,  # Keep data for 1 day (in seconds)
))
```

### LifeCycle Management

Control how Moose manages your stream resources when your code changes. See the [LifeCycle Management guide](./lifecycle) for detailed information.

```py filename="LifeCycleStreamConfig.py" copy
from moose_lib import Stream, StreamConfig, LifeCycle

# `Data` is the same Pydantic model defined in the previous example

# Production stream with external management
prod_stream = Stream[Data]("prod_stream", StreamConfig(
    life_cycle=LifeCycle.EXTERNALLY_MANAGED
))

# Development stream with full management
dev_stream = Stream[Data]("dev_stream", StreamConfig(
    life_cycle=LifeCycle.FULLY_MANAGED
))
```

See the [API Reference](/moose/reference/ts-moose-lib#stream) for complete configuration options.
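The options shown in this section can be combined on a single stream. The sketch below simply brings together the parameters already introduced above (`destination`, `parallelism`, `retention_period`, and `life_cycle`) on a hypothetical `Event` model; treat it as illustrative rather than an exhaustive list of configuration options.

```py filename="CombinedStreamConfig.py" copy
from moose_lib import OlapTable, Stream, StreamConfig, LifeCycle, Key
from pydantic import BaseModel

# Hypothetical model used only for this illustration
class Event(BaseModel):
    id: Key[str]
    value: int

events_table = OlapTable[Event]("events")

# Combines the options introduced above: destination syncing, parallelism,
# retention, and lifecycle management on one stream.
events_stream = Stream[Event]("events", StreamConfig(
    destination=events_table,           # Sync stream records into the `events` table
    parallelism=4,                      # Process 4 records simultaneously
    retention_period=86400,             # Keep data for 1 day
    life_cycle=LifeCycle.FULLY_MANAGED  # Let Moose fully manage this resource
))
```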