In Moose, data models are just plain TypeScript interfaces or Python Pydantic models that become the authoritative source for your infrastructure schemas.
Data models are used to define the schemas of the infrastructure Moose creates for you: ingest API endpoints, streaming topics, ClickHouse tables, analytics APIs, and workflow task inputs and outputs.
Analytical backends are unique in that they typically have to coordinate schemas across multiple systems that each have their own type systems and constraints.
Consider a typical pipeline for ingesting events into a ClickHouse table.
```ts
// What you're building:
// API endpoint → Kafka topic → ClickHouse table → Analytics API

// Traditional approach: Define schema 4 times
import { z } from "zod";

// 1. API validation schema
const apiSchema = z.object({
  userId: z.string(),
  eventType: z.enum(["click", "view", "purchase"]),
  timestamp: z.string().datetime()
});

// 2. Kafka schema (Avro/JSON Schema)
const kafkaSchema = {
  type: "record",
  fields: [
    { name: "userId", type: "string" },
    { name: "eventType", type: "string" },
    { name: "timestamp", type: "string" }
  ]
};

// 3. ClickHouse DDL
// CREATE TABLE events (
//   user_id String,
//   event_type LowCardinality(String),
//   timestamp DateTime
// ) ENGINE = MergeTree()

// 4. Analytics API response type
interface EventsResponse {
  userId: string;
  eventType: string;
  timestamp: Date;
}
```

The Problem: When you add a field or change a type, you must update it in every one of these places. Miss one, and you get schema drift: validation errors at the API, failed inserts into ClickHouse, and inconsistent data across systems.
With Moose, you define your schema once in native language types with optional metadata, then reuse it across multiple systems:
```ts
import { Key, LowCardinality, IngestPipeline } from "@514labs/moose-lib";

// 1. Define your schema (WHAT your data looks like)
interface MyDataModel {
  primaryKey: Key<string>;
  someString: string & LowCardinality;
  someNumber: number;
  someDate: Date;
  someJson: Record<string, any>;
}

// This single interface can be reused across multiple systems:
const pipeline = new IngestPipeline<MyDataModel>("MyDataPipeline", {
  ingestApi: true,  // POST API endpoint
  stream: true,     // Kafka topic
  table: {          // ClickHouse table
    orderByFields: ["primaryKey", "someDate"]
  }
});
```

The key idea is using union types to extend base TypeScript types with "metadata" that tells Moose how each field maps onto the underlying infrastructure, for example which ClickHouse column type to generate or how to validate the value at the API boundary.
```ts
import { Key, ClickHouseDecimal, LowCardinality } from "@514labs/moose-lib";

interface Event {
  // Base type: string
  // ClickHouse: String with primary key
  id: Key<string>;

  // Base type: string
  // ClickHouse: Decimal(10,2) for precise money
  amount: string & ClickHouseDecimal<10, 2>;

  // Base type: string
  // ClickHouse: LowCardinality(String) for enums
  status: string & LowCardinality;

  // Base type: Date
  // ClickHouse: DateTime
  createdAt: Date;
}

// In your application code:
const event: Event = {
  id: "id_123",
  amount: "99.99",       // Just a string in TypeScript
  status: "completed",   // Just a string in TypeScript
  createdAt: new Date()
};

// In ClickHouse:
// CREATE TABLE events (
//   id String,
//   amount Decimal(10,2),
//   status LowCardinality(String),
//   created_at DateTime
// ) ENGINE = MergeTree()
// ORDER BY id
```

The metadata annotations are compile-time only: they don't affect your runtime code. Your application works with regular strings and numbers, while Moose uses the metadata to generate optimized infrastructure.
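To make that concrete, here is a small sketch reusing the `event` value above: because the annotations erase at compile time, the fields can be used anywhere their base types are expected.

```ts
// The annotations erase at runtime; these are ordinary string/Date operations.
const amountAsNumber: number = parseFloat(event.amount);   // amount is just a string
const statusUpper: string = event.status.toUpperCase();    // status is just a string
const createdIso: string = event.createdAt.toISOString();  // createdAt is a plain Date
```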
Let's walk through how to model data for different infrastructure components and see how types behave across them.
A basic data model that works identically across all infrastructure components:
```ts
import { IngestPipeline } from "@514labs/moose-lib";

export interface SimpleShared {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

// This SAME model creates all infrastructure
const pipeline = new IngestPipeline<SimpleShared>("simple_shared", {
  ingestApi: true,  // Creates: POST /ingest/simple_shared
  stream: true,     // Creates: Kafka topic
  table: true       // Creates: ClickHouse table
});

// The exact same types work everywhere:
// - API validates:    { id: "123", name: "test", value: 42, timestamp: "2024-01-01T00:00:00Z" }
// - Kafka stores:     { id: "123", name: "test", value: 42, timestamp: "2024-01-01T00:00:00Z" }
// - ClickHouse table: id String, name String, value Float64, timestamp DateTime
```

Key Point: One model definition creates consistent schemas across all systems.
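As a quick usage sketch, a record can be sent to the generated ingest endpoint like this. The URL assumes a local dev server; adjust the host and port to your setup.

```ts
// Post one SimpleShared record to the generated ingest endpoint.
const record: SimpleShared = {
  id: "123",
  name: "test",
  value: 42,
  timestamp: new Date("2024-01-01T00:00:00Z")
};

await fetch("http://localhost:4000/ingest/simple_shared", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify(record)  // the Date serializes to an ISO timestamp string
});
```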
Complex types including nested objects, arrays, and enums work seamlessly across all components:
```ts
import { Key, IngestPipeline } from "@514labs/moose-lib";

export interface CompositeShared {
  id: Key<string>;                             // Primary key
  status: "active" | "pending" | "completed";  // Enum

  // Nested object
  metadata: {
    category: string;
    priority: number;
    tags: string[];
  };

  // Arrays and maps
  values: number[];
  attributes: Record<string, any>;

  // Optional field
  description?: string;

  createdAt: Date;
}

// Using in IngestPipeline - all types preserved
const pipeline = new IngestPipeline<CompositeShared>("composite_shared", {
  ingestApi: true,
  stream: true,
  table: true
});

// How the types map:
// - API validates nested structure and enum values
// - Kafka preserves the exact JSON structure
// - ClickHouse creates:
//   - id String (with PRIMARY KEY)
//   - status Enum8('active', 'pending', 'completed')
//   - metadata.category String, metadata.priority Float64, metadata.tags Array(String)
//   - values Array(Float64)
//   - attributes String (JSON)
//   - description Nullable(String)
//   - createdAt DateTime
```

Key Point: Complex types including nested objects and arrays work consistently across all infrastructure.
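For reference, here is an example value that satisfies the interface above; the field values are purely illustrative.

```ts
// A record that passes validation for CompositeShared
const sample: CompositeShared = {
  id: "evt_001",
  status: "active",  // must be one of the enum values
  metadata: { category: "sensor", priority: 1, tags: ["alpha", "beta"] },
  values: [1.5, 2.5],
  attributes: { region: "us-east" },
  // description is optional and omitted here
  createdAt: new Date()
};
```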
ClickHouse type annotations optimize database performance but are transparent to other infrastructure:
```ts
import {
  Key,
  Decimal,
  ClickHouseDecimal,
  LowCardinality,
  ClickHouseNamedTuple,
  OlapTable,
  IngestPipeline
} from "@514labs/moose-lib";

export interface ClickHouseOptimized {
  id: Key<string>;

  // ClickHouse-specific type annotations
  amount: Decimal<10, 2>;             // Decimal(10,2) in ClickHouse
  // Alternative: amount: string & ClickHouseDecimal<10, 2>;  // Verbose syntax still works

  category: string & LowCardinality;  // LowCardinality(String) in ClickHouse

  // Optimized nested type
  details: {
    name: string;
    value: number;
  } & ClickHouseNamedTuple;           // NamedTuple in ClickHouse

  timestamp: Date;
}

// SCENARIO 1: Standalone OlapTable - gets all optimizations
const table = new OlapTable<ClickHouseOptimized>("optimized_table", {
  orderByFields: ["id", "timestamp"]
});
// Creates ClickHouse table with:
// - amount Decimal(10,2)
// - category LowCardinality(String)
// - details Tuple(name String, value Float64)

// SCENARIO 2: IngestPipeline - optimizations ONLY in ClickHouse
const pipeline = new IngestPipeline<ClickHouseOptimized>("optimized_pipeline", {
  ingestApi: true,
  stream: true,
  table: true
});

// What happens at each layer:
// 1. API receives/validates: { amount: "123.45", category: "electronics", ... }
//    - Sees amount as string, category as string (annotations ignored)
// 2. Kafka stores: { amount: "123.45", category: "electronics", ... }
//    - Plain JSON, no ClickHouse types
// 3. ClickHouse table gets the optimizations:
//    - amount stored as Decimal(10,2)
//    - category stored as LowCardinality(String)
//    - details stored as NamedTuple
```

Key Point: ClickHouse annotations are metadata that ONLY affect the database schema. Your application code and other infrastructure components see regular TypeScript/Python types.
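At the application layer, a value of this type is still plain TypeScript data. A sketch, assuming (as the comments above indicate) that `Decimal<10, 2>` is a string-backed annotation; the values are illustrative.

```ts
// Runtime view: optimized fields are ordinary strings, numbers, objects, and dates.
const row: ClickHouseOptimized = {
  id: "sku_1",
  amount: "123.45",                         // string in TS, Decimal(10,2) in ClickHouse
  category: "electronics",                  // string in TS, LowCardinality(String) in ClickHouse
  details: { name: "warranty", value: 2 },  // plain object in TS, NamedTuple in ClickHouse
  timestamp: new Date()
};
```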
APIs use runtime validation to ensure query parameters meet your requirements:
```ts
import { tags, Api } from "@514labs/moose-lib";

// Query parameters with runtime validation
interface SearchParams {
  // Date range validation
  startDate: string & tags.Format<"date">;  // Must be YYYY-MM-DD
  endDate: string & tags.Format<"date">;

  // Numeric constraints
  minValue?: number & tags.Minimum<0>;      // Optional, but if provided >= 0
  maxValue?: number & tags.Maximum<1000>;   // Optional, but if provided <= 1000

  // String validation
  category?: string & tags.MinLength<2> & tags.MaxLength<50>;

  // Pagination
  page?: number & tags.Type<"int32"> & tags.Minimum<1>;
  limit?: number & tags.Type<"int32"> & tags.Minimum<1> & tags.Maximum<100>;
}

// Response data model
interface SearchResult {
  id: string;
  name: string;
  value: number;
  category: string;
  timestamp: Date;
}

// Create validated API endpoint
const searchAPI = new Api<SearchParams, SearchResult[]>(
  "search",
  async (params, { client }) => {
    // Params are already validated when this runs
    const query = `
      SELECT *
      FROM data_table
      WHERE timestamp >= {startDate: Date}
        AND timestamp <= {endDate: Date}
        ${params.minValue ? `AND value >= {minValue: Float64}` : ''}
        ${params.maxValue ? `AND value <= {maxValue: Float64}` : ''}
        ${params.category ? `AND category = {category: String}` : ''}
      LIMIT {limit: UInt32}
      OFFSET {offset: UInt32}
    `;

    return client.query(query, {
      startDate: params.startDate,
      endDate: params.endDate,
      minValue: params.minValue,
      maxValue: params.maxValue,
      category: params.category,
      limit: params.limit || 10,
      offset: ((params.page || 1) - 1) * (params.limit || 10)
    });
  }
);

// API Usage Examples:
// ✅ Valid:   GET /api/search?startDate=2024-01-01&endDate=2024-01-31
// ✅ Valid:   GET /api/search?startDate=2024-01-01&endDate=2024-01-31&minValue=100&limit=50
// ❌ Invalid: GET /api/search?startDate=Jan-1-2024 (wrong date format)
// ❌ Invalid: GET /api/search?startDate=2024-01-01&endDate=2024-01-31&limit=200 (exceeds max)
```

Key Point: Runtime validators ensure API consumers provide valid data, returning clear error messages for invalid requests before any database queries run.
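A client call against the generated endpoint might look like the following sketch; the host and port are assumptions for a local dev setup.

```ts
// Fetch validated results from the search endpoint.
const res = await fetch(
  "http://localhost:4000/api/search?startDate=2024-01-01&endDate=2024-01-31&limit=50"
);
const results = await res.json();  // SearchResult rows; timestamp arrives as an ISO string over JSON
```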
When you need to process data in real-time before it hits the database:
```ts
import { Key, LowCardinality, Stream, OlapTable } from "@514labs/moose-lib";

// Raw data from external source
interface RawData {
  id: Key<string>;
  timestamp: Date;
  rawPayload: string;
  sourceType: string & LowCardinality;
}

// Processed data after transformation
interface ProcessedData {
  id: Key<string>;
  timestamp: Date;
  field1: string;
  field2: string & LowCardinality;
  numericValue: number;
  attributes: Record<string, any>;
}

// Create streams with transformation
const rawStream = new Stream<RawData>("raw-stream");
const processedStream = new Stream<ProcessedData>("processed-stream");

// Transform raw data to processed
rawStream.addConsumer(async (raw: RawData) => {
  const parsed = JSON.parse(raw.rawPayload);

  const processed: ProcessedData = {
    id: raw.id,
    timestamp: raw.timestamp,
    field1: parsed.field_1,
    field2: parsed.field_2,
    numericValue: parseFloat(parsed.value) || 0,
    attributes: parsed.attributes || {}
  };

  await processedStream.publish(processed);
});

// Sink to ClickHouse
const table = new OlapTable<ProcessedData>("processed_data", {
  stream: processedStream,
  orderByFields: ["id", "timestamp"]
});
```

Define strongly-typed inputs and outputs for async jobs:
```ts
import { Task, tags } from "@514labs/moose-lib";

// Input validation with constraints
interface TaskInput {
  id: string & tags.Format<"uuid">;
  items: string[];
  taskType: "typeA" | "typeB" | "typeC";
  options?: {
    includeMetadata: boolean;
    maxItems?: number & tags.Minimum<1> & tags.Maximum<100>;
  };
}

// Structured output
interface TaskOutput {
  id: string;
  processedAt: Date;
  resultA?: {
    category: string;
    score: number;
    details: Record<string, any>;
  };
  resultB?: {
    values: string[];
    metrics: number[];
  };
  resultC?: {
    field1: string;
    field2: string;
    field3: number;
  };
}

// Create workflow task
const exampleTask = new Task<TaskInput, TaskOutput>(
  "example-task",
  {
    run: async (ctx) => {
      // Process data based on task type
      const output: TaskOutput = {
        id: ctx.input.id,
        processedAt: new Date()
      };

      if (ctx.input.taskType === "typeA") {
        output.resultA = await processTypeA(ctx.input);
      }

      return output;
    },
    retries: 3,
    timeout: 30000  // 30 seconds
  }
);
```
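The task above calls a `processTypeA` helper that isn't shown. A minimal stand-in could look like the sketch below; the name and return values are purely illustrative and would be replaced with your real processing logic.

```ts
// Hypothetical helper referenced by the task above; its return shape matches TaskOutput["resultA"].
async function processTypeA(input: TaskInput): Promise<NonNullable<TaskOutput["resultA"]>> {
  return {
    category: "typeA",
    score: input.items.length,  // placeholder scoring logic
    details: { itemCount: input.items.length }
  };
}
```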