Consuming data from streams allows you to read and process data from Kafka/Redpanda topics. This is essential for building real-time applications, analytics, and event-driven architectures.
Consumers are just functions that are called when new data is available in a stream. You add them to a stream like this:
import { Stream } from "@514labs/moose-lib";

/** Record type carried by the "user-events" stream. */
interface UserEvent {
  id: string;
  userId: string;
  timestamp: Date;
  eventType: string;
}

const userEventsStream = new Stream<UserEvent>("user-events");

// Add a consumer to process events
userEventsStream.addConsumer((event: UserEvent) => {
  console.log(`Processing event: ${event.id}`);
  console.log(`User: ${event.userId}, Type: ${event.eventType}`);

  // Your processing logic here
  // e.g., update analytics, send notifications, etc.
});

// Add multiple consumers for different purposes.
// NOTE: updatePurchaseAnalytics and sendWelcomeEmail are application-defined
// helpers that are not shown in this example.
userEventsStream.addConsumer((event: UserEvent) => {
  // Analytics processing
  if (event.eventType === 'purchase') {
    updatePurchaseAnalytics(event);
  }
});

userEventsStream.addConsumer((event: UserEvent) => {
  // Notification processing
  if (event.eventType === 'signup') {
    sendWelcomeEmail(event.userId);
  }
});

// Maintain state across event processing using MooseCache for distributed state management:
import { MooseCache } from "@514labs/moose-lib";

/** State container for accumulating data. */
interface AccumulatorState {
  id: string;
  counter: number;
  sum: number;
  lastModified: Date;
  attributes: Record<string, any>;
}

/** Input message structure. */
interface InputMessage {
  id: string;
  groupId: string;
  numericValue: number;
  messageType: string;
  timestamp: Date;
  payload: Record<string, any>;
}

const messageStream = new Stream<InputMessage>("input-stream");

/**
 * Folds each incoming message into a per-group accumulator kept in the
 * distributed cache, and finalizes (then deletes) the accumulator when the
 * message is of type 'complete' or shouldFinalize() says so.
 */
messageStream.addConsumer(async (message: InputMessage) => {
  // Get distributed cache instance
  const cache = await MooseCache.get();
  const cacheKey = `state:${message.groupId}`;

  // Load existing state, or start a fresh one for this group
  const state: AccumulatorState =
    (await cache.get<AccumulatorState>(cacheKey)) ?? {
      id: message.groupId,
      counter: 0,
      sum: 0,
      lastModified: new Date(),
      attributes: {}
    };

  // Apply message to state
  state.counter += 1;
  state.sum += message.numericValue;
  state.lastModified = message.timestamp;
  state.attributes = { ...state.attributes, ...message.payload };

  if (message.messageType === 'complete' || shouldFinalize(state)) {
    // Finalize and remove state
    await finalizeState(state);
    await cache.delete(cacheKey);
  } else {
    // Persist updated state. State is only stored for non-'complete'
    // messages, so a single lifetime applies here.
    const ttlSeconds = 3600;
    await cache.set(cacheKey, state, ttlSeconds);
  }
});

/**
 * Condition for automatic state finalization.
 *
 * @param state - accumulator to inspect
 * @returns true once the counter reaches the threshold or more than
 *          30 minutes have passed since `state.lastModified`
 */
function shouldFinalize(state: AccumulatorState): boolean {
  const threshold = 100;
  const timeLimit = 30 * 60 * 1000; // 30 minutes

  // Normalize through `new Date(...)`: a state object read back from the cache
  // may carry lastModified as a serialized string rather than a Date
  // (assumption about MooseCache serialization; TODO confirm).
  // NOTE: the consumer sets lastModified to the message timestamp just before
  // calling this, so `elapsed` reflects the age of the latest message.
  const elapsed = Date.now() - new Date(state.lastModified).getTime();

  return state.counter >= threshold || elapsed > timeLimit;
}

/**
 * Called once for a state that is about to be removed from the cache.
 * This example only logs the final counter and sum.
 */
async function finalizeState(state: AccumulatorState): Promise<void> {
  console.log(`Finalizing state ${state.id}: counter=${state.counter}, sum=${state.sum}`);
}

// You can use consumer functions to trigger actions across external systems - send notifications, sync databases, update caches, or integrate with any other service when events occur:
Send processed data to external APIs:
/** Record type carried by the "webhook-events" stream. */
interface WebhookPayload {
  id: string;
  data: Record<string, any>;
  timestamp: Date;
}

const webhookStream = new Stream<WebhookPayload>("webhook-events");

/**
 * Forwards each event to an external webhook as JSON.
 * Failures (missing token, network error, non-2xx response) are logged and
 * the event is dropped; nothing is rethrown.
 */
