
Data Modeling

Overview

In Moose, data models are just TypeScript interfaces or Pydantic models (Python) that become the authoritative source for your infrastructure schemas.

Data Models are used to define:

  • OLAP Tables and Materialized Views (automatically generated DDL)
  • Redpanda/Kafka Streams (schema registry and topic validation)
  • API Contracts (request/response validation and OpenAPI specs)
  • Workflow Task Input and Output Types (typed function inputs/outputs)
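For example, the same Pydantic model can be passed as the type parameter to several of these components. A minimal sketch, using the OlapTable and Stream constructors shown later in this page (the model and resource names here are illustrative):

from moose_lib import OlapTable, Stream
from pydantic import BaseModel
from datetime import datetime

# Illustrative model; any Pydantic model works the same way
class UserEvent(BaseModel):
    user_id: str
    event_type: str
    timestamp: datetime

# The same type parameterizes each piece of infrastructure
user_events_table = OlapTable[UserEvent]("user_events")      # ClickHouse table
user_events_stream = Stream[UserEvent]("user_events_topic")  # Redpanda/Kafka topic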

Philosophy

Problem: Analytical Backends are Prone to Schema Drift

Analytical backends are unique in that they typically have to coordinate schemas across multiple systems that each have their own type systems and constraints.

Consider a typical pipeline for ingesting events into a ClickHouse table.

# What you're building:
# API endpoint → Kafka topic → ClickHouse table → Analytics API

# Traditional approach: define the schema 4 times
from pydantic import BaseModel
from typing import Literal
from datetime import datetime

# 1. API validation with Pydantic
class APIEvent(BaseModel):
    user_id: str
    event_type: Literal["click", "view", "purchase"]
    timestamp: datetime

# 2. Kafka schema registration
kafka_schema = {
    "type": "record",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "string"}
    ]
}

# 3. ClickHouse DDL
# CREATE TABLE events (
#   user_id String,
#   event_type LowCardinality(String),
#   timestamp DateTime
# ) ENGINE = MergeTree()

# 4. Analytics API response
class EventsResponse(BaseModel):
    user_id: str
    event_type: str
    timestamp: datetime

The Problem: When you add a field or change a type, you must update it in multiple places. Miss one, and you get:

  • Silent data loss (Kafka → ClickHouse sync fails)
  • Runtime errors
  • Data quality issues (validation gaps)

Solution: Model In Code, Reuse Everywhere

With Moose, you define your schemas once as native language types with optional metadata, then reuse them across multiple systems:

app/main.py
from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel
from typing import Annotated, Any
from datetime import datetime

# 1. Define your schema (WHAT your data looks like)
class MyFirstDataModel(BaseModel):
    id: Key[str]
    some_string: Annotated[str, "LowCardinality"]
    some_number: int
    some_date: datetime
    some_json: Any

# This single model can be reused across multiple systems:
my_first_pipeline = IngestPipeline[MyFirstDataModel]("my_first_pipeline", IngestPipelineConfig(
    ingest_api=True,  # POST API endpoint
    stream=True,      # Kafka topic
    table=True        # ClickHouse table
))

Benefits:

  • End-to-end type safety across your code and infrastructure
  • Full control over your infrastructure with code
  • Zero schema drift: change your types in one place and your infrastructure updates automatically

How It Works

The key idea is to use Annotated types to attach "metadata" to base Python types. The metadata tells Moose which ClickHouse-specific optimizations to apply when mapping each field:

from moose_lib import Key, clickhouse_decimal, OlapTable
from typing import Annotated
from pydantic import BaseModel
from datetime import datetime
from decimal import Decimal

class Event(BaseModel):
    # Base type: str
    # ClickHouse: String with primary key
    id: Key[str]

    # Base type: Decimal
    # ClickHouse: Decimal(10,2) for precise money
    amount: clickhouse_decimal(10, 2)

    # Base type: str
    # ClickHouse: LowCardinality(String) for enums
    status: Annotated[str, "LowCardinality"]

    # Base type: datetime
    # ClickHouse: DateTime
    created_at: datetime

events = OlapTable[Event]("events")

# In your application code:
tx = Event(
    id="id_123",
    amount=Decimal("99.99"),     # Regular Decimal in Python
    status="completed",          # Regular string in Python
    created_at=datetime.now()
)

# In ClickHouse:
# CREATE TABLE events (
#   id String,
#   amount Decimal(10,2),
#   status LowCardinality(String),
#   created_at DateTime
# ) ENGINE = MergeTree()
# ORDER BY id

The metadata annotations are compile-time only - they don't affect your runtime code. Your application works with regular strings and numbers, while Moose uses the metadata to generate optimized infrastructure.
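You can see this in plain Python: the annotation rides along as metadata on the type and is invisible to ordinary code. A minimal sketch (the model name is illustrative):

from typing import Annotated, get_type_hints
from pydantic import BaseModel

class Example(BaseModel):
    status: Annotated[str, "LowCardinality"]

# At runtime the field is just a str; the annotation is inert metadata
e = Example(status="completed")
print(type(e.status))                                # <class 'str'>
print(get_type_hints(Example, include_extras=True))  # {'status': Annotated[str, 'LowCardinality']}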

Building Data Models: From Simple to Complex

Let's walk through how to model data for different infrastructure components and see how types behave across them.

Simple Data Model Shared Across Infrastructure

A basic data model that works identically across all infrastructure components:

app/datamodels/simple_shared.py
from moose_lib import IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel
from datetime import datetime

class SimpleShared(BaseModel):
    id: str
    name: str
    value: float
    timestamp: datetime

# This SAME model creates all infrastructure
pipeline = IngestPipeline[SimpleShared]("simple_shared", IngestPipelineConfig(
    ingest_api=True,  # Creates: POST /ingest/simple_shared
    stream=True,      # Creates: Kafka topic
    table=True        # Creates: ClickHouse table
))

# The exact same types work everywhere:
# - API validates:    { "id": "123", "name": "test", "value": 42, "timestamp": "2024-01-01T00:00:00Z" }
# - Kafka stores:     { "id": "123", "name": "test", "value": 42, "timestamp": "2024-01-01T00:00:00Z" }
# - ClickHouse table: id String, name String, value Float64, timestamp DateTime

Key Point: One model definition creates consistent schemas across all systems.
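Once the pipeline is running in local dev, you can exercise the generated endpoint directly. A sketch, assuming the default local dev server address and the /ingest/simple_shared path shown in the comments above (adjust host and port to your setup):

import requests  # third-party HTTP client, used here for brevity

payload = {
    "id": "123",
    "name": "test",
    "value": 42,
    "timestamp": "2024-01-01T00:00:00Z",
}

# Assumed local dev address; the ingest path follows the pipeline name
resp = requests.post("http://localhost:4000/ingest/simple_shared", json=payload)
print(resp.status_code)  # success once the payload passes validation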

Composite Types Shared Across Infrastructure

Complex types including nested objects, arrays, and enums work seamlessly across all components:

app/datamodels/composite_shared.py
from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel
from typing import List, Dict, Any, Optional, Literal
from datetime import datetime

class Metadata(BaseModel):
    category: str
    priority: float
    tags: List[str]

class CompositeShared(BaseModel):
    id: Key[str]  # Primary key
    status: Literal["active", "pending", "completed"]  # Enum

    # Nested object
    metadata: Metadata

    # Arrays and maps
    values: List[float]
    attributes: Dict[str, Any]

    # Optional field
    description: Optional[str] = None
    created_at: datetime

# Using in IngestPipeline - all types preserved
pipeline = IngestPipeline[CompositeShared]("composite_shared", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=True
))

# How the types map:
# - API validates nested structure and enum values
# - Kafka preserves the exact JSON structure
# - ClickHouse creates:
#   - id String (with PRIMARY KEY)
#   - status Enum8('active', 'pending', 'completed')
#   - metadata.category String, metadata.priority Float64, metadata.tags Array(String)
#   - values Array(Float64)
#   - attributes String (JSON)
#   - description Nullable(String)
#   - created_at DateTime

Key Point: Complex types including nested objects and arrays work consistently across all infrastructure.
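In application code the nested payload is validated exactly as written above. A small sketch of constructing and serializing the model, assuming the module path from the file label above and Pydantic v2 method names:

from app.datamodels.composite_shared import CompositeShared, Metadata  # path assumed from the file label above
from datetime import datetime

record = CompositeShared(
    id="order_42",
    status="active",  # must be one of the Literal values
    metadata=Metadata(category="books", priority=1.0, tags=["new", "featured"]),
    values=[1.5, 2.5],
    attributes={"source": "web"},
    created_at=datetime.now(),
)

# model_dump_json() produces the same JSON shape the API and Kafka see
print(record.model_dump_json())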

ClickHouse-Specific Types (Standalone vs IngestPipeline)

ClickHouse type annotations optimize database performance but are transparent to other infrastructure:

app/datamodels/clickhouse_optimized.py
from moose_lib import Key, clickhouse_decimal, OlapTable, OlapConfig, IngestPipeline, IngestPipelineConfig
from typing import Annotated
from pydantic import BaseModel
from datetime import datetime

class Details(BaseModel):
    name: str
    value: float

class ClickHouseOptimized(BaseModel):
    id: Key[str]

    # ClickHouse-specific type annotations
    amount: clickhouse_decimal(10, 2)              # Decimal(10,2) in ClickHouse
    category: Annotated[str, "LowCardinality"]     # LowCardinality(String) in ClickHouse

    # Optimized nested type
    details: Annotated[Details, "ClickHouseNamedTuple"]  # NamedTuple in ClickHouse

    timestamp: datetime

# SCENARIO 1: Standalone OlapTable - gets all optimizations
table = OlapTable[ClickHouseOptimized]("optimized_table", OlapConfig(
    order_by_fields=["id", "timestamp"]
))
# Creates ClickHouse table with:
# - amount Decimal(10,2)
# - category LowCardinality(String)
# - details Tuple(name String, value Float64)

# SCENARIO 2: IngestPipeline - optimizations ONLY in ClickHouse
pipeline = IngestPipeline[ClickHouseOptimized]("optimized_pipeline", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=True
))

# What happens at each layer:
# 1. API receives/validates: { "amount": "123.45", "category": "electronics", ... }
#    - Validates against the base Python types; ClickHouse annotations are ignored
# 2. Kafka stores: { "amount": "123.45", "category": "electronics", ... }
#    - Plain JSON, no ClickHouse types
# 3. ClickHouse table gets optimizations:
#    - amount stored as Decimal(10,2)
#    - category stored as LowCardinality(String)
#    - details stored as NamedTuple

Key Point: ClickHouse annotations are metadata that ONLY affect the database schema. Your application code and other infrastructure components see regular TypeScript/Python types.

API Contracts with Runtime Validators

APIs use runtime validation to ensure query parameters meet your requirements:

app/apis/consumption_with_validation.py
from moose_lib import Api, MooseClient
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional, List

# Query parameters with runtime validation
class SearchParams(BaseModel):
    # Date range validation
    start_date: str = Field(..., pattern="^\\d{4}-\\d{2}-\\d{2}$")  # Must be YYYY-MM-DD
    end_date: str = Field(..., pattern="^\\d{4}-\\d{2}-\\d{2}$")

    # Numeric constraints
    min_value: Optional[float] = Field(None, ge=0)     # Optional, but if provided >= 0
    max_value: Optional[float] = Field(None, le=1000)  # Optional, but if provided <= 1000

    # String validation
    category: Optional[str] = Field(None, min_length=2, max_length=50)

    # Pagination
    page: Optional[int] = Field(None, ge=1)
    limit: Optional[int] = Field(None, ge=1, le=100)

# Response data model
class SearchResult(BaseModel):
    id: str
    name: str
    value: float
    category: str
    timestamp: datetime

# Create validated API endpoint
async def search_handler(params: SearchParams, client: MooseClient) -> List[SearchResult]:
    # Params are already validated when this runs
    # Build a parameterized query safely
    clauses = [
        "timestamp >= {startDate}",
        "timestamp <= {endDate}"
    ]
    params_dict = {
        "startDate": params.start_date,
        "endDate": params.end_date,
        "limit": params.limit or 10,
        "offset": ((params.page or 1) - 1) * (params.limit or 10)
    }
    if params.min_value is not None:
        clauses.append("value >= {minValue}")
        params_dict["minValue"] = params.min_value
    if params.max_value is not None:
        clauses.append("value <= {maxValue}")
        params_dict["maxValue"] = params.max_value
    if params.category is not None:
        clauses.append("category = {category}")
        params_dict["category"] = params.category

    where_clause = " AND ".join(clauses)
    # Double braces keep {limit} and {offset} as query parameters in the rendered SQL
    query = f"""
      SELECT * FROM data_table
      WHERE {where_clause}
      LIMIT {{limit}}
      OFFSET {{offset}}
    """

    results = await client.query.execute(query, params=params_dict)
    return [SearchResult(**row) for row in results]

search_api = Api[SearchParams, List[SearchResult]](
    "search",
    handler=search_handler
)

# API Usage Examples:
# ✅ Valid: GET /api/search?start_date=2024-01-01&end_date=2024-01-31
# ✅ Valid: GET /api/search?start_date=2024-01-01&end_date=2024-01-31&min_value=100&limit=50
# ❌ Invalid: GET /api/search?start_date=Jan-1-2024 (wrong date format)
# ❌ Invalid: GET /api/search?start_date=2024-01-01&end_date=2024-01-31&limit=200 (exceeds max)

Key Point: Runtime validators ensure API consumers provide valid data, returning clear error messages for invalid requests before any database queries run.
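Because the contract is an ordinary Pydantic model, you can also exercise the validation rules directly in a test, without going through HTTP. A small sketch, assuming the module path from the file label above:

from pydantic import ValidationError
from app.apis.consumption_with_validation import SearchParams  # path assumed from the file label above

# Valid parameters parse cleanly
ok = SearchParams(start_date="2024-01-01", end_date="2024-01-31", limit=50)

# Invalid parameters are rejected before any query runs
try:
    SearchParams(start_date="Jan-1-2024", end_date="2024-01-31", limit=200)
except ValidationError as err:
    print(err)  # reports the bad date format and the limit above 100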

Additional Data Modeling Patterns

Modeling for Stream Processing

When you need to process data in real-time before it hits the database:

app/datamodels/stream_example.py
from moose_lib import Key, Stream, StreamConfig, OlapTable, OlapConfig
from pydantic import BaseModel
from typing import Dict, Any, Annotated
from datetime import datetime
import json

# Raw data from external source
class RawData(BaseModel):
    id: Key[str]
    timestamp: datetime
    raw_payload: str
    source_type: Annotated[str, "LowCardinality"]

# Processed data after transformation
class ProcessedData(BaseModel):
    id: Key[str]
    timestamp: datetime
    field1: str
    field2: Annotated[str, "LowCardinality"]
    numeric_value: float
    attributes: Dict[str, Any]

# Create streams
raw_stream = Stream[RawData]("raw-stream")

processed_table = OlapTable[ProcessedData]("processed_data", OlapConfig(
    order_by_fields=["id", "timestamp"]
))

processed_stream = Stream[ProcessedData]("processed-stream", StreamConfig(
    destination=processed_table
))

# Transform raw data
async def process_data(raw: RawData):
    parsed = json.loads(raw.raw_payload)

    processed = ProcessedData(
        id=raw.id,
        timestamp=raw.timestamp,
        field1=parsed["field_1"],
        field2=parsed["field_2"],
        numeric_value=float(parsed.get("value", 0)),
        attributes=parsed.get("attributes", {})
    )
    return processed

raw_stream.add_transform(processed_stream, process_data)
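Because the transform is an ordinary async function, it is easy to test in isolation before wiring it into the stream. A sketch, assuming the module path from the file label above (the sample payload values are made up):

import asyncio
import json
from datetime import datetime

from app.datamodels.stream_example import RawData, process_data  # path assumed from the file label above

raw = RawData(
    id="evt_1",
    timestamp=datetime.now(),
    raw_payload=json.dumps({"field_1": "a", "field_2": "b", "value": 3.14, "attributes": {"k": "v"}}),
    source_type="webhook",
)

processed = asyncio.run(process_data(raw))
print(processed.numeric_value)  # 3.14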

Modeling for Workflow Tasks

Define strongly-typed inputs and outputs for async jobs:

app/workflows/task_example.ts
import { Task, tags } from "@514labs/moose-lib";

// Input validation with constraints
interface TaskInput {
  id: string & tags.Format<"uuid">;
  items: string[];
  taskType: "typeA" | "typeB" | "typeC";
  options?: {
    includeMetadata: boolean;
    maxItems?: number & tags.Minimum<1> & tags.Maximum<100>;
  };
}

// Structured output
interface TaskOutput {
  id: string;
  processedAt: Date;
  resultA?: {
    category: string;
    score: number;
    details: Record<string, any>;
  };
  resultB?: {
    values: string[];
    metrics: number[];
  };
  resultC?: {
    field1: string;
    field2: string;
    field3: number;
  };
}

// Create workflow task
const exampleTask = new Task<TaskInput, TaskOutput>(
  "example-task",
  {
    run: async (ctx) => {
      // Process data based on task type
      const output: TaskOutput = {
        id: ctx.input.id,
        processedAt: new Date()
      };

      if (ctx.input.taskType === "typeA") {
        output.resultA = await processTypeA(ctx.input);
      }

      return output;
    },

    retries: 3,
    timeout: 30000  // 30 seconds
  }
);
app/workflows/task_example.py
from moose_lib import Task, TaskContext
from pydantic import BaseModel, Field
from typing import Optional, List, Literal, Dict, Any
from datetime import datetime

# Input validation with constraints
class TaskOptions(BaseModel):
    include_metadata: bool
    max_items: Optional[int] = Field(None, ge=1, le=100)

class TaskInput(BaseModel):
    id: str = Field(..., pattern="^[0-9a-f-]{36}$")
    items: List[str]
    task_type: Literal["typeA", "typeB", "typeC"]
    options: Optional[TaskOptions] = None

# Structured output
class ResultA(BaseModel):
    category: str
    score: float
    details: Dict[str, Any]

class ResultB(BaseModel):
    values: List[str]
    metrics: List[float]

class ResultC(BaseModel):
    field1: str
    field2: str
    field3: float

class TaskOutput(BaseModel):
    id: str
    processed_at: datetime
    result_a: Optional[ResultA] = None
    result_b: Optional[ResultB] = None
    result_c: Optional[ResultC] = None

# Create workflow task
async def run_task(ctx: TaskContext[TaskInput]) -> TaskOutput:
    # Process data based on task type
    output = TaskOutput(
        id=ctx.input.id,
        processed_at=datetime.now()
    )

    if ctx.input.task_type == "typeA":
        output.result_a = await process_type_a(ctx.input)

    return output

example_task = Task[TaskInput, TaskOutput](
    "example-task",
    run_function=run_task,
    retries=3,
    timeout=30  # seconds
)