FiveonefourFiveonefour
Fiveonefour Docs
MooseStackTemplatesGuides
Release Notes
Source514
  1. MooseStack
  2. API Reference

On this page

Core TypesKeyJWTApiUtilBaseModelMooseClientApiResultConfiguration TypesOlapConfig / BaseOlapConfigInfrastructure ComponentsOlapTable<T>Stream<T>IngestApi<T>Api<T, R>IngestPipeline<T>MaterializedView<T>ViewSQL Utilitiessql Template TagClickHouse UtilitiesTable Engine ConfigurationsTask ManagementTask<T, R>TaskConfig<T, R>WorkflowWorkflowConfig

API Reference

This is a comprehensive reference for moose_lib, detailing all exported components, types, and utilities.

Core Types

Key

A type annotation for marking fields as primary keys in data models. Used with Pydantic.

from moose_lib import Keyfrom pydantic import BaseModel class MyModel(BaseModel):    id: Key[str]  # Marks 'id' as a primary key of type string

JWT

Not applicable in Python - JWT handling is done through standard Python types.

ApiUtil

Utilities for analytics APIs are provided directly through function parameters in Python.

BaseModel

Pydantic base model used for data modeling in Moose.

from pydantic import BaseModel class MyDataModel(BaseModel):    id: str    name: str    count: int

ApiResult

Class representing the result of a analytics API call.

@dataclassclass ApiResult:    status: int  # HTTP status code    body: Any    # Response body

Configuration Types

OlapConfig / BaseOlapConfig

MooseClient

Client for interacting with ClickHouse and Temporal.

class MooseClient:    query: QueryClient  # For database queries    workflow: Optional[WorkflowClient]  # For workflow operations

Configuration for OLAP tables.

from typing import Union, Optionalfrom moose_lib.blocks import EngineConfig class OlapConfig(BaseModel):    database: Optional[str] = None  # Optional database name (defaults to moose.config.toml clickhouse_config.db_name)    order_by_fields: list[str] = []  # Fields to order by    engine: Optional[EngineConfig] = None  # Table engine configuration

Infrastructure Components

OlapTable<T>

Creates a ClickHouse table with the schema of type T.

from moose_lib import OlapTable, OlapConfigfrom moose_lib.blocks import ReplacingMergeTreeEngine # Basic usagemy_table = OlapTable[UserProfile]("user_profiles") # With configuration (fields)my_table = OlapTable[UserProfile]("user_profiles", OlapConfig(    order_by_fields=["id", "timestamp"],    engine=ReplacingMergeTreeEngine(),)) # With configuration (expression)my_table_expr = OlapTable[UserProfile]("user_profiles_expr", OlapConfig(    order_by_expression="(id, timestamp)",    engine=ReplacingMergeTreeEngine(),)) # Disable sorting entirelymy_table_unsorted = OlapTable[UserProfile]("user_profiles_unsorted", OlapConfig(    order_by_expression="tuple()",))

Stream<T>

Creates a Redpanda topic with the schema of type T.

# Basic usagemy_stream = Stream[UserEvent]("user_events") # With configurationmy_stream = Stream[UserEvent]("user_events", StreamConfig(    parallelism=3,    retention_period=86400  # 1 day in seconds)) # Adding transformationsdef transform_user_event(event: UserEvent) -> ProfileUpdate:    return ProfileUpdate(user_id=event.user_id, update_type="event") my_stream.add_transform(profile_stream, transform_user_event)

IngestApi<T>

Creates an HTTP endpoint for ingesting data of type T.

# Basic usage with destination streammy_ingest_api = IngestApi[UserEvent]("user_events", IngestConfigWithDestination(    destination=my_user_event_stream))

Api<T, R>

Creates an HTTP endpoint for querying data with request type T and response type U.

# Basic usagedef get_user_profiles(params: UserQuery) -> list[UserProfile]:    # Query implementation    return [UserProfile(...), UserProfile(...)] my_api = Api[UserQuery, list[UserProfile]](    "get_user_profiles",    get_user_profiles)

IngestPipeline<T>

Combines ingest API, stream, and table creation in a single component.

from moose_lib import IngestPipeline, IngestPipelineConfig, StreamConfig, OlapConfigfrom moose_lib.blocks import ReplacingMergeTreeEngine # Basic usagepipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(    ingest_api=True,    stream=True,    table=True)) # With advanced configurationpipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(    ingest_api=True,    stream=StreamConfig(parallelism=3),    table=OlapConfig(        order_by_fields=["id", "timestamp"],        engine=ReplacingMergeTreeEngine(),    )))

MaterializedView<T>

Creates a materialized view in ClickHouse.

# Basic usageview = MaterializedView[UserStatistics](MaterializedViewOptions(    select_statement="SELECT user_id, COUNT(*) as event_count FROM user_events GROUP BY user_id",    table_name="user_events",    materialized_view_name="user_statistics",    order_by_fields=["user_id"]))

