Streaming Consumer Functions

Overview

Consumer functions read and process records from Kafka/Redpanda topics as they arrive. They are the building block for real-time applications, analytics pipelines, and event-driven architectures.

Basic Usage

Consumers are just functions that are called when new data is available in a stream. You add them to a stream like this:

```ts
import { Stream } from "@514labs/moose-lib";

interface UserEvent {
  id: string;
  userId: string;
  timestamp: Date;
  eventType: string;
}

const userEventsStream = new Stream<UserEvent>("user-events");

// Add a consumer to process events
userEventsStream.addConsumer((event: UserEvent) => {
  console.log(`Processing event: ${event.id}`);
  console.log(`User: ${event.userId}, Type: ${event.eventType}`);

  // Your processing logic here
  // e.g., update analytics, send notifications, etc.
});

// Add multiple consumers for different purposes
userEventsStream.addConsumer((event: UserEvent) => {
  // Analytics processing
  if (event.eventType === 'purchase') {
    updatePurchaseAnalytics(event);
  }
});

userEventsStream.addConsumer((event: UserEvent) => {
  // Notification processing
  if (event.eventType === 'signup') {
    sendWelcomeEmail(event.userId);
  }
});
```
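When the number of event types grows, branching inside each consumer gets unwieldy; a handler map keeps the dispatch flat. The sketch below is plain TypeScript, independent of the Moose API, and the handler return values are just illustrative labels:

```typescript
interface UserEvent {
  id: string;
  userId: string;
  eventType: string;
}

// Map event types to handlers; unknown types fall through to a default.
const handlers: Record<string, (e: UserEvent) => string> = {
  purchase: (e) => `analytics:${e.id}`,
  signup: (e) => `welcome:${e.userId}`,
};

function dispatch(event: UserEvent): string {
  const handler = handlers[event.eventType] ?? ((e: UserEvent) => `ignored:${e.id}`);
  return handler(event);
}

dispatch({ id: "e1", userId: "u1", eventType: "signup" }); // "welcome:u1"
```

A consumer can then call `dispatch(event)` and new event types are added by extending the map rather than the consumer body.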

Processing Patterns

Stateful Processing with MooseCache

Maintain state across event processing using MooseCache for distributed state management:

```ts
import { Stream, MooseCache } from "@514labs/moose-lib";

// State container for accumulating data
interface AccumulatorState {
  id: string;
  counter: number;
  sum: number;
  lastModified: Date;
  attributes: Record<string, any>;
}

// Input message structure
interface InputMessage {
  id: string;
  groupId: string;
  numericValue: number;
  messageType: string;
  timestamp: Date;
  payload: Record<string, any>;
}

const messageStream = new Stream<InputMessage>("input-stream");

messageStream.addConsumer(async (message: InputMessage) => {
  // Get distributed cache instance
  const cache = await MooseCache.get();
  const cacheKey = `state:${message.groupId}`;

  // Load existing state or create new one
  let state: AccumulatorState | null = await cache.get<AccumulatorState>(cacheKey);

  if (!state) {
    // Initialize new state
    state = {
      id: message.groupId,
      counter: 0,
      sum: 0,
      lastModified: new Date(),
      attributes: {}
    };
  }

  // Apply message to state
  state.counter += 1;
  state.sum += message.numericValue;
  state.lastModified = message.timestamp;
  state.attributes = { ...state.attributes, ...message.payload };

  // Determine cache lifetime based on message type
  const ttlSeconds = message.messageType === 'complete' ? 60 : 3600;

  if (message.messageType === 'complete' || shouldFinalize(state)) {
    // Finalize and remove state
    await finalizeState(state);
    await cache.delete(cacheKey);
  } else {
    // Persist updated state
    await cache.set(cacheKey, state, ttlSeconds);
  }
});

// Condition for automatic state finalization
function shouldFinalize(state: AccumulatorState): boolean {
  const threshold = 100;
  const timeLimit = 30 * 60 * 1000; // 30 minutes
  const elapsed = new Date().getTime() - state.lastModified.getTime();

  return state.counter >= threshold || elapsed > timeLimit;
}

async function finalizeState(state: AccumulatorState): Promise<void> {
  console.log(`Finalizing state ${state.id}: counter=${state.counter}, sum=${state.sum}`);
}
```
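To trace how the per-group accumulator evolves, the same fold logic can be run against a plain in-memory Map standing in for MooseCache (a sketch only; the real cache is asynchronous and shared across processes, which is what makes the pattern safe when consumers scale horizontally):

```typescript
// In-memory stand-in for the distributed cache, to trace the fold logic.
interface AccumulatorState {
  id: string;
  counter: number;
  sum: number;
}

const cache = new Map<string, AccumulatorState>();

function applyMessage(groupId: string, numericValue: number): AccumulatorState {
  const key = `state:${groupId}`;
  // Load existing state or initialize a fresh one, then fold the message in.
  const state = cache.get(key) ?? { id: groupId, counter: 0, sum: 0 };
  state.counter += 1;
  state.sum += numericValue;
  cache.set(key, state);
  return state;
}

applyMessage("g1", 10);
applyMessage("g1", 5);
const s = applyMessage("g1", 7); // counter: 3, sum: 22
```

Each group id gets its own state entry, so interleaved messages from different groups accumulate independently.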

Propagating Events to External Systems

You can use consumer functions to trigger actions in external systems: send notifications, sync databases, update caches, or integrate with any other service when events occur.

HTTP API Calls

Send processed data to external APIs:

```ts
import { Stream } from "@514labs/moose-lib";

interface WebhookPayload {
  id: string;
  data: Record<string, any>;
  timestamp: Date;
}

const webhookStream = new Stream<WebhookPayload>("webhook-events");

webhookStream.addConsumer(async (payload: WebhookPayload) => {
  try {
    // Send to external webhook
    const response = await fetch('https://external-api.com/webhook', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + process.env.API_TOKEN
      },
      body: JSON.stringify({
        eventId: payload.id,
        eventData: payload.data,
        processedAt: new Date().toISOString()
      })
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    console.log(`Successfully sent event ${payload.id} to external API`);
  } catch (error) {
    console.error(`Failed to send event ${payload.id}:`, error);
    // Could implement retry logic or dead letter queue here
  }
});
```
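The catch block above notes that retry logic could go there. One common shape is exponential backoff with a capped attempt count; the helper below is a sketch, not part of the Moose API, and `withRetry`, `backoffDelayMs`, and their parameters are illustrative names:

```typescript
// Delay grows geometrically with each failed attempt: 500ms, 1000ms, 2000ms, ...
function backoffDelayMs(attempt: number, baseMs = 500): number {
  return baseMs * 2 ** attempt;
}

// Generic retry wrapper: re-runs fn until it resolves or attempts run out.
async function withRetry<T>(fn: () => Promise<T>, maxAttempts = 3): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Sleep before the next attempt; after the last attempt, rethrow below.
      if (attempt < maxAttempts - 1) {
        await new Promise((resolve) => setTimeout(resolve, backoffDelayMs(attempt)));
      }
    }
  }
  throw lastError;
}

// Usage inside a consumer (endpoint is hypothetical):
// await withRetry(() => fetch('https://external-api.com/webhook', { method: 'POST' }));
```

Events that still fail after the final attempt are good candidates for a dead letter queue so they are not silently dropped.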

Database Operations

Write processed data to external databases:

```ts
import { Stream } from "@514labs/moose-lib";
import { createConnection } from 'mysql2/promise';

interface DatabaseRecord {
  id: string;
  category: string;
  value: number;
  metadata: Record<string, any>;
  timestamp: Date;
}

const dbStream = new Stream<DatabaseRecord>("database-events");

// Database connection settings
const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME
};

dbStream.addConsumer(async (record: DatabaseRecord) => {
  const connection = await createConnection(dbConfig);

  try {
    // Insert record into external database
    await connection.execute(
      'INSERT INTO processed_events (id, category, value, metadata, created_at) VALUES (?, ?, ?, ?, ?)',
      [
        record.id,
        record.category,
        record.value,
        JSON.stringify(record.metadata),
        record.timestamp
      ]
    );

    console.log(`Inserted record ${record.id} into database`);
  } catch (error) {
    console.error(`Database insert failed for record ${record.id}:`, error);
  } finally {
    await connection.end();
  }
});
```
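Opening a fresh connection per message is costly under load, and per-row inserts can be batched. A small buffer that hands off full batches to a flush callback is one approach; this is a generic sketch (the flush callback would run a multi-row INSERT in practice, but here it just collects batches):

```typescript
// Minimal record batcher: collects items and passes full batches to a callback.
class Batcher<T> {
  private buffer: T[] = [];

  constructor(
    private batchSize: number,
    private flush: (batch: T[]) => void,
  ) {}

  add(item: T): void {
    this.buffer.push(item);
    if (this.buffer.length >= this.batchSize) {
      const batch = this.buffer;
      this.buffer = [];
      this.flush(batch);
    }
  }
}

// Usage: flush to an array instead of a database for this sketch.
const flushed: number[][] = [];
const batcher = new Batcher<number>(3, (batch) => flushed.push(batch));
[1, 2, 3, 4, 5, 6, 7].forEach((n) => batcher.add(n));
// flushed is now [[1, 2, 3], [4, 5, 6]]; 7 remains buffered
```

A production version would also flush on a timer so a partially filled buffer does not sit indefinitely.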

File System Operations

Write processed data to files or cloud storage:

```ts
import { Stream } from "@514labs/moose-lib";
import { writeFile, mkdir } from 'fs/promises';
import { join } from 'path';

interface FileOutput {
  id: string;
  filename: string;
  content: string;
  directory: string;
  format: 'json' | 'csv' | 'txt';
}

const fileStream = new Stream<FileOutput>("file-events");

fileStream.addConsumer(async (output: FileOutput) => {
  try {
    // Ensure directory exists
    await mkdir(output.directory, { recursive: true });

    // Format content based on type
    let formattedContent: string;
    switch (output.format) {
      case 'json':
        formattedContent = JSON.stringify(JSON.parse(output.content), null, 2);
        break;
      case 'csv':
        formattedContent = output.content; // Assume already CSV formatted
        break;
      default:
        formattedContent = output.content;
    }

    // Write file with timestamp
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
    const filename = `${output.filename}_${timestamp}.${output.format}`;
    const filepath = join(output.directory, filename);

    await writeFile(filepath, formattedContent, 'utf8');

    console.log(`Written file: ${filepath}`);
  } catch (error) {
    console.error(`Failed to write file for output ${output.id}:`, error);
  }
});
```
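The csv branch above assumes the content is already CSV. If the consumer receives structured rows instead, a small serializer can produce it; this sketch uses simplified RFC 4180-style quoting (quote fields containing commas, quotes, or newlines, and double embedded quotes):

```typescript
// Serialize an array of flat objects to CSV (simplified RFC 4180-style quoting).
function toCsv(rows: Record<string, string | number>[]): string {
  if (rows.length === 0) return "";
  const headers = Object.keys(rows[0]);
  const escape = (v: string | number): string => {
    const s = String(v);
    // Quote fields containing commas, quotes, or newlines; double embedded quotes.
    return /[",\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s;
  };
  const lines = [headers.join(",")];
  for (const row of rows) {
    lines.push(headers.map((h) => escape(row[h])).join(","));
  }
  return lines.join("\n");
}

const csv = toCsv([
  { id: 1, name: "plain" },
  { id: 2, name: 'has "quotes", and commas' },
]);
// csv:
// id,name
// 1,plain
// 2,"has ""quotes"", and commas"
```

This assumes every row shares the keys of the first row; a production serializer would also handle nulls, nested values, and header order explicitly.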

Email and Notifications

Send alerts and notifications based on processed events:

```ts
import { Stream } from "@514labs/moose-lib";
import nodemailer from 'nodemailer';

interface NotificationEvent {
  id: string;
  type: 'email' | 'slack' | 'webhook';
  recipient: string;
  subject: string;
  message: string;
  priority: 'low' | 'medium' | 'high';
  metadata: Record<string, any>;
}

const notificationStream = new Stream<NotificationEvent>("notifications");

// Configure email transporter
const emailTransporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: parseInt(process.env.SMTP_PORT || '587'),
  secure: false,
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS
  }
});

notificationStream.addConsumer(async (notification: NotificationEvent) => {
  try {
    switch (notification.type) {
      case 'email':
        await emailTransporter.sendMail({
          from: process.env.SMTP_FROM,
          to: notification.recipient,
          subject: notification.subject,
          text: notification.message,
          html: `<div>
            <h3>${notification.subject}</h3>
            <p>${notification.message}</p>
            <p><em>Priority: ${notification.priority}</em></p>
          </div>`
        });
        break;

      case 'slack':
        await fetch(`https://hooks.slack.com/services/${process.env.SLACK_WEBHOOK}`, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            text: notification.message,
            channel: notification.recipient,
            username: 'Moose Alert',
            icon_emoji: notification.priority === 'high' ? ':warning:' : ':information_source:'
          })
        });
        break;

      case 'webhook':
        await fetch(notification.recipient, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            id: notification.id,
            subject: notification.subject,
            message: notification.message,
            priority: notification.priority,
            metadata: notification.metadata
          })
        });
        break;
    }

    console.log(`Sent ${notification.type} notification ${notification.id}`);
  } catch (error) {
    console.error(`Failed to send notification ${notification.id}:`, error);
  }
});
```