Transformations let you process data as it flows between streams: you can filter, enrich, reshape, and combine records in-flight before they reach their destination.
Transform data shape or enrich records:
```python
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class EventProperties(BaseModel):
    user_id: str
    platform: str
    app_version: str
    ip_address: str

class RawEvent(BaseModel):
    id: Key[str]
    timestamp: str
    data: EventProperties

class EnrichedEventProperties(BaseModel):
    platform: str
    version: str
    country: str

class EnrichedEventMetadata(BaseModel):
    originalTimestamp: str
    processedAt: datetime

class EnrichedEvent(BaseModel):
    eventId: Key[str]
    timestamp: datetime
    userId: Key[str]
    properties: EnrichedEventProperties
    metadata: EnrichedEventMetadata

raw_stream = Stream[RawEvent]("raw_events")
enriched_stream = Stream[EnrichedEvent]("enriched_events")

# lookupCountry is a user-supplied helper (e.g., an IP geolocation lookup); it is not part of moose_lib
raw_stream.add_transform(destination=enriched_stream, transformation=lambda record: EnrichedEvent(
    eventId=record.id,
    timestamp=datetime.fromisoformat(record.timestamp),
    userId=record.data.user_id,
    properties=EnrichedEventProperties(
        platform=record.data.platform,
        version=record.data.app_version,
        country=lookupCountry(record.data.ip_address)
    ),
    metadata=EnrichedEventMetadata(
        originalTimestamp=record.timestamp,
        processedAt=datetime.now()
    )
))
```

Remove or filter records based on conditions:
```python
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class MetricRecord(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

class ValidMetrics(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

input_stream = Stream[MetricRecord]("input_metrics")
valid_metrics = Stream[ValidMetrics]("valid_metrics")

# getStartOfDay is a user-supplied helper returning midnight of the current day
def filter_function(record: MetricRecord) -> ValidMetrics | None:
    if record.value > 0 and record.timestamp > getStartOfDay() and not record.name.startswith('debug_'):
        return ValidMetrics(
            id=record.id,
            name=record.name,
            value=record.value,
            timestamp=record.timestamp
        )
    return None

input_stream.add_transform(destination=valid_metrics, transformation=filter_function)
```

Send data to multiple downstream processors:
```python
from moose_lib import Stream, Key
from pydantic import BaseModel
from typing import List
from datetime import datetime

# Define data models
class Order(BaseModel):
    orderId: Key[str]
    userId: Key[str]
    amount: float
    items: List[str]

class HighPriorityOrder(Order):
    priority: str = 'high'

class ArchivedOrder(Order):
    archivedAt: datetime

# Create source and destination streams
order_stream = Stream[Order]("orders")
analytics_stream = Stream[Order]("order_analytics")
notification_stream = Stream[HighPriorityOrder]("order_notifications")
archive_stream = Stream[ArchivedOrder]("order_archive")

# Send all orders to analytics
def analytics_transform(order: Order) -> Order:
    return order

order_stream.add_transform(destination=analytics_stream, transformation=analytics_transform)

# Send large orders to notifications
def high_priority_transform(order: Order) -> HighPriorityOrder | None:
    if order.amount > 1000:
        return HighPriorityOrder(
            orderId=order.orderId,
            userId=order.userId,
            amount=order.amount,
            items=order.items,
            priority='high'
        )
    return None  # Skip small orders

order_stream.add_transform(destination=notification_stream, transformation=high_priority_transform)

# Archive all orders with timestamp
def archive_transform(order: Order) -> ArchivedOrder | None:
    return ArchivedOrder(
        orderId=order.orderId,
        userId=order.userId,
        amount=order.amount,
        items=order.items,
        archivedAt=datetime.now()
    )

order_stream.add_transform(destination=archive_stream, transformation=archive_transform)
```

Combine data from multiple sources:
```python
from moose_lib import Stream, OlapTable, Key, StreamConfig
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    userId: Key[str]
    eventType: str
    timestamp: datetime
    source: str

# Create source streams
web_events = Stream[UserEvent]("web_events")
mobile_events = Stream[UserEvent]("mobile_events")
api_events = Stream[UserEvent]("api_events")

# Create a stream and table for the combined events
events_table = OlapTable[UserEvent]("all_events")
all_events = Stream[UserEvent]("all_events", StreamConfig(
    destination=events_table
))

# Fan in from web
def web_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='web'
    )

web_events.add_transform(destination=all_events, transformation=web_transform)

# Fan in from mobile
def mobile_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='mobile'
    )

mobile_events.add_transform(destination=all_events, transformation=mobile_transform)

# Fan in from API
def api_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='api'
    )

api_events.add_transform(destination=all_events, transformation=api_transform)
```

Flatten nested records:
```python
from moose_lib import Stream, Key
from pydantic import BaseModel
from typing import List

class NestedValue(BaseModel):
    value: int

class NestedRecord(BaseModel):
    id: Key[str]
    nested: List[NestedValue]

class FlattenedRecord(BaseModel):
    id: Key[str]
    value: int

nested_stream = Stream[NestedRecord]("nested_records")
flattened_stream = Stream[FlattenedRecord]("flattened_records")

# Return a list to emit one flattened record per nested value
def unnest_transform(record: NestedRecord) -> List[FlattenedRecord]:
    result = []
    for nested in record.nested:
        result.append(FlattenedRecord(
            id=record.id,
            value=nested.value
        ))
    return result

nested_stream.add_transform(flattened_stream, unnest_transform)
```

You cannot register multiple transforms between the same source and destination stream. If you need multiple transformation routes, you must either combine the logic into a single transformation function (as sketched below) or route the data through an intermediate stream.
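For instance, two routes from `order_stream` to `analytics_stream` can be folded into one function. This is a minimal sketch reusing the `Order` model and streams from the fan-out example above; the specific filtering and trimming rules are illustrative assumptions, not part of the Moose API:

```python
# Sketch: one transform carrying the logic of two hypothetical routes
# (drop invalid orders, then trim the rest). Assumes the Order model,
# order_stream, and analytics_stream defined in the fan-out example above.
def combined_transform(order: Order) -> Order | None:
    # Route 1 logic: drop non-positive amounts
    if order.amount <= 0:
        return None
    # Route 2 logic: cap the items list before it reaches analytics
    return Order(
        orderId=order.orderId,
        userId=order.userId,
        amount=order.amount,
        items=order.items[:10],
    )

# A single registration replaces what would otherwise be two transforms
# between the same source and destination
order_stream.add_transform(destination=analytics_stream, transformation=combined_transform)
```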
When stream processing fails, you can configure a dead letter queue to capture the failed messages for later analysis and recovery. This prevents a single bad message from halting your entire pipeline.
```typescript
import { DeadLetterQueue, IngestPipeline } from "@514labs/moose-lib";

interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

interface ProcessedEvent {
  userId: string;
  action: string;
  processedAt: Date;
  isValid: boolean;
}

// Create pipelines
const rawEvents = new IngestPipeline<UserEvent>("raw_events", {
  ingestApi: true,
  stream: true,
  table: false
});

const processedEvents = new IngestPipeline<ProcessedEvent>("processed_events", {
  ingestApi: false,
  stream: true,
  table: true
});

// Create dead letter queue for failed transformations
const eventDLQ = new DeadLetterQueue<UserEvent>("EventDLQ");

// Add transform with error handling
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: UserEvent): ProcessedEvent => {
    // This might fail for invalid data
    if (!event.userId || event.userId.length === 0) {
      throw new Error("Invalid userId: cannot be empty");
    }

    return {
      userId: event.userId,
      action: event.action,
      processedAt: new Date(),
      isValid: true
    };
  },
  {
    deadLetterQueue: eventDLQ // Failed messages go here
  }
);

// Monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
  console.log(`Error: ${deadLetter.errorMessage}`);
  console.log(`Failed at: ${deadLetter.failedAt}`);

  // Access original typed data
  const originalEvent: UserEvent = deadLetter.asTyped();
  console.log(`Original User ID: ${originalEvent.userId}`);
});
```

For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the Dead Letter Queues guide.
```python
from moose_lib import DeadLetterQueue, IngestPipeline, IngestPipelineConfig, TransformConfig, DeadLetterModel
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    user_id: str
    action: str
    timestamp: float

class ProcessedEvent(BaseModel):
    user_id: str
    action: str
    processed_at: datetime
    is_valid: bool

# Create pipelines
raw_events = IngestPipeline[UserEvent]("raw_events", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=False
))

processed_events = IngestPipeline[ProcessedEvent]("processed_events", IngestPipelineConfig(
    ingest_api=False,
    stream=True,
    table=True
))

# Create dead letter queue for failed transformations
event_dlq = DeadLetterQueue[UserEvent](name="EventDLQ")

def process_event(event: UserEvent) -> ProcessedEvent:
    # This might fail for invalid data
    if not event.user_id or len(event.user_id) == 0:
        raise ValueError("Invalid user_id: cannot be empty")

    return ProcessedEvent(
        user_id=event.user_id,
        action=event.action,
        processed_at=datetime.now(),
        is_valid=True
    )

# Add transform with error handling
raw_events.get_stream().add_transform(
    destination=processed_events.get_stream(),
    transformation=process_event,
    config=TransformConfig(
        dead_letter_queue=event_dlq  # Failed messages go here
    )
)

def monitor_dead_letters(dead_letter: DeadLetterModel[UserEvent]) -> None:
    print(f"Error: {dead_letter.error_message}")
    print(f"Failed at: {dead_letter.failed_at}")

    # Access original typed data
    original_event: UserEvent = dead_letter.as_typed()
    print(f"Original User ID: {original_event.user_id}")

# Monitor dead letter messages
event_dlq.add_consumer(monitor_dead_letters)
```
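To exercise the dead letter path end to end during local development, you could post an event with an empty `user_id` to the raw pipeline's ingest endpoint and watch the `EventDLQ` consumer log the failure. This is a minimal sketch; the `http://localhost:4000/ingest/raw_events` URL assumes the Moose dev server's default local port and ingest route, so adjust it to your setup:

```python
import json
import urllib.request

# An event that will fail process_event's validation (empty user_id)
invalid_event = {"user_id": "", "action": "click", "timestamp": 1700000000.0}

# Assumed local ingest endpoint for the "raw_events" pipeline; adjust host/port as needed
req = urllib.request.Request(
    "http://localhost:4000/ingest/raw_events",
    data=json.dumps(invalid_event).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

with urllib.request.urlopen(req) as resp:
    # Ingestion succeeds; the transform failure is captured asynchronously in EventDLQ
    print(resp.status)
```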