# Moose / Streaming / Transform Functions Documentation – TypeScript

## Included Files

1. moose/streaming/transform-functions/transform-functions.mdx

## Transformation Functions

Source: moose/streaming/transform-functions/transform-functions.mdx

Process and transform data in-flight between streams

# Transformation Functions

## Overview

Transformations allow you to process and reshape data as it flows between streams. You can filter, enrich, reshape, and combine data in-flight before it reaches its destination.

## Implementing Transformations

### Reshape and Enrich Data

Transform the shape of each record or enrich it with additional data:

```typescript filename="DataTransform.ts"
import { Key, Stream } from "@514labs/moose-lib";

interface RawEvent {
  id: Key<string>;
  timestamp: string;
  data: {
    user_id: string;
    platform: string;
    app_version: string;
    ip_address: string;
  };
}

interface EnrichedEvent {
  eventId: Key<string>;
  timestamp: Date;
  userId: Key<string>;
  properties: {
    platform: string;
    version: string;
    country: string;
  };
  metadata: {
    originalTimestamp: string;
    processedAt: Date;
  };
}

// User-defined enrichment helper (implementation omitted)
declare function lookupCountry(ipAddress: string): Promise<string>;

const rawStream = new Stream<RawEvent>("raw_events");
const enrichedStream = new Stream<EnrichedEvent>("enriched_events");

// Reshape and enrich data
rawStream.addTransform(enrichedStream, async (record: RawEvent) => ({
  eventId: record.id,
  timestamp: new Date(record.timestamp),
  userId: record.data.user_id,
  properties: {
    platform: record.data.platform || 'unknown',
    version: record.data.app_version,
    country: await lookupCountry(record.data.ip_address)
  },
  metadata: {
    originalTimestamp: record.timestamp,
    processedAt: new Date()
  }
}));
```

### Filtering

Filter out records by returning `undefined` for anything that should be dropped:

```typescript filename="FilterStream.ts"
import { Stream } from "@514labs/moose-lib";

interface MetricRecord {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

// User-defined helper (implementation omitted)
declare function getStartOfDay(): Date;

const inputStream = new Stream<MetricRecord>("input_metrics");
const validMetrics = new Stream<MetricRecord>("valid_metrics");

// Multiple filtering conditions
inputStream.addTransform(validMetrics, (record) => {
  // Filter out records with invalid values
  if (isNaN(record.value) || record.value < 0) {
    return undefined;
  }

  // Filter out old records
  if (record.timestamp < getStartOfDay()) {
    return undefined;
  }

  // Filter out specific metrics
  if (record.name.startsWith('debug_')) {
    return undefined;
  }

  return record;
});
```

### Fan Out (1:N)

Send data from one stream to multiple downstream processors:

```ts filename="FanOut.ts" copy
import { Stream } from "@514labs/moose-lib";

interface Order {
  orderId: string;
  userId: string;
  amount: number;
  items: string[];
}

interface HighPriorityOrder extends Order {
  priority: 'high';
}

interface ArchivedOrder extends Order {
  archivedAt: Date;
}

// Define destination streams
const analyticsStream = new Stream<Order>("order_analytics");
const notificationStream = new Stream<HighPriorityOrder>("order_notifications");
const archiveStream = new Stream<ArchivedOrder>("order_archive");

// Source stream
const orderStream = new Stream<Order>("orders");

// Send all orders to analytics
orderStream.addTransform(analyticsStream, (order) => order);

// Send large orders to notifications
orderStream.addTransform(notificationStream, (order) => {
  if (order.amount > 1000) {
    return { ...order, priority: 'high' };
  }
  return undefined; // Skip small orders
});

// Archive all orders
orderStream.addTransform(archiveStream, (order) => ({
  ...order,
  archivedAt: new Date()
}));
```

### Fan In (N:1)

Combine data from multiple source streams into a single destination:

```typescript filename="FanIn.ts"
import { Key, OlapTable, Stream } from "@514labs/moose-lib";

interface UserEvent {
  userId: Key<string>;
  eventType: string;
  timestamp: Date;
  source: string;
}

// Source streams
const webEvents = new Stream<UserEvent>("web_events");
const mobileEvents = new Stream<UserEvent>("mobile_events");
const apiEvents = new Stream<UserEvent>("api_events");

// Create a stream and table for the combined events
const eventsTable = new OlapTable<UserEvent>("all_events");
const allEvents = new Stream<UserEvent>("all_events", {
  destination: eventsTable
});

// Fan in from web
webEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'web',
  timestamp: new Date()
}));

// Fan in from mobile
mobileEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'mobile',
  timestamp: new Date()
}));

// Fan in from API
apiEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'api',
  timestamp: new Date()
}));
```

### Unnesting

Flatten nested records by returning an array; each element is emitted as a separate record:

```typescript filename="Unnest.ts"
import { Key, Stream } from "@514labs/moose-lib";

interface NestedRecord {
  id: Key<string>;
  nested: {
    value: number;
  }[];
}

interface FlattenedRecord {
  id: Key<string>;
  value: number;
}

const nestedStream = new Stream<NestedRecord>("nested_records");
const flattenedStream = new Stream<FlattenedRecord>("flattened_records");

// Returning an array emits one flattened record per nested element
nestedStream.addTransform(flattenedStream, (record) =>
  record.nested.map((n) => ({
    id: record.id,
    value: n.value
  }))
);
```

You cannot have multiple transforms between the same source and destination stream. If you need multiple transformation routes, you must either:

- Use conditional logic inside a single streaming function to handle the different cases (see the sketch below), or
- Implement a fan-out/fan-in pattern, where you route records to different intermediate streams and then merge them back into the destination stream.
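Because only one transform is allowed per source/destination pair, the routing decisions for the first option have to live inside that single function. Here is a minimal sketch of that approach; the `OrderEvent` shape, stream names, and status values are hypothetical, not part of the Moose API:

```typescript filename="ConditionalTransform.ts"
import { Stream } from "@514labs/moose-lib";

// Hypothetical record shape, for illustration only
interface OrderEvent {
  orderId: string;
  status: string;
  amount: number;
}

const orders = new Stream<OrderEvent>("orders");
const auditedOrders = new Stream<OrderEvent>("audited_orders");

// The single transform between this pair handles every case:
orders.addTransform(auditedOrders, (order) => {
  if (order.status === 'refunded') {
    // Reshape one class of records...
    return { ...order, amount: -Math.abs(order.amount) };
  }
  if (order.status === 'completed') {
    // ...pass another through unchanged...
    return order;
  }
  // ...and drop everything else
  return undefined;
});
```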
## Error Handling with Dead Letter Queues

When stream processing fails, you can configure dead letter queues to capture failed messages for later analysis and recovery. This prevents a single bad message from stopping your entire pipeline.

```typescript filename="DeadLetterQueue.ts" copy
import { DeadLetterQueue, IngestPipeline } from "@514labs/moose-lib";

interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

interface ProcessedEvent {
  userId: string;
  action: string;
  processedAt: Date;
  isValid: boolean;
}

// Create pipelines
const rawEvents = new IngestPipeline<UserEvent>("raw_events", {
  ingestApi: true,
  stream: true,
  table: false
});

const processedEvents = new IngestPipeline<ProcessedEvent>("processed_events", {
  ingestApi: false,
  stream: true,
  table: true
});

// Create dead letter queue for failed transformations
const eventDLQ = new DeadLetterQueue<UserEvent>("EventDLQ");

// Add transform with error handling
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: UserEvent): ProcessedEvent => {
    // This might fail for invalid data
    if (!event.userId || event.userId.length === 0) {
      throw new Error("Invalid userId: cannot be empty");
    }

    return {
      userId: event.userId,
      action: event.action,
      processedAt: new Date(),
      isValid: true
    };
  },
  {
    deadLetterQueue: eventDLQ // Failed messages go here
  }
);

// Monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
  console.log(`Error: ${deadLetter.errorMessage}`);
  console.log(`Failed at: ${deadLetter.failedAt}`);

  // Access original typed data
  const originalEvent: UserEvent = deadLetter.asTyped();
  console.log(`Original User ID: ${originalEvent.userId}`);
});
```

For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the [Dead Letter Queues guide](./dead-letter-queues).
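To exercise the dead letter flow end to end, you can send an event that fails validation and watch the consumer log it. A quick sketch, assuming a local dev server on Moose's default port (4000) and the `raw_events` ingest endpoint from the example above:

```typescript filename="TriggerDLQ.ts"
// Post an event with an empty userId so the transform above throws
// and the message is routed to EventDLQ. The URL assumes the Moose
// dev server is running locally on its default port.
const response = await fetch("http://localhost:4000/ingest/raw_events", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ userId: "", action: "click", timestamp: Date.now() })
});

console.log(`Ingest responded with status ${response.status}`);
```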