1. MooseStack
  2. Moose Streaming
  3. Streaming Consumer Functions

Streaming Consumer Functions

Overview

Consuming data from streams allows you to read and process data from Kafka/Redpanda topics. This is essential for building real-time applications, analytics, and event-driven architectures.

Basic Usage

Consumers are just functions that are called when new data is available in a stream. You add them to a stream like this:

StreamConsumer.ts
import { Stream } from "@514labs/moose-lib";

/** Shape of the records carried on the "user-events" stream. */
interface UserEvent {
  id: string;
  userId: string;
  timestamp: Date;
  eventType: string;
}

// Application-defined helpers used by the consumers below. They are not
// defined in this snippet; supply your own implementations.
declare function updatePurchaseAnalytics(event: UserEvent): void;
declare function sendWelcomeEmail(userId: string): void;

const userEventsStream = new Stream<UserEvent>("user-events");

// Add a consumer to process events
userEventsStream.addConsumer((event: UserEvent) => {
  console.log(`Processing event: ${event.id}`);
  console.log(`User: ${event.userId}, Type: ${event.eventType}`);

  // Your processing logic here
  // e.g., update analytics, send notifications, etc.
});

// Add multiple consumers for different purposes
userEventsStream.addConsumer((event: UserEvent) => {
  // Analytics processing
  if (event.eventType === 'purchase') {
    updatePurchaseAnalytics(event);
  }
});

userEventsStream.addConsumer((event: UserEvent) => {
  // Notification processing
  if (event.eventType === 'signup') {
    sendWelcomeEmail(event.userId);
  }
});

Processing Patterns

Stateful Processing with MooseCache

Maintain state across event processing using MooseCache for distributed state management:

StatefulProcessing.ts
import { MooseCache, Stream } from "@514labs/moose-lib";

/** State container for accumulating data per group. */
interface AccumulatorState {
  id: string;
  counter: number;
  sum: number;
  lastModified: Date;
  attributes: Record<string, any>;
}

/** Input message structure. */
interface InputMessage {
  id: string;
  groupId: string;
  numericValue: number;
  messageType: string;
  timestamp: Date;
  payload: Record<string, any>;
}

// Lifetime of an in-progress state entry in the cache. State for 'complete'
// messages is deleted rather than persisted, so a single TTL is sufficient.
const STATE_TTL_SECONDS = 3600;

const messageStream = new Stream<InputMessage>("input-stream");

messageStream.addConsumer(async (message: InputMessage) => {
  // Get distributed cache instance
  const cache = await MooseCache.get();
  const cacheKey = `state:${message.groupId}`;

  // Load existing state or create a new one
  let state: AccumulatorState | null = await cache.get<AccumulatorState>(cacheKey);

  if (!state) {
    state = {
      id: message.groupId,
      counter: 0,
      sum: 0,
      lastModified: new Date(),
      attributes: {}
    };
  }

  // Apply message to state
  state.counter += 1;
  state.sum += message.numericValue;
  state.lastModified = message.timestamp;
  state.attributes = { ...state.attributes, ...message.payload };

  if (message.messageType === 'complete' || shouldFinalize(state)) {
    // Finalize and remove state
    await finalizeState(state);
    await cache.delete(cacheKey);
  } else {
    // Persist updated state
    await cache.set(cacheKey, state, STATE_TTL_SECONDS);
  }
});

/**
 * Condition for automatic state finalization.
 *
 * @param state - The accumulated state after the current message was applied.
 * @returns `true` when the counter reached the threshold or more than
 *   30 minutes have passed since `state.lastModified`.
 */
function shouldFinalize(state: AccumulatorState): boolean {
  const threshold = 100;
  const timeLimit = 30 * 60 * 1000; // 30 minutes

  // NOTE(review): the caller overwrites lastModified with the current
  // message's timestamp before calling this, so `elapsed` reflects the age of
  // that message rather than the age of the state - confirm this is intended.
  // Wrapping in `new Date(...)` tolerates a value that was serialized to a
  // string (e.g. after a cache round-trip) as well as a real Date.
  const elapsed = Date.now() - new Date(state.lastModified).getTime();

  return state.counter >= threshold || elapsed > timeLimit;
}

