API Reference
This is a comprehensive reference for the Python moose_lib, detailing all exported components, types, and utilities.
Core Types
Key[T]
A type annotation for marking fields as primary keys in data models. Used with Pydantic.
from moose_lib import Key
from pydantic import BaseModel
class MyModel(BaseModel):
    id: Key[str]  # Marks 'id' as a primary key of type string
BaseModel
Pydantic base model used for data modeling in Moose.
from pydantic import BaseModel
class MyDataModel(BaseModel):
    id: str
    name: str
    count: int
MooseClient
Client for interacting with ClickHouse and Temporal.
class MooseClient:
    query: QueryClient  # For database queries
    workflow: Optional[WorkflowClient]  # For workflow operations
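A minimal sketch of issuing a parameterized query, assuming QueryClient exposes an execute(query, parameters) method and a hypothetical user_events table; verify the exact signature and parameter-binding syntax against your moose_lib version:
from moose_lib import MooseClient

def count_user_events(client: MooseClient, user_id: str):
    # Hypothetical table and placeholder syntax
    return client.query.execute(
        "SELECT count(*) AS total FROM user_events WHERE user_id = {user_id}",
        {"user_id": user_id},
    )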
ApiResult
Class representing the result of an analytics API call.
@dataclass
class ApiResult:
    status: int  # HTTP status code
    body: Any  # Response body
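A handler can return an ApiResult directly to control the HTTP status code. A minimal sketch (handler wiring is covered under Api[T, U] below; EmptyParams is a hypothetical request model):
from moose_lib import ApiResult

def health_check(params: EmptyParams) -> ApiResult:
    # Respond with an explicit 200 and a small JSON body
    return ApiResult(status=200, body={"ok": True})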
Configuration Types
OlapConfig
Configuration for OLAP tables.
from typing import Optional
from moose_lib.blocks import EngineConfig
class OlapConfig(BaseModel):
    order_by_fields: list[str] = []  # Fields to order by
    engine: Optional[EngineConfig] = None  # Table engine configuration
EngineConfig Classes
Base class and implementations for table engine configurations.
# Base class
class EngineConfig:
    pass

# Available engine implementations
class MergeTreeEngine(EngineConfig):
    pass

class ReplacingMergeTreeEngine(EngineConfig):
    ver: Optional[str] = None  # Version column for keeping latest
    is_deleted: Optional[str] = None  # Soft delete marker (requires ver)

class AggregatingMergeTreeEngine(EngineConfig):
    pass

class SummingMergeTreeEngine(EngineConfig):
    pass
StreamConfig
Configuration for data streams.
class StreamConfig(BaseModel):
    parallelism: int = 1
    retention_period: int = 60 * 60 * 24 * 7  # 7 days
    destination: Optional[OlapTable[Any]] = None
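Setting destination routes records from the stream straight into a table; a sketch assuming a UserEvent model and a matching table (both hypothetical here; see OlapTable[T] and Stream[T] below):
events_table = OlapTable[UserEvent]("user_events")

# Records flowing through the stream are synced into events_table
events_stream = Stream[UserEvent]("user_events", StreamConfig(
    destination=events_table
))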
IngestConfig
Configuration for data ingestion.
class IngestConfig(BaseModel):
    destination: Optional[OlapTable[Any]] = None
IngestPipelineConfig
Configuration for creating a complete data pipeline.
class IngestPipelineConfig(BaseModel):
    table: bool | OlapConfig = True
    stream: bool | StreamConfig = True
    ingest: bool | IngestConfig = True
Infrastructure Components
OlapTable[T]
Creates a ClickHouse table with the schema of type T.
from moose_lib import OlapTable, OlapConfig
from moose_lib.blocks import ReplacingMergeTreeEngine
# Basic usage
my_table = OlapTable[UserProfile]("user_profiles")
# With configuration
my_table = OlapTable[UserProfile]("user_profiles", OlapConfig(
    order_by_fields=["id", "timestamp"],
    engine=ReplacingMergeTreeEngine(),
))
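Recent moose_lib versions also expose direct inserts on the table object; a sketch assuming an insert method that accepts a list of model instances and hypothetical UserProfile fields:
# Hypothetical fields; records are validated against the UserProfile model
my_table.insert([
    UserProfile(id="u1", timestamp="2024-01-01T00:00:00Z"),
])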
Stream[T]
Creates a Redpanda topic with the schema of type T.
from moose_lib import Stream, StreamConfig

# Basic usage
my_stream = Stream[UserEvent]("user_events")
# With configuration
my_stream = Stream[UserEvent]("user_events", StreamConfig(
    parallelism=3,
    retention_period=86400  # 1 day in seconds
))
# Adding transformations
profile_stream = Stream[ProfileUpdate]("profile_updates")  # Destination stream (name assumed for illustration)

def transform_user_event(event: UserEvent) -> ProfileUpdate:
    return ProfileUpdate(user_id=event.user_id, update_type="event")

my_stream.add_transform(profile_stream, transform_user_event)
IngestApi[T]
Creates an HTTP endpoint for ingesting data of type T.
# Basic usage with destination stream
my_ingest_api = IngestApi[UserEvent]("user_events", IngestConfigWithDestination(
    destination=my_user_event_stream
))
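Data can then be posted to the generated endpoint over HTTP. A sketch using requests; the localhost:4000/ingest/<name> URL is the Moose local dev default, so adjust host and port for your deployment:
import requests

# Send one UserEvent as JSON (field names are assumptions)
resp = requests.post(
    "http://localhost:4000/ingest/user_events",
    json={"user_id": "u1"},
)
resp.raise_for_status()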
Api[T, U]
Creates an HTTP endpoint for querying data with request type T and response type U.
# Basic usage
def get_user_profiles(params: UserQuery) -> list[UserProfile]:
    # Query implementation
    return [UserProfile(...), UserProfile(...)]
my_api = Api[UserQuery, list[UserProfile]](
"get_user_profiles",
get_user_profiles
)
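The request type is a plain Pydantic model whose fields map to the endpoint's query parameters; a hypothetical definition for the UserQuery used above:
from pydantic import BaseModel

class UserQuery(BaseModel):
    # Hypothetical filters; adapt to your schema
    min_events: int = 0
    limit: int = 10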
IngestPipeline[T]
Combines ingest API, stream, and table creation in a single component.
from moose_lib import IngestPipeline, IngestPipelineConfig, StreamConfig, OlapConfig
from moose_lib.blocks import ReplacingMergeTreeEngine
# Basic usage
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest=True,
    stream=True,
    table=True
))
# With advanced configuration
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest=True,
    stream=StreamConfig(parallelism=3),
    table=OlapConfig(
        order_by_fields=["id", "timestamp"],
        engine=ReplacingMergeTreeEngine(),
    )
))
MaterializedView[T]
Creates a materialized view in ClickHouse.
# Basic usage
view = MaterializedView[UserStatistics](MaterializedViewOptions(
    select_statement="SELECT user_id, COUNT(*) as event_count FROM user_events GROUP BY user_id",
    table_name="user_events",
    materialized_view_name="user_statistics",
    order_by_fields=["user_id"]
))
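The type parameter describes the rows the view produces, matching the SELECT's columns; a hypothetical definition for the UserStatistics used above:
from pydantic import BaseModel

class UserStatistics(BaseModel):
    user_id: str
    event_count: int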
ClickHouse Utilities
Engine Configuration Classes
Type-safe configuration classes for table engines:
from moose_lib.blocks import (
    MergeTreeEngine,
    ReplacingMergeTreeEngine,
    AggregatingMergeTreeEngine,
    SummingMergeTreeEngine,
    S3QueueEngine
)
# ReplacingMergeTree with version control and soft deletes
dedup_engine = ReplacingMergeTreeEngine(
ver="updated_at", # Optional: version column for keeping latest
is_deleted="deleted" # Optional: soft delete marker (requires ver)
)
# S3Queue configuration for streaming from S3
s3_engine = S3QueueEngine(
s3_path="s3://bucket/data/*.json",
format="JSONEachRow",
aws_access_key_id="AKIA...", # Optional
aws_secret_access_key="secret...", # Optional
compression="gzip", # Optional
headers={"X-Custom": "value"} # Optional
)
# Use with OlapTable
s3_table = OlapTable[MyData]("s3_events", OlapConfig(
    engine=s3_engine,
    settings={
        "mode": "unordered",
        "keeper_path": "/clickhouse/s3queue/events",
        "loading_retries": "3"
    }
))
Task Management
Task[T, U]
Represents a single task within a workflow, with typed input and output.
from moose_lib import Task, TaskConfig, TaskContext
from pydantic import BaseModel
# Define input and output models
class InputData(BaseModel):
    user_id: str

class OutputData(BaseModel):
    result: str
    status: bool

# Task with input and output
def process_user(ctx: TaskContext[InputData]) -> OutputData:
    # Process the user data
    return OutputData(result=f"Processed {ctx.input.user_id}", status=True)

user_task = Task[InputData, OutputData](
    name="process_user",
    config=TaskConfig(
        run=process_user,
        retries=3,
        timeout="30s"
    )
)

# Task with no input, but with output
def fetch_data(ctx: TaskContext[None]) -> OutputData:
    return OutputData(result="Fetched data", status=True)

fetch_task = Task[None, OutputData](
    name="fetch_data",
    config=TaskConfig(run=fetch_data)
)

# Task with input but no output
def log_event(ctx: TaskContext[InputData]) -> None:
    print(f"Event logged for: {ctx.input.user_id}")

log_task = Task[InputData, None](
    name="log_event",
    config=TaskConfig(run=log_event)
)

# Task with neither input nor output
def cleanup(ctx: TaskContext[None]) -> None:
    print("Cleanup complete")

cleanup_task = Task[None, None](
    name="cleanup",
    config=TaskConfig(run=cleanup)
)
TaskConfig[T, U]
Configuration for a Task.
@dataclasses.dataclass
class TaskConfig(Generic[T, U]):
    # The handler function that executes the task logic.
    # Can be any of: () -> None, () -> U, (T) -> None, or (T) -> U,
    # depending on the input/output types
    run: TaskRunFunc[T, U]
    # Optional list of tasks to run after this task completes
    on_complete: Optional[list[Task[U, Any]]] = None
    # Optional function that is called when the task is cancelled
    on_cancel: Optional[Callable[[TaskContext[T_none]], Union[None, Awaitable[None]]]] = None
    # Optional timeout string (e.g. "5m", "1h", "never")
    timeout: Optional[str] = None
    # Optional number of retry attempts
    retries: Optional[int] = None
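For example, on_complete can chain tasks whose input type matches this task's output; a sketch reusing fetch_data and OutputData from above (task names are illustrative):
def summarize(ctx: TaskContext[OutputData]) -> None:
    # Receives the OutputData produced by fetch_data
    print(f"Summary: {ctx.input.result}")

summary_task = Task[OutputData, None](
    name="summarize",
    config=TaskConfig(run=summarize)
)

chained_fetch_task = Task[None, OutputData](
    name="fetch_then_summarize",
    config=TaskConfig(run=fetch_data, on_complete=[summary_task])
)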
Workflow
Represents a workflow composed of one or more tasks.
from moose_lib import Workflow, WorkflowConfig
# Create a workflow that starts with the fetch_task
data_workflow = Workflow(
name="data_processing",
config=WorkflowConfig(
starting_task=fetch_task,
schedule="@every 1h", # Run every hour
timeout="10m", # Timeout after 10 minutes
retries=2 # Retry up to 2 times if it fails
)
)
WorkflowConfig
Configuration for a workflow.
@dataclasses.dataclass
class WorkflowConfig:
    # The first task to execute in the workflow
    starting_task: Task[Any, Any]
    # Optional number of retry attempts for the entire workflow
    retries: Optional[int] = None
    # Optional timeout string for the entire workflow
    timeout: Optional[str] = None
    # Optional cron-like schedule string for recurring execution
    schedule: Optional[str] = None