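AggregatingMergeTree stores pre-aggregated values and/or aggregate states. During background merges, ClickHouse combines rows that share the same sorting key (the table's ORDER BY) into a single row, merging each aggregate column with its aggregate function.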
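The plain-Python model below makes the merge step concrete. It is not ClickHouse or Moose code. It collapses rows that share a sorting key, using sum, max and anyLast the way SimpleAggregateFunction columns combine during a merge. The column names mirror the `DailyStats` model on this page. The `merge_rows` helper and the `MERGE_FUNCS` table are hypothetical. Real merges work on sorted data parts on disk, not Python lists.

```python
from datetime import datetime
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical model of how each simple aggregate function combines two stored values.
MERGE_FUNCS: Dict[str, Callable[[Any, Any], Any]] = {
    "sum": lambda a, b: a + b,
    "max": lambda a, b: max(a, b),
    # Keeps the later value seen; in ClickHouse the order across parts is not guaranteed.
    "anyLast": lambda a, b: b,
}

# Column -> simple aggregate function, mirroring DailyStats.
SCHEMA = {"total_views": "sum", "max_score": "max", "last_activity": "anyLast"}
SORTING_KEY = ("date", "user_id")


def merge_rows(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collapse rows with equal sorting keys into one row, like a background merge."""
    merged: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for row in rows:
        key = tuple(row[k] for k in SORTING_KEY)
        if key not in merged:
            merged[key] = dict(row)  # copy so the input rows are not mutated
            continue
        target = merged[key]
        for col, func in SCHEMA.items():
            target[col] = MERGE_FUNCS[func](target[col], row[col])
    # Merged parts stay sorted by the sorting key; mimic that here.
    return [merged[k] for k in sorted(merged)]


day = datetime(2024, 1, 1)
rows = [
    {"date": day, "user_id": "u1", "total_views": 3, "max_score": 7.5,
     "last_activity": datetime(2024, 1, 1, 9)},
    {"date": day, "user_id": "u2", "total_views": 1, "max_score": 2.0,
     "last_activity": datetime(2024, 1, 1, 10)},
    {"date": day, "user_id": "u1", "total_views": 2, "max_score": 9.0,
     "last_activity": datetime(2024, 1, 1, 11)},
]
result = merge_rows(rows)
print(result[0]["total_views"], result[0]["max_score"])  # 5 9.0
```

The two u1 rows become one row, and the u2 row is left alone. This is why the sorting key should contain exactly the dimensions you group by.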
Use it with:

- SimpleAggregateFunction for simple rollups (the column stores the merged value directly)
- AggregateFunction for complex aggregations (the column stores intermediate aggregate states, which you finalize with the matching -Merge function at query time)

```python
from typing import Annotated
from moose_lib import OlapTable, OlapConfig, simple_aggregated, AggregateFunction
from moose_lib.blocks import AggregatingMergeTreeEngine
from pydantic import BaseModel
from datetime import datetime


class DailyStats(BaseModel):
    date: datetime
    user_id: str
    # SimpleAggregateFunction(sum, UInt64)
    total_views: simple_aggregated('sum', int)
    # SimpleAggregateFunction(max, Float64)
    max_score: simple_aggregated('max', float)
    # SimpleAggregateFunction(anyLast, DateTime)
    last_activity: simple_aggregated('anyLast', datetime)


daily_stats = OlapTable[DailyStats]("daily_stats", OlapConfig(
    engine=AggregatingMergeTreeEngine(),
    order_by_fields=["date", "user_id"]
))


# AggregateFunction types model aggregate *states*
# (written via e.g. avgState()/uniqExactState() and read via avgMerge()/uniqExactMerge()).
class MetricsById(BaseModel):
    id: str
    avg_rating: Annotated[float, AggregateFunction(agg_func="avg", param_types=[float])]
    daily_uniques: Annotated[int, AggregateFunction(agg_func="uniqExact", param_types=[str])]


metrics_by_id = OlapTable[MetricsById]("metrics_by_id", OlapConfig(
    engine=AggregatingMergeTreeEngine(),
    order_by_fields=["id"]
))
```

Review the ClickHouse aggregate functions documentation for the complete list of supported functions.
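Some functions need states instead of plain values because their final result cannot be rebuilt from partial results alone. Two partial averages do not give the overall average, and two distinct counts do not add up to the combined distinct count. The sketch below models this in plain Python. `AvgState` and `UniqExactState` are illustrative stand-ins that assume a (sum, count) state and a set-of-values state. They are not the binary state format ClickHouse actually stores.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class AvgState:
    """Illustrative avg state: keeps enough information to merge exactly."""
    total: float = 0.0
    count: int = 0

    def add(self, value: float) -> None:  # roughly what avgState() accumulates
        self.total += value
        self.count += 1

    def merge(self, other: "AvgState") -> "AvgState":  # what a merge does
        return AvgState(self.total + other.total, self.count + other.count)

    def finalize(self) -> float:  # roughly what avgMerge() returns at read time
        return self.total / self.count if self.count else float("nan")


@dataclass
class UniqExactState:
    """Illustrative uniqExact state: the set of distinct values seen so far."""
    seen: Set[str] = field(default_factory=set)

    def add(self, value: str) -> None:
        self.seen.add(value)

    def merge(self, other: "UniqExactState") -> "UniqExactState":
        return UniqExactState(self.seen | other.seen)

    def finalize(self) -> int:
        return len(self.seen)


# Two insert batches for the same id produce two partial states.
batch_a, batch_b = AvgState(), AvgState()
for rating in [5.0, 3.0, 4.0]:
    batch_a.add(rating)
batch_b.add(1.0)
merged = batch_a.merge(batch_b)
print(merged.finalize())  # 3.25, the true mean of all four ratings
# Averaging the partial averages instead gives (4.0 + 1.0) / 2 = 2.5, which is wrong.

u_a, u_b = UniqExactState(), UniqExactState()
for user in ["u1", "u2"]:
    u_a.add(user)
for user in ["u2", "u3"]:
    u_b.add(user)
print(u_a.merge(u_b).finalize())  # 3, not 2 + 2 = 4
```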
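Pre-aggregating with AggregatingMergeTree can cut latency sharply for dashboard queries. Most of the aggregation work happens at insert time and during background merges, so less of it is left for query time. Merges run asynchronously, though. Queries should still GROUP BY the sorting key (using -Merge functions for AggregateFunction columns) to combine any rows that have not been merged yet.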
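The plain-Python sketch below shows roughly where the savings come from. It uses made-up data and a hypothetical `rollup` helper. Raw page-view rows are rolled up per insert batch before storage, and a per-user question is then answered by reading only the pre-aggregated rows. It also shows why the final re-aggregation is still needed: until a merge runs, several partial rows can exist for the same key.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, str, int]  # (day, user_id, views)

# Hypothetical raw events: one row per page view, cycling through three users.
users = ["u1", "u2", "u3"]
raw_events: List[Row] = [("2024-01-01", users[i % 3], 1) for i in range(10_000)]


def rollup(rows: List[Row]) -> List[Row]:
    """Sum views per (day, user_id), the same shape as GROUP BY day, user_id."""
    acc: Dict[Tuple[str, str], int] = defaultdict(int)
    for day, user, views in rows:
        acc[(day, user)] += views
    return [(day, user, views) for (day, user), views in acc.items()]


# Insert time: each of four batches is rolled up before it is stored,
# leaving four parts that no background merge has combined yet.
parts = [rollup(raw_events[i:i + 2_500]) for i in range(0, 10_000, 2_500)]
stored = [row for part in parts for row in part]

# Query time: still group by the key, because unmerged parts can hold
# several partial rows per key. The query now scans 12 rows, not 10,000.
answer = {(d, u): v for d, u, v in rollup(stored)}
print(len(raw_events), len(stored), answer[("2024-01-01", "u1")])  # 10000 12 3334
```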
A common pattern is to populate an AggregatingMergeTree table from a materialized view:
```typescript
import {
  OlapTable,
  MaterializedView,
  ClickHouseEngines,
  UInt64,
  SimpleAggregated,
  DateTime,
  sql,
} from "@514labs/moose-lib";

// Source: raw events
interface PageView {
  timestamp: DateTime;
  user_id: string;
  page: string;
  duration_ms: number;
}

const pageViews = new OlapTable<PageView>("page_views", {
  orderByFields: ["timestamp", "user_id"],
});

// Target: daily aggregates
interface DailyPageStats {
  date: DateTime;
  user_id: string;
  view_count: UInt64 & SimpleAggregated<"sum", UInt64>;
  total_duration: number & SimpleAggregated<"sum", number>;
  max_duration: number & SimpleAggregated<"max", number>;
}

const targetTable = new OlapTable<DailyPageStats>("daily_page_stats", {
  engine: ClickHouseEngines.AggregatingMergeTree,
  orderByFields: ["date", "user_id"],
});

const mv = new MaterializedView<DailyPageStats>({
  materializedViewName: "mv_daily_page_stats",
  selectTables: [pageViews],
  targetTable: targetTable,
  selectStatement: sql`
    SELECT
      toStartOfDay(${pageViews.columns.timestamp}) AS date,
      ${pageViews.columns.user_id} AS user_id,
      count() AS view_count,
      sum(${pageViews.columns.duration_ms}) AS total_duration,
      max(${pageViews.columns.duration_ms}) AS max_duration
    FROM ${pageViews}
    GROUP BY date, user_id
  `,
});
```

```python
from moose_lib import (
    OlapTable,
    OlapConfig,
    MaterializedView,
    MaterializedViewOptions,
    AggregatingMergeTreeEngine,
    simple_aggregated,
)
from pydantic import BaseModel
from datetime import datetime


# Source: raw events
class PageView(BaseModel):
    timestamp: datetime
    user_id: str
    page: str
    duration_ms: int


# Target: daily aggregates
class DailyPageStats(BaseModel):
    date: datetime
    user_id: str
    view_count: simple_aggregated('sum', int)
    total_duration: simple_aggregated('sum', int)
    max_duration: simple_aggregated('max', int)


page_views = OlapTable[PageView]("page_views", OlapConfig(
    order_by_fields=["timestamp", "user_id"]
))

daily_stats = OlapTable[DailyPageStats]("daily_page_stats", OlapConfig(
    engine=AggregatingMergeTreeEngine(),
    order_by_fields=["date", "user_id"]
))

rollup_view = MaterializedView[DailyPageStats](
    MaterializedViewOptions(
        materialized_view_name="mv_daily_page_stats",
        select_tables=[page_views],
        # Plain (non-f) string: reference the source table by its ClickHouse name.
        select_statement="""
            SELECT
                toStartOfDay(timestamp) AS date,
                user_id,
                count() AS view_count,
                sum(duration_ms) AS total_duration,
                max(duration_ms) AS max_duration
            FROM page_views
            GROUP BY date, user_id
        """,
    ),
    target_table=daily_stats,
)
```
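In ClickHouse, a materialized view behaves like an insert trigger. Its SELECT runs over each block inserted into the source table, not over the whole table. The plain-Python model below uses hypothetical helpers `mv_select` and `merge_target` and makes no Moose or ClickHouse calls. It shows two separate inserts producing two partial rows for the same (date, user_id), which the sum and max simple aggregate functions later combine during a merge.

```python
from datetime import datetime
from typing import Dict, List, Tuple

Key = Tuple[datetime, str]


def to_start_of_day(ts: datetime) -> datetime:
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


def mv_select(block: List[dict]) -> List[dict]:
    """Model of the view's SELECT ... GROUP BY date, user_id over ONE insert block."""
    groups: Dict[Key, dict] = {}
    for ev in block:
        key = (to_start_of_day(ev["timestamp"]), ev["user_id"])
        g = groups.setdefault(key, {"date": key[0], "user_id": key[1], "view_count": 0,
                                    "total_duration": 0, "max_duration": ev["duration_ms"]})
        g["view_count"] += 1                                           # count()
        g["total_duration"] += ev["duration_ms"]                       # sum(duration_ms)
        g["max_duration"] = max(g["max_duration"], ev["duration_ms"])  # max(duration_ms)
    return list(groups.values())


def merge_target(rows: List[dict]) -> Dict[Key, dict]:
    """Model of a background merge using the columns' sum / sum / max functions."""
    out: Dict[Key, dict] = {}
    for r in rows:
        key = (r["date"], r["user_id"])
        if key not in out:
            out[key] = dict(r)
            continue
        t = out[key]
        t["view_count"] += r["view_count"]
        t["total_duration"] += r["total_duration"]
        t["max_duration"] = max(t["max_duration"], r["max_duration"])
    return out


# Two separate INSERTs into page_views; the view fires once per block.
block1 = [
    {"timestamp": datetime(2024, 1, 1, 8), "user_id": "u1", "page": "/", "duration_ms": 120},
    {"timestamp": datetime(2024, 1, 1, 9), "user_id": "u1", "page": "/a", "duration_ms": 300},
]
block2 = [
    {"timestamp": datetime(2024, 1, 1, 17), "user_id": "u1", "page": "/b", "duration_ms": 80},
]
target = mv_select(block1) + mv_select(block2)  # two partial rows for the same key
merged = merge_target(target)
row = merged[(datetime(2024, 1, 1), "u1")]
print(len(target), row["view_count"], row["total_duration"], row["max_duration"])  # 2 3 500 300
```

Until that merge happens, the target table really does hold both partial rows. This is why dashboard queries over it still aggregate by date and user_id.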