/**
 * Handles a finished state. In this example it only logs a summary.
 *
 * @param state - The state being finalized.
 */
async function finalizeState(state: AccumulatorState): Promise<void> {
  console.log(`Finalizing state ${state.id}: counter=${state.counter}, sum=${state.sum}`);
}

Propagating Events to External Systems

You can use consumer functions to trigger actions in external systems whenever events occur: send notifications, sync databases, update caches, or integrate with any other service.

HTTP API Calls

Send processed data to external APIs:

HttpIntegration.ts
import { Stream } from "@514labs/moose-lib";

/** Shape of the records carried on the "webhook-events" stream. */
interface WebhookPayload {
  id: string;
  data: Record<string, any>;
  timestamp: Date;
}

const webhookStream = new Stream<WebhookPayload>("webhook-events");

// Forward each event to an external HTTP endpoint. Failures are logged and
// not rethrown, so one bad delivery does not propagate out of the consumer.
webhookStream.addConsumer(async (payload: WebhookPayload) => {
  try {
    // Fail fast instead of sending the literal header "Bearer undefined".
    const apiToken = process.env.API_TOKEN;
    if (!apiToken) {
      throw new Error('API_TOKEN environment variable is not set');
    }

    // Send to external webhook
    const response = await fetch('https://external-api.com/webhook', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + apiToken
      },
      body: JSON.stringify({
        eventId: payload.id,
        eventData: payload.data,
        processedAt: new Date().toISOString()
      })
    });

    // fetch only rejects on network errors; surface HTTP error statuses too.
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    console.log(`Successfully sent event ${payload.id} to external API`);
  } catch (error) {
    console.error(`Failed to send event ${payload.id}:`, error);
    // Could implement retry logic or dead letter queue here
  }
});

Database Operations

Write processed data to external databases:

DatabaseIntegration.ts
import { createPool } from 'mysql2/promise';
import { Stream } from "@514labs/moose-lib";

/** Shape of the records carried on the "database-events" stream. */
interface DatabaseRecord {
  id: string;
  category: string;
  value: number;
  metadata: Record<string, any>;
  timestamp: Date;
}

const dbStream = new Stream<DatabaseRecord>("database-events");

// Database settings, read from the environment.
const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME
};

// One shared pool for all events: avoids opening and closing a connection per
// message, and connection failures surface inside the handler's try/catch.
const dbPool = createPool(dbConfig);

dbStream.addConsumer(async (record: DatabaseRecord) => {
  try {
    // Insert record into external database (parameterized to avoid SQL injection)
    await dbPool.execute(
      'INSERT INTO processed_events (id, category, value, metadata, created_at) VALUES (?, ?, ?, ?, ?)',
      [
        record.id,
        record.category,
        record.value,
        JSON.stringify(record.metadata),
        record.timestamp
      ]
    );

    console.log(`Inserted record ${record.id} into database`);
  } catch (error) {
    console.error(`Database insert failed for record ${record.id}:`, error);
  }
});

File System Operations

Write processed data to files or cloud storage:

FileSystemIntegration.ts
import { writeFile, mkdir } from 'fs/promises';
import { basename, join } from 'path';
import { Stream } from "@514labs/moose-lib";

/** Shape of the records carried on the "file-events" stream. */
interface FileOutput {
  id: string;
  filename: string;
  content: string;
  directory: string;
  format: 'json' | 'csv' | 'txt';
}

const fileStream = new Stream<FileOutput>("file-events");

