Moose Stack

Moose Streaming

Consumer Functions

Streaming Consumer Functions

Viewing:

Overview

Consuming data from streams allows you to read and process data from Kafka/Redpanda topics. This is essential for building real-time applications, analytics, and event-driven architectures.

Basic Usage

Consumers are just functions that are called when new data is available in a stream. You add them to a stream like this:

StreamConsumer.ts
import { Stream } from "@514labs/moose-lib";
 
interface UserEvent {
  id: string;
  userId: string;
  timestamp: Date;
  eventType: string;
}
 
const userEventsStream = new Stream<UserEvent>("user-events");
 
// Add a consumer to process events
userEventsStream.addConsumer((event: UserEvent) => {
  console.log(`Processing event: ${event.id}`);
  console.log(`User: ${event.userId}, Type: ${event.eventType}`);
  
  // Your processing logic here
  // e.g., update analytics, send notifications, etc.
});
 
// Add multiple consumers for different purposes
userEventsStream.addConsumer((event: UserEvent) => {
  // Analytics processing
  if (event.eventType === 'purchase') {
    updatePurchaseAnalytics(event);
  }
});
 
userEventsStream.addConsumer((event: UserEvent) => {
  // Notification processing
  if (event.eventType === 'signup') {
    sendWelcomeEmail(event.userId);
  }
});

Processing Patterns

Stateful Processing with MooseCache

Maintain state across event processing using MooseCache for distributed state management:

StatefulProcessing.ts
import { MooseCache } from "@514labs/moose-lib";
 
// State container for accumulating data
interface AccumulatorState {
  id: string;
  counter: number;
  sum: number;
  lastModified: Date;
  attributes: Record<string, any>;
}
 
// Input message structure
interface InputMessage {
  id: string;
  groupId: string;
  numericValue: number;
  messageType: string;
  timestamp: Date;
  payload: Record<string, any>;
}
 
const messageStream = new Stream<InputMessage>("input-stream");
 
messageStream.addConsumer(async (message: InputMessage) => {
  // Get distributed cache instance
  const cache = await MooseCache.get();
  const cacheKey = `state:${message.groupId}`;
  
  // Load existing state or create new one
  let state: AccumulatorState | null = await cache.get<AccumulatorState>(cacheKey);
  
  if (!state) {
    // Initialize new state
    state = {
      id: message.groupId,
      counter: 0,
      sum: 0,
      lastModified: new Date(),
      attributes: {}
    };
  }
  
  // Apply message to state
  state.counter += 1;
  state.sum += message.numericValue;
  state.lastModified = message.timestamp;
  state.attributes = { ...state.attributes, ...message.payload };
  
  // Determine cache lifetime based on message type
  const ttlSeconds = message.messageType === 'complete' ? 60 : 3600;
  
  if (message.messageType === 'complete' || shouldFinalize(state)) {
    // Finalize and remove state
    await finalizeState(state);
    await cache.delete(cacheKey);
  } else {
    // Persist updated state
    await cache.set(cacheKey, state, ttlSeconds);
  }
});
 
// Condition for automatic state finalization
function shouldFinalize(state: AccumulatorState): boolean {
  const threshold = 100;
  const timeLimit = 30 * 60 * 1000; // 30 minutes
  const elapsed = new Date().getTime() - state.lastModified.getTime();
  
  return state.counter >= threshold || elapsed > timeLimit;
}
 
async function finalizeState(state: AccumulatorState): Promise<void> {
  console.log(`Finalizing state ${state.id}: counter=${state.counter}, sum=${state.sum}`);
}

Propagating Events to External Systems

You can use consumer functions to trigger actions across external systems - send notifications, sync databases, update caches, or integrate with any other service when events occur:

HTTP API Calls

Send processed data to external APIs:

HttpIntegration.ts
interface WebhookPayload {
  id: string;
  data: Record<string, any>;
  timestamp: Date;
}
 
const webhookStream = new Stream<WebhookPayload>("webhook-events");
 
webhookStream.addConsumer(async (payload: WebhookPayload) => {
  try {
    // Send to external webhook
    const response = await fetch('https://external-api.com/webhook', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + process.env.API_TOKEN
      },
      body: JSON.stringify({
        eventId: payload.id,
        eventData: payload.data,
        processedAt: new Date().toISOString()
      })
    });
 
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
 
    console.log(`Successfully sent event ${payload.id} to external API`);
  } catch (error) {
    console.error(`Failed to send event ${payload.id}:`, error);
    // Could implement retry logic or dead letter queue here
  }
});

