
Transformation Functions

Overview

Transformations let you process data as it flows between streams: you can filter, enrich, reshape, and combine records in flight before they reach their destination.
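Every example below follows the same core pattern: define a source stream and a destination stream, then register a function with add_transform that maps each incoming record to zero, one, or many outgoing records. The following is a minimal sketch in a hypothetical BasicTransform.py; the model and stream names are illustrative only.

BasicTransform.py
from moose_lib import Stream, Key
from pydantic import BaseModel

class RawRecord(BaseModel):
    id: Key[str]
    value: str

class CleanRecord(BaseModel):
    id: Key[str]
    value: str

raw_records = Stream[RawRecord]("raw_records")
clean_records = Stream[CleanRecord]("clean_records")

# Runs once per record arriving on raw_records; the result is published to clean_records
raw_records.add_transform(
    destination=clean_records,
    transformation=lambda record: CleanRecord(id=record.id, value=record.value.strip().lower())
)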

Implementing Transformations

Reshape and Enrich Data

Reshape records into a new schema or enrich them with additional fields:

DataTransform.py
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class EventProperties(BaseModel):
    user_id: str
    platform: str
    app_version: str
    ip_address: str

class RawEvent(BaseModel):
    id: Key[str]
    timestamp: str
    data: EventProperties

class EnrichedEventProperties(BaseModel):
    platform: str
    version: str
    country: str

class EnrichedEventMetadata(BaseModel):
    originalTimestamp: str
    processedAt: datetime

class EnrichedEvent(BaseModel):
    eventId: Key[str]
    timestamp: datetime
    userId: Key[str]
    properties: EnrichedEventProperties
    metadata: EnrichedEventMetadata

raw_stream = Stream[RawEvent]("raw_events")
enriched_stream = Stream[EnrichedEvent]("enriched_events")

# lookupCountry is a placeholder for your own IP-to-country enrichment helper
raw_stream.add_transform(destination=enriched_stream, transformation=lambda record: EnrichedEvent(
    eventId=record.id,
    timestamp=datetime.fromisoformat(record.timestamp),
    userId=record.data.user_id,
    properties=EnrichedEventProperties(
        platform=record.data.platform,
        version=record.data.app_version,
        country=lookupCountry(record.data.ip_address)
    ),
    metadata=EnrichedEventMetadata(
        originalTimestamp=record.timestamp,
        processedAt=datetime.now()
    )
))

Filtering

Drop records that fail your conditions by returning None from the transformation:

FilterStream.py
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class MetricRecord(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

class ValidMetrics(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

input_stream = Stream[MetricRecord]("input_metrics")
valid_metrics = Stream[ValidMetrics]("valid_metrics")

# getStartOfDay is a placeholder for your own helper returning midnight of the current day
def filter_function(record: MetricRecord) -> ValidMetrics | None:
    if record.value > 0 and record.timestamp > getStartOfDay() and not record.name.startswith('debug_'):
        return ValidMetrics(
            id=record.id,
            name=record.name,
            value=record.value,
            timestamp=record.timestamp
        )
    return None

input_stream.add_transform(destination=valid_metrics, transformation=filter_function)

Fan Out (1:N)

Send data to multiple downstream processors:

FanOut.py
from moose_lib import Stream, Key
from pydantic import BaseModel
from typing import List
from datetime import datetime

# Define data models
class Order(BaseModel):
    orderId: Key[str]
    userId: Key[str]
    amount: float
    items: List[str]

class HighPriorityOrder(Order):
    priority: str = 'high'

class ArchivedOrder(Order):
    archivedAt: datetime

# Create source and destination streams
order_stream = Stream[Order]("orders")
analytics_stream = Stream[Order]("order_analytics")
notification_stream = Stream[HighPriorityOrder]("order_notifications")
archive_stream = Stream[ArchivedOrder]("order_archive")

# Send all orders to analytics
def analytics_transform(order: Order) -> Order:
    return order

order_stream.add_transform(destination=analytics_stream, transformation=analytics_transform)

# Send large orders to notifications
def high_priority_transform(order: Order) -> HighPriorityOrder | None:
    if order.amount > 1000:
        return HighPriorityOrder(
            orderId=order.orderId,
            userId=order.userId,
            amount=order.amount,
            items=order.items,
            priority='high'
        )
    return None  # Skip small orders

order_stream.add_transform(destination=notification_stream, transformation=high_priority_transform)

# Archive all orders with timestamp
def archive_transform(order: Order) -> ArchivedOrder | None:
    return ArchivedOrder(
        orderId=order.orderId,
        userId=order.userId,
        amount=order.amount,
        items=order.items,
        archivedAt=datetime.now()
    )

order_stream.add_transform(destination=archive_stream, transformation=archive_transform)

Fan In (N:1)

Combine data from multiple sources:

FanIn.py
from moose_lib import Stream, OlapTable, Key, StreamConfig
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    userId: Key[str]
    eventType: str
    timestamp: datetime
    source: str

# Create source and destination streams
web_events = Stream[UserEvent]("web_events")
mobile_events = Stream[UserEvent]("mobile_events")
api_events = Stream[UserEvent]("api_events")

# Create a stream and table for the combined events
events_table = OlapTable[UserEvent]("all_events")
all_events = Stream[UserEvent]("all_events", StreamConfig(
    destination=events_table
))

# Fan in from web
def web_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='web'
    )

web_events.add_transform(destination=all_events, transformation=web_transform)

# Fan in from mobile
def mobile_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='mobile'
    )

mobile_events.add_transform(destination=all_events, transformation=mobile_transform)

# Fan in from API
def api_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='api'
    )

api_events.add_transform(destination=all_events, transformation=api_transform)

Unnesting

Flatten nested records by returning a list of output records from the transformation:

Unnest.py
from moose_lib import Stream, Key
from pydantic import BaseModel
from typing import List

class NestedValue(BaseModel):
    value: int

class NestedRecord(BaseModel):
    id: Key[str]
    nested: List[NestedValue]

class FlattenedRecord(BaseModel):
    id: Key[str]
    value: int

nested_stream = Stream[NestedRecord]("nested_records")
flattened_stream = Stream[FlattenedRecord]("flattened_records")

# Returning a list emits one FlattenedRecord per nested value
def unnest_transform(record: NestedRecord) -> List[FlattenedRecord]:
    result = []
    for nested in record.nested:
        result.append(FlattenedRecord(
            id=record.id,
            value=nested.value
        ))
    return result

nested_stream.add_transform(flattened_stream, unnest_transform)
Multiple transformation routes limitation

You cannot have multiple transforms between the same source and destination stream. If you need multiple transformation routes, you must either:

  • Use conditional logic inside a single streaming function to handle the different cases (a sketch follows this list), or
  • Implement a fan-out/fan-in pattern, where you route records to different intermediate streams and then merge them back into the destination stream.
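For instance, rather than registering two transforms between the same source and destination, you can fold the branching into one function. The sketch below uses illustrative names (OrderEvent, RoutedOrderEvent, and the stream names are not part of the examples above):

from moose_lib import Stream, Key
from pydantic import BaseModel

class OrderEvent(BaseModel):
    orderId: Key[str]
    amount: float

class RoutedOrderEvent(OrderEvent):
    route: str

order_events = Stream[OrderEvent]("order_events")
routed_orders = Stream[RoutedOrderEvent]("routed_orders")

# A single streaming function covers every case between this source and destination
def route_order(event: OrderEvent) -> RoutedOrderEvent | None:
    if event.amount <= 0:
        return None  # drop invalid orders instead of routing them
    route = 'high_value' if event.amount > 1000 else 'standard'
    return RoutedOrderEvent(
        orderId=event.orderId,
        amount=event.amount,
        route=route
    )

order_events.add_transform(destination=routed_orders, transformation=route_order)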

Error Handling with Dead Letter Queues

When stream processing fails, you can configure a dead letter queue to capture failed messages for later analysis and recovery. This prevents a single bad message from stopping your entire pipeline.

DeadLetterQueue.ts
import { DeadLetterQueue, IngestPipeline } from "@514labs/moose-lib";

interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

interface ProcessedEvent {
  userId: string;
  action: string;
  processedAt: Date;
  isValid: boolean;
}

// Create pipelines
const rawEvents = new IngestPipeline<UserEvent>("raw_events", {
  ingestApi: true,
  stream: true,
  table: false
});

const processedEvents = new IngestPipeline<ProcessedEvent>("processed_events", {
  ingestApi: false,
  stream: true,
  table: true
});

// Create dead letter queue for failed transformations
const eventDLQ = new DeadLetterQueue<UserEvent>("EventDLQ");

// Add transform with error handling
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: UserEvent): ProcessedEvent => {
    // This might fail for invalid data
    if (!event.userId || event.userId.length === 0) {
      throw new Error("Invalid userId: cannot be empty");
    }

    return {
      userId: event.userId,
      action: event.action,
      processedAt: new Date(),
      isValid: true
    };
  },
  {
    deadLetterQueue: eventDLQ  // Failed messages go here
  }
);

// Monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
  console.log(`Error: ${deadLetter.errorMessage}`);
  console.log(`Failed at: ${deadLetter.failedAt}`);

  // Access original typed data
  const originalEvent: UserEvent = deadLetter.asTyped();
  console.log(`Original User ID: ${originalEvent.userId}`);
});
Learn More

For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the Dead Letter Queues guide.

DeadLetterQueue.py
from moose_lib import DeadLetterQueue, IngestPipeline, IngestPipelineConfig, TransformConfig, DeadLetterModel
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    user_id: str
    action: str
    timestamp: float

class ProcessedEvent(BaseModel):
    user_id: str
    action: str
    processed_at: datetime
    is_valid: bool

# Create pipelines
raw_events = IngestPipeline[UserEvent]("raw_events", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=False
))

processed_events = IngestPipeline[ProcessedEvent]("processed_events", IngestPipelineConfig(
    ingest_api=False,
    stream=True,
    table=True
))

# Create dead letter queue for failed transformations
event_dlq = DeadLetterQueue[UserEvent](name="EventDLQ")

def process_event(event: UserEvent) -> ProcessedEvent:
    # This might fail for invalid data
    if not event.user_id or len(event.user_id) == 0:
        raise ValueError("Invalid user_id: cannot be empty")

    return ProcessedEvent(
        user_id=event.user_id,
        action=event.action,
        processed_at=datetime.now(),
        is_valid=True
    )

# Add transform with error handling
raw_events.get_stream().add_transform(
    destination=processed_events.get_stream(),
    transformation=process_event,
    config=TransformConfig(
        dead_letter_queue=event_dlq  # Failed messages go here
    )
)

def monitor_dead_letters(dead_letter: DeadLetterModel[UserEvent]) -> None:
    print(f"Error: {dead_letter.error_message}")
    print(f"Failed at: {dead_letter.failed_at}")

    # Access original typed data
    original_event: UserEvent = dead_letter.as_typed()
    print(f"Original User ID: {original_event.user_id}")

# Monitor dead letter messages
event_dlq.add_consumer(monitor_dead_letters)