# Moose / Olap / Model Materialized View Documentation – Python ## Included Files 1. moose/olap/model-materialized-view/model-materialized-view.mdx ## Creating Materialized Views Source: moose/olap/model-materialized-view/model-materialized-view.mdx Create and configure materialized views for data transformations # Modeling Materialized Views ## Overview Materialized views are write-time transformations in ClickHouse. A static `SELECT` populates a destination table from one or more sources. You query the destination like any other table. The `MaterializedView` class wraps [ClickHouse `MATERIALIZED VIEW`](https://clickhouse.com/docs/en/sql-reference/statements/create/view/#create-materialized-view) and keeps the `SELECT` explicit. When you edit the destination schema in code and update the `SELECT` accordingly, Moose applies the corresponding DDL, orders dependent updates, and backfills as needed, so the pipeline stays consistent as you iterate. In local dev, Moose Migrate generates and applies DDL to your local database. Today, destination schemas are declared in code and kept in sync manually with your `SELECT`. Moose Migrate coordinates DDL and dependencies when you make those changes. A future enhancement will infer the destination schema from the `SELECT` and update it automatically. This dependency awareness is critical for [cascading materialized views](https://clickhouse.com/docs/en/sql-reference/statements/create/view/#create-materialized-view-with-dependencies). Moose Migrate [orders DDL across views and tables](https://www.fiveonefour.com/blog/Moose-SQL-Getting-DDL-Dependencies-in-Order) to avoid failed migrations and partial states. ### Basic Usage ```python filename="BasicUsage.py" copy from moose_lib import MaterializedView, MaterializedViewOptions, ClickHouseEngines from source_table import source_table # Define the schema of the transformed rows-- this is static and it must match the results of your SELECT. It also represents the schema of your entire destination table. class TargetSchema(BaseModel): id: str average_rating: float num_reviews: int mv = MaterializedView[TargetSchema](MaterializedViewOptions( # The transformation to run on the source table select_statement=""" SELECT {source_table.columns.id}, avg({source_table.columns.rating}) AS average_rating, count(*) AS num_reviews FROM {source_table} GROUP BY {source_table.columns.id} """, # Reference to the source table(s) that the SELECT reads from select_tables=[source_table], # Creates a new OlapTable named "target_table" where the transformed rows are written to. table_name="target_table", order_by_fields=["id"], # The name of the materialized view in ClickHouse materialized_view_name="mv_to_target_table", )) ``` The ClickHouse `MATERIALIZED VIEW` object acts like a trigger: on new inserts into the source table(s), it runs the SELECT and writes the transformed rows to the destination. ### Quick Reference ```python filename="ViewOptions.py" copy from moose_lib import MaterializedView, sql from source_table import source_table class MaterializedViewOptions(BaseModel): select_statement: str table_name: str materialized_view_name: str select_tables: List[OlapTable | View] engine: ClickHouseEngines = ClickHouseEngines.MergeTree order_by_fields: List[str] = [] ``` ## Modeling the Target Table The destination table is where the transformed rows are written by the materialized view. You can model it in two ways: ### Option 1 — Define target table inside the MaterializedView (most cases) - Simple, co-located lifecycle: the destination table is created/updated/dropped with the MV. - Best for: projection/denormalization, filtered serving tables, enrichment joins, and most rollups. ```python filename="InlineTarget.py" copy from pydantic import BaseModel from moose_lib import MaterializedView, MaterializedViewOptions class TargetSchema(BaseModel): id: str value: int mv = MaterializedView[TargetSchema](MaterializedViewOptions( select_statement=""" SELECT {source_table.columns.id}, toInt32({source_table.columns.value}) AS value FROM {source_table} """, select_tables=[source_table], table_name="serving_table", order_by_fields=["id"], materialized_view_name="mv_to_serving_table", )) ``` ### Option 2 — Decoupled: reference a standalone `OlapTable` Certain use cases may benefit from a separate lifecycle for the target table that is managed independently from the MV. ```python filename="DecoupledTarget.py" copy from pydantic import BaseModel from moose_lib import MaterializedView, MaterializedViewOptions, OlapConfig, ClickHouseEngines class TargetSchema(BaseModel): id: str value: int # Create the standalone table target_table = OlapTable[TargetSchema](OlapConfig( name="target_table", engine=ClickHouseEngines.MergeTree, order_by_fields=["id"], )) mv = MaterializedView[TargetSchema](MaterializedViewOptions( select_statement=""" SELECT {source_table.columns.id}, toInt32({source_table.columns.value}) AS value FROM {source_table} """, select_tables=[source_table], materialized_view_name="mv_to_target_table", ), target_table=target_table) ``` ### Basic Transformation, Cleaning, Filtering, Denormalization Create a narrower, query-optimized table from a wide source. Apply light transforms (cast, rename, parse) at write time. ```python filename="Denormalization.py" copy from pydantic import BaseModel from moose_lib import MaterializedView, MaterializedViewOptions class Dest(BaseModel): id: str value: int created_at: str mv = MaterializedView[Dest](MaterializedViewOptions( select_statement=""" SELECT {source_table.columns.id}, toInt32({source_table.columns.value}) AS value, {source_table.columns.created_at} AS created_at FROM {source_table} WHERE active = 1 """, select_tables=[source_table], table_name="proj_table", order_by_fields=["id"], materialized_view_name="mv_to_proj_table", )) ``` ### Aggregations ### Simple Additive Rollups When you want to maintain running sums (counts, totals) that are additive per key, use the `SummingMergeTree` engine: ```python filename="Summing.py" copy from pydantic import BaseModel from moose_lib import MaterializedView, MaterializedViewOptions, ClickHouseEngines class DailyCounts(BaseModel): day: str user_id: str events: int stmt = """ SELECT toDate({events.columns.timestamp}) AS day, {events.columns.user_id} AS user_id, count(*) AS events FROM {events} GROUP BY day, user_id """ mv = MaterializedView[DailyCounts](MaterializedViewOptions( select_statement=STMT, select_tables=[events], table_name="daily_counts", engine=ClickHouseEngines.SummingMergeTree, order_by_fields=["day", "user_id"], materialized_view_name="mv_to_daily_counts", )) ``` #### Complex Aggregations When you want to compute complex aggregation metrics that are not just simple additive operations (sum, count, avg, etc), but instead uses more complex anlaytical functions: (topK,percentile, etc), create a target table with the `AggregatingMergeTree` engine. ```python filename="AggTransform.py" copy from typing import Annotated from pydantic import BaseModel from moose_lib import MaterializedView, AggregateFunction, MaterializedViewOptions, ClickHouseEngines class MetricsById(BaseModel): id: str avg_rating: Annotated[float, AggregateFunction(agg_func="avg", param_types=[float])] daily_uniques: Annotated[int, AggregateFunction(agg_func="uniqExact", param_types=[str])] # The SELECT must output aggregate states STMT = """ SELECT id, avgState(${events.columns.rating}) AS avg_rating, uniqExactState(${events.columns.user_id}) AS daily_uniques FROM ${events} GROUP BY ${events.columns.id} """ # Create the MV (engine config shown in TS example) mv = MaterializedView[MetricsById](MaterializedViewOptions( select_statement=STMT, table_name="metrics_by_id", materialized_view_name="mv_metrics_by_id", engine=ClickHouseEngines.AggregatingMergeTree, order_by_fields=["id"], select_tables=[events], )) ``` Jump to the [Advanced: AggregatingMergeTree transformations](#advanced-aggregatingmergetree-transformations) section for more details. ### Fan-in Patterns When you have multiple sources that you want to merge into a single destination table, its best to create an OlapTable and reference it in each MV that needs to write to it: ```python filename="FanIn.py" copy from pydantic import BaseModel from moose_lib import MaterializedView, MaterializedViewOptions, OlapConfig, ClickHouseEngines class DailyCounts(BaseModel): day: str user_id: str events: int # Create the destination table explicitly daily = OlapTable[DailyCounts]("daily_counts", OlapConfig( engine=ClickHouseEngines.SummingMergeTree, order_by_fields=["day", "user_id"], )) # MV 1 - write to the daily_counts table mv1 = MaterializedView[DailyCounts](MaterializedViewOptions( select_statement="SELECT toDate(ts) AS day, user_id, 1 AS events FROM {webEvents}", select_tables=[webEvents], materialized_view_name="mv_web_to_daily_counts", ), target_table=daily) # MV 2 - write to the daily_counts table mv2 = MaterializedView[DailyCounts](MaterializedViewOptions( select_statement="SELECT toDate(ts) AS day, user_id, 1 AS events FROM {mobileEvents}", select_tables=[mobileEvents], materialized_view_name="mv_mobile_to_daily_counts", ), target_table=daily) ``` ### Blue/green schema migrations Create a new table for a breaking schema change and use an MV to copy data from the old table; when complete, switch reads to the new table and drop just the MV and old table. For more information on how to use materialized views to perform blue/green schema migrations, see the [Schema Versioning](./schema-versioning) guide. ## Defining the transformation The `select_statement` is a static SQL query that Moose runs to transform data from your source table(s) into rows for the destination table. Transformations are defined as ClickHouse SQL queries. We strongly recommend using the ClickHouse SQL reference and functions overview to help you develop your transformations. You can use f-strings to interpolate tables and columns identifiers to your queries. Since these are static, you don't need to worry about SQL injection. ```python filename="Transformation.py" copy from pydantic import BaseModel from moose_lib import MaterializedView, MaterializedViewOptions, OlapConfig class Dest(BaseModel): id: str name: str day: str mv = MaterializedView[Dest](MaterializedViewOptions( select_statement=""" SELECT {events.columns.id} AS id, {events.columns.name} AS name, toDate({events.columns.ts}) AS day FROM {events} JOIN {users} ON {events.columns.user_id} = {users.columns.id} WHERE {events.columns.active} = 1 """, select_tables=[events, users], order_by_fields=["id"], table_name="user_activity_by_day", materialized_view_name="mv_user_activity_by_day", )) ``` The columns returned by your `SELECT` must exactly match the destination table schema. - Use column aliases (`AS target_column_name`) to align names. - All destination columns must be present in the `SELECT`, or the materialized view won't be created. Adjust your transformation or table schema so they match. Go to the [Advanced: Writing SELECT statements to Aggregated tables](#writing-select-statements-to-aggregated-tables) section for more details. ## Backfill Destination Tables When the MaterializedView is created, Moose backfills the destination once by running your `SELECT` (so you start with a fully populated table). Materialized views that source from S3Queue tables are **not backfilled** automatically. S3Queue tables only process new files added to S3 after the table is created - there is no historical data to backfill from. The MV will start populating as new files arrive in S3. You can see the SQL that Moose will run to backfill the destination table when you generate the [Migration Plan](./migration-plan). During dev mode, as soon as you save the MaterializedView, Moose will run the backfill and you can see the results in the destination table by querying it in your local ClickHouse instance. ## Query Destination Tables You can query the destination table like any other table. For inline or decoupled target tables, you can reference target table columns and tables directly in your queries: ```python filename="Query.py" copy # Query inline destination table by name QUERY = """ SELECT {mv.target_table.columns.id}, {mv.target_table.columns.value} FROM {mv.target_table} ORDER BY {mv.target_table.columns.id} LIMIT 10 """ ``` If you define your target table outside of the MaterializedView, you can also just reference the table by its variable name in your queries: ```python filename="QueryDecoupled.py" copy # Query the standalone destination table by name target_table = OlapTable[TargetTable](OlapConfig( name="target_table", engine=ClickHouseEngines.MergeTree, order_by_fields=["id"], )) QUERY = """ SELECT {target_table.columns.id}, {target_table.columns.average_rating} FROM {target_table} WHERE {target_table.columns.id} = 'abc' """ ``` Go to the [Querying Aggregated tables](#querying-aggregated-tables) section for more details on how to query Aggregated tables. ## Advanced: Aggregations + Materialized Views This section dives deeper into advanced patterns and tradeoffs when building aggregated materialized views. ### Target Tables with `AggregatingMergeTree` When using an `AggregatingMergeTree` target table, you must use the `AggregateFunction` type to model the result of the aggregation functions: ```python filename="AggTransform.py" copy from typing import Annotated, TypedDict from moose_lib import MaterializedView, AggregateFunction, MaterializedViewOptions class MetricsById(TypedDict): id: Key[str] # avg_rating stores result of avgState(events.rating) # daily_uniques stores result of uniqExactState(events.user_id) # - uniqExact returns an integer; use number & ClickHouseInt<"uint64"> for precision # - Aggregated arg type is [string] because the column (events.user_id) is a string # - Aggregated function name is "uniqExact" avg_rating: Annotated[float, AggregateFunction(agg_func="avg", param_types=[float])] # daily_uniques stores result of uniqExactState(events.user_id) # - uniqExact returns an integer; Annotated[int, ...] to model this result type # - Aggregated function name is "uniqExact" # - The column we are aggregating (events.user_id) is a string, so the Aggregated arg type is [string]. daily_uniques: Annotated[int, AggregateFunction(agg_func="uniqExact", param_types=[str])] # The SELECT must output aggregate states STMT = """ SELECT id, avgState(${events.columns.rating}) AS avg_rating, uniqExactState(${events.columns.user_id}) AS daily_uniques FROM ${events} GROUP BY ${events.columns.id} """ # Create the MV (engine config shown in TS example) mv = MaterializedView[MetricsById](MaterializedViewOptions( select_statement=STMT, table_name="metrics_by_id", materialized_view_name="mv_metrics_by_id", select_tables=[events], )) ``` - Using `avg()`/`uniqExact()` in the SELECT instead of `avgState()`/`uniqExactState()` - Forgetting to annotate the schema with `AggregateFunction(...)` so the target table can be created correctly - Mismatch between `GROUP BY` keys in your `SELECT` and the `order_by_fields` of your target table ### Modeling columns with `AggregateFunction` - Pattern: `Annotated[U, AggregateFunction(agg_func="avg", param_types=[float])]` - `U` is the read-time type (e.g., `float`, `int`) - `agg_func` is the aggregation name (e.g., `avg`, `uniqExact`) - `param_types` are the argument types. These are the types of the columns that are being aggregated. ```python filename="FunctionToTypeMapping.py" copy Annotated[int, Aggregated["avg", [int]]] # avgState(col: int) Annotated[int, Aggregated["uniqExact", [str]]] # uniqExactState(col: str) Annotated[int, Aggregated["count", []]] # countState(col: any) Annotated[str, Aggregated["argMax", [str, datetime]]] # argMaxState(col: str, value: datetime) Annotated[str, Aggregated["argMin", [str, datetime]]] # argMinState(col: str, value: datetime) Annotated[float, Aggregated["corr", [float, float]]] # corrState(col1: float, col2: float) Annotated[float, Aggregated["quantiles", [float]]] # quantilesState(levels: float, value: float) ``` ### Writing SELECT statements to Aggregated tables When you write to an `AggregatingMergeTree` table, you must add a `State` suffix to the aggregation functions in your `SELECT` statement. ```python filename="AggTransform.py" copy from pydantic import BaseModel from typing import Annotated from moose_lib import MaterializedView, ClickHouseEngines, AggregateFunction, MaterializedViewOptions class MetricsById(BaseModel): id: str avg_rating: Annotated[float, AggregateFunction(agg_func="avg", param_types=[float])] total_reviews: Annotated[int, AggregateFunction(agg_func="sum", param_types=[int])] agg_stmt = ''' SELECT {reviews.columns.id} AS id, avgState({reviews.columns.rating}) AS avg_rating, countState({reviews.columns.id}) AS total_reviews FROM {reviews} GROUP BY {reviews.columns.id} ''' mv = MaterializedView[MetricsById](MaterializedViewOptions( select_statement=agg_stmt, select_tables=[reviews], table_name="metrics_by_id", engine=ClickHouseEngines.AggregatingMergeTree, order_by_fields=["id"], materialized_view_name="mv_metrics_by_id", )) ``` Why states? Finalized values (e.g., `avg()`) are not incrementally mergeable. Storing states lets ClickHouse maintain results efficiently as new data arrives. Docs: https://clickhouse.com/docs/en/sql-reference/aggregate-functions/index and https://clickhouse.com/docs/en/sql-reference/aggregate-functions/combinators#-state ### Querying Aggregated Tables When you query a table with an `AggregatingMergeTree` engine, you must use aggregate functions with the `Merge` suffix (e.g., `avgMerge`) ```python filename="QueryAgg.py" copy # Manual finalization using ...Merge QUERY = """ SELECT avgMerge(avg_rating) AS avg_rating, countMerge(total_reviews) AS total_reviews FROM metrics_by_id WHERE id = '123' """ ``` ## Choosing the right engine - Use `MergeTree` for copies/filters/enrichment without aggregation semantics. - Use `SummingMergeTree` when all measures are additive, and you want compact, eventually-consistent sums. - Use `AggregatingMergeTree` for non-additive metrics and advanced functions; store states and finalize on read. - Use `ReplacingMergeTree` for dedup/upserts or as an idempotent staging layer before rollups.