
Streaming Consumer Functions

Overview

Consumer functions let you read and process data from Kafka/Redpanda topics as new records arrive. This is essential for building real-time applications, analytics pipelines, and event-driven architectures.

Basic Usage

Consumers are just functions that are called when new data is available in a stream. You add them to a stream like this:

StreamConsumer.py
from moose_lib import Stream
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: str
    user_id: str
    timestamp: datetime
    event_type: str

user_events_stream = Stream[UserEvent]("user-events")

# Add a consumer to process events
def process_event(event: UserEvent):
    print(f"Processing event: {event.id}")
    print(f"User: {event.user_id}, Type: {event.event_type}")

    # Your processing logic here
    # e.g., update analytics, send notifications, etc.

user_events_stream.add_consumer(process_event)

# Add multiple consumers for different purposes
def analytics_consumer(event: UserEvent):
    # Analytics processing
    if event.event_type == 'purchase':
        update_purchase_analytics(event)

def notification_consumer(event: UserEvent):
    # Notification processing
    if event.event_type == 'signup':
        send_welcome_email(event.user_id)

user_events_stream.add_consumer(analytics_consumer)
user_events_stream.add_consumer(notification_consumer)
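If you find yourself registering many small consumers, a single consumer can also route events to handlers by type. A minimal sketch, reusing the UserEvent model and user_events_stream defined above; the handler functions and dispatch table are illustrative, not part of the Moose API:

EventRouter.py
# Hypothetical per-type handlers; substitute your own logic
def handle_purchase(event: UserEvent):
    print(f"Purchase from user {event.user_id}")

def handle_signup(event: UserEvent):
    print(f"Signup from user {event.user_id}")

# Dispatch table mapping event_type values to handlers
HANDLERS = {
    'purchase': handle_purchase,
    'signup': handle_signup,
}

def route_event(event: UserEvent):
    handler = HANDLERS.get(event.event_type)
    if handler:
        handler(event)
    else:
        # Unknown event types are logged and skipped
        print(f"No handler for event type: {event.event_type}")

user_events_stream.add_consumer(route_event)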

Processing Patterns

Stateful Processing with MooseCache

Maintain state across event processing using MooseCache for distributed state management:

StatefulProcessing.ts
import { MooseCache, Stream } from "@514labs/moose-lib";

// State container for accumulating data
interface AccumulatorState {
  id: string;
  counter: number;
  sum: number;
  lastModified: Date;
  attributes: Record<string, any>;
}

// Input message structure
interface InputMessage {
  id: string;
  groupId: string;
  numericValue: number;
  messageType: string;
  timestamp: Date;
  payload: Record<string, any>;
}

const messageStream = new Stream<InputMessage>("input-stream");

messageStream.addConsumer(async (message: InputMessage) => {
  // Get distributed cache instance
  const cache = await MooseCache.get();
  const cacheKey = `state:${message.groupId}`;

  // Load existing state or create new one
  let state: AccumulatorState | null = await cache.get<AccumulatorState>(cacheKey);

  if (!state) {
    // Initialize new state
    state = {
      id: message.groupId,
      counter: 0,
      sum: 0,
      lastModified: new Date(),
      attributes: {}
    };
  }

  // Apply message to state
  state.counter += 1;
  state.sum += message.numericValue;
  state.lastModified = message.timestamp;
  state.attributes = { ...state.attributes, ...message.payload };

  // Determine cache lifetime based on message type
  const ttlSeconds = message.messageType === 'complete' ? 60 : 3600;

  if (message.messageType === 'complete' || shouldFinalize(state)) {
    // Finalize and remove state
    await finalizeState(state);
    await cache.delete(cacheKey);
  } else {
    // Persist updated state
    await cache.set(cacheKey, state, ttlSeconds);
  }
});

// Condition for automatic state finalization
function shouldFinalize(state: AccumulatorState): boolean {
  const threshold = 100;
  const timeLimit = 30 * 60 * 1000; // 30 minutes
  const elapsed = new Date().getTime() - state.lastModified.getTime();

  return state.counter >= threshold || elapsed > timeLimit;
}

async function finalizeState(state: AccumulatorState): Promise<void> {
  console.log(`Finalizing state ${state.id}: counter=${state.counter}, sum=${state.sum}`);
}
StatefulProcessing.py
from datetime import datetime
from typing import Dict, Any
from moose_lib import MooseCache, Stream
from pydantic import BaseModel

# State container for accumulating data
class AccumulatorState(BaseModel):
    id: str
    counter: int
    sum: float
    last_modified: datetime
    attributes: Dict[str, Any]

# Input message structure
class InputMessage(BaseModel):
    id: str
    group_id: str
    numeric_value: float
    message_type: str
    timestamp: datetime
    payload: Dict[str, Any]

message_stream = Stream[InputMessage]("input-stream")

# Initialize distributed cache
cache = MooseCache()

def process_message(message: InputMessage):
    cache_key = f"state:{message.group_id}"

    # Load existing state or create new one
    state = cache.get(cache_key, AccumulatorState)

    if not state:
        # Initialize new state
        state = AccumulatorState(
            id=message.group_id,
            counter=0,
            sum=0.0,
            last_modified=datetime.now(),
            attributes={}
        )

    # Apply message to state
    state.counter += 1
    state.sum += message.numeric_value
    state.last_modified = message.timestamp
    state.attributes.update(message.payload)

    # Determine cache lifetime based on message type
    ttl_seconds = 60 if message.message_type == 'complete' else 3600

    if message.message_type == 'complete' or should_finalize(state):
        # Finalize and remove state
        finalize_state(state)
        cache.delete(cache_key)
    else:
        # Persist updated state
        cache.set(cache_key, state, ttl_seconds=ttl_seconds)

def should_finalize(state: AccumulatorState) -> bool:
    """Condition for automatic state finalization"""
    threshold = 100
    time_limit_seconds = 30 * 60  # 30 minutes
    elapsed = (datetime.now() - state.last_modified).total_seconds()

    return state.counter >= threshold or elapsed > time_limit_seconds

def finalize_state(state: AccumulatorState):
    print(f"Finalizing state {state.id}: counter={state.counter}, sum={state.sum}")

message_stream.add_consumer(process_message)
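The same get/set/delete calls support other stateful patterns, such as best-effort deduplication of replayed messages. A minimal sketch, reusing the InputMessage model and process_message function from above; the SeenMarker model and one-hour TTL are illustrative:

Deduplication.py
from datetime import datetime
from moose_lib import MooseCache
from pydantic import BaseModel

# Minimal marker stored per message id
class SeenMarker(BaseModel):
    first_seen: datetime

dedup_cache = MooseCache()

def deduplicating_consumer(message: InputMessage):
    key = f"seen:{message.id}"

    # Skip messages already processed within the TTL window
    if dedup_cache.get(key, SeenMarker):
        return

    process_message(message)

    # Remember this id for an hour (illustrative TTL)
    dedup_cache.set(key, SeenMarker(first_seen=datetime.now()), ttl_seconds=3600)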

Propagating Events to External Systems

You can use consumer functions to trigger actions across external systems: send notifications, sync databases, update caches, or integrate with any other service when events occur.

HTTP API Calls

Send processed data to external APIs:

HttpIntegration.py
import httpx
import os
from datetime import datetime
from typing import Dict, Any
from moose_lib import Stream
from pydantic import BaseModel

class WebhookPayload(BaseModel):
    id: str
    data: Dict[str, Any]
    timestamp: datetime

webhook_stream = Stream[WebhookPayload]("webhook-events")

async def send_to_external_api(payload: WebhookPayload):
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                'https://external-api.com/webhook',
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f'Bearer {os.getenv("API_TOKEN")}'
                },
                json={
                    'event_id': payload.id,
                    'event_data': payload.data,
                    'processed_at': datetime.now().isoformat()
                }
            )

        if response.status_code != 200:
            raise Exception(f"HTTP {response.status_code}: {response.text}")

        print(f"Successfully sent event {payload.id} to external API")
    except Exception as error:
        print(f"Failed to send event {payload.id}: {error}")
        # Could implement retry logic or dead letter queue here

webhook_stream.add_consumer(send_to_external_api)
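The final comment above hints at retry logic. A minimal sketch of exponential backoff using only httpx and asyncio, reusing the WebhookPayload model from above; the endpoint, attempt count, and delays are illustrative, and a production setup might hand exhausted events to a dead letter queue instead:

HttpRetry.py
import asyncio
import httpx

async def send_with_retries(payload: WebhookPayload, max_attempts: int = 3):
    for attempt in range(1, max_attempts + 1):
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    'https://external-api.com/webhook',
                    json={'event_id': payload.id, 'event_data': payload.data}
                )
            # Raise on 4xx/5xx so the except block handles it
            response.raise_for_status()
            return
        except httpx.HTTPError as error:
            if attempt == max_attempts:
                print(f"Giving up on event {payload.id} after {max_attempts} attempts: {error}")
                return
            # Back off 1s, 2s, 4s, ... between attempts
            await asyncio.sleep(2 ** (attempt - 1))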

Database Operations

Write processed data to external databases:

DatabaseIntegration.py
import asyncpg
import json
import os
from datetime import datetime
from typing import Dict, Any
from moose_lib import Stream
from pydantic import BaseModel

class DatabaseRecord(BaseModel):
    id: str
    category: str
    value: float
    metadata: Dict[str, Any]
    timestamp: datetime

db_stream = Stream[DatabaseRecord]("database-events")

async def insert_to_database(record: DatabaseRecord):
    conn = None
    try:
        # Connect to PostgreSQL database
        conn = await asyncpg.connect(
            host=os.getenv('DB_HOST'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            database=os.getenv('DB_NAME')
        )

        # Insert record into external database
        await conn.execute(
            '''
            INSERT INTO processed_events (id, category, value, metadata, created_at)
            VALUES ($1, $2, $3, $4, $5)
            ''',
            record.id,
            record.category,
            record.value,
            json.dumps(record.metadata),
            record.timestamp
        )

        print(f"Inserted record {record.id} into database")

    except Exception as error:
        print(f"Database insert failed for record {record.id}: {error}")
    finally:
        if conn:
            await conn.close()

db_stream.add_consumer(insert_to_database)
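Opening a fresh connection per event is fine at low volume, but a shared pool amortizes connection setup across events. A minimal sketch using asyncpg's built-in pooling, reusing the DatabaseRecord model from above; the pool sizes and simplified INSERT are illustrative:

PooledInserts.py
import asyncpg
import os

_pool = None

async def get_pool() -> asyncpg.Pool:
    # Lazily create a shared pool on first use
    global _pool
    if _pool is None:
        _pool = await asyncpg.create_pool(
            host=os.getenv('DB_HOST'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            database=os.getenv('DB_NAME'),
            min_size=1,
            max_size=10
        )
    return _pool

async def insert_with_pool(record: DatabaseRecord):
    pool = await get_pool()
    # Connections are borrowed from the pool and returned automatically
    async with pool.acquire() as conn:
        await conn.execute(
            'INSERT INTO processed_events (id, category, value) VALUES ($1, $2, $3)',
            record.id, record.category, record.value
        )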

File System Operations

Write processed data to files or cloud storage:

FileSystemIntegration.py
import os
import json
import aiofiles
from datetime import datetime
from typing import Literal
from moose_lib import Stream
from pydantic import BaseModel

class FileOutput(BaseModel):
    id: str
    filename: str
    content: str
    directory: str
    format: Literal['json', 'csv', 'txt']

file_stream = Stream[FileOutput]("file-events")

async def write_to_file(output: FileOutput):
    try:
        # Ensure directory exists
        os.makedirs(output.directory, exist_ok=True)

        # Format content based on type
        if output.format == 'json':
            formatted_content = json.dumps(json.loads(output.content), indent=2)
        else:
            formatted_content = output.content

        # Write file with timestamp
        timestamp = datetime.now().isoformat().replace(':', '-').replace('.', '-')
        filename = f"{output.filename}_{timestamp}.{output.format}"
        filepath = os.path.join(output.directory, filename)

        async with aiofiles.open(filepath, 'w', encoding='utf-8') as f:
            await f.write(formatted_content)

        print(f"Wrote file: {filepath}")

    except Exception as error:
        print(f"Failed to write file for output {output.id}: {error}")

file_stream.add_consumer(write_to_file)
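At high volume, writing one file per event can overwhelm the filesystem; buffering events and flushing them in batches is a common variation. A minimal sketch with an in-memory buffer, reusing the FileOutput model and file_stream from above; the batch size and output path are illustrative, and buffered events are lost if the process stops before a flush:

BatchedWrites.py
import json
import aiofiles
from typing import Any, Dict, List

BATCH_SIZE = 100  # illustrative threshold
_buffer: List[Dict[str, Any]] = []

async def buffered_writer(output: FileOutput):
    _buffer.append({'id': output.id, 'content': output.content})

    # Flush to a single JSON-lines file once the buffer fills
    if len(_buffer) >= BATCH_SIZE:
        lines = "\n".join(json.dumps(item) for item in _buffer)
        async with aiofiles.open('batched_output.jsonl', 'a', encoding='utf-8') as f:
            await f.write(lines + "\n")
        _buffer.clear()

# Register in place of write_to_file if batching is preferred
file_stream.add_consumer(buffered_writer)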

Email and Notifications

Send alerts and notifications based on processed events:

NotificationIntegration.ts
import nodemailer from 'nodemailer';
import { Stream } from "@514labs/moose-lib";

interface NotificationEvent {
  id: string;
  type: 'email' | 'slack' | 'webhook';
  recipient: string;
  subject: string;
  message: string;
  priority: 'low' | 'medium' | 'high';
  metadata: Record<string, any>;
}

const notificationStream = new Stream<NotificationEvent>("notifications");

// Configure email transporter
const emailTransporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: parseInt(process.env.SMTP_PORT || '587'),
  secure: false,
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS
  }
});

notificationStream.addConsumer(async (notification: NotificationEvent) => {
  try {
    switch (notification.type) {
      case 'email':
        await emailTransporter.sendMail({
          from: process.env.SMTP_FROM,
          to: notification.recipient,
          subject: notification.subject,
          text: notification.message,
          html: `<div>
            <h3>${notification.subject}</h3>
            <p>${notification.message}</p>
            <p><em>Priority: ${notification.priority}</em></p>
          </div>`
        });
        break;

      case 'slack':
        await fetch(`https://hooks.slack.com/services/${process.env.SLACK_WEBHOOK}`, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            text: notification.message,
            channel: notification.recipient,
            username: 'Moose Alert',
            icon_emoji: notification.priority === 'high' ? ':warning:' : ':information_source:'
          })
        });
        break;

      case 'webhook':
        await fetch(notification.recipient, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            id: notification.id,
            subject: notification.subject,
            message: notification.message,
            priority: notification.priority,
            metadata: notification.metadata
          })
        });
        break;
    }

    console.log(`Sent ${notification.type} notification ${notification.id}`);
  } catch (error) {
    console.error(`Failed to send notification ${notification.id}:`, error);
  }
});
NotificationIntegration.py
import os
import smtplib
import httpx
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, Any, Literal
from moose_lib import Stream
from pydantic import BaseModel

class NotificationEvent(BaseModel):
    id: str
    type: Literal['email', 'slack', 'webhook']
    recipient: str
    subject: str
    message: str
    priority: Literal['low', 'medium', 'high']
    metadata: Dict[str, Any]

notification_stream = Stream[NotificationEvent]("notifications")

async def send_notification(notification: NotificationEvent):
    try:
        if notification.type == 'email':
            # Send email over SMTP
            msg = MIMEMultipart()
            msg['From'] = os.getenv('SMTP_FROM')
            msg['To'] = notification.recipient
            msg['Subject'] = notification.subject

            body = f"{notification.message}\n\nPriority: {notification.priority}"
            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(os.getenv('SMTP_HOST'), int(os.getenv('SMTP_PORT', '587')))
            server.starttls()
            server.login(os.getenv('SMTP_USER'), os.getenv('SMTP_PASS'))
            server.send_message(msg)
            server.quit()

        elif notification.type == 'slack':
            # Send to Slack
            async with httpx.AsyncClient() as client:
                await client.post(
                    f"https://hooks.slack.com/services/{os.getenv('SLACK_WEBHOOK')}",
                    json={
                        'text': notification.message,
                        'channel': notification.recipient,
                        'username': 'Moose Alert',
                        'icon_emoji': ':warning:' if notification.priority == 'high' else ':information_source:'
                    }
                )

        elif notification.type == 'webhook':
            # Send to a generic webhook
            async with httpx.AsyncClient() as client:
                await client.post(
                    notification.recipient,
                    json={
                        'id': notification.id,
                        'subject': notification.subject,
                        'message': notification.message,
                        'priority': notification.priority,
                        'metadata': notification.metadata
                    }
                )

        print(f"Sent {notification.type} notification {notification.id}")

    except Exception as error:
        print(f"Failed to send notification {notification.id}: {error}")

notification_stream.add_consumer(send_notification)