# Creating Streams

Define and create Kafka/Redpanda topics with type-safe schemas.
## Overview
Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they provide a way to implement real-time pipelines for ingesting and processing incoming data.
## Creating Streams
You can create streams in two ways:
- High-level: Using the `IngestPipeline` class (recommended)
- Low-level: Manually configuring the `Stream` component
### Streams for Ingestion
The `IngestPipeline` class provides a convenient way to set up streams with ingestion APIs and tables. This is the recommended way to create streams for ingestion:
```ts filename="IngestionStream.ts" copy {10}
interface RawData {
id: Key;
value: number;
}
);
```
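With the pipeline running locally (e.g., via `moose dev`), you can post records to the generated ingestion endpoint. A minimal sketch, assuming the default dev server on port 4000 and the `/ingest/raw_data` path derived from the pipeline name:

```ts filename="SendRecord.ts"
// Send one RawData record to the pipeline's ingestion API
const response = await fetch("http://localhost:4000/ingest/raw_data", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ id: "record-1", value: 42 }),
});

console.log(response.status); // 200 on success
```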
If you need more granular control, you can also configure each component individually:
```ts filename="StreamObject.ts" copy {8-12}
interface RawData {
id: string;
value: number;
}
// Create a table for the raw data
);
// Create an ingestion API for the raw data
);
```
### Streams for Transformations
If the raw data needs to be transformed before landing in the database, you can define a transform destination stream and a transform function to process the data:
#### Single Stream Transformation
```ts filename="TransformDestinationStream.ts" copy
interface RawData {
id: Key;
value: number;
}
interface TransformedData {
id: Key;
transformedValue: number;
transformedAt: Date;
}
// Configure components for raw data ingestion & buffering
const rawData = new IngestPipeline("raw_data", {
ingestApi: true,
stream: true, // Buffers data between the ingestion API and the database table
table: false // Don't create a table for the raw data
});
// Create a table for the transformed data
const transformedData = new IngestPipeline("transformed_data", {
ingestApi: false, // Don't create an ingestion API for the transformed data
stream: true, // Create destination stream for the transformed data
table: true // Create a table for the transformed data
});
rawData.stream.addTransform(transformedData.stream, (record) => ({
id: record.id,
transformedValue: record.value * 2,
transformedAt: new Date()
}));
```
```ts filename="TransformDestinationStream.ts" copy
interface RawData {
id: Key;
value: number;
}
interface TransformedData {
id: Key;
transformedValue: number;
transformedAt: Date;
}
// Configure components for raw data ingestion & buffering
);
// Configure components for transformed data stream & storage
);
// Add a transform to the raw data stream to transform the data
rawDataStream.addTransform(transformedStream, (record) => ({
id: record.id,
transformedValue: record.value * 2,
transformedAt: new Date()
}));
```
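Transform functions can also drop records. As a variant of the transform above, here's a sketch assuming Moose's behavior that returning `undefined` (or `null`) filters the record out of the destination stream; the negative-value guard is just an illustrative validity check:

```ts filename="FilterTransform.ts"
// Drop records that fail validation instead of forwarding them
rawDataStream.addTransform(transformedStream, (record) => {
  if (record.value < 0) {
    return undefined; // Returning undefined filters the record out
  }
  return {
    id: record.id,
    transformedValue: record.value * 2,
    transformedAt: new Date()
  };
});
```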
#### Chaining Transformations
For more complex processing, you can chain multiple transformations together. This is a case where a standalone `Stream` is useful for the intermediate stages of your pipeline:
```ts filename="ChainedTransformations.ts" copy
// Define the schema for raw input data
interface RawData {
id: Key;
value: number;
}
// Define the schema for intermediate transformed data
interface IntermediateData {
id: Key;
transformedValue: number;
transformedAt: Date;
}
// Define the schema for final transformed data
interface FinalData {
id: Key;
transformedValue: number;
anotherTransformedValue: number;
transformedAt: Date;
}
// Create the first pipeline for raw data ingestion
// Only create an API and a stream (no table) since we're ingesting the raw data
const rawData = new IngestPipeline("raw_data", {
ingestApi: true, // Enable HTTP ingestion endpoint
stream: true, // Create a stream to buffer data
table: false // Don't store raw data in a table
});
// Create an intermediate stream to hold data between transformations (no api or table needed)
));
// Create the final pipeline that will store the fully transformed data
const finalData = new IngestPipeline("final_stream", {
ingestApi: false, // No direct ingestion to this pipeline
stream: true, // Create a stream for processing
table: true // Store final results in a table
});
// Second transformation: further transform the intermediate data
intermediateStream.addTransform(finalData.stream, (record) => ({
id: record.id,
transformedValue: record.transformedValue * 2, // Double the intermediate value
anotherTransformedValue: record.transformedValue * 3, // Triple the intermediate value
transformedAt: new Date() // Update timestamp
}));
```
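To observe records as they move between stages, you can also attach a consumer to any stream. A minimal sketch using `addConsumer`; the logging shown is illustrative:

```ts filename="StreamConsumer.ts"
// Consumers read records from a stream without producing to another one
intermediateStream.addConsumer((record) => {
  console.log(`Intermediate record ${record.id}: ${record.transformedValue}`);
});
```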
## Stream Configurations
### Parallelism and Retention
```typescript filename="StreamConfig.ts"
);
```
### LifeCycle Management
Control how Moose manages your stream resources when your code changes. See the [LifeCycle Management guide](./lifecycle) for detailed information.
```typescript filename="LifeCycleStreamConfig.ts"
// Production stream with external management
);
// Development stream with full management
);
```
See the [API Reference](/moose/reference/ts-moose-lib#stream) for complete configuration options.