Streaming Consumer Functions
Overview
Consumer functions let you read and process data from Kafka/Redpanda topics as it arrives. This is essential for building real-time applications, analytics, and event-driven architectures.
Basic Usage
Consumers are just functions that are called when new data is available in a stream. You add them to a stream like this:
import { Stream } from "@514labs/moose-lib";

interface UserEvent {
  id: string;
  userId: string;
  timestamp: Date;
  eventType: string;
}

const userEventsStream = new Stream<UserEvent>("user-events");

// Add a consumer to process events
userEventsStream.addConsumer((event: UserEvent) => {
  console.log(`Processing event: ${event.id}`);
  console.log(`User: ${event.userId}, Type: ${event.eventType}`);

  // Your processing logic here
  // e.g., update analytics, send notifications, etc.
});

// Add multiple consumers for different purposes
userEventsStream.addConsumer((event: UserEvent) => {
  // Analytics processing
  if (event.eventType === 'purchase') {
    updatePurchaseAnalytics(event);
  }
});

userEventsStream.addConsumer((event: UserEvent) => {
  // Notification processing
  if (event.eventType === 'signup') {
    sendWelcomeEmail(event.userId);
  }
});
from moose_lib import Stream
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: str
    user_id: str
    timestamp: datetime
    event_type: str

user_events_stream = Stream[UserEvent]("user-events")

# Add a consumer to process events
def process_event(event: UserEvent):
    print(f"Processing event: {event.id}")
    print(f"User: {event.user_id}, Type: {event.event_type}")

    # Your processing logic here
    # e.g., update analytics, send notifications, etc.

user_events_stream.add_consumer(process_event)

# Add multiple consumers for different purposes
def analytics_consumer(event: UserEvent):
    # Analytics processing
    if event.event_type == 'purchase':
        update_purchase_analytics(event)

def notification_consumer(event: UserEvent):
    # Notification processing
    if event.event_type == 'signup':
        send_welcome_email(event.user_id)

user_events_stream.add_consumer(analytics_consumer)
user_events_stream.add_consumer(notification_consumer)
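Consumer functions can also be async, which lets you await I/O inside the handler; the examples later on this page rely on this. A minimal TypeScript sketch, where `enrichEvent` is a hypothetical async helper you would define yourself:

// Async consumer: the handler can await asynchronous work per event
userEventsStream.addConsumer(async (event: UserEvent) => {
  // `enrichEvent` is a hypothetical helper (e.g., an async lookup you implement)
  const enriched = await enrichEvent(event);
  console.log(`Enriched event ${event.id}:`, enriched);
});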
Processing Patterns
Stateful Processing with MooseCache
Maintain state across event processing using MooseCache for distributed state management:
import { MooseCache, Stream } from "@514labs/moose-lib";

// State container for accumulating data
interface AccumulatorState {
  id: string;
  counter: number;
  sum: number;
  lastModified: Date;
  attributes: Record<string, any>;
}

// Input message structure
interface InputMessage {
  id: string;
  groupId: string;
  numericValue: number;
  messageType: string;
  timestamp: Date;
  payload: Record<string, any>;
}

const messageStream = new Stream<InputMessage>("input-stream");

messageStream.addConsumer(async (message: InputMessage) => {
  // Get distributed cache instance
  const cache = await MooseCache.get();
  const cacheKey = `state:${message.groupId}`;

  // Load existing state or create new one
  let state: AccumulatorState | null = await cache.get<AccumulatorState>(cacheKey);

  if (!state) {
    // Initialize new state
    state = {
      id: message.groupId,
      counter: 0,
      sum: 0,
      lastModified: new Date(),
      attributes: {}
    };
  }

  // Apply message to state
  state.counter += 1;
  state.sum += message.numericValue;
  state.lastModified = message.timestamp;
  state.attributes = { ...state.attributes, ...message.payload };

  // Determine cache lifetime based on message type
  const ttlSeconds = message.messageType === 'complete' ? 60 : 3600;

  if (message.messageType === 'complete' || shouldFinalize(state)) {
    // Finalize and remove state
    await finalizeState(state);
    await cache.delete(cacheKey);
  } else {
    // Persist updated state
    await cache.set(cacheKey, state, ttlSeconds);
  }
});

// Condition for automatic state finalization
function shouldFinalize(state: AccumulatorState): boolean {
  const threshold = 100;
  const timeLimit = 30 * 60 * 1000; // 30 minutes
  const elapsed = new Date().getTime() - state.lastModified.getTime();
  return state.counter >= threshold || elapsed > timeLimit;
}

async function finalizeState(state: AccumulatorState): Promise<void> {
  console.log(`Finalizing state ${state.id}: counter=${state.counter}, sum=${state.sum}`);
}
from datetime import datetime
from typing import Dict, Any
from moose_lib import MooseCache, Stream
from pydantic import BaseModel

# State container for accumulating data
class AccumulatorState(BaseModel):
    id: str
    counter: int
    sum: float
    last_modified: datetime
    attributes: Dict[str, Any]

# Input message structure
class InputMessage(BaseModel):
    id: str
    group_id: str
    numeric_value: float
    message_type: str
    timestamp: datetime
    payload: Dict[str, Any]

message_stream = Stream[InputMessage]("input-stream")

# Initialize distributed cache
cache = MooseCache()

def process_message(message: InputMessage):
    cache_key = f"state:{message.group_id}"

    # Load existing state or create new one
    state = cache.get(cache_key, AccumulatorState)

    if not state:
        # Initialize new state
        state = AccumulatorState(
            id=message.group_id,
            counter=0,
            sum=0.0,
            last_modified=datetime.now(),
            attributes={}
        )

    # Apply message to state
    state.counter += 1
    state.sum += message.numeric_value
    state.last_modified = message.timestamp
    state.attributes.update(message.payload)

    # Determine cache lifetime based on message type
    ttl_seconds = 60 if message.message_type == 'complete' else 3600

    if message.message_type == 'complete' or should_finalize(state):
        # Finalize and remove state
        finalize_state(state)
        cache.delete(cache_key)
    else:
        # Persist updated state
        cache.set(cache_key, state, ttl_seconds=ttl_seconds)

def should_finalize(state: AccumulatorState) -> bool:
    """Condition for automatic state finalization"""
    threshold = 100
    time_limit_seconds = 30 * 60  # 30 minutes
    elapsed = (datetime.now() - state.last_modified).total_seconds()
    return state.counter >= threshold or elapsed > time_limit_seconds

def finalize_state(state: AccumulatorState):
    print(f"Finalizing state {state.id}: counter={state.counter}, sum={state.sum}")

message_stream.add_consumer(process_message)
Propagating Events to External Systems
You can use consumer functions to trigger actions in external systems whenever events occur, such as sending notifications, syncing databases, updating caches, or integrating with any other service:
HTTP API Calls
Send processed data to external APIs:
import { Stream } from "@514labs/moose-lib";

interface WebhookPayload {
  id: string;
  data: Record<string, any>;
  timestamp: Date;
}

const webhookStream = new Stream<WebhookPayload>("webhook-events");

webhookStream.addConsumer(async (payload: WebhookPayload) => {
  try {
    // Send to external webhook
    const response = await fetch('https://external-api.com/webhook', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + process.env.API_TOKEN
      },
      body: JSON.stringify({
        eventId: payload.id,
        eventData: payload.data,
        processedAt: new Date().toISOString()
      })
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    console.log(`Successfully sent event ${payload.id} to external API`);
  } catch (error) {
    console.error(`Failed to send event ${payload.id}:`, error);
    // Could implement retry logic or dead letter queue here
  }
});
import httpx
import os
from datetime import datetime
from typing import Dict, Any
from moose_lib import Stream
from pydantic import BaseModel

class WebhookPayload(BaseModel):
    id: str
    data: Dict[str, Any]
    timestamp: datetime

webhook_stream = Stream[WebhookPayload]("webhook-events")

async def send_to_external_api(payload: WebhookPayload):
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                'https://external-api.com/webhook',
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f'Bearer {os.getenv("API_TOKEN")}'
                },
                json={
                    'event_id': payload.id,
                    'event_data': payload.data,
                    'processed_at': datetime.now().isoformat()
                }
            )

        if response.status_code != 200:
            raise Exception(f"HTTP {response.status_code}: {response.text}")

        print(f"Successfully sent event {payload.id} to external API")
    except Exception as error:
        print(f"Failed to send event {payload.id}: {error}")
        # Could implement retry logic or dead letter queue here

webhook_stream.add_consumer(send_to_external_api)
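Both consumers above note where retry logic could go. One option is a small helper that retries the outbound call with exponential backoff before giving up; a minimal TypeScript sketch (the `postWithRetry` helper and its backoff values are illustrative, not part of moose-lib):

// Hypothetical retry helper: retries a POST a few times with exponential backoff
async function postWithRetry(url: string, body: unknown, maxAttempts = 3): Promise<void> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(body)
      });
      if (!response.ok) throw new Error(`HTTP ${response.status}`);
      return; // success
    } catch (error) {
      if (attempt === maxAttempts) throw error; // give up after the last attempt
      // Wait 1s, 2s, 4s, ... before the next attempt
      await new Promise((resolve) => setTimeout(resolve, 1000 * 2 ** (attempt - 1)));
    }
  }
}

// Usage inside a consumer:
// await postWithRetry('https://external-api.com/webhook', { eventId: payload.id });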
Database Operations
Write processed data to external databases:
import { Stream } from "@514labs/moose-lib";
import { createConnection } from 'mysql2/promise';

interface DatabaseRecord {
  id: string;
  category: string;
  value: number;
  metadata: Record<string, any>;
  timestamp: Date;
}

const dbStream = new Stream<DatabaseRecord>("database-events");

// Database connection settings
const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME
};

dbStream.addConsumer(async (record: DatabaseRecord) => {
  const connection = await createConnection(dbConfig);

  try {
    // Insert record into external database
    await connection.execute(
      'INSERT INTO processed_events (id, category, value, metadata, created_at) VALUES (?, ?, ?, ?, ?)',
      [
        record.id,
        record.category,
        record.value,
        JSON.stringify(record.metadata),
        record.timestamp
      ]
    );

    console.log(`Inserted record ${record.id} into database`);
  } catch (error) {
    console.error(`Database insert failed for record ${record.id}:`, error);
  } finally {
    await connection.end();
  }
});
import asyncpg
import json
import os
from datetime import datetime
from typing import Dict, Any
from moose_lib import Stream
from pydantic import BaseModel

class DatabaseRecord(BaseModel):
    id: str
    category: str
    value: float
    metadata: Dict[str, Any]
    timestamp: datetime

db_stream = Stream[DatabaseRecord]("database-events")

async def insert_to_database(record: DatabaseRecord):
    conn = None
    try:
        # Connect to PostgreSQL database
        conn = await asyncpg.connect(
            host=os.getenv('DB_HOST'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            database=os.getenv('DB_NAME')
        )

        # Insert record into external database
        await conn.execute(
            '''
            INSERT INTO processed_events (id, category, value, metadata, created_at)
            VALUES ($1, $2, $3, $4, $5)
            ''',
            record.id,
            record.category,
            record.value,
            json.dumps(record.metadata),
            record.timestamp
        )

        print(f"Inserted record {record.id} into database")
    except Exception as error:
        print(f"Database insert failed for record {record.id}: {error}")
    finally:
        if conn is not None:
            await conn.close()

db_stream.add_consumer(insert_to_database)
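Opening a fresh connection per message is simple but becomes a bottleneck at higher throughput. A shared connection pool is one alternative; a minimal TypeScript sketch using `createPool` from `mysql2/promise`, reusing the `dbStream` and `DatabaseRecord` defined above (the pool size is illustrative and would be tuned to your database):

import { createPool } from 'mysql2/promise';

// One pool for the process, reused by every consumer invocation
const pool = createPool({
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  connectionLimit: 10 // illustrative: tune to your database and throughput
});

dbStream.addConsumer(async (record: DatabaseRecord) => {
  try {
    // pool.execute acquires and releases a connection automatically
    await pool.execute(
      'INSERT INTO processed_events (id, category, value, metadata, created_at) VALUES (?, ?, ?, ?, ?)',
      [record.id, record.category, record.value, JSON.stringify(record.metadata), record.timestamp]
    );
  } catch (error) {
    console.error(`Database insert failed for record ${record.id}:`, error);
  }
});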
File System Operations
Write processed data to files or cloud storage:
import { Stream } from "@514labs/moose-lib";
import { writeFile, mkdir } from 'fs/promises';
import { join } from 'path';

interface FileOutput {
  id: string;
  filename: string;
  content: string;
  directory: string;
  format: 'json' | 'csv' | 'txt';
}

const fileStream = new Stream<FileOutput>("file-events");

fileStream.addConsumer(async (output: FileOutput) => {
  try {
    // Ensure directory exists
    await mkdir(output.directory, { recursive: true });

    // Format content based on type
    let formattedContent: string;
    switch (output.format) {
      case 'json':
        formattedContent = JSON.stringify(JSON.parse(output.content), null, 2);
        break;
      case 'csv':
        formattedContent = output.content; // Assume already CSV formatted
        break;
      default:
        formattedContent = output.content;
    }

    // Write file with timestamp
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
    const filename = `${output.filename}_${timestamp}.${output.format}`;
    const filepath = join(output.directory, filename);

    await writeFile(filepath, formattedContent, 'utf8');
    console.log(`Written file: ${filepath}`);
  } catch (error) {
    console.error(`Failed to write file for output ${output.id}:`, error);
  }
});
import os
import json
import aiofiles
from datetime import datetime
from typing import Literal
from moose_lib import Stream
from pydantic import BaseModel

class FileOutput(BaseModel):
    id: str
    filename: str
    content: str
    directory: str
    format: Literal['json', 'csv', 'txt']

file_stream = Stream[FileOutput]("file-events")

async def write_to_file(output: FileOutput):
    try:
        # Ensure directory exists
        os.makedirs(output.directory, exist_ok=True)

        # Format content based on type
        if output.format == 'json':
            formatted_content = json.dumps(json.loads(output.content), indent=2)
        else:
            formatted_content = output.content

        # Write file with timestamp
        timestamp = datetime.now().isoformat().replace(':', '-').replace('.', '-')
        filename = f"{output.filename}_{timestamp}.{output.format}"
        filepath = os.path.join(output.directory, filename)

        async with aiofiles.open(filepath, 'w', encoding='utf-8') as f:
            await f.write(formatted_content)

        print(f"Written file: {filepath}")
    except Exception as error:
        print(f"Failed to write file for output {output.id}: {error}")

file_stream.add_consumer(write_to_file)
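The same consumer shape works for the cloud-storage case mentioned above, swapping the local file write for an object upload. A minimal TypeScript sketch using AWS SDK v3 and the `fileStream` defined above (the `S3_BUCKET` environment variable and key layout are assumptions for illustration):

import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({ region: process.env.AWS_REGION });

fileStream.addConsumer(async (output: FileOutput) => {
  try {
    // Upload the content as an object; the key layout is up to you
    await s3.send(new PutObjectCommand({
      Bucket: process.env.S3_BUCKET, // illustrative env var
      Key: `${output.directory}/${output.filename}.${output.format}`,
      Body: output.content,
      ContentType: output.format === 'json' ? 'application/json' : 'text/plain'
    }));
    console.log(`Uploaded ${output.filename} to object storage`);
  } catch (error) {
    console.error(`Failed to upload output ${output.id}:`, error);
  }
});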
Email and Notifications
Send alerts and notifications based on processed events:
import { Stream } from "@514labs/moose-lib";
import nodemailer from 'nodemailer';

interface NotificationEvent {
  id: string;
  type: 'email' | 'slack' | 'webhook';
  recipient: string;
  subject: string;
  message: string;
  priority: 'low' | 'medium' | 'high';
  metadata: Record<string, any>;
}

const notificationStream = new Stream<NotificationEvent>("notifications");

// Configure email transporter
const emailTransporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: parseInt(process.env.SMTP_PORT || '587'),
  secure: false,
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS
  }
});

notificationStream.addConsumer(async (notification: NotificationEvent) => {
  try {
    switch (notification.type) {
      case 'email':
        await emailTransporter.sendMail({
          from: process.env.SMTP_FROM,
          to: notification.recipient,
          subject: notification.subject,
          text: notification.message,
          html: `<div>
            <h3>${notification.subject}</h3>
            <p>${notification.message}</p>
            <p><em>Priority: ${notification.priority}</em></p>
          </div>`
        });
        break;

      case 'slack':
        await fetch(`https://hooks.slack.com/services/${process.env.SLACK_WEBHOOK}`, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            text: notification.message,
            channel: notification.recipient,
            username: 'Moose Alert',
            icon_emoji: notification.priority === 'high' ? ':warning:' : ':information_source:'
          })
        });
        break;

      case 'webhook':
        await fetch(notification.recipient, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            id: notification.id,
            subject: notification.subject,
            message: notification.message,
            priority: notification.priority,
            metadata: notification.metadata
          })
        });
        break;
    }

    console.log(`Sent ${notification.type} notification ${notification.id}`);
  } catch (error) {
    console.error(`Failed to send notification ${notification.id}:`, error);
  }
});
import os
import smtplib
import httpx
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, Any, Literal
from moose_lib import Stream
from pydantic import BaseModel

class NotificationEvent(BaseModel):
    id: str
    type: Literal['email', 'slack', 'webhook']
    recipient: str
    subject: str
    message: str
    priority: Literal['low', 'medium', 'high']
    metadata: Dict[str, Any]

notification_stream = Stream[NotificationEvent]("notifications")

async def send_notification(notification: NotificationEvent):
    try:
        if notification.type == 'email':
            # Send email
            msg = MIMEMultipart()
            msg['From'] = os.getenv('SMTP_FROM')
            msg['To'] = notification.recipient
            msg['Subject'] = notification.subject

            body = f"""
            {notification.message}

            Priority: {notification.priority}
            """
            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(os.getenv('SMTP_HOST'), int(os.getenv('SMTP_PORT', '587')))
            server.starttls()
            server.login(os.getenv('SMTP_USER'), os.getenv('SMTP_PASS'))
            server.send_message(msg)
            server.quit()

        elif notification.type == 'slack':
            # Send to Slack
            async with httpx.AsyncClient() as client:
                await client.post(
                    f"https://hooks.slack.com/services/{os.getenv('SLACK_WEBHOOK')}",
                    json={
                        'text': notification.message,
                        'channel': notification.recipient,
                        'username': 'Moose Alert',
                        'icon_emoji': ':warning:' if notification.priority == 'high' else ':information_source:'
                    }
                )

        elif notification.type == 'webhook':
            # Send to webhook
            async with httpx.AsyncClient() as client:
                await client.post(
                    notification.recipient,
                    json={
                        'id': notification.id,
                        'subject': notification.subject,
                        'message': notification.message,
                        'priority': notification.priority,
                        'metadata': notification.metadata
                    }
                )

        print(f"Sent {notification.type} notification {notification.id}")
    except Exception as error:
        print(f"Failed to send notification {notification.id}: {error}")

notification_stream.add_consumer(send_notification)
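All of the consumers above repeat the same try/catch-and-log pattern. With many consumers it can be factored into a small wrapper; a minimal TypeScript sketch (the `safeConsumer` helper is hypothetical, not part of moose-lib):

// Hypothetical wrapper: catches and logs errors so a failing handler doesn't go unnoticed
function safeConsumer<T>(name: string, handler: (event: T) => Promise<void>) {
  return async (event: T) => {
    try {
      await handler(event);
    } catch (error) {
      console.error(`Consumer "${name}" failed:`, error);
      // Retry logic or a dead letter queue could be plugged in here
    }
  };
}

// Usage: wrap the handler before registering it
notificationStream.addConsumer(safeConsumer('notifications', async (notification: NotificationEvent) => {
  // ... send the notification ...
}));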