Dead Letter Queues (DLQs) provide a robust error handling mechanism for stream processing in Moose. When streaming functions fail during transformation or consumption, failed messages are automatically routed to a configured dead letter queue for later analysis and recovery.
Dead letter queues let you:

- Prevent single message failures from stopping entire stream processing pipelines
- Capture detailed error information, including the original data, error messages, and timestamps
- Build automated or manual processes to handle and retry failed messages
- Maintain full type safety when accessing original data from dead letter records
When a message fails processing, Moose creates a dead letter record with the following structure:
```typescript
interface DeadLetterModel {
  originalRecord: Record<string, any>;     // The original message that failed
  errorMessage: string;                    // Error description
  errorType: string;                       // Error class/type name
  failedAt: Date;                          // Timestamp when failure occurred
  source: "api" | "transform" | "table";   // Where the failure happened
}

interface DeadLetter<T> extends DeadLetterModel {
  asTyped: () => T;                        // Type-safe access to the original record
}
```

```python
class DeadLetterModel(BaseModel, Generic[T]):
    original_record: Any                              # The original message that failed
    error_message: str                                # Error description
    error_type: str                                   # Error class/type name
    failed_at: datetime.datetime                      # Timestamp when failure occurred
    source: Literal["api", "transform", "table"]      # Where the failure happened

    def as_typed(self) -> T:                          # Type-safe access to the original record
        return self._t.model_validate(self.original_record)
```

To create a dead letter queue for a data model, instantiate `DeadLetterQueue` with that model as its type parameter:

```python
from moose_lib import DeadLetterQueue
from pydantic import BaseModel

# Define your data model
class UserEvent(BaseModel):
    user_id: str
    action: str
    timestamp: float

# Create a dead letter queue for UserEvent failures
user_event_dlq = DeadLetterQueue[UserEvent](name="UserEventDLQ")
```

Add a dead letter queue to your Transformation Function configuration, and any errors thrown in the transformation will route the failing event to the dead letter queue.
```python
from datetime import datetime
from moose_lib import DeadLetterQueue, TransformConfig

# Create the dead letter queue
event_dlq = DeadLetterQueue[RawEvent](name="EventDLQ")

# Define the transformation function, including the errors that trigger the DLQ
def process_event(event: RawEvent) -> ProcessedEvent:
    # This transform might fail for invalid data
    if not event.user_id or len(event.user_id) == 0:
        raise ValueError("Invalid user_id: cannot be empty")
    if event.timestamp < 0:
        raise ValueError("Invalid timestamp: cannot be negative")

    return ProcessedEvent(
        user_id=event.user_id,
        action=event.action,
        processed_at=datetime.now(),
        is_valid=True
    )

# Add the transform with the DLQ configuration
raw_events.get_stream().add_transform(
    destination=processed_events.get_stream(),
    transformation=process_event,
    config=TransformConfig(
        dead_letter_queue=event_dlq  # Configure DLQ for this transform
    )
)
```

Add a dead letter queue to your Consumer Function configuration, and any errors thrown in the consumer will route the failing event to the dead letter queue.
```python
from moose_lib import ConsumerConfig

# Define a consumer function, including the errors that trigger the DLQ
def process_event_consumer(event: RawEvent) -> None:
    # This consumer might fail for certain events
    if event.action == "forbidden_action":
        raise ValueError("Forbidden action detected")

    # Process the event (e.g., send to an external API)
    print(f"Processing event for user {event.user_id}")

# Add the consumer with the DLQ configuration
raw_events.get_stream().add_consumer(
    consumer=process_event_consumer,
    config=ConsumerConfig(
        dead_letter_queue=event_dlq  # Configure DLQ for this consumer
    )
)
```

Add a dead letter queue to your Ingest API configuration, and any runtime data validation failures at the API will route the failing record to the dead letter queue.
```python
from datetime import datetime
from typing import Optional
from moose_lib import DeadLetterQueue, IngestApi, IngestConfig, Stream
from pydantic import BaseModel

class Properties(BaseModel):
    device: Optional[str]
    version: Optional[int]

class ExampleModel(BaseModel):
    id: str
    userId: str
    timestamp: datetime
    properties: Properties

api = IngestApi[ExampleModel]("your-api-route", IngestConfig(
    destination=Stream[ExampleModel]("your-stream-name"),
    dead_letter_queue=DeadLetterQueue[ExampleModel]("your-dlq-name")
))
```

To monitor a dead letter queue, add a consumer that inspects each dead letter record:

```python
def monitor_dead_letters(dead_letter: DeadLetterModel[RawEvent]) -> None:
    print("Dead letter received:")
    print(f"Error: {dead_letter.error_message}")
    print(f"Error Type: {dead_letter.error_type}")
    print(f"Failed At: {dead_letter.failed_at}")
    print(f"Source: {dead_letter.source}")

    # Access the original typed data
    original_event: RawEvent = dead_letter.as_typed()
    print(f"Original User ID: {original_event.user_id}")

# Add a consumer to monitor dead letter messages
event_dlq.add_consumer(monitor_dead_letters)
```

To recover failed messages, add a transform to the dead letter queue that fixes or filters them and routes the results to a recovery stream:

```python
from moose_lib import Stream
from typing import Optional

# Create a recovery stream for fixed messages
recovered_events = Stream[ProcessedEvent]("recovered_events", {
    "destination": processed_events.get_table()  # Send recovered data to the main table
})

def recover_event(dead_letter: DeadLetterModel[RawEvent]) -> Optional[ProcessedEvent]:
    try:
        original_event = dead_letter.as_typed()

        # Apply fixes based on the error type
        if "Invalid user_id" in dead_letter.error_message:
            # Skip events with invalid user IDs
            return None

        if "Invalid timestamp" in dead_letter.error_message:
            # Fix negative timestamps
            fixed_timestamp = abs(original_event.timestamp)
            return ProcessedEvent(
                user_id=original_event.user_id,
                action=original_event.action,
                processed_at=datetime.now(),
                is_valid=True
            )

        return None  # Skip other errors
    except Exception as error:
        print(f"Recovery failed: {error}")
        return None

# Add the recovery logic to the DLQ
event_dlq.add_transform(
    destination=recovered_events,
    transformation=recover_event
)
```

Best practices for working with dead letter queues:

- Set up alerts when dead letter queues receive messages to catch processing issues early
- Throw descriptive errors that help identify the root cause and potential fixes
- Build automated recovery processes for common failure patterns
- Configure appropriate retention periods for dead letter queues based on your recovery SLAs
- Create separate dead letter queues for different types of failures or processing stages (see the sketch after this list)
- Ensure dead letter events are properly logged for debugging and analysis
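As a minimal sketch of the separate-queues practice above, assuming the `RawEvent`/`ProcessedEvent` models and streams defined earlier, each processing stage can get its own queue so its failures are monitored and retried independently (the queue names here are made up):

```python
from moose_lib import ConsumerConfig, DeadLetterQueue, TransformConfig

# One DLQ per stage, so transform failures and consumer failures stay separate
transform_dlq = DeadLetterQueue[RawEvent](name="EventTransformDLQ")
consumer_dlq = DeadLetterQueue[RawEvent](name="EventConsumerDLQ")

raw_events.get_stream().add_transform(
    destination=processed_events.get_stream(),
    transformation=process_event,
    config=TransformConfig(dead_letter_queue=transform_dlq),
)

raw_events.get_stream().add_consumer(
    consumer=process_event_consumer,
    config=ConsumerConfig(dead_letter_queue=consumer_dlq),
)
```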
A circuit breaker can stop a failing transform from repeatedly hitting downstream systems by rejecting events after too many consecutive errors:

```python
import threading

failure_count = 0
max_failures = 5
reset_timeout = 60000  # 1 minute, in milliseconds

def _reset_failure_count() -> None:
    global failure_count
    failure_count = 0

def process_with_circuit_breaker(event: RawEvent) -> ProcessedEvent:
    global failure_count

    if failure_count >= max_failures:
        raise ValueError("Circuit breaker open - too many failures")

    try:
        # Your processing logic here
        result = process_event(event)
        failure_count = 0  # Reset on success
        return result
    except Exception as error:
        failure_count += 1
        if failure_count >= max_failures:
            # Reset the breaker after the timeout
            threading.Timer(reset_timeout / 1000, _reset_failure_count).start()
        raise error

raw_events.get_stream().add_transform(
    destination=processed_events.get_stream(),
    transformation=process_with_circuit_breaker,
    config=TransformConfig(dead_letter_queue=event_dlq)
)
```

To retry failed messages with exponential backoff, add a transform to a dedicated retry DLQ:

```python
from moose_lib import DeadLetterQueue
import time

# Create a retry DLQ with delay processing
retry_dlq = DeadLetterQueue[RawEvent](name="RetryDLQ")

def retry_with_backoff(dead_letter: DeadLetterModel[RawEvent]) -> Optional[ProcessedEvent]:
    retry_count = dead_letter.original_record.get("retry_count", 0)
    max_retries = 3

    if retry_count >= max_retries:
        print("Max retries exceeded, giving up")
        return None

    # Calculate the delay (exponential backoff)
    delay = (2 ** retry_count) * 1  # 1s, 2s, 4s
    time.sleep(delay)

    try:
        original_event = dead_letter.as_typed()
        # Add a retry count to track attempts
        event_with_retry = {
            **original_event.model_dump(),
            "retry_count": retry_count + 1
        }
        # Retry the original processing logic
        # (assumes RawEvent declares or tolerates an optional retry_count field)
        process_event(RawEvent.model_validate(event_with_retry))
        return None  # Success, don't emit
    except Exception as error:
        # Will go back to the DLQ with the incremented retry count
        raise error

retry_dlq.add_transform(
    destination=processed_events.get_stream(),
    transformation=retry_with_backoff
)
```

Dead letter queues add overhead to stream processing. Use them judiciously and monitor their impact on throughput. Consider implementing sampling for high-volume streams where occasional message loss is acceptable.
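As a rough sketch of that sampling idea, assuming the transform and DLQ defined above (the `dlq_sample_rate` value and wrapper name are hypothetical), a transform can re-raise only a fraction of failures so that just a sample of failed messages reaches the queue; returning `None` drops the rest:

```python
import random
from typing import Optional

dlq_sample_rate = 0.1  # Hypothetical: route roughly 10% of failures to the DLQ

def process_with_sampled_dlq(event: RawEvent) -> Optional[ProcessedEvent]:
    try:
        return process_event(event)
    except Exception:
        if random.random() < dlq_sample_rate:
            raise  # Re-raise so Moose routes this failure to the configured DLQ
        return None  # Silently drop the remaining failures to reduce overhead

raw_events.get_stream().add_transform(
    destination=processed_events.get_stream(),
    transformation=process_with_sampled_dlq,
    config=TransformConfig(dead_letter_queue=event_dlq),
)
```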
Dead letter queue events can be integrated with monitoring systems like Prometheus, DataDog, or CloudWatch for alerting and dashboards. Consider tracking metrics like DLQ message rate, error types, and recovery success rates.
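As a hedged sketch of what that tracking could look like (the metric name and port are made up), a DLQ consumer can export counters with the `prometheus_client` library:

```python
from prometheus_client import Counter, start_http_server

# Hypothetical metric: count DLQ messages by failure source and error type
dlq_messages_total = Counter(
    "dlq_messages_total",
    "Number of messages routed to the dead letter queue",
    ["source", "error_type"],
)

def record_dlq_metrics(dead_letter: DeadLetterModel[RawEvent]) -> None:
    dlq_messages_total.labels(
        source=dead_letter.source,
        error_type=dead_letter.error_type,
    ).inc()

# Expose the metrics endpoint and attach the consumer to the DLQ
start_http_server(9101)
event_dlq.add_consumer(record_dlq_metrics)
```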
Dead Letter Queues (DLQs) can be directly integrated with your ingestion pipelines to capture records that fail validation or processing at the API entry point. This ensures that no data is lost, even if it cannot be immediately processed.
```python
from datetime import datetime
from moose_lib import IngestPipeline, DeadLetterQueue, IngestPipelineConfig
from pydantic import BaseModel

class ExampleSchema(BaseModel):
    id: str
    name: str
    value: int
    timestamp: datetime

example_dlq = DeadLetterQueue[ExampleSchema](name="exampleDLQ")

pipeline = IngestPipeline[ExampleSchema](
    name="example",
    config=IngestPipelineConfig(
        ingest_api=True,
        stream=True,
        table=True,
        dead_letter_queue=True  # Route failed ingestions to a DLQ
    )
)
```

See the Ingestion API documentation for more details and best practices on configuring DLQs for ingestion.
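As an illustrative check (the URL assumes the local dev server's default ingest route; adjust it to your setup), posting a record that fails validation against `ExampleSchema` should be routed to the pipeline's dead letter queue rather than written to the table:

```python
import requests

# "value" should be an int, so this record fails validation and is dead-lettered
bad_record = {"id": "abc", "name": "test", "value": "not-a-number", "timestamp": "2024-01-01T00:00:00Z"}

response = requests.post("http://localhost:4000/ingest/example", json=bad_record)
print(response.status_code, response.text)
```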