webhookStream.addConsumer(async (payload: WebhookPayload) => {
  try {
    // Fail early instead of sending the literal header "Bearer undefined"
    const apiToken = process.env.API_TOKEN;
    if (!apiToken) {
      throw new Error('API_TOKEN environment variable is not set');
    }

    // Send to external webhook
    const response = await fetch('https://external-api.com/webhook', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${apiToken}`
      },
      body: JSON.stringify({
        eventId: payload.id,
        eventData: payload.data,
        processedAt: new Date().toISOString()
      })
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    console.log(`Successfully sent event ${payload.id} to external API`);
  } catch (error) {
    console.error(`Failed to send event ${payload.id}:`, error);
    // Could implement retry logic or dead letter queue here
  }
});

// Write processed data to external databases:
import { createPool } from 'mysql2/promise';

/** Record type carried by the "database-events" stream. */
interface DatabaseRecord {
  id: string;
  category: string;
  value: number;
  metadata: Record<string, any>;
  timestamp: Date;
}

const dbStream = new Stream<DatabaseRecord>("database-events");

// Initialize database connection settings from the environment
const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME
};

// One pool shared by all events, so connections are reused instead of being
// opened and closed once per record.
const dbPool = createPool(dbConfig);

/**
 * Inserts each record into the external `processed_events` table.
 * Both connection and insert failures are logged and the record is dropped;
 * nothing is rethrown.
 */
dbStream.addConsumer(async (record: DatabaseRecord) => {
  try {
    // Insert record into external database (parameterized statement)
    await dbPool.execute(
      'INSERT INTO processed_events (id, category, value, metadata, created_at) VALUES (?, ?, ?, ?, ?)',
      [
        record.id,
        record.category,
        record.value,
        JSON.stringify(record.metadata),
        record.timestamp
      ]
    );

    console.log(`Inserted record ${record.id} into database`);
  } catch (error) {
    console.error(`Database insert failed for record ${record.id}:`, error);
  }
});

// Write processed data to files or cloud storage:
import { writeFile, mkdir } from 'fs/promises';
import { join } from 'path';

/** Record type carried by the "file-events" stream. */
interface FileOutput {
  id: string;
  filename: string;
  content: string;
  directory: string;
  format: 'json' | 'csv' | 'txt';
}

const fileStream = new Stream<FileOutput>("file-events");

/**
 * Writes each event's content to `<directory>/<filename>_<timestamp>.<format>`.
 * Any failure (directory creation, invalid JSON content, write error) is
 * logged and the event is dropped; nothing is rethrown.
 */
fileStream.addConsumer(async (output: FileOutput) => {
  try {
    // Ensure directory exists
    await mkdir(output.directory, { recursive: true });

    // Only JSON is re-serialized (pretty-printed); csv and txt content is
    // assumed to be formatted already and is written through unchanged.
    const formattedContent =
      output.format === 'json'
        ? JSON.stringify(JSON.parse(output.content), null, 2)
        : output.content;

    // Write file with timestamp; ':' and '.' are replaced with '-' in the
    // ISO timestamp before it is used in the file name.
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
    const filename = `${output.filename}_${timestamp}.${output.format}`;
    const filepath = join(output.directory, filename);

    await writeFile(filepath, formattedContent, 'utf8');

    console.log(`Written file: ${filepath}`);
  } catch (error) {
    console.error(`Failed to write file for output ${output.id}:`, error);
  }
});

// Send alerts and notifications based on processed events:
import nodemailer from 'nodemailer';

/** Record type carried by the "notifications" stream. */
interface NotificationEvent {
  id: string;
  type: 'email' | 'slack' | 'webhook';
  recipient: string;
  subject: string;
  message: string;
  priority: 'low' | 'medium' | 'high';
  metadata: Record<string, any>;
}

const notificationStream = new Stream<NotificationEvent>("notifications");

// Configure email transporter (the nodemailer factory is `createTransport`)
const emailTransporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: parseInt(process.env.SMTP_PORT || '587', 10),
  secure: false,
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS
  }
});

/** Escapes the characters that are significant in HTML text and attributes. */
function escapeHtml(text: string): string {
  return text
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;');
}

/** POSTs `body` as JSON to `url` and throws on a non-2xx response. */
async function postJson(url: string, body: unknown): Promise<void> {
  const response = await fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body)
  });

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${response.statusText}`);
  }
}

/**
 * Delivers each notification through the channel named by `notification.type`.
 * Any delivery failure, including a non-2xx HTTP response or an unsupported
 * type, is logged and the notification is dropped; nothing is rethrown.
 */
notificationStream.addConsumer(async (notification: NotificationEvent) => {
  try {
    switch (notification.type) {
      case 'email':
        await emailTransporter.sendMail({
          from: process.env.SMTP_FROM,
          to: notification.recipient,
          subject: notification.subject,
          text: notification.message,
          // Event fields are escaped before being embedded in the HTML body
          html: `<div>
            <h3>${escapeHtml(notification.subject)}</h3>
            <p>${escapeHtml(notification.message)}</p>
            <p><em>Priority: ${notification.priority}</em></p>
          </div>`
        });
        break;

      case 'slack':
        await postJson(`https://hooks.slack.com/services/${process.env.SLACK_WEBHOOK}`, {
          text: notification.message,
          channel: notification.recipient,
          username: 'Moose Alert',
          icon_emoji: notification.priority === 'high' ? ':warning:' : ':information_source:'
        });
        break;

      case 'webhook':
        await postJson(notification.recipient, {
          id: notification.id,
          subject: notification.subject,
          message: notification.message,
          priority: notification.priority,
          metadata: notification.metadata
        });
        break;

      default: {
        // Compile-time exhaustiveness check; at runtime this rejects records
        // whose type is outside the declared union instead of reporting
        // them as sent.
        const unsupported: never = notification.type;
        throw new Error(`Unsupported notification type: ${String(unsupported)}`);
      }
    }

    console.log(`Sent ${notification.type} notification ${notification.id}`);
  } catch (error) {
    console.error(`Failed to send notification ${notification.id}:`, error);
  }
});