# Moose / Streaming / Dead Letter Queues Documentation – Python

## Included Files

1. moose/streaming/dead-letter-queues/dead-letter-queues.mdx

## Dead Letter Queues

Source: moose/streaming/dead-letter-queues/dead-letter-queues.mdx

Handle failed stream processing with dead letter queues

# Dead Letter Queues

## Overview

Dead Letter Queues (DLQs) provide a robust error-handling mechanism for stream processing in Moose. When a streaming function fails during transformation or consumption, the failed message is automatically routed to a configured dead letter queue for later analysis and recovery.

## Dead Letter Record Structure

When a message fails processing, Moose creates a dead letter record with the following structure:

```py
import datetime
from typing import Any, Generic, Literal, TypeVar
from pydantic import BaseModel

T = TypeVar("T")

class DeadLetterModel(BaseModel, Generic[T]):
    original_record: Any                           # The original message that failed
    error_message: str                             # Error description
    error_type: str                                # Error class/type name
    failed_at: datetime.datetime                   # Timestamp when the failure occurred
    source: Literal["api", "transform", "table"]   # Where the failure happened

    def as_typed(self) -> T:
        # Type-safe access to the original record
        return self._t.model_validate(self.original_record)
```

## Creating Dead Letter Queues

### Basic Setup

```py filename="dead-letter-setup.py" copy
from moose_lib import DeadLetterQueue
from pydantic import BaseModel

# Define your data model
class UserEvent(BaseModel):
    user_id: str
    action: str
    timestamp: float

# Create a dead letter queue for UserEvent failures
user_event_dlq = DeadLetterQueue[UserEvent](name="UserEventDLQ")
```

### Configuring Transformations with Dead Letter Queues

Add a dead letter queue to your transformation function's configuration, and any error raised in the transformation will cause the event to be routed to the dead letter queue.

```py filename="transform-with-dlq.py" copy
from datetime import datetime
from moose_lib import DeadLetterQueue, TransformConfig

# The RawEvent / ProcessedEvent models and the raw_events / processed_events
# pipelines are assumed to be defined elsewhere in your project

# Create a dead letter queue
event_dlq = DeadLetterQueue[RawEvent](name="EventDLQ")

# Define a transformation function whose errors trigger the DLQ
def process_event(event: RawEvent) -> ProcessedEvent:
    # This transform might fail for invalid data
    if not event.user_id or len(event.user_id) == 0:
        raise ValueError("Invalid user_id: cannot be empty")

    if event.timestamp < 0:
        raise ValueError("Invalid timestamp: cannot be negative")

    return ProcessedEvent(
        user_id=event.user_id,
        action=event.action,
        processed_at=datetime.now(),
        is_valid=True
    )

# Add the transform with DLQ configuration
raw_events.get_stream().add_transform(
    destination=processed_events.get_stream(),
    transformation=process_event,
    config=TransformConfig(
        dead_letter_queue=event_dlq  # Configure DLQ for this transform
    )
)
```

### Configuring Consumers with Dead Letter Queues

Add a dead letter queue to your consumer function's configuration, and any error raised in the function will cause the event to be routed to the dead letter queue.
```py filename="consumer-with-dlq.py" copy
from moose_lib import ConsumerConfig

# event_dlq is the dead letter queue created in the previous example

# Define a consumer function whose errors trigger the DLQ
def process_event_consumer(event: RawEvent) -> None:
    # This consumer might fail for certain events
    if event.action == "forbidden_action":
        raise ValueError("Forbidden action detected")

    # Process the event (e.g., send to an external API)
    print(f"Processing event for user {event.user_id}")

# Add the consumer with DLQ configuration
raw_events.get_stream().add_consumer(
    consumer=process_event_consumer,
    config=ConsumerConfig(
        dead_letter_queue=event_dlq  # Configure DLQ for this consumer
    )
)
```

### Configuring Ingest APIs with Dead Letter Queues

Add a dead letter queue to your Ingest API configuration, and any runtime data validation failure at the API will cause the event to be routed to the dead letter queue.

```python filename="ValidationExample.py" copy
from datetime import datetime
from typing import Optional
from moose_lib import DeadLetterQueue, IngestApi, IngestConfig, Stream
from pydantic import BaseModel

class Properties(BaseModel):
    device: Optional[str] = None
    version: Optional[int] = None

class ExampleModel(BaseModel):
    id: str
    userId: str
    timestamp: datetime
    properties: Properties

api = IngestApi[ExampleModel]("your-api-route", IngestConfig(
    destination=Stream[ExampleModel]("your-stream-name"),
    dead_letter_queue=DeadLetterQueue[ExampleModel]("your-dlq-name")
))
```

## Processing Dead Letter Messages

### Monitoring Dead Letter Queues

```py filename="dlq-monitoring.py" copy
from moose_lib import DeadLetterModel

def monitor_dead_letters(dead_letter: DeadLetterModel[RawEvent]) -> None:
    print("Dead letter received:")
    print(f"Error: {dead_letter.error_message}")
    print(f"Error Type: {dead_letter.error_type}")
    print(f"Failed At: {dead_letter.failed_at}")
    print(f"Source: {dead_letter.source}")

    # Access the original typed data
    original_event: RawEvent = dead_letter.as_typed()
    print(f"Original User ID: {original_event.user_id}")

# Add a consumer to monitor dead letter messages
event_dlq.add_consumer(monitor_dead_letters)
```

### Recovery and Retry Logic

```py filename="dlq-recovery.py" copy
from datetime import datetime
from typing import Optional
from moose_lib import DeadLetterModel, Stream, StreamConfig

# Create a recovery stream for fixed messages
recovered_events = Stream[ProcessedEvent]("recovered_events", StreamConfig(
    destination=processed_events.get_table()  # Send recovered data to the main table
))

def recover_event(dead_letter: DeadLetterModel[RawEvent]) -> Optional[ProcessedEvent]:
    try:
        original_event = dead_letter.as_typed()

        # Apply fixes based on the error type
        if "Invalid user_id" in dead_letter.error_message:
            # Skip events with invalid user IDs
            return None

        if "Invalid timestamp" in dead_letter.error_message:
            # Recover events whose only problem was a bad timestamp
            return ProcessedEvent(
                user_id=original_event.user_id,
                action=original_event.action,
                processed_at=datetime.now(),
                is_valid=True
            )

        return None  # Skip other errors
    except Exception as error:
        print(f"Recovery failed: {error}")
        return None

# Add the recovery logic to the DLQ
event_dlq.add_transform(
    destination=recovered_events,
    transformation=recover_event
)
```

## Best Practices

Dead letter queues add overhead to stream processing. Use them judiciously and monitor their impact on throughput. For high-volume streams where occasional message loss is acceptable, consider sampling failures instead of capturing every one; a sampling sketch follows below.

Dead letter queue events can also be integrated with monitoring systems such as Prometheus, Datadog, or CloudWatch for alerting and dashboards. Consider tracking metrics like DLQ message rate, error types, and recovery success rates; a metrics-export sketch follows the sampling example.
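Here is a minimal sampling sketch. It reuses the `process_event` transform and models from earlier and assumes a transform may drop a record by returning `None`; the sample rate and logging behavior are illustrative choices, not moose_lib features:

```py filename="dlq-sampling-sketch.py" copy
import logging
import random
from typing import Optional

logger = logging.getLogger(__name__)

DLQ_SAMPLE_RATE = 0.1  # illustrative: route ~10% of failures to the DLQ

def process_event_sampled(event: RawEvent) -> Optional[ProcessedEvent]:
    try:
        return process_event(event)  # the transform defined earlier
    except Exception as error:
        if random.random() < DLQ_SAMPLE_RATE:
            raise  # re-raised errors are routed to the DLQ as usual
        # The rest are logged and dropped; returning None skips the message
        logger.warning("Dropping failed event (sampled out): %s", error)
        return None
```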
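And a sketch of exporting DLQ metrics from a consumer, using the third-party `prometheus_client` package (a separate dependency, not part of moose_lib); the metric name and port are arbitrary choices for illustration:

```py filename="dlq-metrics-sketch.py" copy
from moose_lib import DeadLetterModel
from prometheus_client import Counter, start_http_server

# Illustrative metric: count dead letters by where and why they failed
dlq_messages = Counter(
    "dlq_messages_total",
    "Dead letter messages observed",
    ["source", "error_type"],
)

def export_dlq_metrics(dead_letter: DeadLetterModel[RawEvent]) -> None:
    dlq_messages.labels(
        source=dead_letter.source,
        error_type=dead_letter.error_type,
    ).inc()

start_http_server(9108)  # expose /metrics on a port Prometheus can scrape

event_dlq.add_consumer(export_dlq_metrics)
```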
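## Common Patterns

### Circuit Breaker Pattern

When a consumer calls a flaky downstream service, a circuit breaker lets you fail fast after repeated errors instead of hammering the service, with the DLQ absorbing events until it recovers. The sketch below is a minimal in-process breaker; the `CircuitBreaker` class, its thresholds, and `send_to_external_api` are illustrative and not provided by moose_lib:

```py filename="circuit-breaker-sketch.py" copy
import time
from typing import Optional
from moose_lib import ConsumerConfig

class CircuitBreaker:
    def __init__(self, threshold: int = 5, reset_after_seconds: float = 60.0):
        self.threshold = threshold        # consecutive failures before opening
        self.reset_after = reset_after_seconds
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.reset_after:
            self.opened_at = None         # half-open: let the next call probe
            self.failures = 0
            return True
        return False

    def record_success(self) -> None:
        self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()

breaker = CircuitBreaker()

def guarded_consumer(event: RawEvent) -> None:
    if not breaker.allow():
        # Fail fast while the circuit is open; raising routes the event to the DLQ
        raise RuntimeError("Circuit open: downstream unavailable")
    try:
        send_to_external_api(event)  # hypothetical downstream call
        breaker.record_success()
    except Exception:
        breaker.record_failure()
        raise  # routed to the DLQ via the consumer's dead_letter_queue config

raw_events.get_stream().add_consumer(
    consumer=guarded_consumer,
    config=ConsumerConfig(dead_letter_queue=event_dlq)
)
```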
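### Retry with Exponential Backoff

A DLQ consumer can also retry the failed operation with exponentially increasing delays before giving up. This sketch reuses `event_dlq` and `RawEvent` from earlier; `send_to_external_api` is again a hypothetical downstream call:

```py filename="retry-backoff-sketch.py" copy
import time
from moose_lib import DeadLetterModel

MAX_RETRIES = 3
BASE_DELAY_SECONDS = 1.0  # delays of 1s, 2s, 4s

def retry_dead_letters(dead_letter: DeadLetterModel[RawEvent]) -> None:
    event = dead_letter.as_typed()
    for attempt in range(MAX_RETRIES):
        try:
            send_to_external_api(event)  # the call that failed originally
            return                       # success: stop retrying
        except Exception as error:
            delay = BASE_DELAY_SECONDS * (2 ** attempt)
            print(f"Retry {attempt + 1}/{MAX_RETRIES} failed: {error}; backing off {delay:.0f}s")
            time.sleep(delay)
    print(f"Exhausted retries for user {event.user_id}")

# Consume dead letters and retry them with backoff
event_dlq.add_consumer(retry_dead_letters)
```

Note that `time.sleep` blocks the consumer while it waits, so keep retry counts and delays small, or move long-running retries into a separate workflow.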
## Using Dead Letter Queues in Ingestion Pipelines

Dead Letter Queues (DLQs) can be integrated directly with your ingestion pipelines to capture records that fail validation or processing at the API entry point. This ensures that no data is lost, even when it cannot be processed immediately.

```python filename="IngestPipelineWithDLQ.py" copy
from datetime import datetime
from moose_lib import IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel

class ExampleSchema(BaseModel):
    id: str
    name: str
    value: int
    timestamp: datetime

pipeline = IngestPipeline[ExampleSchema](
    name="example",
    config=IngestPipelineConfig(
        ingest_api=True,
        stream=True,
        table=True,
        dead_letter_queue=True  # Route failed ingestions to a DLQ managed by the pipeline
    )
)
```

See the [Ingestion API documentation](/moose/apis/ingest-api#validation) for more details and best practices on configuring DLQs for ingestion.