Transformation Functions

Overview

Transformations let you process data as it flows between streams: you can filter, enrich, reshape, and combine records in-flight before they reach their destination.
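
A transform is a function registered between two streams with addTransform. Below is a minimal sketch of the pattern (the file name, stream names, and record shapes are illustrative); the sections that follow cover each variation in detail:

MinimalTransform.ts
import { Stream } from "@514labs/moose-lib";
 
interface PageView {
  url: string;
  viewedAt: string;
}
 
interface CleanPageView {
  url: string;
  viewedAt: Date;
}
 
const pageViews = new Stream<PageView>("page_views");
const cleanPageViews = new Stream<CleanPageView>("clean_page_views");
 
// Every record published to "page_views" is passed through this function,
// and the returned value is published to "clean_page_views"
pageViews.addTransform(cleanPageViews, (record) => ({
  url: record.url,
  viewedAt: new Date(record.viewedAt)
}));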

Implementing Transformations

Reshape and Enrich Data

Change the shape of records or enrich them with additional fields:

DataTransform.ts
import { Stream, Key } from "@514labs/moose-lib";
 
interface RawEvent {
  id: Key<string>;
  timestamp: string;
  data: {
    user_id: string;
    platform: string;
    app_version: string;
    ip_address: string;
  }
}
 
interface EnrichedEvent {
  eventId: Key<string>;
  timestamp: Date;
  userId: Key<string>;
  properties: {
    platform: string;
    version: string;
    country: string;
  };
  metadata: {
    originalTimestamp: string;
    processedAt: Date;
  }
}
 
const rawStream = new Stream<RawEvent>("raw_events");
const enrichedStream = new Stream<EnrichedEvent>("enriched_events");
 
// Reshape and enrich data
rawStream.addTransform(enrichedStream, async (record: RawEvent) => ({
  eventId: record.id,
  timestamp: new Date(record.timestamp),
  userId: record.data.user_id,
  properties: {
    platform: record.data.platform || 'unknown',
    version: record.data.app_version,
    country: await lookupCountry(record.data.ip_address) // lookupCountry: your own async geo-IP lookup helper (not shown)
  },
  metadata: {
    originalTimestamp: record.timestamp,
    processedAt: new Date()
  }
}));

Filtering

Drop records that don't meet your conditions:

FilterStream.ts
import { Stream } from "@514labs/moose-lib";
 
// Midnight of the current day, used below as a staleness cutoff
const getStartOfDay = (): Date => {
  const startOfDay = new Date();
  startOfDay.setHours(0, 0, 0, 0);
  return startOfDay;
};
 
interface MetricRecord {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}
 
const inputStream = new Stream<MetricRecord>("input_metrics");
const validMetrics = new Stream<MetricRecord>("valid_metrics");
 
// Multiple filtering conditions
inputStream.addTransform(validMetrics, (record) => {
  // Filter out records with invalid values
  if (isNaN(record.value) || record.value < 0) {
    return undefined;
  }
 
  // Filter out old records
  if (record.timestamp < getStartOfDay()) {
    return undefined;
  }
 
  // Filter out specific metrics
  if (record.name.startsWith('debug_')) {
    return undefined;
  }
 
  return record;
});

Fan Out (1:N)

Send data to multiple downstream processors:

FanOut.ts
import { Stream } from "@514labs/moose-lib";
 
interface Order {
  orderId: string;
  userId: string;
  amount: number;
  items: string[];
}
 
interface HighPriorityOrder extends Order {
  priority: 'high';
}
 
interface ArchivedOrder extends Order {
  archivedAt: Date;
}
 
// Define destination streams
const analyticsStream = new Stream<Order>("order_analytics");
const notificationStream = new Stream<HighPriorityOrder>("order_notifications");
const archiveStream = new Stream<ArchivedOrder>("order_archive");
 
// Source stream
const orderStream = new Stream<Order>("orders");
 
// Send all orders to analytics
orderStream.addTransform(analyticsStream, (order) => order);
 
// Send large orders to notifications
orderStream.addTransform(notificationStream, (order) => {
  if (order.amount > 1000) {
    return {
      ...order,
      priority: 'high'
    };
  }
  return undefined; // Skip small orders
});
 
// Archive all orders
orderStream.addTransform(archiveStream, (order) => ({
  ...order,
  archivedAt: new Date()
}));

Fan In (N:1)

Combine data from multiple sources:

FanIn.ts
import { Stream, OlapTable, Key } from "@514labs/moose-lib";
 
interface UserEvent {
  userId: Key<string>;
  eventType: string;
  timestamp: Date;
  source: string;
}
 
// Source streams
const webEvents = new Stream<UserEvent>("web_events");
const mobileEvents = new Stream<UserEvent>("mobile_events");
const apiEvents = new Stream<UserEvent>("api_events");
 
// Create a stream and table for the combined events
const eventsTable = new OlapTable<UserEvent>("all_events");
const allEvents = new Stream<UserEvent>("all_events", {
  destination: eventsTable
});
 
// Fan in from web
webEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'web',
  timestamp: new Date()
}));
 
// Fan in from mobile
mobileEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'mobile',
  timestamp: new Date()
}));
 
// Fan in from API
apiEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'api',
  timestamp: new Date()
}));

Unnesting

Flatten nested records:

Unnest.ts
import { Stream, Key } from "@514labs/moose-lib";
 
interface NestedRecord {
  id: Key<string>;
  nested: {
    value: number;
  }[];
}
 
interface FlattenedRecord {
  id: Key<string>;
  value: number;
}
 
const nestedStream = new Stream<NestedRecord>("nested_records");
const flattenedStream = new Stream<FlattenedRecord>("flattened_records");
 
nestedStream.addTransform(flattenedStream, (record) => record.nested.map((n) => ({
  id: record.id,
  value: n.value
})));
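
Note that the transform returns an array here: each element of the returned array is written to the destination stream as its own record, which is how one nested record becomes multiple flattened records.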

Limitation: Multiple Transformation Routes

You cannot have multiple transforms between the same source and destination stream. If you need multiple transformation routes, you must either:

  • Use conditional logic inside a single streaming function to handle different cases (see the sketch below), or
  • Implement a fan-out/fan-in pattern, where you route records to different intermediate streams and then merge them back into the destination stream.
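
For instance, instead of registering two transforms between the same pair of streams, a single transform can branch on a field of the record. The sketch below assumes hypothetical stream names and record shapes:

ConditionalTransform.ts
import { Stream } from "@514labs/moose-lib";
 
interface RawTxn {
  id: string;
  kind: string;
  amountCents: number;
}
 
interface RoutedTxn {
  id: string;
  amountCents: number;
  route: string;
}
 
const rawTxns = new Stream<RawTxn>("raw_txns");
const routedTxns = new Stream<RoutedTxn>("routed_txns");
 
// One transform between the two streams; all routing logic lives inside it
rawTxns.addTransform(routedTxns, (txn) => {
  if (txn.kind === "purchase") {
    return { id: txn.id, amountCents: txn.amountCents, route: "purchase" };
  }
  if (txn.kind === "refund") {
    return { id: txn.id, amountCents: -txn.amountCents, route: "refund" };
  }
  return undefined; // Drop anything that does not match a known kind
});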

Error Handling with Dead Letter Queues

When stream processing fails, you can configure dead letter queues to capture failed messages for later analysis and recovery. This prevents a single bad message from halting your entire pipeline.

DeadLetterQueue.ts
import { DeadLetterQueue, IngestPipeline } from "@514labs/moose-lib";
 
interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}
 
interface ProcessedEvent {
  userId: string;
  action: string;
  processedAt: Date;
  isValid: boolean;
}
 
// Create pipelines
const rawEvents = new IngestPipeline<UserEvent>("raw_events", {
  ingest: true,
  stream: true,
  table: false
});
 
const processedEvents = new IngestPipeline<ProcessedEvent>("processed_events", {
  ingest: false,
  stream: true,
  table: true
});
 
// Create dead letter queue for failed transformations
const eventDLQ = new DeadLetterQueue<UserEvent>("EventDLQ");
 
// Add transform with error handling
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: UserEvent): ProcessedEvent => {
    // This might fail for invalid data
    if (!event.userId || event.userId.length === 0) {
      throw new Error("Invalid userId: cannot be empty");
    }
 
    return {
      userId: event.userId,
      action: event.action,
      processedAt: new Date(),
      isValid: true
    };
  },
  {
    deadLetterQueue: eventDLQ  // Failed messages go here
  }
);
 
// Monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
  console.log(`Error: ${deadLetter.errorMessage}`);
  console.log(`Failed at: ${deadLetter.failedAt}`);
  
  // Access original typed data
  const originalEvent: UserEvent = deadLetter.asTyped();
  console.log(`Original User ID: ${originalEvent.userId}`);
});

Learn More

For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the Dead Letter Queues guide.