API Reference
This is a comprehensive reference for the Python moose_lib, detailing all exported components, types, and utilities.
Core Types
Key[T]
A type annotation for marking fields as primary keys in data models. Used with Pydantic.
from moose_lib import Key
from pydantic import BaseModel
class MyModel(BaseModel):
    id: Key[str]  # Marks 'id' as a primary key of type string
BaseModel
Pydantic base model used for data modeling in Moose.
from pydantic import BaseModel
class MyDataModel(BaseModel):
    id: str
    name: str
    count: int
MooseClient
Client for interacting with ClickHouse and Temporal.
class MooseClient:
    query: QueryClient  # For database queries
    workflow: Optional[WorkflowClient]  # For workflow operations
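A MooseClient is typically handed to your handler code rather than constructed by hand. Below is a minimal sketch of issuing a query through the client; the execute method and its (sql, params) signature are assumptions for illustration, not confirmed by this reference.
from moose_lib import MooseClient

def count_events(client: MooseClient) -> int:
    # Hypothetical call: assumes QueryClient exposes execute(sql, params)
    # and returns a list of row dicts.
    rows = client.query.execute("SELECT count(*) AS n FROM user_events", {})
    return rows[0]["n"]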
ConsumptionApiResult
Class representing the result of a consumption API call.
@dataclass
class ConsumptionApiResult:
    status: int  # HTTP status code
    body: Any  # Response body
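A result can be constructed directly from the two fields above, for example to signal an explicit status code:
from moose_lib import ConsumptionApiResult

ok = ConsumptionApiResult(status=200, body={"count": 42})
not_found = ConsumptionApiResult(status=404, body={"error": "user not found"})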
Configuration Types
OlapConfig
Configuration for OLAP tables.
class OlapConfig(BaseModel):
    order_by_fields: list[str] = []  # Fields to order by
    deduplicate: bool = False  # Enable deduplication
    engine: Optional[ClickHouseEngines] = None  # Table engine
StreamConfig
Configuration for data streams.
class StreamConfig(BaseModel):
    parallelism: int = 1
    retention_period: int = 60 * 60 * 24 * 7  # 7 days, in seconds
    destination: Optional[OlapTable[Any]] = None
IngestConfig
Configuration for data ingestion.
class IngestConfig(BaseModel):
    destination: Optional[OlapTable[Any]] = None
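A minimal construction; my_table here is assumed to be an existing OlapTable instance defined elsewhere:
ingest_config = IngestConfig(destination=my_table)  # my_table: an existing OlapTable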
IngestPipelineConfig
Configuration for creating a complete data pipeline.
class IngestPipelineConfig(BaseModel):
    table: bool | OlapConfig = True
    stream: bool | StreamConfig = True
    ingest: bool | IngestConfig = True
Infrastructure Components
OlapTable[T]
Creates a ClickHouse table with the schema of type T.
# Basic usage
my_table = OlapTable[UserProfile]("user_profiles")
# With configuration
my_table = OlapTable[UserProfile]("user_profiles", OlapConfig(
    order_by_fields=["id", "timestamp"],
    deduplicate=True
))
Stream[T]
Creates a Redpanda topic with the schema of type T.
# Basic usage
my_stream = Stream[UserEvent]("user_events")
# With configuration
my_stream = Stream[UserEvent]("user_events", StreamConfig(
    parallelism=3,
    retention_period=86400  # 1 day in seconds
))
# Adding transformations; profile_stream is an existing Stream[ProfileUpdate]
# that receives the transformed records
def transform_user_event(event: UserEvent) -> ProfileUpdate:
    return ProfileUpdate(user_id=event.user_id, update_type="event")

my_stream.add_transform(profile_stream, transform_user_event)
IngestApi[T]
Creates an HTTP endpoint for ingesting data of type T.
# Basic usage with destination stream
my_ingest_api = IngestApi[UserEvent]("user_events", IngestConfigWithDestination(
    destination=my_user_event_stream
))
ConsumptionApi[T, U]
Creates an HTTP endpoint for querying data with request type T and response type U.
# Basic usage
def get_user_profiles(params: UserQuery) -> list[UserProfile]:
    # Query implementation
    return [UserProfile(...), UserProfile(...)]

my_api = ConsumptionApi[UserQuery, list[UserProfile]](
    "get_user_profiles",
    get_user_profiles
)
IngestPipeline[T]
Combines ingest API, stream, and table creation in a single component.
# Basic usage
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest=True,
    stream=True,
    table=True
))
# With advanced configuration
pipeline = IngestPipeline[UserEvent]("user_pipeline", IngestPipelineConfig(
    ingest=True,
    stream=StreamConfig(parallelism=3),
    table=OlapConfig(
        order_by_fields=["id", "timestamp"],
        deduplicate=True
    )
))
MaterializedView[T]
Creates a materialized view in ClickHouse.
# Basic usage
view = MaterializedView[UserStatistics](MaterializedViewOptions(
    select_statement="SELECT user_id, COUNT(*) AS event_count FROM user_events GROUP BY user_id",
    table_name="user_statistics",  # target table that stores the materialized rows
    materialized_view_name="user_statistics_mv",
    order_by_fields=["user_id"]
))
ClickHouse Utilities
ClickHouseEngines
Enum for supported ClickHouse table engines.
class ClickHouseEngines(Enum):
    MergeTree = "MergeTree"
    ReplacingMergeTree = "ReplacingMergeTree"
    SummingMergeTree = "SummingMergeTree"
    AggregatingMergeTree = "AggregatingMergeTree"
    CollapsingMergeTree = "CollapsingMergeTree"
    VersionedCollapsingMergeTree = "VersionedCollapsingMergeTree"
    GraphiteMergeTree = "GraphiteMergeTree"
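A sketch of selecting an engine through the OlapConfig.engine field shown earlier; ReplacingMergeTree is a common choice when rows sharing a key should be deduplicated (the table and model names here are illustrative):
from moose_lib import ClickHouseEngines, OlapConfig, OlapTable

latest_profiles = OlapTable[UserProfile]("latest_profiles", OlapConfig(
    order_by_fields=["id"],
    engine=ClickHouseEngines.ReplacingMergeTree  # replaces rows sharing the same key
))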
Task Management
Task[T, U]
A class that represents a single task within a workflow system, with typed input and output.
from moose_lib import Task, TaskConfig
from pydantic import BaseModel
# Define input and output models
class InputData(BaseModel):
    user_id: str

class OutputData(BaseModel):
    result: str
    status: bool

# Task with input and output
def process_user(user_data: InputData) -> OutputData:
    # Process the user data
    return OutputData(result=f"Processed {user_data.user_id}", status=True)

user_task = Task[InputData, OutputData](
    name="process_user",
    config=TaskConfig(
        run=process_user,
        retries=3,
        timeout="30s"
    )
)
# Task with no input, but with output
def fetch_data() -> OutputData:
    return OutputData(result="Fetched data", status=True)

fetch_task = Task[None, OutputData](
    name="fetch_data",
    config=TaskConfig(run=fetch_data)
)
# Task with input but no output
def log_event(data: InputData) -> None:
    print(f"Event logged for: {data.user_id}")

log_task = Task[InputData, None](
    name="log_event",
    config=TaskConfig(run=log_event)
)
# Task with neither input nor output
def cleanup() -> None:
    print("Cleanup complete")

cleanup_task = Task[None, None](
    name="cleanup",
    config=TaskConfig(run=cleanup)
)
TaskConfig[T, U]
Configuration for a Task.
@dataclasses.dataclass
class TaskConfig(Generic[T, U]):
    # The handler function that executes the task logic. Can be any of:
    # () -> None, () -> U, (T) -> None, or (T) -> U, depending on the
    # input/output types.
    run: TaskRunFunc[T, U]
    # Optional list of tasks to run after this task completes
    on_complete: Optional[list[Task[U, Any]]] = None
    # Optional timeout string (e.g. "5m", "1h")
    timeout: Optional[str] = None
    # Optional number of retry attempts
    retries: Optional[int] = None
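The on_complete field chains downstream tasks whose input type matches this task's output type U. A sketch reusing InputData, OutputData, and process_user from the Task[T, U] examples above:
def notify(result: OutputData) -> None:
    print(f"Notifying about: {result.result}")

notify_task = Task[OutputData, None](
    name="notify",
    config=TaskConfig(run=notify)
)

# process_user returns OutputData, so downstream tasks must accept OutputData
chained_task = Task[InputData, OutputData](
    name="process_then_notify",
    config=TaskConfig(run=process_user, on_complete=[notify_task])
)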
Workflow
Represents a workflow composed of one or more tasks.
from moose_lib import Workflow, WorkflowConfig
# Create a workflow that starts with fetch_task
data_workflow = Workflow(
    name="data_processing",
    config=WorkflowConfig(
        starting_task=fetch_task,
        schedule="@every 1h",  # Run every hour
        timeout="10m",  # Time out after 10 minutes
        retries=2  # Retry up to 2 times on failure
    )
)
WorkflowConfig
Configuration for a workflow.
@dataclasses.dataclass
class WorkflowConfig:
    # The first task to execute in the workflow
    starting_task: Task[Any, Any]
    # Optional number of retry attempts for the entire workflow
    retries: Optional[int] = None
    # Optional timeout string for the entire workflow
    timeout: Optional[str] = None
    # Optional cron-like schedule string for recurring execution
    schedule: Optional[str] = None