View

Creates a standard SQL view in ClickHouse. Views are read-time projections defined by a SELECT statement over one or more base tables or other views.

from moose_lib import View # Basic usageactive_user_events = View(    "active_user_events",    """    SELECT      events.id        AS event_id,      users.id         AS user_id,      users.name       AS user_name,      events.ts        AS ts    FROM events    JOIN users ON events.user_id = users.id    WHERE users.active = 1    """,    [events, users])

Note: Use View for read-time projections. For write-time transformations with separate storage, use MaterializedView instead.

SQL Utilities

sql Template Tag

Python uses formatted strings with parameter substitution for SQL queries.

# Python uses formatted strings with execute methodquery = """  SELECT * FROM users   WHERE age > {min_age}   AND country = {country}  LIMIT {limit}"""rows = client.query.execute(query, {"min_age": min_age, "country": country, "limit": limit})

ClickHouse Utilities

Table Engine Configurations

ClickHouseEngines Enum

Available table engines:

enum ClickHouseEngines {  MergeTree = "MergeTree",  ReplacingMergeTree = "ReplacingMergeTree",  AggregatingMergeTree = "AggregatingMergeTree",  SummingMergeTree = "SummingMergeTree",  ReplicatedMergeTree = "ReplicatedMergeTree",  ReplicatedReplacingMergeTree = "ReplicatedReplacingMergeTree",  ReplicatedAggregatingMergeTree = "ReplicatedAggregatingMergeTree",  ReplicatedSummingMergeTree = "ReplicatedSummingMergeTree",  S3Queue = "S3Queue"}

ReplacingMergeTreeConfig<T>

Configuration for ReplacingMergeTree tables:

