# Moose / Streaming / Transform Functions Documentation – Python

## Included Files

1. moose/streaming/transform-functions/transform-functions.mdx

## Transformation Functions

Source: moose/streaming/transform-functions/transform-functions.mdx

Process and transform data in-flight between streams

# Transformation Functions

## Overview

Transformations allow you to process and reshape data as it flows between streams. You can filter, enrich, reshape, and combine data in-flight before it reaches its destination.

## Implementing Transformations

### Reshape and Enrich Data

Transform data shape or enrich records:

```py filename="DataTransform.py" copy
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class EventProperties(BaseModel):
    user_id: str
    platform: str
    app_version: str
    ip_address: str

class RawEvent(BaseModel):
    id: Key[str]
    timestamp: str
    data: EventProperties

class EnrichedEventProperties(BaseModel):
    platform: str
    version: str
    country: str

class EnrichedEventMetadata(BaseModel):
    originalTimestamp: str
    processedAt: datetime

class EnrichedEvent(BaseModel):
    eventId: Key[str]
    timestamp: datetime
    userId: Key[str]
    properties: EnrichedEventProperties
    metadata: EnrichedEventMetadata

raw_stream = Stream[RawEvent]("raw_events")
enriched_stream = Stream[EnrichedEvent]("enriched_events")

# lookupCountry is a user-defined enrichment helper (not shown here)
raw_stream.add_transform(destination=enriched_stream, transformation=lambda record: EnrichedEvent(
    eventId=record.id,
    timestamp=datetime.fromisoformat(record.timestamp),
    userId=record.data.user_id,
    properties=EnrichedEventProperties(
        platform=record.data.platform,
        version=record.data.app_version,
        country=lookupCountry(record.data.ip_address)
    ),
    metadata=EnrichedEventMetadata(
        originalTimestamp=record.timestamp,
        processedAt=datetime.now()
    )
))
```

### Filtering

Remove or filter records based on conditions:

```py filename="FilterStream.py" copy
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class MetricRecord(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

class ValidMetrics(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

input_stream = Stream[MetricRecord]("input_metrics")
valid_metrics = Stream[ValidMetrics]("valid_metrics")

# getStartOfDay is a user-defined helper (not shown here)
def filter_function(record: MetricRecord) -> ValidMetrics | None:
    if record.value > 0 and record.timestamp > getStartOfDay() and not record.name.startswith('debug_'):
        return ValidMetrics(
            id=record.id,
            name=record.name,
            value=record.value,
            timestamp=record.timestamp
        )
    return None

input_stream.add_transform(destination=valid_metrics, transformation=filter_function)
```

### Fan Out (1:N)

Send data to multiple downstream processors:

```py filename="FanOut.py" copy
from typing import List
from datetime import datetime
from moose_lib import Stream, Key
from pydantic import BaseModel

# Define data models
class Order(BaseModel):
    orderId: Key[str]
    userId: Key[str]
    amount: float
    items: List[str]

class HighPriorityOrder(Order):
    priority: str = 'high'

class ArchivedOrder(Order):
    archivedAt: datetime

# Create source and destination streams
order_stream = Stream[Order]("orders")
analytics_stream = Stream[Order]("order_analytics")
notification_stream = Stream[HighPriorityOrder]("order_notifications")
archive_stream = Stream[ArchivedOrder]("order_archive")

# Send all orders to analytics
def analytics_transform(order: Order) -> Order:
    return order

order_stream.add_transform(destination=analytics_stream, transformation=analytics_transform)

# Send large orders to notifications
def high_priority_transform(order: Order) -> HighPriorityOrder | None:
    if order.amount > 1000:
        return HighPriorityOrder(
            orderId=order.orderId,
            userId=order.userId,
            amount=order.amount,
            items=order.items,
            priority='high'
        )
    return None  # Skip small orders

order_stream.add_transform(destination=notification_stream, transformation=high_priority_transform)

# Archive all orders with timestamp
def archive_transform(order: Order) -> ArchivedOrder | None:
    return ArchivedOrder(
        orderId=order.orderId,
        userId=order.userId,
        amount=order.amount,
        items=order.items,
        archivedAt=datetime.now()
    )

order_stream.add_transform(destination=archive_stream, transformation=archive_transform)
```
### Fan In (N:1)

Combine data from multiple sources:

```py filename="FanIn.py" copy
from datetime import datetime
from moose_lib import Stream, OlapTable, Key, StreamConfig
from pydantic import BaseModel

class UserEvent(BaseModel):
    userId: Key[str]
    eventType: str
    timestamp: datetime
    source: str

# Create source and destination streams
web_events = Stream[UserEvent]("web_events")
mobile_events = Stream[UserEvent]("mobile_events")
api_events = Stream[UserEvent]("api_events")

# Create a stream and table for the combined events
events_table = OlapTable[UserEvent]("all_events")
all_events = Stream[UserEvent]("all_events", StreamConfig(
    destination=events_table
))

# Fan in from web
def web_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='web'
    )

web_events.add_transform(destination=all_events, transformation=web_transform)

# Fan in from mobile
def mobile_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='mobile'
    )

mobile_events.add_transform(destination=all_events, transformation=mobile_transform)

# Fan in from API
def api_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='api'
    )

api_events.add_transform(destination=all_events, transformation=api_transform)
```

### Unnesting

Flatten nested records:

```py filename="Unnest.py" copy
from typing import List
from moose_lib import Stream, Key
from pydantic import BaseModel

class NestedValue(BaseModel):
    value: int

class NestedRecord(BaseModel):
    id: Key[str]
    nested: List[NestedValue]

class FlattenedRecord(BaseModel):
    id: Key[str]
    value: int

nested_stream = Stream[NestedRecord]("nested_records")
flattened_stream = Stream[FlattenedRecord]("flattened_records")

# Return a list to emit one flattened record per nested value
def unnest_transform(record: NestedRecord) -> List[FlattenedRecord]:
    result = []
    for nested in record.nested:
        result.append(FlattenedRecord(
            id=record.id,
            value=nested.value
        ))
    return result

nested_stream.add_transform(flattened_stream, unnest_transform)
```

You cannot have multiple transforms between the same source and destination stream. If you need multiple transformation routes, you must either:

- Use conditional logic inside a single streaming function to handle the different cases (see the sketch below), or
- Implement a fan-out/fan-in pattern, where you route records to different intermediate streams and then merge them back into the destination stream.
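As a minimal sketch of the first option, the single transform below folds several routing cases into one function instead of registering multiple transforms for the same source/destination pair. The `OrderEvent` and `RoutedOrder` models, stream names, and routing rules here are hypothetical, chosen only for illustration:

```py filename="ConditionalRouting.py" copy
from typing import List
from moose_lib import Stream, Key
from pydantic import BaseModel

# Hypothetical models for this sketch
class OrderEvent(BaseModel):
    orderId: Key[str]
    amount: float
    status: str

class RoutedOrder(BaseModel):
    orderId: Key[str]
    amount: float
    route: str

orders = Stream[OrderEvent]("routing_orders")
routed = Stream[RoutedOrder]("routed_orders")

def route_order(order: OrderEvent) -> RoutedOrder | None:
    # Handle every case inside one streaming function
    if order.status == 'cancelled':
        return None  # drop cancelled orders entirely
    route = 'priority' if order.amount > 1000 else 'standard'
    return RoutedOrder(orderId=order.orderId, amount=order.amount, route=route)

# A single registered transform covers all routes between this pair of streams
orders.add_transform(destination=routed, transformation=route_order)
```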
```py filename="DeadLetterQueue.py" copy from moose_lib import DeadLetterQueue, IngestPipeline, IngestPipelineConfig, TransformConfig, DeadLetterModel from pydantic import BaseModel from datetime import datetime class UserEvent(BaseModel): user_id: str action: str timestamp: float class ProcessedEvent(BaseModel): user_id: str action: str processed_at: datetime is_valid: bool # Create pipelines raw_events = IngestPipeline[UserEvent]("raw_events", IngestPipelineConfig( ingest_api=True, stream=True, table=False )) processed_events = IngestPipeline[ProcessedEvent]("processed_events", IngestPipelineConfig( ingest_api=False, stream=True, table=True )) # Create dead letter queue for failed transformations event_dlq = DeadLetterQueue[UserEvent](name="EventDLQ") def process_event(event: UserEvent) -> ProcessedEvent: # This might fail for invalid data if not event.user_id or len(event.user_id) == 0: raise ValueError("Invalid user_id: cannot be empty") return ProcessedEvent( user_id=event.user_id, action=event.action, processed_at=datetime.now(), is_valid=True ) # Add transform with error handling raw_events.get_stream().add_transform( destination=processed_events.get_stream(), transformation=process_event, config=TransformConfig( dead_letter_queue=event_dlq # Failed messages go here ) ) def monitor_dead_letters(dead_letter: DeadLetterModel[UserEvent]) -> None: print(f"Error: {dead_letter.error_message}") print(f"Failed at: {dead_letter.failed_at}") # Access original typed data original_event: UserEvent = dead_letter.as_typed() print(f"Original User ID: {original_event.user_id}") # Monitor dead letter messages event_dlq.add_consumer(monitor_dead_letters) ``` For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the [Dead Letter Queues guide](./dead-letter-queues).