# Moose / Streaming / Consumer Functions Documentation – TypeScript

## Included Files

1. moose/streaming/consumer-functions/consumer-functions.mdx

## Streaming Consumer Functions

Source: moose/streaming/consumer-functions/consumer-functions.mdx

Read and process data from streams with consumers and processors

# Streaming Consumer Functions

## Overview

Consuming data from streams lets you read and process records from Kafka/Redpanda topics as they arrive. This is essential for building real-time applications, analytics, and event-driven architectures.

## Basic Usage

Consumers are functions that are called whenever new data is available in a stream. You add them to a stream like this:

```typescript filename="StreamConsumer.ts"
import { Stream } from "@514labs/moose-lib";

interface UserEvent {
  id: string;
  userId: string;
  timestamp: Date;
  eventType: string;
}

const userEventsStream = new Stream<UserEvent>("user-events");

// Add a consumer to process events
userEventsStream.addConsumer((event: UserEvent) => {
  console.log(`Processing event: ${event.id}`);
  console.log(`User: ${event.userId}, Type: ${event.eventType}`);

  // Your processing logic here
  // e.g., update analytics, send notifications, etc.
});

// Add multiple consumers for different purposes
userEventsStream.addConsumer((event: UserEvent) => {
  // Analytics processing
  if (event.eventType === 'purchase') {
    updatePurchaseAnalytics(event);
  }
});

userEventsStream.addConsumer((event: UserEvent) => {
  // Notification processing
  if (event.eventType === 'signup') {
    sendWelcomeEmail(event.userId);
  }
});
```

## Processing Patterns

### Stateful Processing with MooseCache

Maintain state across event processing by using MooseCache for distributed state management:

```typescript filename="StatefulProcessing.ts"
import { Stream, MooseCache } from "@514labs/moose-lib";

// State container for accumulating data
interface AccumulatorState {
  id: string;
  counter: number;
  sum: number;
  lastModified: Date;
  attributes: Record<string, any>;
}

// Input message structure
interface InputMessage {
  id: string;
  groupId: string;
  numericValue: number;
  messageType: string;
  timestamp: Date;
  payload: Record<string, any>;
}

const messageStream = new Stream<InputMessage>("input-stream");

messageStream.addConsumer(async (message: InputMessage) => {
  // Get distributed cache instance
  const cache = await MooseCache.get();
  const cacheKey = `state:${message.groupId}`;

  // Load existing state or create new one
  let state: AccumulatorState | null = await cache.get(cacheKey);

  if (!state) {
    // Initialize new state
    state = {
      id: message.groupId,
      counter: 0,
      sum: 0,
      lastModified: new Date(),
      attributes: {}
    };
  }

  // Apply message to state
  state.counter += 1;
  state.sum += message.numericValue;
  state.lastModified = message.timestamp;
  state.attributes = { ...state.attributes, ...message.payload };

  // Determine cache lifetime based on message type
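  // Completed groups only need a short grace period (60 seconds) before cleanup,
  // while in-progress groups are kept for up to an hour between messages.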
  const ttlSeconds = message.messageType === 'complete' ? 60 : 3600;

  if (message.messageType === 'complete' || shouldFinalize(state)) {
    // Finalize and remove state
    await finalizeState(state);
    await cache.delete(cacheKey);
  } else {
    // Persist updated state
    await cache.set(cacheKey, state, ttlSeconds);
  }
});

// Condition for automatic state finalization
function shouldFinalize(state: AccumulatorState): boolean {
  const threshold = 100;
  const timeLimit = 30 * 60 * 1000; // 30 minutes
  const elapsed = new Date().getTime() - state.lastModified.getTime();

  return state.counter >= threshold || elapsed > timeLimit;
}

async function finalizeState(state: AccumulatorState): Promise<void> {
  console.log(`Finalizing state ${state.id}: counter=${state.counter}, sum=${state.sum}`);
}
```

## Propagating Events to External Systems

You can use consumer functions to trigger actions across external systems: send notifications, sync databases, update caches, or integrate with any other service when events occur.

### HTTP API Calls

Send processed data to external APIs:

```typescript filename="HttpIntegration.ts"
import { Stream } from "@514labs/moose-lib";

interface WebhookPayload {
  id: string;
  data: Record<string, any>;
  timestamp: Date;
}

const webhookStream = new Stream<WebhookPayload>("webhook-events");

webhookStream.addConsumer(async (payload: WebhookPayload) => {
  try {
    // Send to external webhook
    const response = await fetch('https://external-api.com/webhook', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + process.env.API_TOKEN
      },
      body: JSON.stringify({
        eventId: payload.id,
        eventData: payload.data,
        processedAt: new Date().toISOString()
      })
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    console.log(`Successfully sent event ${payload.id} to external API`);
  } catch (error) {
    console.error(`Failed to send event ${payload.id}:`, error);
    // Could implement retry logic or a dead letter queue here
    // (see the retry sketch at the end of this page)
  }
});
```

### Database Operations

Write processed data to external databases (this example assumes a MySQL-compatible client such as `mysql2`):

```typescript filename="DatabaseIntegration.ts"
import { Stream } from "@514labs/moose-lib";
// Assumes a MySQL-compatible driver; any client with execute/end works similarly
import { createConnection } from "mysql2/promise";

interface DatabaseRecord {
  id: string;
  category: string;
  value: number;
  metadata: Record<string, any>;
  timestamp: Date;
}

const dbStream = new Stream<DatabaseRecord>("database-events");

// Initialize database connection config
const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME
};

dbStream.addConsumer(async (record: DatabaseRecord) => {
  const connection = await createConnection(dbConfig);

  try {
    // Insert record into external database
    await connection.execute(
      'INSERT INTO processed_events (id, category, value, metadata, created_at) VALUES (?, ?, ?, ?, ?)',
      [
        record.id,
        record.category,
        record.value,
        JSON.stringify(record.metadata),
        record.timestamp
      ]
    );

    console.log(`Inserted record ${record.id} into database`);
  } catch (error) {
    console.error(`Database insert failed for record ${record.id}:`, error);
  } finally {
    await connection.end();
  }
});
```

### File System Operations

Write processed data to files or cloud storage:

```typescript filename="FileSystemIntegration.ts"
import { Stream } from "@514labs/moose-lib";
import { mkdir, writeFile } from "fs/promises";
import { join } from "path";

interface FileOutput {
  id: string;
  filename: string;
  content: string;
  directory: string;
  format: 'json' | 'csv' | 'txt';
}

const fileStream = new Stream<FileOutput>("file-events");

fileStream.addConsumer(async (output: FileOutput) => {
  try {
    // Ensure directory exists
    await mkdir(output.directory, { recursive: true });

    // Format content based on type
    let formattedContent: string;
    switch (output.format) {
      case 'json':
        formattedContent = JSON.stringify(JSON.parse(output.content), null, 2);
        break;
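      // CSV and plain-text content is written through unchanged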
      case 'csv':
        formattedContent = output.content; // Assume already CSV formatted
        break;
      default:
        formattedContent = output.content;
    }

    // Write file with timestamp
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
    const filename = `${output.filename}_${timestamp}.${output.format}`;
    const filepath = join(output.directory, filename);

    await writeFile(filepath, formattedContent, 'utf8');

    console.log(`Written file: ${filepath}`);
  } catch (error) {
    console.error(`Failed to write file for output ${output.id}:`, error);
  }
});
```

### Email and Notifications

Send alerts and notifications based on processed events:

```typescript filename="NotificationIntegration.ts"
import { Stream } from "@514labs/moose-lib";
import nodemailer from "nodemailer";

interface NotificationEvent {
  id: string;
  type: 'email' | 'slack' | 'webhook';
  recipient: string;
  subject: string;
  message: string;
  priority: 'low' | 'medium' | 'high';
  metadata: Record<string, any>;
}

const notificationStream = new Stream<NotificationEvent>("notifications");

// Configure email transporter
const emailTransporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: parseInt(process.env.SMTP_PORT || '587'),
  secure: false,
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS
  }
});

notificationStream.addConsumer(async (notification: NotificationEvent) => {
  try {
    switch (notification.type) {
      case 'email':
        await emailTransporter.sendMail({
          from: process.env.SMTP_FROM,
          to: notification.recipient,
          subject: notification.subject,
          text: notification.message,
          html: `
            <h2>${notification.subject}</h2>
            <p>${notification.message}</p>
            <p>Priority: ${notification.priority}</p>
          `
        });
        break;

      case 'slack':
        await fetch(`https://hooks.slack.com/services/${process.env.SLACK_WEBHOOK}`, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            text: notification.message,
            channel: notification.recipient,
            username: 'Moose Alert',
            icon_emoji: notification.priority === 'high' ? ':warning:' : ':information_source:'
          })
        });
        break;

      case 'webhook':
        await fetch(notification.recipient, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            id: notification.id,
            subject: notification.subject,
            message: notification.message,
            priority: notification.priority,
            metadata: notification.metadata
          })
        });
        break;
    }

    console.log(`Sent ${notification.type} notification ${notification.id}`);
  } catch (error) {
    console.error(`Failed to send notification ${notification.id}:`, error);
  }
});
```
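
All of the integrations above log failures and move on, and the HTTP example notes that retry logic or a dead letter queue could be added. The sketch below shows one minimal way to do that: a small exponential-backoff helper wrapped around the webhook call from the HTTP example. The `withRetry` helper, its attempt counts, and its delays are illustrative assumptions, not part of the Moose API.

```typescript filename="RetryIntegration.ts"
import { Stream } from "@514labs/moose-lib";

interface WebhookPayload {
  id: string;
  data: Record<string, any>;
  timestamp: Date;
}

// Hypothetical helper: retry an async operation with exponential backoff.
// Attempt counts and delays are illustrative defaults, not Moose settings.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 500
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Wait 500ms, then 1000ms, then 2000ms, ... between attempts
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** (attempt - 1)));
      }
    }
  }
  throw lastError;
}

const webhookStream = new Stream<WebhookPayload>("webhook-events");

webhookStream.addConsumer(async (payload: WebhookPayload) => {
  try {
    // Retry transient failures before giving up on the event
    await withRetry(async () => {
      const response = await fetch('https://external-api.com/webhook', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ eventId: payload.id, eventData: payload.data })
      });
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }
    });
  } catch (error) {
    console.error(`Giving up on event ${payload.id} after retries:`, error);
    // A dead letter queue or error stream could receive the payload here
  }
});
```

In practice you would add this consumer to the existing `webhookStream` rather than declaring the stream twice; the same wrapper can sit around the database insert, file write, or notification calls, and anything that still fails after the final attempt could be published to a separate error stream or dead letter queue.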