Dead Letter Queues
Overview
Dead Letter Queues (DLQs) provide a robust error handling mechanism for stream processing in Moose. When streaming functions fail during transformation or consumption, failed messages are automatically routed to a configured dead letter queue for later analysis and recovery.
Dead Letter Queue Benefits
Fault tolerance
Prevent a single failed message from halting an entire stream processing pipeline
Error visibility
Capture detailed error information including original data, error messages, and timestamps
Recovery workflows
Build automated or manual processes to handle and retry failed messages
Type safety
Maintain full type safety when accessing original data from dead letter records
Dead Letter Record Structure
When a message fails processing, Moose creates a dead letter record with the following structure:
interface DeadLetterModel {
originalRecord: Record<string, any>; // The original message that failed
errorMessage: string; // Error description
errorType: string; // Error class/type name
failedAt: Date; // Timestamp when failure occurred
source: "api" | "transform" | "table"; // Where the failure happened
}
interface DeadLetter<T> extends DeadLetterModel {
asTyped: () => T; // Type-safe access to original record
}
class DeadLetterModel(BaseModel, Generic[T]):
original_record: Any # The original message that failed
error_message: str # Error description
error_type: str # Error class/type name
failed_at: datetime.datetime # Timestamp when failure occurred
source: Literal["api", "transform", "table"] # Where the failure happened
def as_typed(self) -> T: # Type-safe access to original record
return self._t.model_validate(self.original_record)
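For illustration, a record produced when a transform rejects an event with an empty user ID might look like this (all values are hypothetical):
const example: DeadLetterModel = {
  originalRecord: { userId: "", action: "click", timestamp: 1700000000 },
  errorMessage: "Invalid userId: cannot be empty",
  errorType: "Error",
  failedAt: new Date("2025-01-01T12:00:00Z"),
  source: "transform"
};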
Creating Dead Letter Queues
Basic Setup
import { DeadLetterQueue, Stream } from "@514labs/moose-lib";
// Define your data model
interface UserEvent {
userId: string;
action: string;
timestamp: number;
}
// Create a dead letter queue for UserEvent failures
const userEventDLQ = new DeadLetterQueue<UserEvent>("UserEventDLQ");
from moose_lib import DeadLetterQueue
from pydantic import BaseModel
# Define your data model
class UserEvent(BaseModel):
user_id: str
action: str
timestamp: float
# Create a dead letter queue for UserEvent failures
user_event_dlq = DeadLetterQueue[UserEvent](name="UserEventDLQ")
Configuring Transforms with Dead Letter Queues
import { IngestPipeline, DeadLetterQueue } from "@514labs/moose-lib";
interface RawEvent {
userId: string;
action: string;
timestamp: number;
}
interface ProcessedEvent {
userId: string;
action: string;
processedAt: Date;
isValid: boolean;
}
// Create pipelines
const rawEvents = new IngestPipeline<RawEvent>("raw_events", {
ingest: true,
stream: true,
table: false
});
const processedEvents = new IngestPipeline<ProcessedEvent>("processed_events", {
ingest: false,
stream: true,
table: true
});
// Create dead letter queue
const eventDLQ = new DeadLetterQueue<RawEvent>("EventDLQ");
// Add transform with DLQ configuration
rawEvents.stream!.addTransform(
processedEvents.stream!,
(event: RawEvent): ProcessedEvent => {
// This transform might fail for invalid data
if (!event.userId || event.userId.length === 0) {
throw new Error("Invalid userId: cannot be empty");
}
if (event.timestamp < 0) {
throw new Error("Invalid timestamp: cannot be negative");
}
return {
userId: event.userId,
action: event.action,
processedAt: new Date(),
isValid: true
};
},
{
deadLetterQueue: eventDLQ // Configure DLQ for this transform
}
);
from moose_lib import IngestPipeline, DeadLetterQueue, TransformConfig
from pydantic import BaseModel
from datetime import datetime
class RawEvent(BaseModel):
user_id: str
action: str
timestamp: float
class ProcessedEvent(BaseModel):
user_id: str
action: str
processed_at: datetime
is_valid: bool
# Create pipelines
raw_events = IngestPipeline[RawEvent]("raw_events", {
"ingest": True,
"stream": True,
"table": False
})
processed_events = IngestPipeline[ProcessedEvent]("processed_events", {
"ingest": False,
"stream": True,
"table": True
})
# Create dead letter queue
event_dlq = DeadLetterQueue[RawEvent](name="EventDLQ")
def process_event(event: RawEvent) -> ProcessedEvent:
# This transform might fail for invalid data
if not event.user_id or len(event.user_id) == 0:
raise ValueError("Invalid user_id: cannot be empty")
if event.timestamp < 0:
raise ValueError("Invalid timestamp: cannot be negative")
return ProcessedEvent(
user_id=event.user_id,
action=event.action,
processed_at=datetime.now(),
is_valid=True
)
# Add transform with DLQ configuration
raw_events.get_stream().add_transform(
destination=processed_events.get_stream(),
transformation=process_event,
config=TransformConfig(
dead_letter_queue=event_dlq # Configure DLQ for this transform
)
)
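To exercise this path during local development, you can ingest an event that fails validation. A minimal sketch, assuming the default local dev server address and the standard ingest route for the raw_events pipeline:
// Hypothetical local test: an empty userId makes the transform above throw,
// so this message should end up in EventDLQ
await fetch("http://localhost:4000/ingest/raw_events", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ userId: "", action: "click", timestamp: 1700000000 }),
});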
Configuring Consumers with Dead Letter Queues
// Add consumer with DLQ configuration
rawEvents.stream!.addConsumer(
(event: RawEvent): void => {
// This consumer might fail for certain events
if (event.action === "forbidden_action") {
throw new Error("Forbidden action detected");
}
// Process the event (e.g., send to external API)
console.log(`Processing event for user ${event.userId}`);
},
{
deadLetterQueue: eventDLQ // Configure DLQ for this consumer
}
);
from moose_lib import ConsumerConfig
def process_event_consumer(event: RawEvent) -> None:
# This consumer might fail for certain events
if event.action == "forbidden_action":
raise ValueError("Forbidden action detected")
# Process the event (e.g., send to external API)
print(f"Processing event for user {event.user_id}")
# Add consumer with DLQ configuration
raw_events.get_stream().add_consumer(
consumer=process_event_consumer,
config=ConsumerConfig(
dead_letter_queue=event_dlq # Configure DLQ for this consumer
)
)
Processing Dead Letter Messages
Monitoring Dead Letter Queues
// Add a consumer to monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
console.log("Dead letter received:");
console.log(`Error: ${deadLetter.errorMessage}`);
console.log(`Error Type: ${deadLetter.errorType}`);
console.log(`Failed At: ${deadLetter.failedAt}`);
console.log(`Source: ${deadLetter.source}`);
// Access the original typed data
const originalEvent: RawEvent = deadLetter.asTyped();
console.log(`Original User ID: ${originalEvent.userId}`);
});
from moose_lib import DeadLetterModel

def monitor_dead_letters(dead_letter: DeadLetterModel[RawEvent]) -> None:
print("Dead letter received:")
print(f"Error: {dead_letter.error_message}")
print(f"Error Type: {dead_letter.error_type}")
print(f"Failed At: {dead_letter.failed_at}")
print(f"Source: {dead_letter.source}")
# Access the original typed data
original_event: RawEvent = dead_letter.as_typed()
print(f"Original User ID: {original_event.user_id}")
# Add consumer to monitor dead letter messages
event_dlq.add_consumer(monitor_dead_letters)
Recovery and Retry Logic
// Create a recovery stream for fixed messages
const recoveredEvents = new Stream<ProcessedEvent>("recovered_events", {
  destination: processedEvents.table! // Send recovered data to main table
});
// Add recovery logic to the DLQ
eventDLQ.addTransform(
recoveredEvents,
(deadLetter): ProcessedEvent | null => {
try {
const originalEvent = deadLetter.asTyped();
// Apply fixes based on error type
if (deadLetter.errorMessage.includes("Invalid userId")) {
// Skip events with invalid user IDs
return null;
}
if (deadLetter.errorMessage.includes("Invalid timestamp")) {
// Fix negative timestamps
const fixedEvent = {
...originalEvent,
timestamp: Math.abs(originalEvent.timestamp)
};
return {
userId: fixedEvent.userId,
action: fixedEvent.action,
processedAt: new Date(),
isValid: true
};
}
return null; // Skip other errors
} catch (error) {
console.error("Recovery failed:", error);
return null;
}
}
);
from moose_lib import Stream
from typing import Optional
# Create a recovery stream for fixed messages
recovered_events = Stream[ProcessedEvent]("recovered_events", {
"destination": processed_events.get_table() # Send recovered data to main table
})
def recover_event(dead_letter: DeadLetterModel[RawEvent]) -> Optional[ProcessedEvent]:
try:
original_event = dead_letter.as_typed()
# Apply fixes based on error type
if "Invalid user_id" in dead_letter.error_message:
# Skip events with invalid user IDs
return None
if "Invalid timestamp" in dead_letter.error_message:
# Fix negative timestamps
fixed_timestamp = abs(original_event.timestamp)
return ProcessedEvent(
user_id=original_event.user_id,
action=original_event.action,
processed_at=datetime.now(),
is_valid=True
)
return None # Skip other errors
except Exception as error:
print(f"Recovery failed: {error}")
return None
# Add recovery logic to the DLQ
event_dlq.add_transform(
destination=recovered_events,
transformation=recover_event
)
Best Practices
Monitor DLQ volume
Set up alerts when dead letter queues receive messages to catch processing issues early (see the sketch after this list)
Include context in errors
Throw descriptive errors that help identify the root cause and potential fixes
Implement recovery logic
Build automated recovery processes for common failure patterns
Set retention policies
Configure appropriate retention periods for dead letter queues based on your recovery SLAs
Use separate DLQs
Create separate dead letter queues for different types of failures or processing stages
Log dead letter events
Ensure dead letter events are properly logged for debugging and analysis
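As a sketch of the first practice above, a plain DLQ consumer can drive simple volume-based alerting; sendAlert is a hypothetical stand-in for your notification hook:
// Volume-alerting sketch: count DLQ messages per minute and alert once a
// threshold is crossed. Replace sendAlert with your real notification hook.
const sendAlert = (msg: string) => console.error("ALERT:", msg); // placeholder

let windowCount = 0;
const threshold = 10;
setInterval(() => { windowCount = 0; }, 60_000); // reset the rolling window

eventDLQ.addConsumer(() => {
  windowCount++;
  if (windowCount === threshold) {
    sendAlert(`EventDLQ received ${threshold}+ messages in the last minute`);
  }
});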
Common Patterns
Circuit Breaker Pattern
let failureCount = 0;
const maxFailures = 5;
const resetTimeout = 60000; // 1 minute
rawEvents.stream!.addTransform(
processedEvents.stream!,
(event: RawEvent): ProcessedEvent => {
if (failureCount >= maxFailures) {
throw new Error("Circuit breaker open - too many failures");
}
try {
      // Your processing logic here (processEvent is a placeholder for your function)
      const result = processEvent(event);
failureCount = 0; // Reset on success
return result;
} catch (error) {
failureCount++;
if (failureCount >= maxFailures) {
setTimeout(() => { failureCount = 0; }, resetTimeout);
}
throw error;
}
},
{ deadLetterQueue: eventDLQ }
);
Retry with Exponential Backoff
// Create a retry DLQ with delay processing (point failing transforms'
// deadLetterQueue option at RetryDLQ to feed it)
const retryDLQ = new DeadLetterQueue<RawEvent>("RetryDLQ");
retryDLQ.addTransform(
processedEvents.stream!,
(deadLetter): ProcessedEvent | null => {
const retryCount = deadLetter.originalRecord.retryCount || 0;
const maxRetries = 3;
if (retryCount >= maxRetries) {
console.log("Max retries exceeded, giving up");
return null;
}
// Calculate delay (exponential backoff)
const delay = Math.pow(2, retryCount) * 1000; // 1s, 2s, 4s
setTimeout(() => {
try {
const originalEvent = deadLetter.asTyped();
// Add retry count to track attempts
const eventWithRetry = {
...originalEvent,
retryCount: retryCount + 1
};
// Retry the original processing logic
processEvent(eventWithRetry);
      } catch (error) {
        // Note: a throw here happens outside the transform's call stack, so
        // the runtime cannot route it back to the DLQ automatically; log it
        // and re-publish explicitly if another attempt is needed
        console.error(`Retry ${retryCount + 1} failed:`, error);
      }
}, delay);
return null; // Don't emit immediately, wait for retry
}
);
Performance Considerations
Dead letter queues add overhead to stream processing. Use them judiciously and monitor their impact on throughput. Consider implementing sampling for high-volume streams where occasional message loss is acceptable.
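A sampling sketch, assuming (as in the transforms above) that throwing routes a message to the configured DLQ while returning null skips it; only a fraction of failures are dead-lettered and the rest are dropped:
const DLQ_SAMPLE_RATE = 0.1; // dead-letter ~10% of failures

rawEvents.stream!.addTransform(
  processedEvents.stream!,
  (event: RawEvent): ProcessedEvent | null => {
    try {
      return processEvent(event); // placeholder for your transform logic
    } catch (error) {
      if (Math.random() < DLQ_SAMPLE_RATE) {
        throw error; // sampled failure goes to the DLQ
      }
      return null; // unsampled failure is dropped
    }
  },
  { deadLetterQueue: eventDLQ }
);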
Monitoring Integration
Dead letter queue events can be integrated with monitoring systems like Prometheus, DataDog, or CloudWatch for alerting and dashboards. Consider tracking metrics like DLQ message rate, error types, and recovery success rates.
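For example, using the Node prom-client package (an assumption; any metrics client works the same way), a DLQ consumer can export a counter labeled by error type and failure source:
import { Counter } from "prom-client";

// Count dead-lettered messages by error type and failure source
const dlqCounter = new Counter({
  name: "moose_dead_letters_total",
  help: "Total messages routed to dead letter queues",
  labelNames: ["error_type", "source"],
});

eventDLQ.addConsumer((deadLetter) => {
  dlqCounter.inc({
    error_type: deadLetter.errorType,
    source: deadLetter.source,
  });
});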