# Moose / APIs / Ingest API Documentation – Python

## Included Files

1. moose/apis/ingest-api/ingest-api.mdx

## Ingestion APIs

Source: moose/apis/ingest-api/ingest-api.mdx

Ingestion APIs for Moose

# Ingestion APIs

## Overview

Moose Ingestion APIs are the entry point for getting data into your Moose application. They provide a fast, reliable, and type-safe way to move data from your sources into streams and tables for analytics and processing.

## When to Use Ingestion APIs

Ingestion APIs are most useful when you want to implement a push-based pattern for getting data from your data sources into your streams and tables. Common use cases include:

- Instrumenting external client applications
- Receiving webhooks from third-party services
- Integrating with ETL or data pipeline tools that push data

## Why Use Moose's APIs Over Your Own?

Moose's ingestion APIs are purpose-built for high-throughput data pipelines, offering key advantages over more general-purpose frameworks:

- **Built-in schema validation:** Ensures only valid data enters your pipeline.
- **Direct connection to streams/tables:** Instantly link HTTP endpoints to Moose data infrastructure to route incoming data to your streams and tables without any glue code.
- **Dead Letter Queue (DLQ) support:** Invalid records are automatically captured for review and recovery.
- **OpenAPI auto-generation:** Instantly generate client SDKs and docs for all endpoints, including example data.
- **Rust-powered performance:** Far higher throughput and lower latency than typical Node.js or Python APIs.

## Validation

Moose validates all incoming data against your Pydantic model. If a record fails validation, Moose can automatically route it to a Dead Letter Queue (DLQ) for later inspection and recovery.
```python filename="ValidationExample.py" copy
from datetime import datetime
from typing import Optional

from moose_lib import IngestApi, IngestConfig, Stream, DeadLetterQueue
from pydantic import BaseModel

class Properties(BaseModel):
    device: Optional[str] = None
    version: Optional[int] = None

class ExampleModel(BaseModel):
    id: str
    userId: str
    timestamp: datetime
    properties: Properties

api = IngestApi[ExampleModel]("your-api-route", IngestConfig(
    destination=Stream[ExampleModel]("your-stream-name"),
    dead_letter_queue=DeadLetterQueue[ExampleModel]("your-dlq-name")
))
```

If your IngestPipeline's schema marks a field as optional but annotates a ClickHouse default, Moose treats:

- API request and Stream message: the field is optional (you may omit it)
- ClickHouse table storage: the field is required, with a DEFAULT clause

Behavior: when the API/stream inserts into ClickHouse and the field is missing, ClickHouse sets it to the configured default value. This keeps request payloads simple while avoiding Nullable columns in storage.

Example: `Annotated[int, clickhouse_default("18")]` (or equivalent annotation)

Send a valid event (routed to the destination stream):

```python filename="ValidEvent.py" copy
import requests

requests.post("http://localhost:4000/ingest/your-api-route", json={
    "id": "event1",
    "userId": "user1",
    "timestamp": "2023-05-10T15:30:00Z"
})
# ✅ Accepted and routed to the destination stream
# API returns 200 and { success: true }
```

Send an invalid event, missing a required field (routed to the DLQ):

```python filename="InvalidEventMissingField.py" copy
import requests

requests.post("http://localhost:4000/ingest/your-api-route", json={
    "id": "event1",
})
# ❌ Routed to the DLQ because it's missing required fields
# API returns a 400 response
```

Send an invalid event with a bad date format (routed to the DLQ):

```python filename="InvalidEventBadDate.py" copy
import requests

requests.post("http://localhost:4000/ingest/your-api-route", json={
    "id": "event1",
    "userId": "user1",
    "timestamp": "not-a-date"
})
# ❌ Routed to the DLQ because the timestamp is not a valid date
# API returns a 400 response
```

## Creating Ingestion APIs

You can create ingestion APIs in two ways:

- **High-level:** using the `IngestPipeline` class (recommended for most use cases)
- **Low-level:** manually configuring the `IngestApi` component for more granular control

### High-level: IngestPipeline (Recommended)

The `IngestPipeline` class provides a convenient way to set up ingestion endpoints, streams, and tables with a single declaration:

```python filename="IngestPipeline.py" copy
from datetime import datetime

from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel

class ExampleSchema(BaseModel):
    id: Key[str]
    name: str
    value: int
    timestamp: datetime

example_pipeline = IngestPipeline[ExampleSchema](
    name="example-name",
    config=IngestPipelineConfig(
        ingest_api=True,
        stream=True,
        table=True
    )
)
```

### Low-level: Standalone IngestApi

For more granular control, you can manually configure the `IngestApi` component. The types of the destination `Stream` and `Table` must match the type of the `IngestApi`.

## Configuration Reference

Configuration options for both high-level and low-level ingestion APIs are listed below.

```python filename="IngestPipelineConfig.py" copy
class IngestPipelineConfig(BaseModel):
    table: bool | OlapConfig = True
    stream: bool | StreamConfig = True
    ingest_api: bool | IngestConfig = True
    dead_letter_queue: bool | StreamConfig = True
    version: Optional[str] = None
    metadata: Optional[dict] = None
    life_cycle: Optional[LifeCycle] = None
```

```python filename="IngestConfig.py" copy
@dataclass
class IngestConfigWithDestination[T: BaseModel]:
    destination: Stream[T]
    dead_letter_queue: Optional[DeadLetterQueue[T]] = None
    version: Optional[str] = None
    metadata: Optional[dict] = None
```
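The low-level standalone approach can be sketched as follows. This is a minimal configuration sketch reusing the `IngestApi`, `IngestConfig`, `Stream`, and `DeadLetterQueue` names shown in the validation example earlier; the route and component names are placeholders, and any table wiring on the stream is left out because its exact API may differ across moose_lib versions:

```python
from datetime import datetime

from moose_lib import IngestApi, IngestConfig, Stream, DeadLetterQueue
from pydantic import BaseModel

class ExampleSchema(BaseModel):
    id: str
    name: str
    value: int
    timestamp: datetime

# The destination stream must use the same model type as the API;
# any downstream table must match that type as well.
destination = Stream[ExampleSchema]("example-stream")

ingest_api = IngestApi[ExampleSchema](
    "example-api-route",
    IngestConfig(
        destination=destination,
        dead_letter_queue=DeadLetterQueue[ExampleSchema]("example-dlq"),
    ),
)
```

Compared to `IngestPipeline`, this form trades brevity for explicit control over each component.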