# Moose / Reference / Py Moose Lib Documentation – Python

## Included Files

1. moose/reference/py-moose-lib/py-moose-lib.mdx

## Python Moose Lib Reference

Source: moose/reference/py-moose-lib/py-moose-lib.mdx

# API Reference

This is a comprehensive reference for the Python `moose_lib`, detailing all exported components, types, and utilities.

## Core Types

### `Key[T]`

A type annotation for marking fields as primary keys in data models. Used with Pydantic.

```python
from moose_lib import Key
from pydantic import BaseModel

class MyModel(BaseModel):
    id: Key[str]  # Marks 'id' as a primary key of type string
```

### `BaseModel`

Pydantic base model used for data modeling in Moose.

```python
from pydantic import BaseModel

class MyDataModel(BaseModel):
    id: str
    name: str
    count: int
```

### `MooseClient`

Client for interacting with ClickHouse and Temporal.

```python
class MooseClient:
    query: QueryClient                  # For database queries
    workflow: Optional[WorkflowClient]  # For workflow operations
```

### `ApiResult`

Class representing the result of an analytics API call.

```python
@dataclass
class ApiResult:
    status: int  # HTTP status code
    body: Any    # Response body
```

## Configuration Types

### `OlapConfig`

Configuration for OLAP tables.

```python
from typing import Optional
from moose_lib.blocks import EngineConfig

class OlapConfig(BaseModel):
    database: Optional[str] = None             # Optional database name (defaults to moose.config.toml clickhouse_config.db_name)
    order_by_fields: list[str] = []            # Fields to order by
    order_by_expression: Optional[str] = None  # Raw ORDER BY expression (alternative to order_by_fields; see OlapTable examples)
    engine: Optional[EngineConfig] = None      # Table engine configuration
    settings: Optional[dict] = None            # Table-level settings (see the S3Queue example below)
```

### `EngineConfig` Classes

Base class and implementations for table engine configurations.

```python
# Base class
class EngineConfig:
    pass

# Available engine implementations
class MergeTreeEngine(EngineConfig):
    pass

class ReplacingMergeTreeEngine(EngineConfig):
    ver: Optional[str] = None         # Version column for keeping latest
    is_deleted: Optional[str] = None  # Soft delete marker (requires ver)

class AggregatingMergeTreeEngine(EngineConfig):
    pass

class SummingMergeTreeEngine(EngineConfig):
    columns: Optional[List[str]] = None  # Columns to sum

# Replicated engines
class ReplicatedMergeTreeEngine(EngineConfig):
    keeper_path: Optional[str] = None   # ZooKeeper/Keeper path (optional for Cloud)
    replica_name: Optional[str] = None  # Replica name (optional for Cloud)

class ReplicatedReplacingMergeTreeEngine(EngineConfig):
    keeper_path: Optional[str] = None   # ZooKeeper/Keeper path (optional for Cloud)
    replica_name: Optional[str] = None  # Replica name (optional for Cloud)
    ver: Optional[str] = None           # Version column for keeping latest
    is_deleted: Optional[str] = None    # Soft delete marker (requires ver)

class ReplicatedAggregatingMergeTreeEngine(EngineConfig):
    keeper_path: Optional[str] = None   # ZooKeeper/Keeper path (optional for Cloud)
    replica_name: Optional[str] = None  # Replica name (optional for Cloud)

class ReplicatedSummingMergeTreeEngine(EngineConfig):
    keeper_path: Optional[str] = None    # ZooKeeper/Keeper path (optional for Cloud)
    replica_name: Optional[str] = None   # Replica name (optional for Cloud)
    columns: Optional[List[str]] = None  # Columns to sum
```

### `StreamConfig`

Configuration for data streams.

```python
class StreamConfig(BaseModel):
    parallelism: int = 1
    retention_period: int = 60 * 60 * 24 * 7  # 7 days
    destination: Optional[OlapTable[Any]] = None
```
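For orientation, the sketch below wires a `StreamConfig` to a destination table using the `Stream` and `OlapTable` components documented later in this reference; the `UserEvent` model and the names used here are illustrative.

```python
from moose_lib import OlapTable, Stream, StreamConfig
from pydantic import BaseModel

class UserEvent(BaseModel):
    user_id: str
    event_type: str

# Illustrative names; every record published to the stream is written to the table
events_table = OlapTable[UserEvent]("user_events")
events_stream = Stream[UserEvent]("user_events", StreamConfig(
    parallelism=2,
    retention_period=60 * 60 * 24,  # 1 day
    destination=events_table,
))
```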
### `IngestConfig`

Configuration for data ingestion.

```python
class IngestConfig(BaseModel):
    destination: Optional[OlapTable[Any]] = None
```

### `IngestPipelineConfig`

Configuration for creating a complete data pipeline.

```python
class IngestPipelineConfig(BaseModel):
    table: bool | OlapConfig = True
    stream: bool | StreamConfig = True
    ingest_api: bool | IngestConfig = True
```

## Infrastructure Components

### `OlapTable[T]`

Creates a ClickHouse table with the schema of type T.

```python
from moose_lib import OlapTable, OlapConfig
from moose_lib.blocks import ReplacingMergeTreeEngine

# Basic usage
my_table = OlapTable[UserProfile]("user_profiles")

# With configuration (fields)
my_table = OlapTable[UserProfile]("user_profiles", OlapConfig(
    order_by_fields=["id", "timestamp"],
    engine=ReplacingMergeTreeEngine(),
))

# With configuration (expression)
my_table_expr = OlapTable[UserProfile]("user_profiles_expr", OlapConfig(
    order_by_expression="(id, timestamp)",
    engine=ReplacingMergeTreeEngine(),
))

# With custom database override
analytics_table = OlapTable[UserProfile]("user_profiles", OlapConfig(
    database="analytics",  # Override default database
    order_by_fields=["id", "timestamp"]
))

# Disable sorting entirely
my_table_unsorted = OlapTable[UserProfile]("user_profiles_unsorted", OlapConfig(
    order_by_expression="tuple()",
))
```

### `Stream[T]`

Creates a Redpanda topic with the schema of type T.

```python
# Basic usage
my_stream = Stream[UserEvent]("user_events")

# With configuration
my_stream = Stream[UserEvent]("user_events", StreamConfig(
    parallelism=3,
    retention_period=86400  # 1 day in seconds
))

# Adding transformations
def transform_user_event(event: UserEvent) -> ProfileUpdate:
    return ProfileUpdate(user_id=event.user_id, update_type="event")

profile_stream = Stream[ProfileUpdate]("profile_updates")  # Destination stream for the transform
my_stream.add_transform(profile_stream, transform_user_event)
```

### `IngestApi[T]`

Creates an HTTP endpoint for ingesting data of type T.

```python
# Basic usage with destination stream
my_ingest_api = IngestApi[UserEvent]("user_events", IngestConfigWithDestination(
    destination=my_user_event_stream
))
```

### `Api[T, U]`

Creates an HTTP endpoint for querying data with request type T and response type U.

```python
# Basic usage
def get_user_profiles(params: UserQuery) -> list[UserProfile]:
    # Query implementation
    return [UserProfile(...), UserProfile(...)]

my_api = Api[UserQuery, list[UserProfile]](
    "get_user_profiles",
    get_user_profiles
)
```

### `IngestPipeline[T]`

Combines ingest API, stream, and table creation in a single component.

```python
from moose_lib import IngestPipeline, IngestPipelineConfig, StreamConfig, OlapConfig
from moose_lib.blocks import ReplacingMergeTreeEngine

# Basic usage
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=True
))

# With advanced configuration
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest_api=True,
    stream=StreamConfig(parallelism=3),
    table=OlapConfig(
        order_by_fields=["id", "timestamp"],
        engine=ReplacingMergeTreeEngine(),
    )
))
```
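As context for the ingest components above, records are sent to the generated endpoint over plain HTTP. The sketch below posts one record to the pipeline's ingest API; the local base URL and the `/ingest/<name>` path are assumptions based on Moose's default dev server, so adjust them for your deployment.

```python
import requests  # Third-party HTTP client, used here only for illustration

# Hypothetical payload matching the UserEvent model above; the URL and route
# naming are assumptions about the local dev server defaults.
response = requests.post(
    "http://localhost:4000/ingest/user_pipeline",
    json={"user_id": "u_123", "event_type": "signup"},
)
print(response.status_code, response.text)
```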
### `MaterializedView[T]`

Creates a materialized view in ClickHouse.

```python
# Basic usage
view = MaterializedView[UserStatistics](MaterializedViewOptions(
    select_statement="SELECT user_id, COUNT(*) as event_count FROM user_events GROUP BY user_id",
    table_name="user_events",
    materialized_view_name="user_statistics",
    order_by_fields=["user_id"]
))
```

## ClickHouse Utilities

### Engine Configuration Classes

Type-safe configuration classes for table engines:

```python
from moose_lib.blocks import (
    MergeTreeEngine,
    ReplacingMergeTreeEngine,
    AggregatingMergeTreeEngine,
    SummingMergeTreeEngine,
    ReplicatedMergeTreeEngine,
    ReplicatedReplacingMergeTreeEngine,
    ReplicatedAggregatingMergeTreeEngine,
    ReplicatedSummingMergeTreeEngine,
    S3QueueEngine,
    S3Engine,
    BufferEngine,
    DistributedEngine
)

# ReplacingMergeTree with version control and soft deletes
dedup_engine = ReplacingMergeTreeEngine(
    ver="updated_at",     # Optional: version column for keeping latest
    is_deleted="deleted"  # Optional: soft delete marker (requires ver)
)

# ReplicatedMergeTree with explicit keeper paths (self-managed ClickHouse)
replicated_engine = ReplicatedMergeTreeEngine(
    keeper_path="/clickhouse/tables/{database}/{shard}/my_table",
    replica_name="{replica}"
)

# ReplicatedReplacingMergeTree with deduplication
replicated_dedup_engine = ReplicatedReplacingMergeTreeEngine(
    keeper_path="/clickhouse/tables/{database}/{shard}/my_dedup_table",
    replica_name="{replica}",
    ver="updated_at",
    is_deleted="deleted"
)

# For ClickHouse Cloud or Boreal - omit keeper parameters
cloud_replicated = ReplicatedMergeTreeEngine()  # No parameters needed

# S3Queue configuration for streaming from S3
s3_engine = S3QueueEngine(
    s3_path="s3://bucket/data/*.json",
    format="JSONEachRow",
    aws_access_key_id="AKIA...",        # Optional
    aws_secret_access_key="secret...",  # Optional
    compression="gzip",                 # Optional
    headers={"X-Custom": "value"}       # Optional
)

# Use with OlapTable
s3_table = OlapTable[MyData]("s3_events", OlapConfig(
    engine=s3_engine,
    settings={
        "mode": "unordered",
        "keeper_path": "/clickhouse/s3queue/events",
        "loading_retries": "3"
    }
))

# S3 engine for direct S3 access (not streaming)
s3_direct_engine = S3Engine(
    path="s3://bucket/data/file.json",
    format="JSONEachRow",
    aws_access_key_id="AKIA...",        # Optional
    aws_secret_access_key="secret...",  # Optional
    compression="gzip"                  # Optional
)

s3_direct_table = OlapTable[MyData]("s3_data", OlapConfig(
    engine=s3_direct_engine
))

# Buffer engine for high-throughput buffered writes
buffer_engine = BufferEngine(
    target_database="local",
    target_table="destination_table",
    num_layers=16,
    min_time=10,
    max_time=100,
    min_rows=10000,
    max_rows=1000000,
    min_bytes=10485760,
    max_bytes=104857600
)

buffer_table = OlapTable[MyData]("buffer", OlapConfig(
    engine=buffer_engine
))

# Distributed engine for cluster-wide distributed tables
distributed_engine = DistributedEngine(
    cluster="my_cluster",
    target_database="default",
    target_table="local_table",
    sharding_key="cityHash64(id)"  # Optional
)

distributed_table = OlapTable[MyData]("distributed", OlapConfig(
    engine=distributed_engine
))
```
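Tying the engine classes back to `OlapConfig`, the sketch below defines a deduplicated table keyed on `id` using components documented above; the `UserProfile` model and column names are illustrative.

```python
from datetime import datetime

from moose_lib import Key, OlapConfig, OlapTable
from moose_lib.blocks import ReplacingMergeTreeEngine
from pydantic import BaseModel

class UserProfile(BaseModel):
    id: Key[str]
    updated_at: datetime
    deleted: int = 0

# Rows sharing the same ORDER BY key collapse to the row with the highest
# `updated_at`; rows flagged deleted=1 are removed during merges.
profiles = OlapTable[UserProfile]("user_profiles_dedup", OlapConfig(
    order_by_fields=["id"],
    engine=ReplacingMergeTreeEngine(ver="updated_at", is_deleted="deleted"),
))
```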
## Task Management

### `Task[T, U]`

A class that represents a single task within a workflow system, with typed input and output.

```python
from moose_lib import Task, TaskConfig, TaskContext
from pydantic import BaseModel

# Define input and output models
class InputData(BaseModel):
    user_id: str

class OutputData(BaseModel):
    result: str
    status: bool

# Task with input and output
def process_user(ctx: TaskContext[InputData]) -> OutputData:
    # Process the user data
    return OutputData(result=f"Processed {ctx.input.user_id}", status=True)

user_task = Task[InputData, OutputData](
    name="process_user",
    config=TaskConfig(
        run=process_user,
        retries=3,
        timeout="30s"
    )
)

# Task with no input, but with output
def fetch_data(ctx: TaskContext[None]) -> OutputData:
    return OutputData(result="Fetched data", status=True)

fetch_task = Task[None, OutputData](
    name="fetch_data",
    config=TaskConfig(run=fetch_data)
)

# Task with input but no output
def log_event(ctx: TaskContext[InputData]) -> None:
    print(f"Event logged for: {ctx.input.user_id}")

log_task = Task[InputData, None](
    name="log_event",
    config=TaskConfig(run=log_event)
)

# Task with neither input nor output
def cleanup(ctx: TaskContext[None]) -> None:
    print("Cleanup complete")

cleanup_task = Task[None, None](
    name="cleanup",
    config=TaskConfig(run=cleanup)
)
```

### `TaskConfig[T, U]`

Configuration for a Task.

```python
@dataclasses.dataclass
class TaskConfig(Generic[T, U]):
    # The handler function that executes the task logic
    # Can be any of: () -> None, () -> U, (T) -> None, or (T) -> U depending on input/output types
    run: TaskRunFunc[T, U]

    # Optional list of tasks to run after this task completes
    on_complete: Optional[list[Task[U, Any]]] = None

    # Optional function that is called when the task is cancelled
    on_cancel: Optional[Callable[[TaskContext[T_none]], Union[None, Awaitable[None]]]] = None

    # Optional timeout string (e.g. "5m", "1h", "never")
    timeout: Optional[str] = None

    # Optional number of retry attempts
    retries: Optional[int] = None
```

### `Workflow`

Represents a workflow composed of one or more tasks.

```python
from moose_lib import Workflow, WorkflowConfig

# Create a workflow that starts with the fetch_task
data_workflow = Workflow(
    name="data_processing",
    config=WorkflowConfig(
        starting_task=fetch_task,
        schedule="@every 1h",  # Run every hour
        timeout="10m",         # Timeout after 10 minutes
        retries=2              # Retry up to 2 times if it fails
    )
)
```

### `WorkflowConfig`

Configuration for a workflow.

```python
@dataclasses.dataclass
class WorkflowConfig:
    # The first task to execute in the workflow
    starting_task: Task[Any, Any]

    # Optional number of retry attempts for the entire workflow
    retries: Optional[int] = None

    # Optional timeout string for the entire workflow
    timeout: Optional[str] = None

    # Optional cron-like schedule string for recurring execution
    schedule: Optional[str] = None
```
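The `on_complete` field on `TaskConfig` is how tasks are chained: its type (`list[Task[U, Any]]`) implies a downstream task receives the upstream task's output as its input. A minimal sketch, with illustrative task and model names:

```python
from moose_lib import Task, TaskConfig, TaskContext, Workflow, WorkflowConfig
from pydantic import BaseModel

class RawData(BaseModel):
    value: int

class EnrichedData(BaseModel):
    value: int
    doubled: int

def extract(ctx: TaskContext[None]) -> RawData:
    return RawData(value=21)

def enrich(ctx: TaskContext[RawData]) -> EnrichedData:
    return EnrichedData(value=ctx.input.value, doubled=ctx.input.value * 2)

# Downstream task: its input type matches the upstream task's output type
enrich_task = Task[RawData, EnrichedData](
    name="enrich",
    config=TaskConfig(run=enrich),
)

# Upstream task: chains to enrich_task via on_complete
extract_task = Task[None, RawData](
    name="extract",
    config=TaskConfig(run=extract, on_complete=[enrich_task]),
)

etl_workflow = Workflow(
    name="etl",
    config=WorkflowConfig(starting_task=extract_task, schedule="@every 30m"),
)
```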