type ReplacingMergeTreeConfig<T> = {  engine: ClickHouseEngines.ReplacingMergeTree;  orderByFields?: (keyof T & string)[];  ver?: keyof T & string;  // Optional: version column for keeping latest  isDeleted?: keyof T & string;  // Optional: soft delete marker (requires ver)  settings?: { [key: string]: string };}

Replicated Engine Configurations

Configuration for replicated table engines:

// ReplicatedMergeTreetype ReplicatedMergeTreeConfig<T> = {  engine: ClickHouseEngines.ReplicatedMergeTree;  keeperPath?: string;  // Optional: ZooKeeper/Keeper path (omit for Cloud)  replicaName?: string; // Optional: replica name (omit for Cloud)  orderByFields?: (keyof T & string)[];  settings?: { [key: string]: string };} // ReplicatedReplacingMergeTreetype ReplicatedReplacingMergeTreeConfig<T> = {  engine: ClickHouseEngines.ReplicatedReplacingMergeTree;  keeperPath?: string;  // Optional: ZooKeeper/Keeper path (omit for Cloud)  replicaName?: string; // Optional: replica name (omit for Cloud)  ver?: keyof T & string;  // Optional: version column  isDeleted?: keyof T & string;  // Optional: soft delete marker  orderByFields?: (keyof T & string)[];  settings?: { [key: string]: string };} // ReplicatedAggregatingMergeTreetype ReplicatedAggregatingMergeTreeConfig<T> = {  engine: ClickHouseEngines.ReplicatedAggregatingMergeTree;  keeperPath?: string;  // Optional: ZooKeeper/Keeper path (omit for Cloud)  replicaName?: string; // Optional: replica name (omit for Cloud)  orderByFields?: (keyof T & string)[];  settings?: { [key: string]: string };} // ReplicatedSummingMergeTreetype ReplicatedSummingMergeTreeConfig<T> = {  engine: ClickHouseEngines.ReplicatedSummingMergeTree;  keeperPath?: string;  // Optional: ZooKeeper/Keeper path (omit for Cloud)  replicaName?: string; // Optional: replica name (omit for Cloud)  columns?: string[];   // Optional: columns to sum  orderByFields?: (keyof T & string)[];  settings?: { [key: string]: string };}

Task Management

Task<T, R>

A class that represents a single task within a workflow system, with typed input and output.

from moose_lib import Task, TaskConfig, TaskContextfrom pydantic import BaseModel # Define input and output modelsclass InputData(BaseModel):    user_id: str    class OutputData(BaseModel):    result: str    status: bool # Task with input and outputdef process_user(ctx: TaskContext[InputData]) -> OutputData:    # Process the user data    return OutputData(result=f"Processed {ctx.input.user_id}", status=True) user_task = Task[InputData, OutputData](    name="process_user",    config=TaskConfig(        run=process_user,        retries=3,        timeout="30s"    ))

TaskConfig<T, R>

Configuration for a Task.

@dataclasses.dataclassclass TaskConfig(Generic[T, U]):    # The handler function that executes the task logic    # Can be any of: () -> None, () -> U, (T) -> None, or (T) -> U depending on input/output types    run: TaskRunFunc[T, U]        # Optional list of tasks to run after this task completes    on_complete: Optional[list[Task[U, Any]]] = None     # Optional function that is called when the task is cancelled    on_cancel: Optional[Callable[[TaskContext[T_none]], Union[None, Awaitable[None]]]] = None        # Optional timeout string (e.g. "5m", "1h", "never")    timeout: Optional[str] = None        # Optional number of retry attempts    retries: Optional[int] = None

Workflow

Represents a workflow composed of one or more tasks.

from moose_lib import Workflow, WorkflowConfig # Create a workflow that starts with the fetch_taskdata_workflow = Workflow(    name="data_processing",    config=WorkflowConfig(        starting_task=fetch_task,        schedule="@every 1h",  # Run every hour        timeout="10m",         # Timeout after 10 minutes        retries=2              # Retry up to 2 times if it fails    ))

WorkflowConfig

Configuration for a workflow.

@dataclasses.dataclassclass WorkflowConfig:    # The first task to execute in the workflow    starting_task: Task[Any, Any]        # Optional number of retry attempts for the entire workflow    retries: Optional[int] = None        # Optional timeout string for the entire workflow    timeout: Optional[str] = None        # Optional cron-like schedule string for recurring execution    schedule: Optional[str] = None

Export Required

Ensure your Infrastructure Components is correctly imported into your main.py file.

Example (TypeScript equivalent): export { myTable, myStream, myApi, myWorkflow, myTask, myPipeline, myMaterializedView, myView }

Learn more about export pattern: local development / hosted.

Important: The following components must be exported from your app/index.ts (TypeScript) or imported into main.py (Python) for Moose to detect them:

  • OlapTable instances
  • Stream instances
  • IngestApi instances
  • Api instances
  • IngestPipeline instances
  • MaterializedView instances
  • View instances
  • Task instances
  • Workflow instances

Configuration objects and utilities (like DeadLetterQueue, Key, sql) do not need to be exported as they are used as dependencies of the main components.

  • OlapTable instances
  • Stream instances
  • IngestApi instances

Engine Configuration Classes

Type-safe configuration classes for table engines:

from moose_lib.blocks import (    MergeTreeEngine,    ReplacingMergeTreeEngine,    AggregatingMergeTreeEngine,    SummingMergeTreeEngine,    ReplicatedMergeTreeEngine,    ReplicatedReplacingMergeTreeEngine,    ReplicatedAggregatingMergeTreeEngine,    ReplicatedSummingMergeTreeEngine,    S3QueueEngine) # ReplacingMergeTree with version control and soft deletesdedup_engine = ReplacingMergeTreeEngine(    ver="updated_at",  # Optional: version column for keeping latest    is_deleted="deleted"  # Optional: soft delete marker (requires ver)) # ReplicatedMergeTree with explicit keeper paths (self-managed ClickHouse)replicated_engine = ReplicatedMergeTreeEngine(    keeper_path="/clickhouse/tables/{database}/{shard}/my_table",    replica_name="{replica}") # ReplicatedReplacingMergeTree with deduplicationreplicated_dedup_engine = ReplicatedReplacingMergeTreeEngine(    keeper_path="/clickhouse/tables/{database}/{shard}/my_dedup_table",    replica_name="{replica}",    ver="updated_at",    is_deleted="deleted") # For ClickHouse Cloud or Boreal - omit keeper parameterscloud_replicated = ReplicatedMergeTreeEngine()  # No parameters needed

Note: The keeper_path and replica_name parameters are optional. When omitted, Moose uses smart defaults that work in both ClickHouse Cloud and self-managed environments. You can still provide both parameters explicitly if you need custom replication paths.

Note: The keeperPath and replicaName parameters are optional. When omitted, Moose uses smart defaults that work in both ClickHouse Cloud and self-managed environments (default path: /clickhouse/tables/{uuid}/{shard} with replica {replica}). You can still provide both parameters explicitly if you need custom replication paths.

  • Api instances
  • IngestPipeline instances
  • MaterializedView instances
  • View instances
  • Task instances
  • Workflow instances
  • Configuration objects and utilities (like DeadLetterQueue, Key) do not need to be imported as they are used as dependencies of the main components.

    • Overview
    Build a New App
    • 5 Minute Quickstart
    • Browse Templates
    • Existing ClickHouse
    Add to Existing App
    • Next.js
    • Fastify
    Fundamentals
    • Moose Runtime
    • MooseDev MCP
    • Data Modeling
    Moose Modules
    • Moose OLAP
    • Moose Streaming
    • Moose Workflows
    • Moose APIs & Web Apps
    Deployment & Lifecycle
    • Moose Migrate
    • Moose Deploy
    Reference
    • API Reference
    • Data Types
    • Table Engines
    • CLI
    • Configuration
    • Observability Metrics
    • Help
    • Release Notes
    Contribution
    • Documentation
    • Framework