

Transformation Functions

Overview

Transformations allow you to process and reshape data as it flows between streams. You can filter, enrich, reshape, and combine data in-flight before it reaches its destination.
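At its core, a transformation is a function registered between a source stream and a destination stream with addTransform: each record read from the source is passed to the function, and whatever it returns is written to the destination. A minimal sketch to show the shape of the API (the Event type and stream names here are illustrative, not part of the examples below):

import { Stream } from "@514labs/moose-lib";

interface Event {
  id: string;
  value: number;
}

const sourceStream = new Stream<Event>("source_events");
const cleanStream = new Stream<Event>("clean_events");

// Each record from source_events is passed through this function,
// and the result is written to clean_events.
sourceStream.addTransform(cleanStream, (event) => ({
  ...event,
  value: Math.max(event.value, 0), // clamp negative values
}));

As the sections below show, returning undefined drops a record, and returning an array emits one record per element.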

Implementing Transformations

Reshape and Enrich Data

Change the shape of your data or enrich records in-flight:

import { Stream, Key } from "@514labs/moose-lib";

interface RawEvent {
  id: Key<string>;
  timestamp: string;
  data: {
    user_id: string;
    platform: string;
    app_version: string;
    ip_address: string;
  };
}

interface EnrichedEvent {
  eventId: Key<string>;
  timestamp: Date;
  userId: Key<string>;
  properties: {
    platform: string;
    version: string;
    country: string;
  };
  metadata: {
    originalTimestamp: string;
    processedAt: Date;
  };
}

const rawStream = new Stream<RawEvent>("raw_events");
const enrichedStream = new Stream<EnrichedEvent>("enriched_events");

// Reshape and enrich data
rawStream.addTransform(enrichedStream, async (record: RawEvent) => ({
  eventId: record.id,
  timestamp: new Date(record.timestamp),
  userId: record.data.user_id,
  properties: {
    platform: record.data.platform || 'unknown',
    version: record.data.app_version,
    // lookupCountry is a user-supplied enrichment helper
    // (e.g. a GeoIP lookup), not part of moose-lib
    country: await lookupCountry(record.data.ip_address),
  },
  metadata: {
    originalTimestamp: record.timestamp,
    processedAt: new Date(),
  },
}));

Filtering

Filter out records by returning undefined for any record you want to drop:

import { Stream } from "@514labs/moose-lib";

interface MetricRecord {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

const inputStream = new Stream<MetricRecord>("input_metrics");
const validMetrics = new Stream<MetricRecord>("valid_metrics");

// Multiple filtering conditions
inputStream.addTransform(validMetrics, (record) => {
  // Filter out records with invalid values
  if (isNaN(record.value) || record.value < 0) {
    return undefined;
  }

  // Filter out old records (getStartOfDay is a user-supplied helper
  // returning midnight of the current day)
  if (record.timestamp < getStartOfDay()) {
    return undefined;
  }

  // Filter out specific metrics
  if (record.name.startsWith('debug_')) {
    return undefined;
  }

  return record;
});

Fan Out (1:N)

Send data to multiple downstream processors:

import { Stream } from "@514labs/moose-lib";

interface Order {
  orderId: string;
  userId: string;
  amount: number;
  items: string[];
}

interface HighPriorityOrder extends Order {
  priority: 'high';
}

interface ArchivedOrder extends Order {
  archivedAt: Date;
}

// Define destination streams
const analyticsStream = new Stream<Order>("order_analytics");
const notificationStream = new Stream<HighPriorityOrder>("order_notifications");
const archiveStream = new Stream<ArchivedOrder>("order_archive");

// Source stream
const orderStream = new Stream<Order>("orders");

// Send all orders to analytics
orderStream.addTransform(analyticsStream, (order) => order);

// Send large orders to notifications
orderStream.addTransform(notificationStream, (order) => {
  if (order.amount > 1000) {
    return {
      ...order,
      priority: 'high',
    };
  }
  return undefined; // Skip small orders
});

// Archive all orders
orderStream.addTransform(archiveStream, (order) => ({
  ...order,
  archivedAt: new Date(),
}));

Fan In (N:1)

Combine data from multiple sources:

import { Stream, OlapTable, Key } from "@514labs/moose-lib";

interface UserEvent {
  userId: Key<string>;
  eventType: string;
  timestamp: Date;
  source: string;
}

// Source streams
const webEvents = new Stream<UserEvent>("web_events");
const mobileEvents = new Stream<UserEvent>("mobile_events");
const apiEvents = new Stream<UserEvent>("api_events");

// Create a stream and table for the combined events
const eventsTable = new OlapTable<UserEvent>("all_events");
const allEvents = new Stream<UserEvent>("all_events", {
  destination: eventsTable,
});

// Fan in from web
webEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'web',
  timestamp: new Date(),
}));

// Fan in from mobile
mobileEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'mobile',
  timestamp: new Date(),
}));

// Fan in from API
apiEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'api',
  timestamp: new Date(),
}));

Unnesting

Flatten nested records:

import { Stream, Key } from "@514labs/moose-lib";

interface NestedRecord {
  id: Key<string>;
  nested: {
    value: number;
  }[];
}

interface FlattenedRecord {
  id: Key<string>;
  value: number;
}

const nestedStream = new Stream<NestedRecord>("nested_records");
const flattenedStream = new Stream<FlattenedRecord>("flattened_records");

// Returning an array emits one flattened record per nested element
nestedStream.addTransform(flattenedStream, (record) =>
  record.nested.map((n) => ({
    id: record.id,
    value: n.value,
  }))
);
Multiple transformation routes limitation

You cannot have multiple transforms between the same source and destination stream. If you need multiple transformation routes, you must either:

  • Use conditional logic inside a single streaming function to handle different cases (as sketched below), or
  • Implement a fan-out/fan-in pattern, where you route records to different intermediate streams and then merge them back into the destination stream.
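As a sketch of the first option, a single transform can branch on each record and decide what to emit. The event type and stream names here are hypothetical:

import { Stream } from "@514labs/moose-lib";

interface ClickEvent {
  id: string;
  source: string;
  clicks: number;
}

const clickStream = new Stream<ClickEvent>("clicks");
const cleanedClickStream = new Stream<ClickEvent>("cleaned_clicks");

// One transform between the two streams; each case is handled by a
// conditional instead of a separate transformation route.
clickStream.addTransform(cleanedClickStream, (event) => {
  if (event.source === "bot") {
    return undefined; // drop bot traffic entirely
  }
  if (event.clicks < 0) {
    return { ...event, clicks: 0 }; // repair invalid counts
  }
  return event; // pass everything else through unchanged
});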

Error Handling with Dead Letter Queues

When stream processing fails, you can configure dead letter queues to capture failed messages for later analysis and recovery. This prevents a single failed message from halting your entire pipeline.

import { DeadLetterQueue, IngestPipeline } from "@514labs/moose-lib";

interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

interface ProcessedEvent {
  userId: string;
  action: string;
  processedAt: Date;
  isValid: boolean;
}

// Create pipelines
const rawEvents = new IngestPipeline<UserEvent>("raw_events", {
  ingestApi: true,
  stream: true,
  table: false,
});

const processedEvents = new IngestPipeline<ProcessedEvent>("processed_events", {
  ingestApi: false,
  stream: true,
  table: true,
});

// Create dead letter queue for failed transformations
const eventDLQ = new DeadLetterQueue<UserEvent>("EventDLQ");

// Add transform with error handling
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: UserEvent): ProcessedEvent => {
    // This might fail for invalid data
    if (!event.userId || event.userId.length === 0) {
      throw new Error("Invalid userId: cannot be empty");
    }

    return {
      userId: event.userId,
      action: event.action,
      processedAt: new Date(),
      isValid: true,
    };
  },
  {
    deadLetterQueue: eventDLQ, // Failed messages go here
  }
);

// Monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
  console.log(`Error: ${deadLetter.errorMessage}`);
  console.log(`Failed at: ${deadLetter.failedAt}`);

  // Access original typed data
  const originalEvent: UserEvent = deadLetter.asTyped();
  console.log(`Original User ID: ${originalEvent.userId}`);
});
Learn More

For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the Dead Letter Queues guide.
