Consuming data from streams allows you to read and process data from Kafka/Redpanda topics. This is essential for building real-time applications, analytics, and event-driven architectures.
Consumers are just functions that are called when new data is available in a stream. You add them to a stream like this:
```python
from moose_lib import Stream
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: str
    user_id: str
    timestamp: datetime
    event_type: str

user_events_stream = Stream[UserEvent]("user-events")

# Add a consumer to process events
def process_event(event: UserEvent):
    print(f"Processing event: {event.id}")
    print(f"User: {event.user_id}, Type: {event.event_type}")
    # Your processing logic here
    # e.g., update analytics, send notifications, etc.

user_events_stream.add_consumer(process_event)

# Add multiple consumers for different purposes
def analytics_consumer(event: UserEvent):
    # Analytics processing (update_purchase_analytics is your own helper)
    if event.event_type == 'purchase':
        update_purchase_analytics(event)

def notification_consumer(event: UserEvent):
    # Notification processing (send_welcome_email is your own helper)
    if event.event_type == 'signup':
        send_welcome_email(event.user_id)

user_events_stream.add_consumer(analytics_consumer)
user_events_stream.add_consumer(notification_consumer)
```

Maintain state across event processing by using MooseCache for distributed state management:
```typescript
import { Stream, MooseCache } from "@514labs/moose-lib";

// State container for accumulating data
interface AccumulatorState {
  id: string;
  counter: number;
  sum: number;
  lastModified: Date;
  attributes: Record<string, any>;
}

// Input message structure
interface InputMessage {
  id: string;
  groupId: string;
  numericValue: number;
  messageType: string;
  timestamp: Date;
  payload: Record<string, any>;
}

const messageStream = new Stream<InputMessage>("input-stream");

messageStream.addConsumer(async (message: InputMessage) => {
  // Get distributed cache instance
  const cache = await MooseCache.get();
  const cacheKey = `state:${message.groupId}`;

  // Load existing state or create new one
  let state: AccumulatorState | null = await cache.get<AccumulatorState>(cacheKey);

  if (!state) {
    // Initialize new state
    state = {
      id: message.groupId,
      counter: 0,
      sum: 0,
      lastModified: new Date(),
      attributes: {}
    };
  }

  // Apply message to state
  state.counter += 1;
  state.sum += message.numericValue;
  state.lastModified = message.timestamp;
  state.attributes = { ...state.attributes, ...message.payload };

  // Determine cache lifetime based on message type
  const ttlSeconds = message.messageType === 'complete' ? 60 : 3600;

  if (message.messageType === 'complete' || shouldFinalize(state)) {
    // Finalize and remove state
    await finalizeState(state);
    await cache.delete(cacheKey);
  } else {
    // Persist updated state
    await cache.set(cacheKey, state, ttlSeconds);
  }
});

// Condition for automatic state finalization
function shouldFinalize(state: AccumulatorState): boolean {
  const threshold = 100;
  const timeLimit = 30 * 60 * 1000; // 30 minutes
  const elapsed = new Date().getTime() - state.lastModified.getTime();

  return state.counter >= threshold || elapsed > timeLimit;
}

async function finalizeState(state: AccumulatorState): Promise<void> {
  console.log(`Finalizing state ${state.id}: counter=${state.counter}, sum=${state.sum}`);
}
```

```python
from datetime import datetime
from typing import Dict, Any
from moose_lib import Stream, MooseCache
from pydantic import BaseModel

# State container for accumulating data
class AccumulatorState(BaseModel):
    id: str
    counter: int
    sum: float
    last_modified: datetime
    attributes: Dict[str, Any]

# Input message structure
class InputMessage(BaseModel):
    id: str
    group_id: str
    numeric_value: float
    message_type: str
    timestamp: datetime
    payload: Dict[str, Any]

message_stream = Stream[InputMessage]("input-stream")

# Initialize distributed cache
cache = MooseCache()

def process_message(message: InputMessage):
    cache_key = f"state:{message.group_id}"

    # Load existing state or create new one
    state = cache.get(cache_key, AccumulatorState)

    if not state:
        # Initialize new state
        state = AccumulatorState(
            id=message.group_id,
            counter=0,
            sum=0.0,
            last_modified=datetime.now(),
            attributes={}
        )

    # Apply message to state
    state.counter += 1
    state.sum += message.numeric_value
    state.last_modified = message.timestamp
    state.attributes.update(message.payload)

    # Determine cache lifetime based on message type
    ttl_seconds = 60 if message.message_type == 'complete' else 3600

    if message.message_type == 'complete' or should_finalize(state):
        # Finalize and remove state
        finalize_state(state)
        cache.delete(cache_key)
    else:
        # Persist updated state
        cache.set(cache_key, state, ttl_seconds=ttl_seconds)

def should_finalize(state: AccumulatorState) -> bool:
    """Condition for automatic state finalization"""
    threshold = 100
    time_limit_seconds = 30 * 60  # 30 minutes
    elapsed = (datetime.now() - state.last_modified).total_seconds()

    return state.counter >= threshold or elapsed > time_limit_seconds

def finalize_state(state: AccumulatorState):
    print(f"Finalizing state {state.id}: counter={state.counter}, sum={state.sum}")

message_stream.add_consumer(process_message)
```

You can use consumer functions to trigger actions in external systems whenever events occur: send notifications, sync databases, update caches, or integrate with any other service.
Send processed data to external APIs:
```python
import httpx
import os
from datetime import datetime
from typing import Dict, Any
from moose_lib import Stream
from pydantic import BaseModel

class WebhookPayload(BaseModel):
    id: str
    data: Dict[str, Any]
    timestamp: datetime

webhook_stream = Stream[WebhookPayload]("webhook-events")

async def send_to_external_api(payload: WebhookPayload):
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                'https://external-api.com/webhook',
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f'Bearer {os.getenv("API_TOKEN")}'
                },
                json={
                    'event_id': payload.id,
                    'event_data': payload.data,
                    'processed_at': datetime.now().isoformat()
                }
            )

            if response.status_code != 200:
                raise Exception(f"HTTP {response.status_code}: {response.text}")

        print(f"Successfully sent event {payload.id} to external API")
    except Exception as error:
        print(f"Failed to send event {payload.id}: {error}")
        # Could implement retry logic or dead letter queue here

webhook_stream.add_consumer(send_to_external_api)
```
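The consumer above only logs failures. If delivery matters, one option is to retry the request a few times with exponential backoff before giving up. The helper below is a minimal sketch of that idea using `httpx` and `asyncio`; the function name, retry count, and backoff values are illustrative and not part of moose_lib.

```python
import asyncio
from typing import Optional

import httpx

async def post_with_retries(url: str, json_body: dict, headers: dict,
                            max_attempts: int = 3, base_delay: float = 1.0) -> httpx.Response:
    """POST with exponential backoff; attempt count and delays are illustrative."""
    last_error: Optional[Exception] = None
    for attempt in range(max_attempts):
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(url, json=json_body, headers=headers)
            if response.status_code < 500:
                # Success, or a client error that a retry will not fix
                return response
            last_error = Exception(f"HTTP {response.status_code}: {response.text}")
        except httpx.HTTPError as error:
            last_error = error
        # Back off before the next attempt: 1s, 2s, 4s, ...
        await asyncio.sleep(base_delay * (2 ** attempt))
    raise last_error
```

`send_to_external_api` could call a helper like this instead of posting directly, and treat a raised exception as the signal to log the event or route it elsewhere.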
Write processed data to external databases:

```python
import asyncpg
import json
import os
from datetime import datetime
from typing import Dict, Any
from moose_lib import Stream
from pydantic import BaseModel

class DatabaseRecord(BaseModel):
    id: str
    category: str
    value: float
    metadata: Dict[str, Any]
    timestamp: datetime

db_stream = Stream[DatabaseRecord]("database-events")

async def insert_to_database(record: DatabaseRecord):
    conn = None
    try:
        # Connect to PostgreSQL database
        conn = await asyncpg.connect(
            host=os.getenv('DB_HOST'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            database=os.getenv('DB_NAME')
        )

        # Insert record into external database
        await conn.execute(
            '''
            INSERT INTO processed_events (id, category, value, metadata, created_at)
            VALUES ($1, $2, $3, $4, $5)
            ''',
            record.id,
            record.category,
            record.value,
            json.dumps(record.metadata),
            record.timestamp
        )

        print(f"Inserted record {record.id} into database")
    except Exception as error:
        print(f"Database insert failed for record {record.id}: {error}")
    finally:
        if conn is not None:
            await conn.close()

db_stream.add_consumer(insert_to_database)
```
Write processed data to files or cloud storage:

```python
import os
import json
import aiofiles
from datetime import datetime
from typing import Literal
from moose_lib import Stream
from pydantic import BaseModel

class FileOutput(BaseModel):
    id: str
    filename: str
    content: str
    directory: str
    format: Literal['json', 'csv', 'txt']

file_stream = Stream[FileOutput]("file-events")

async def write_to_file(output: FileOutput):
    try:
        # Ensure directory exists
        os.makedirs(output.directory, exist_ok=True)

        # Format content based on type
        if output.format == 'json':
            formatted_content = json.dumps(json.loads(output.content), indent=2)
        else:
            formatted_content = output.content

        # Write file with timestamp
        timestamp = datetime.now().isoformat().replace(':', '-').replace('.', '-')
        filename = f"{output.filename}_{timestamp}.{output.format}"
        filepath = os.path.join(output.directory, filename)

        async with aiofiles.open(filepath, 'w', encoding='utf-8') as f:
            await f.write(formatted_content)

        print(f"Wrote file: {filepath}")
    except Exception as error:
        print(f"Failed to write file for output {output.id}: {error}")

file_stream.add_consumer(write_to_file)
```
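For cloud storage the shape of the consumer is the same; only the write target changes. The example below is a minimal sketch of uploading each record to S3 with `boto3`; the `S3Object` model, stream name, bucket variable, and key layout are assumptions made for illustration.

```python
import os
from datetime import datetime

import boto3
from moose_lib import Stream
from pydantic import BaseModel

# Hypothetical record type for this example
class S3Object(BaseModel):
    id: str
    content: str

s3_stream = Stream[S3Object]("s3-events")  # illustrative stream name

# boto3 resolves credentials from the environment or instance profile
s3_client = boto3.client("s3")
BUCKET = os.getenv("S3_BUCKET", "my-example-bucket")  # illustrative bucket

def upload_to_s3(obj: S3Object):
    try:
        # Key layout is an assumption: one object per event, grouped by date
        key = f"events/{datetime.now():%Y/%m/%d}/{obj.id}.json"
        s3_client.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=obj.content.encode("utf-8"),
            ContentType="application/json",
        )
        print(f"Uploaded {obj.id} to s3://{BUCKET}/{key}")
    except Exception as error:
        print(f"Failed to upload {obj.id}: {error}")

s3_stream.add_consumer(upload_to_s3)
```

Reusing a single client at module scope, as sketched here, avoids re-establishing a connection for every event.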
Send alerts and notifications based on processed events:

```typescript
import nodemailer from 'nodemailer';
import { Stream } from "@514labs/moose-lib";

interface NotificationEvent {
  id: string;
  type: 'email' | 'slack' | 'webhook';
  recipient: string;
  subject: string;
  message: string;
  priority: 'low' | 'medium' | 'high';
  metadata: Record<string, any>;
}

const notificationStream = new Stream<NotificationEvent>("notifications");

// Configure email transporter
const emailTransporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: parseInt(process.env.SMTP_PORT || '587'),
  secure: false,
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS
  }
});

notificationStream.addConsumer(async (notification: NotificationEvent) => {
  try {
    switch (notification.type) {
      case 'email':
        await emailTransporter.sendMail({
          from: process.env.SMTP_FROM,
          to: notification.recipient,
          subject: notification.subject,
          text: notification.message,
          html: `<div>
            <h3>${notification.subject}</h3>
            <p>${notification.message}</p>
            <p><em>Priority: ${notification.priority}</em></p>
          </div>`
        });
        break;

      case 'slack':
        await fetch(`https://hooks.slack.com/services/${process.env.SLACK_WEBHOOK}`, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            text: notification.message,
            channel: notification.recipient,
            username: 'Moose Alert',
            icon_emoji: notification.priority === 'high' ? ':warning:' : ':information_source:'
          })
        });
        break;

      case 'webhook':
        await fetch(notification.recipient, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            id: notification.id,
            subject: notification.subject,
            message: notification.message,
            priority: notification.priority,
            metadata: notification.metadata
          })
        });
        break;
    }

    console.log(`Sent ${notification.type} notification ${notification.id}`);
  } catch (error) {
    console.error(`Failed to send notification ${notification.id}:`, error);
  }
});
```

```python
import os
import smtplib
import httpx
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, Any, Literal
from moose_lib import Stream
from pydantic import BaseModel

class NotificationEvent(BaseModel):
    id: str
    type: Literal['email', 'slack', 'webhook']
    recipient: str
    subject: str
    message: str
    priority: Literal['low', 'medium', 'high']
    metadata: Dict[str, Any]

notification_stream = Stream[NotificationEvent]("notifications")

async def send_notification(notification: NotificationEvent):
    try:
        if notification.type == 'email':
            # Send email
            msg = MIMEMultipart()
            msg['From'] = os.getenv('SMTP_FROM')
            msg['To'] = notification.recipient
            msg['Subject'] = notification.subject

            body = f"""
            {notification.message}

            Priority: {notification.priority}
            """
            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(os.getenv('SMTP_HOST'), int(os.getenv('SMTP_PORT', '587')))
            server.starttls()
            server.login(os.getenv('SMTP_USER'), os.getenv('SMTP_PASS'))
            server.send_message(msg)
            server.quit()

        elif notification.type == 'slack':
            # Send to Slack
            async with httpx.AsyncClient() as client:
                await client.post(
                    f"https://hooks.slack.com/services/{os.getenv('SLACK_WEBHOOK')}",
                    json={
                        'text': notification.message,
                        'channel': notification.recipient,
                        'username': 'Moose Alert',
                        'icon_emoji': ':warning:' if notification.priority == 'high' else ':information_source:'
                    }
                )

        elif notification.type == 'webhook':
            # Send to webhook
            async with httpx.AsyncClient() as client:
                await client.post(
                    notification.recipient,
                    json={
                        'id': notification.id,
                        'subject': notification.subject,
                        'message': notification.message,
                        'priority': notification.priority,
                        'metadata': notification.metadata
                    }
                )

        print(f"Sent {notification.type} notification {notification.id}")
    except Exception as error:
        print(f"Failed to send notification {notification.id}: {error}")

notification_stream.add_consumer(send_notification)
```