Dead Letter Queues

Overview

Dead Letter Queues (DLQs) provide a robust error handling mechanism for stream processing in Moose. When streaming functions fail during transformation or consumption, failed messages are automatically routed to a configured dead letter queue for later analysis and recovery.

Dead Letter Queue Benefits

  • Fault tolerance: prevent single message failures from stopping entire stream processing pipelines
  • Error visibility: capture detailed error information including original data, error messages, and timestamps
  • Recovery workflows: build automated or manual processes to handle and retry failed messages
  • Type safety: maintain full type safety when accessing original data from dead letter records

Dead Letter Record Structure

When a message fails processing, Moose creates a dead letter record with the following structure:

interface DeadLetterModel {
  originalRecord: Record<string, any>;   // The original message that failed
  errorMessage: string;                  // Error description
  errorType: string;                     // Error class/type name
  failedAt: Date;                        // Timestamp when failure occurred
  source: "api" | "transform" | "table"; // Where the failure happened
}

interface DeadLetter<T> extends DeadLetterModel {
  asTyped: () => T;  // Type-safe access to original record
}

class DeadLetterModel(BaseModel, Generic[T]):
    original_record: Any                          # The original message that failed
    error_message: str                            # Error description
    error_type: str                               # Error class/type name
    failed_at: datetime.datetime                  # Timestamp when failure occurred
    source: Literal["api", "transform", "table"]  # Where the failure happened

    def as_typed(self) -> T:  # Type-safe access to original record
        return self._t.model_validate(self.original_record)

Creating Dead Letter Queues

Basic Setup

dead-letter-setup.py
from moose_lib import DeadLetterQueue
from pydantic import BaseModel

# Define your data model
class UserEvent(BaseModel):
    user_id: str
    action: str
    timestamp: float

# Create a dead letter queue for UserEvent failures
user_event_dlq = DeadLetterQueue[UserEvent](name="UserEventDLQ")

Configuring Transformations with Dead Letter Queues

Add a dead letter queue to your transformation function's configuration; any error raised inside the transformation routes the failed message to the dead letter queue.

transform-with-dlq.py
from datetime import datetime
from moose_lib import DeadLetterQueue, TransformConfig

# Create dead letter queue
event_dlq = DeadLetterQueue[RawEvent](name="EventDLQ")

# Define transformation function, including errors to trigger DLQ
def process_event(event: RawEvent) -> ProcessedEvent:
    # This transform might fail for invalid data
    if not event.user_id or len(event.user_id) == 0:
        raise ValueError("Invalid user_id: cannot be empty")

    if event.timestamp < 0:
        raise ValueError("Invalid timestamp: cannot be negative")

    return ProcessedEvent(
        user_id=event.user_id,
        action=event.action,
        processed_at=datetime.now(),
        is_valid=True
    )

# Add transform with DLQ configuration
raw_events.get_stream().add_transform(
    destination=processed_events.get_stream(),
    transformation=process_event,
    config=TransformConfig(
        dead_letter_queue=event_dlq  # Configure DLQ for this transform
    )
)

Configuring Consumers with Dead Letter Queues

Add a dead letter queue to your consumer function's configuration; any error raised inside the consumer routes the failed message to the dead letter queue.

consumer-with-dlq.py
from moose_lib import ConsumerConfig

# Define consumer function with errors to trigger DLQ
def process_event_consumer(event: RawEvent) -> None:
    # This consumer might fail for certain events
    if event.action == "forbidden_action":
        raise ValueError("Forbidden action detected")

    # Process the event (e.g., send to external API)
    print(f"Processing event for user {event.user_id}")

# Add consumer with DLQ configuration
raw_events.get_stream().add_consumer(
    consumer=process_event_consumer,
    config=ConsumerConfig(
        dead_letter_queue=event_dlq  # Configure DLQ for this consumer
    )
)

Configuring Ingest APIs with Dead Letter Queues

Add a dead letter queue to your Ingest API configuration; any runtime data-validation failure at the API routes the rejected record to the dead letter queue.

ValidationExample.py
from datetime import datetime
from typing import Optional
from moose_lib import IngestApi, IngestConfig, Stream, DeadLetterQueue
from pydantic import BaseModel

class Properties(BaseModel):
    device: Optional[str]
    version: Optional[int]

class ExampleModel(BaseModel):
    id: str
    userId: str
    timestamp: datetime
    properties: Properties

api = IngestApi[ExampleModel]("your-api-route", IngestConfig(
    destination=Stream[ExampleModel]("your-stream-name"),
    dead_letter_queue=DeadLetterQueue[ExampleModel]("your-dlq-name")
))

Processing Dead Letter Messages

Monitoring Dead Letter Queues

dlq-monitoring.py
def monitor_dead_letters(dead_letter: DeadLetterModel[RawEvent]) -> None:
    print("Dead letter received:")
    print(f"Error: {dead_letter.error_message}")
    print(f"Error Type: {dead_letter.error_type}")
    print(f"Failed At: {dead_letter.failed_at}")
    print(f"Source: {dead_letter.source}")

    # Access the original typed data
    original_event: RawEvent = dead_letter.as_typed()
    print(f"Original User ID: {original_event.user_id}")

# Add consumer to monitor dead letter messages
event_dlq.add_consumer(monitor_dead_letters)

Recovery and Retry Logic

dlq-recovery.py
from datetime import datetime
from typing import Optional
from moose_lib import Stream

# Create a recovery stream for fixed messages
recovered_events = Stream[ProcessedEvent]("recovered_events", {
    "destination": processed_events.get_table()  # Send recovered data to main table
})

def recover_event(dead_letter: DeadLetterModel[RawEvent]) -> Optional[ProcessedEvent]:
    try:
        original_event = dead_letter.as_typed()

        # Apply fixes based on error type
        if "Invalid user_id" in dead_letter.error_message:
            # Skip events with invalid user IDs
            return None

        if "Invalid timestamp" in dead_letter.error_message:
            # Fix negative timestamps
            fixed_timestamp = abs(original_event.timestamp)

            return ProcessedEvent(
                user_id=original_event.user_id,
                action=original_event.action,
                processed_at=datetime.now(),
                is_valid=True
            )

        return None  # Skip other errors
    except Exception as error:
        print(f"Recovery failed: {error}")
        return None

# Add recovery logic to the DLQ
event_dlq.add_transform(
    destination=recovered_events,
    transformation=recover_event
)

Best Practices

  • Monitor DLQ volume: set up alerts when dead letter queues receive messages to catch processing issues early
  • Include context in errors: throw descriptive errors that help identify the root cause and potential fixes
  • Implement recovery logic: build automated recovery processes for common failure patterns
  • Set retention policies: configure appropriate retention periods for dead letter queues based on your recovery SLAs
  • Use separate DLQs: create separate dead letter queues for different types of failures or processing stages
  • Log dead letter events: ensure dead letter events are properly logged for debugging and analysis

Common Patterns

Circuit Breaker Pattern

circuit-breaker.py
failure_count = 0
max_failures = 5
reset_timeout_seconds = 60  # 1 minute

def reset_failure_count() -> None:
    global failure_count
    failure_count = 0

def process_with_circuit_breaker(event: RawEvent) -> ProcessedEvent:
    global failure_count

    if failure_count >= max_failures:
        raise ValueError("Circuit breaker open - too many failures")

    try:
        # Your processing logic here
        result = process_event(event)
        failure_count = 0  # Reset on success
        return result
    except Exception:
        failure_count += 1
        if failure_count >= max_failures:
            # Re-close the breaker after the timeout
            import threading
            threading.Timer(reset_timeout_seconds, reset_failure_count).start()
        raise

raw_events.get_stream().add_transform(
    destination=processed_events.get_stream(),
    transformation=process_with_circuit_breaker,
    config=TransformConfig(dead_letter_queue=event_dlq)
)

Retry with Exponential Backoff

retry-backoff.py
from typing import Optional
from moose_lib import DeadLetterQueue
import time

# Create a retry DLQ with delay processing
retry_dlq = DeadLetterQueue[RawEvent](name="RetryDLQ")

def retry_with_backoff(dead_letter: DeadLetterModel[RawEvent]) -> Optional[ProcessedEvent]:
    retry_count = dead_letter.original_record.get("retry_count", 0)
    max_retries = 3

    if retry_count >= max_retries:
        print("Max retries exceeded, giving up")
        return None

    # Calculate delay (exponential backoff): 1s, 2s, 4s
    delay = (2 ** retry_count) * 1
    time.sleep(delay)

    try:
        original_event = dead_letter.as_typed()
        # Track the attempt count on the raw record so a repeat failure
        # re-enters the DLQ with an incremented retry_count
        dead_letter.original_record["retry_count"] = retry_count + 1

        # Retry the original processing logic and emit the result on success
        return process_event(original_event)
    except Exception:
        # Will go back to the DLQ with the incremented retry count
        raise

retry_dlq.add_transform(
    destination=processed_events.get_stream(),
    transformation=retry_with_backoff
)
Performance Considerations

Dead letter queues add overhead to stream processing. Use them judiciously and monitor their impact on throughput. Consider implementing sampling for high-volume streams where occasional message loss is acceptable.
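One way to sample failures on a high-volume stream is a probabilistic gate in front of the DLQ routing. The sketch below is illustrative, not a Moose API: `should_dead_letter` and `DLQ_SAMPLE_RATE` are assumed names, and you would call the gate inside your own error handling before re-raising.

```python
import random

# Illustrative sample rate: route roughly 10% of failures to the DLQ
DLQ_SAMPLE_RATE = 0.1

def should_dead_letter(sample_rate: float = DLQ_SAMPLE_RATE) -> bool:
    """Decide whether this particular failure is routed to the DLQ."""
    return random.random() < sample_rate

# Edge cases: a rate of 1.0 keeps every failure, 0.0 drops them all
assert should_dead_letter(1.0) is True
assert should_dead_letter(0.0) is False
```

Inside a transform, you could catch the error yourself, re-raise only when `should_dead_letter()` returns True, and silently drop the rest, trading completeness of the DLQ for throughput.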

Monitoring Integration

Dead letter queue events can be integrated with monitoring systems like Prometheus, DataDog, or CloudWatch for alerting and dashboards. Consider tracking metrics like DLQ message rate, error types, and recovery success rates.
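As a starting point before wiring up Prometheus, DataDog, or CloudWatch, you can aggregate error types in-process with a DLQ consumer. The tally structure and function names below are assumptions for illustration; only `add_consumer` and the `error_type` field come from the Moose API shown above.

```python
from collections import Counter

# Running tally of DLQ messages by error type, suitable for periodic export
dlq_error_counts: Counter = Counter()

def track_dead_letter(error_type: str) -> None:
    dlq_error_counts[error_type] += 1

# Example: three dead letters arrive
for err in ["ValueError", "ValueError", "KeyError"]:
    track_dead_letter(err)
# dlq_error_counts now holds {"ValueError": 2, "KeyError": 1}
```

Hooked up to a real queue, this would look like `event_dlq.add_consumer(lambda dl: track_dead_letter(dl.error_type))`, with a periodic task exporting the counts as gauges or counters to your monitoring system.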

Using Dead Letter Queues in Ingestion Pipelines

Dead Letter Queues (DLQs) can be directly integrated with your ingestion pipelines to capture records that fail validation or processing at the API entry point. This ensures that no data is lost, even if it cannot be immediately processed.

IngestPipelineWithDLQ.py
from moose_lib import IngestPipeline, DeadLetterQueue, IngestPipelineConfigfrom pydantic import BaseModel class ExampleSchema(BaseModel):    id: str    name: str    value: int    timestamp: datetime example_dlq = DeadLetterQueue[ExampleSchema](name="exampleDLQ") pipeline = IngestPipeline[ExampleSchema](    name="example",    config=IngestPipelineConfig(        ingest_api=True,        stream=True,        table=True,        dead_letter_queue=True  # Route failed ingestions to DLQ    ))

See the Ingestion API documentation for more details and best practices on configuring DLQs for ingestion.
