Transformations let you process data as it flows between streams: you can filter, enrich, reshape, and combine records in-flight before they reach their destination.
Transform the shape of records or enrich them:
```typescript
import { Stream, Key } from "@514labs/moose-lib";

interface RawEvent {
  id: Key<string>;
  timestamp: string;
  data: {
    user_id: string;
    platform: string;
    app_version: string;
    ip_address: string;
  };
}

interface EnrichedEvent {
  eventId: Key<string>;
  timestamp: Date;
  userId: Key<string>;
  properties: {
    platform: string;
    version: string;
    country: string;
  };
  metadata: {
    originalTimestamp: string;
    processedAt: Date;
  };
}

const rawStream = new Stream<RawEvent>("raw_events");
const enrichedStream = new Stream<EnrichedEvent>("enriched_events");

// Reshape and enrich data
// lookupCountry is assumed to be an async helper (e.g. an IP-geolocation lookup) defined elsewhere
rawStream.addTransform(enrichedStream, async (record: RawEvent) => ({
  eventId: record.id,
  timestamp: new Date(record.timestamp),
  userId: record.data.user_id,
  properties: {
    platform: record.data.platform || 'unknown',
    version: record.data.app_version,
    country: await lookupCountry(record.data.ip_address),
  },
  metadata: {
    originalTimestamp: record.timestamp,
    processedAt: new Date(),
  },
}));
```

Remove or filter records based on conditions:
```typescript
import { Stream } from "@514labs/moose-lib";

interface MetricRecord {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

const inputStream = new Stream<MetricRecord>("input_metrics");
const validMetrics = new Stream<MetricRecord>("valid_metrics");

// Multiple filtering conditions
// getStartOfDay() is assumed to be a helper defined elsewhere that returns today's midnight
inputStream.addTransform(validMetrics, (record) => {
  // Filter out records with invalid values
  if (isNaN(record.value) || record.value < 0) {
    return undefined;
  }

  // Filter out old records
  if (record.timestamp < getStartOfDay()) {
    return undefined;
  }

  // Filter out specific metrics
  if (record.name.startsWith('debug_')) {
    return undefined;
  }

  return record;
});
```

Send data to multiple downstream processors:
```typescript
import { Stream } from "@514labs/moose-lib";

interface Order {
  orderId: string;
  userId: string;
  amount: number;
  items: string[];
}

interface HighPriorityOrder extends Order {
  priority: 'high';
}

interface ArchivedOrder extends Order {
  archivedAt: Date;
}

// Define destination streams
const analyticsStream = new Stream<Order>("order_analytics");
const notificationStream = new Stream<HighPriorityOrder>("order_notifications");
const archiveStream = new Stream<ArchivedOrder>("order_archive");

// Source stream
const orderStream = new Stream<Order>("orders");

// Send all orders to analytics
orderStream.addTransform(analyticsStream, (order) => order);

// Send large orders to notifications
orderStream.addTransform(notificationStream, (order) => {
  if (order.amount > 1000) {
    return { ...order, priority: 'high' };
  }
  return undefined; // Skip small orders
});

// Archive all orders
orderStream.addTransform(archiveStream, (order) => ({
  ...order,
  archivedAt: new Date(),
}));
```

Combine data from multiple sources:
```typescript
import { Stream, OlapTable, Key } from "@514labs/moose-lib";

interface UserEvent {
  userId: Key<string>;
  eventType: string;
  timestamp: Date;
  source: string;
}

// Source streams
const webEvents = new Stream<UserEvent>("web_events");
const mobileEvents = new Stream<UserEvent>("mobile_events");
const apiEvents = new Stream<UserEvent>("api_events");

// Create a stream and table for the combined events
const eventsTable = new OlapTable<UserEvent>("all_events");
const allEvents = new Stream<UserEvent>("all_events", {
  destination: eventsTable,
});

// Fan in from web
webEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'web',
  timestamp: new Date(),
}));

// Fan in from mobile
mobileEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'mobile',
  timestamp: new Date(),
}));

// Fan in from API
apiEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'api',
  timestamp: new Date(),
}));
```

Flatten nested records:
```typescript
import { Stream, Key } from "@514labs/moose-lib";

interface NestedRecord {
  id: Key<string>;
  nested: {
    value: number;
  }[];
}

interface FlattenedRecord {
  id: Key<string>;
  value: number;
}

const nestedStream = new Stream<NestedRecord>("nested_records");
const flattenedStream = new Stream<FlattenedRecord>("flattened_records");

// Returning an array emits one output record per nested element
nestedStream.addTransform(flattenedStream, (record) =>
  record.nested.map((n) => ({
    id: record.id,
    value: n.value,
  }))
);
```

You cannot have multiple transforms between the same source and destination stream. If you need multiple transformation routes, either branch with conditional logic inside a single transform, or route through an intermediate stream so that each source-destination pair has only one transform.
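As a minimal sketch of the first option (the types and stream names below are illustrative, not part of the examples above), a single transform can branch internally and decide the output shape per record:

```typescript
import { Stream } from "@514labs/moose-lib";

// Illustrative types and stream names
interface RawReading {
  sensorId: string;
  value: number;
}

interface LabeledReading extends RawReading {
  label: string;
}

const readings = new Stream<RawReading>("readings");
const labeledReadings = new Stream<LabeledReading>("labeled_readings");

// One transform between the pair, with the routing decisions made inside it
readings.addTransform(labeledReadings, (reading) => {
  if (Number.isNaN(reading.value)) {
    return undefined; // Drop unusable readings
  }
  if (reading.value > 100) {
    return { ...reading, label: 'high' };
  }
  return { ...reading, label: 'normal' };
});
```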
When stream processing fails, you can configure dead letter queues to capture failed messages for later analysis and recovery. This prevents a single bad message from halting your entire pipeline.
```typescript
import { DeadLetterQueue, IngestPipeline } from "@514labs/moose-lib";

interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

interface ProcessedEvent {
  userId: string;
  action: string;
  processedAt: Date;
  isValid: boolean;
}

// Create pipelines
const rawEvents = new IngestPipeline<UserEvent>("raw_events", {
  ingestApi: true,
  stream: true,
  table: false,
});

const processedEvents = new IngestPipeline<ProcessedEvent>("processed_events", {
  ingestApi: false,
  stream: true,
  table: true,
});

// Create dead letter queue for failed transformations
const eventDLQ = new DeadLetterQueue<UserEvent>("EventDLQ");

// Add transform with error handling
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: UserEvent): ProcessedEvent => {
    // This might fail for invalid data
    if (!event.userId || event.userId.length === 0) {
      throw new Error("Invalid userId: cannot be empty");
    }

    return {
      userId: event.userId,
      action: event.action,
      processedAt: new Date(),
      isValid: true,
    };
  },
  {
    deadLetterQueue: eventDLQ, // Failed messages go here
  }
);

// Monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
  console.log(`Error: ${deadLetter.errorMessage}`);
  console.log(`Failed at: ${deadLetter.failedAt}`);

  // Access original typed data
  const originalEvent: UserEvent = deadLetter.asTyped();
  console.log(`Original User ID: ${originalEvent.userId}`);
});
```

For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the Dead Letter Queues guide.
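As a minimal recovery sketch (assuming your moose-lib version exposes `Stream.send()` for producing records from a consumer; the placeholder repair logic here is illustrative, see the guide for supported patterns), a DLQ consumer can repair a failed event and replay it through the source stream:

```typescript
// Attempt automated recovery: repair the failed event and replay it.
// Assumes Stream.send() is available; the fallback userId is purely illustrative.
eventDLQ.addConsumer(async (deadLetter) => {
  const original: UserEvent = deadLetter.asTyped();

  if (!original.userId || original.userId.length === 0) {
    const repaired: UserEvent = { ...original, userId: 'unknown' };
    await rawEvents.stream!.send(repaired); // Re-enters the transform above
  }
});
```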