This is a comprehensive reference for moose_lib, detailing all exported components, types, and utilities.
A type annotation for marking fields as primary keys in data models. Used with Pydantic.
```python
from moose_lib import Key
from pydantic import BaseModel

class MyModel(BaseModel):
    id: Key[str]  # Marks 'id' as a primary key of type string
```

A JWT payload type is not applicable in Python; JWT handling is done through standard Python types.
Utilities for analytics APIs are provided directly through function parameters in Python.
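For example, a handler can receive its typed query parameters (and, assuming client injection is configured, a MooseClient) as plain function arguments. A minimal sketch with hypothetical model and handler names:

```python
from moose_lib import MooseClient
from pydantic import BaseModel

class EventQuery(BaseModel):
    user_id: str
    limit: int = 10

# Hypothetical handler: utilities arrive as ordinary function parameters
def get_events(params: EventQuery, client: MooseClient) -> list[dict]:
    query = "SELECT * FROM user_events WHERE user_id = {user_id} LIMIT {limit}"
    return client.query.execute(
        query, {"user_id": params.user_id, "limit": params.limit}
    )
```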
Pydantic base model used for data modeling in Moose.
```python
from pydantic import BaseModel

class MyDataModel(BaseModel):
    id: str
    name: str
    count: int
```

Class representing the result of an analytics API call.
```python
from dataclasses import dataclass
from typing import Any

@dataclass
class ApiResult:
    status: int  # HTTP status code
    body: Any    # Response body
```

Client for interacting with ClickHouse and Temporal.
```python
class MooseClient:
    query: QueryClient                  # For database queries
    workflow: Optional[WorkflowClient]  # For workflow operations
```

Configuration for OLAP tables.
```python
from typing import Optional
from pydantic import BaseModel
from moose_lib.blocks import EngineConfig

class OlapConfig(BaseModel):
    database: Optional[str] = None             # Optional database name (defaults to moose.config.toml clickhouse_config.db_name)
    order_by_fields: list[str] = []            # Fields to order by
    order_by_expression: Optional[str] = None  # Alternative: full ORDER BY expression (see OlapTable examples below)
    engine: Optional[EngineConfig] = None      # Table engine configuration
```

Creates a ClickHouse table with the schema of type T.
```python
from moose_lib import OlapTable, OlapConfig
from moose_lib.blocks import ReplacingMergeTreeEngine

# Basic usage
my_table = OlapTable[UserProfile]("user_profiles")

# With configuration (fields)
my_table = OlapTable[UserProfile]("user_profiles", OlapConfig(
    order_by_fields=["id", "timestamp"],
    engine=ReplacingMergeTreeEngine(),
))

# With configuration (expression)
my_table_expr = OlapTable[UserProfile]("user_profiles_expr", OlapConfig(
    order_by_expression="(id, timestamp)",
    engine=ReplacingMergeTreeEngine(),
))

# Disable sorting entirely
my_table_unsorted = OlapTable[UserProfile]("user_profiles_unsorted", OlapConfig(
    order_by_expression="tuple()",
))
```

Creates a Redpanda topic with the schema of type T.
```python
# Basic usage
my_stream = Stream[UserEvent]("user_events")

# With configuration
my_stream = Stream[UserEvent]("user_events", StreamConfig(
    parallelism=3,
    retention_period=86400  # 1 day in seconds
))

# Adding transformations
def transform_user_event(event: UserEvent) -> ProfileUpdate:
    return ProfileUpdate(user_id=event.user_id, update_type="event")

# Route transformed records to the destination stream (profile_stream)
my_stream.add_transform(profile_stream, transform_user_event)
```

Creates an HTTP endpoint for ingesting data of type T.
```python
# Basic usage with destination stream
my_ingest_api = IngestApi[UserEvent]("user_events", IngestConfigWithDestination(
    destination=my_user_event_stream
))
```

Creates an HTTP endpoint for querying data with request type T and response type U.
```python
# Basic usage
def get_user_profiles(params: UserQuery) -> list[UserProfile]:
    # Query implementation
    return [UserProfile(...), UserProfile(...)]

my_api = Api[UserQuery, list[UserProfile]](
    "get_user_profiles", get_user_profiles
)
```

Combines ingest API, stream, and table creation in a single component.
```python
from moose_lib import IngestPipeline, IngestPipelineConfig, StreamConfig, OlapConfig
from moose_lib.blocks import ReplacingMergeTreeEngine

# Basic usage
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=True
))

# With advanced configuration
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest_api=True,
    stream=StreamConfig(parallelism=3),
    table=OlapConfig(
        order_by_fields=["id", "timestamp"],
        engine=ReplacingMergeTreeEngine(),
    )
))
```

Creates a materialized view in ClickHouse.
```python
# Basic usage
view = MaterializedView[UserStatistics](MaterializedViewOptions(
    select_statement="SELECT user_id, COUNT(*) as event_count FROM user_events GROUP BY user_id",
    table_name="user_events",
    materialized_view_name="user_statistics",
    order_by_fields=["user_id"]
))
```

Creates a standard SQL view in ClickHouse. Views are read-time projections defined by a SELECT statement over one or more base tables or other views.
```python
from moose_lib import View

# Basic usage
active_user_events = View(
    "active_user_events",
    """
    SELECT
      events.id AS event_id,
      users.id AS user_id,
      users.name AS user_name,
      events.ts AS ts
    FROM events
    JOIN users ON events.user_id = users.id
    WHERE users.active = 1
    """,
    [events, users]
)
```

Note: Use View for read-time projections. For write-time transformations with separate storage, use MaterializedView instead.
Python uses formatted strings with parameter substitution for SQL queries.
```python
# Python uses formatted strings with the execute method
query = """
SELECT * FROM users
WHERE age > {min_age}
AND country = {country}
LIMIT {limit}
"""
rows = client.query.execute(query, {"min_age": min_age, "country": country, "limit": limit})
```

Available table engines (TypeScript enum; the Python engine classes are listed further below):
```ts
enum ClickHouseEngines {
  MergeTree = "MergeTree",
  ReplacingMergeTree = "ReplacingMergeTree",
  AggregatingMergeTree = "AggregatingMergeTree",
  SummingMergeTree = "SummingMergeTree",
  ReplicatedMergeTree = "ReplicatedMergeTree",
  ReplicatedReplacingMergeTree = "ReplicatedReplacingMergeTree",
  ReplicatedAggregatingMergeTree = "ReplicatedAggregatingMergeTree",
  ReplicatedSummingMergeTree = "ReplicatedSummingMergeTree",
  S3Queue = "S3Queue"
}
```

Configuration for ReplacingMergeTree tables:
```ts
type ReplacingMergeTreeConfig<T> = {
  engine: ClickHouseEngines.ReplacingMergeTree;
  orderByFields?: (keyof T & string)[];
  ver?: keyof T & string;       // Optional: version column for keeping latest
  isDeleted?: keyof T & string; // Optional: soft delete marker (requires ver)
  settings?: { [key: string]: string };
}
```

Configuration for replicated table engines:
```ts
// ReplicatedMergeTree
type ReplicatedMergeTreeConfig<T> = {
  engine: ClickHouseEngines.ReplicatedMergeTree;
  keeperPath?: string;  // Optional: ZooKeeper/Keeper path (omit for Cloud)
  replicaName?: string; // Optional: replica name (omit for Cloud)
  orderByFields?: (keyof T & string)[];
  settings?: { [key: string]: string };
}

// ReplicatedReplacingMergeTree
type ReplicatedReplacingMergeTreeConfig<T> = {
  engine: ClickHouseEngines.ReplicatedReplacingMergeTree;
  keeperPath?: string;  // Optional: ZooKeeper/Keeper path (omit for Cloud)
  replicaName?: string; // Optional: replica name (omit for Cloud)
  ver?: keyof T & string;       // Optional: version column
  isDeleted?: keyof T & string; // Optional: soft delete marker
  orderByFields?: (keyof T & string)[];
  settings?: { [key: string]: string };
}

// ReplicatedAggregatingMergeTree
type ReplicatedAggregatingMergeTreeConfig<T> = {
  engine: ClickHouseEngines.ReplicatedAggregatingMergeTree;
  keeperPath?: string;  // Optional: ZooKeeper/Keeper path (omit for Cloud)
  replicaName?: string; // Optional: replica name (omit for Cloud)
  orderByFields?: (keyof T & string)[];
  settings?: { [key: string]: string };
}

// ReplicatedSummingMergeTree
type ReplicatedSummingMergeTreeConfig<T> = {
  engine: ClickHouseEngines.ReplicatedSummingMergeTree;
  keeperPath?: string;  // Optional: ZooKeeper/Keeper path (omit for Cloud)
  replicaName?: string; // Optional: replica name (omit for Cloud)
  columns?: string[];   // Optional: columns to sum
  orderByFields?: (keyof T & string)[];
  settings?: { [key: string]: string };
}
```

A class that represents a single task within a workflow system, with typed input and output.
```python
from moose_lib import Task, TaskConfig, TaskContext
from pydantic import BaseModel

# Define input and output models
class InputData(BaseModel):
    user_id: str

class OutputData(BaseModel):
    result: str
    status: bool

# Task with input and output
def process_user(ctx: TaskContext[InputData]) -> OutputData:
    # Process the user data
    return OutputData(result=f"Processed {ctx.input.user_id}", status=True)

user_task = Task[InputData, OutputData](
    name="process_user",
    config=TaskConfig(
        run=process_user,
        retries=3,
        timeout="30s"
    )
)
```

Configuration for a Task.
```python
@dataclasses.dataclass
class TaskConfig(Generic[T, U]):
    # The handler function that executes the task logic
    # Can be any of: () -> None, () -> U, (T) -> None, or (T) -> U depending on input/output types
    run: TaskRunFunc[T, U]

    # Optional list of tasks to run after this task completes
    on_complete: Optional[list[Task[U, Any]]] = None

    # Optional function that is called when the task is cancelled
    on_cancel: Optional[Callable[[TaskContext[T_none]], Union[None, Awaitable[None]]]] = None

    # Optional timeout string (e.g. "5m", "1h", "never")
    timeout: Optional[str] = None

    # Optional number of retry attempts
    retries: Optional[int] = None
```
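Tasks can be chained via on_complete; a minimal sketch reusing the InputData/OutputData models above (the notify task is hypothetical):

```python
# Hypothetical follow-up task: receives the previous task's output as its input
def notify(ctx: TaskContext[OutputData]) -> None:
    print(f"Done: {ctx.input.result}")

notify_task = Task[OutputData, None](
    name="notify",
    config=TaskConfig(run=notify)
)

# Run notify_task after process_user completes
user_task = Task[InputData, OutputData](
    name="process_user",
    config=TaskConfig(
        run=process_user,
        on_complete=[notify_task]
    )
)
```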
Represents a workflow composed of one or more tasks.

```python
from moose_lib import Workflow, WorkflowConfig

# Create a workflow that starts with the fetch_task
data_workflow = Workflow(
    name="data_processing",
    config=WorkflowConfig(
        starting_task=fetch_task,
        schedule="@every 1h",  # Run every hour
        timeout="10m",         # Timeout after 10 minutes
        retries=2              # Retry up to 2 times if it fails
    )
)
```

Configuration for a workflow.
```python
@dataclasses.dataclass
class WorkflowConfig:
    # The first task to execute in the workflow
    starting_task: Task[Any, Any]

    # Optional number of retry attempts for the entire workflow
    retries: Optional[int] = None

    # Optional timeout string for the entire workflow
    timeout: Optional[str] = None

    # Optional cron-like schedule string for recurring execution
    schedule: Optional[str] = None
```

Ensure your infrastructure components are correctly imported into your main.py file.
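A minimal main.py sketch for the Python side (module paths and instance names are hypothetical):

```python
# main.py - importing the instances is enough for Moose to detect them
# (module paths below are hypothetical)
from app.tables import my_table
from app.streams import my_stream
from app.apis import my_api
from app.pipelines import my_pipeline
from app.views import my_materialized_view, my_view
from app.workflows import my_workflow, my_task
```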
Example (TypeScript equivalent):

```ts
export { myTable, myStream, myApi, myWorkflow, myTask, myPipeline, myMaterializedView, myView };
```
Learn more about the export pattern in the local development and hosted guides.
Important: The following components must be exported from your app/index.ts (TypeScript) or imported into main.py (Python) for Moose to detect them:
- OlapTable instances
- Stream instances
- IngestApi instances
- Api instances
- IngestPipeline instances
- MaterializedView instances
- View instances
- Task instances
- Workflow instances

Configuration objects and utilities (like DeadLetterQueue, Key, and sql) do not need to be exported or imported; they are used as dependencies of the main components.
Type-safe configuration classes for table engines:
```python
from moose_lib.blocks import (
    MergeTreeEngine,
    ReplacingMergeTreeEngine,
    AggregatingMergeTreeEngine,
    SummingMergeTreeEngine,
    ReplicatedMergeTreeEngine,
    ReplicatedReplacingMergeTreeEngine,
    ReplicatedAggregatingMergeTreeEngine,
    ReplicatedSummingMergeTreeEngine,
    S3QueueEngine
)

# ReplacingMergeTree with version control and soft deletes
dedup_engine = ReplacingMergeTreeEngine(
    ver="updated_at",     # Optional: version column for keeping latest
    is_deleted="deleted"  # Optional: soft delete marker (requires ver)
)

# ReplicatedMergeTree with explicit keeper paths (self-managed ClickHouse)
replicated_engine = ReplicatedMergeTreeEngine(
    keeper_path="/clickhouse/tables/{database}/{shard}/my_table",
    replica_name="{replica}"
)

# ReplicatedReplacingMergeTree with deduplication
replicated_dedup_engine = ReplicatedReplacingMergeTreeEngine(
    keeper_path="/clickhouse/tables/{database}/{shard}/my_dedup_table",
    replica_name="{replica}",
    ver="updated_at",
    is_deleted="deleted"
)

# For ClickHouse Cloud or Boreal - omit keeper parameters
cloud_replicated = ReplicatedMergeTreeEngine()  # No parameters needed
```

Note: The keeper_path and replica_name parameters (keeperPath and replicaName in TypeScript) are optional. When omitted, Moose uses smart defaults that work in both ClickHouse Cloud and self-managed environments (default path: /clickhouse/tables/{uuid}/{shard} with replica {replica}). You can still provide both parameters explicitly if you need custom replication paths.