// Write each event's content to a timestamped file. Failures (including
// invalid JSON content) are logged and not rethrown.
fileStream.addConsumer(async (output: FileOutput) => {
  try {
    // Ensure directory exists
    await mkdir(output.directory, { recursive: true });

    // Pretty-print JSON; CSV and plain text are written as received.
    const formattedContent =
      output.format === 'json'
        ? JSON.stringify(JSON.parse(output.content), null, 2)
        : output.content;

    // Write file with timestamp (':' and '.' are replaced to keep the name portable)
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
    // Use only the final path segment so a filename such as "../../x" from the
    // stream cannot escape output.directory.
    const safeName = basename(output.filename);
    const filename = `${safeName}_${timestamp}.${output.format}`;
    const filepath = join(output.directory, filename);

    await writeFile(filepath, formattedContent, 'utf8');

    console.log(`Written file: ${filepath}`);
  } catch (error) {
    console.error(`Failed to write file for output ${output.id}:`, error);
  }
});

Email and Notifications

Send alerts and notifications based on processed events:

NotificationIntegration.ts
import nodemailer from 'nodemailer';
import { Stream } from "@514labs/moose-lib";

/** Shape of the records carried on the "notifications" stream. */
interface NotificationEvent {
  id: string;
  type: 'email' | 'slack' | 'webhook';
  recipient: string;
  subject: string;
  message: string;
  priority: 'low' | 'medium' | 'high';
  metadata: Record<string, any>;
}

const notificationStream = new Stream<NotificationEvent>("notifications");

// Configure email transporter (the nodemailer factory is `createTransport`).
const emailTransporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: parseInt(process.env.SMTP_PORT || '587', 10),
  secure: false,
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS
  }
});

/** Escapes the characters that are significant in HTML text and attributes. */
function escapeHtml(text: string): string {
  return text
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;');
}

/**
 * POSTs a JSON body and throws on a non-2xx response.
 *
 * @param url - Destination URL.
 * @param body - Value serialized with JSON.stringify.
 * @throws Error when the response status is not OK.
 */
async function postJson(url: string, body: unknown): Promise<void> {
  const response = await fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body)
  });

  // fetch only rejects on network errors; surface HTTP error statuses too.
  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${response.statusText}`);
  }
}

// Dispatch each notification over the channel named by its `type`.
// Failures are logged and not rethrown.
notificationStream.addConsumer(async (notification: NotificationEvent) => {
  try {
    switch (notification.type) {
      case 'email':
        await emailTransporter.sendMail({
          from: process.env.SMTP_FROM,
          to: notification.recipient,
          subject: notification.subject,
          text: notification.message,
          // Event fields are escaped so they cannot inject markup into the email.
          html: `<div>
            <h3>${escapeHtml(notification.subject)}</h3>
            <p>${escapeHtml(notification.message)}</p>
            <p><em>Priority: ${notification.priority}</em></p>
          </div>`
        });
        break;

      case 'slack':
        await postJson(`https://hooks.slack.com/services/${process.env.SLACK_WEBHOOK}`, {
          text: notification.message,
          channel: notification.recipient,
          username: 'Moose Alert',
          icon_emoji: notification.priority === 'high' ? ':warning:' : ':information_source:'
        });
        break;

      case 'webhook':
        await postJson(notification.recipient, {
          id: notification.id,
          subject: notification.subject,
          message: notification.message,
          priority: notification.priority,
          metadata: notification.metadata
        });
        break;

      default: {
        // Compile-time exhaustiveness check; at runtime an unexpected type is
        // reported as a failure instead of being logged as sent.
        const unhandled: never = notification.type;
        throw new Error(`Unsupported notification type: ${String(unhandled)}`);
      }
    }

    console.log(`Sent ${notification.type} notification ${notification.id}`);
  } catch (error) {
    console.error(`Failed to send notification ${notification.id}:`, error);
  }
});

On this page

Overview · Basic Usage · Processing Patterns · Stateful Processing with MooseCache · Propagating Events to External Systems · HTTP API Calls
FiveonefourFiveonefour
Fiveonefour Docs
MooseStackTemplatesGuides
Release Notes
Source523
  • Overview
