# Moose / Streaming Documentation – Python
## Included Files
1. moose/streaming/connect-cdc.mdx
2. moose/streaming/consumer-functions.mdx
3. moose/streaming/create-stream.mdx
4. moose/streaming/dead-letter-queues.mdx
5. moose/streaming/from-your-code.mdx
6. moose/streaming/schema-registry.mdx
7. moose/streaming/sync-to-table.mdx
8. moose/streaming/transform-functions.mdx
## connect-cdc
Source: moose/streaming/connect-cdc.mdx
# Connect to CDC Services
Coming Soon!
---
## Streaming Consumer Functions
Source: moose/streaming/consumer-functions.mdx
Read and process data from streams with consumers and processors
# Streaming Consumer Functions
## Overview
Consuming data from streams allows you to read and process data from Kafka/Redpanda topics. This is essential for building real-time applications, analytics, and event-driven architectures.
## Basic Usage
Consumers are just functions that are called when new data is available in a stream. You add them to a stream like this:
```py filename="StreamConsumer.py" copy
from moose_lib import Stream
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: str
    user_id: str
    timestamp: datetime
    event_type: str

user_events_stream = Stream[UserEvent]("user-events")

# Add a consumer to process events
def process_event(event: UserEvent):
    print(f"Processing event: {event.id}")
    print(f"User: {event.user_id}, Type: {event.event_type}")
    # Your processing logic here
    # e.g., update analytics, send notifications, etc.

user_events_stream.add_consumer(process_event)

# Add multiple consumers for different purposes
def analytics_consumer(event: UserEvent):
    # Analytics processing
    if event.event_type == 'purchase':
        update_purchase_analytics(event)

def notification_consumer(event: UserEvent):
    # Notification processing
    if event.event_type == 'signup':
        send_welcome_email(event.user_id)

user_events_stream.add_consumer(analytics_consumer)
user_events_stream.add_consumer(notification_consumer)
```
## Processing Patterns
### Stateful Processing with MooseCache
Maintain state across event processing using MooseCache for distributed state management:
```py filename="StatefulProcessing.py" copy
from datetime import datetime
from typing import Dict, Any
from moose_lib import MooseCache, Stream
from pydantic import BaseModel

# State container for accumulating data
class AccumulatorState(BaseModel):
    id: str
    counter: int
    sum: float
    last_modified: datetime
    attributes: Dict[str, Any]

# Input message structure
class InputMessage(BaseModel):
    id: str
    group_id: str
    numeric_value: float
    message_type: str
    timestamp: datetime
    payload: Dict[str, Any]

message_stream = Stream[InputMessage]("input-stream")

# Initialize distributed cache
cache = MooseCache()

def process_message(message: InputMessage):
    cache_key = f"state:{message.group_id}"

    # Load existing state or create new one
    state = cache.get(cache_key, AccumulatorState)
    if not state:
        # Initialize new state
        state = AccumulatorState(
            id=message.group_id,
            counter=0,
            sum=0.0,
            last_modified=datetime.now(),
            attributes={}
        )

    # Apply message to state
    state.counter += 1
    state.sum += message.numeric_value
    state.last_modified = message.timestamp
    state.attributes.update(message.payload)

    # Determine cache lifetime based on message type
    ttl_seconds = 60 if message.message_type == 'complete' else 3600

    if message.message_type == 'complete' or should_finalize(state):
        # Finalize and remove state
        finalize_state(state)
        cache.delete(cache_key)
    else:
        # Persist updated state
        cache.set(cache_key, state, ttl_seconds=ttl_seconds)

def should_finalize(state: AccumulatorState) -> bool:
    """Condition for automatic state finalization"""
    threshold = 100
    time_limit_seconds = 30 * 60  # 30 minutes
    elapsed = (datetime.now() - state.last_modified).total_seconds()
    return state.counter >= threshold or elapsed > time_limit_seconds

def finalize_state(state: AccumulatorState):
    print(f"Finalizing state {state.id}: counter={state.counter}, sum={state.sum}")

message_stream.add_consumer(process_message)
```
## Propagating Events to External Systems
You can use consumer functions to trigger actions across external systems - send notifications, sync databases, update caches, or integrate with any other service when events occur:
### HTTP API Calls
Send processed data to external APIs:
```py filename="HttpIntegration.py" copy
import os
import httpx
from datetime import datetime
from typing import Dict, Any
from moose_lib import Stream
from pydantic import BaseModel

class WebhookPayload(BaseModel):
    id: str
    data: Dict[str, Any]
    timestamp: datetime

webhook_stream = Stream[WebhookPayload]("webhook-events")

async def send_to_external_api(payload: WebhookPayload):
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                'https://external-api.com/webhook',
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f'Bearer {os.getenv("API_TOKEN")}'
                },
                json={
                    'event_id': payload.id,
                    'event_data': payload.data,
                    'processed_at': datetime.now().isoformat()
                }
            )
            if response.status_code != 200:
                raise Exception(f"HTTP {response.status_code}: {response.text}")
            print(f"Successfully sent event {payload.id} to external API")
    except Exception as error:
        print(f"Failed to send event {payload.id}: {error}")
        # Could implement retry logic or dead letter queue here

webhook_stream.add_consumer(send_to_external_api)
```
### Database Operations
Write processed data to external databases:
```py filename="DatabaseIntegration.py" copy
import os
import json
import asyncpg
from datetime import datetime
from typing import Dict, Any
from moose_lib import Stream
from pydantic import BaseModel

class DatabaseRecord(BaseModel):
    id: str
    category: str
    value: float
    metadata: Dict[str, Any]
    timestamp: datetime

db_stream = Stream[DatabaseRecord]("database-events")

async def insert_to_database(record: DatabaseRecord):
    conn = None
    try:
        # Connect to PostgreSQL database
        conn = await asyncpg.connect(
            host=os.getenv('DB_HOST'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            database=os.getenv('DB_NAME')
        )

        # Insert record into external database
        await conn.execute(
            '''
            INSERT INTO processed_events (id, category, value, metadata, created_at)
            VALUES ($1, $2, $3, $4, $5)
            ''',
            record.id,
            record.category,
            record.value,
            json.dumps(record.metadata),
            record.timestamp
        )
        print(f"Inserted record {record.id} into database")
    except Exception as error:
        print(f"Database insert failed for record {record.id}: {error}")
    finally:
        if conn is not None:
            await conn.close()

db_stream.add_consumer(insert_to_database)
```
### File System Operations
Write processed data to files or cloud storage:
```py filename="FileSystemIntegration.py" copy
import os
import json
import aiofiles
from datetime import datetime
from typing import Literal
from moose_lib import Stream
from pydantic import BaseModel

class FileOutput(BaseModel):
    id: str
    filename: str
    content: str
    directory: str
    format: Literal['json', 'csv', 'txt']

file_stream = Stream[FileOutput]("file-events")

async def write_to_file(output: FileOutput):
    try:
        # Ensure directory exists
        os.makedirs(output.directory, exist_ok=True)

        # Format content based on type
        if output.format == 'json':
            formatted_content = json.dumps(json.loads(output.content), indent=2)
        else:
            formatted_content = output.content

        # Write file with timestamp
        timestamp = datetime.now().isoformat().replace(':', '-').replace('.', '-')
        filename = f"{output.filename}_{timestamp}.{output.format}"
        filepath = os.path.join(output.directory, filename)

        async with aiofiles.open(filepath, 'w', encoding='utf-8') as f:
            await f.write(formatted_content)
        print(f"Written file: {filepath}")
    except Exception as error:
        print(f"Failed to write file for output {output.id}: {error}")

file_stream.add_consumer(write_to_file)
```
### Email and Notifications
Send alerts and notifications based on processed events:
```py filename="NotificationIntegration.py" copy
import os
import smtplib
import httpx
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, Any, Literal
from moose_lib import Stream
from pydantic import BaseModel

class NotificationEvent(BaseModel):
    id: str
    type: Literal['email', 'slack', 'webhook']
    recipient: str
    subject: str
    message: str
    priority: Literal['low', 'medium', 'high']
    metadata: Dict[str, Any]

notification_stream = Stream[NotificationEvent]("notifications")

async def send_notification(notification: NotificationEvent):
    try:
        if notification.type == 'email':
            # Send email
            msg = MIMEMultipart()
            msg['From'] = os.getenv('SMTP_FROM')
            msg['To'] = notification.recipient
            msg['Subject'] = notification.subject

            body = f"""
            {notification.message}

            Priority: {notification.priority}
            """
            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(os.getenv('SMTP_HOST'), int(os.getenv('SMTP_PORT', '587')))
            server.starttls()
            server.login(os.getenv('SMTP_USER'), os.getenv('SMTP_PASS'))
            server.send_message(msg)
            server.quit()

        elif notification.type == 'slack':
            # Send to Slack
            async with httpx.AsyncClient() as client:
                await client.post(
                    f"https://hooks.slack.com/services/{os.getenv('SLACK_WEBHOOK')}",
                    json={
                        'text': notification.message,
                        'channel': notification.recipient,
                        'username': 'Moose Alert',
                        'icon_emoji': ':warning:' if notification.priority == 'high' else ':information_source:'
                    }
                )

        elif notification.type == 'webhook':
            # Send to webhook
            async with httpx.AsyncClient() as client:
                await client.post(
                    notification.recipient,
                    json={
                        'id': notification.id,
                        'subject': notification.subject,
                        'message': notification.message,
                        'priority': notification.priority,
                        'metadata': notification.metadata
                    }
                )

        print(f"Sent {notification.type} notification {notification.id}")
    except Exception as error:
        print(f"Failed to send notification {notification.id}: {error}")

notification_stream.add_consumer(send_notification)
```
---
## Create Streams
Source: moose/streaming/create-stream.mdx
Define and create Kafka/Redpanda topics with type-safe schemas
# Creating Streams
## Overview
Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they provide a way to implement real-time pipelines for ingesting and processing incoming data.
## Creating Streams
You can create streams in two ways:
- High-level: Using the `IngestPipeline` class (recommended)
- Low-level: Manually configuring the `Stream` component
### Streams for Ingestion
The `IngestPipeline` class provides a convenient way to set up streams with ingestion APIs and tables. This is the recommended way to create streams for ingestion:
```py filename="IngestionStream.py" copy {10}
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel

class RawData(BaseModel):
    id: Key[str]
    value: int

raw_ingestion_stream = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api = True,  # Creates an ingestion API endpoint at `POST /ingest/raw_data`
    stream = True,      # Buffers data between the ingestion API and the database table
    table = True,       # Creates an OLAP table named `raw_data`
))
```
While the `IngestPipeline` provides a convenient way to set up streams with ingestion APIs and tables, you can also configure these components individually for more granular control:
```py filename="StreamObject.py" copy {8-12}
from moose_lib import Stream, StreamConfig, Key, IngestApi, IngestConfig, OlapTable
from pydantic import BaseModel

class RawData(BaseModel):
    id: Key[str]
    value: int

raw_table = OlapTable[RawData]("raw_data")

raw_stream = Stream[RawData]("raw_data", StreamConfig(
    destination=raw_table  # Optional: specify a destination table; sets up a process to sync data from the stream to the table
))

raw_ingest_api = IngestApi[RawData]("raw_data", IngestConfig(
    destination=raw_stream  # Configure Moose to write all validated data to the stream
))
```
### Streams for Transformations
If the raw data needs to be transformed before landing in the database, you can define a transform destination stream and a transform function to process the data:
#### Single Stream Transformation
```py filename="TransformDestinationStream.py" copy
# Import required libraries
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel
from datetime import datetime

# Define schema for raw incoming data
class RawData(BaseModel):
    id: Key[str]   # Primary key
    value: int     # Value to be transformed

# Define schema for transformed data
class TransformedData(BaseModel):
    id: Key[str]              # Primary key (preserved from raw data)
    transformedValue: int     # Transformed value
    transformedAt: datetime   # Timestamp of transformation

# Create pipeline for raw data - only for ingestion and streaming
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api = True,  # Enable API endpoint
    stream = True,      # Create stream for buffering
    table = False       # No table needed for raw data
))

# Create pipeline for transformed data - for storage only
transformed_data = IngestPipeline[TransformedData]("transformed_data", IngestPipelineConfig(
    ingest_api = False,  # No direct API endpoint
    stream = True,       # Create stream to receive transformed data
    table = True         # Store transformed data in table
))

# Define a named transformation function
def transform_function(record: RawData) -> TransformedData:
    return TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )

# Connect the streams with the transformation function
raw_data.get_stream().add_transform(
    destination=transformed_data.get_stream(),  # get_stream() returns the stream object from the IngestPipeline
    transformation=transform_function  # Can also be a lambda function
)
```
Use the `get_stream()` method to reference the stream managed by an `IngestPipeline`; referencing the pipeline object directly will cause errors.
You can also define transformations as lambda functions:
```py filename="TransformDestinationStream.py" copy
from moose_lib import Key, IngestApi, IngestConfig, OlapTable, Stream, StreamConfig
from pydantic import BaseModel
from datetime import datetime

class RawData(BaseModel):
    id: Key[str]
    value: int

class TransformedData(BaseModel):
    id: Key[str]
    transformedValue: int
    transformedAt: datetime

# Create pipeline components for raw data - only for ingestion and streaming
raw_stream = Stream[RawData]("raw_data")  # No destination table since we're not storing the raw data
raw_api = IngestApi[RawData]("raw_data", IngestConfig(
    destination=raw_stream  # Connect the ingestion API to the raw data stream
))

# Create pipeline components for transformed data - no ingestion API since we're not ingesting the transformed data
transformed_table = OlapTable[TransformedData]("transformed_data")  # Store the transformed data in a table
transformed_stream = Stream[TransformedData]("transformed_data", StreamConfig(destination=transformed_table))  # Connect the transformed data stream to the destination table

# Example transformation using a lambda function
raw_stream.add_transform(
    destination=transformed_stream,
    transformation=lambda record: TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )
)
```
#### Chaining Transformations
For more complex transformations, you can chain multiple transformations together. This is a use case where using a standalone Stream for intermediate stages of your pipeline may be useful:
```py filename="ChainedTransformations.py" copy
from moose_lib import IngestPipeline, Key, Stream, IngestPipelineConfig
from pydantic import BaseModel
from datetime import datetime

# Define the schema for raw input data
class RawData(BaseModel):
    id: Key[str]
    value: int

# Define the schema for intermediate transformed data
class IntermediateData(BaseModel):
    id: Key[str]
    transformedValue: int
    transformedAt: datetime

# Define the schema for final transformed data
class FinalData(BaseModel):
    id: Key[str]
    transformedValue: int
    anotherTransformedValue: int
    transformedAt: datetime

# Create the first pipeline for raw data ingestion
# Only create an API and a stream (no table) since we're ingesting the raw data
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=False
))

# Create an intermediate stream to hold data between transformations (no api or table needed)
intermediate_stream = Stream[IntermediateData]("intermediate_stream")

# First transformation: double the value and add timestamp
raw_data.get_stream().add_transform(destination=intermediate_stream, transformation=lambda record: IntermediateData(
    id=record.id,
    transformedValue=record.value * 2,
    transformedAt=datetime.now()
))

# Create the final pipeline that will store the fully transformed data
final_data = IngestPipeline[FinalData]("final_stream", IngestPipelineConfig(
    ingest_api=False,
    stream=True,
    table=True
))

# Second transformation: further transform the intermediate data
intermediate_stream.add_transform(destination=final_data.get_stream(), transformation=lambda record: FinalData(
    id=record.id,
    transformedValue=record.transformedValue * 2,
    anotherTransformedValue=record.transformedValue * 3,
    transformedAt=datetime.now()
))
```
## Stream Configurations
### Parallelism and Retention
```py filename="StreamConfig.py" copy
from moose_lib import Stream, StreamConfig

high_throughput_stream = Stream[Data]("high_throughput", StreamConfig(
    parallelism=4,           # Process 4 records simultaneously
    retention_period=86400,  # Keep data for 1 day
))
```
### LifeCycle Management
Control how Moose manages your stream resources when your code changes. See the [LifeCycle Management guide](./lifecycle) for detailed information.
```py filename="LifeCycleStreamConfig.py" copy
from moose_lib import Stream, StreamConfig, LifeCycle

# Production stream with external management
prod_stream = Stream[Data]("prod_stream", StreamConfig(
    life_cycle=LifeCycle.EXTERNALLY_MANAGED
))

# Development stream with full management
dev_stream = Stream[Data]("dev_stream", StreamConfig(
    life_cycle=LifeCycle.FULLY_MANAGED
))
```
See the [API Reference](/moose/reference/ts-moose-lib#stream) for complete configuration options.
---
## Dead Letter Queues
Source: moose/streaming/dead-letter-queues.mdx
Handle failed stream processing with dead letter queues
# Dead Letter Queues
## Overview
Dead Letter Queues (DLQs) provide a robust error handling mechanism for stream processing in Moose. When streaming functions fail during transformation or consumption, failed messages are automatically routed to a configured dead letter queue for later analysis and recovery.
## Dead Letter Record Structure
When a message fails processing, Moose creates a dead letter record with the following structure:
```py
class DeadLetterModel(BaseModel, Generic[T]):
    original_record: Any                           # The original message that failed
    error_message: str                             # Error description
    error_type: str                                # Error class/type name
    failed_at: datetime.datetime                   # Timestamp when failure occurred
    source: Literal["api", "transform", "table"]   # Where the failure happened

    def as_typed(self) -> T:                       # Type-safe access to original record
        return self._t.model_validate(self.original_record)
```
## Creating Dead Letter Queues
### Basic Setup
```py filename="dead-letter-setup.py" copy
from moose_lib import DeadLetterQueue
from pydantic import BaseModel

# Define your data model
class UserEvent(BaseModel):
    user_id: str
    action: str
    timestamp: float

# Create a dead letter queue for UserEvent failures
user_event_dlq = DeadLetterQueue[UserEvent](name="UserEventDLQ")
```
### Configuring Transformations with Dead Letter Queues
Add a dead letter queue to your Transformation Function configuration, and any errors thrown in the transformation will trigger the event to be routed to the dead letter queue.
```py filename="transform-with-dlq.py" copy
from moose_lib import DeadLetterQueue, TransformConfig
from datetime import datetime

# Create dead letter queue
event_dlq = DeadLetterQueue[RawEvent](name="EventDLQ")

# Define transformation function, including errors to trigger DLQ
def process_event(event: RawEvent) -> ProcessedEvent:
    # This transform might fail for invalid data
    if not event.user_id or len(event.user_id) == 0:
        raise ValueError("Invalid user_id: cannot be empty")
    if event.timestamp < 0:
        raise ValueError("Invalid timestamp: cannot be negative")

    return ProcessedEvent(
        user_id=event.user_id,
        action=event.action,
        processed_at=datetime.now(),
        is_valid=True
    )

# Add transform with DLQ configuration
raw_events.get_stream().add_transform(
    destination=processed_events.get_stream(),
    transformation=process_event,
    config=TransformConfig(
        dead_letter_queue=event_dlq  # Configure DLQ for this transform
    )
)
```
### Configuring Consumers with Dead Letter Queues
Add a dead letter queue to your Consumer Function configuration, and any errors thrown in the function will trigger the event to be routed to the dead letter queue.
```py filename="consumer-with-dlq.py" copy
from moose_lib import ConsumerConfig

# Define consumer function with errors to trigger DLQ
def process_event_consumer(event: RawEvent) -> None:
    # This consumer might fail for certain events
    if event.action == "forbidden_action":
        raise ValueError("Forbidden action detected")

    # Process the event (e.g., send to external API)
    print(f"Processing event for user {event.user_id}")

# Add consumer with DLQ configuration
raw_events.get_stream().add_consumer(
    consumer=process_event_consumer,
    config=ConsumerConfig(
        dead_letter_queue=event_dlq  # Configure DLQ for this consumer
    )
)
```
### Configuring Ingest APIs with Dead Letter Queues
Add a dead letter queue to your Ingest API configuration, and any runtime data validation failures at the API will trigger the event to be routed to the dead letter queue.
```python filename="ValidationExample.py" copy
from moose_lib import IngestApi, IngestConfig, Stream, DeadLetterQueue
from pydantic import BaseModel
from typing import Optional
from datetime import datetime

class Properties(BaseModel):
    device: Optional[str]
    version: Optional[int]

class ExampleModel(BaseModel):
    id: str
    userId: str
    timestamp: datetime
    properties: Properties

api = IngestApi[ExampleModel]("your-api-route", IngestConfig(
    destination=Stream[ExampleModel]("your-stream-name"),
    dead_letter_queue=DeadLetterQueue[ExampleModel]("your-dlq-name")
))
```
## Processing Dead Letter Messages
### Monitoring Dead Letter Queues
```py filename="dlq-monitoring.py" copy
def monitor_dead_letters(dead_letter: DeadLetterModel[RawEvent]) -> None:
    print("Dead letter received:")
    print(f"Error: {dead_letter.error_message}")
    print(f"Error Type: {dead_letter.error_type}")
    print(f"Failed At: {dead_letter.failed_at}")
    print(f"Source: {dead_letter.source}")

    # Access the original typed data
    original_event: RawEvent = dead_letter.as_typed()
    print(f"Original User ID: {original_event.user_id}")

# Add consumer to monitor dead letter messages
event_dlq.add_consumer(monitor_dead_letters)
```
### Recovery and Retry Logic
```py filename="dlq-recovery.py" copy
from moose_lib import Stream, StreamConfig
from typing import Optional
from datetime import datetime

# Create a recovery stream for fixed messages
recovered_events = Stream[ProcessedEvent]("recovered_events", StreamConfig(
    destination=processed_events.get_table()  # Send recovered data to main table
))

def recover_event(dead_letter: DeadLetterModel[RawEvent]) -> Optional[ProcessedEvent]:
    try:
        original_event = dead_letter.as_typed()

        # Apply fixes based on error type
        if "Invalid user_id" in dead_letter.error_message:
            # Skip events with invalid user IDs
            return None

        if "Invalid timestamp" in dead_letter.error_message:
            # Fix negative timestamps
            fixed_timestamp = abs(original_event.timestamp)
            return ProcessedEvent(
                user_id=original_event.user_id,
                action=original_event.action,
                processed_at=datetime.now(),
                is_valid=True
            )

        return None  # Skip other errors
    except Exception as error:
        print(f"Recovery failed: {error}")
        return None

# Add recovery logic to the DLQ
event_dlq.add_transform(
    destination=recovered_events,
    transformation=recover_event
)
```
## Common Patterns
### Circuit Breaker Pattern
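A minimal sketch of the idea, pairing a DLQ-backed consumer with an in-process failure counter. The `CircuitBreaker` class, its thresholds, and `call_external_service` are illustrative placeholders, not part of `moose_lib`:
```py filename="circuit-breaker.py" copy
import time

class CircuitBreaker:
    """Illustrative in-process circuit breaker (not part of moose_lib)."""
    def __init__(self, max_failures: int = 5, reset_after_seconds: float = 60.0):
        self.max_failures = max_failures
        self.reset_after_seconds = reset_after_seconds
        self.failure_count = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        # Closed circuit: allow calls. Open circuit: allow again only after the cooldown.
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.reset_after_seconds:
            self.opened_at = None
            self.failure_count = 0
            return True
        return False

    def record_success(self) -> None:
        self.failure_count = 0

    def record_failure(self) -> None:
        self.failure_count += 1
        if self.failure_count >= self.max_failures:
            self.opened_at = time.monotonic()

breaker = CircuitBreaker()

def guarded_consumer(event: RawEvent) -> None:
    if not breaker.allow():
        # Fail fast while the circuit is open; the raised error routes the event to the DLQ
        raise RuntimeError("Circuit open: external service unavailable")
    try:
        call_external_service(event)  # your own integration call
        breaker.record_success()
    except Exception:
        breaker.record_failure()
        raise  # re-raise so the event is routed to the DLQ

raw_events.get_stream().add_consumer(
    consumer=guarded_consumer,
    config=ConsumerConfig(dead_letter_queue=event_dlq)
)
```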
### Retry with Exponential Backoff
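One way to drain the DLQ with retries is a consumer that backs off between attempts. This is a sketch assuming synchronous retries are acceptable for your throughput; `retry_external_call` stands in for your own processing logic:
```py filename="dlq-retry-backoff.py" copy
import time

MAX_ATTEMPTS = 3
BASE_DELAY_SECONDS = 1.0

def retry_with_backoff(dead_letter: DeadLetterModel[RawEvent]) -> None:
    original_event = dead_letter.as_typed()

    for attempt in range(MAX_ATTEMPTS):
        try:
            retry_external_call(original_event)  # your own processing logic
            print(f"Recovered event after {attempt + 1} attempt(s)")
            return
        except Exception as error:
            # Exponential backoff: 1s, 2s, 4s, ...
            delay = BASE_DELAY_SECONDS * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed ({error}); retrying in {delay}s")
            time.sleep(delay)

    print(f"Giving up on event after {MAX_ATTEMPTS} attempts")

# Retry messages as they arrive on the dead letter queue
event_dlq.add_consumer(retry_with_backoff)
```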
Dead letter queues add overhead to stream processing. Use them judiciously and monitor their impact on throughput. Consider implementing sampling for high-volume streams where occasional message loss is acceptable.
Dead letter queue events can be integrated with monitoring systems like Prometheus, DataDog, or CloudWatch for alerting and dashboards. Consider tracking metrics like DLQ message rate, error types, and recovery success rates.
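For example, a DLQ consumer can feed those metrics to Prometheus. This sketch assumes the `prometheus_client` package is installed and that a metrics endpoint is exposed elsewhere in your service:
```py filename="dlq-metrics.py" copy
from prometheus_client import Counter

dlq_messages = Counter(
    "moose_dlq_messages_total",
    "Dead letter messages observed",
    ["source", "error_type"],
)

def track_dlq_metrics(dead_letter: DeadLetterModel[RawEvent]) -> None:
    # Label by failure origin and error class so dashboards can break down failures
    dlq_messages.labels(
        source=dead_letter.source,
        error_type=dead_letter.error_type,
    ).inc()

event_dlq.add_consumer(track_dlq_metrics)
```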
## Using Dead Letter Queues in Ingestion Pipelines
Dead Letter Queues (DLQs) can be directly integrated with your ingestion pipelines to capture records that fail validation or processing at the API entry point. This ensures that no data is lost, even if it cannot be immediately processed.
```python filename="IngestPipelineWithDLQ.py" copy
from moose_lib import IngestPipeline, DeadLetterQueue, IngestPipelineConfig
from pydantic import BaseModel
from datetime import datetime

class ExampleSchema(BaseModel):
    id: str
    name: str
    value: int
    timestamp: datetime

example_dlq = DeadLetterQueue[ExampleSchema](name="exampleDLQ")

pipeline = IngestPipeline[ExampleSchema](
    name="example",
    config=IngestPipelineConfig(
        ingest_api=True,
        stream=True,
        table=True,
        dead_letter_queue=True  # Route failed ingestions to DLQ
    )
)
```
See the [Ingestion API documentation](/moose/apis/ingest-api#validation) for more details and best practices on configuring DLQs for ingestion.
---
## Publish Data
Source: moose/streaming/from-your-code.mdx
Write data to streams from applications, APIs, or external sources
# Publishing Data to Streams
## Overview
Publishing data to streams allows you to write data from various sources into your Kafka/Redpanda topics. This is the first step in building real-time data pipelines.
## Publishing Methods
### Using REST APIs
The most common way to publish data is through Moose's built-in ingestion APIs. These are configured to automatically sit in front of your streams and publish data to them whenever a request is made to the endpoint:
```py filename="PublishViaAPI.py" copy
import requests
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel

class RawData(BaseModel):
    id: Key[str]
    value: int

# When you create an IngestPipeline with ingest_api=True, Moose automatically creates an API endpoint
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api=True,  # Creates POST /ingest/raw_data endpoint
    stream=True,
    table=True
))

# You can then publish data via HTTP POST requests
response = requests.post('http://localhost:4000/ingest/raw_data', json={
    'id': '123',
    'value': 42
})
```
See the [OpenAPI documentation](/stack/open-api) to learn more about how to generate type-safe client SDKs in your language of choice for all of your Moose APIs.
### Direct Stream Publishing
You can publish directly to a stream from your Moose code using the stream's `send` method.
This is useful when emitting events from workflows or other backend logic.
`send` accepts a single record or an array of records.
If your `Stream` is configured with a `schema_config` of kind `"JSON"`,
Moose automatically produces messages using the Confluent envelope (0x00 + schema id + JSON).
No code changes are needed beyond setting `schema_config`. See the [Schema Registry guide](/moose/streaming/schema-registry).
```py filename="DirectPublish.py" copy
from moose_lib import Stream, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Create a stream (optionally pass StreamConfig with destination/table settings)
events = Stream[UserEvent]("user_events", StreamConfig())

# Publish a single record
events.send(UserEvent(
    id="evt_1",
    user_id="user_123",
    timestamp=datetime.now(),
    event_type="click"
))

# Publish multiple records
events.send([
    UserEvent(id="evt_2", user_id="user_456", timestamp=datetime.now(), event_type="view"),
    UserEvent(id="evt_3", user_id="user_789", timestamp=datetime.now(), event_type="signup"),
])
```
Moose builds the Kafka topic name from your stream name,
optional namespace, and optional version (dots become underscores).
For example, a stream named `events` with version `1.2.0` becomes `events_1_2_0`
(or `my_ns.events_1_2_0` when the namespace is `"my_ns"`).
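As an illustration only (Moose derives the name for you), the convention amounts to something like:
```py filename="TopicNaming.py" copy
def derive_topic_name(name: str, version: str | None = None, namespace: str | None = None) -> str:
    """Illustrative helper mirroring the naming convention described above."""
    topic = name
    if version:
        topic = f"{topic}_{version.replace('.', '_')}"  # dots become underscores
    if namespace:
        topic = f"{namespace}.{topic}"
    return topic

print(derive_topic_name("events", "1.2.0"))           # events_1_2_0
print(derive_topic_name("events", "1.2.0", "my_ns"))  # my_ns.events_1_2_0
```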
### Using the Kafka/Redpanda Client from External Applications
You can also publish to streams from external applications using Kafka/Redpanda clients:
```py filename="ExternalPublish.py" copy
import json
from kafka import KafkaProducer
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=['localhost:19092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Publish to the stream topic
producer.send('user-events', {  # Stream name becomes the topic name
    'id': 'event-123',
    'user_id': 'user-456',
    'timestamp': datetime.now().isoformat(),
    'event_type': 'page_view'
})
```
#### Locating Redpanda Connection Details
When running your Moose backend within your local dev environment, you can find the connection details for your Redpanda cluster in the `moose.config.toml` file in the root of your project:
```toml filename="moose.config.toml" copy
[redpanda_config]
broker = "localhost:19092"
message_timeout_ms = 1000
retention_ms = 30000
replication_factor = 1
```
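If you want your external producer to pick up the same broker address, you can read it straight from `moose.config.toml`. A sketch using Python 3.11's built-in `tomllib`:
```py filename="ReadBrokerConfig.py" copy
import json
import tomllib
from kafka import KafkaProducer

# Read the broker address that Moose uses for the local dev environment
with open("moose.config.toml", "rb") as f:
    config = tomllib.load(f)

broker = config["redpanda_config"]["broker"]  # e.g. "localhost:19092"

producer = KafkaProducer(
    bootstrap_servers=[broker],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
```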
---
## Schema Registry
Source: moose/streaming/schema-registry.mdx
Use Confluent Schema Registry with Moose streams (JSON Schema first)
# Schema Registry Integration
## Overview
Moose can publish and consume Kafka/Redpanda messages using Confluent Schema Registry. The first supported encoding is JSON Schema; Avro and Protobuf are planned.
## Configure Schema Registry URL
Set the Schema Registry URL in `moose.config.toml` under `redpanda_config` (aliased as `kafka_config`). You can also override with environment variables.
```toml filename="moose.config.toml" copy
[redpanda_config]
broker = "localhost:19092"
schema_registry_url = "http://localhost:8081"
```
Environment overrides (either key works):
```bash filename="Terminal" copy
export MOOSE_REDPANDA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081
# or
export MOOSE_KAFKA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081
```
## Referencing Schemas
You can attach a Schema Registry reference to any `Stream` via its `schema_config`. The reference can be one of:
- The latest schema for a subject (e.g. `SubjectLatest(name="event-value")`)
- A specific subject and version
- A fixed schema id
```py filename="sr_stream.py" copy {13-21}
from moose_lib import Stream, StreamConfig
from moose_lib.dmv2.stream import KafkaSchemaConfig, SubjectLatest
from pydantic import BaseModel

class Event(BaseModel):
    id: str
    value: int

schema_config = KafkaSchemaConfig(
    kind="JSON",
    reference=SubjectLatest(name="event-value"),
)

events = Stream[Event](
    "events",
    StreamConfig(schema_config=schema_config),
)

events.send(Event(id="e1", value=42))
```
## Consuming SR JSON in Runners
Moose streaming runners automatically detect the Confluent JSON envelope
when consuming and strip the header before parsing the JSON.
Your transformation code continues to work unchanged.
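If you consume the same topics outside of Moose, you can strip the envelope yourself. This sketch assumes the standard Confluent wire format (one magic byte, a 4-byte big-endian schema id, then the JSON payload):
```py filename="decode_sr_envelope.py" copy
import json
import struct

def decode_confluent_json(message: bytes) -> tuple[int, dict]:
    """Split a Confluent-framed JSON message into (schema_id, payload)."""
    if len(message) < 5 or message[0] != 0x00:
        raise ValueError("Not a Confluent schema registry envelope")
    schema_id = struct.unpack(">I", message[1:5])[0]  # 4-byte big-endian schema id
    payload = json.loads(message[5:])
    return schema_id, payload
```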
## Ingestion APIs and SR
When an Ingest API routes to a topic whose stream has a `schema_config` of kind JSON,
Moose resolves the schema id and publishes requests using the Schema Registry envelope.
You can also set the reference to a fixed schema id to skip lookups.
## Discover existing topics and schemas
Use the CLI to pull external topics and optionally fetch JSON Schemas from Schema Registry to emit typed models.
```bash filename="Terminal" copy
moose kafka pull \
--schema-registry http://localhost:8081 \
--path app/external-topics \
--include "*" \
--exclude "{__consumer_offsets,_schemas}"
```
This writes external topic declarations under the provided path based on language (default path is inferred).
## Current limitations
- JSON Schema only (Avro/Protobuf planned)
- The schema declared in code for an Ingest API may not match the actual schema registered in the Schema Registry.
---
## Sync to Table
Source: moose/streaming/sync-to-table.mdx
Automatically sync stream data to OLAP tables with intelligent batching
# Sync to Table
## Overview
Moose automatically handles batch writes between streams and OLAP tables through a **destination configuration**. When you specify a `destination` OLAP table for a stream, Moose provisions a background synchronization process that batches and writes data from the stream to the table.
### Basic Usage
```py filename="SyncToTable.py" copy {12}
from moose_lib import Stream, OlapTable, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[Event]("events")

events_stream = Stream[Event]("events", StreamConfig(
    destination=events_table  # This configures automatic batching
))
```
## Setting Up Automatic Sync
### Using IngestPipeline (Easiest)
The simplest way to set up automatic syncing is with an `IngestPipeline`, which creates all components and wires them together:
```py filename="AutoSync.py" copy {14-15}
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Creates stream, table, API, and automatic sync
events_pipeline = IngestPipeline[Event]("events", IngestPipelineConfig(
    ingest_api=True,
    stream=True,  # Creates stream
    table=True    # Creates destination table + auto-sync process
))
```
### Standalone Components
For more granular control, you can configure components individually:
```py filename="ManualSync.py" copy
from moose_lib import Stream, OlapTable, IngestApi, IngestConfig, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Create table first
events_table = OlapTable[Event]("events")

# Create stream with destination table (enables auto-sync)
events_stream = Stream[Event]("events", StreamConfig(
    destination=events_table  # This configures automatic batching
))

# Create API that writes to the stream
events_api = IngestApi[Event]("events", IngestConfig(
    destination=events_stream
))
```
## How Automatic Syncing Works
When you configure a stream with a `destination` table, Moose automatically handles the synchronization by managing a **Rust background process** that:
1. **Consumes** messages from the stream (Kafka/Redpanda topic)
2. **Batches** records up to 100,000 or flushes every second (whichever comes first)
3. **Executes** optimized ClickHouse `INSERT` statements
4. **Commits** stream offsets after successful writes
5. **Retries** failed batches with exponential backoff
Default batching parameters:
| Parameter | Value | Description |
|-----------|-------|-------------|
| `MAX_BATCH_SIZE` | 100,000 records | Maximum records per batch insert |
| `FLUSH_INTERVAL` | 1 second | Automatic flush regardless of batch size |
Currently, you cannot configure the batching parameters, but we're interested in adding this feature. If you need this capability, let us know on Slack!
[ClickHouse inserts need to be batched for optimal performance](https://clickhouse.com/blog/asynchronous-data-inserts-in-clickhouse#data-needs-to-be-batched-for-optimal-performance). Moose automatically handles this optimization internally, ensuring your data is efficiently written to ClickHouse without any configuration required.
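Conceptually, the flush policy is just "whichever limit is hit first". A rough illustration of that decision (not Moose's actual Rust implementation):
```py filename="BatchingPolicy.py" copy
import time

MAX_BATCH_SIZE = 100_000   # records
FLUSH_INTERVAL = 1.0       # seconds

def should_flush(batch: list, last_flush: float) -> bool:
    # Flush when the batch is full OR the interval has elapsed, whichever comes first
    return len(batch) >= MAX_BATCH_SIZE or (time.monotonic() - last_flush) >= FLUSH_INTERVAL
```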
## Data Flow Example
Here's how data flows through the automatic sync process:
```py filename="DataFlow.py" copy
import requests

# 1. Data sent to ingestion API
requests.post('http://localhost:4000/ingest/events', json={
    "id": "evt_123",
    "user_id": "user_456",
    "timestamp": "2024-01-15T10:30:00Z",
    "event_type": "click"
})

# 2. API validates and writes to stream
# 3. Background sync process batches stream data
# 4. Batch automatically written to ClickHouse table when:
#    - Batch reaches 100,000 records, OR
#    - 1 second has elapsed since last flush
# 5. Data available for queries in events table
#    SELECT * FROM events WHERE user_id = 'user_456';
```
## Monitoring and Observability
The sync process provides built-in observability within the Moose runtime:
- **Batch Insert Logs**: Records successful batch insertions with sizes and offsets
- **Error Handling**: Logs transient failures with retry information
- **Metrics**: Tracks throughput, batch sizes, and error rates
- **Offset Tracking**: Maintains Kafka consumer group offsets for reliability
---
## Transformation Functions
Source: moose/streaming/transform-functions.mdx
Process and transform data in-flight between streams
# Transformation Functions
## Overview
Transformations allow you to process and reshape data as it flows between streams. You can filter, enrich, reshape, and combine data in-flight before it reaches its destination.
## Implementing Transformations
### Reshape and Enrich Data
Transform data shape or enrich records:
```py filename="DataTransform.py" copy
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class EventProperties(BaseModel):
    user_id: str
    platform: str
    app_version: str
    ip_address: str

class RawEvent(BaseModel):
    id: Key[str]
    timestamp: str
    data: EventProperties

class EnrichedEventProperties(BaseModel):
    platform: str
    version: str
    country: str

class EnrichedEventMetadata(BaseModel):
    originalTimestamp: str
    processedAt: datetime

class EnrichedEvent(BaseModel):
    eventId: Key[str]
    timestamp: datetime
    userId: Key[str]
    properties: EnrichedEventProperties
    metadata: EnrichedEventMetadata

raw_stream = Stream[RawEvent]("raw_events")
enriched_stream = Stream[EnrichedEvent]("enriched_events")

raw_stream.add_transform(destination=enriched_stream, transformation=lambda record: EnrichedEvent(
    eventId=record.id,
    timestamp=datetime.fromisoformat(record.timestamp),
    userId=record.data.user_id,
    properties=EnrichedEventProperties(
        platform=record.data.platform,
        version=record.data.app_version,
        country=lookupCountry(record.data.ip_address)  # lookupCountry: your own enrichment helper
    ),
    metadata=EnrichedEventMetadata(
        originalTimestamp=record.timestamp,
        processedAt=datetime.now()
    )
))
```
### Filtering
Remove or filter records based on conditions:
```py filename="FilterStream.py" copy
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class MetricRecord(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

class ValidMetrics(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

input_stream = Stream[MetricRecord]("input_metrics")
valid_metrics = Stream[ValidMetrics]("valid_metrics")

def filter_function(record: MetricRecord) -> ValidMetrics | None:
    # getStartOfDay: your own helper returning the start of the current day
    if record.value > 0 and record.timestamp > getStartOfDay() and not record.name.startswith('debug_'):
        return ValidMetrics(
            id=record.id,
            name=record.name,
            value=record.value,
            timestamp=record.timestamp
        )
    return None

input_stream.add_transform(destination=valid_metrics, transformation=filter_function)
```
### Fan Out (1:N)
Send data to multiple downstream processors:
```py filename="FanOut.py" copy
from moose_lib import Stream, Key
from pydantic import BaseModel
from typing import List
from datetime import datetime

# Define data models
class Order(BaseModel):
    orderId: Key[str]
    userId: Key[str]
    amount: float
    items: List[str]

class HighPriorityOrder(Order):
    priority: str = 'high'

class ArchivedOrder(Order):
    archivedAt: datetime

# Create source and destination streams
order_stream = Stream[Order]("orders")
analytics_stream = Stream[Order]("order_analytics")
notification_stream = Stream[HighPriorityOrder]("order_notifications")
archive_stream = Stream[ArchivedOrder]("order_archive")

# Send all orders to analytics
def analytics_transform(order: Order) -> Order:
    return order

order_stream.add_transform(destination=analytics_stream, transformation=analytics_transform)

# Send large orders to notifications
def high_priority_transform(order: Order) -> HighPriorityOrder | None:
    if order.amount > 1000:
        return HighPriorityOrder(
            orderId=order.orderId,
            userId=order.userId,
            amount=order.amount,
            items=order.items,
            priority='high'
        )
    return None  # Skip small orders

order_stream.add_transform(destination=notification_stream, transformation=high_priority_transform)

# Archive all orders with timestamp
def archive_transform(order: Order) -> ArchivedOrder | None:
    return ArchivedOrder(
        orderId=order.orderId,
        userId=order.userId,
        amount=order.amount,
        items=order.items,
        archivedAt=datetime.now()
    )

order_stream.add_transform(destination=archive_stream, transformation=archive_transform)
```
### Fan In (N:1)
Combine data from multiple sources:
```py filename="FanIn.py" copy
from moose_lib import Stream, OlapTable, Key, StreamConfig
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    userId: Key[str]
    eventType: str
    timestamp: datetime
    source: str

# Create source and destination streams
web_events = Stream[UserEvent]("web_events")
mobile_events = Stream[UserEvent]("mobile_events")
api_events = Stream[UserEvent]("api_events")

# Create a stream and table for the combined events
events_table = OlapTable[UserEvent]("all_events")
all_events = Stream[UserEvent]("all_events", StreamConfig(
    destination=events_table
))

# Fan in from web
def web_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='web'
    )

web_events.add_transform(destination=all_events, transformation=web_transform)

# Fan in from mobile
def mobile_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='mobile'
    )

mobile_events.add_transform(destination=all_events, transformation=mobile_transform)

# Fan in from API
def api_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='api'
    )

api_events.add_transform(destination=all_events, transformation=api_transform)
```
### Unnesting
Flatten nested records:
```py filename="Unnest.py" copy
from moose_lib import Stream, Key
from pydantic import BaseModel
from typing import List

class NestedValue(BaseModel):
    value: int

class NestedRecord(BaseModel):
    id: Key[str]
    nested: List[NestedValue]

class FlattenedRecord(BaseModel):
    id: Key[str]
    value: int

nested_stream = Stream[NestedRecord]("nested_records")
flattened_stream = Stream[FlattenedRecord]("flattened_records")

def unnest_transform(record: NestedRecord) -> List[FlattenedRecord]:
    result = []
    for nested in record.nested:
        result.append(FlattenedRecord(
            id=record.id,
            value=nested.value
        ))
    return result

nested_stream.add_transform(flattened_stream, unnest_transform)
```
You cannot have multiple transforms between the same source and destination stream. If you need multiple transformation routes, you must either:
- Use conditional logic inside a single streaming function to handle different cases (see the sketch below), or
- Implement a fan-out/fan-in pattern, where you route records to different intermediate streams and then merge them back into the destination stream.
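For instance, the conditional approach keeps a single streaming function between the two streams and branches inside it (`source_stream` and `destination_stream` are placeholders for your own streams):
```py filename="ConditionalTransform.py" copy
def handle_all_cases(event: UserEvent) -> UserEvent | None:
    # One transform, multiple cases: branch on the record instead of
    # registering several transforms between the same two streams.
    if event.eventType.startswith("debug_"):
        return None  # drop records you don't want downstream
    if event.source == "mobile":
        # normalize mobile events before they reach the destination
        return UserEvent(
            userId=event.userId,
            eventType=event.eventType.lower(),
            timestamp=event.timestamp,
            source=event.source,
        )
    return event  # pass everything else through unchanged

source_stream.add_transform(destination=destination_stream, transformation=handle_all_cases)
```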
## Error Handling with Dead Letter Queues
When stream processing fails, you can configure dead letter queues to capture failed messages for later analysis and recovery. This prevents single message failures from stopping your entire pipeline.
```py filename="DeadLetterQueue.py" copy
from moose_lib import DeadLetterQueue, IngestPipeline, IngestPipelineConfig, TransformConfig, DeadLetterModel
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    user_id: str
    action: str
    timestamp: float

class ProcessedEvent(BaseModel):
    user_id: str
    action: str
    processed_at: datetime
    is_valid: bool

# Create pipelines
raw_events = IngestPipeline[UserEvent]("raw_events", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=False
))

processed_events = IngestPipeline[ProcessedEvent]("processed_events", IngestPipelineConfig(
    ingest_api=False,
    stream=True,
    table=True
))

# Create dead letter queue for failed transformations
event_dlq = DeadLetterQueue[UserEvent](name="EventDLQ")

def process_event(event: UserEvent) -> ProcessedEvent:
    # This might fail for invalid data
    if not event.user_id or len(event.user_id) == 0:
        raise ValueError("Invalid user_id: cannot be empty")

    return ProcessedEvent(
        user_id=event.user_id,
        action=event.action,
        processed_at=datetime.now(),
        is_valid=True
    )

# Add transform with error handling
raw_events.get_stream().add_transform(
    destination=processed_events.get_stream(),
    transformation=process_event,
    config=TransformConfig(
        dead_letter_queue=event_dlq  # Failed messages go here
    )
)

def monitor_dead_letters(dead_letter: DeadLetterModel[UserEvent]) -> None:
    print(f"Error: {dead_letter.error_message}")
    print(f"Failed at: {dead_letter.failed_at}")

    # Access original typed data
    original_event: UserEvent = dead_letter.as_typed()
    print(f"Original User ID: {original_event.user_id}")

# Monitor dead letter messages
event_dlq.add_consumer(monitor_dead_letters)
```
For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the [Dead Letter Queues guide](./dead-letter-queues).