# Moose / Data Modeling Documentation – Python

## Included Files

1. moose/data-modeling/data-modeling.mdx

## Data Modeling

Source: moose/data-modeling/data-modeling.mdx

Data Modeling for Moose

# Data Modeling

## Overview

In Moose, data models are just Pydantic models that become the authoritative source for your infrastructure schemas. Data models are used to define:

- [OLAP Tables and Materialized Views](/moose/olap) (automatically generated DDL)
- [Redpanda/Kafka Streams](/moose/streaming) (schema registry and topic validation)
- [API Contracts](/moose/apis) (request/response validation and OpenAPI specs)
- [Workflow Task Input and Output Types](/moose/workflows) (typed function inputs/outputs)

## Philosophy

### Problem: Analytical Backends Are Prone to Schema Drift

Analytical backends are unusual in that they typically coordinate schemas across multiple systems, each with its own type system and constraints. Consider a typical pipeline for ingesting events into a ClickHouse table:

```python
# What you're building:
# API endpoint → Kafka topic → ClickHouse table → Analytics API

# Traditional approach: define the schema 4 times
from datetime import datetime
from typing import Literal
from pydantic import BaseModel

# 1. API validation with Pydantic
class APIEvent(BaseModel):
    user_id: str
    event_type: Literal["click", "view", "purchase"]
    timestamp: datetime

# 2. Kafka schema registration
kafka_schema = {
    "type": "record",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "string"}
    ]
}

# 3. ClickHouse DDL
# CREATE TABLE events (
#     user_id String,
#     event_type LowCardinality(String),
#     timestamp DateTime
# ) ENGINE = MergeTree()

# 4. Analytics API response
class EventsResponse(BaseModel):
    user_id: str
    event_type: str
    timestamp: datetime
```

**The Problem:** When you add a field or change a type, you must update it in multiple places. Miss one, and you get:

- Silent data loss (Kafka → ClickHouse sync fails)
- Runtime errors
- Data quality issues (validation gaps)

### Solution: Model In Code, Reuse Everywhere

With Moose you define your schemas as native language types with optional metadata, then reuse those schemas across every system:

```python filename="app/main.py"
from datetime import datetime
from typing import Annotated, Any
from pydantic import BaseModel
from moose_lib import Key, IngestPipeline, IngestPipelineConfig

# 1. Define your schema (WHAT your data looks like)
class MyFirstDataModel(BaseModel):
    id: Key[str]
    some_string: Annotated[str, "LowCardinality"]
    some_number: int
    some_date: datetime
    some_json: Any

# This single model can be reused across multiple systems:
my_first_pipeline = IngestPipeline[MyFirstDataModel]("my_first_pipeline", IngestPipelineConfig(
    ingest_api=True,  # POST API endpoint
    stream=True,      # Kafka topic
    table=True        # ClickHouse table
))
```

## How It Works

The key idea is to use `Annotated` types to attach metadata to base Python types; that metadata tells Moose how to map each field to an optimized ClickHouse type:

```python
from datetime import datetime
from decimal import Decimal
from typing import Annotated
from pydantic import BaseModel
from moose_lib import Key, OlapTable, clickhouse_decimal

class Event(BaseModel):
    # Base type: str
    # ClickHouse: String with primary key
    id: Key[str]

    # Base type: Decimal
    # ClickHouse: Decimal(10,2) for precise money
    amount: clickhouse_decimal(10, 2)

    # Base type: str
    # ClickHouse: LowCardinality(String) for enums
    status: Annotated[str, "LowCardinality"]

    # Base type: datetime
    # ClickHouse: DateTime
    created_at: datetime

events = OlapTable[Event]("events")

# In your application code:
tx = Event(
    id="id_123",
    amount=Decimal("99.99"),   # Regular Decimal in Python
    status="completed",        # Regular string in Python
    created_at=datetime.now()
)

# In ClickHouse:
# CREATE TABLE events (
#     id String,
#     amount Decimal(10,2),
#     status LowCardinality(String),
#     created_at DateTime
# ) ENGINE = MergeTree()
# ORDER BY id
```

**The metadata annotations are compile-time only** - they don't affect your runtime code. Your application works with regular strings and numbers, while Moose uses the metadata to generate optimized infrastructure.

## Building Data Models: From Simple to Complex

Let's walk through how to model data for different infrastructure components and see how types behave across them.

### Simple Data Model Shared Across Infrastructure

A basic data model that works identically across all infrastructure components:

```python filename="app/datamodels/simple_shared.py"
from pydantic import BaseModel
from datetime import datetime
from moose_lib import IngestPipeline, IngestPipelineConfig

class SimpleShared(BaseModel):
    id: str
    name: str
    value: float
    timestamp: datetime

# This SAME model creates all infrastructure
pipeline = IngestPipeline[SimpleShared]("simple_shared", IngestPipelineConfig(
    ingest_api=True,  # Creates: POST /ingest/simple_shared
    stream=True,      # Creates: Kafka topic
    table=True        # Creates: ClickHouse table
))

# The exact same types work everywhere:
# - API validates:    { "id": "123", "name": "test", "value": 42, "timestamp": "2024-01-01T00:00:00Z" }
# - Kafka stores:     { "id": "123", "name": "test", "value": 42, "timestamp": "2024-01-01T00:00:00Z" }
# - ClickHouse table: id String, name String, value Float64, timestamp DateTime
```

**Key Point**: One model definition creates consistent schemas across all systems.
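To try the shared model end to end, you can post a record to the ingest endpoint the pipeline creates. This is a minimal sketch, assuming the Moose dev server is running locally on its default port (`4000` here is an assumption; adjust to your setup) and that the `requests` package is installed; the `/ingest/simple_shared` route comes from the pipeline comments above:

```python
from datetime import datetime, timezone
import requests

# Assumed local dev address; change if your Moose server runs elsewhere.
BASE_URL = "http://localhost:4000"

record = {
    "id": "123",
    "name": "test",
    "value": 42,
    "timestamp": datetime.now(timezone.utc).isoformat(),
}

# The same SimpleShared model validates this payload before it reaches
# the Kafka topic and the ClickHouse table.
response = requests.post(f"{BASE_URL}/ingest/simple_shared", json=record)
print(response.status_code)
```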
### Composite Types Shared Across Infrastructure

Complex types including nested objects, arrays, and enums work seamlessly across all components:

```python filename="app/datamodels/composite_shared.py"
from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel
from typing import List, Dict, Any, Optional, Literal
from datetime import datetime

class Metadata(BaseModel):
    category: str
    priority: float
    tags: List[str]

class CompositeShared(BaseModel):
    id: Key[str]  # Primary key
    status: Literal["active", "pending", "completed"]  # Enum

    # Nested object
    metadata: Metadata

    # Arrays and maps
    values: List[float]
    attributes: Dict[str, Any]

    # Optional field
    description: Optional[str] = None

    created_at: datetime

# Using in IngestPipeline - all types preserved
pipeline = IngestPipeline[CompositeShared]("composite_shared", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=True
))

# How the types map:
# - API validates nested structure and enum values
# - Kafka preserves the exact JSON structure
# - ClickHouse creates:
#   - id String (with PRIMARY KEY)
#   - status Enum8('active', 'pending', 'completed')
#   - metadata.category String, metadata.priority Float64, metadata.tags Array(String)
#   - values Array(Float64)
#   - attributes String (JSON)
#   - description Nullable(String)
#   - created_at DateTime
```

**Key Point**: Complex types including nested objects and arrays work consistently across all infrastructure.

### ClickHouse-Specific Types (Standalone vs IngestPipeline)

ClickHouse type annotations optimize database performance but are **transparent to other infrastructure**:

```python filename="app/datamodels/clickhouse_optimized.py"
from moose_lib import Key, clickhouse_decimal, OlapTable, OlapConfig, IngestPipeline, IngestPipelineConfig
from typing import Annotated
from pydantic import BaseModel
from datetime import datetime

class Details(BaseModel):
    name: str
    value: float

class ClickHouseOptimized(BaseModel):
    id: Key[str]

    # ClickHouse-specific type annotations
    amount: clickhouse_decimal(10, 2)           # Decimal(10,2) in ClickHouse
    category: Annotated[str, "LowCardinality"]  # LowCardinality(String) in ClickHouse

    # Optimized nested type
    details: Annotated[Details, "ClickHouseNamedTuple"]  # NamedTuple in ClickHouse

    timestamp: datetime

# SCENARIO 1: Standalone OlapTable - gets all optimizations
table = OlapTable[ClickHouseOptimized]("optimized_table", OlapConfig(
    order_by_fields=["id", "timestamp"]
))
# Creates ClickHouse table with:
# - amount Decimal(10,2)
# - category LowCardinality(String)
# - details Tuple(name String, value Float64)

# SCENARIO 2: IngestPipeline - optimizations ONLY in ClickHouse
pipeline = IngestPipeline[ClickHouseOptimized]("optimized_pipeline", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=True
))
# What happens at each layer:
# 1. API receives/validates: { "amount": "123.45", "category": "electronics", ... }
#    - Sees amount as str, category as str (annotations ignored)
# 2. Kafka stores: { "amount": "123.45", "category": "electronics", ... }
#    - Plain JSON, no ClickHouse types
# 3. ClickHouse table gets optimizations:
#    - amount stored as Decimal(10,2)
#    - category stored as LowCardinality(String)
#    - details stored as NamedTuple
```

**Key Point**: ClickHouse annotations are metadata that ONLY affect the database schema. Your application code and other infrastructure components see regular Python types.
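To see that the annotations are transparent at runtime, here is a small sketch continuing from the `clickhouse_optimized.py` module above. It relies only on standard Pydantic v2 behavior (`model_dump`), not on any Moose-specific API: the model is constructed and serialized with ordinary Python values, and the `Decimal(10,2)`, `LowCardinality`, and NamedTuple details only appear in the generated ClickHouse schema.

```python
from decimal import Decimal
from datetime import datetime

# Application code works with plain Python values; the ClickHouse
# annotations on ClickHouseOptimized never show up here.
row = ClickHouseOptimized(
    id="order_42",
    amount=Decimal("123.45"),   # plain Decimal, stored as Decimal(10,2)
    category="electronics",     # plain str, stored as LowCardinality(String)
    details=Details(name="warranty", value=19.99),
    timestamp=datetime.now(),
)

print(row.model_dump())  # a regular dict of Python values
```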
### API Contracts with Runtime Validators

APIs use runtime validation to ensure query parameters meet your requirements:

```python filename="app/apis/consumption_with_validation.py"
from moose_lib import Api, MooseClient
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional, List

# Query parameters with runtime validation
class SearchParams(BaseModel):
    # Date range validation
    start_date: str = Field(..., pattern=r"^\d{4}-\d{2}-\d{2}$")  # Must be YYYY-MM-DD
    end_date: str = Field(..., pattern=r"^\d{4}-\d{2}-\d{2}$")

    # Numeric constraints
    min_value: Optional[float] = Field(None, ge=0)     # Optional, but if provided >= 0
    max_value: Optional[float] = Field(None, le=1000)  # Optional, but if provided <= 1000

    # String validation
    category: Optional[str] = Field(None, min_length=2, max_length=50)

    # Pagination
    page: Optional[int] = Field(None, ge=1)
    limit: Optional[int] = Field(None, ge=1, le=100)

# Response data model
class SearchResult(BaseModel):
    id: str
    name: str
    value: float
    category: str
    timestamp: datetime

# Create validated API endpoint
async def search_handler(params: SearchParams, client: MooseClient) -> List[SearchResult]:
    # Params are already validated when this runs

    # Build a parameterized query safely
    clauses = [
        "timestamp >= {startDate}",
        "timestamp <= {endDate}"
    ]
    params_dict = {
        "startDate": params.start_date,
        "endDate": params.end_date,
        "limit": params.limit or 10,
        "offset": ((params.page or 1) - 1) * (params.limit or 10)
    }

    if params.min_value is not None:
        clauses.append("value >= {minValue}")
        params_dict["minValue"] = params.min_value
    if params.max_value is not None:
        clauses.append("value <= {maxValue}")
        params_dict["maxValue"] = params.max_value
    if params.category is not None:
        clauses.append("category = {category}")
        params_dict["category"] = params.category

    where_clause = " AND ".join(clauses)
    query = f"""
        SELECT *
        FROM data_table
        WHERE {where_clause}
        LIMIT {{limit}}
        OFFSET {{offset}}
    """

    results = await client.query.execute(query, params=params_dict)
    return [SearchResult(**row) for row in results]

search_api = Api[SearchParams, List[SearchResult]](
    "search",
    handler=search_handler
)

# API Usage Examples:
# ✅ Valid:   GET /api/search?start_date=2024-01-01&end_date=2024-01-31
# ✅ Valid:   GET /api/search?start_date=2024-01-01&end_date=2024-01-31&min_value=100&limit=50
# ❌ Invalid: GET /api/search?start_date=Jan-1-2024 (wrong date format)
# ❌ Invalid: GET /api/search?start_date=2024-01-01&end_date=2024-01-31&limit=200 (exceeds max)
```

**Key Point**: Runtime validators ensure API consumers provide valid data, returning clear error messages for invalid requests before any database queries run.
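Because the constraints live on the Pydantic model, you can exercise them directly in a REPL or a unit test to check what the endpoint will accept. A minimal sketch, continuing with the `SearchParams` model above and using only standard Pydantic validation (the exact HTTP error payload Moose returns for a failed request is not shown here):

```python
from pydantic import ValidationError

# Mirrors the "valid" example URL: well-formed dates, limit within bounds.
ok = SearchParams(start_date="2024-01-01", end_date="2024-01-31", limit=50)
print(ok.limit)  # 50

# Mirrors the "invalid" examples: bad date format and limit > 100 both fail
# before any query is ever built.
try:
    SearchParams(start_date="Jan-1-2024", end_date="2024-01-31", limit=200)
except ValidationError as e:
    print(e.errors()[0]["loc"])  # points at the offending field
```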
## Additional Data Modeling Patterns

### Modeling for Stream Processing

When you need to process data in real-time before it hits the database:

```python filename="app/datamodels/stream_example.py"
import json

from moose_lib import Key, Stream, StreamConfig, OlapTable, OlapConfig
from pydantic import BaseModel
from typing import Dict, Any, Annotated
from datetime import datetime

# Raw data from external source
class RawData(BaseModel):
    id: Key[str]
    timestamp: datetime
    raw_payload: str
    source_type: Annotated[str, "LowCardinality"]

# Processed data after transformation
class ProcessedData(BaseModel):
    id: Key[str]
    timestamp: datetime
    field1: str
    field2: Annotated[str, "LowCardinality"]
    numeric_value: float
    attributes: Dict[str, Any]

# Create streams
raw_stream = Stream[RawData]("raw-stream")

processed_table = OlapTable[ProcessedData]("processed_data", OlapConfig(
    order_by_fields=["id", "timestamp"]
))

processed_stream = Stream[ProcessedData]("processed-stream", StreamConfig(
    destination=processed_table
))

# Transform raw data
async def process_data(raw: RawData) -> ProcessedData:
    parsed = json.loads(raw.raw_payload)
    processed = ProcessedData(
        id=raw.id,
        timestamp=raw.timestamp,
        field1=parsed["field_1"],
        field2=parsed["field_2"],
        numeric_value=float(parsed.get("value", 0)),
        attributes=parsed.get("attributes", {})
    )
    return processed

raw_stream.add_transform(processed_stream, process_data)
```

### Modeling for Workflow Tasks

Define strongly-typed inputs and outputs for async jobs:

```python filename="app/workflows/task_example.py"
from moose_lib import Task, TaskContext
from pydantic import BaseModel, Field
from typing import Optional, List, Literal, Dict, Any
from datetime import datetime

# Input validation with constraints
class TaskOptions(BaseModel):
    include_metadata: bool
    max_items: Optional[int] = Field(None, ge=1, le=100)

class TaskInput(BaseModel):
    id: str = Field(..., pattern=r"^[0-9a-f-]{36}$")
    items: List[str]
    task_type: Literal["typeA", "typeB", "typeC"]
    options: Optional[TaskOptions] = None

# Structured output
class ResultA(BaseModel):
    category: str
    score: float
    details: Dict[str, Any]

class ResultB(BaseModel):
    values: List[str]
    metrics: List[float]

class ResultC(BaseModel):
    field1: str
    field2: str
    field3: float

class TaskOutput(BaseModel):
    id: str
    processed_at: datetime
    result_a: Optional[ResultA] = None
    result_b: Optional[ResultB] = None
    result_c: Optional[ResultC] = None

# Create workflow task
async def run_task(ctx: TaskContext[TaskInput]) -> TaskOutput:
    # Process data based on task type
    output = TaskOutput(
        id=ctx.input.id,
        processed_at=datetime.now()
    )

    if ctx.input.task_type == "typeA":
        output.result_a = await process_type_a(ctx.input)

    return output

example_task = Task[TaskInput, TaskOutput](
    "example-task",
    run_function=run_task,
    retries=3,
    timeout=30  # seconds
)
```
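The handler above calls `process_type_a`, which isn't defined in the snippet. A minimal stub is sketched below so the example runs end to end; it is purely illustrative, and the real implementation depends on what a "typeA" job should do:

```python
async def process_type_a(task_input: TaskInput) -> ResultA:
    # Hypothetical placeholder: score the items and echo one option as details.
    # Replace with real processing logic for "typeA" jobs.
    return ResultA(
        category="typeA",
        score=float(len(task_input.items)),
        details={
            "include_metadata": bool(task_input.options and task_input.options.include_metadata)
        },
    )
```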