API Reference

This is a comprehensive reference for the Python moose_lib, detailing all exported components, types, and utilities.

Core Types

Key[T]

A type annotation for marking fields as primary keys in data models. Used with Pydantic.

from moose_lib import Key
from pydantic import BaseModel
 
class MyModel(BaseModel):
    id: Key[str]  # Marks 'id' as a primary key of type string

BaseModel

Pydantic base model used for data modeling in Moose.

from pydantic import BaseModel
 
class MyDataModel(BaseModel):
    id: str
    name: str
    count: int

MooseClient

Client for interacting with ClickHouse and Temporal.

class MooseClient:
    query: QueryClient  # For database queries
    workflow: Optional[WorkflowClient]  # For workflow operations
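
A minimal sketch of issuing a query through the client, for example inside a consumption API handler; the execute(sql, parameters) call is an assumption about QueryClient's surface, not a documented signature:

from moose_lib import MooseClient

def count_events(client: MooseClient) -> list:
    # Assumed QueryClient method: execute(sql, parameters) -> rows
    return client.query.execute(
        "SELECT user_id, COUNT(*) AS cnt FROM user_events GROUP BY user_id",
        {},
    )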

ConsumptionApiResult

Class representing the result of a consumption API call.

@dataclass
class ConsumptionApiResult:
    status: int  # HTTP status code
    body: Any    # Response body
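
For example, a handler can construct one to control the HTTP status code explicitly (returning it directly from the handler is assumed here):

from moose_lib import ConsumptionApiResult

def handler(params) -> ConsumptionApiResult:
    # 404 with an explanatory body when nothing matches the query
    return ConsumptionApiResult(status=404, body={"error": "not found"})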

Configuration Types

OlapConfig

Configuration for OLAP tables.

class OlapConfig(BaseModel):
    order_by_fields: list[str] = []  # Fields to order by
    deduplicate: bool = False        # Enable deduplication
    engine: Optional[ClickHouseEngines] = None  # Table engine
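
The engine field takes a value from the ClickHouseEngines enum documented below. A minimal sketch, assuming both names import from moose_lib:

from moose_lib import OlapConfig, ClickHouseEngines

# A table configured for ReplacingMergeTree-based deduplication
config = OlapConfig(
    order_by_fields=["id"],
    engine=ClickHouseEngines.ReplacingMergeTree,
)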

StreamConfig

Configuration for data streams.

class StreamConfig(BaseModel):
    parallelism: int = 1
    retention_period: int = 60 * 60 * 24 * 7  # 7 days
    destination: Optional[OlapTable[Any]] = None
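
Setting destination wires the stream's output into a table. A minimal sketch; user_table stands in for an OlapTable created elsewhere:

config = StreamConfig(
    parallelism=2,
    destination=user_table,  # hypothetical OlapTable[Any] defined elsewhere
)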

IngestConfig

Configuration for data ingestion.

class IngestConfig(BaseModel):
    destination: Optional[OlapTable[Any]] = None

IngestPipelineConfig

Configuration for creating a complete data pipeline.

class IngestPipelineConfig(BaseModel):
    table: bool | OlapConfig = True
    stream: bool | StreamConfig = True
    ingest: bool | IngestConfig = True

Infrastructure Components

OlapTable[T]

Creates a ClickHouse table with the schema of type T.

# Basic usage
my_table = OlapTable[UserProfile]("user_profiles")
 
# With configuration
my_table = OlapTable[UserProfile]("user_profiles", OlapConfig(
    order_by_fields=["id", "timestamp"],
    deduplicate=True
))

Stream[T]

Creates a Redpanda topic with the schema of type T.

# Basic usage
my_stream = Stream[UserEvent]("user_events")
 
# With configuration
my_stream = Stream[UserEvent]("user_events", StreamConfig(
    parallelism=3,
    retention_period=86400  # 1 day in seconds
))
 
# Adding transformations
def transform_user_event(event: UserEvent) -> ProfileUpdate:
    return ProfileUpdate(user_id=event.user_id, update_type="event")
 
# profile_stream is a Stream[ProfileUpdate] defined elsewhere; events from
# my_stream are transformed and forwarded to it
my_stream.add_transform(profile_stream, transform_user_event)

IngestApi[T]

Creates an HTTP endpoint for ingesting data of type T.

# Basic usage with a destination stream
# IngestConfigWithDestination is the IngestConfig variant whose destination is required
my_ingest_api = IngestApi[UserEvent]("user_events", IngestConfigWithDestination(
    destination=my_user_event_stream
))
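
Once running, the endpoint accepts POSTed JSON matching UserEvent's schema. A sketch using the requests library; the localhost:4000 address, the /ingest/<name> route, and the field names are assumptions:

import requests

# Assumes the default local dev address and ingest route layout
resp = requests.post(
    "http://localhost:4000/ingest/user_events",
    json={"user_id": "123", "event_type": "click"},  # placeholder UserEvent fields
)
resp.raise_for_status()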

ConsumptionApi[T, U]

Creates an HTTP endpoint for querying data with request type T and response type U.

# Basic usage
def get_user_profiles(params: UserQuery) -> list[UserProfile]:
    # Query implementation
    return [UserProfile(...), UserProfile(...)]
 
my_api = ConsumptionApi[UserQuery, list[UserProfile]](
    "get_user_profiles",
    get_user_profiles
)

IngestPipeline[T]

Combines ingest API, stream, and table creation in a single component.

# Basic usage
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest=True,
    stream=True,
    table=True
))
 
# With advanced configuration
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest=True,
    stream=StreamConfig(parallelism=3),
    table=OlapConfig(
        order_by_fields=["id", "timestamp"],
        deduplicate=True
    )
))

MaterializedView[T]

Creates a materialized view in ClickHouse.

# Basic usage
view = MaterializedView[UserStatistics](MaterializedViewOptions(
    select_statement="SELECT user_id, COUNT(*) AS event_count FROM user_events GROUP BY user_id",
    table_name="user_statistics",             # target table that stores the view's rows
    materialized_view_name="user_statistics_mv",
    order_by_fields=["user_id"]
))

ClickHouse Utilities

ClickHouseEngines

Enum for supported ClickHouse table engines.

class ClickHouseEngines(Enum):
    MergeTree = "MergeTree"
    ReplacingMergeTree = "ReplacingMergeTree"
    SummingMergeTree = "SummingMergeTree"
    AggregatingMergeTree = "AggregatingMergeTree"
    CollapsingMergeTree = "CollapsingMergeTree"
    VersionedCollapsingMergeTree = "VersionedCollapsingMergeTree"
    GraphiteMergeTree = "GraphiteMergeTree"

Task Management

Task[T, U]

A class representing a single task within a workflow, with typed input and output.

from moose_lib import Task, TaskConfig
from pydantic import BaseModel
 
# Define input and output models
class InputData(BaseModel):
    user_id: str
    
class OutputData(BaseModel):
    result: str
    status: bool
 
# Task with input and output
def process_user(user_data: InputData) -> OutputData:
    # Process the user data
    return OutputData(result=f"Processed {user_data.user_id}", status=True)
 
user_task = Task[InputData, OutputData](
    name="process_user",
    config=TaskConfig(
        run=process_user,
        retries=3,
        timeout="30s"
    )
)
 
# Task with no input, but with output
def fetch_data() -> OutputData:
    return OutputData(result="Fetched data", status=True)
    
fetch_task = Task[None, OutputData](
    name="fetch_data",
    config=TaskConfig(run=fetch_data)
)
 
# Task with input but no output
def log_event(data: InputData) -> None:
    print(f"Event logged for: {data.user_id}")
 
log_task = Task[InputData, None](
    name="log_event",
    config=TaskConfig(run=log_event)
)
 
# Task with neither input nor output
def cleanup() -> None:
    print("Cleanup complete")
 
cleanup_task = Task[None, None](
    name="cleanup",
    config=TaskConfig(run=cleanup)
)

TaskConfig[T, U]

Configuration for a Task.

@dataclasses.dataclass
class TaskConfig(Generic[T, U]):
    # The handler function that executes the task logic
    # Can be any of: () -> None, () -> U, (T) -> None, or (T) -> U depending on input/output types
    run: TaskRunFunc[T, U]
    
    # Optional list of tasks to run after this task completes
    on_complete: Optional[list[Task[U, Any]]] = None
    
    # Optional timeout string (e.g. "5m", "1h")
    timeout: Optional[str] = None
    
    # Optional number of retry attempts
    retries: Optional[int] = None
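
on_complete chains follow-up tasks whose input type matches this task's output type U. A sketch reusing fetch_data and OutputData from the Task examples above; notify_task is hypothetical:

def notify(result: OutputData) -> None:
    print(f"Task finished: {result.result}")

notify_task = Task[OutputData, None](
    name="notify",
    config=TaskConfig(run=notify),
)

fetch_with_followup = Task[None, OutputData](
    name="fetch_data",
    config=TaskConfig(
        run=fetch_data,
        on_complete=[notify_task],  # runs after fetch_data with its OutputData
    ),
)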

Workflow

Represents a workflow composed of one or more tasks.

from moose_lib import Workflow, WorkflowConfig
 
# Create a workflow that starts with the fetch_task
data_workflow = Workflow(
    name="data_processing",
    config=WorkflowConfig(
        starting_task=fetch_task,
        schedule="@every 1h",  # Run every hour
        timeout="10m",         # Timeout after 10 minutes
        retries=2              # Retry up to 2 times if it fails
    )
)

WorkflowConfig

Configuration for a workflow.

@dataclasses.dataclass
class WorkflowConfig:
    # The first task to execute in the workflow
    starting_task: Task[Any, Any]
    
    # Optional number of retry attempts for the entire workflow
    retries: Optional[int] = None
    
    # Optional timeout string for the entire workflow
    timeout: Optional[str] = None
    
    # Optional cron-like schedule string for recurring execution
    schedule: Optional[str] = None