Build a New App
  • 5 Minute Quickstart
  • Browse Templates
  • Existing ClickHouse
Add to Existing App
  • Next.js
  • Fastify
Fundamentals
  • Moose Runtime
  • MooseDev MCP
  • Data Modeling
Moose Modules
  • Moose OLAP
  • Moose Streaming
    • Manage Streams
    • Create Streams
    • Sync to OLAP
    • Dead Letter Queues
    • Functions
    • Consumer Functions
    • Transformation Functions
    • Writing to Streams
    • From Your Code
    • Schema Registry
    • From CDC Services
  • Moose Workflows
  • Moose APIs & Web Apps
Deployment & Lifecycle
  • Moose Migrate
  • Moose Deploy
Reference
  • API Reference
  • Data Types
  • Table Engines
  • CLI
  • Configuration
  • Observability Metrics
  • Help
  • Release Notes
Contribution
  • Documentation
  • Framework
StreamConsumer.ts
import { Stream } from "@514labs/moose-lib";

/** Shape of the records carried on the "user-events" stream. */
interface UserEvent {
  id: string;
  userId: string;
  timestamp: Date;
  eventType: string;
}

// Application-defined helpers used by the consumers below. They are not
// defined in this snippet; supply your own implementations.
declare function updatePurchaseAnalytics(event: UserEvent): void;
declare function sendWelcomeEmail(userId: string): void;

const userEventsStream = new Stream<UserEvent>("user-events");

// Add a consumer to process events
userEventsStream.addConsumer((event: UserEvent) => {
  console.log(`Processing event: ${event.id}`);
  console.log(`User: ${event.userId}, Type: ${event.eventType}`);

  // Your processing logic here
  // e.g., update analytics, send notifications, etc.
});

// Add multiple consumers for different purposes
userEventsStream.addConsumer((event: UserEvent) => {
  // Analytics processing
  if (event.eventType === 'purchase') {
    updatePurchaseAnalytics(event);
  }
});

userEventsStream.addConsumer((event: UserEvent) => {
  // Notification processing
  if (event.eventType === 'signup') {
    sendWelcomeEmail(event.userId);
  }
});
StatefulProcessing.ts
import { MooseCache, Stream } from "@514labs/moose-lib";

/** State container for accumulating data per group. */
interface AccumulatorState {
  id: string;
  counter: number;
  sum: number;
  lastModified: Date;
  attributes: Record<string, any>;
}

/** Input message structure. */
interface InputMessage {
  id: string;
  groupId: string;
  numericValue: number;
  messageType: string;
  timestamp: Date;
  payload: Record<string, any>;
}

// Lifetime of an in-progress state entry in the cache. State for 'complete'
// messages is deleted rather than persisted, so a single TTL is sufficient.
const STATE_TTL_SECONDS = 3600;

const messageStream = new Stream<InputMessage>("input-stream");

messageStream.addConsumer(async (message: InputMessage) => {
  // Get distributed cache instance
  const cache = await MooseCache.get();
  const cacheKey = `state:${message.groupId}`;

  // Load existing state or create a new one
  let state: AccumulatorState | null = await cache.get<AccumulatorState>(cacheKey);

  if (!state) {
    state = {
      id: message.groupId,
      counter: 0,
      sum: 0,
      lastModified: new Date(),
      attributes: {}
    };
  }

  // Apply message to state
  state.counter += 1;
  state.sum += message.numericValue;
  state.lastModified = message.timestamp;
  state.attributes = { ...state.attributes, ...message.payload };

  if (message.messageType === 'complete' || shouldFinalize(state)) {
    // Finalize and remove state
    await finalizeState(state);
    await cache.delete(cacheKey);
  } else {
    // Persist updated state
    await cache.set(cacheKey, state, STATE_TTL_SECONDS);
  }
});

