Dead Letter Queues (DLQs) provide a robust error handling mechanism for stream processing in Moose. When streaming functions fail during transformation or consumption, failed messages are automatically routed to a configured dead letter queue for later analysis and recovery.
With a DLQ in place, you can:

- Prevent single message failures from stopping entire stream processing pipelines
- Capture detailed error information, including the original data, error message, and timestamp
- Build automated or manual processes to handle and retry failed messages
- Maintain full type safety when accessing original data from dead letter records
When a message fails processing, Moose creates a dead letter record with the following structure:
```ts
interface DeadLetterModel {
  originalRecord: Record<string, any>;    // The original message that failed
  errorMessage: string;                   // Error description
  errorType: string;                      // Error class/type name
  failedAt: Date;                         // Timestamp when the failure occurred
  source: "api" | "transform" | "table";  // Where the failure happened
}

interface DeadLetter<T> extends DeadLetterModel {
  asTyped: () => T; // Type-safe access to the original record
}
```

To create a dead letter queue, instantiate DeadLetterQueue with the data model of the stream you want to protect:

```ts
import { DeadLetterQueue } from "@514labs/moose-lib";

// Define your data model
interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

// Create a dead letter queue for UserEvent failures
const userEventDLQ = new DeadLetterQueue<UserEvent>("UserEventDLQ");
```

Add a dead letter queue to your Transformation Function configuration, and any errors thrown in the transformation will trigger the event to be routed to the dead letter queue.
```ts
import { DeadLetterQueue } from "@514labs/moose-lib";

// Create dead letter queue
const eventDLQ = new DeadLetterQueue<RawEvent>("EventDLQ");

// Add transform with errors to trigger DLQ, and DLQ configuration
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: RawEvent): ProcessedEvent => {
    // This transform might fail for invalid data
    if (!event.userId || event.userId.length === 0) {
      throw new Error("Invalid userId: cannot be empty");
    }
    if (event.timestamp < 0) {
      throw new Error("Invalid timestamp: cannot be negative");
    }

    return {
      userId: event.userId,
      action: event.action,
      processedAt: new Date(),
      isValid: true
    };
  },
  {
    deadLetterQueue: eventDLQ // Configure DLQ for this transform
  }
);
```

Add a dead letter queue to your Consumer Function configuration, and any errors thrown in the function will trigger the event to be routed to the dead letter queue.
```ts
// Add consumer with errors to trigger DLQ, and DLQ configuration
rawEvents.stream!.addConsumer(
  (event: RawEvent): void => {
    // This consumer might fail for certain events
    if (event.action === "forbidden_action") {
      throw new Error("Forbidden action detected");
    }

    // Process the event (e.g., send to an external API)
    console.log(`Processing event for user ${event.userId}`);
  },
  {
    deadLetterQueue: eventDLQ // Configure DLQ for this consumer
  }
);
```

Add a dead letter queue to your Ingest API configuration, and any runtime data validation failures at the API will trigger the event to be routed to the dead letter queue.
```ts
import { IngestApi, Stream, DeadLetterQueue } from "@514labs/moose-lib";

interface ExampleModel {
  id: string;
  userId: string;
  timestamp: Date;
  properties?: {
    device?: string;
    version?: number;
  };
}

export const api = new IngestApi<ExampleModel>("your-api-route", {
  destination: new Stream<ExampleModel>("your-stream-name"),
  deadLetterQueue: new DeadLetterQueue<ExampleModel>("your-dlq-name")
});
```

To monitor failures, add a consumer to the dead letter queue and inspect each record:

```ts
// Add a consumer to monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
  console.log("Dead letter received:");
  console.log(`Error: ${deadLetter.errorMessage}`);
  console.log(`Error Type: ${deadLetter.errorType}`);
  console.log(`Failed At: ${deadLetter.failedAt}`);
  console.log(`Source: ${deadLetter.source}`);

  // Access the original typed data
  const originalEvent: RawEvent = deadLetter.asTyped();
  console.log(`Original User ID: ${originalEvent.userId}`);
});
```

To recover failed messages, add a transform on the dead letter queue that repairs records and routes them back into the main pipeline:

```ts
// Create a recovery stream for fixed messages
const recoveredEvents = new Stream<ProcessedEvent>("recovered_events", {
  destination: processedEvents.table // Send recovered data to the main table
});

// Add recovery logic to the DLQ
eventDLQ.addTransform(
  recoveredEvents,
  (deadLetter): ProcessedEvent | null => {
    try {
      const originalEvent = deadLetter.asTyped();

      // Apply fixes based on error type
      if (deadLetter.errorMessage.includes("Invalid userId")) {
        // Skip events with invalid user IDs
        return null;
      }

      if (deadLetter.errorMessage.includes("Invalid timestamp")) {
        // Fix negative timestamps
        const fixedEvent = {
          ...originalEvent,
          timestamp: Math.abs(originalEvent.timestamp)
        };

        return {
          userId: fixedEvent.userId,
          action: fixedEvent.action,
          processedAt: new Date(),
          isValid: true
        };
      }

      return null; // Skip other errors
    } catch (error) {
      console.error("Recovery failed:", error);
      return null;
    }
  }
);
```

Best practices for working with dead letter queues:

- Set up alerts when dead letter queues receive messages to catch processing issues early
- Throw descriptive errors that help identify the root cause and potential fixes (see the sketch after this list)
- Build automated recovery processes for common failure patterns
- Configure appropriate retention periods for dead letter queues based on your recovery SLAs
- Create separate dead letter queues for different types of failures or processing stages
- Ensure dead letter events are properly logged for debugging and analysis
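To make the descriptive-errors point concrete, here is a variation of the transform from the earlier example (reusing the rawEvents, processedEvents, and eventDLQ names from above) where the error message carries the offending value and the violated rule, so the resulting dead letter record is actionable on its own:

```ts
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: RawEvent): ProcessedEvent => {
    if (event.timestamp < 0) {
      // Include the bad value and the expected rule, not just "invalid input"
      throw new Error(
        `Invalid timestamp ${event.timestamp} for user ${event.userId}: must be a non-negative epoch value`
      );
    }

    return {
      userId: event.userId,
      action: event.action,
      processedAt: new Date(),
      isValid: true
    };
  },
  { deadLetterQueue: eventDLQ }
);
```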
A simple circuit breaker can stop pushing work through a failing transform after repeated errors, while still dead-lettering the failures:

```ts
let failureCount = 0;
const maxFailures = 5;
const resetTimeout = 60000; // 1 minute

rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: RawEvent): ProcessedEvent => {
    if (failureCount >= maxFailures) {
      throw new Error("Circuit breaker open - too many failures");
    }

    try {
      // Your processing logic here
      const result = processEvent(event);
      failureCount = 0; // Reset on success
      return result;
    } catch (error) {
      failureCount++;
      if (failureCount >= maxFailures) {
        setTimeout(() => { failureCount = 0; }, resetTimeout);
      }
      throw error;
    }
  },
  {
    deadLetterQueue: eventDLQ
  }
);
```

A dedicated retry DLQ can reprocess failed events with exponential backoff:

```ts
// Create a retry DLQ with delay processing
const retryDLQ = new DeadLetterQueue<RawEvent>("RetryDLQ");

retryDLQ.addTransform(
  processedEvents.stream!,
  (deadLetter): ProcessedEvent | null => {
    const retryCount = deadLetter.originalRecord.retryCount || 0;
    const maxRetries = 3;

    if (retryCount >= maxRetries) {
      console.log("Max retries exceeded, giving up");
      return null;
    }

    // Calculate delay (exponential backoff)
    const delay = Math.pow(2, retryCount) * 1000; // 1s, 2s, 4s

    setTimeout(() => {
      try {
        const originalEvent = deadLetter.asTyped();

        // Add retry count to track attempts
        const eventWithRetry = {
          ...originalEvent,
          retryCount: retryCount + 1
        };

        // Retry the original processing logic
        processEvent(eventWithRetry);
      } catch (error) {
        // Will go back to the DLQ with an incremented retry count
        throw error;
      }
    }, delay);

    return null; // Don't emit immediately; wait for the retry
  }
);
```

Dead letter queues add overhead to stream processing. Use them judiciously and monitor their impact on throughput. Consider implementing sampling for high-volume streams where occasional message loss is acceptable.
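One way to implement that sampling, sketched under the assumption that silently dropping some failures is acceptable and that a transform may return null to skip a message (as the recovery example above does): catch errors yourself and rethrow only a sampled fraction, so only that fraction is dead-lettered. The 10% rate and the processEvent placeholder are illustrative.

```ts
const DLQ_SAMPLE_RATE = 0.1; // Illustrative: route roughly 10% of failures to the DLQ

rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: RawEvent): ProcessedEvent | null => {
    try {
      return processEvent(event); // Same placeholder processing logic as above
    } catch (error) {
      if (Math.random() < DLQ_SAMPLE_RATE) {
        throw error; // Sampled failure: let Moose route it to the DLQ
      }
      console.warn("Dropping unsampled failure:", error);
      return null; // Skip the message without dead-lettering it
    }
  },
  { deadLetterQueue: eventDLQ }
);
```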
Dead letter queue events can be integrated with monitoring systems like Prometheus, DataDog, or CloudWatch for alerting and dashboards. Consider tracking metrics like DLQ message rate, error types, and recovery success rates.
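As a starting point, a DLQ consumer can aggregate counts by source and error type and emit them as structured log lines; wiring the numbers into Prometheus, DataDog, or CloudWatch is left to whichever metrics client you already use. The in-memory Map below is only an in-process illustration:

```ts
// In-process counters keyed by "source:errorType"; swap for your metrics client
const dlqCounts = new Map<string, number>();

eventDLQ.addConsumer((deadLetter) => {
  const key = `${deadLetter.source}:${deadLetter.errorType}`;
  dlqCounts.set(key, (dlqCounts.get(key) ?? 0) + 1);

  // Emit a structured log line that a log-based alerting system can pick up
  console.log(
    JSON.stringify({
      metric: "dlq_message",
      source: deadLetter.source,
      errorType: deadLetter.errorType,
      failedAt: deadLetter.failedAt,
      count: dlqCounts.get(key)
    })
  );
});
```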
Dead Letter Queues (DLQs) can be directly integrated with your ingestion pipelines to capture records that fail validation or processing at the API entry point. This ensures that no data is lost, even if it cannot be immediately processed.
```ts
import { IngestPipeline } from "@514labs/moose-lib";

interface ExampleSchema {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

const pipeline = new IngestPipeline<ExampleSchema>("example", {
  ingestApi: true,
  stream: true,
  table: true,
  deadLetterQueue: true, // Route failed ingestions to DLQ
});
```

See the Ingestion API documentation for more details and best practices on configuring DLQs for ingestion.