Database Operations

Write processed data to external databases:

DatabaseIntegration.ts
import { createConnection } from 'mysql2/promise';
 
interface DatabaseRecord {
  id: string;
  category: string;
  value: number;
  metadata: Record<string, any>;
  timestamp: Date;
}
 
const dbStream = new Stream<DatabaseRecord>("database-events");
 
// Initialize database connection
const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME
};
 
dbStream.addConsumer(async (record: DatabaseRecord) => {
  const connection = await createConnection(dbConfig);
  
  try {
    // Insert record into external database
    await connection.execute(
      'INSERT INTO processed_events (id, category, value, metadata, created_at) VALUES (?, ?, ?, ?, ?)',
      [
        record.id,
        record.category,
        record.value,
        JSON.stringify(record.metadata),
        record.timestamp
      ]
    );
 
    console.log(`Inserted record ${record.id} into database`);
  } catch (error) {
    console.error(`Database insert failed for record ${record.id}:`, error);
  } finally {
    await connection.end();
  }
});

File System Operations

Write processed data to files or cloud storage:

FileSystemIntegration.ts
import { writeFile, mkdir } from 'fs/promises';
import { join } from 'path';
 
interface FileOutput {
  id: string;
  filename: string;
  content: string;
  directory: string;
  format: 'json' | 'csv' | 'txt';
}
 
const fileStream = new Stream<FileOutput>("file-events");
 
fileStream.addConsumer(async (output: FileOutput) => {
  try {
    // Ensure directory exists
    await mkdir(output.directory, { recursive: true });
    
    // Format content based on type
    let formattedContent: string;
    switch (output.format) {
      case 'json':
        formattedContent = JSON.stringify(JSON.parse(output.content), null, 2);
        break;
      case 'csv':
        formattedContent = output.content; // Assume already CSV formatted
        break;
      default:
        formattedContent = output.content;
    }
    
    // Write file with timestamp
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
    const filename = `${output.filename}_${timestamp}.${output.format}`;
    const filepath = join(output.directory, filename);
    
    await writeFile(filepath, formattedContent, 'utf8');
    
    console.log(`Written file: ${filepath}`);
  } catch (error) {
    console.error(`Failed to write file for output ${output.id}:`, error);
  }
});

Email and Notifications

Send alerts and notifications based on processed events:

NotificationIntegration.ts
import nodemailer from 'nodemailer';
 
interface NotificationEvent {
  id: string;
  type: 'email' | 'slack' | 'webhook';
  recipient: string;
  subject: string;
  message: string;
  priority: 'low' | 'medium' | 'high';
  metadata: Record<string, any>;
}
 
const notificationStream = new Stream<NotificationEvent>("notifications");
 
// Configure email transporter
const emailTransporter = nodemailer.createTransporter({
  host: process.env.SMTP_HOST,
  port: parseInt(process.env.SMTP_PORT || '587'),
  secure: false,
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS
  }
});
 
notificationStream.addConsumer(async (notification: NotificationEvent) => {
  try {
    switch (notification.type) {
      case 'email':
        await emailTransporter.sendMail({
          from: process.env.SMTP_FROM,
          to: notification.recipient,
          subject: notification.subject,
          text: notification.message,
          html: `<div>
            <h3>${notification.subject}</h3>
            <p>${notification.message}</p>
            <p><em>Priority: ${notification.priority}</em></p>
          </div>`
        });
        break;
        
      case 'slack':
        await fetch(`https://hooks.slack.com/services/${process.env.SLACK_WEBHOOK}`, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            text: notification.message,
            channel: notification.recipient,
            username: 'Moose Alert',
            icon_emoji: notification.priority === 'high' ? ':warning:' : ':information_source:'
          })
        });
        break;
        
      case 'webhook':
        await fetch(notification.recipient, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            id: notification.id,
            subject: notification.subject,
            message: notification.message,
            priority: notification.priority,
            metadata: notification.metadata
          })
        });
        break;
    }
    
    console.log(`Sent ${notification.type} notification ${notification.id}`);
  } catch (error) {
    console.error(`Failed to send notification ${notification.id}:`, error);
  }
});