/**
 * Condition for automatic state finalization.
 *
 * @param state - The accumulated state after the current message was applied.
 * @returns `true` when the counter reached the threshold or more than
 *   30 minutes have passed since `state.lastModified`.
 */
function shouldFinalize(state: AccumulatorState): boolean {
  const threshold = 100;
  const timeLimit = 30 * 60 * 1000; // 30 minutes

  // NOTE(review): the caller overwrites lastModified with the current
  // message's timestamp before calling this, so `elapsed` reflects the age of
  // that message rather than the age of the state - confirm this is intended.
  // Wrapping in `new Date(...)` tolerates a value that was serialized to a
  // string (e.g. after a cache round-trip) as well as a real Date.
  const elapsed = Date.now() - new Date(state.lastModified).getTime();

  return state.counter >= threshold || elapsed > timeLimit;
}

/**
 * Handles a finished state. In this example it only logs a summary.
 *
 * @param state - The state being finalized.
 */
async function finalizeState(state: AccumulatorState): Promise<void> {
  console.log(`Finalizing state ${state.id}: counter=${state.counter}, sum=${state.sum}`);
}
HttpIntegration.ts
import { Stream } from "@514labs/moose-lib";

/** Shape of the records carried on the "webhook-events" stream. */
interface WebhookPayload {
  id: string;
  data: Record<string, any>;
  timestamp: Date;
}

const webhookStream = new Stream<WebhookPayload>("webhook-events");

// Forward each event to an external HTTP endpoint. Failures are logged and
// not rethrown, so one bad delivery does not propagate out of the consumer.
webhookStream.addConsumer(async (payload: WebhookPayload) => {
  try {
    // Fail fast instead of sending the literal header "Bearer undefined".
    const apiToken = process.env.API_TOKEN;
    if (!apiToken) {
      throw new Error('API_TOKEN environment variable is not set');
    }

    // Send to external webhook
    const response = await fetch('https://external-api.com/webhook', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + apiToken
      },
      body: JSON.stringify({
        eventId: payload.id,
        eventData: payload.data,
        processedAt: new Date().toISOString()
      })
    });

    // fetch only rejects on network errors; surface HTTP error statuses too.
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    console.log(`Successfully sent event ${payload.id} to external API`);
  } catch (error) {
    console.error(`Failed to send event ${payload.id}:`, error);
    // Could implement retry logic or dead letter queue here
  }
});
DatabaseIntegration.ts
import { createPool } from 'mysql2/promise';
import { Stream } from "@514labs/moose-lib";

/** Shape of the records carried on the "database-events" stream. */
interface DatabaseRecord {
  id: string;
  category: string;
  value: number;
  metadata: Record<string, any>;
  timestamp: Date;
}

const dbStream = new Stream<DatabaseRecord>("database-events");

// Database settings, read from the environment.
const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME
};

// One shared pool for all events: avoids opening and closing a connection per
// message, and connection failures surface inside the handler's try/catch.
const dbPool = createPool(dbConfig);

dbStream.addConsumer(async (record: DatabaseRecord) => {
  try {
    // Insert record into external database (parameterized to avoid SQL injection)
    await dbPool.execute(
      'INSERT INTO processed_events (id, category, value, metadata, created_at) VALUES (?, ?, ?, ?, ?)',
      [
        record.id,
        record.category,
        record.value,
        JSON.stringify(record.metadata),
        record.timestamp
      ]
    );

    console.log(`Inserted record ${record.id} into database`);
  } catch (error) {
    console.error(`Database insert failed for record ${record.id}:`, error);
  }
});
FileSystemIntegration.ts
import { writeFile, mkdir } from 'fs/promises';
import { basename, join } from 'path';
import { Stream } from "@514labs/moose-lib";

/** Shape of the records carried on the "file-events" stream. */
interface FileOutput {
  id: string;
  filename: string;
  content: string;
  directory: string;
  format: 'json' | 'csv' | 'txt';
}

