# Moose / Streaming / Consumer Functions Documentation – Python

## Included Files

1. moose/streaming/consumer-functions/consumer-functions.mdx

## Streaming Consumer Functions

Source: moose/streaming/consumer-functions/consumer-functions.mdx

Read and process data from streams with consumers and processors.

# Streaming Consumer Functions

## Overview

Consumer functions let you read and process data from Kafka/Redpanda topics as it arrives. This is essential for building real-time applications, analytics, and event-driven architectures.

## Basic Usage

Consumers are just functions that are called whenever new data is available in a stream. You add them to a stream like this:

```py filename="StreamConsumer.py" copy
from moose_lib import Stream
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: str
    user_id: str
    timestamp: datetime
    event_type: str

user_events_stream = Stream[UserEvent]("user-events")

# Add a consumer to process events
def process_event(event: UserEvent):
    print(f"Processing event: {event.id}")
    print(f"User: {event.user_id}, Type: {event.event_type}")

    # Your processing logic here
    # e.g., update analytics, send notifications, etc.

user_events_stream.add_consumer(process_event)

# Add multiple consumers for different purposes
# (update_purchase_analytics and send_welcome_email are your own helpers)
def analytics_consumer(event: UserEvent):
    # Analytics processing
    if event.event_type == 'purchase':
        update_purchase_analytics(event)

def notification_consumer(event: UserEvent):
    # Notification processing
    if event.event_type == 'signup':
        send_welcome_email(event.user_id)

user_events_stream.add_consumer(analytics_consumer)
user_events_stream.add_consumer(notification_consumer)
```

## Processing Patterns

### Stateful Processing with MooseCache

Maintain state across event processing using `MooseCache` for distributed state management:

```py filename="StatefulProcessing.py" copy
from datetime import datetime
from typing import Dict, Any
from moose_lib import MooseCache, Stream
from pydantic import BaseModel

# State container for accumulating data
class AccumulatorState(BaseModel):
    id: str
    counter: int
    sum: float
    last_modified: datetime
    attributes: Dict[str, Any]

# Input message structure
class InputMessage(BaseModel):
    id: str
    group_id: str
    numeric_value: float
    message_type: str
    timestamp: datetime
    payload: Dict[str, Any]

message_stream = Stream[InputMessage]("input-stream")

# Initialize distributed cache
cache = MooseCache()

def process_message(message: InputMessage):
    cache_key = f"state:{message.group_id}"

    # Load existing state or create a new one
    state = cache.get(cache_key, AccumulatorState)
    if not state:
        # Initialize new state
        state = AccumulatorState(
            id=message.group_id,
            counter=0,
            sum=0.0,
            last_modified=datetime.now(),
            attributes={}
        )

    # Apply message to state
    state.counter += 1
    state.sum += message.numeric_value
    state.last_modified = message.timestamp
    state.attributes.update(message.payload)

    # Determine cache lifetime based on message type
    ttl_seconds = 60 if message.message_type == 'complete' else 3600

    if message.message_type == 'complete' or should_finalize(state):
        # Finalize and remove state
        finalize_state(state)
        cache.delete(cache_key)
    else:
        # Persist updated state
        cache.set(cache_key, state, ttl_seconds=ttl_seconds)

def should_finalize(state: AccumulatorState) -> bool:
    """Condition for automatic state finalization"""
    threshold = 100
    time_limit_seconds = 30 * 60  # 30 minutes
    elapsed = (datetime.now() - state.last_modified).total_seconds()
    return state.counter >= threshold or elapsed > time_limit_seconds

def finalize_state(state: AccumulatorState):
    print(f"Finalizing state {state.id}: counter={state.counter}, sum={state.sum}")

message_stream.add_consumer(process_message)
```
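The same cache can back simpler stateful patterns, such as skipping events that were already handled. The sketch below mirrors the `cache.get`/`cache.set` calls from the example above to make a consumer idempotent; the `ProcessedMarker` model, the `processed:` key prefix, and the 24-hour TTL are illustrative choices, not part of the Moose API.

```py filename="DeduplicatingConsumer.py" copy
from datetime import datetime
from moose_lib import MooseCache, Stream
from pydantic import BaseModel

class OrderEvent(BaseModel):
    id: str
    order_id: str
    amount: float
    timestamp: datetime

# Hypothetical marker model: one cache entry per event id already handled
class ProcessedMarker(BaseModel):
    id: str

order_stream = Stream[OrderEvent]("order-events")
cache = MooseCache()

def process_once(event: OrderEvent):
    seen_key = f"processed:{event.id}"

    # Skip events we have already processed (duplicate deliveries)
    if cache.get(seen_key, ProcessedMarker):
        return

    # ... your side-effecting logic here (billing, notifications, etc.) ...
    print(f"Handling order event {event.id} for order {event.order_id}")

    # Remember this event id for 24 hours (illustrative TTL)
    cache.set(seen_key, ProcessedMarker(id=event.id), ttl_seconds=24 * 60 * 60)

order_stream.add_consumer(process_once)
```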
## Propagating Events to External Systems

You can use consumer functions to trigger actions across external systems: send notifications, sync databases, update caches, or integrate with any other service when events occur.

### HTTP API Calls

Send processed data to external APIs:

```py filename="HttpIntegration.py" copy
import os
from datetime import datetime
from typing import Dict, Any

import httpx
from moose_lib import Stream
from pydantic import BaseModel

class WebhookPayload(BaseModel):
    id: str
    data: Dict[str, Any]
    timestamp: datetime

webhook_stream = Stream[WebhookPayload]("webhook-events")

async def send_to_external_api(payload: WebhookPayload):
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                'https://external-api.com/webhook',
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f'Bearer {os.getenv("API_TOKEN")}'
                },
                json={
                    'event_id': payload.id,
                    'event_data': payload.data,
                    'processed_at': datetime.now().isoformat()
                }
            )

            if response.status_code != 200:
                raise Exception(f"HTTP {response.status_code}: {response.text}")

        print(f"Successfully sent event {payload.id} to external API")
    except Exception as error:
        print(f"Failed to send event {payload.id}: {error}")
        # Could implement retry logic or dead letter queue here

webhook_stream.add_consumer(send_to_external_api)
```
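The `except` branch above only logs the failure. A common next step is to retry the outbound call with exponential backoff before giving up. This is a minimal sketch of that idea using the same `httpx` client; `MAX_ATTEMPTS` and `BASE_DELAY_SECONDS` are illustrative values rather than Moose settings, and a consumer would call `post_with_retry` instead of posting directly.

```py filename="HttpRetry.py" copy
import asyncio
import os
from typing import Optional

import httpx

MAX_ATTEMPTS = 3          # illustrative: total number of tries per request
BASE_DELAY_SECONDS = 1.0  # illustrative: first backoff delay, doubled each retry

async def post_with_retry(url: str, body: dict) -> None:
    last_error: Optional[Exception] = None
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.post(
                    url,
                    headers={'Authorization': f'Bearer {os.getenv("API_TOKEN")}'},
                    json=body,
                )
                response.raise_for_status()
                return  # success, stop retrying
        except Exception as error:
            last_error = error
            if attempt < MAX_ATTEMPTS:
                # Exponential backoff: 1s, 2s, 4s, ...
                await asyncio.sleep(BASE_DELAY_SECONDS * 2 ** (attempt - 1))

    # All attempts failed; re-raise so the caller can log or dead-letter the event
    raise last_error
```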
### Database Operations

Write processed data to external databases:

```py filename="DatabaseIntegration.py" copy
import json
import os
from datetime import datetime
from typing import Dict, Any

import asyncpg
from moose_lib import Stream
from pydantic import BaseModel

class DatabaseRecord(BaseModel):
    id: str
    category: str
    value: float
    metadata: Dict[str, Any]
    timestamp: datetime

db_stream = Stream[DatabaseRecord]("database-events")

async def insert_to_database(record: DatabaseRecord):
    conn = None
    try:
        # Connect to PostgreSQL database
        conn = await asyncpg.connect(
            host=os.getenv('DB_HOST'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            database=os.getenv('DB_NAME')
        )

        # Insert record into external database
        await conn.execute(
            '''
            INSERT INTO processed_events (id, category, value, metadata, created_at)
            VALUES ($1, $2, $3, $4, $5)
            ''',
            record.id,
            record.category,
            record.value,
            json.dumps(record.metadata),
            record.timestamp
        )

        print(f"Inserted record {record.id} into database")
    except Exception as error:
        print(f"Database insert failed for record {record.id}: {error}")
    finally:
        if conn is not None:
            await conn.close()

db_stream.add_consumer(insert_to_database)
```

### File System Operations

Write processed data to files or cloud storage:

```py filename="FileSystemIntegration.py" copy
import json
import os
from datetime import datetime
from typing import Literal

import aiofiles
from moose_lib import Stream
from pydantic import BaseModel

class FileOutput(BaseModel):
    id: str
    filename: str
    content: str
    directory: str
    format: Literal['json', 'csv', 'txt']

file_stream = Stream[FileOutput]("file-events")

async def write_to_file(output: FileOutput):
    try:
        # Ensure directory exists
        os.makedirs(output.directory, exist_ok=True)

        # Format content based on type
        if output.format == 'json':
            formatted_content = json.dumps(json.loads(output.content), indent=2)
        else:
            formatted_content = output.content

        # Write file with timestamp in the name
        timestamp = datetime.now().isoformat().replace(':', '-').replace('.', '-')
        filename = f"{output.filename}_{timestamp}.{output.format}"
        filepath = os.path.join(output.directory, filename)

        async with aiofiles.open(filepath, 'w', encoding='utf-8') as f:
            await f.write(formatted_content)

        print(f"Wrote file: {filepath}")
    except Exception as error:
        print(f"Failed to write file for output {output.id}: {error}")

file_stream.add_consumer(write_to_file)
```

### Email and Notifications

Send alerts and notifications based on processed events:

```py filename="NotificationIntegration.py" copy
import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, Any, Literal

import httpx
from moose_lib import Stream
from pydantic import BaseModel

class NotificationEvent(BaseModel):
    id: str
    type: Literal['email', 'slack', 'webhook']
    recipient: str
    subject: str
    message: str
    priority: Literal['low', 'medium', 'high']
    metadata: Dict[str, Any]

notification_stream = Stream[NotificationEvent]("notifications")

async def send_notification(notification: NotificationEvent):
    try:
        if notification.type == 'email':
            # Send email over SMTP
            msg = MIMEMultipart()
            msg['From'] = os.getenv('SMTP_FROM')
            msg['To'] = notification.recipient
            msg['Subject'] = notification.subject

            body = f"""
{notification.message}

Priority: {notification.priority}
"""
            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(os.getenv('SMTP_HOST'), int(os.getenv('SMTP_PORT', '587')))
            server.starttls()
            server.login(os.getenv('SMTP_USER'), os.getenv('SMTP_PASS'))
            server.send_message(msg)
            server.quit()

        elif notification.type == 'slack':
            # Send to Slack via incoming webhook
            async with httpx.AsyncClient() as client:
                await client.post(
                    f"https://hooks.slack.com/services/{os.getenv('SLACK_WEBHOOK')}",
                    json={
                        'text': notification.message,
                        'channel': notification.recipient,
                        'username': 'Moose Alert',
                        'icon_emoji': ':warning:' if notification.priority == 'high' else ':information_source:'
                    }
                )

        elif notification.type == 'webhook':
            # Forward to a generic webhook URL
            async with httpx.AsyncClient() as client:
                await client.post(
                    notification.recipient,
                    json={
                        'id': notification.id,
                        'subject': notification.subject,
                        'message': notification.message,
                        'priority': notification.priority,
                        'metadata': notification.metadata
                    }
                )

        print(f"Sent {notification.type} notification {notification.id}")
    except Exception as error:
        print(f"Failed to send notification {notification.id}: {error}")

notification_stream.add_consumer(send_notification)
```
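The integrations above only log failures, which the earlier "dead letter queue" comment hints at improving. One recoverable pattern is to route failed events to a separate stream so they can be inspected or replayed later. The sketch below is illustrative only: the `webhook-dead-letter` stream and `FailedWebhookEvent` model are hypothetical, and it assumes the Python `Stream` exposes a `send()` method for publishing records; check the Moose publishing docs for the exact API.

```py filename="DeadLetterPattern.py" copy
import os
from datetime import datetime
from typing import Dict, Any

import httpx
from moose_lib import Stream
from pydantic import BaseModel

class WebhookPayload(BaseModel):
    id: str
    data: Dict[str, Any]
    timestamp: datetime

# Hypothetical model capturing the failed event plus error context
class FailedWebhookEvent(BaseModel):
    id: str
    error: str
    failed_at: datetime
    original: Dict[str, Any]

webhook_stream = Stream[WebhookPayload]("webhook-events")
dead_letter_stream = Stream[FailedWebhookEvent]("webhook-dead-letter")  # hypothetical stream

async def send_with_dead_letter(payload: WebhookPayload):
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                'https://external-api.com/webhook',
                headers={'Authorization': f'Bearer {os.getenv("API_TOKEN")}'},
                json={'event_id': payload.id, 'event_data': payload.data},
            )
            response.raise_for_status()
    except Exception as error:
        # Assumption: Stream.send() publishes a record to the underlying topic;
        # verify the exact publishing API in the Moose docs
        dead_letter_stream.send(FailedWebhookEvent(
            id=payload.id,
            error=str(error),
            failed_at=datetime.now(),
            original=payload.data,
        ))

webhook_stream.add_consumer(send_with_dead_letter)
```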