# Moose / Streaming Documentation – Python ## Included Files 1. moose/streaming/connect-cdc.mdx 2. moose/streaming/consumer-functions.mdx 3. moose/streaming/create-stream.mdx 4. moose/streaming/dead-letter-queues.mdx 5. moose/streaming/from-your-code.mdx 6. moose/streaming/schema-registry.mdx 7. moose/streaming/sync-to-table.mdx 8. moose/streaming/transform-functions.mdx ## connect-cdc Source: moose/streaming/connect-cdc.mdx # Connect to CDC Services Coming Soon! --- ## Streaming Consumer Functions Source: moose/streaming/consumer-functions.mdx Read and process data from streams with consumers and processors # Streaming Consumer Functions ## Overview Consuming data from streams allows you to read and process data from Kafka/Redpanda topics. This is essential for building real-time applications, analytics, and event-driven architectures. ## Basic Usage Consumers are just functions that are called when new data is available in a stream. You add them to a stream like this: ```py filename="StreamConsumer.py" copy from moose_lib import Stream from pydantic import BaseModel from datetime import datetime class UserEvent(BaseModel): id: str user_id: str timestamp: datetime event_type: str user_events_stream = Stream[UserEvent]("user-events") # Add a consumer to process events def process_event(event: UserEvent): print(f"Processing event: {event.id}") print(f"User: {event.user_id}, Type: {event.event_type}") # Your processing logic here # e.g., update analytics, send notifications, etc. user_events_stream.add_consumer(process_event) # Add multiple consumers for different purposes def analytics_consumer(event: UserEvent): # Analytics processing if event.event_type == 'purchase': update_purchase_analytics(event) def notification_consumer(event: UserEvent): # Notification processing if event.event_type == 'signup': send_welcome_email(event.user_id) user_events_stream.add_consumer(analytics_consumer) user_events_stream.add_consumer(notification_consumer) ``` ## Processing Patterns ### Stateful Processing with MooseCache Maintain state across event processing using MooseCache for distributed state management: ```py filename="StatefulProcessing.py" copy from datetime import datetime from typing import Dict, Any from moose_lib import MooseCache from pydantic import BaseModel # State container for accumulating data class AccumulatorState(BaseModel): id: str counter: int sum: float last_modified: datetime attributes: Dict[str, Any] # Input message structure class InputMessage(BaseModel): id: str group_id: str numeric_value: float message_type: str timestamp: datetime payload: Dict[str, Any] message_stream = Stream[InputMessage]("input-stream") # Initialize distributed cache cache = MooseCache() def process_message(message: InputMessage): cache_key = f"state:{message.group_id}" # Load existing state or create new one state = cache.get(cache_key, AccumulatorState) if not state: # Initialize new state state = AccumulatorState( id=message.group_id, counter=0, sum=0.0, last_modified=datetime.now(), attributes={} ) # Apply message to state state.counter += 1 state.sum += message.numeric_value state.last_modified = message.timestamp state.attributes.update(message.payload) # Determine cache lifetime based on message type ttl_seconds = 60 if message.message_type == 'complete' else 3600 if message.message_type == 'complete' or should_finalize(state): # Finalize and remove state finalize_state(state) cache.delete(cache_key) else: # Persist updated state cache.set(cache_key, state, ttl_seconds=ttl_seconds) def 
should_finalize(state: AccumulatorState) -> bool: """Condition for automatic state finalization""" threshold = 100 time_limit_seconds = 30 * 60 # 30 minutes elapsed = (datetime.now() - state.last_modified).total_seconds() return state.counter >= threshold or elapsed > time_limit_seconds def finalize_state(state: AccumulatorState): print(f"Finalizing state {state.id}: counter={state.counter}, sum={state.sum}") message_stream.add_consumer(process_message) ``` ## Propagating Events to External Systems You can use consumer functions to trigger actions across external systems - send notifications, sync databases, update caches, or integrate with any other service when events occur: ### HTTP API Calls Send processed data to external APIs: ```py filename="HttpIntegration.py" copy from datetime import datetime from typing import Dict, Any from pydantic import BaseModel class WebhookPayload(BaseModel): id: str data: Dict[str, Any] timestamp: datetime webhook_stream = Stream[WebhookPayload]("webhook-events") async def send_to_external_api(payload: WebhookPayload): try: async with httpx.AsyncClient() as client: response = await client.post( 'https://external-api.com/webhook', headers={ 'Content-Type': 'application/json', 'Authorization': f'Bearer {os.getenv("API_TOKEN")}' }, json={ 'event_id': payload.id, 'event_data': payload.data, 'processed_at': datetime.now().isoformat() } ) if response.status_code != 200: raise Exception(f"HTTP {response.status_code}: {response.text}") print(f"Successfully sent event {payload.id} to external API") except Exception as error: print(f"Failed to send event {payload.id}: {error}") # Could implement retry logic or dead letter queue here webhook_stream.add_consumer(send_to_external_api) ``` #### Database Operations Write processed data to external databases: ```py filename="DatabaseIntegration.py" copy from datetime import datetime from typing import Dict, Any from pydantic import BaseModel class DatabaseRecord(BaseModel): id: str category: str value: float metadata: Dict[str, Any] timestamp: datetime db_stream = Stream[DatabaseRecord]("database-events") async def insert_to_database(record: DatabaseRecord): try: # Connect to PostgreSQL database conn = await asyncpg.connect( host=os.getenv('DB_HOST'), user=os.getenv('DB_USER'), password=os.getenv('DB_PASSWORD'), database=os.getenv('DB_NAME') ) # Insert record into external database await conn.execute( ''' INSERT INTO processed_events (id, category, value, metadata, created_at) VALUES ($1, $2, $3, $4, $5) ''', record.id, record.category, record.value, json.dumps(record.metadata), record.timestamp ) print(f"Inserted record {record.id} into database") except Exception as error: print(f"Database insert failed for record {record.id}: {error}") finally: if 'conn' in locals(): await conn.close() db_stream.add_consumer(insert_to_database) ``` #### File System Operations Write processed data to files or cloud storage: ```py filename="FileSystemIntegration.py" copy from datetime import datetime from typing import Literal from pydantic import BaseModel class FileOutput(BaseModel): id: str filename: str content: str directory: str format: Literal['json', 'csv', 'txt'] file_stream = Stream[FileOutput]("file-events") async def write_to_file(output: FileOutput): try: # Ensure directory exists os.makedirs(output.directory, exist_ok=True) # Format content based on type if output.format == 'json': formatted_content = json.dumps(json.loads(output.content), indent=2) else: formatted_content = output.content # Write file with timestamp 
timestamp = datetime.now().isoformat().replace(':', '-').replace('.', '-') filename = f"{output.filename}_{timestamp}.{output.format}" filepath = os.path.join(output.directory, filename) async with aiofiles.open(filepath, 'w', encoding='utf-8') as f: await f.write(formatted_content) print(f"Written file: {filepath}") except Exception as error: print(f"Failed to write file for output {output.id}: {error}") file_stream.add_consumer(write_to_file) ``` #### Email and Notifications Send alerts and notifications based on processed events: ```py filename="NotificationIntegration.py" copy from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from typing import Dict, Any, Literal from pydantic import BaseModel class NotificationEvent(BaseModel): id: str type: Literal['email', 'slack', 'webhook'] recipient: str subject: str message: str priority: Literal['low', 'medium', 'high'] metadata: Dict[str, Any] notification_stream = Stream[NotificationEvent]("notifications") async def send_notification(notification: NotificationEvent): try: if notification.type == 'email': # Send email msg = MIMEMultipart() msg['From'] = os.getenv('SMTP_FROM') msg['To'] = notification.recipient msg['Subject'] = notification.subject body = f""" {notification.message} Priority: {notification.priority} """ msg.attach(MIMEText(body, 'plain')) server = smtplib.SMTP(os.getenv('SMTP_HOST'), int(os.getenv('SMTP_PORT', '587'))) server.starttls() server.login(os.getenv('SMTP_USER'), os.getenv('SMTP_PASS')) server.send_message(msg) server.quit() elif notification.type == 'slack': # Send to Slack async with httpx.AsyncClient() as client: await client.post( f"https://hooks.slack.com/services/{os.getenv('SLACK_WEBHOOK')}", json={ 'text': notification.message, 'channel': notification.recipient, 'username': 'Moose Alert', 'icon_emoji': ':warning:' if notification.priority == 'high' else ':information_source:' } ) elif notification.type == 'webhook': # Send to webhook async with httpx.AsyncClient() as client: await client.post( notification.recipient, json={ 'id': notification.id, 'subject': notification.subject, 'message': notification.message, 'priority': notification.priority, 'metadata': notification.metadata } ) print(f"Sent {notification.type} notification {notification.id}") except Exception as error: print(f"Failed to send notification {notification.id}: {error}") notification_stream.add_consumer(send_notification) ``` --- ## Create Streams Source: moose/streaming/create-stream.mdx Define and create Kafka/Redpanda topics with type-safe schemas # Creating Streams ## Overview Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they provide a way to implement real-time pipelines for ingesting and processing incoming data. ## Creating Streams You can create streams in two ways: - High-level: Using the `IngestPipeline` class (recommended) - Low-level: Manually configuring the `Stream` component ### Streams for Ingestion The `IngestPipeline` class provides a convenient way to set up streams with ingestion APIs and tables. 
This is the recommended way to create streams for ingestion:

```py filename="IngestionStream.py" copy {10}
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel

class RawData(BaseModel):
    id: Key[str]
    value: int

raw_ingestion_stream = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api = True,  # Creates an ingestion API endpoint at `POST /ingest/raw_data`
    stream = True,      # Buffers data between the ingestion API and the database table
    table = True,       # Creates an OLAP table named `raw_data`
))
```

While the `IngestPipeline` provides a convenient way to set up streams with ingestion APIs and tables, you can also configure these components individually for more granular control:

```py filename="StreamObject.py" copy {8-12}
from moose_lib import Stream, StreamConfig, Key, IngestApi, IngestConfig, OlapTable
from pydantic import BaseModel

class RawData(BaseModel):
    id: Key[str]
    value: int

raw_table = OlapTable[RawData]("raw_data")

raw_stream = Stream[RawData]("raw_data", StreamConfig(
    destination=raw_table  # Optional: specify a destination table; sets up a process to sync data from the stream to the table
))

raw_ingest_api = IngestApi[RawData]("raw_data", IngestConfig(
    destination=raw_stream  # Configure Moose to write all validated data to the stream
))
```

### Streams for Transformations

If the raw data needs to be transformed before landing in the database, you can define a destination stream for the transformed data and a transformation function to process it:

#### Single Stream Transformation

```py filename="TransformDestinationStream.py" copy
# Import required libraries
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel
from datetime import datetime

# Define schema for raw incoming data
class RawData(BaseModel):
    id: Key[str]              # Primary key
    value: int                # Value to be transformed

# Define schema for transformed data
class TransformedData(BaseModel):
    id: Key[str]              # Primary key (preserved from raw data)
    transformedValue: int     # Transformed value
    transformedAt: datetime   # Timestamp of transformation

# Create pipeline for raw data - only for ingestion and streaming
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api = True,  # Enable API endpoint
    stream = True,      # Create stream for buffering
    table = False       # No table needed for raw data
))

# Create pipeline for transformed data - for storage only
transformed_data = IngestPipeline[TransformedData]("transformed_data", IngestPipelineConfig(
    ingest_api = False,  # No direct API endpoint
    stream = True,       # Create stream to receive transformed data
    table = True         # Store transformed data in table
))

# Define a named transformation function
def transform_function(record: RawData) -> TransformedData:
    return TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )

# Connect the streams with the transformation function
raw_data.get_stream().add_transform(
    destination=transformed_data.get_stream(),  # Use get_stream() to get the stream object from the IngestPipeline
    transformation=transform_function           # Can also be a lambda function
)
```

Use the `get_stream()` method to get the stream object from the `IngestPipeline` to avoid errors when referencing the stream.
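To make that note concrete, here is a minimal sketch (the model and consumer are illustrative, not part of the example above) showing that `get_stream()` returns the pipeline's underlying `Stream` object, which you can use anywhere a standalone stream is accepted — for example, to attach a consumer to the same topic the ingestion API writes to:

```py filename="PipelineStreamAccess.py" copy
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel

class OrderEvent(BaseModel):
    id: Key[str]
    amount: float

orders = IngestPipeline[OrderEvent]("orders", IngestPipelineConfig(
    ingest_api=True,  # POST /ingest/orders
    stream=True,      # Kafka/Redpanda topic
    table=True        # OLAP table, auto-synced from the stream
))

# get_stream() returns the Stream[OrderEvent] the pipeline created,
# so this consumer reads the same topic the ingestion API publishes to
def log_order(order: OrderEvent) -> None:
    print(f"Received order {order.id} for {order.amount}")

orders.get_stream().add_consumer(log_order)
```

The same accessor is what the transformation examples use when wiring `add_transform` between two pipelines.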
You can also use a lambda function to define the transformation:

```py filename="TransformDestinationStream.py" copy
from moose_lib import Key, IngestApi, IngestConfig, OlapTable, Stream, StreamConfig
from pydantic import BaseModel
from datetime import datetime

class RawData(BaseModel):
    id: Key[str]
    value: int

class TransformedData(BaseModel):
    id: Key[str]
    transformedValue: int
    transformedAt: datetime

# Create pipeline components for raw data - only for ingestion and streaming
raw_stream = Stream[RawData]("raw_data")  # No destination table since we're not storing the raw data
raw_api = IngestApi[RawData]("raw_data", IngestConfig(
    destination=raw_stream  # Connect the ingestion API to the raw data stream
))

# Create pipeline components for transformed data - no ingestion API since we're not ingesting the transformed data
transformed_table = OlapTable[TransformedData]("transformed_data")  # Store the transformed data in a table
transformed_stream = Stream[TransformedData]("transformed_data", StreamConfig(destination=transformed_table))  # Connect the transformed data stream to the destination table

# Example transformation using a lambda function
raw_stream.add_transform(
    destination=transformed_stream,
    transformation=lambda record: TransformedData(
        id=record.id,
        transformedValue=record.value * 2,
        transformedAt=datetime.now()
    )
)
```

#### Chaining Transformations

For more complex transformations, you can chain multiple transformations together. This is a case where a standalone `Stream` for the intermediate stages of your pipeline is useful:

```py filename="ChainedTransformations.py" copy
from moose_lib import IngestPipeline, IngestPipelineConfig, Key, Stream
from pydantic import BaseModel
from datetime import datetime

# Define the schema for raw input data
class RawData(BaseModel):
    id: Key[str]
    value: int

# Define the schema for intermediate transformed data
class IntermediateData(BaseModel):
    id: Key[str]
    transformedValue: int
    transformedAt: datetime

# Define the schema for final transformed data
class FinalData(BaseModel):
    id: Key[str]
    transformedValue: int
    anotherTransformedValue: int
    transformedAt: datetime

# Create the first pipeline for raw data ingestion
# Only create an API and a stream (no table) since we're only ingesting the raw data
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=False
))

# Create an intermediate stream to hold data between transformations (no API or table needed)
intermediate_stream = Stream[IntermediateData]("intermediate_stream")

# First transformation: double the value and add a timestamp
raw_data.get_stream().add_transform(destination=intermediate_stream, transformation=lambda record: IntermediateData(
    id=record.id,
    transformedValue=record.value * 2,
    transformedAt=datetime.now()
))

# Create the final pipeline that will store the fully transformed data
final_data = IngestPipeline[FinalData]("final_stream", IngestPipelineConfig(
    ingest_api=False,
    stream=True,
    table=True
))

# Second transformation: further transform the intermediate data
intermediate_stream.add_transform(destination=final_data.get_stream(), transformation=lambda record: FinalData(
    id=record.id,
    transformedValue=record.transformedValue * 2,
    anotherTransformedValue=record.transformedValue * 3,
    transformedAt=datetime.now()
))
```

## Stream Configurations

### Parallelism and Retention

```py filename="StreamConfig.py" copy
from moose_lib import Stream, StreamConfig
from pydantic import BaseModel

class Data(BaseModel):
    id: str
    value: float

high_throughput_stream = Stream[Data]("high_throughput", StreamConfig(
    parallelism=4,           # Process 4 records simultaneously
    retention_period=86400,  # Keep data for 1 day
))
```

### LifeCycle Management

Control how Moose
manages your stream resources when your code changes. See the [LifeCycle Management guide](./lifecycle) for detailed information. ```py filename="LifeCycleStreamConfig.py" copy from moose_lib import Stream, StreamConfig, LifeCycle # Production stream with external management prod_stream = Stream[Data]("prod_stream", StreamConfig( life_cycle=LifeCycle.EXTERNALLY_MANAGED )) # Development stream with full management dev_stream = Stream[Data]("dev_stream", StreamConfig( life_cycle=LifeCycle.FULLY_MANAGED )) ``` See the [API Reference](/moose/reference/ts-moose-lib#stream) for complete configuration options. --- ## Dead Letter Queues Source: moose/streaming/dead-letter-queues.mdx Handle failed stream processing with dead letter queues # Dead Letter Queues ## Overview Dead Letter Queues (DLQs) provide a robust error handling mechanism for stream processing in Moose. When streaming functions fail during transformation or consumption, failed messages are automatically routed to a configured dead letter queue for later analysis and recovery. ## Dead Letter Record Structure When a message fails processing, Moose creates a dead letter record with the following structure: ```py class DeadLetterModel(BaseModel, Generic[T]): original_record: Any # The original message that failed error_message: str # Error description error_type: str # Error class/type name failed_at: datetime.datetime # Timestamp when failure occurred source: Literal["api", "transform", "table"] # Where the failure happened def as_typed(self) -> T: # Type-safe access to original record return self._t.model_validate(self.original_record) ``` ## Creating Dead Letter Queues ### Basic Setup ```py filename="dead-letter-setup.py" copy from moose_lib import DeadLetterQueue from pydantic import BaseModel # Define your data model class UserEvent(BaseModel): user_id: str action: str timestamp: float # Create a dead letter queue for UserEvent failures user_event_dlq = DeadLetterQueue[UserEvent](name="UserEventDLQ") ``` ### Configuring Transformations with Dead Letter Queues Add a dead letter queue to your Transformation Function configuration, and any errors thrown in the transformation will trigger the event to be routed to the dead letter queue. ```py filename="transform-with-dlq.py" copy from moose_lib import DeadLetterQueue, TransformConfig # Create dead letter queue event_dlq = DeadLetterQueue[RawEvent](name="EventDLQ") # Define transformation function, including errors to trigger DLQ def process_event(event: RawEvent) -> ProcessedEvent: # This transform might fail for invalid data if not event.user_id or len(event.user_id) == 0: raise ValueError("Invalid user_id: cannot be empty") if event.timestamp < 0: raise ValueError("Invalid timestamp: cannot be negative") return ProcessedEvent( user_id=event.user_id, action=event.action, processed_at=datetime.now(), is_valid=True ) # Add transform with DLQ configuration raw_events.get_stream().add_transform( destination=processed_events.get_stream(), transformation=process_event, config=TransformConfig( dead_letter_queue=event_dlq # Configure DLQ for this transform ) ) ``` ### Configuring Consumers with Dead Letter Queues Add a dead letter queue to your Consumer Function configuration, and any errors thrown in the function will trigger the event to be routed to the dead letter queue. 
```py filename="consumer-with-dlq.py" copy from moose_lib import ConsumerConfig # Define consumer function with errors to trigger DLQ def process_event_consumer(event: RawEvent) -> None: # This consumer might fail for certain events if event.action == "forbidden_action": raise ValueError("Forbidden action detected") # Process the event (e.g., send to external API) print(f"Processing event for user {event.user_id}") # Add consumer with DLQ configuration raw_events.get_stream().add_consumer( consumer=process_event_consumer, config=ConsumerConfig( dead_letter_queue=event_dlq # Configure DLQ for this consumer ) ) ``` ### Configuring Ingest APIs with Dead Letter Queues Add a dead letter queue to your Ingest API configuration, and any runtime data validation failures at the API will trigger the event to be routed to the dead letter queue. ```python filename="ValidationExample.py" copy from moose_lib import IngestPipeline, IngestPipelineConfig, IngestConfig from pydantic import BaseModel class Properties(BaseModel): device: Optional[str] version: Optional[int] class ExampleModel(BaseModel): id: str userId: str timestamp: datetime properties: Properties api = IngestApi[ExampleModel]("your-api-route", IngestConfig( destination=Stream[ExampleModel]("your-stream-name"), dead_letter_queue=DeadLetterQueue[ExampleModel]("your-dlq-name") )) ``` ## Processing Dead Letter Messages ### Monitoring Dead Letter Queues ```py filename="dlq-monitoring.py" copy def monitor_dead_letters(dead_letter: DeadLetterModel[RawEvent]) -> None: print("Dead letter received:") print(f"Error: {dead_letter.error_message}") print(f"Error Type: {dead_letter.error_type}") print(f"Failed At: {dead_letter.failed_at}") print(f"Source: {dead_letter.source}") # Access the original typed data original_event: RawEvent = dead_letter.as_typed() print(f"Original User ID: {original_event.user_id}") # Add consumer to monitor dead letter messages event_dlq.add_consumer(monitor_dead_letters) ``` ### Recovery and Retry Logic ```py filename="dlq-recovery.py" copy from moose_lib import Stream from typing import Optional # Create a recovery stream for fixed messages recovered_events = Stream[ProcessedEvent]("recovered_events", { "destination": processed_events.get_table() # Send recovered data to main table }) def recover_event(dead_letter: DeadLetterModel[RawEvent]) -> Optional[ProcessedEvent]: try: original_event = dead_letter.as_typed() # Apply fixes based on error type if "Invalid user_id" in dead_letter.error_message: # Skip events with invalid user IDs return None if "Invalid timestamp" in dead_letter.error_message: # Fix negative timestamps fixed_timestamp = abs(original_event.timestamp) return ProcessedEvent( user_id=original_event.user_id, action=original_event.action, processed_at=datetime.now(), is_valid=True ) return None # Skip other errors except Exception as error: print(f"Recovery failed: {error}") return None # Add recovery logic to the DLQ event_dlq.add_transform( destination=recovered_events, transformation=recover_event ) ``` ## Best Practices ## Common Patterns ### Circuit Breaker Pattern ### Retry with Exponential Backoff Dead letter queues add overhead to stream processing. Use them judiciously and monitor their impact on throughput. Consider implementing sampling for high-volume streams where occasional message loss is acceptable. Dead letter queue events can be integrated with monitoring systems like Prometheus, DataDog, or CloudWatch for alerting and dashboards. 
Consider tracking metrics like DLQ message rate, error types, and recovery success rates. ## Using Dead Letter Queues in Ingestion Pipelines Dead Letter Queues (DLQs) can be directly integrated with your ingestion pipelines to capture records that fail validation or processing at the API entry point. This ensures that no data is lost, even if it cannot be immediately processed. ```python filename="IngestPipelineWithDLQ.py" copy from moose_lib import IngestPipeline, DeadLetterQueue, IngestPipelineConfig from pydantic import BaseModel class ExampleSchema(BaseModel): id: str name: str value: int timestamp: datetime example_dlq = DeadLetterQueue[ExampleSchema](name="exampleDLQ") pipeline = IngestPipeline[ExampleSchema]( name="example", config=IngestPipelineConfig( ingest_api=True, stream=True, table=True, dead_letter_queue=True # Route failed ingestions to DLQ ) ) ``` See the [Ingestion API documentation](/moose/apis/ingest-api#validation) for more details and best practices on configuring DLQs for ingestion. --- ## Publish Data Source: moose/streaming/from-your-code.mdx Write data to streams from applications, APIs, or external sources # Publishing Data to Streams ## Overview Publishing data to streams allows you to write data from various sources into your Kafka/Redpanda topics. This is the first step in building real-time data pipelines. ## Publishing Methods ### Using REST APIs The most common way to publish data is through Moose's built-in ingestion APIs. These are configured to automatically sit in front of your streams and publish data to them whenever a request is made to the endpoint: ```py filename="PublishViaAPI.py" copy from moose_lib import IngestPipeline, IngestPipelineConfig # When you create an IngestPipeline with ingest_api: True, Moose automatically creates an API endpoint raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig( ingest_api=True, # Creates POST /ingest/raw_data endpoint stream=True, table=True )) # You can then publish data via HTTP POST requests response = requests.post('/ingest/raw_data', json={ 'id': '123', 'value': 42 }) ``` See the [OpenAPI documentation](/stack/open-api) to learn more about how to generate type-safe client SDKs in your language of choice for all of your Moose APIs. ### Direct Stream Publishing You can publish directly to a stream from your Moose code using the stream's `send` method. This is useful when emitting events from workflows or other backend logic. `send` accepts a single record or an array of records. If your `Stream` is configured with `schemaConfig.kind = "JSON"`, Moose produces using the Confluent envelope automatically (0x00 + schema id + JSON). No code changes are needed beyond setting `schemaConfig`. See the [Schema Registry guide](/moose/streaming/schema-registry). 
```py filename="DirectPublish.py" copy from moose_lib import Stream, StreamConfig, Key from pydantic import BaseModel from datetime import datetime class UserEvent(BaseModel): id: Key[str] user_id: str timestamp: datetime event_type: str # Create a stream (optionally pass StreamConfig with destination/table settings) events = Stream[UserEvent]("user_events", StreamConfig()) # Publish a single record events.send(UserEvent( id="evt_1", user_id="user_123", timestamp=datetime.now(), event_type="click" )) # Publish multiple records events.send([ UserEvent(id="evt_2", user_id="user_456", timestamp=datetime.now(), event_type="view"), UserEvent(id="evt_3", user_id="user_789", timestamp=datetime.now(), event_type="signup"), ]) ``` Moose builds the Kafka topic name from your stream name, optional namespace, and optional version (dots become underscores). For example, a stream named `events` with version `1.2.0` becomes `events_1_2_0` (or `my_ns.events_1_2_0` when the namespace is `"my_ns"`). ### Using the Kafka/Redpanda Client from External Applications You can also publish to streams from external applications using Kafka/Redpanda clients: ```py filename="ExternalPublish.py" copy from kafka import KafkaProducer from datetime import datetime producer = KafkaProducer( bootstrap_servers=['localhost:19092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') ) # Publish to the stream topic producer.send('user-events', { # Stream name becomes the topic name 'id': 'event-123', 'user_id': 'user-456', 'timestamp': datetime.now().isoformat(), 'event_type': 'page_view' }) ``` #### Locating Redpanda Connection Details When running your Moose backend within your local dev environment, you can find the connection details for your Redpanda cluster in the `moose.config.toml` file in the root of your project: ```toml filename="moose.config.toml" copy [redpanda_config] broker = "localhost:19092" message_timeout_ms = 1000 retention_ms = 30000 replication_factor = 1 ``` --- ## Schema Registry Source: moose/streaming/schema-registry.mdx Use Confluent Schema Registry with Moose streams (JSON Schema first) # Schema Registry Integration The first supported encoding is JSON Schema. Avro and Protobuf are planned. ## Overview Moose can publish and consume Kafka/Redpanda messages using Confluent Schema Registry. The first supported encoding is JSON Schema; Avro and Protobuf are planned. ## Configure Schema Registry URL Set the Schema Registry URL in `moose.config.toml` under `redpanda_config` (aliased as `kafka_config`). You can also override with environment variables. ```toml filename="moose.config.toml" copy [redpanda_config] broker = "localhost:19092" schema_registry_url = "http://localhost:8081" ``` Environment overrides (either key works): ```bash filename="Terminal" copy export MOOSE_REDPANDA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081 # or export MOOSE_KAFKA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081 ``` ## Referencing Schemas You can attach a Schema Registry reference to any `Stream` via `schemaConfig`. 
Use one of: - Subject latest: `{ subjectLatest: string }` - Subject and version: `{ subject: string, version: number }` - Schema id: `{ id: number }` ```py filename="sr_stream.py" copy {13-21} from moose_lib import Stream, StreamConfig from moose_lib.dmv2.stream import KafkaSchemaConfig, SubjectLatest from pydantic import BaseModel class Event(BaseModel): id: str value: int schema_config = KafkaSchemaConfig( kind="JSON", reference=SubjectLatest(name="event-value"), ) events = Stream[Event]( "events", StreamConfig(schema_config=schema_config), ) events.send(Event(id="e1", value=42)) ``` ## Consuming SR JSON in Runners Moose streaming runners automatically detect the Confluent JSON envelope when consuming and strip the header before parsing the JSON. Your transformation code continues to work unchanged. ## Ingestion APIs and SR When an Ingest API routes to a topic that has a `schemaConfig` of kind JSON, Moose resolves the schema id and publishes requests using the Schema Registry envelope. You can also set the reference to a fixed `id` to skip lookups. ## Discover existing topics and schemas Use the CLI to pull external topics and optionally fetch JSON Schemas from Schema Registry to emit typed models. ```bash filename="Terminal" copy moose kafka pull \ --schema-registry http://localhost:8081 \ --path app/external-topics \ --include "*" \ --exclude "{__consumer_offsets,_schemas}" ``` This writes external topic declarations under the provided path based on language (default path is inferred). ## Current limitations - JSON Schema only (Avro/Protobuf planned) - Ingest API schema declared in code may not match the actual schema in registry. --- ## Sync to Table Source: moose/streaming/sync-to-table.mdx Automatically sync stream data to OLAP tables with intelligent batching # Sync to Table ## Overview Moose automatically handles batch writes between streams and OLAP tables through a **destination configuration**. When you specify a `destination` OLAP table for a stream, Moose provisions a background synchronization process that batches and writes data from the stream to the table. 
### Basic Usage

```py filename="SyncToTable.py" copy {12}
from moose_lib import Stream, StreamConfig, OlapTable, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[Event]("events")
events_stream = Stream[Event]("events", StreamConfig(
    destination=events_table  # This configures automatic batching
))
```

## Setting Up Automatic Sync

### Using IngestPipeline (Easiest)

The simplest way to set up automatic syncing is with an `IngestPipeline`, which creates all components and wires them together:

```py filename="AutoSync.py" copy {14-15}
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Creates stream, table, API, and automatic sync
events_pipeline = IngestPipeline[Event]("events", IngestPipelineConfig(
    ingest_api=True,
    stream=True,   # Creates stream
    table=True     # Creates destination table + auto-sync process
))
```

### Standalone Components

For more granular control, you can configure the components individually:

```py filename="ManualSync.py" copy
from moose_lib import Stream, OlapTable, IngestApi, IngestConfig, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Create table first
events_table = OlapTable[Event]("events")

# Create stream with destination table (enables auto-sync)
events_stream = Stream[Event]("events", StreamConfig(
    destination=events_table  # This configures automatic batching
))

# Create API that writes to the stream
events_api = IngestApi[Event]("events", IngestConfig(
    destination=events_stream
))
```

## How Automatic Syncing Works

When you configure a stream with a `destination` table, Moose provisions a **Rust background process** that handles the synchronization:

1. **Consumes** messages from the stream (Kafka/Redpanda topic)
2. **Batches** records up to 100,000 or flushes every second (whichever comes first)
3. **Executes** optimized ClickHouse `INSERT` statements
4. **Commits** stream offsets after successful writes
5. **Retries** failed batches with exponential backoff

Default batching parameters:

| Parameter | Value | Description |
|-----------|-------|-------------|
| `MAX_BATCH_SIZE` | 100,000 records | Maximum records per batch insert |
| `FLUSH_INTERVAL` | 1 second | Automatic flush regardless of batch size |

Currently, you cannot configure the batching parameters, but we're interested in adding this feature. If you need this capability, let us know on Slack!

[ClickHouse inserts need to be batched for optimal performance](https://clickhouse.com/blog/asynchronous-data-inserts-in-clickhouse#data-needs-to-be-batched-for-optimal-performance). Moose handles this optimization internally, ensuring your data is efficiently written to ClickHouse without any configuration required.

## Data Flow Example

Here's how data flows through the automatic sync process:

```py filename="DataFlow.py" copy
import requests

# 1. Data sent to ingestion API
requests.post('http://localhost:4000/ingest/events', json={
    "id": "evt_123",
    "user_id": "user_456",
    "timestamp": "2024-01-15T10:30:00Z",
    "event_type": "click"
})

# 2. API validates and writes to stream

# 3. Background sync process batches stream data

# 4. Batch automatically written to ClickHouse table when:
#    - Batch reaches 100,000 records, OR
#    - 1 second has elapsed since last flush

# 5. Data available for queries in events table
#    SELECT * FROM events WHERE user_id = 'user_456';
```

## Monitoring and Observability

The sync process provides built-in observability within the Moose runtime:

- **Batch Insert Logs**: Records successful batch insertions with sizes and offsets
- **Error Handling**: Logs transient failures with retry information
- **Metrics**: Tracks throughput, batch sizes, and error rates
- **Offset Tracking**: Maintains Kafka consumer group offsets for reliability

---

## Transformation Functions

Source: moose/streaming/transform-functions.mdx
Process and transform data in-flight between streams

# Transformation Functions

## Overview

Transformations allow you to process and reshape data as it flows between streams. You can filter, enrich, reshape, and combine data in-flight before it reaches its destination.

## Implementing Transformations

### Reshape and Enrich Data

Transform data shape or enrich records:

```py filename="DataTransform.py" copy
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class EventProperties(BaseModel):
    user_id: str
    platform: str
    app_version: str
    ip_address: str

class RawEvent(BaseModel):
    id: Key[str]
    timestamp: str
    data: EventProperties

class EnrichedEventProperties(BaseModel):
    platform: str
    version: str
    country: str

class EnrichedEventMetadata(BaseModel):
    originalTimestamp: str
    processedAt: datetime

class EnrichedEvent(BaseModel):
    eventId: Key[str]
    timestamp: datetime
    userId: Key[str]
    properties: EnrichedEventProperties
    metadata: EnrichedEventMetadata

raw_stream = Stream[RawEvent]("raw_events")
enriched_stream = Stream[EnrichedEvent]("enriched_events")

raw_stream.add_transform(destination=enriched_stream, transformation=lambda record: EnrichedEvent(
    eventId=record.id,
    timestamp=datetime.fromisoformat(record.timestamp),
    userId=record.data.user_id,
    properties=EnrichedEventProperties(
        platform=record.data.platform,
        version=record.data.app_version,
        country=lookupCountry(record.data.ip_address)  # lookupCountry is your own enrichment helper
    ),
    metadata=EnrichedEventMetadata(
        originalTimestamp=record.timestamp,
        processedAt=datetime.now()
    )
))
```

### Filtering

Remove or filter records based on conditions:

```py filename="FilterStream.py" copy
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime

class MetricRecord(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

class ValidMetrics(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime

input_stream = Stream[MetricRecord]("input_metrics")
valid_metrics = Stream[ValidMetrics]("valid_metrics")

def get_start_of_day() -> datetime:
    return datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

def filter_function(record: MetricRecord) -> ValidMetrics | None:
    if record.value > 0 and record.timestamp > get_start_of_day() and not record.name.startswith('debug_'):
        return ValidMetrics(
            id=record.id,
            name=record.name,
            value=record.value,
            timestamp=record.timestamp
        )
    return None

input_stream.add_transform(destination=valid_metrics, transformation=filter_function)
```

### Fan Out (1:N)

Send data to multiple downstream processors:

```py filename="FanOut.py" copy
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime
from typing import List

# Define data models
class Order(BaseModel):
    orderId: Key[str]
    userId: Key[str]
    amount: float
    items: List[str]

class HighPriorityOrder(Order):
    priority: str = 'high'

class ArchivedOrder(Order):
    archivedAt: datetime

# Create source and destination streams
order_stream = Stream[Order]("orders")
analytics_stream = Stream[Order]("order_analytics")
notification_stream = Stream[HighPriorityOrder]("order_notifications")
archive_stream = Stream[ArchivedOrder]("order_archive")

# Send all orders to analytics
def analytics_transform(order: Order) -> Order:
    return order

order_stream.add_transform(destination=analytics_stream, transformation=analytics_transform)

# Send large orders to notifications
def high_priority_transform(order: Order) -> HighPriorityOrder | None:
    if order.amount > 1000:
        return HighPriorityOrder(
            orderId=order.orderId,
            userId=order.userId,
            amount=order.amount,
            items=order.items,
            priority='high'
        )
    return None  # Skip small orders

order_stream.add_transform(destination=notification_stream, transformation=high_priority_transform)

# Archive all orders with a timestamp
def archive_transform(order: Order) -> ArchivedOrder | None:
    return ArchivedOrder(
        orderId=order.orderId,
        userId=order.userId,
        amount=order.amount,
        items=order.items,
        archivedAt=datetime.now()
    )

order_stream.add_transform(destination=archive_stream, transformation=archive_transform)
```

### Fan In (N:1)

Combine data from multiple sources:

```py filename="FanIn.py" copy
from moose_lib import Stream, OlapTable, Key, StreamConfig
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    userId: Key[str]
    eventType: str
    timestamp: datetime
    source: str

# Create source and destination streams
web_events = Stream[UserEvent]("web_events")
mobile_events = Stream[UserEvent]("mobile_events")
api_events = Stream[UserEvent]("api_events")

# Create a stream and table for the combined events
events_table = OlapTable[UserEvent]("all_events")
all_events = Stream[UserEvent]("all_events", StreamConfig(
    destination=events_table
))

# Fan in from web
def web_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='web'
    )

web_events.add_transform(destination=all_events, transformation=web_transform)

# Fan in from mobile
def mobile_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='mobile'
    )

mobile_events.add_transform(destination=all_events, transformation=mobile_transform)

# Fan in from API
def api_transform(event: UserEvent) -> UserEvent:
    return UserEvent(
        userId=event.userId,
        eventType=event.eventType,
        timestamp=event.timestamp,
        source='api'
    )

api_events.add_transform(destination=all_events, transformation=api_transform)
```

### Unnesting

Flatten nested records:

```py filename="Unnest.py" copy
from moose_lib import Stream, Key
from pydantic import BaseModel
from typing import List

class NestedValue(BaseModel):
    value: int

class NestedRecord(BaseModel):
    id: Key[str]
    nested: List[NestedValue]

class FlattenedRecord(BaseModel):
    id: Key[str]
    value: int

nested_stream = Stream[NestedRecord]("nested_records")
flattened_stream = Stream[FlattenedRecord]("flattened_records")

def unnest_transform(record: NestedRecord) -> List[FlattenedRecord]:
    result = []
    for nested in record.nested:
        result.append(FlattenedRecord(
            id=record.id,
            value=nested.value
        ))
    return result

nested_stream.add_transform(flattened_stream, unnest_transform)
```

You cannot have multiple transforms between the same source and destination stream. If you need multiple transformation routes, you must either:

- Use conditional logic inside a single streaming function to handle the different cases (see the sketch below), or
- Implement a fan-out/fan-in pattern, where you route records to different intermediate streams and then merge them back into the destination stream.
## Error Handling with Dead Letter Queues When stream processing fails, you can configure dead letter queues to capture failed messages for later analysis and recovery. This prevents single message failures from stopping your entire pipeline. ```py filename="DeadLetterQueue.py" copy from moose_lib import DeadLetterQueue, IngestPipeline, IngestPipelineConfig, TransformConfig, DeadLetterModel from pydantic import BaseModel from datetime import datetime class UserEvent(BaseModel): user_id: str action: str timestamp: float class ProcessedEvent(BaseModel): user_id: str action: str processed_at: datetime is_valid: bool # Create pipelines raw_events = IngestPipeline[UserEvent]("raw_events", IngestPipelineConfig( ingest_api=True, stream=True, table=False )) processed_events = IngestPipeline[ProcessedEvent]("processed_events", IngestPipelineConfig( ingest_api=False, stream=True, table=True )) # Create dead letter queue for failed transformations event_dlq = DeadLetterQueue[UserEvent](name="EventDLQ") def process_event(event: UserEvent) -> ProcessedEvent: # This might fail for invalid data if not event.user_id or len(event.user_id) == 0: raise ValueError("Invalid user_id: cannot be empty") return ProcessedEvent( user_id=event.user_id, action=event.action, processed_at=datetime.now(), is_valid=True ) # Add transform with error handling raw_events.get_stream().add_transform( destination=processed_events.get_stream(), transformation=process_event, config=TransformConfig( dead_letter_queue=event_dlq # Failed messages go here ) ) def monitor_dead_letters(dead_letter: DeadLetterModel[UserEvent]) -> None: print(f"Error: {dead_letter.error_message}") print(f"Failed at: {dead_letter.failed_at}") # Access original typed data original_event: UserEvent = dead_letter.as_typed() print(f"Original User ID: {original_event.user_id}") # Monitor dead letter messages event_dlq.add_consumer(monitor_dead_letters) ``` For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the [Dead Letter Queues guide](./dead-letter-queues).
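If you just need a lightweight retry path without leaving this file, one possible pattern is sketched below. It reuses `event_dlq`, `processed_events`, `process_event`, and the models from the example above; treating `ValueError` as non-retryable is an assumption for illustration. The dead letter consumer re-runs the transformation and publishes recoverable records to the processed stream with `send`:

```py filename="DlqRetry.py" copy
from moose_lib import DeadLetterModel

def retry_dead_letter(dead_letter: DeadLetterModel[UserEvent]) -> None:
    # ValueError indicates a permanently invalid record: don't retry those
    if dead_letter.error_type == "ValueError":
        return

    # Re-run the original transformation for other failures and publish
    # the result directly to the processed stream
    original = dead_letter.as_typed()
    processed_events.get_stream().send(process_event(original))

event_dlq.add_consumer(retry_dead_letter)
```

The [Dead Letter Queues guide](./dead-letter-queues) shows an alternative that routes recovered records through a dedicated recovery stream with `add_transform`.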