Stream Processing

Overview

Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they let you build real-time pipelines for ingesting and processing incoming data.

Working with Streams

Type-safe transport layer

The schema of your Redpanda topic is derived from the data model you define

Buffer data from APIs

Data is buffered in the stream to protect against data loss during high load or service disruptions

Chain transformations on the fly

Add transformations between streams to process, reshape, and enrich data in-flight before landing in your database tables

Sync data to destination tables

Moose automatically syncs data from streams to your database tables when a destination is specified in your stream configuration
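
As a minimal sketch of that last point (the `EventRecord` model and the `events` names here are illustrative, not part of your project), a manually configured stream with a `destination` table is synced to that table automatically:

import { Stream, OlapTable, Key } from "@514labs/moose-lib";
 
interface EventRecord {
  id: Key<string>;
  value: number;
}
 
// OLAP table that will receive the stream's records
const eventsTable = new OlapTable<EventRecord>("events");
 
// Records written to this stream are synced to the `events` table by Moose
const eventsStream = new Stream<EventRecord>("events", {
  destination: eventsTable
});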

Creating Streams

You can create streams in two ways:

  • High-level: Using the IngestPipeline class (recommended)
  • Low-level: Manually configuring the Stream component

Streams for Ingestion

The IngestPipeline class sets up a stream together with its ingestion API and destination table in a single declaration, and is the recommended way to create streams for ingestion:

IngestionStream.ts
import { IngestPipeline, Key } from "@514labs/moose-lib";
 
interface RawData {
  id: Key<string>;
  value: number;
}
 
const rawIngestionStream = new IngestPipeline<RawData>("raw_data", {
  ingest: true, // Creates an ingestion API endpoint at `POST /ingest/raw_data`
  stream: true, // Buffers data between the ingestion API and the database table
  table: true // Creates an OLAP table named `raw_data`
});

Explanation

Ingest API Writes to Stream

Data sent to the `POST /ingest/raw_data` endpoint is buffered in the `raw_data` stream

Stream to Table Sync

Moose automatically runs a process that syncs data from the `raw_data` stream to the `raw_data` OLAP table

Destination Table

Data is stored in the `raw_data` OLAP table
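
To exercise this flow end to end, you can send a record to the ingestion endpoint over HTTP. A minimal sketch using `fetch` (it assumes the Moose dev server is reachable at `http://localhost:4000`; adjust the host and port to match your environment):

// Post a RawData record to the ingestion API created by the pipeline above
const response = await fetch("http://localhost:4000/ingest/raw_data", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ id: "evt_1", value: 42 })
});
 
if (!response.ok) {
  throw new Error(`Ingestion failed with status ${response.status}`);
}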

Streams for Transformations

If the raw data needs to be transformed before landing in the database, you can define a destination stream and a transformation function that processes the data in flight:

Single Stream Transformation

TransformDestinationStream.ts
import { IngestPipeline, Key } from "@514labs/moose-lib";
 
interface RawData {
  id: Key<string>;
  value: number;
}
 
interface TransformedData {
  id: Key<string>;
  transformedValue: number;
  transformedAt: Date;
}
 
// Configure components for raw data ingestion & buffering
const rawData = new IngestPipeline<RawData>("raw_data", {
  ingest: true,
  stream: true, // Buffers data between the ingestion API and the database table
  table: false // Don't create a table for the raw data
});
 
// Create a table for the transformed data
const transformedData = new IngestPipeline<TransformedData>("transformed_data", {
  ingest: false, // Don't create an ingestion API for the transformed data
  stream: true, // Create destination stream for the transformed data
  table: true // Create a table for the transformed data
});
 
rawData.stream.addTransform(transformedData.stream, (record) => ({
  id: record.id,
  transformedValue: record.value * 2,
  transformedAt: new Date()
}));

Chaining Transformations

For more complex pipelines, you can chain multiple transformations together. This is where a standalone Stream for the intermediate stages of your pipeline comes in handy:

ChainedTransformations.ts
import { IngestPipeline, Stream, Key } from "@514labs/moose-lib";
 
// Define the schema for raw input data
interface RawData {
  id: Key<string>;
  value: number;
}
 
// Define the schema for intermediate transformed data
interface IntermediateData {
  id: Key<string>;
  transformedValue: number;
  transformedAt: Date;
}
 
// Define the schema for final transformed data
interface FinalData {
  id: Key<string>;
  transformedValue: number;
  anotherTransformedValue: number;
  transformedAt: Date;
}
 
// Create the first pipeline for raw data ingestion
// Only create an API and a stream (no table) since we're ingesting the raw data
const rawData = new IngestPipeline<RawData>("raw_data", {
  ingest: true,   // Enable HTTP ingestion endpoint
  stream: true,   // Create a stream to buffer data
  table: false    // Don't store raw data in a table
});
 
// Create an intermediate stream to hold data between transformations (no API or table needed)
const intermediateStream = new Stream<IntermediateData>("intermediate_stream");
 
// First transformation: double the value and add timestamp
rawData.stream.addTransform(intermediateStream, (record) => ({
  id: record.id,
  transformedValue: record.value * 2,  // Double the original value
  transformedAt: new Date()            // Add current timestamp
}));
 
// Create the final pipeline that will store the fully transformed data
const finalData = new IngestPipeline<FinalData>("final_stream", {
  ingest: false,  // No direct ingestion to this pipeline
  stream: true,   // Create a stream for processing
  table: true     // Store final results in a table
});
 
// Second transformation: further transform the intermediate data
intermediateStream.addTransform(finalData.stream, (record) => ({
  id: record.id,
  transformedValue: record.transformedValue * 2,       // Double the intermediate value
  anotherTransformedValue: record.transformedValue * 3, // Triple the intermediate value
  transformedAt: new Date()                            // Update timestamp
}));

Implementing Transformations

Reshape and Enrich Data

Transform data shape or enrich records:

DataTransform.ts
import { Stream, Key } from "@514labs/moose-lib";
 
interface RawEvent {
  id: Key<string>;
  timestamp: string;
  data: {
    user_id: string;
    platform: string;
    app_version: string;
    ip_address: string;
  }
}
 
interface EnrichedEvent {
  eventId: Key<string>;
  timestamp: Date;
  userId: Key<string>;
  properties: {
    platform: string;
    version: string;
    country: string;
  };
  metadata: {
    originalTimestamp: string;
    processedAt: Date;
  }
}
 
const rawStream = new Stream<RawEvent>("raw_events");
const enrichedStream = new Stream<EnrichedEvent>("enriched_events");
 
// Reshape and enrich data
rawStream.addTransform(enrichedStream, async (record: RawEvent) => ({
  eventId: record.id,
  timestamp: new Date(record.timestamp),
  userId: record.data.user_id,
  properties: {
    platform: record.data.platform || 'unknown',
    version: record.data.app_version,
    country: await lookupCountry(record.data.ip_address) // lookupCountry: your own IP-to-country lookup helper
  },
  metadata: {
    originalTimestamp: record.timestamp,
    processedAt: new Date()
  }
}));

Filtering

Remove or filter records based on conditions:

FilterStream.ts
import { Stream } from "@514labs/moose-lib";
 
interface MetricRecord {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}
 
const inputStream = new Stream<MetricRecord>("input_metrics");
const validMetrics = new Stream<MetricRecord>("valid_metrics");
 
// Multiple filtering conditions
inputStream.addTransform(validMetrics, (record) => {
  // Filter out records with invalid values
  if (isNaN(record.value) || record.value < 0) {
    return undefined;
  }
 
  // Filter out records older than today (getStartOfDay() is a date helper you define elsewhere)
  if (record.timestamp < getStartOfDay()) {
    return undefined;
  }
 
  // Filter out specific metrics
  if (record.name.startsWith('debug_')) {
    return undefined;
  }
 
  return record;
});

Fan Out (1:N)

Send data to multiple downstream processors:

FanOut.ts
import { Stream } from "@514labs/moose-lib";
 
interface Order {
  orderId: string;
  userId: string;
  amount: number;
  items: string[];
}
 
interface HighPriorityOrder extends Order {
  priority: 'high';
}
 
interface ArchivedOrder extends Order {
  archivedAt: Date;
}
 
// Define destination streams
const analyticsStream = new Stream<Order>("order_analytics");
const notificationStream = new Stream<HighPriorityOrder>("order_notifications");
const archiveStream = new Stream<ArchivedOrder>("order_archive");
 
// Source stream
const orderStream = new Stream<Order>("orders");
 
// Send all orders to analytics
orderStream.addTransform(analyticsStream, (order) => order);
 
// Send large orders to notifications
orderStream.addTransform(notificationStream, (order) => {
  if (order.amount > 1000) {
    return {
      ...order,
      priority: 'high'
    };
  }
  return undefined; // Skip small orders
});
 
// Archive all orders
orderStream.addTransform(archiveStream, (order) => ({
  ...order,
  archivedAt: new Date()
}));

Fan In (N:1)

Combine data from multiple sources:

FanIn.ts
import { Stream, OlapTable, Key } from "@514labs/moose-lib";
 
interface UserEvent {
  userId: Key<string>;
  eventType: string;
  timestamp: Date;
  source: string;
}
 
// Source streams
const webEvents = new Stream<UserEvent>("web_events");
const mobileEvents = new Stream<UserEvent>("mobile_events");
const apiEvents = new Stream<UserEvent>("api_events");
 
// Create a stream and table for the combined events
const eventsTable = new OlapTable<UserEvent>("all_events");
const allEvents = new Stream<UserEvent>("all_events", {
  destination: eventsTable
});
 
// Fan in from web
webEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'web',
  timestamp: new Date()
}));
 
// Fan in from mobile
mobileEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'mobile',
  timestamp: new Date()
}));
 
// Fan in from API
apiEvents.addTransform(allEvents, (event) => ({
  ...event,
  source: 'api',
  timestamp: new Date()
}));

Unnesting

Flatten nested records:

Unnest.ts
import { Stream, Key } from "@514labs/moose-lib";
 
interface NestedRecord {
  id: Key<string>;
  nested: {
    value: number;
  }[];
}
 
interface FlattenedRecord {
  id: Key<string>;
  value: number;
}
 
const nestedStream = new Stream<NestedRecord>("nested_records");
const flattenedStream = new Stream<FlattenedRecord>("flattened_records");
 
nestedStream.addTransform(flattenedStream, (record) => record.nested.map((n) => ({
  id: record.id,
  value: n.value
})));

See the API Reference for all available transform options.

Stream Configurations

Parallelism and Retention

StreamConfig.ts
import { Stream } from "@514labs/moose-lib";
 
interface Data { id: string; value: number; }  // any data model for this stream
 
const highThroughputStream = new Stream<Data>("high_throughput", {
  parallelism: 4,              // Process 4 records simultaneously
  retentionPeriod: 86400      // Keep data for 1 day
});

See the API Reference for complete configuration options.