Moose

Reference

Python API Reference

API Reference

This is a comprehensive reference for the Python moose_lib, detailing all exported components, types, and utilities.

Core Types

Key[T]

A type annotation for marking fields as primary keys in data models. Used with Pydantic.

from moose_lib import Key
from pydantic import BaseModel
 
class MyModel(BaseModel):
    id: Key[str]  # Marks 'id' as a primary key of type string

BaseModel

Pydantic base model used for data modeling in Moose.

from pydantic import BaseModel
 
class MyDataModel(BaseModel):
    id: str
    name: str
    count: int

MooseClient

Client for interacting with ClickHouse and Temporal.

class MooseClient:
    query: QueryClient  # For database queries
    workflow: Optional[WorkflowClient]  # For workflow operations

IngestionFormat

Enum defining supported data ingestion formats.

class IngestionFormat(Enum):
    JSON = "JSON"  # Single JSON object
    JSON_ARRAY = "JSON_ARRAY"  # Array of JSON objects

ConsumptionApiResult

Class representing the result of a consumption API call.

@dataclass
class ConsumptionApiResult:
    status: int  # HTTP status code
    body: Any    # Response body

Configuration Types

OlapConfig

Configuration for OLAP tables.

class OlapConfig(BaseModel):
    order_by_fields: list[str] = []  # Fields to order by
    deduplicate: bool = False        # Enable deduplication
    engine: Optional[ClickHouseEngines] = None  # Table engine

StreamConfig

Configuration for data streams.

class StreamConfig(BaseModel):
    parallelism: int = 1
    retention_period: int = 60 * 60 * 24 * 7  # 7 days
    destination: Optional[OlapTable[Any]] = None

IngestConfig

Configuration for data ingestion.

class IngestConfig(BaseModel):
    format: Optional[IngestionFormat] = None
    destination: Optional[OlapTable[Any]] = None

IngestPipelineConfig

Configuration for creating a complete data pipeline.

class IngestPipelineConfig(BaseModel):
    table: bool | OlapConfig = True
    stream: bool | StreamConfig = True
    ingest: bool | IngestConfig = True

Infrastructure Components

OlapTable[T]

Creates a ClickHouse table with the schema of type T.

# Basic usage
my_table = OlapTable[UserProfile]("user_profiles")
 
# With configuration
my_table = OlapTable[UserProfile]("user_profiles", OlapConfig(
    order_by_fields=["id", "timestamp"],
    deduplicate=True
))

Stream[T]

Creates a Redpanda topic with the schema of type T.

# Basic usage
my_stream = Stream[UserEvent]("user_events")
 
# With configuration
my_stream = Stream[UserEvent]("user_events", StreamConfig(
    parallelism=3,
    retention_period=86400  # 1 day in seconds
))
 
# Adding transformations
def transform_user_event(event: UserEvent) -> ProfileUpdate:
    return ProfileUpdate(user_id=event.user_id, update_type="event")
 
my_stream.add_transform(profile_stream, transform_user_event)

IngestApi[T]

Creates an HTTP endpoint for ingesting data of type T.

# Basic usage with destination stream
my_ingest_api = IngestApi[UserEvent]("user_events", IngestConfigWithDestination(
    destination=my_user_event_stream
))

ConsumptionApi[T, U]

Creates an HTTP endpoint for querying data with request type T and response type U.

# Basic usage
def get_user_profiles(params: UserQuery) -> list[UserProfile]:
    # Query implementation
    return [UserProfile(...), UserProfile(...)]
 
my_api = ConsumptionApi[UserQuery, list[UserProfile]](
    "get_user_profiles",
    get_user_profiles
)

IngestPipeline[T]

Combines ingest API, stream, and table creation in a single component.

# Basic usage
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest=True,
    stream=True,
    table=True
))
 
# With advanced configuration
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest=True,
    stream=StreamConfig(parallelism=3),
    table=OlapConfig(
        order_by_fields=["id", "timestamp"],
        deduplicate=True
    )
))

MaterializedView[T]

Creates a materialized view in ClickHouse.

# Basic usage
view = MaterializedView[UserStatistics](MaterializedViewOptions(
    select_statement="SELECT user_id, COUNT(*) as event_count FROM user_events GROUP BY user_id",
    table_name="user_events",
    materialized_view_name="user_statistics",
    order_by_fields=["user_id"]
))

ClickHouse Utilities

ClickHouseEngines

Enum for supported ClickHouse table engines.

class ClickHouseEngines(Enum):
    MergeTree = "MergeTree"
    ReplacingMergeTree = "ReplacingMergeTree"
    SummingMergeTree = "SummingMergeTree"
    AggregatingMergeTree = "AggregatingMergeTree"
    CollapsingMergeTree = "CollapsingMergeTree"
    VersionedCollapsingMergeTree = "VersionedCollapsingMergeTree"
    GraphiteMergeTree = "GraphiteMergeTree"

Task Management

@task

Decorator for defining workflow tasks.

from moose_lib import task
 
@task(retries=5)
def process_data(input_data: dict) -> dict:
    # Process data
    return {
        "task": "process_data",
        "data": {"processed": True}
    }
 
# Can also be used without parameters
@task
def simple_task() -> dict:
    return {
        "task": "simple_task",
        "data": {"completed": True}
    }