Tables in Moose let you define your database schema entirely in code using native TypeScript/Python typing.
You can integrate tables into your pipelines as destinations for new data or as sources for analytics queries in your downstream transformations, APIs, and more.
```python
from moose_lib import Key, OlapTable
from pydantic import BaseModel

class MyFirstTable(BaseModel):
    id: Key[str]
    name: str
    age: int

# Create a table named "first_table"
my_table = OlapTable[MyFirstTable]("first_table")

# No export needed - Python modules are automatically discovered
```

Create a table directly for custom data flows or when you need fine-grained control:
```python
from moose_lib import Key, OlapTable, OlapConfig
from moose_lib.blocks import ReplacingMergeTreeEngine
from pydantic import BaseModel
from datetime import date

class ExampleSchema(BaseModel):
    id: Key[str]
    date_field: date
    numeric_field: float
    boolean_field: bool

# Create a standalone table named "example_table"
example_table = OlapTable[ExampleSchema]("example_table", OlapConfig(
    order_by_fields=["id", "date_field"],
    engine=ReplacingMergeTreeEngine()
))
```

For end-to-end data flows, create tables as part of an ingestion pipeline:
```python
from moose_lib import IngestPipeline, IngestPipelineConfig, Key, OlapConfig
from moose_lib.blocks import ReplacingMergeTreeEngine
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_pipeline = IngestPipeline[UserEvent]("user_events", IngestPipelineConfig(
    ingest_api=True,   # Creates a REST API endpoint at POST localhost:4000/ingest/user_events
    stream=True,       # Creates a Kafka/Redpanda topic
    table=OlapConfig(  # Creates and configures the table named "user_events"
        order_by_fields=["id", "timestamp"],
        engine=ReplacingMergeTreeEngine()
    )
))

# Access the table component when needed:
events_table = events_pipeline.get_table()
```

Your table schemas also support ClickHouse-specific types such as decimals, low-cardinality strings, and named tuples:

```python
from moose_lib import Key, clickhouse_decimal, ClickHouseNamedTuple
from typing import Annotated, Literal
from pydantic import BaseModel
from datetime import datetime

class Customer(BaseModel):
    name: str
    address: str

class Order(BaseModel):
    order_id: Key[str]
    amount: clickhouse_decimal(10, 2)
    status: Literal["Paid", "Shipped", "Delivered"]  # translated to LowCardinality(String) in ClickHouse
    created_at: datetime
    customer: Annotated[Customer, "ClickHouseNamedTuple"]
```

Use defaults instead of nullable columns to keep queries fast and schemas simple. You can specify defaults at the column level so Moose generates ClickHouse defaults in your table DDL.
```python
from typing import Annotated
from pydantic import BaseModel
from moose_lib import OlapTable, OlapConfig, Key, clickhouse_default, clickhouse_decimal
from datetime import datetime

class Event(BaseModel):
    id: Key[str]

    # Static defaults
    status: Annotated[str, clickhouse_default("'pending'")]  # DEFAULT 'pending'
    retries: Annotated[int, clickhouse_default("0")]         # DEFAULT 0

    # Server-side timestamps
    created_at: Annotated[datetime, clickhouse_default("now()")]

    # Decimal with default
    amount: Annotated[float, clickhouse_decimal(10, 2)] = 0

events = OlapTable[Event]("events", OlapConfig(
    order_by_fields=["id", "created_at"],
))
```

The value passed to clickhouse_default can be either a string literal or a stringified ClickHouse SQL expression.
If a field is optional in your app model but you provide a ClickHouse default, Moose infers a non-nullable ClickHouse column with a DEFAULT clause.
clickhouse_default("18") in annotations) → non-nullable column with default 18.This lets you keep optional fields at the application layer while avoiding Nullable columns in ClickHouse when a server-side default exists.
Add documentation directly to your data model fields that propagates to ClickHouse COMMENT clauses. This makes your schema self-documenting and easier to understand for anyone querying the database.
Python supports two ways to add column comments:
Option 1: Field(description=...)
```python
from pydantic import BaseModel, Field
from moose_lib import Key, OlapTable, OlapConfig
from datetime import datetime

class UserEvent(BaseModel):
    id: Key[str] = Field(description="Unique identifier for the user event")
    timestamp: datetime = Field(description="Timestamp when the event occurred")
    email: str = Field(description="Email address of the user (must be valid format)")
    amount: float = Field(description="Total transaction amount in USD")
    # Fields without description have no COMMENT in DDL
    status: str

user_event_table = OlapTable[UserEvent](
    "user_events",
    OlapConfig(order_by_fields=["id", "timestamp"]),
)
```

Option 2: Attribute Docstrings
Use attribute docstrings with use_attribute_docstrings=True in your model config:
```python
from pydantic import BaseModel, ConfigDict
from moose_lib import Key, OlapTable, OlapConfig
from datetime import datetime

class UserEvent(BaseModel):
    model_config = ConfigDict(use_attribute_docstrings=True)

    id: Key[str]
    """Unique identifier for the user event."""

    timestamp: datetime
    """Timestamp when the event occurred."""

    email: str
    """Email address of the user (must be valid format)."""

    amount: float
    """Total transaction amount in USD."""

    status: str  # No docstring = no COMMENT in DDL

user_event_table = OlapTable[UserEvent](
    "user_events",
    OlapConfig(order_by_fields=["id", "timestamp"]),
)
```

Both approaches can be mixed in the same model.
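For instance, a hypothetical model could use Field(description=...) for one column and an attribute docstring for another:

```python
from pydantic import BaseModel, ConfigDict, Field
from moose_lib import Key

class MixedDocs(BaseModel):
    model_config = ConfigDict(use_attribute_docstrings=True)

    id: Key[str] = Field(description="Unique identifier")  # Option 1

    email: str
    """Email address of the user."""  # Option 2
```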
The generated ClickHouse DDL includes column comments:
```sql
CREATE TABLE user_events (
    `id` String COMMENT 'Unique identifier for the user event',
    `timestamp` DateTime COMMENT 'Timestamp when the event occurred',
    `email` String COMMENT 'Email address of the user (must be valid format)',
    `amount` Float64 COMMENT 'Total transaction amount in USD',
    `status` String
)
ENGINE = MergeTree()
ORDER BY (id, timestamp)
```

Query column comments using `SELECT name, comment FROM system.columns WHERE table = 'user_events'` or `DESCRIBE TABLE user_events FORMAT Vertical`.
By default, tables are created in the database specified in your moose.config.toml ClickHouse configuration. You can override this on a per-table basis using the database field:
```python
from moose_lib import Key, OlapTable, OlapConfig
from pydantic import BaseModel

class UserData(BaseModel):
    id: Key[str]
    name: str
    email: str

# Table in the default database (from moose.config.toml)
default_table = OlapTable[UserData]("users")

# Table in a specific database (e.g., "analytics")
analytics_table = OlapTable[UserData]("users", OlapConfig(
    database="analytics",
    order_by_fields=["id"]
))
```

To use custom databases, configure them in your moose.config.toml:
```toml
[clickhouse_config]
db_name = "local"
additional_databases = ["analytics", "staging"]
```

The databases in additional_databases are created automatically when you start your Moose application.
You must configure table indexing using one of these approaches:
- A `Key` field in your table schema
- `order_by_fields` in the table config (any `Key` fields must come first in the `order_by_fields` array)

```python
from moose_lib import Key, OlapTable
from pydantic import BaseModel

class Record1(BaseModel):
    id: Key[str]  # Primary key field
    field1: str
    field2: int

table1 = OlapTable[Record1]("table1")  # id is the primary key
```

Leverage the OlapConfig class to configure your table:
```python
from moose_lib import Key, OlapTable, OlapConfig
from pydantic import BaseModel
from datetime import datetime

class SchemaWithoutPrimaryKey(BaseModel):
    field1: str
    field2: int
    field3: datetime

table2 = OlapTable[SchemaWithoutPrimaryKey]("table2", OlapConfig(
    order_by_fields=["field1", "field2"]  # Specify ordering without a primary key
))
```

Use a ClickHouse SQL expression to control ordering directly. This is useful for advanced patterns (functions, transformations) or when you want to disable sorting entirely.
```python
from moose_lib import OlapTable, OlapConfig
from pydantic import BaseModel
from datetime import datetime

class Events(BaseModel):
    user_id: str
    created_at: datetime
    event_type: str

# Equivalent to order_by_fields=["user_id", "created_at", "event_type"]
events = OlapTable[Events]("events", OlapConfig(
    order_by_expression="(user_id, created_at, event_type)",
))

# Advanced: functions inside the expression
events_by_month = OlapTable[Events]("events_by_month", OlapConfig(
    order_by_expression="(user_id, toYYYYMM(created_at))",
))

# No sorting
unsorted = OlapTable[Events]("events_unsorted", OlapConfig(
    order_by_expression="tuple()",
))
```

Use a ClickHouse SQL expression to define the primary key explicitly. This is useful when:
- You want functions in the key (e.g., `cityHash64(id)`)
- You need a primary key whose column order differs from the schema definition

Important: When `primary_key_expression` is specified, any `Key[T]` annotations on columns are ignored for PRIMARY KEY generation.
```python
from moose_lib import OlapTable, OlapConfig, Key
from pydantic import BaseModel
from datetime import datetime

# Example 1: Primary key with a function
class UserEvents(BaseModel):
    user_id: str
    event_id: str
    timestamp: datetime

events_table = OlapTable[UserEvents]("user_events", OlapConfig(
    # Use a hash function in the primary key for better distribution
    primary_key_expression="(user_id, cityHash64(event_id))",
    # ORDER BY must start with the primary key columns (see the constraint below)
    order_by_expression="(user_id, cityHash64(event_id), timestamp)",
))

# Example 2: Different ordering in primary key vs schema columns
class Product(BaseModel):
    category: str
    brand: str
    product_id: str
    name: str

products_table = OlapTable[Product]("products", OlapConfig(
    # Primary key optimized for uniqueness
    primary_key_expression="(product_id)",
    # ORDER BY starts with the primary key, then adds query-oriented columns
    order_by_fields=["product_id", "category", "brand"],
))

# Example 3: Override Key[T] annotation
class Record(BaseModel):
    id: Key[str]  # This Key[T] annotation will be IGNORED
    other_id: str

record_table = OlapTable[Record]("records", OlapConfig(
    # This expression overrides the Key[T] annotation
    primary_key_expression="(other_id, id)",
    order_by_expression="(other_id, id)",
))
```

Rationale for Primary Key Expression:
Function Support: Primary keys can use ClickHouse functions like cityHash64() for better data distribution, which cannot be expressed through column-level annotations.
Flexible Ordering: The ordering of columns in the primary key can be different from the ordering in the schema definition, allowing optimization for both data uniqueness and query patterns.
Separation of Concerns: PRIMARY KEY and ORDER BY serve different purposes in ClickHouse: the PRIMARY KEY defines the sparse primary index used to locate data, while ORDER BY defines how rows are physically sorted on disk. Sometimes these need different column orderings for optimal performance.
Important Constraint:
⚠️ PRIMARY KEY must be a prefix of ORDER BY in ClickHouse. This means ORDER BY must start with all PRIMARY KEY columns in the same order.
Valid:
- PRIMARY KEY (userId) with ORDER BY (userId, timestamp) ✅
- PRIMARY KEY (userId, cityHash64(eventId)) with ORDER BY (userId, cityHash64(eventId), timestamp) ✅

Invalid:
- PRIMARY KEY (userId, eventId) with ORDER BY (userId, timestamp) ❌ (missing eventId)
- PRIMARY KEY (userId, eventId) with ORDER BY (eventId, userId) ❌ (wrong order)

When combining a `Key` field with `order_by_fields`, the key must come first:

```python
from moose_lib import Key, OlapTable, OlapConfig
from pydantic import BaseModel

class SchemaWithKey(BaseModel):
    id: Key[str]
    field1: str
    field2: int

table3 = OlapTable[SchemaWithKey]("table3", OlapConfig(
    order_by_fields=["id", "field1"]  # Primary key must be first
))
```

With multiple `Key` fields, all keys must come first, in the order they are defined:

```python
from moose_lib import Key, OlapTable, OlapConfig
from pydantic import BaseModel

class MultiKeyRecord(BaseModel):
    key1: Key[str]
    key2: Key[int]
    field1: str

multi_key_table = OlapTable[MultiKeyRecord]("multi_key_table", OlapConfig(
    order_by_fields=["key1", "key2", "field1"]  # Multiple keys must come first
))
```

By default, Moose creates tables with the MergeTree engine. You can use a different engine by setting engine in the table configuration.
```python
from moose_lib import Key, OlapTable, OlapConfig
from moose_lib.blocks import MergeTreeEngine, ReplacingMergeTreeEngine
from pydantic import BaseModel

class Record(BaseModel):
    id: Key[str]

# Default MergeTree engine
table = OlapTable[Record]("table", OlapConfig(
    order_by_fields=["id"]
))

# Explicitly specify the engine
dedup_table = OlapTable[Record]("dedup_table", OlapConfig(
    order_by_fields=["id"],
    engine=ReplacingMergeTreeEngine()
))
```

Use the ReplacingMergeTree engine to keep only the latest record for your designated sort key:
```python
from moose_lib import Key, OlapTable, OlapConfig
from moose_lib.blocks import ReplacingMergeTreeEngine
from pydantic import BaseModel

class Record(BaseModel):
    id: Key[str]
    updated_at: str  # Version column
    deleted: int = 0  # Soft delete marker (UInt8)

# Basic deduplication
table = OlapTable[Record]("table", OlapConfig(
    order_by_fields=["id"],
    engine=ReplacingMergeTreeEngine()
))

# With a version column (keeps the record with the highest version)
versioned_table = OlapTable[Record]("versioned_table", OlapConfig(
    order_by_fields=["id"],
    engine=ReplacingMergeTreeEngine(ver="updated_at")
))

# With soft deletes (requires the ver parameter)
soft_delete_table = OlapTable[Record]("soft_delete_table", OlapConfig(
    order_by_fields=["id"],
    engine=ReplacingMergeTreeEngine(
        ver="updated_at",
        is_deleted="deleted"  # UInt8 column: 1 marks the row for deletion
    )
))
```

ClickHouse's ReplacingMergeTree engine runs deduplication in the background AFTER data is inserted into the table, so duplicate records may not be removed immediately.
Version Column (ver): When specified, ClickHouse keeps the row with the maximum version value for each unique sort key.
Soft Deletes (is_deleted): When specified along with ver, rows where this column equals 1 are deleted during merges. This column must be UInt8 type.
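To make the merge semantics concrete, here is a plain-Python sketch (no ClickHouse involved) of which rows survive a merge under ver and is_deleted:

```python
# Toy illustration of ReplacingMergeTree(ver, is_deleted) merge semantics:
# for each sort key, keep the row with the highest `updated_at` (ver);
# if that winning row has deleted == 1, drop the key entirely.
rows = [
    {"id": "a", "updated_at": 1, "deleted": 0},
    {"id": "a", "updated_at": 3, "deleted": 0},  # newest "a" -> kept
    {"id": "b", "updated_at": 2, "deleted": 0},
    {"id": "b", "updated_at": 5, "deleted": 1},  # newest "b" is a tombstone -> "b" dropped
]

latest: dict[str, dict] = {}
for row in rows:
    current = latest.get(row["id"])
    if current is None or row["updated_at"] > current["updated_at"]:
        latest[row["id"]] = row

survivors = [r for r in latest.values() if r["deleted"] == 0]
print(survivors)  # [{'id': 'a', 'updated_at': 3, 'deleted': 0}]
```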
For more details, see the ClickHouse documentation.
Use the S3Queue engine to automatically ingest data from S3 buckets as files are added:
S3Queue tables only process new files added to S3 after table creation. When used as a source for materialized views, no backfill occurs - the MV will only start populating as new files arrive. See the Materialized Views documentation for more details.
```python
from moose_lib import OlapTable, OlapConfig
from moose_lib.blocks import S3QueueEngine
from pydantic import BaseModel
from datetime import datetime

class S3Event(BaseModel):
    id: str
    timestamp: datetime
    data: dict

# Modern API using engine configuration
s3_events = OlapTable[S3Event]("s3_events", OlapConfig(
    engine=S3QueueEngine(
        s3_path="s3://my-bucket/data/*.json",
        format="JSONEachRow",
        # ⚠️ WARNING: See the security callout below about credentials
        aws_access_key_id="AKIA...",
        aws_secret_access_key="secret..."
    ),
    settings={
        "mode": "unordered",
        "keeper_path": "/clickhouse/s3queue/events"
    }
))
```

Security Risk: Hardcoding credentials in your code embeds them in Docker images and deployment artifacts, creating serious security vulnerabilities.
Solution: Use moose_runtime_env for runtime credential resolution:
```python
from moose_lib import OlapTable, OlapConfig, moose_runtime_env
from moose_lib.blocks import S3QueueEngine

# ✅ RECOMMENDED: Runtime environment variable resolution
secure_s3_events = OlapTable[S3Event]("s3_events", OlapConfig(
    engine=S3QueueEngine(
        s3_path="s3://my-bucket/data/*.json",
        format="JSONEachRow",
        aws_access_key_id=moose_runtime_env.get("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=moose_runtime_env.get("AWS_SECRET_ACCESS_KEY")
    ),
    settings={
        "mode": "unordered",
        "keeper_path": "/clickhouse/s3queue/events"
    }
))
```

Then set the environment variables:
```bash
export AWS_ACCESS_KEY_ID="AKIA..."
export AWS_SECRET_ACCESS_KEY="your-secret-key"
moose prod up
```

Benefits: credentials are resolved at runtime, so they never end up in Docker images or deployment artifacts and can be rotated without rebuilding.
S3Queue requires ClickHouse 24.7+ and proper ZooKeeper/ClickHouse Keeper configuration for coordination between replicas. Files are processed exactly once across all replicas.
Use the S3 engine for direct read/write access to S3 storage without streaming semantics:
```python
from moose_lib import OlapTable, OlapConfig, moose_runtime_env
from moose_lib.blocks import S3Engine
from pydantic import BaseModel

class DataRecord(BaseModel):  # Example schema for the S3 data
    id: str
    payload: str

# S3 table with credentials (recommended with moose_runtime_env)
s3_data = OlapTable[DataRecord]("s3_data", OlapConfig(
    engine=S3Engine(
        path="s3://my-bucket/data/file.json",
        format="JSONEachRow",
        aws_access_key_id=moose_runtime_env.get("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=moose_runtime_env.get("AWS_SECRET_ACCESS_KEY"),
        compression="gzip"
    )
))

# Public S3 bucket (no authentication needed - just omit credentials)
public_s3 = OlapTable[DataRecord]("public_s3", OlapConfig(
    engine=S3Engine(
        path="s3://public-bucket/data/*.parquet",
        format="Parquet"
    )
))
```

Both engines support the same credential management and format options.
The IcebergS3 engine provides read-only access to Iceberg tables stored in S3:
```python
from moose_lib import OlapTable, OlapConfig, moose_runtime_env
from moose_lib.blocks import IcebergS3Engine
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):  # Example schema for the Iceberg table
    id: str
    timestamp: datetime

# Iceberg table with AWS credentials (recommended with moose_runtime_env)
iceberg_events = OlapTable[Event]("iceberg_events", OlapConfig(
    engine=IcebergS3Engine(
        path="s3://my-bucket/warehouse/db/table/",
        format="Parquet",  # or "ORC"
        aws_access_key_id=moose_runtime_env.get("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=moose_runtime_env.get("AWS_SECRET_ACCESS_KEY"),
    )
))
```

Note that order_by_fields, order_by_expression, partition_by, and sample_by_expression are not supported for IcebergS3 tables.

The Buffer engine provides an in-memory buffer that flushes data to a destination table based on time, row count, or size thresholds:
```python
from moose_lib import Key, OlapTable, OlapConfig
from moose_lib.blocks import MergeTreeEngine, BufferEngine
from pydantic import BaseModel
from datetime import datetime

class Record(BaseModel):
    id: Key[str]
    timestamp: datetime

# First create the destination table
destination_table = OlapTable[Record]("destination", OlapConfig(
    engine=MergeTreeEngine(),
    order_by_fields=["id", "timestamp"]
))

# Then create a buffer that writes to it
buffer_table = OlapTable[Record]("buffer", OlapConfig(
    engine=BufferEngine(
        target_database="local",
        target_table="destination",
        num_layers=16,
        min_time=10,          # Min 10 seconds before flush
        max_time=100,         # Max 100 seconds before flush
        min_rows=10000,       # Min 10k rows before flush
        max_rows=1000000,     # Max 1M rows before flush
        min_bytes=10485760,   # Min 10MB before flush
        max_bytes=104857600   # Max 100MB before flush
    )
))
```

You cannot set order_by_fields, partition_by, or sample_by_expression on buffer tables. For more details, see the ClickHouse Buffer documentation.
The Distributed engine creates a distributed table across a ClickHouse cluster for horizontal scaling:
```python
from moose_lib import OlapTable, OlapConfig
from moose_lib.blocks import DistributedEngine

# Distributed table across a cluster (Record is the model defined above)
distributed_table = OlapTable[Record]("distributed_data", OlapConfig(
    engine=DistributedEngine(
        cluster="my_cluster",
        target_database="default",
        target_table="local_table",
        sharding_key="cityHash64(id)"  # Optional: how to distribute data
    )
))
```

You cannot set order_by_fields, partition_by, or sample_by_expression on distributed tables, and the cluster name must match a cluster defined in your ClickHouse configuration. For more details, see the ClickHouse Distributed documentation.
Replicated engines provide high availability and data replication across multiple ClickHouse nodes. Moose supports all standard replicated MergeTree variants:
- ReplicatedMergeTree - replicated version of MergeTree
- ReplicatedReplacingMergeTree - replicated with deduplication
- ReplicatedAggregatingMergeTree - replicated with aggregation
- ReplicatedSummingMergeTree - replicated with summation

```python
from moose_lib import Key, OlapTable, OlapConfig
from moose_lib.blocks import (
    ReplicatedMergeTreeEngine,
    ReplicatedReplacingMergeTreeEngine,
    ReplicatedAggregatingMergeTreeEngine,
    ReplicatedSummingMergeTreeEngine
)
from pydantic import BaseModel
from datetime import datetime

class Record(BaseModel):
    id: Key[str]
    updated_at: datetime
    deleted: int = 0

# Basic replicated table with explicit paths
replicated_table = OlapTable[Record]("records", OlapConfig(
    order_by_fields=["id"],
    engine=ReplicatedMergeTreeEngine(
        keeper_path="/clickhouse/tables/{database}/{shard}/records",
        replica_name="{replica}"
    )
))

# Replicated with deduplication
replicated_dedup = OlapTable[Record]("dedup_records", OlapConfig(
    order_by_fields=["id"],
    engine=ReplicatedReplacingMergeTreeEngine(
        keeper_path="/clickhouse/tables/{database}/{shard}/dedup_records",
        replica_name="{replica}",
        ver="updated_at",
        is_deleted="deleted"
    )
))

# For ClickHouse Cloud or Boreal (no parameters needed)
cloud_replicated = OlapTable[Record]("cloud_records", OlapConfig(
    order_by_fields=["id"],
    engine=ReplicatedMergeTreeEngine()
))
```

Replicated engines support three configuration approaches. Choose the one that fits your deployment:
Omit all replication parameters. Moose uses smart defaults that work in both ClickHouse Cloud and self-managed environments:
```python
table = OlapTable[Record]("my_table", OlapConfig(
    engine=ReplicatedMergeTreeEngine(),  # No parameters
    order_by_fields=["id"]
))
```

Moose auto-injects /clickhouse/tables/{database}/{shard}/{table_name} and {replica} in local development. ClickHouse Cloud uses its own patterns automatically.
For multi-node deployments, specify a cluster name to use ON CLUSTER DDL operations:
```python
table = OlapTable[Record]("my_table", OlapConfig(
    engine=ReplicatedMergeTreeEngine(),
    order_by_fields=["id"],
    cluster="default"  # References a cluster from moose.config.toml
))
```

Configuration in moose.config.toml:
```toml
[[clickhouse_config.clusters]]
name = "default"
```

Use when:
- You run a multi-node, self-managed deployment
- You want ON CLUSTER DDL for distributed operations

For custom replication topology, specify both keeper_path and replica_name:
```python
table = OlapTable[Record]("my_table", OlapConfig(
    engine=ReplicatedMergeTreeEngine(
        keeper_path="/clickhouse/tables/{database}/{shard}/my_table",
        replica_name="{replica}"
    ),
    order_by_fields=["id"]
))
```

Use when you need full control over the replication topology, such as custom keeper paths or replica naming schemes.
Cannot mix approaches: Specifying both cluster and explicit keeper_path/replica_name will cause an error. Choose one approach.
Cluster is a deployment directive: Changing cluster won't recreate your table—it only affects future DDL operations.
For more details, see the ClickHouse documentation on data replication.
Use the Kafka engine to consume data directly from Kafka-compatible topics.
For more details on the Kafka engine, see the ClickHouse documentation on Kafka integration.
Kafka tables are streaming interfaces that don't persist data. Use a MaterializedView to continuously move data to a table.
```python
from moose_lib import OlapTable, OlapConfig
from moose_lib.blocks import KafkaEngine
from moose_lib.dmv2 import MaterializedView, MaterializedViewOptions
from pydantic import BaseModel

class KafkaEvent(BaseModel):
    event_id: str
    user_id: str
    timestamp: int  # Unix seconds for JSONEachRow

# Kafka table - reads from the topic, doesn't persist
kafka_source = OlapTable[KafkaEvent]("kafka_events", OlapConfig(
    engine=KafkaEngine(
        broker_list="redpanda:9092",
        topic_list="events",
        group_name="my_consumer_group",
        format="JSONEachRow"
    ),
    settings={"kafka_num_consumers": "1"}
))

# MaterializedView moves data to a MergeTree table for persistence
events_mv = MaterializedView[KafkaEvent](MaterializedViewOptions(
    select_statement="SELECT event_id, user_id, timestamp FROM kafka_events",
    select_tables=[kafka_source],
    table_name="events_dest",
    materialized_view_name="events_mv",
    order_by_fields=["event_id"],
))
```

Kafka tables do not support order_by_fields.

If a ClickHouse column name isn't a valid Python identifier or starts with an underscore, you can use a safe Python field name and set a Pydantic alias to the real column name. Moose then uses the alias for ClickHouse DDL and data mapping, so your model remains valid Python while preserving the true column name.
```python
from pydantic import BaseModel, Field

class CHUser(BaseModel):
    # ClickHouse: "_id" → safe Python attribute with an alias
    UNDERSCORE_PREFIXED_id: str = Field(alias="_id")

    # ClickHouse: "user name" → replace spaces, keep the alias
    user_name: str = Field(alias="user name")
```

If you have a table that is managed by an external system (e.g., Change Data Capture tools like ClickPipes), you can still use Moose to query it. Set life_cycle in the table config to LifeCycle.EXTERNALLY_MANAGED.
```python
from moose_lib import OlapTable, OlapConfig, LifeCycle

# Table managed by an external system
external_table = OlapTable[UserData]("external_users", OlapConfig(
    order_by_fields=["id", "timestamp"],
    life_cycle=LifeCycle.EXTERNALLY_MANAGED  # Moose won't create or modify this table in prod mode
))
```

Learn more about the different lifecycle options and how to use them in the LifeCycle Management documentation.
```python
from moose_lib import Key, OlapTable, OlapConfig
from pydantic import BaseModel
from typing import Optional

class BadRecord1(BaseModel):
    field1: str
    field2: int

bad_table1 = OlapTable[BadRecord1]("bad_table1")  # No primary key or order_by_fields

class BadRecord2(BaseModel):
    id: Key[str]
    field1: str

bad_table2 = OlapTable[BadRecord2]("bad_table2", OlapConfig(
    order_by_fields=["field1", "id"]  # Wrong order - primary key must be first
))

class BadRecord3(BaseModel):
    id: Key[str]
    field1: str
    field2: Optional[int]

bad_table3 = OlapTable[BadRecord3]("bad_table3", OlapConfig(
    order_by_fields=["id", "field2"]  # Can't have a nullable field in order_by_fields
))
```

One of the powerful features of Moose is its integration with the local development server:
Start the development server with `moose dev`, then define or modify an `OlapTable` in your code and save the file:
For example, if you add a new field to your schema:
```python
# Before
class BasicSchema(BaseModel):
    id: Key[str]
    name: str

# After adding a field
class BasicSchema(BaseModel):
    id: Key[str]
    name: str
    created_at: datetime
```

The Moose framework will detect the change and automatically update the table schema in your local ClickHouse instance.
You can verify your tables were created correctly using:
```bash
# List all tables in your local environment
moose ls
```

You can connect to your local ClickHouse instance with your favorite database client. Your credentials are located in your moose.config.toml file:
```toml
[clickhouse_config]
db_name = "local"
user = "panda"
password = "pandapass"
use_ssl = false
host = "localhost"
host_port = 18123
native_port = 9000
```
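For example, assuming the third-party clickhouse-connect package, you could connect with the credentials above and inspect your tables (a sketch, not part of Moose):

```python
import clickhouse_connect  # pip install clickhouse-connect (assumed third-party client)

# Connection values taken from the moose.config.toml defaults above
client = clickhouse_connect.get_client(
    host="localhost",
    port=18123,  # host_port (HTTP interface)
    username="panda",
    password="pandapass",
    database="local",
)

# List tables and inspect column comments
print(client.query("SHOW TABLES").result_rows)
print(client.query(
    "SELECT name, comment FROM system.columns WHERE table = 'user_events'"
).result_rows)
```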