In Moose, data models are just TypeScript interfaces or Pydantic models (Python) that become the authoritative source for your infrastructure schemas.
Data models are used to define the schemas for your ingest APIs, streams, database tables, analytics APIs, and workflow tasks.
Analytical backends typically have to coordinate schemas across multiple systems, each with its own type system and constraints.
Consider a typical pipeline for ingesting events into a ClickHouse table.
```python
# What you're building:
# API endpoint → Kafka topic → ClickHouse table → Analytics API

# Traditional approach: define the schema 4 times
from pydantic import BaseModel
from typing import Literal
from datetime import datetime

# 1. API validation with Pydantic
class APIEvent(BaseModel):
    user_id: str
    event_type: Literal["click", "view", "purchase"]
    timestamp: datetime

# 2. Kafka schema registration
kafka_schema = {
    "type": "record",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "string"}
    ]
}

# 3. ClickHouse DDL
# CREATE TABLE events (
#     user_id String,
#     event_type LowCardinality(String),
#     timestamp DateTime
# ) ENGINE = MergeTree()

# 4. Analytics API response
class EventsResponse(BaseModel):
    user_id: str
    event_type: str
    timestamp: datetime
```

The Problem: When you add a field or change a type, you must update it in every one of these places. Miss one, and the schemas silently drift out of sync.
With Moose, you define your schemas once in native language types with optional metadata, then reuse them across multiple systems:
```python
from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel
from typing import Annotated, Any
from datetime import datetime

# 1. Define your schema (WHAT your data looks like)
class MyFirstDataModel(BaseModel):
    id: Key[str]
    some_string: Annotated[str, "LowCardinality"]
    some_number: int
    some_date: datetime
    some_json: Any

# This single model can be reused across multiple systems:
my_first_pipeline = IngestPipeline[MyFirstDataModel]("my_first_pipeline", IngestPipelineConfig(
    ingest_api=True,  # POST API endpoint
    stream=True,      # Kafka topic
    table=True        # ClickHouse table
))
```

The key idea is to use Annotated types to extend base Python types with metadata that tells Moose how to map each field to ClickHouse, including type-specific optimizations:
```python
from moose_lib import Key, OlapTable, clickhouse_decimal
from pydantic import BaseModel
from typing import Annotated
from datetime import datetime
from decimal import Decimal

class Event(BaseModel):
    # Base type: str
    # ClickHouse: String with primary key
    id: Key[str]

    # Base type: Decimal
    # ClickHouse: Decimal(10,2) for precise money
    amount: clickhouse_decimal(10, 2)

    # Base type: str
    # ClickHouse: LowCardinality(String) for enums
    status: Annotated[str, "LowCardinality"]

    # Base type: datetime
    # ClickHouse: DateTime
    created_at: datetime

events = OlapTable[Event]("events")

# In your application code:
tx = Event(
    id="id_123",
    amount=Decimal("99.99"),   # Regular Decimal in Python
    status="completed",        # Regular string in Python
    created_at=datetime.now()
)

# In ClickHouse:
# CREATE TABLE events (
#     id String,
#     amount Decimal(10,2),
#     status LowCardinality(String),
#     created_at DateTime
# ) ENGINE = MergeTree()
# ORDER BY id
```

The metadata annotations are compile-time only; they don't affect your runtime code. Your application works with regular strings and numbers, while Moose uses the metadata to generate optimized infrastructure.
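One way to convince yourself of this is to inspect an instance at runtime. This is a small sketch reusing the `Event` model above (the values are illustrative); the annotated fields hold ordinary Python objects:

```python
from decimal import Decimal
from datetime import datetime

tx = Event(
    id="id_123",
    amount=Decimal("99.99"),
    status="completed",
    created_at=datetime.now(),
)

# Key[str], clickhouse_decimal(10, 2), and the LowCardinality annotation
# all disappear at runtime; only the base Python types remain.
assert isinstance(tx.id, str)
assert isinstance(tx.amount, Decimal)
assert isinstance(tx.status, str)
assert isinstance(tx.created_at, datetime)
```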
Let's walk through how to model data for different infrastructure components and see how types behave across them.
A basic data model that works identically across all infrastructure components:
```python
from moose_lib import IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel
from datetime import datetime

class SimpleShared(BaseModel):
    id: str
    name: str
    value: float
    timestamp: datetime

# This SAME model creates all infrastructure
pipeline = IngestPipeline[SimpleShared]("simple_shared", IngestPipelineConfig(
    ingest_api=True,  # Creates: POST /ingest/simple_shared
    stream=True,      # Creates: Kafka topic
    table=True        # Creates: ClickHouse table
))

# The exact same types work everywhere:
# - API validates:    { "id": "123", "name": "test", "value": 42, "timestamp": "2024-01-01T00:00:00Z" }
# - Kafka stores:     { "id": "123", "name": "test", "value": 42, "timestamp": "2024-01-01T00:00:00Z" }
# - ClickHouse table: id String, name String, value Float64, timestamp DateTime
```

Key Point: One model definition creates consistent schemas across all systems.
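As a rough client-side sketch (not part of the Moose app itself), you can send one record to the generated endpoint. The `http://localhost:4000` address is an assumption about the local dev server; adjust host and port to your setup:

```python
import requests

# Assumed local dev address; the /ingest/simple_shared path comes from the pipeline above
resp = requests.post(
    "http://localhost:4000/ingest/simple_shared",
    json={
        "id": "123",
        "name": "test",
        "value": 42,
        "timestamp": "2024-01-01T00:00:00Z",
    },
)
print(resp.status_code)  # success when the payload matches SimpleShared
```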
Complex types including nested objects, arrays, and enums work seamlessly across all components:
```python
from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel
from typing import List, Dict, Any, Optional, Literal
from datetime import datetime

class Metadata(BaseModel):
    category: str
    priority: float
    tags: List[str]

class CompositeShared(BaseModel):
    id: Key[str]                                        # Primary key
    status: Literal["active", "pending", "completed"]   # Enum

    # Nested object
    metadata: Metadata

    # Arrays and maps
    values: List[float]
    attributes: Dict[str, Any]

    # Optional field
    description: Optional[str] = None

    created_at: datetime

# Using in IngestPipeline - all types preserved
pipeline = IngestPipeline[CompositeShared]("composite_shared", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=True
))

# How the types map:
# - API validates nested structure and enum values
# - Kafka preserves the exact JSON structure
# - ClickHouse creates:
#   - id String (with PRIMARY KEY)
#   - status Enum8('active', 'pending', 'completed')
#   - metadata.category String, metadata.priority Float64, metadata.tags Array(String)
#   - values Array(Float64)
#   - attributes String (JSON)
#   - description Nullable(String)
#   - created_at DateTime
```

Key Point: Complex types including nested objects and arrays work consistently across all infrastructure.
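Because the model is an ordinary Pydantic class, the nested and enum constraints can be exercised directly. A short sketch with made-up values:

```python
from datetime import datetime
from pydantic import ValidationError

# Valid record: nested object, arrays, and enum value all check out
record = CompositeShared(
    id="evt_1",
    status="active",
    metadata=Metadata(category="sensor", priority=1.0, tags=["a", "b"]),
    values=[1.5, 2.5],
    attributes={"region": "us-east"},
    created_at=datetime.now(),
)

# Invalid enum values are rejected before they reach any infrastructure
try:
    CompositeShared(
        id="evt_2",
        status="archived",  # not one of "active" | "pending" | "completed"
        metadata=Metadata(category="sensor", priority=1.0, tags=[]),
        values=[],
        attributes={},
        created_at=datetime.now(),
    )
except ValidationError as e:
    print(e)
```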
ClickHouse type annotations optimize database performance but are transparent to other infrastructure:
```python
from moose_lib import Key, clickhouse_decimal, OlapTable, IngestPipeline, IngestPipelineConfig
from typing import Annotated
from pydantic import BaseModel
from datetime import datetime

class Details(BaseModel):
    name: str
    value: float

class ClickHouseOptimized(BaseModel):
    id: Key[str]

    # ClickHouse-specific type annotations
    amount: clickhouse_decimal(10, 2)            # Decimal(10,2) in ClickHouse
    category: Annotated[str, "LowCardinality"]   # LowCardinality(String) in ClickHouse

    # Optimized nested type
    details: Annotated[Details, "ClickHouseNamedTuple"]  # NamedTuple in ClickHouse

    timestamp: datetime

# SCENARIO 1: Standalone OlapTable - gets all optimizations
table = OlapTable[ClickHouseOptimized]("optimized_table", {
    "order_by_fields": ["id", "timestamp"]
})
# Creates ClickHouse table with:
# - amount Decimal(10,2)
# - category LowCardinality(String)
# - details Tuple(name String, value Float64)

# SCENARIO 2: IngestPipeline - optimizations ONLY in ClickHouse
pipeline = IngestPipeline[ClickHouseOptimized]("optimized_pipeline", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=True
))

# What happens at each layer:
# 1. API receives/validates: { "amount": "123.45", "category": "electronics", ... }
#    - Sees amount and category as plain values (ClickHouse annotations ignored)
# 2. Kafka stores: { "amount": "123.45", "category": "electronics", ... }
#    - Plain JSON, no ClickHouse types
# 3. ClickHouse table gets the optimizations:
#    - amount stored as Decimal(10,2)
#    - category stored as LowCardinality(String)
#    - details stored as NamedTuple
```

Key Point: ClickHouse annotations are metadata that ONLY affect the database schema. Your application code and other infrastructure components see regular TypeScript/Python types.
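As a runtime sanity check (a sketch with illustrative values, not part of the pipeline definition), the same payload shape that the API and Kafka carry validates against the model without any ClickHouse-specific handling:

```python
# Plain JSON payload, the shape the ingest API and Kafka see
payload = {
    "id": "opt_1",
    "amount": "123.45",
    "category": "electronics",
    "details": {"name": "widget", "value": 9.5},
    "timestamp": "2024-01-01T00:00:00Z",
}

# Pydantic validates it like any other model; the ClickHouse annotations
# (Decimal(10,2), LowCardinality, NamedTuple) play no part in this step.
row = ClickHouseOptimized(**payload)
print(row.category, type(row.details).__name__)  # electronics Details
```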
APIs use runtime validation to ensure query parameters meet your requirements:
```python
from moose_lib import Api, MooseClient
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional, List

# Query parameters with runtime validation
class SearchParams(BaseModel):
    # Date range validation
    start_date: str = Field(..., regex="^\\d{4}-\\d{2}-\\d{2}$")  # Must be YYYY-MM-DD
    end_date: str = Field(..., regex="^\\d{4}-\\d{2}-\\d{2}$")

    # Numeric constraints
    min_value: Optional[float] = Field(None, ge=0)     # Optional, but if provided >= 0
    max_value: Optional[float] = Field(None, le=1000)  # Optional, but if provided <= 1000

    # String validation
    category: Optional[str] = Field(None, min_length=2, max_length=50)

    # Pagination
    page: Optional[int] = Field(None, ge=1)
    limit: Optional[int] = Field(None, ge=1, le=100)

# Response data model
class SearchResult(BaseModel):
    id: str
    name: str
    value: float
    category: str
    timestamp: datetime

# Create validated API endpoint
async def search_handler(params: SearchParams, client: MooseClient) -> List[SearchResult]:
    # Params are already validated when this runs

    # Build a parameterized query safely
    clauses = [
        "timestamp >= {startDate}",
        "timestamp <= {endDate}"
    ]
    params_dict = {
        "startDate": params.start_date,
        "endDate": params.end_date,
        "limit": params.limit or 10,
        "offset": ((params.page or 1) - 1) * (params.limit or 10)
    }

    if params.min_value is not None:
        clauses.append("value >= {minValue}")
        params_dict["minValue"] = params.min_value
    if params.max_value is not None:
        clauses.append("value <= {maxValue}")
        params_dict["maxValue"] = params.max_value
    if params.category is not None:
        clauses.append("category = {category}")
        params_dict["category"] = params.category

    where_clause = " AND ".join(clauses)
    # Double braces escape the f-string so {limit} and {offset} stay in the
    # query text as named parameters resolved from params_dict
    query = f"""
        SELECT *
        FROM data_table
        WHERE {where_clause}
        LIMIT {{limit}}
        OFFSET {{offset}}
    """

    results = await client.query.execute(query, params=params_dict)
    return [SearchResult(**row) for row in results]

search_api = Api[SearchParams, List[SearchResult]](
    "search",
    handler=search_handler
)

# API Usage Examples:
# ✅ Valid:   GET /api/search?start_date=2024-01-01&end_date=2024-01-31
# ✅ Valid:   GET /api/search?start_date=2024-01-01&end_date=2024-01-31&min_value=100&limit=50
# ❌ Invalid: GET /api/search?start_date=Jan-1-2024 (wrong date format)
# ❌ Invalid: GET /api/search?start_date=2024-01-01&end_date=2024-01-31&limit=200 (exceeds max)
```

Key Point: Runtime validators ensure API consumers provide valid data, returning clear error messages for invalid requests before any database queries run.
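Because `SearchParams` is a plain Pydantic model, you can exercise the same validation rules locally without hitting the endpoint. The failing cases in this sketch mirror the invalid requests listed in the comments above:

```python
from pydantic import ValidationError

# Passes: matches the YYYY-MM-DD pattern and the numeric bounds
SearchParams(start_date="2024-01-01", end_date="2024-01-31", limit=50)

# Fails before any query runs: wrong date format, then a limit above the maximum
for bad in (
    {"start_date": "Jan-1-2024", "end_date": "2024-01-31"},
    {"start_date": "2024-01-01", "end_date": "2024-01-31", "limit": 200},
):
    try:
        SearchParams(**bad)
    except ValidationError as e:
        print(e.errors()[0]["msg"])
```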
When you need to process data in real-time before it hits the database:
```python
from moose_lib import Key, Stream, StreamConfig, OlapTable, OlapConfig
from pydantic import BaseModel
from typing import Dict, Any, Annotated
from datetime import datetime
import json

# Raw data from external source
class RawData(BaseModel):
    id: Key[str]
    timestamp: datetime
    raw_payload: str
    source_type: Annotated[str, "LowCardinality"]

# Processed data after transformation
class ProcessedData(BaseModel):
    id: Key[str]
    timestamp: datetime
    field1: str
    field2: Annotated[str, "LowCardinality"]
    numeric_value: float
    attributes: Dict[str, Any]

# Create streams and the destination table
raw_stream = Stream[RawData]("raw-stream")

processed_table = OlapTable[ProcessedData]("processed_data", OlapConfig(
    order_by_fields=["id", "timestamp"]
))

processed_stream = Stream[ProcessedData]("processed-stream", StreamConfig(
    destination=processed_table
))

# Transform raw data into the processed shape
async def process_data(raw: RawData) -> ProcessedData:
    parsed = json.loads(raw.raw_payload)
    return ProcessedData(
        id=raw.id,
        timestamp=raw.timestamp,
        field1=parsed["field_1"],
        field2=parsed["field_2"],
        numeric_value=float(parsed.get("value", 0)),
        attributes=parsed.get("attributes", {})
    )

# Route every raw record through the transform into the processed stream
raw_stream.add_transform(processed_stream, process_data)
```
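To sanity-check the transform in isolation, you can call it directly with a hand-built record. This is a standalone sketch whose payload keys (`field_1`, `field_2`, `value`, `attributes`) are simply the ones the transform reads:

```python
import asyncio
import json
from datetime import datetime

sample = RawData(
    id="raw_1",
    timestamp=datetime.now(),
    raw_payload=json.dumps({
        "field_1": "hello",
        "field_2": "typeA",
        "value": 42,
        "attributes": {"source": "test"},
    }),
    source_type="webhook",
)

processed = asyncio.run(process_data(sample))
print(processed.field1, processed.numeric_value)  # hello 42.0
```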
Define strongly-typed inputs and outputs for async jobs:

TypeScript:

```ts
import { Task, tags } from "@514labs/moose-lib";

// Input validation with constraints
interface TaskInput {
  id: string & tags.Format<"uuid">;
  items: string[];
  taskType: "typeA" | "typeB" | "typeC";
  options?: {
    includeMetadata: boolean;
    maxItems?: number & tags.Minimum<1> & tags.Maximum<100>;
  };
}

// Structured output
interface TaskOutput {
  id: string;
  processedAt: Date;
  resultA?: {
    category: string;
    score: number;
    details: Record<string, any>;
  };
  resultB?: {
    values: string[];
    metrics: number[];
  };
  resultC?: {
    field1: string;
    field2: string;
    field3: number;
  };
}

// Create workflow task
const exampleTask = new Task<TaskInput, TaskOutput>(
  "example-task",
  {
    run: async (ctx) => {
      // Process data based on task type
      const output: TaskOutput = {
        id: ctx.input.id,
        processedAt: new Date()
      };

      if (ctx.input.taskType === "typeA") {
        output.resultA = await processTypeA(ctx.input);
      }

      return output;
    },
    retries: 3,
    timeout: 30000  // 30 seconds
  }
);
```

Python:

```python
from moose_lib import Task, TaskContext
from pydantic import BaseModel, Field
from typing import Optional, List, Literal, Dict, Any
from datetime import datetime

# Input validation with constraints
class TaskOptions(BaseModel):
    include_metadata: bool
    max_items: Optional[int] = Field(None, ge=1, le=100)

class TaskInput(BaseModel):
    id: str = Field(..., regex="^[0-9a-f-]{36}$")
    items: List[str]
    task_type: Literal["typeA", "typeB", "typeC"]
    options: Optional[TaskOptions] = None

# Structured output
class ResultA(BaseModel):
    category: str
    score: float
    details: Dict[str, Any]

class ResultB(BaseModel):
    values: List[str]
    metrics: List[float]

class ResultC(BaseModel):
    field1: str
    field2: str
    field3: float

class TaskOutput(BaseModel):
    id: str
    processed_at: datetime
    result_a: Optional[ResultA] = None
    result_b: Optional[ResultB] = None
    result_c: Optional[ResultC] = None

# Create workflow task
async def run_task(ctx: TaskContext[TaskInput]) -> TaskOutput:
    # Process data based on task type
    output = TaskOutput(
        id=ctx.input.id,
        processed_at=datetime.now()
    )

    if ctx.input.task_type == "typeA":
        output.result_a = await process_type_a(ctx.input)

    return output

example_task = Task[TaskInput, TaskOutput](
    "example-task",
    run_function=run_task,
    retries=3,
    timeout=30  # seconds
)
```
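As with the other models, the task input validates on its own. A minimal Python sketch with illustrative values:

```python
from pydantic import ValidationError

# Accepted: well-formed id, known task type, options within bounds
TaskInput(
    id="123e4567-e89b-12d3-a456-426614174000",
    items=["a", "b"],
    task_type="typeA",
    options=TaskOptions(include_metadata=True, max_items=50),
)

# Rejected: unknown task type and max_items above the allowed maximum
try:
    TaskInput(
        id="123e4567-e89b-12d3-a456-426614174000",
        items=[],
        task_type="typeD",                                       # not an allowed literal
        options={"include_metadata": False, "max_items": 500},   # above the maximum
    )
except ValidationError as e:
    print(e)
```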