Error Handling With Dead Letter Queues

Overview

Dead Letter Queues (DLQs) provide a robust error handling mechanism for stream processing in Moose. When streaming functions fail during transformation or consumption, failed messages are automatically routed to a configured dead letter queue for later analysis and recovery.

Dead Letter Queue Benefits

Fault tolerance

Prevent single message failures from stopping entire stream processing pipelines

Error visibility

Capture detailed error information including original data, error messages, and timestamps

Recovery workflows

Build automated or manual processes to handle and retry failed messages

Type safety

Maintain full type safety when accessing original data from dead letter records

Dead Letter Record Structure

When a message fails processing, Moose creates a dead letter record with the following structure:

interface DeadLetterModel {
  originalRecord: Record<string, any>;  // The original message that failed
  errorMessage: string;                 // Error description
  errorType: string;                    // Error class/type name
  failedAt: Date;                       // Timestamp when failure occurred
  source: "api" | "transform" | "table"; // Where the failure happened
}
 
interface DeadLetter<T> extends DeadLetterModel {
  asTyped: () => T;  // Type-safe access to original record
}
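
Moose populates these records automatically when a transform or consumer throws. Purely as an illustration, here is a hand-built sketch of what a record for a failed event might contain, along with how `asTyped` amounts to a typed view over `originalRecord` (this is not framework output; the field values are invented):

```typescript
// Illustrative only: Moose builds these records for you when processing
// fails. The shapes mirror the interfaces above.
interface DeadLetterModel {
  originalRecord: Record<string, any>;
  errorMessage: string;
  errorType: string;
  failedAt: Date;
  source: "api" | "transform" | "table";
}

interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

const example: DeadLetterModel = {
  originalRecord: { userId: "", action: "login", timestamp: 1700000000 },
  errorMessage: "Invalid userId: cannot be empty",
  errorType: "Error",
  failedAt: new Date(),
  source: "transform",
};

// asTyped() is conceptually a typed cast over originalRecord:
const asTyped = <T>(record: DeadLetterModel): T =>
  record.originalRecord as T;

const original = asTyped<UserEvent>(example);
console.log(original.action); // "login"
```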

Creating Dead Letter Queues

Basic Setup

dead-letter-setup.ts
import { DeadLetterQueue, Stream } from "@514labs/moose-lib";
 
// Define your data model
interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}
 
// Create a dead letter queue for UserEvent failures
const userEventDLQ = new DeadLetterQueue<UserEvent>("UserEventDLQ");

Configuring Transforms with Dead Letter Queues

transform-with-dlq.ts
import { IngestPipeline, DeadLetterQueue } from "@514labs/moose-lib";
 
interface RawEvent {
  userId: string;
  action: string;
  timestamp: number;
}
 
interface ProcessedEvent {
  userId: string;
  action: string;
  processedAt: Date;
  isValid: boolean;
}
 
// Create pipelines
const rawEvents = new IngestPipeline<RawEvent>("raw_events", {
  ingest: true,
  stream: true,
  table: false
});
 
const processedEvents = new IngestPipeline<ProcessedEvent>("processed_events", {
  ingest: false,
  stream: true,
  table: true
});
 
// Create dead letter queue
const eventDLQ = new DeadLetterQueue<RawEvent>("EventDLQ");
 
// Add transform with DLQ configuration
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: RawEvent): ProcessedEvent => {
    // This transform might fail for invalid data
    if (!event.userId || event.userId.length === 0) {
      throw new Error("Invalid userId: cannot be empty");
    }
    
    if (event.timestamp < 0) {
      throw new Error("Invalid timestamp: cannot be negative");
    }
 
    return {
      userId: event.userId,
      action: event.action,
      processedAt: new Date(),
      isValid: true
    };
  },
  {
    deadLetterQueue: eventDLQ  // Configure DLQ for this transform
  }
);

Configuring Consumers with Dead Letter Queues

consumer-with-dlq.ts
// Add consumer with DLQ configuration
rawEvents.stream!.addConsumer(
  (event: RawEvent): void => {
    // This consumer might fail for certain events
    if (event.action === "forbidden_action") {
      throw new Error("Forbidden action detected");
    }
    
    // Process the event (e.g., send to external API)
    console.log(`Processing event for user ${event.userId}`);
  },
  {
    deadLetterQueue: eventDLQ  // Configure DLQ for this consumer
  }
);

Processing Dead Letter Messages

Monitoring Dead Letter Queues

dlq-monitoring.ts
// Add a consumer to monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
  console.log("Dead letter received:");
  console.log(`Error: ${deadLetter.errorMessage}`);
  console.log(`Error Type: ${deadLetter.errorType}`);
  console.log(`Failed At: ${deadLetter.failedAt}`);
  console.log(`Source: ${deadLetter.source}`);
  
  // Access the original typed data
  const originalEvent: RawEvent = deadLetter.asTyped();
  console.log(`Original User ID: ${originalEvent.userId}`);
});

Recovery and Retry Logic

dlq-recovery.ts
import { Stream } from "@514labs/moose-lib";
 
// Create a recovery stream for fixed messages
const recoveredEvents = new Stream<ProcessedEvent>("recovered_events", {
  destination: processedEvents.table  // Send recovered data to main table
});
 
// Add recovery logic to the DLQ
eventDLQ.addTransform(
  recoveredEvents,
  (deadLetter): ProcessedEvent | null => {
    try {
      const originalEvent = deadLetter.asTyped();
      
      // Apply fixes based on error type
      if (deadLetter.errorMessage.includes("Invalid userId")) {
        // Skip events with invalid user IDs
        return null;
      }
      
      if (deadLetter.errorMessage.includes("Invalid timestamp")) {
        // Fix negative timestamps
        const fixedEvent = {
          ...originalEvent,
          timestamp: Math.abs(originalEvent.timestamp)
        };
        
        return {
          userId: fixedEvent.userId,
          action: fixedEvent.action,
          processedAt: new Date(),
          isValid: true
        };
      }
      
      return null; // Skip other errors
    } catch (error) {
      console.error("Recovery failed:", error);
      return null;
    }
  }
);

Best Practices

Dead Letter Queue Best Practices

Monitor DLQ volume

Set up alerts when dead letter queues receive messages to catch processing issues early

Include context in errors

Throw descriptive errors that help identify the root cause and potential fixes

Implement recovery logic

Build automated recovery processes for common failure patterns

Set retention policies

Configure appropriate retention periods for dead letter queues based on your recovery SLAs

Use separate DLQs

Create separate dead letter queues for different types of failures or processing stages

Log dead letter events

Ensure dead letter events are properly logged for debugging and analysis
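
As a sketch of the "monitor DLQ volume" practice, a DLQ consumer can count messages over a sliding time window and fire an alert callback when a threshold is crossed. The window size, threshold, and alert hook below are illustrative placeholders, not a Moose API:

```typescript
// Sliding-window volume alert, suitable for calling from a DLQ consumer.
// Threshold, window size, and the alert hook are illustrative.
class DlqVolumeAlert {
  private timestamps: number[] = [];

  constructor(
    private threshold: number,            // messages allowed per window
    private windowMs: number,             // window size in milliseconds
    private onAlert: (count: number) => void
  ) {}

  record(now: number = Date.now()): void {
    this.timestamps.push(now);
    // Drop entries that have fallen out of the window
    const cutoff = now - this.windowMs;
    this.timestamps = this.timestamps.filter((t) => t >= cutoff);
    if (this.timestamps.length > this.threshold) {
      this.onAlert(this.timestamps.length);
    }
  }
}

// Usage sketch inside a DLQ consumer:
// const alert = new DlqVolumeAlert(100, 60_000, (n) =>
//   console.error(`DLQ received ${n} messages in the last minute`)
// );
// eventDLQ.addConsumer(() => alert.record());
```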

Common Patterns

Circuit Breaker Pattern

circuit-breaker.ts
let failureCount = 0;
const maxFailures = 5;
const resetTimeout = 60000; // 1 minute
 
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: RawEvent): ProcessedEvent => {
    if (failureCount >= maxFailures) {
      throw new Error("Circuit breaker open - too many failures");
    }
    
    try {
      // processEvent is a placeholder for your own processing logic
      const result = processEvent(event);
      failureCount = 0; // Reset on success
      return result;
    } catch (error) {
      failureCount++;
      if (failureCount >= maxFailures) {
        setTimeout(() => { failureCount = 0; }, resetTimeout);
      }
      throw error;
    }
  },
  { deadLetterQueue: eventDLQ }
);

Retry with Exponential Backoff

retry-backoff.ts
// Create a retry DLQ with delay processing
const retryDLQ = new DeadLetterQueue<RawEvent>("RetryDLQ");
 
retryDLQ.addTransform(
  processedEvents.stream!,
  (deadLetter): ProcessedEvent | null => {
    const retryCount = deadLetter.originalRecord.retryCount || 0;
    const maxRetries = 3;
    
    if (retryCount >= maxRetries) {
      console.log("Max retries exceeded, giving up");
      return null;
    }
    
    // Calculate delay (exponential backoff)
    const delay = Math.pow(2, retryCount) * 1000; // 1s, 2s, 4s
    
    // Caveat: setTimeout runs outside the transform's error handling, so an
    // exception thrown in this callback is NOT routed back to the DLQ
    // automatically. Re-publish the event to the input stream so a failed
    // retry re-enters normal processing with its incremented retry count.
    setTimeout(() => {
      const originalEvent = deadLetter.asTyped();
      // Add retry count to track attempts
      const eventWithRetry = {
        ...originalEvent,
        retryCount: retryCount + 1
      };
      
      // Retry the original processing logic (processEvent is a placeholder)
      processEvent(eventWithRetry);
    }, delay);
    
    return null; // Don't emit immediately; the retry runs asynchronously
  }
);

Performance Considerations

Dead letter queues add overhead to stream processing. Use them judiciously and monitor their impact on throughput. Consider implementing sampling for high-volume streams where occasional message loss is acceptable.
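
One way to implement such sampling, sketched here under the assumption that you control the error path of your transform, is a deterministic keep-every-Nth gate: only a fraction of failures are rethrown (and thus routed to the DLQ) while the rest are logged and dropped. The rate and the wiring comments are illustrative:

```typescript
// Keep-every-Nth sampler: on failure, route only 1-in-N messages to the
// DLQ (by rethrowing) and drop the rest after logging. N is illustrative.
function makeSampler(n: number): () => boolean {
  let count = 0;
  return () => {
    count += 1;
    return count % n === 0; // true on every Nth call
  };
}

const sampleFailure = makeSampler(10);

// Usage sketch inside a transform configured with { deadLetterQueue: eventDLQ }:
// try {
//   return process(event);
// } catch (error) {
//   if (sampleFailure()) throw error;   // rethrow: routed to the DLQ
//   console.warn("dropped failed message (sampled out)", error);
//   return null;                        // swallow the remaining failures
// }
```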

Monitoring Integration

Dead letter queue events can be integrated with monitoring systems like Prometheus, DataDog, or CloudWatch for alerting and dashboards. Consider tracking metrics like DLQ message rate, error types, and recovery success rates.
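
For example, a small in-process recorder can track the metrics mentioned above (message counts per error type and recovery outcomes) before you export them to whichever monitoring backend you use. The class below is an illustrative sketch, not a Moose or vendor API:

```typescript
// In-process DLQ metrics: counts per error type plus recovery outcomes.
// Export these values to Prometheus/DataDog/CloudWatch in your own code.
class DlqMetrics {
  private byErrorType = new Map<string, number>();
  recoveries = { succeeded: 0, failed: 0 };

  recordDeadLetter(errorType: string): void {
    this.byErrorType.set(errorType, (this.byErrorType.get(errorType) ?? 0) + 1);
  }

  recordRecovery(ok: boolean): void {
    if (ok) this.recoveries.succeeded += 1;
    else this.recoveries.failed += 1;
  }

  snapshot(): Record<string, number> {
    return Object.fromEntries(this.byErrorType);
  }
}

// Usage sketch with a DLQ consumer:
// const metrics = new DlqMetrics();
// eventDLQ.addConsumer((dl) => metrics.recordDeadLetter(dl.errorType));
```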