API Reference
This is a comprehensive reference for the Python `moose_lib` library, detailing all exported components, types, and utilities.
Core Types
Key[T]
A type annotation for marking fields as primary keys in data models. Used with Pydantic.
from moose_lib import Key
from pydantic import BaseModel


class MyModel(BaseModel):
    """Example data model whose `id` field is marked as the primary key."""

    id: Key[str]  # Marks 'id' as a primary key of type string
BaseModel
Pydantic base model used for data modeling in Moose.
from pydantic import BaseModel


class MyDataModel(BaseModel):
    """Example Moose data model declared as a plain Pydantic model."""

    id: str
    name: str
    count: int
MooseClient
Client for interacting with ClickHouse and Temporal.
class MooseClient:
    """Client exposing a query client and an optional workflow client."""

    query: QueryClient  # For database queries
    workflow: Optional[WorkflowClient]  # For workflow operations; may be None
IngestionFormat
Enum defining supported data ingestion formats.
class IngestionFormat(Enum):
    """Supported payload formats for data ingestion."""

    JSON = "JSON"  # Single JSON object
    JSON_ARRAY = "JSON_ARRAY"  # Array of JSON objects
ConsumptionApiResult
Class representing the result of a consumption API call.
@dataclass
class ConsumptionApiResult:
    """Result of a consumption API call: an HTTP status code plus a body."""

    status: int  # HTTP status code
    body: Any  # Response body
Configuration Types
OlapConfig
Configuration for OLAP tables.
class OlapConfig(BaseModel):
    """Configuration for an OLAP (ClickHouse) table."""

    order_by_fields: list[str] = []  # Fields to order by
    deduplicate: bool = False  # Enable deduplication
    engine: Optional[ClickHouseEngines] = None  # Table engine
StreamConfig
Configuration for data streams.
class StreamConfig(BaseModel):
    """Configuration for a data stream."""

    parallelism: int = 1
    retention_period: int = 60 * 60 * 24 * 7  # 7 days, in seconds
    destination: Optional[OlapTable[Any]] = None
IngestConfig
Configuration for data ingestion.
class IngestConfig(BaseModel):
    """Configuration for data ingestion."""

    format: Optional[IngestionFormat] = None
    destination: Optional[OlapTable[Any]] = None
IngestPipelineConfig
Configuration for creating a complete data pipeline.
class IngestPipelineConfig(BaseModel):
    """Configuration for a complete pipeline (table, stream, ingest API).

    Each component is given either as a bool or as its own config object.
    """

    table: bool | OlapConfig = True
    stream: bool | StreamConfig = True
    ingest: bool | IngestConfig = True
Infrastructure Components
OlapTable[T]
Creates a ClickHouse table with the schema of type T.
# Basic usage
my_table = OlapTable[UserProfile]("user_profiles")

# With configuration
my_table = OlapTable[UserProfile](
    "user_profiles",
    OlapConfig(
        order_by_fields=["id", "timestamp"],
        deduplicate=True,
    ),
)
Stream[T]
Creates a Redpanda topic with the schema of type T.
# Basic usage
my_stream = Stream[UserEvent]("user_events")

# With configuration
my_stream = Stream[UserEvent]("user_events", StreamConfig(
    parallelism=3,
    retention_period=86400,  # 1 day in seconds
))

# Adding transformations
# Destination stream that receives the transformed records.
profile_stream = Stream[ProfileUpdate]("profile_updates")


def transform_user_event(event: UserEvent) -> ProfileUpdate:
    """Map a UserEvent to a ProfileUpdate for the profile stream."""
    return ProfileUpdate(user_id=event.user_id, update_type="event")


my_stream.add_transform(profile_stream, transform_user_event)
IngestApi[T]
Creates an HTTP endpoint for ingesting data of type T.
# Basic usage with destination stream
my_ingest_api = IngestApi[UserEvent](
    "user_events",
    IngestConfigWithDestination(destination=my_user_event_stream),
)
ConsumptionApi[T, U]
Creates an HTTP endpoint for querying data with request type T and response type U.
# Basic usage
def get_user_profiles(params: UserQuery) -> list[UserProfile]:
    """Return the profiles for the given query parameters (placeholder body)."""
    # Query implementation
    return [UserProfile(...), UserProfile(...)]


my_api = ConsumptionApi[UserQuery, list[UserProfile]](
    "get_user_profiles",
    get_user_profiles,
)
IngestPipeline[T]
Combines ingest API, stream, and table creation in a single component.
# Basic usage
pipeline = IngestPipeline[UserEvent](
    "user_pipeline",
    IngestPipelineConfig(ingest=True, stream=True, table=True),
)

# With advanced configuration
pipeline = IngestPipeline[UserEvent](
    "user_pipeline",
    IngestPipelineConfig(
        ingest=True,
        stream=StreamConfig(parallelism=3),
        table=OlapConfig(
            order_by_fields=["id", "timestamp"],
            deduplicate=True,
        ),
    ),
)
MaterializedView[T]
Creates a materialized view in ClickHouse.
# Basic usage
# `table_name` is the destination table the view writes into (with the
# UserStatistics schema); it must not be the source table read by the SELECT.
view = MaterializedView[UserStatistics](MaterializedViewOptions(
    select_statement="SELECT user_id, COUNT(*) as event_count FROM user_events GROUP BY user_id",
    table_name="user_statistics",
    materialized_view_name="user_statistics_mv",
    order_by_fields=["user_id"],
))
ClickHouse Utilities
ClickHouseEngines
Enum for supported ClickHouse table engines.
class ClickHouseEngines(Enum):
    """ClickHouse table engines supported for OLAP tables."""

    MergeTree = "MergeTree"
    ReplacingMergeTree = "ReplacingMergeTree"
    SummingMergeTree = "SummingMergeTree"
    AggregatingMergeTree = "AggregatingMergeTree"
    CollapsingMergeTree = "CollapsingMergeTree"
    VersionedCollapsingMergeTree = "VersionedCollapsingMergeTree"
    GraphiteMergeTree = "GraphiteMergeTree"
Task Management
@task
Decorator for defining workflow tasks.
from moose_lib import task


@task(retries=5)
def process_data(input_data: dict) -> dict:
    """Example task declared with a retry count of 5."""
    # Process data
    return {
        "task": "process_data",
        "data": {"processed": True},
    }


# Can also be used without parameters
@task
def simple_task() -> dict:
    """Example task declared with the bare decorator."""
    return {
        "task": "simple_task",
        "data": {"completed": True},
    }