# Moose / Streaming / Dead Letter Queues Documentation – TypeScript

## Included Files

1. moose/streaming/dead-letter-queues/dead-letter-queues.mdx

## Dead Letter Queues

Source: moose/streaming/dead-letter-queues/dead-letter-queues.mdx

Handle failed stream processing with dead letter queues

# Dead Letter Queues

## Overview

Dead Letter Queues (DLQs) provide a robust error-handling mechanism for stream processing in Moose. When streaming functions fail during transformation or consumption, the failed messages are automatically routed to a configured dead letter queue for later analysis and recovery.

## Dead Letter Record Structure

When a message fails processing, Moose creates a dead letter record with the following structure:

```ts
interface DeadLetterModel {
  originalRecord: Record<string, any>;    // The original message that failed
  errorMessage: string;                   // Error description
  errorType: string;                      // Error class/type name
  failedAt: Date;                         // Timestamp of the failure
  source: "api" | "transform" | "table";  // Where the failure happened
}

interface DeadLetter<T> extends DeadLetterModel {
  asTyped: () => T; // Type-safe access to the original record
}
```

## Creating Dead Letter Queues

### Basic Setup

```ts filename="dead-letter-setup.ts" copy
import { DeadLetterQueue } from "@514labs/moose-lib";

// Define your data model
interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

// Create a dead letter queue for UserEvent failures
const userEventDLQ = new DeadLetterQueue<UserEvent>("UserEventDLQ");
```

### Configuring Transformations with Dead Letter Queues

Add a dead letter queue to your transformation function configuration, and any error thrown in the transformation will route the event to the dead letter queue.
```ts filename="transform-with-dlq.ts" copy
import { DeadLetterQueue } from "@514labs/moose-lib";

// Create a dead letter queue
const eventDLQ = new DeadLetterQueue<RawEvent>("EventDLQ");

// Add a transform whose errors are routed to the DLQ
rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: RawEvent): ProcessedEvent => {
    // This transform might fail on invalid data
    if (!event.userId || event.userId.length === 0) {
      throw new Error("Invalid userId: cannot be empty");
    }
    if (event.timestamp < 0) {
      throw new Error("Invalid timestamp: cannot be negative");
    }

    return {
      userId: event.userId,
      action: event.action,
      processedAt: new Date(),
      isValid: true,
    };
  },
  {
    deadLetterQueue: eventDLQ, // Configure the DLQ for this transform
  }
);
```

### Configuring Consumers with Dead Letter Queues

Add a dead letter queue to your consumer function configuration, and any error thrown in the function will route the event to the dead letter queue.

```ts filename="consumer-with-dlq.ts" copy
// Add a consumer whose errors are routed to the DLQ
rawEvents.stream!.addConsumer(
  (event: RawEvent): void => {
    // This consumer might fail for certain events
    if (event.action === "forbidden_action") {
      throw new Error("Forbidden action detected");
    }

    // Process the event (e.g., send to an external API)
    console.log(`Processing event for user ${event.userId}`);
  },
  {
    deadLetterQueue: eventDLQ, // Configure the DLQ for this consumer
  }
);
```

### Configuring Ingest APIs with Dead Letter Queues

Add a dead letter queue to your Ingest API configuration, and any runtime data validation failure at the API will route the event to the dead letter queue.
```typescript filename="ValidationExample.ts" copy
import { IngestPipeline } from "@514labs/moose-lib";

interface ExampleModel {
  id: string;
  userId: string;
  timestamp: Date;
  properties?: {
    device?: string;
    version?: number;
  };
}

// Records that fail validation against ExampleModel at the API are
// routed to the pipeline's dead letter queue
const examplePipeline = new IngestPipeline<ExampleModel>("example_model", {
  ingestApi: true,
  stream: true,
  table: true,
  deadLetterQueue: true,
});
```

## Processing Dead Letter Messages

### Monitoring Dead Letter Queues

```ts filename="dlq-monitoring.ts" copy
// Add a consumer to monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
  console.log("Dead letter received:");
  console.log(`Error: ${deadLetter.errorMessage}`);
  console.log(`Error Type: ${deadLetter.errorType}`);
  console.log(`Failed At: ${deadLetter.failedAt}`);
  console.log(`Source: ${deadLetter.source}`);

  // Access the original typed data
  const originalEvent: RawEvent = deadLetter.asTyped();
  console.log(`Original User ID: ${originalEvent.userId}`);
});
```

### Recovery and Retry Logic

```ts filename="dlq-recovery.ts" copy
import { Stream } from "@514labs/moose-lib";

// Create a recovery stream for fixed messages
const recoveredEvents = new Stream<ProcessedEvent>("recovered_events", {
  destination: processedEvents.table, // Send recovered data to the main table
});

// Add recovery logic to the DLQ
eventDLQ.addTransform(
  recoveredEvents,
  (deadLetter): ProcessedEvent | null => {
    try {
      const originalEvent = deadLetter.asTyped();

      // Apply fixes based on the error type
      if (deadLetter.errorMessage.includes("Invalid userId")) {
        // Skip events with invalid user IDs
        return null;
      }

      if (deadLetter.errorMessage.includes("Invalid timestamp")) {
        // Fix negative timestamps
        const fixedEvent = {
          ...originalEvent,
          timestamp: Math.abs(originalEvent.timestamp),
        };

        return {
          userId: fixedEvent.userId,
          action: fixedEvent.action,
          processedAt: new Date(),
          isValid: true,
        };
      }

      return null; // Skip other errors
    } catch (error) {
      console.error("Recovery failed:", error);
      return null;
    }
  }
);
```

## Best Practices

## Common Patterns

### Circuit Breaker Pattern

```ts filename="circuit-breaker.ts" copy
let failureCount = 0;
const maxFailures = 5;
const resetTimeout = 60000; // 1 minute

rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: RawEvent):
  ProcessedEvent => {
    if (failureCount >= maxFailures) {
      throw new Error("Circuit breaker open - too many failures");
    }

    try {
      // Your processing logic here
      const result = processEvent(event);
      failureCount = 0; // Reset on success
      return result;
    } catch (error) {
      failureCount++;
      if (failureCount >= maxFailures) {
        setTimeout(() => {
          failureCount = 0;
        }, resetTimeout);
      }
      throw error;
    }
  },
  { deadLetterQueue: eventDLQ }
);
```

### Retry with Exponential Backoff

```ts filename="retry-backoff.ts" copy
// Create a retry DLQ with delayed processing
const retryDLQ = new DeadLetterQueue<RawEvent>("RetryDLQ");

retryDLQ.addTransform(
  processedEvents.stream!,
  (deadLetter): ProcessedEvent | null => {
    const retryCount = (deadLetter.originalRecord.retryCount as number) || 0;
    const maxRetries = 3;

    if (retryCount >= maxRetries) {
      console.log("Max retries exceeded, giving up");
      return null;
    }

    // Calculate the delay (exponential backoff)
    const delay = Math.pow(2, retryCount) * 1000; // 1s, 2s, 4s

    setTimeout(() => {
      const originalEvent = deadLetter.asTyped();

      // Add a retry count to track attempts
      const eventWithRetry = {
        ...originalEvent,
        retryCount: retryCount + 1,
      };

      // Retry the original processing logic; if it throws again, the
      // failure is routed back to the DLQ with the incremented retry count
      processEvent(eventWithRetry);
    }, delay);

    return null; // Don't emit immediately; wait for the retry
  }
);
```

Dead letter queues add overhead to stream processing. Use them judiciously, and monitor their impact on throughput. Consider implementing sampling for high-volume streams where occasional message loss is acceptable.

Dead letter queue events can be integrated with monitoring systems such as Prometheus, Datadog, or CloudWatch for alerting and dashboards. Consider tracking metrics such as DLQ message rate, error types, and recovery success rate.
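The metric-tracking advice above can be sketched with a small in-memory tally. `DlqMetrics` is a hypothetical stand-in for a real metrics client (Prometheus, Datadog, CloudWatch); the wiring into `eventDLQ.addConsumer` follows the monitoring example earlier in this page.

```typescript
// Hypothetical in-memory tally of DLQ activity; in production, replace
// this with calls to your metrics client.
class DlqMetrics {
  private countsByType = new Map<string, number>();
  total = 0;

  record(errorType: string): void {
    this.total += 1;
    this.countsByType.set(errorType, (this.countsByType.get(errorType) ?? 0) + 1);
  }

  countFor(errorType: string): number {
    return this.countsByType.get(errorType) ?? 0;
  }
}

const metrics = new DlqMetrics();

// Wiring into the DLQ from the monitoring example:
// eventDLQ.addConsumer((deadLetter) => metrics.record(deadLetter.errorType));

// Simulated dead-letter traffic:
metrics.record("Error");
metrics.record("Error");
metrics.record("TypeError");
console.log(metrics.total);             // 3
console.log(metrics.countFor("Error")); // 2
```

Tracking counts per `errorType` makes it easy to alert on a sudden spike of one failure class without being drowned out by background noise from another.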
## Using Dead Letter Queues in Ingestion Pipelines

Dead Letter Queues (DLQs) can be integrated directly with your ingestion pipelines to capture records that fail validation or processing at the API entry point. This ensures that no data is lost, even if it cannot be processed immediately.

```typescript filename="IngestPipelineWithDLQ.ts" copy
import { IngestPipeline } from "@514labs/moose-lib";

interface ExampleSchema {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

const pipeline = new IngestPipeline<ExampleSchema>("example", {
  ingestApi: true,
  stream: true,
  table: true,
  deadLetterQueue: true, // Route failed ingestions to the DLQ
});
```

See the [Ingestion API documentation](/moose/apis/ingest-api#validation) for more details and best practices on configuring DLQs for ingestion.
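To act on what lands in the pipeline's DLQ, a consumer can classify failures before alerting. This is a minimal sketch: `classifyFailure` is a hypothetical helper, and the commented wiring assumes the queue is exposed as `pipeline.deadLetterQueue` when the pipeline is configured with `deadLetterQueue: true` (check your Moose version for the exact accessor).

```typescript
// Hypothetical helper: bucket dead-letter error messages so alerts can
// distinguish schema problems from transient infrastructure failures.
function classifyFailure(errorMessage: string): "schema" | "transient" | "unknown" {
  if (/required|invalid|type mismatch/i.test(errorMessage)) return "schema";
  if (/timeout|unavailable|connection/i.test(errorMessage)) return "transient";
  return "unknown";
}

// Wiring (assumes the DLQ is exposed on the pipeline object):
// pipeline.deadLetterQueue!.addConsumer((deadLetter) => {
//   const kind = classifyFailure(deadLetter.errorMessage);
//   console.log(`Ingest failure (${kind}): ${deadLetter.errorMessage}`);
// });

console.log(classifyFailure("missing required field: id"));   // schema
console.log(classifyFailure("connection timeout to broker")); // transient
```

Schema failures usually need a code or producer fix and are worth paging on; transient failures are often better handled by the retry patterns shown above.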