const fileStream = new Stream<FileOutput>("file-events");

// Write each event's content to a timestamped file. Failures (including
// invalid JSON content) are logged and not rethrown.
fileStream.addConsumer(async (output: FileOutput) => {
  try {
    // Ensure directory exists
    await mkdir(output.directory, { recursive: true });

    // Pretty-print JSON; CSV and plain text are written as received.
    const formattedContent =
      output.format === 'json'
        ? JSON.stringify(JSON.parse(output.content), null, 2)
        : output.content;

    // Write file with timestamp (':' and '.' are replaced to keep the name portable)
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
    // Use only the final path segment so a filename such as "../../x" from the
    // stream cannot escape output.directory.
    const safeName = basename(output.filename);
    const filename = `${safeName}_${timestamp}.${output.format}`;
    const filepath = join(output.directory, filename);

    await writeFile(filepath, formattedContent, 'utf8');

    console.log(`Written file: ${filepath}`);
  } catch (error) {
    console.error(`Failed to write file for output ${output.id}:`, error);
  }
});
NotificationIntegration.ts
import nodemailer from 'nodemailer';
import { Stream } from "@514labs/moose-lib";

/** Shape of the records carried on the "notifications" stream. */
interface NotificationEvent {
  id: string;
  type: 'email' | 'slack' | 'webhook';
  recipient: string;
  subject: string;
  message: string;
  priority: 'low' | 'medium' | 'high';
  metadata: Record<string, any>;
}

const notificationStream = new Stream<NotificationEvent>("notifications");

// Configure email transporter (the nodemailer factory is `createTransport`).
const emailTransporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: parseInt(process.env.SMTP_PORT || '587', 10),
  secure: false,
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS
  }
});

/** Escapes the characters that are significant in HTML text and attributes. */
function escapeHtml(text: string): string {
  return text
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;');
}

/**
 * POSTs a JSON body and throws on a non-2xx response.
 *
 * @param url - Destination URL.
 * @param body - Value serialized with JSON.stringify.
 * @throws Error when the response status is not OK.
 */
async function postJson(url: string, body: unknown): Promise<void> {
  const response = await fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body)
  });

  // fetch only rejects on network errors; surface HTTP error statuses too.
  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${response.statusText}`);
  }
}

// Dispatch each notification over the channel named by its `type`.
// Failures are logged and not rethrown.
notificationStream.addConsumer(async (notification: NotificationEvent) => {
  try {
    switch (notification.type) {
      case 'email':
        await emailTransporter.sendMail({
          from: process.env.SMTP_FROM,
          to: notification.recipient,
          subject: notification.subject,
          text: notification.message,
          // Event fields are escaped so they cannot inject markup into the email.
          html: `<div>
            <h3>${escapeHtml(notification.subject)}</h3>
            <p>${escapeHtml(notification.message)}</p>
            <p><em>Priority: ${notification.priority}</em></p>
          </div>`
        });
        break;

      case 'slack':
        await postJson(`https://hooks.slack.com/services/${process.env.SLACK_WEBHOOK}`, {
          text: notification.message,
          channel: notification.recipient,
          username: 'Moose Alert',
          icon_emoji: notification.priority === 'high' ? ':warning:' : ':information_source:'
        });
        break;

      case 'webhook':
        await postJson(notification.recipient, {
          id: notification.id,
          subject: notification.subject,
          message: notification.message,
          priority: notification.priority,
          metadata: notification.metadata
        });
        break;

      default: {
        // Compile-time exhaustiveness check; at runtime an unexpected type is
        // reported as a failure instead of being logged as sent.
        const unhandled: never = notification.type;
        throw new Error(`Unsupported notification type: ${String(unhandled)}`);
      }
    }

    console.log(`Sent ${notification.type} notification ${notification.id}`);
  } catch (error) {
    console.error(`Failed to send notification ${notification.id}:`, error);
  }
});