# Moose / Streaming / Create Stream Documentation – TypeScript

## Included Files

1. moose/streaming/create-stream/create-stream.mdx

## Create Streams

Source: moose/streaming/create-stream/create-stream.mdx

Define and create Kafka/Redpanda topics with type-safe schemas

# Creating Streams

## Overview

Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they let you implement real-time pipelines for ingesting and processing incoming data.

## Creating Streams

You can create streams in two ways:

- High-level: Using the `IngestPipeline` class (recommended)
- Low-level: Manually configuring the `Stream` component

### Streams for Ingestion

The `IngestPipeline` class provides a convenient way to set up streams with ingestion APIs and tables. This is the recommended way to create streams for ingestion:

```ts filename="IngestionStream.ts" copy {10}
import { IngestPipeline, Key } from "@514labs/moose-lib";

interface RawData {
  id: Key<string>;
  value: number;
}

export const rawData = new IngestPipeline<RawData>("raw_data", {
  ingestApi: true, // Create an ingestion API for the raw data
  stream: true, // Buffers data between the ingestion API and the database table
  table: true // Create a table for the raw data
});
```

For more granular control, you can also configure the table, stream, and ingestion API individually:

```ts filename="StreamObject.ts" copy {8-12}
import { IngestApi, OlapTable, Stream } from "@514labs/moose-lib";

interface RawData {
  id: string;
  value: number;
}

// Create a table for the raw data
export const rawTable = new OlapTable<RawData>("raw_data");

// Create a stream for the raw data (variable names here are illustrative)
export const rawStream = new Stream<RawData>("raw_data", {
  destination: rawTable // Sync data from the stream into the table
});

// Create an ingestion API for the raw data
export const rawIngestApi = new IngestApi<RawData>("raw_data", {
  destination: rawStream // Write validated data to the stream
});
```

### Streams for Transformations

If the raw data needs to be transformed before landing in the database, you can define a transform destination stream and a transform function to process the data:

#### Single Stream Transformation

```ts filename="TransformDestinationStream.ts" copy
import { IngestPipeline, Key } from "@514labs/moose-lib";

interface RawData {
  id: Key<string>;
  value: number;
}

interface TransformedData {
  id: Key<string>;
  transformedValue: number;
  transformedAt: Date;
}

// Configure components for raw data ingestion & buffering
const rawData = new IngestPipeline<RawData>("raw_data", {
  ingestApi: true,
  stream: true, // Buffers data between the ingestion API and the database table
  table: false // Don't create a table for the raw data
});

// Create a table for the transformed data
const transformedData = new IngestPipeline<TransformedData>("transformed_data", {
  ingestApi: false, // Don't create an ingestion API for the transformed data
  stream: true, // Create destination stream for the transformed data
  table: true // Create a table for the transformed data
});

rawData.stream.addTransform(transformedData.stream, (record) => ({
  id: record.id,
  transformedValue: record.value * 2,
  transformedAt: new Date()
}));
```

The same transformation using individually configured components:

```ts filename="TransformDestinationStream.ts" copy
import { IngestApi, Key, OlapTable, Stream } from "@514labs/moose-lib";

interface RawData {
  id: Key<string>;
  value: number;
}

interface TransformedData {
  id: Key<string>;
  transformedValue: number;
  transformedAt: Date;
}

// Configure components for raw data ingestion & buffering
const rawDataStream = new Stream<RawData>("raw_data");
const rawIngestApi = new IngestApi<RawData>("raw_data", {
  destination: rawDataStream
});

// Configure components for transformed data stream & storage
const transformedTable = new OlapTable<TransformedData>("transformed_data");
const transformedStream = new Stream<TransformedData>("transformed_data", {
  destination: transformedTable
});

// Add a transform to the raw data stream to transform the data
rawDataStream.addTransform(transformedStream, (record) => ({
  id: record.id,
  transformedValue: record.value * 2,
  transformedAt: new Date()
}));
```

#### Chaining Transformations

For more complex pipelines, you can chain multiple transformations together. This is a case where a standalone `Stream` is useful for the intermediate stages of your pipeline:

```ts filename="ChainedTransformations.ts" copy
import { IngestPipeline, Key, Stream } from "@514labs/moose-lib";

// Define the schema for raw input data
interface RawData {
  id: Key<string>;
  value: number;
}

// Define the schema for intermediate transformed data
interface IntermediateData {
  id: Key<string>;
  transformedValue: number;
  transformedAt: Date;
}

// Define the schema for final transformed data
interface FinalData {
  id: Key<string>;
  transformedValue: number;
  anotherTransformedValue: number;
  transformedAt: Date;
}

// Create the first pipeline for raw data ingestion
// Only create an API and a stream (no table) since we're ingesting the raw data
const rawData = new IngestPipeline<RawData>("raw_data", {
  ingestApi: true, // Enable HTTP ingestion endpoint
  stream: true, // Create a stream to buffer data
  table: false // Don't store raw data in a table
});

// Create an intermediate stream to hold data between transformations (no api or table needed)
const intermediateStream = new Stream<IntermediateData>("intermediate_stream");

// First transformation: turn raw data into intermediate data
// (illustrative logic, mirroring the single-stream example)
rawData.stream.addTransform(intermediateStream, (record) => ({
  id: record.id,
  transformedValue: record.value * 2,
  transformedAt: new Date()
}));

// Create the final pipeline that will store the fully transformed data
const finalData = new IngestPipeline<FinalData>("final_stream", {
  ingestApi: false, // No direct ingestion to this pipeline
  stream: true, // Create a stream for processing
  table: true // Store final results in a table
});

// Second transformation: further transform the intermediate data
intermediateStream.addTransform(finalData.stream, (record) => ({
  id: record.id,
  transformedValue: record.transformedValue * 2, // Double the intermediate value
  anotherTransformedValue: record.transformedValue * 3, // Triple the intermediate value
  transformedAt: new Date() // Update timestamp
}));
```

## Stream Configurations

### Parallelism and Retention

```typescript filename="StreamConfig.ts"
import { Stream } from "@514labs/moose-lib";

interface Data {
  id: string;
  value: number;
}

// Stream name and values are illustrative; tune them for your workload
export const highThroughputStream = new Stream<Data>("high_throughput", {
  parallelism: 4, // Number of partitions for the underlying topic
  retentionPeriod: 86400 // Retention period in seconds (1 day)
});
```

### LifeCycle Management

Control how Moose manages your stream resources when your code changes. See the [LifeCycle Management guide](./lifecycle) for detailed information.

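```typescript filename="LifeCycleStreamConfig.ts"
import { LifeCycle, Stream } from "@514labs/moose-lib";

interface Data {
  id: string;
  value: number;
}

// Production stream with external management (stream names are illustrative)
export const prodStream = new Stream<Data>("prod_stream", {
  lifeCycle: LifeCycle.EXTERNALLY_MANAGED
});

// Development stream with full management
export const devStream = new Stream<Data>("dev_stream", {
  lifeCycle: LifeCycle.FULLY_MANAGED
});
```

See the [API Reference](/moose/reference/ts-moose-lib#stream) for complete configuration options.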
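
To see how records move through a chain of transforms without a running Moose project, here is a minimal in-memory sketch of the pattern from Chaining Transformations. `MiniStream` and its `send` method are hypothetical stand-ins written for illustration. They are not part of `@514labs/moose-lib` and only mimic the shape of `addTransform`, with no Kafka/Redpanda topic behind them.

```typescript
// Hypothetical in-memory stand-in for a stream; NOT the Moose `Stream` class.
class MiniStream<T> {
  readonly name: string;
  readonly received: T[] = [];
  private readonly handlers: Array<(record: T) => void> = [];

  constructor(name: string) {
    this.name = name;
  }

  // Mirrors the shape of `addTransform(destination, fn)`:
  // every record sent to this stream is mapped and forwarded to `destination`.
  addTransform<U>(destination: MiniStream<U>, transform: (record: T) => U): void {
    this.handlers.push((record) => destination.send(transform(record)));
  }

  // Record the incoming value, then fan it out to every registered transform.
  send(record: T): void {
    this.received.push(record);
    for (const handler of this.handlers) {
      handler(record);
    }
  }
}

interface RawData {
  id: string;
  value: number;
}

interface IntermediateData {
  id: string;
  transformedValue: number;
}

interface FinalData {
  id: string;
  transformedValue: number;
  anotherTransformedValue: number;
}

const rawStream = new MiniStream<RawData>("raw_data");
const intermediateStream = new MiniStream<IntermediateData>("intermediate_stream");
const finalStream = new MiniStream<FinalData>("final_stream");

// First hop: raw -> intermediate (doubles the raw value)
rawStream.addTransform(intermediateStream, (record) => ({
  id: record.id,
  transformedValue: record.value * 2,
}));

// Second hop: intermediate -> final (doubles and triples the intermediate value)
intermediateStream.addTransform(finalStream, (record) => ({
  id: record.id,
  transformedValue: record.transformedValue * 2,
  anotherTransformedValue: record.transformedValue * 3,
}));

// A raw value of 5 becomes 10 in the intermediate stream,
// then 20 and 30 in the final stream.
rawStream.send({ id: "a", value: 5 });
console.log(finalStream.received);
```

Each hop only knows its immediate destination, which is why an intermediate stage needs nothing more than a stream: no ingestion API or table is involved until the final stage.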