Transformation Functions
Overview
Transformations let you process and reshape data as it flows between streams: you can filter, enrich, reshape, and combine records in flight before they reach their destination.
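At its core, a transformation is a function you register with `addTransform` on a source stream: it receives each record and returns the record to forward to the destination stream (or `undefined` to drop it). Here is a minimal sketch using illustrative stream names and fields:

```ts
import { Stream } from "@514labs/moose-lib";

// Illustrative record shapes
interface RawTemp { id: string; celsius: number }
interface CleanTemp { id: string; fahrenheit: number }

const rawTemps = new Stream<RawTemp>("raw_temps");
const cleanTemps = new Stream<CleanTemp>("clean_temps");

// Convert each record flowing through raw_temps and forward it to clean_temps
rawTemps.addTransform(cleanTemps, (record) => ({
  id: record.id,
  fahrenheit: (record.celsius * 9) / 5 + 32,
}));
```

The sections below walk through the common patterns: reshaping and enriching, filtering, fan-out, fan-in, and unnesting.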
Implementing Transformations
Reshape and Enrich Data
Change the shape of records or enrich them with additional data:
```ts
import { Stream, Key } from "@514labs/moose-lib";

interface RawEvent {
  id: Key<string>;
  timestamp: string;
  data: {
    user_id: string;
    platform: string;
    app_version: string;
    ip_address: string;
  };
}

interface EnrichedEvent {
  eventId: Key<string>;
  timestamp: Date;
  userId: Key<string>;
  properties: {
    platform: string;
    version: string;
    country: string;
  };
  metadata: {
    originalTimestamp: string;
    processedAt: Date;
  };
}

const rawStream = new Stream<RawEvent>("raw_events");
const enrichedStream = new Stream<EnrichedEvent>("enriched_events");

// Reshape and enrich data
rawStream.addTransform(enrichedStream, async (record: RawEvent) => ({
  eventId: record.id,
  timestamp: new Date(record.timestamp),
  userId: record.data.user_id,
  properties: {
    platform: record.data.platform || 'unknown',
    version: record.data.app_version,
    country: await lookupCountry(record.data.ip_address), // e.g. an async geo-IP lookup helper defined elsewhere
  },
  metadata: {
    originalTimestamp: record.timestamp,
    processedAt: new Date(),
  },
}));
```
Filtering
Drop records that don't meet your conditions by returning undefined:
```ts
import { Stream } from "@514labs/moose-lib";

interface MetricRecord {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

const inputStream = new Stream<MetricRecord>("input_metrics");
const validMetrics = new Stream<MetricRecord>("valid_metrics");

// Assumed helper: returns midnight of the current day
const getStartOfDay = () => new Date(new Date().setHours(0, 0, 0, 0));

// Multiple filtering conditions
inputStream.addTransform(validMetrics, (record) => {
  // Filter out records with invalid values
  if (isNaN(record.value) || record.value < 0) {
    return undefined;
  }

  // Filter out old records
  if (record.timestamp < getStartOfDay()) {
    return undefined;
  }

  // Filter out specific metrics
  if (record.name.startsWith('debug_')) {
    return undefined;
  }

  return record;
});
```
Fan Out (1:N)
Send data to multiple downstream processors:
```ts
import { Stream } from "@514labs/moose-lib";

interface Order {
  orderId: string;
  userId: string;
  amount: number;
  items: string[];
}

interface HighPriorityOrder extends Order {
  priority: 'high';
}

interface ArchivedOrder extends Order {
  archivedAt: Date;
}

// Define destination streams
const analyticsStream = new Stream<Order>("order_analytics");
const notificationStream = new Stream<HighPriorityOrder>("order_notifications");
const archiveStream = new Stream<ArchivedOrder>("order_archive");

// Source stream
const orderStream = new Stream<Order>("orders");

// Send all orders to analytics
orderStream.addTransform(analyticsStream, (order) => order);

// Send large orders to notifications
orderStream.addTransform(notificationStream, (order) => {
  if (order.amount > 1000) {
    return { ...order, priority: 'high' };
  }
  return undefined; // Skip small orders
});

// Archive all orders
orderStream.addTransform(archiveStream, (order) => ({
  ...order,
  archivedAt: new Date(),
}));
```
Fan In (N:1)
Combine data from multiple sources:
```ts
import { Stream, OlapTable, Key } from "@514labs/moose-lib";

interface UserEvent {
  userId: Key<string>;
  eventType: string;
  timestamp: Date;
  source: string;
}

// Source streams
const webEvents = new Stream<UserEvent>("web_events");
const mobileEvents = new Stream<UserEvent>("mobile_events");
const apiEvents = new Stream<UserEvent>("api_events");

// Create a stream and table for the combined events
const eventsTable = new OlapTable<UserEvent>("all_events");
const allEvents = new Stream<UserEvent>("all_events", {
  destination: eventsTable
});

// Fan in from web
webEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'web',
  timestamp: new Date()
}));

// Fan in from mobile
mobileEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'mobile',
  timestamp: new Date()
}));

// Fan in from API
apiEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'api',
  timestamp: new Date()
}));
```
Unnesting
Flatten nested records:
```ts
import { Stream, Key } from "@514labs/moose-lib";

interface NestedRecord {
  id: Key<string>;
  nested: { value: number }[];
}

interface FlattenedRecord {
  id: Key<string>;
  value: number;
}

const nestedStream = new Stream<NestedRecord>("nested_records");
const flattenedStream = new Stream<FlattenedRecord>("flattened_records");

// Returning an array emits one flattened record per nested element
nestedStream.addTransform(flattenedStream, (record) =>
  record.nested.map((n) => ({
    id: record.id,
    value: n.value
  }))
);
```
Multiple transformation routes limitation
You cannot have multiple transforms between the same source and destination stream. If you need multiple transformation routes, you must either:
- Use conditional logic inside a single streaming function to handle different cases (see the sketch below), or
- Implement a fan-out/fan-in pattern, where you route records to different intermediate streams and then merge them back into the destination stream.
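For the first option, instead of registering two transforms between the same pair of streams, you can branch inside one transform. A minimal sketch with illustrative stream names and fields:

```ts
import { Stream } from "@514labs/moose-lib";

interface OrderEvent { orderId: string; amount: number }
interface RoutedOrder { orderId: string; amount: number; tier: string }

const orders = new Stream<OrderEvent>("orders");
const routedOrders = new Stream<RoutedOrder>("routed_orders");

// One transform between the two streams; the routing logic lives inside it
orders.addTransform(routedOrders, (order) => {
  if (order.amount > 1000) {
    return { ...order, tier: "premium" }; // high-value route
  }
  return { ...order, tier: "standard" }; // default route
});
```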
Error Handling with Dead Letter Queues
When a transformation fails, you can configure a dead letter queue to capture the failed messages for later analysis and recovery. This prevents a single bad message from stopping your entire pipeline.
```ts
import { DeadLetterQueue, IngestPipeline } from "@514labs/moose-lib";

interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

interface ProcessedEvent {
  userId: string;
  action: string;
  processedAt: Date;
  isValid: boolean;
}

// Create pipelines
const rawEvents = new IngestPipeline<UserEvent>("raw_events", {
  ingestApi: true,
  stream: true,
  table: false
});

const processedEvents = new IngestPipeline<ProcessedEvent>("processed_events", {
  ingestApi: false,
  stream: true,
  table: true
});

// Create dead letter queue for failed transformations
const eventDLQ = new DeadLetterQueue<UserEvent>("EventDLQ");

// Add transform with error handling
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: UserEvent): ProcessedEvent => {
    // This might fail for invalid data
    if (!event.userId || event.userId.length === 0) {
      throw new Error("Invalid userId: cannot be empty");
    }

    return {
      userId: event.userId,
      action: event.action,
      processedAt: new Date(),
      isValid: true
    };
  },
  {
    deadLetterQueue: eventDLQ // Failed messages go here
  }
);

// Monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
  console.log(`Error: ${deadLetter.errorMessage}`);
  console.log(`Failed at: ${deadLetter.failedAt}`);

  // Access original typed data
  const originalEvent: UserEvent = deadLetter.asTyped();
  console.log(`Original User ID: ${originalEvent.userId}`);
});
```
Learn More
For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the Dead Letter Queues guide.