Data Modeling
Overview
In Moose, data models are just TypeScript interfaces or Pydantic models that become the authoritative source for your infrastructure schemas.
Data Models are used to define:
- OLAP Tables and Materialized Views (automatically generated DDL)
- Redpanda/Kafka Streams (schema registry and topic validation)
- API Contracts (request/response validation and OpenAPI specs)
- Workflow Task Input and Output Types (typed function inputs/outputs)
Philosophy
Problem: Analytical Backends are Prone to Schema Drift
Analytical backends are unique in that they typically have to coordinate schemas across multiple systems that each have their own type systems and constraints.
Consider a typical pipeline for ingesting events into a ClickHouse table.
// What you're building:
// API endpoint → Kafka topic → ClickHouse table → Analytics API
// Traditional approach: Define schema 4 times
// 1. API validation schema
const apiSchema = z.object({
  userId: z.string(),
  eventType: z.enum(["click", "view", "purchase"]),
  timestamp: z.string().datetime()
});
// 2. Kafka schema (Avro/JSON Schema)
const kafkaSchema = {
  type: "record",
  fields: [
    { name: "userId", type: "string" },
    { name: "eventType", type: "string" },
    { name: "timestamp", type: "string" }
  ]
};
// 3. ClickHouse DDL
// CREATE TABLE events (
//   user_id String,
//   event_type LowCardinality(String),
//   timestamp DateTime
// ) ENGINE = MergeTree()
// 4. Analytics API response type
interface EventsResponse {
  userId: string;
  eventType: string;
  timestamp: Date;
}
# What you're building:
# API endpoint → Kafka topic → ClickHouse table → Analytics API
# Traditional approach: Define schema 4 times
# 1. API validation with Pydantic
class APIEvent(BaseModel):
    user_id: str
    event_type: Literal["click", "view", "purchase"]
    timestamp: datetime
# 2. Kafka schema registration
kafka_schema = {
    "type": "record",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "string"}
    ]
}
# 3. ClickHouse DDL
# CREATE TABLE events (
#   user_id String,
#   event_type LowCardinality(String),
#   timestamp DateTime
# ) ENGINE = MergeTree()
# 4. Analytics API response
class EventsResponse(BaseModel):
    user_id: str
    event_type: str
    timestamp: datetime
The Problem: When you add a field or change a type, you must update it in multiple places. Miss one, and you get:
- Silent data loss (Kafka → ClickHouse sync fails)
- Runtime errors
- Data quality issues (validation gaps)
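To make the failure mode in the list above concrete, here is a minimal sketch in plain Python (standard library only, not part of Moose). It compares the field names of two hand-maintained schema definitions. The `find_drift` helper and the `discount` field are hypothetical and exist only for illustration.

```python
# Hypothetical helper, not a Moose API: reports fields that exist in one
# hand-maintained schema but not in the other.
def find_drift(api_fields: dict, kafka_fields: dict) -> dict:
    api_names = set(api_fields)
    kafka_names = set(kafka_fields)
    return {
        "missing_in_kafka": sorted(api_names - kafka_names),
        "missing_in_api": sorted(kafka_names - api_names),
    }


# The API schema gained a "discount" field, but the Kafka schema was not updated.
api_schema = {"user_id": "str", "event_type": "str", "timestamp": "datetime", "discount": "float"}
kafka_schema = {"user_id": "string", "event_type": "string", "timestamp": "string"}

drift = find_drift(api_schema, kafka_schema)
print(drift)  # {'missing_in_kafka': ['discount'], 'missing_in_api': []}
```

A check like this only catches missing fields. Type mismatches between systems (for example `datetime` versus `string`) would need a separate mapping, which is the kind of bookkeeping a single shared model is meant to avoid.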
Solution: Model In Code, Reuse Everywhere
With Moose you define your schemas in native language types with optional metadata. This lets you reuse your schemas across multiple systems:
// 1. Define your schema (WHAT your data looks like)
interface MyDataModel {
  primaryKey: Key<string>;
  someString: string & LowCardinality;
  someNumber: number;
  someDate: Date;
  someJson: Record<string, any>;
}
// This single interface can be reused across multiple systems:
const pipeline = new IngestPipeline<MyDataModel>("MyDataPipeline", {
  ingest: true, // POST API endpoint
  stream: true, // Kafka topic
  table: { // ClickHouse table
    orderByFields: ["primaryKey", "someDate"],
    deduplicate: true
  }
});
# 1. Define your schema (WHAT your data looks like)
class MyFirstDataModel(BaseModel):
    id: Key[str]
    some_string: Annotated[str, "LowCardinality"]
    some_number: int
    some_date: datetime
    some_json: Any
# This single model can be reused across multiple systems:
my_first_pipeline = IngestPipeline[MyFirstDataModel]("my_first_pipeline", IngestPipelineConfig(
    ingest=True, # POST API endpoint
    stream=True, # Kafka topic
    table=True # ClickHouse table
))
Benefits:
- End-to-end type safety across your code and infrastructure
- Full control over your infrastructure with code
- Zero schema drift: change your types in one place and your infrastructure updates automatically
How It Works
The key idea is leveraging intersection types to extend base TypeScript types with “metadata” that represents specific optimizations and details on how to either:
- map that type in ClickHouse
- validate the data at runtime
interface Event {
  // Base type: string
  // ClickHouse: String with primary key
  id: Key<string>;
  // Base type: string
  // ClickHouse: Decimal(10,2) for precise money
  amount: string & ClickHouseDecimal<10, 2>;
  // Base type: string
  // ClickHouse: LowCardinality(String) for enums
  status: string & LowCardinality;
  // Base type: Date
  // ClickHouse: DateTime
  createdAt: Date;
}
// In your application code:
const event: Event = {
  id: "id_123",
  amount: "99.99", // Just a string in TypeScript
  status: "completed", // Just a string in TypeScript
  createdAt: new Date()
};
// In ClickHouse:
// CREATE TABLE events (
//   id String,
//   amount Decimal(10,2),
//   status LowCardinality(String),
//   createdAt DateTime
// ) ENGINE = MergeTree()
// ORDER BY id
The key idea is leveraging Annotated types to extend base Python types with “metadata” that represents specific optimizations and details on how to map that type in ClickHouse:
from moose_lib import Key, OlapTable, clickhouse_decimal
from pydantic import BaseModel
from typing import Annotated
from datetime import datetime
from decimal import Decimal
class Event(BaseModel):
    # Base type: str
    # ClickHouse: String with primary key
    id: Key[str]
    # Base type: Decimal
    # ClickHouse: Decimal(10,2) for precise money
    amount: clickhouse_decimal(10, 2)
    # Base type: str
    # ClickHouse: LowCardinality(String) for enums
    status: Annotated[str, "LowCardinality"]
    # Base type: datetime
    # ClickHouse: DateTime
    created_at: datetime
events = OlapTable[Event]("events")
# In your application code:
tx = Event(
    id="id_123",
    amount=Decimal("99.99"), # Regular Decimal in Python
    status="completed", # Regular string in Python
    created_at=datetime.now()
)
# In ClickHouse:
# CREATE TABLE events (
#   id String,
#   amount Decimal(10,2),
#   status LowCardinality(String),
#   created_at DateTime
# ) ENGINE = MergeTree()
# ORDER BY id
The metadata annotations are compile-time only - they don’t affect your runtime code. Your application works with regular strings and numbers, while Moose uses the metadata to generate optimized infrastructure.
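As an illustration of how this kind of metadata can travel with a type without changing runtime values, the following sketch uses only Python's standard `typing` module. It is not how Moose is implemented: the `BASE_TYPES` table and `column_type` function are assumptions made for this example.

```python
from datetime import datetime
from typing import Annotated, get_args, get_origin, get_type_hints


class Event:
    # Plain annotated class used only for illustration (no Pydantic needed).
    id: str
    status: Annotated[str, "LowCardinality"]
    created_at: datetime


# Assumed base-type mapping for this sketch; Moose's real mapping may differ.
BASE_TYPES = {str: "String", datetime: "DateTime", float: "Float64"}


def column_type(hint) -> str:
    """Return a ClickHouse column type for a (possibly Annotated) type hint."""
    if get_origin(hint) is Annotated:
        base, *metadata = get_args(hint)
        inner = BASE_TYPES[base]
        return f"LowCardinality({inner})" if "LowCardinality" in metadata else inner
    return BASE_TYPES[hint]


# include_extras=True keeps the Annotated metadata; the default strips it.
hints = get_type_hints(Event, include_extras=True)
columns = {name: column_type(hint) for name, hint in hints.items()}
print(columns)
# {'id': 'String', 'status': 'LowCardinality(String)', 'created_at': 'DateTime'}

# Without extras, the field is just a plain str as far as application code goes.
print(get_type_hints(Event)["status"])  # <class 'str'>
```

The same hint serves two readers: a schema generator that asks for the extras, and application code that never sees them.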
Building Data Models: From Simple to Complex
Let’s walk through how to model data for different infrastructure components and see how types behave across them.
Simple Data Model Shared Across Infrastructure
A basic data model that works identically across all infrastructure components:
export interface SimpleShared {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}
// This SAME model creates all infrastructure
const pipeline = new IngestPipeline<SimpleShared>("simple_shared", {
  ingest: true, // Creates: POST /ingest/simple_shared
  stream: true, // Creates: Kafka topic
  table: true // Creates: ClickHouse table
});
// The exact same types work everywhere:
// - API validates: { id: "123", name: "test", value: 42, timestamp: "2024-01-01T00:00:00Z" }
// - Kafka stores: { id: "123", name: "test", value: 42, timestamp: "2024-01-01T00:00:00Z" }
// - ClickHouse table: id String, name String, value Float64, timestamp DateTime
from moose_lib import IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel
from datetime import datetime
class SimpleShared(BaseModel):
    id: str
    name: str
    value: float
    timestamp: datetime
# This SAME model creates all infrastructure
pipeline = IngestPipeline[SimpleShared]("simple_shared", IngestPipelineConfig(
    ingest=True, # Creates: POST /ingest/simple_shared
    stream=True, # Creates: Kafka topic
    table=True # Creates: ClickHouse table
))
# The exact same types work everywhere:
# - API validates: { "id": "123", "name": "test", "value": 42, "timestamp": "2024-01-01T00:00:00Z" }
# - Kafka stores: { "id": "123", "name": "test", "value": 42, "timestamp": "2024-01-01T00:00:00Z" }
# - ClickHouse table: id String, name String, value Float64, timestamp DateTime
Key Point: One model definition creates consistent schemas across all systems.
Composite Types Shared Across Infrastructure
Complex types including nested objects, arrays, and enums work seamlessly across all components:
import { IngestPipeline, Key } from "@514labs/moose-lib";
export interface CompositeShared {
  id: Key<string>; // Primary key
  status: "active" | "pending" | "completed"; // Enum
  // Nested object
  metadata: {
    category: string;
    priority: number;
    tags: string[];
  };
  // Arrays and maps
  values: number[];
  attributes: Record<string, any>;
  // Optional field
  description?: string;
  createdAt: Date;
}
// Using in IngestPipeline - all types preserved
const pipeline = new IngestPipeline<CompositeShared>("composite_shared", {
  ingest: true,
  stream: true,
  table: true
});
// How the types map:
// - API validates nested structure and enum values
// - Kafka preserves the exact JSON structure
// - ClickHouse creates:
//   - id String (with PRIMARY KEY)
//   - status Enum8('active', 'pending', 'completed')
//   - metadata.category String, metadata.priority Float64, metadata.tags Array(String)
//   - values Array(Float64)
//   - attributes String (JSON)
//   - description Nullable(String)
//   - createdAt DateTime
from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel
from typing import List, Dict, Any, Optional, Literal
from datetime import datetime
class Metadata(BaseModel):
    category: str
    priority: float
    tags: List[str]
class CompositeShared(BaseModel):
    id: Key[str] # Primary key
    status: Literal["active", "pending", "completed"] # Enum
    # Nested object
    metadata: Metadata
    # Arrays and maps
    values: List[float]
    attributes: Dict[str, Any]
    # Optional field
    description: Optional[str] = None
    created_at: datetime
# Using in IngestPipeline - all types preserved
pipeline = IngestPipeline[CompositeShared]("composite_shared", IngestPipelineConfig(
    ingest=True,
    stream=True,
    table=True
))
# How the types map:
# - API validates nested structure and enum values
# - Kafka preserves the exact JSON structure
# - ClickHouse creates:
#   - id String (with PRIMARY KEY)
#   - status Enum8('active', 'pending', 'completed')
#   - metadata.category String, metadata.priority Float64, metadata.tags Array(String)
#   - values Array(Float64)
#   - attributes String (JSON)
#   - description Nullable(String)
#   - created_at DateTime
Key Point: Complex types including nested objects and arrays work consistently across all infrastructure.
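The mapping comments above can be read as a small recursive rule: optional fields become `Nullable(...)`, lists become `Array(...)`, and string literals become an `Enum8`. The sketch below expresses that rule with the standard `typing` module only. The `to_clickhouse` function and `SCALARS` table are hypothetical names for this example and do not describe Moose's internals.

```python
from typing import List, Literal, Optional, Union, get_args, get_origin

# Assumed scalar mapping for this sketch; a real mapping would cover more types.
SCALARS = {str: "String", float: "Float64", int: "Int64"}


def to_clickhouse(hint) -> str:
    """Map a small subset of Python type hints to ClickHouse column types."""
    origin = get_origin(hint)
    if origin is Union:
        # Optional[T] is Union[T, None]; other unions are out of scope here.
        non_none = [arg for arg in get_args(hint) if arg is not type(None)]
        if len(non_none) != 1 or len(get_args(hint)) != 2:
            raise TypeError(f"unsupported union: {hint!r}")
        return f"Nullable({to_clickhouse(non_none[0])})"
    if origin is list:
        (item,) = get_args(hint)
        return f"Array({to_clickhouse(item)})"
    if origin is Literal:
        values = ", ".join(f"'{value}'" for value in get_args(hint))
        return f"Enum8({values})"
    return SCALARS[hint]


print(to_clickhouse(Optional[str]))    # Nullable(String)
print(to_clickhouse(List[float]))      # Array(Float64)
print(to_clickhouse(Literal["active", "pending", "completed"]))
# Enum8('active', 'pending', 'completed')
```

Because the function recurses on the inner type, nested hints such as `List[Optional[str]]` resolve to `Array(Nullable(String))` without extra cases.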
ClickHouse-Specific Types (Standalone vs IngestPipeline)
ClickHouse type annotations optimize database performance but are transparent to other infrastructure:
import { Key, ClickHouseDecimal, LowCardinality, ClickHouseNamedTuple, OlapTable, IngestPipeline } from "@514labs/moose-lib";
export interface ClickHouseOptimized {
  id: Key<string>;
  // ClickHouse-specific type annotations
  amount: string & ClickHouseDecimal<10, 2>; // Decimal(10,2) in ClickHouse
  category: string & LowCardinality; // LowCardinality(String) in ClickHouse
  // Optimized nested type
  details: {
    name: string;
    value: number;
  } & ClickHouseNamedTuple; // NamedTuple in ClickHouse
  timestamp: Date;
}
// SCENARIO 1: Standalone OlapTable - gets all optimizations
const table = new OlapTable<ClickHouseOptimized>("optimized_table", {
  orderByFields: ["id", "timestamp"]
});
// Creates ClickHouse table with:
// - amount Decimal(10,2)
// - category LowCardinality(String)
// - details Tuple(name String, value Float64)
// SCENARIO 2: IngestPipeline - optimizations ONLY in ClickHouse
const pipeline = new IngestPipeline<ClickHouseOptimized>("optimized_pipeline", {
  ingest: true,
  stream: true,
  table: true
});
// What happens at each layer:
// 1. API receives/validates: { amount: "123.45", category: "electronics", ... }
//    - Sees amount as string, category as string (annotations ignored)
// 2. Kafka stores: { amount: "123.45", category: "electronics", ... }
//    - Plain JSON, no ClickHouse types
// 3. ClickHouse table gets optimizations:
//    - amount stored as Decimal(10,2)
//    - category stored as LowCardinality(String)
//    - details stored as NamedTuple
from moose_lib import Key, clickhouse_decimal, OlapTable, IngestPipeline, IngestPipelineConfig
from typing import Annotated
from pydantic import BaseModel
from datetime import datetime
class Details(BaseModel):
    name: str
    value: float
class ClickHouseOptimized(BaseModel):
    id: Key[str]
    # ClickHouse-specific type annotations
    amount: clickhouse_decimal(10, 2) # Decimal(10,2) in ClickHouse
    category: Annotated[str, "LowCardinality"] # LowCardinality(String) in ClickHouse
    # Optimized nested type
    details: Annotated[Details, "ClickHouseNamedTuple"] # NamedTuple in ClickHouse
    timestamp: datetime
# SCENARIO 1: Standalone OlapTable - gets all optimizations
table = OlapTable[ClickHouseOptimized]("optimized_table", {
    "order_by_fields": ["id", "timestamp"]
})
# Creates ClickHouse table with:
# - amount Decimal(10,2)
# - category LowCardinality(String)
# - details Tuple(name String, value Float64)
# SCENARIO 2: IngestPipeline - optimizations ONLY in ClickHouse
pipeline = IngestPipeline[ClickHouseOptimized]("optimized_pipeline", IngestPipelineConfig(
    ingest=True,
    stream=True,
    table=True
))
# What happens at each layer:
# 1. API receives/validates: { "amount": "123.45", "category": "electronics", ... }
#    - Sees amount as str, category as str (annotations ignored)
# 2. Kafka stores: { "amount": "123.45", "category": "electronics", ... }
#    - Plain JSON, no ClickHouse types
# 3. ClickHouse table gets optimizations:
#    - amount stored as Decimal(10,2)
#    - category stored as LowCardinality(String)
#    - details stored as NamedTuple
Key Point: ClickHouse annotations are metadata that ONLY affect the database schema. Your application code and other infrastructure components see regular TypeScript/Python types.
API Contracts with Runtime Validators
APIs use runtime validation to ensure query parameters meet your requirements:
import { tags, ConsumptionAPI } from "@514labs/moose-lib";
// Query parameters with runtime validation
interface SearchParams {
  // Date range validation
  startDate: string & tags.Format<"date">; // Must be YYYY-MM-DD
  endDate: string & tags.Format<"date">;
  // Numeric constraints
  minValue?: number & tags.Minimum<0>; // Optional, but if provided >= 0
  maxValue?: number & tags.Maximum<1000>; // Optional, but if provided <= 1000
  // String validation
  category?: string & tags.MinLength<2> & tags.MaxLength<50>;
  // Pagination
  page?: number & tags.Type<"int32"> & tags.Minimum<1>;
  limit?: number & tags.Type<"int32"> & tags.Minimum<1> & tags.Maximum<100>;
}
// Response data model
interface SearchResult {
  id: string;
  name: string;
  value: number;
  category: string;
  timestamp: Date;
}
// Create validated API endpoint
const searchAPI = new ConsumptionAPI<SearchParams, SearchResult[]>(
  "search",
  async (params, { client }) => {
    // Params are already validated when this runs
    // Compare against undefined so that a legitimate value of 0 still applies the filter
    const query = `
      SELECT * FROM data_table
      WHERE timestamp >= {startDate: Date}
      AND timestamp <= {endDate: Date}
      ${params.minValue !== undefined ? `AND value >= {minValue: Float64}` : ''}
      ${params.maxValue !== undefined ? `AND value <= {maxValue: Float64}` : ''}
      ${params.category ? `AND category = {category: String}` : ''}
      LIMIT {limit: UInt32}
      OFFSET {offset: UInt32}
    `;
    return client.query(query, {
      startDate: params.startDate,
      endDate: params.endDate,
      minValue: params.minValue,
      maxValue: params.maxValue,
      category: params.category,
      limit: params.limit || 10,
      offset: ((params.page || 1) - 1) * (params.limit || 10)
    });
  }
);
// API Usage Examples:
// ✅ Valid: GET /consumption/search?startDate=2024-01-01&endDate=2024-01-31
// ✅ Valid: GET /consumption/search?startDate=2024-01-01&endDate=2024-01-31&minValue=100&limit=50
// ❌ Invalid: GET /consumption/search?startDate=Jan-1-2024 (wrong date format)
// ❌ Invalid: GET /consumption/search?startDate=2024-01-01&endDate=2024-01-31&limit=200 (exceeds max)
from moose_lib import ConsumptionAPI
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional, List
# Query parameters with runtime validation
class SearchParams(BaseModel):
    # Date range validation
    start_date: str = Field(..., pattern="^\\d{4}-\\d{2}-\\d{2}$") # Must be YYYY-MM-DD
    end_date: str = Field(..., pattern="^\\d{4}-\\d{2}-\\d{2}$")
    # Numeric constraints
    min_value: Optional[float] = Field(None, ge=0) # Optional, but if provided >= 0
    max_value: Optional[float] = Field(None, le=1000) # Optional, but if provided <= 1000
    # String validation
    category: Optional[str] = Field(None, min_length=2, max_length=50)
    # Pagination
    page: Optional[int] = Field(None, ge=1)
    limit: Optional[int] = Field(None, ge=1, le=100)
# Response data model
class SearchResult(BaseModel):
    id: str
    name: str
    value: float
    category: str
    timestamp: datetime
# Create validated API endpoint
async def search_handler(params: SearchParams, context) -> List[SearchResult]:
    # Params are already validated when this runs
    # Note: values are interpolated directly into the SQL string here. For free-text
    # fields such as category, prefer parameter binding if your client supports it.
    # Compare against None so that a legitimate value of 0 still applies the filter.
    query = f"""
        SELECT * FROM data_table
        WHERE timestamp >= '{params.start_date}'
        AND timestamp <= '{params.end_date}'
        {"AND value >= " + str(params.min_value) if params.min_value is not None else ""}
        {"AND value <= " + str(params.max_value) if params.max_value is not None else ""}
        {"AND category = '" + params.category + "'" if params.category else ""}
        LIMIT {params.limit or 10}
        OFFSET {((params.page or 1) - 1) * (params.limit or 10)}
    """
    results = await context.client.query(query)
    return [SearchResult(**row) for row in results]
search_api = ConsumptionAPI[SearchParams, List[SearchResult]](
    "search",
    handler=search_handler
)
# API Usage Examples:
# ✅ Valid: GET /consumption/search?start_date=2024-01-01&end_date=2024-01-31
# ✅ Valid: GET /consumption/search?start_date=2024-01-01&end_date=2024-01-31&min_value=100&limit=50
# ❌ Invalid: GET /consumption/search?start_date=Jan-1-2024 (wrong date format)
# ❌ Invalid: GET /consumption/search?start_date=2024-01-01&end_date=2024-01-31&limit=200 (exceeds max)
Key Point: Runtime validators ensure API consumers provide valid data, returning clear error messages for invalid requests before any database queries run.
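To show what "validated before any query runs" amounts to, here is a dependency-free sketch of the same checks and the pagination arithmetic. It stands in for what Pydantic or the TypeScript tags do declaratively. The `validate_search` function is a hypothetical name, and the default of 10 rows per page mirrors the handlers above.

```python
import re
from typing import Optional

DATE_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$")


def validate_search(start_date: str, end_date: str,
                    page: Optional[int] = None,
                    limit: Optional[int] = None) -> dict:
    """Check the query parameters and return the effective LIMIT and OFFSET."""
    errors = []
    for name, value in (("start_date", start_date), ("end_date", end_date)):
        if not DATE_PATTERN.match(value):
            errors.append(f"{name} must be YYYY-MM-DD")
    if page is not None and page < 1:
        errors.append("page must be >= 1")
    if limit is not None and not 1 <= limit <= 100:
        errors.append("limit must be between 1 and 100")
    if errors:
        # Fail before any SQL is built, with every problem listed at once.
        raise ValueError("; ".join(errors))
    effective_limit = limit if limit is not None else 10
    effective_page = page if page is not None else 1
    return {"limit": effective_limit, "offset": (effective_page - 1) * effective_limit}


print(validate_search("2024-01-01", "2024-01-31", page=3, limit=50))
# {'limit': 50, 'offset': 100}
```

Page 3 with 50 rows per page skips the first two pages, which gives an offset of (3 - 1) * 50 = 100.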
Additional Data Modeling Patterns
Modeling for Stream Processing
When you need to process data in real-time before it hits the database:
import { Key, LowCardinality, OlapTable, Stream } from "@514labs/moose-lib";
// Raw data from external source
interface RawData {
  id: Key<string>;
  timestamp: Date;
  rawPayload: string;
  sourceType: string & LowCardinality;
}
// Processed data after transformation
interface ProcessedData {
  id: Key<string>;
  timestamp: Date;
  field1: string;
  field2: string & LowCardinality;
  numericValue: number;
  attributes: Record<string, any>;
}
// Create stream with transformation
const rawStream = new Stream<RawData>("raw-stream");
const processedStream = new Stream<ProcessedData>("processed-stream");
// Transform raw data to processed
rawStream.addConsumer(async (raw: RawData) => {
  const parsed = JSON.parse(raw.rawPayload);
  const processed: ProcessedData = {
    id: raw.id,
    timestamp: raw.timestamp,
    field1: parsed.field_1,
    field2: parsed.field_2,
    numericValue: parseFloat(parsed.value) || 0,
    attributes: parsed.attributes || {}
  };
  await processedStream.publish(processed);
});
// Sink to ClickHouse
const table = new OlapTable<ProcessedData>("processed_data", {
  stream: processedStream,
  orderByFields: ["id", "timestamp"]
});
from moose_lib import Key, Stream, OlapTable
from pydantic import BaseModel
from typing import Dict, Any, Annotated
from datetime import datetime
import json
# Raw data from external source
class RawData(BaseModel):
    id: Key[str]
    timestamp: datetime
    raw_payload: str
    source_type: Annotated[str, "LowCardinality"]
# Processed data after transformation
class ProcessedData(BaseModel):
    id: Key[str]
    timestamp: datetime
    field1: str
    field2: Annotated[str, "LowCardinality"]
    numeric_value: float
    attributes: Dict[str, Any]
# Create streams
raw_stream = Stream[RawData]("raw-stream")
processed_stream = Stream[ProcessedData]("processed-stream")
# Transform raw data
async def process_data(raw: RawData):
    parsed = json.loads(raw.raw_payload)
    processed = ProcessedData(
        id=raw.id,
        timestamp=raw.timestamp,
        field1=parsed["field_1"],
        field2=parsed["field_2"],
        numeric_value=float(parsed.get("value", 0)),
        attributes=parsed.get("attributes", {})
    )
    await processed_stream.publish(processed)
raw_stream.add_consumer(process_data)
# Sink to ClickHouse
table = OlapTable[ProcessedData]("processed_data", {
    "stream": processed_stream,
    "order_by_fields": ["id", "timestamp"]
})
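One difference between the two transforms is worth noting: the TypeScript version falls back to 0 when the value cannot be parsed (`parseFloat(...) || 0`), while the Python version raises if `value` is present but not numeric. If you want the Python consumer to be equally tolerant, a small helper along these lines is one option. The `to_float` function is a hypothetical name and only roughly mirrors `parseFloat`, which also accepts numeric prefixes such as "12abc".

```python
import json
import math


def to_float(value, default: float = 0.0) -> float:
    """Convert value to a float, returning default when it is missing or unparsable."""
    try:
        result = float(value)
    except (TypeError, ValueError):
        return default
    # Treat NaN like an unparsable value, as `parseFloat(x) || 0` does in JavaScript.
    return default if math.isnan(result) else result


# Hypothetical payloads, shaped like raw_payload in the example above.
good = json.loads('{"field_1": "a", "field_2": "b", "value": "12.5"}')
bad = json.loads('{"field_1": "a", "field_2": "b", "value": "n/a"}')

print(to_float(good.get("value")))  # 12.5
print(to_float(bad.get("value")))   # 0.0
print(to_float(None))               # 0.0
```

Whether a silent fallback is the right choice depends on the data: dropping or dead-lettering malformed records may be preferable when a zero would be indistinguishable from a real measurement.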
Modeling for Workflow Tasks
Define strongly-typed inputs and outputs for async jobs:
import { Task, tags } from "@514labs/moose-lib";
// Input validation with constraints
interface TaskInput {
  id: string & tags.Format<"uuid">;
  items: string[];
  taskType: "typeA" | "typeB" | "typeC";
  options?: {
    includeMetadata: boolean;
    maxItems?: number & tags.Minimum<1> & tags.Maximum<100>;
  };
}
// Structured output
interface TaskOutput {
  id: string;
  processedAt: Date;
  resultA?: {
    category: string;
    score: number;
    details: Record<string, any>;
  };
  resultB?: {
    values: string[];
    metrics: number[];
  };
  resultC?: {
    field1: string;
    field2: string;
    field3: number;
  };
}
// Create workflow task
const exampleTask = new Task<TaskInput, TaskOutput>(
  "example-task",
  {
    run: async (input) => {
      // Process data based on task type
      const output: TaskOutput = {
        id: input.id,
        processedAt: new Date()
      };
      if (input.taskType === "typeA") {
        // processTypeA is your own helper, assumed to be defined elsewhere
        output.resultA = await processTypeA(input);
      }
      return output;
    },
    retries: 3,
    timeout: 30000 // 30 seconds
  }
);
from moose_lib import Task
from pydantic import BaseModel, Field
from typing import Optional, List, Literal, Dict, Any
from datetime import datetime
# Input validation with constraints
class TaskOptions(BaseModel):
    include_metadata: bool
    max_items: Optional[int] = Field(None, ge=1, le=100)
class TaskInput(BaseModel):
    id: str = Field(..., pattern="^[0-9a-f-]{36}$")
    items: List[str]
    task_type: Literal["typeA", "typeB", "typeC"]
    options: Optional[TaskOptions] = None
# Structured output
class ResultA(BaseModel):
    category: str
    score: float
    details: Dict[str, Any]
class ResultB(BaseModel):
    values: List[str]
    metrics: List[float]
class ResultC(BaseModel):
    field1: str
    field2: str
    field3: float
class TaskOutput(BaseModel):
    id: str
    processed_at: datetime
    result_a: Optional[ResultA] = None
    result_b: Optional[ResultB] = None
    result_c: Optional[ResultC] = None
# Create workflow task
async def run_task(input_data: TaskInput) -> TaskOutput:
    # Process data based on task type
    output = TaskOutput(
        id=input_data.id,
        processed_at=datetime.now()
    )
    if input_data.task_type == "typeA":
        # process_type_a is your own helper, assumed to be defined elsewhere
        output.result_a = await process_type_a(input_data)
    return output
example_task = Task[TaskInput, TaskOutput](
    "example-task",
    run_function=run_task,
    retries=3,
    timeout=30 # seconds
)