# Moose / Apis Documentation – Python

## Included Files

1. moose/apis/admin-api.mdx
2. moose/apis/analytics-api.mdx
3. moose/apis/auth.mdx
4. moose/apis/ingest-api.mdx
5. moose/apis/openapi-sdk.mdx
6. moose/apis/trigger-api.mdx

## admin-api
Source: moose/apis/admin-api.mdx

# Coming Soon

---

## APIs
Source: moose/apis/analytics-api.mdx
APIs for Moose

# APIs

## Overview

APIs are functions that run on your server and are automatically exposed as HTTP `GET` endpoints. They are designed to read data from your OLAP database.

Out of the box, these APIs provide:
- Automatic type validation and type conversion for your query parameters (sent in the URL) and response body
- Managed database client connection
- Automatic OpenAPI documentation generation

Common use cases include:
- Powering user-facing analytics, dashboards and other front-end components
- Enabling AI tools to interact with your data
- Building custom APIs for your internal tools

### Enabling APIs

Analytics APIs are enabled by default. To explicitly control this feature in your `moose.config.toml`:

```toml filename="moose.config.toml" copy
[features]
apis = true
```

### Basic Usage

`execute` is the recommended way to execute queries.
It provides a thin wrapper around the ClickHouse Python client so that you can safely pass `OlapTable` and `Column` objects to your query without needing to worry about ClickHouse identifiers:

```python filename="ExampleApi.py" copy
from moose_lib import Api, MooseClient
from pydantic import BaseModel

## Import the source pipeline
from app.path.to.SourcePipeline import SourcePipeline

# Define the query parameters
class QueryParams(BaseModel):
    filter_field: str
    max_results: int

# Define the response body
class ResponseBody(BaseModel):
    id: int
    name: str
    value: float

SourceTable = SourcePipeline.get_table()

# Define the route handler function (parameterized)
def run(client: MooseClient, params: QueryParams) -> list[ResponseBody]:
    query = """
    SELECT
        id,
        name,
        value
    FROM {table}
    WHERE category = {category}
    LIMIT {limit}
    """

    return client.query.execute(query, {"table": SourceTable, "category": params.filter_field, "limit": params.max_results})

# Create the API
example_api = Api[QueryParams, ResponseBody](name="example_endpoint", query_function=run)
```

Use `execute_raw` with parameter binding for safe, typed queries:

```python filename="ExampleApi.py" copy
from moose_lib import Api, MooseClient
from pydantic import BaseModel

## Import the source pipeline
from app.path.to.SourcePipeline import SourcePipeline

# Define the query parameters
class QueryParams(BaseModel):
    filterField: str
    maxResults: int

# Define the response body
class ResponseBody(BaseModel):
    id: int
    name: str
    value: float

SourceTable = SourcePipeline.get_table()

# Define the route handler function (using execute_raw with typed parameters)
def run(client: MooseClient, params: QueryParams) -> list[ResponseBody]:
    query = """
    SELECT
        id,
        name,
        value
    FROM Source
    WHERE category = {category:String}
    LIMIT {limit:UInt32}
    """

    return client.query.execute_raw(query, {"category": params.filterField, "limit": params.maxResults})

# Create the API
example_api = Api[QueryParams, ResponseBody](name="example_endpoint", query_function=run)
```

```python filename="SourcePipeline.py" copy
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel

class SourceSchema(BaseModel):
    id: Key[int]
    name: str
    value: float

SourcePipeline = IngestPipeline[SourceSchema]("Source", IngestPipelineConfig(
    ingest_api=False,
    stream=True,
    table=True,
))
```

The `Api` class takes:
- Route name: The URL path to access your API (e.g., `"example_endpoint"`)
- Handler function: Processes requests with typed parameters and returns the result

The generic type parameters specify:
- `QueryParams`: The structure of accepted URL parameters
- `ResponseBody`: The exact shape of your API's response data

You can name these types anything you want. The first type generates validation for query parameters, while the second defines the response structure for OpenAPI documentation.

## Type Validation

Model the query parameters and response body as Pydantic models. Moose uses these models to provide automatic type validation and type conversion for your query parameters (sent in the URL) and response body.

### Modeling Query Parameters

Define your API's parameters as a Pydantic model:

```python filename="ExampleQueryParams.py" copy
from pydantic import BaseModel, Field
from typing import Optional

class QueryParams(BaseModel):
    filterField: str = Field(..., description="The field to filter by")
    maxResults: int = Field(..., description="The maximum number of results to return")
    optionalParam: Optional[str] = Field(None, description="An optional parameter")
```

Moose automatically handles:
- Runtime validation
- Clear error messages for invalid parameters
- OpenAPI documentation generation

Complex nested objects and arrays are not supported. Analytics APIs are `GET` endpoints designed to be simple and lightweight.

### Adding Advanced Type Validation

Moose uses Pydantic for runtime validation.
Use Pydantic's `Field` class for more complex validation:

```python filename="ExampleQueryParams.py" copy
from pydantic import BaseModel, Field

class QueryParams(BaseModel):
    ## Only allow valid column names from the UserTable
    filterField: str = Field(pattern=r"^(id|name|email)$", description="The field to filter by")
    ## Positive integer
    maxResults: int = Field(gt=0, description="The maximum number of results to return")
```

### Common Validation Options

```python filename="ValidationExamples.py" copy
from typing import Optional

from pydantic import BaseModel, Field

class QueryParams(BaseModel):
    # Numeric validations
    id: int = Field(..., gt=0)
    age: int = Field(..., gt=0, lt=120)
    price: float = Field(..., gt=0, lt=1000)
    discount: float = Field(..., gt=0, multiple_of=0.5)

    # String validations (format checks are expressed as regex patterns)
    username: str = Field(..., min_length=3, max_length=20)
    email: str = Field(..., pattern=r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
    zipCode: str = Field(..., pattern=r"^[0-9]{5}$")
    uuid: str = Field(..., pattern=r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
    ipAddress: str = Field(..., pattern=r"^(\d{1,3}\.){3}\d{1,3}$")

    # Date validations (YYYY-MM-DD shape check)
    startDate: str = Field(..., pattern=r"^\d{4}-\d{2}-\d{2}$")

    # Enum validation
    status: str = Field(..., pattern=r"^(active|pending|inactive)$")

    # Optional parameters
    limit: Optional[int] = Field(None, gt=0, lt=100)
```

For a full list of validation options, see the [Pydantic documentation](https://docs.pydantic.dev/latest/concepts/types/#customizing-validation-with-fields).

### Setting Default Values

You can set default values for parameters by assigning a value to each parameter in your Pydantic model:

```python filename="ExampleQueryParams.py" copy {9}
from pydantic import BaseModel

class QueryParams(BaseModel):
    filterField: str = "example"
    maxResults: int = 10
    optionalParam: str | None = "default"
```

## Implementing Route Handler

API route handlers are regular functions, so you can implement whatever arbitrary logic you want inside these functions.
Most of the time you will use APIs to expose your data to your front-end applications or other tools.

### Connecting to the Database

Moose provides a managed `MooseClient` to your function execution context. This client provides access to the database and other Moose resources, and handles connection pooling/lifecycle management for you:

```python filename="ExampleApi.py" copy
from moose_lib import Api, MooseClient
from app.UserTable import UserTable

def run(client: MooseClient, params: QueryParams):
    # You can use a formatted string for simple static query
    query = """
    SELECT COUNT(*) FROM {table}
    """

    ## You can optionally pass the table object to the query
    return client.query.execute(query, {"table": UserTable})

## Create the API
example_api = Api[QueryParams, ResponseBody](name="example_endpoint", query_function=run)
```

Use `execute_raw` with parameter binding:

```python filename="ExampleApi.py" copy
from moose_lib import Api, MooseClient
from app.UserTable import UserTable

def run(client: MooseClient, params: QueryParams):
    # Using execute_raw for safe queries
    query = """
    SELECT COUNT(*) FROM {table:Identifier}
    """

    ## Must be the name of the table, not the table object
    return client.query.execute_raw(query, {"table": UserTable.name})

## Create the API
example_api = Api[QueryParams, ResponseBody](name="example_endpoint", query_function=run)
```

### Constructing Safe SQL Queries

#### Basic Query Parameter Interpolation

Using `execute`:

```python filename="SafeQueries.py" copy
from moose_lib import MooseClient
from pydantic import BaseModel, Field

class QueryParams(BaseModel):
    min_age: int = Field(ge=0, le=150)
    status: str = Field(pattern=r"^(active|inactive)$")
    limit: int = Field(default=10, ge=1, le=1000)
    search_text: str = Field(pattern=r'^[a-zA-Z0-9\s]*$')

def run(client: MooseClient, params: QueryParams):
    query = """
    SELECT *
    FROM users
    WHERE age >= {min_age}
    AND status = '{status}'
    AND name ILIKE '%{search_text}%'
    LIMIT {limit}
    """
    return client.query.execute(query, {"min_age": params.min_age, "status": params.status, "search_text": params.search_text, "limit": params.limit})
```

Using `execute_raw` with typed parameter binding:

```python filename="SafeQueries.py" copy
from moose_lib import MooseClient
from pydantic import BaseModel, Field

class QueryParams(BaseModel):
    min_age: int = Field(ge=0, le=150)
    status: str = Field(pattern=r"^(active|inactive)$")
    limit: int = Field(default=10, ge=1, le=1000)
    search_text: str = Field(pattern=r'^[a-zA-Z0-9\s]*$')

def run(client: MooseClient, params: QueryParams):
    query = """
    SELECT *
    FROM users
    WHERE age >= {minAge:UInt32}
    AND status = {status:String}
    AND name ILIKE {searchPattern:String}
    LIMIT {limit:UInt32}
    """
    return client.query.execute_raw(query, {
        "minAge": params.min_age,
        "status": params.status,
        "searchPattern": f"%{params.search_text}%",
        "limit": params.limit
    })
```

#### Table and Column References

```python filename="ValidatedQueries.py" copy
from moose_lib import Api, MooseClient
from pydantic import BaseModel, Field
from app.UserTable import UserTable

class QueryParams(BaseModel):
    # Values interpolated into the query need strict validation
    column: str = Field(pattern=r"^(id|name|email)$", description="Uses a regex pattern to only allow valid column names")

    # Allows letters, numbers, spaces, hyphens, apostrophes;
    # does not allow special characters that could be used in SQL injection
    search_term: str = Field(
        pattern=r'^[\w\s\'-]{1,50}$',
        min_length=1,
        max_length=50
    )

    limit: int = Field(
        default=10,
        ge=1,
        le=100,
        description="Number of results to return"
    )

def run(client: MooseClient, params: QueryParams):
    query = """
    SELECT {column}
    FROM {table}
    WHERE name ILIKE '%{search_term}%'
    LIMIT {limit}
    """

    return client.query.execute(query, {"column": UserTable.cols[params.column], "table": UserTable, "search_term": params.search_term, "limit": params.limit})
```

```python filename="UserTable.py" copy
from moose_lib import OlapTable, Key
from pydantic import BaseModel

class UserSchema(BaseModel):
    id: Key[int]
    name: str
    email: str

UserTable = OlapTable[UserSchema]("users")
```

### Advanced Query Patterns

#### Dynamic Column & Table Selection

```python filename="DynamicColumns.py" copy
from typing import Optional

from moose_lib import Api, MooseClient
from pydantic import BaseModel, Field
from app.UserTable import UserTable

class QueryParams(BaseModel):
    colName: str = Field(pattern=r"^(id|name|email)$", description="Uses a regex pattern to only allow valid column names from the UserTable")

class QueryResult(BaseModel):
    # Only the selected column is returned, so every field defaults to None
    id: Optional[int] = None
    name: Optional[str] = None
    email: Optional[str] = None

def run(client: MooseClient, params: QueryParams):
    # Put column and table in the dict for variables
    query = "SELECT {column} FROM {table}"
    return client.query.execute(query, {"column": UserTable.cols[params.colName], "table": UserTable})

## Create the API
bar = Api[QueryParams, QueryResult](name="bar", query_function=run)

## Call the API
## HTTP Request: GET http://localhost:4000/api/bar?colName=id
## EXECUTED QUERY: SELECT id FROM users
```

#### Conditional `WHERE` Clauses

Build `WHERE` clauses based on provided parameters:

```python filename="ConditionalColumns.py" copy
from typing import Optional

from moose_lib import Api, MooseClient
from pydantic import BaseModel, Field

class FilterParams(BaseModel):
    min_age: Optional[int] = None
    status: Optional[str] = Field(default=None, pattern=r"^(active|inactive)$")
    search_text: Optional[str] = Field(default=None, pattern=r"^[a-zA-Z0-9\s]+$", description="Alphanumeric search text without special characters to prevent SQL injection")

class QueryResult(BaseModel):
    id: int
    name: str
    email: str

def build_query(client: MooseClient, params: FilterParams) -> list[QueryResult]:
    # Collect the conditions and the parameters bound to them
    conditions = []
    parameters = {}

    if params.min_age is not None:
        conditions.append("age >= {min_age}")
        parameters["min_age"] = params.min_age

    if params.status:
        conditions.append("status = {status}")
        parameters["status"] = params.status

    if params.search_text:
        conditions.append("(name ILIKE {search_text} OR email ILIKE {search_text})")
        # Wrap the term in wildcards so ILIKE matches substrings
        parameters["search_text"] = f"%{params.search_text}%"

    where_clause = f" WHERE {' AND '.join(conditions)}" if conditions else ""
    query = f"""SELECT * FROM users {where_clause} ORDER BY created_at DESC"""
    return client.query.execute(query, parameters)

## Create the API
bar = Api[FilterParams, QueryResult](name="bar", query_function=build_query)

## Call the API
## HTTP Request: GET http://localhost:4000/api/bar?min_age=20&status=active&search_text=John
## EXECUTED QUERY: SELECT * FROM users WHERE age >= 20 AND status = 'active' AND (name ILIKE '%John%' OR email ILIKE '%John%') ORDER BY created_at DESC
```

### Adding Authentication

Moose supports authentication via JSON web tokens (JWTs). When your client makes a request to your Analytics API, Moose will automatically parse the JWT and pass the **authenticated** payload to your handler function as the `jwt` object:

```python filename="Authentication.py" copy
def run(client: MooseClient, params: QueryParams, jwt: dict):
    # Use parameter binding with JWT data
    query = """SELECT * FROM userReports WHERE user_id = {user_id} LIMIT 5"""
    return client.query.execute(query, {"user_id": jwt["userId"]})
```

Moose validates the JWT signature and ensures the JWT is properly formatted. If the JWT authentication fails, Moose will return a `401 Unauthorized` error.

## Understanding Response Codes

Moose automatically provides standard HTTP responses:

| Status Code | Meaning               | Response Body                         |
|-------------|-----------------------|---------------------------------------|
| 200         | Success               | Your API's result data                |
| 400         | Validation error      | `{ "error": "Detailed message"}`      |
| 401         | Unauthorized          | `{ "error": "Unauthorized"}`          |
| 500         | Internal server error | `{ "error": "Internal server error"}` |

## Post-Processing Query Results

After executing your database query, you can transform the data before returning it to the client.
For example, you can rename fields, add derived fields, and format dates:

```python filename="PostProcessingExample.py" copy
from datetime import datetime
from moose_lib import Api, MooseClient
from pydantic import BaseModel

class QueryParams(BaseModel):
    category: str
    max_results: int = 10

class ResponseItem(BaseModel):
    itemId: int
    displayName: str
    formattedValue: str
    isHighValue: bool
    date: str

def run(client: MooseClient, params: QueryParams):
    # 1. Fetch raw data using parameter binding
    query = """
    SELECT id, name, value, timestamp
    FROM data_table
    WHERE category = {category}
    LIMIT {limit}
    """

    raw_results = client.query.execute(query, {"category": params.category, "limit": params.max_results})

    # 2. Post-process the results
    processed_results = []
    for row in raw_results:
        processed_results.append(ResponseItem(
            # Transform field names
            itemId=row['id'],
            displayName=row['name'].upper(),

            # Add derived fields
            formattedValue=f"${row['value']:.2f}",
            isHighValue=row['value'] > 1000,

            # Format dates
            date=datetime.fromisoformat(row['timestamp']).date().isoformat()
        ))

    return processed_results

# Create the API
process_data_api = Api[QueryParams, ResponseItem](name="process_data_endpoint", query_function=run)
```

### Best Practices

While post-processing gives you flexibility, remember that database operations are typically more efficient for heavy data manipulation. Reserve post-processing for transformations that are difficult to express in SQL or that involve application-specific logic.

## Client Integration

By default, all API endpoints are automatically integrated with OpenAPI/Swagger documentation. You can integrate your OpenAPI SDK generator of choice to generate client libraries for your APIs.

Please refer to the [OpenAPI](/moose/apis/open-api-sdk) page for more information on how to integrate your APIs with OpenAPI.
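As a rough illustration of the client side, the sketch below builds a request URL for an Analytics API and interprets the response according to the status codes listed under Understanding Response Codes. It uses only the Python standard library. `build_api_url`, `interpret_response`, and `ApiError` are hypothetical helper names (not part of `moose_lib`), and the base URL `http://localhost:4000` is taken from the examples above.

```python
import json
from urllib.parse import urlencode


class ApiError(Exception):
    """Raised for non-200 responses (hypothetical helper, not part of moose_lib)."""

    def __init__(self, status: int, message: str):
        super().__init__(f"{status}: {message}")
        self.status = status
        self.message = message


def build_api_url(base_url: str, endpoint: str, params: dict) -> str:
    """Build the GET URL; query parameters travel in the URL, not in a body."""
    query = urlencode({k: v for k, v in params.items() if v is not None})
    url = f"{base_url.rstrip('/')}/api/{endpoint}"
    return f"{url}?{query}" if query else url


def interpret_response(status: int, body: str):
    """Return parsed data on 200; otherwise raise ApiError with the `error` field."""
    payload = json.loads(body) if body else None
    if status == 200:
        return payload
    if isinstance(payload, dict):
        message = payload.get("error", "Unknown error")
    else:
        message = "Unknown error"
    raise ApiError(status, message)


url = build_api_url(
    "http://localhost:4000",
    "example_endpoint",
    {"filter_field": "books", "max_results": 10},
)
print(url)
```

Keeping URL construction and response interpretation separate from the network call makes both easy to test without a running Moose server.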
---

## API Authentication & Security
Source: moose/apis/auth.mdx
Secure your Moose API endpoints with JWT tokens or API keys

# API Authentication & Security

Moose supports two authentication mechanisms for securing your API endpoints:

- **[API Keys](#api-key-authentication)** - Simple, static authentication for internal applications and getting started
- **[JWT (JSON Web Tokens)](#jwt-authentication)** - Token-based authentication for integration with existing identity providers

Choose the method that fits your use case, or use both together with custom configuration.

## Do you want to use API Keys?

API keys are the simplest way to secure your Moose endpoints. They're ideal for:
- Internal applications and microservices
- Getting started quickly with authentication
- Scenarios where you control both client and server

### How API Keys Work

API keys use PBKDF2 HMAC SHA256 hashing for secure storage. You generate a token pair (plain-text and hashed) using the Moose CLI, store the hashed version in environment variables, and send the plain-text version in your request headers.

### Step 1: Generate API Keys

Generate tokens and hashed keys using the Moose CLI:

```bash
moose generate hash-token
```

**Output:**
- **ENV API Keys**: Hashed key for environment variables (use this in your server configuration)
- **Bearer Token**: Plain-text token for client applications (use this in `Authorization` headers)

Use the **hashed key** for environment variables and `moose.config.toml`. Use the **plain-text token** in your `Authorization: Bearer token` headers.
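To make the relationship between the plain-text token and the hashed key concrete, here is a minimal sketch of PBKDF2-HMAC-SHA256 hashing using Python's standard `hashlib`. The salt handling, the iteration count, and the `salt$hash` hex encoding are assumptions chosen for illustration and are not Moose's actual key format; `hash_token` and `verify_token` are hypothetical helpers. Always generate real keys with `moose generate hash-token`.

```python
import hashlib
import hmac
import os
from typing import Optional

ITERATIONS = 100_000  # assumed value, for illustration only


def hash_token(token: str, salt: Optional[bytes] = None) -> str:
    """Derive a PBKDF2-HMAC-SHA256 hash and return it as 'salt_hex$hash_hex'."""
    salt = salt if salt is not None else os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", token.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_token(token: str, stored: str) -> bool:
    """Re-derive the hash from the presented token and compare in constant time."""
    salt_hex, digest_hex = stored.split("$", 1)
    candidate = hashlib.pbkdf2_hmac(
        "sha256", token.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate.hex(), digest_hex)


stored = hash_token("my-plain-text-token")
print(verify_token("my-plain-text-token", stored))  # True
print(verify_token("wrong-token", stored))          # False
```

The server only ever needs the stored hash, which is why the plain-text token can stay on the client side.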
### Step 2: Configure API Keys with Environment Variables

Set environment variables with the **hashed** API keys you generated:

```bash
# For ingest endpoints
export MOOSE_INGEST_API_KEY='your_pbkdf2_hmac_sha256_hashed_key'

# For analytics endpoints
export MOOSE_CONSUMPTION_API_KEY='your_pbkdf2_hmac_sha256_hashed_key'

# For admin endpoints
export MOOSE_ADMIN_TOKEN='your_plain_text_token'
```

Or set the admin API key in `moose.config.toml`:

```toml filename="moose.config.toml"
[authentication]
admin_api_key = "your_pbkdf2_hmac_sha256_hashed_key"
```

Storing the `admin_api_key` (which is a PBKDF2 HMAC SHA256 hash) in your `moose.config.toml` file is an acceptable practice, even if the file is version-controlled. This is because the actual plain-text Bearer token (the secret) is not stored. The hash is computationally expensive to reverse, ensuring that your secret is not exposed in the codebase.

### Step 3: Make Authenticated Requests

All authenticated requests require the `Authorization` header with the **plain-text token**:

```bash
# Using curl
curl -H "Authorization: Bearer your_plain_text_token_here" \
  https://your-moose-instance.com/ingest/YourDataModel

# Using JavaScript
fetch('https://your-moose-instance.com/api/endpoint', {
  headers: {
    'Authorization': 'Bearer your_plain_text_token_here'
  }
})
```

## Do you want to use JWTs?

JWT authentication integrates with existing identity providers and follows standard token-based authentication patterns. Use JWTs when:
- You have an existing identity provider (Auth0, Okta, etc.)
- You need user-specific authentication and authorization
- You want standard OAuth 2.0 / OpenID Connect flows

### How JWT Works

Moose validates JWT tokens using the RS256 algorithm with your identity provider's public key. You configure the expected issuer and audience, and Moose handles token verification automatically.
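When a token is rejected, it can help to look inside it. The sketch below decodes a JWT's header and payload with the standard library so you can compare `alg`, `iss`, and `aud` against your Moose configuration. It deliberately does not verify the signature (Moose does that with your public key), and `decode_jwt_unverified` is a hypothetical helper name. The demo token is an unsigned placeholder built only for this example.

```python
import base64
import json


def _b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding JWTs strip off."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def _b64url_encode(data: dict) -> str:
    """Encode a dict as compact JSON in unpadded base64url (demo use only)."""
    raw = json.dumps(data, separators=(",", ":")).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def decode_jwt_unverified(token: str) -> tuple[dict, dict]:
    """Return (header, payload) WITHOUT checking the signature."""
    header_b64, payload_b64, _signature = token.split(".")
    header = json.loads(_b64url_decode(header_b64))
    payload = json.loads(_b64url_decode(payload_b64))
    return header, payload


# Build a dummy, unsigned token purely for demonstration.
demo_token = ".".join([
    _b64url_encode({"alg": "RS256", "typ": "JWT"}),
    _b64url_encode({"iss": "https://my-auth-server.com/", "aud": "my-moose-app"}),
    "signature-placeholder",
])

header, payload = decode_jwt_unverified(demo_token)
print(header["alg"], payload["iss"], payload["aud"])
```

Never use unverified claims for authorization decisions; this is a debugging aid only.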
### Step 1: Configure JWT Settings

#### Option A: Configure in `moose.config.toml`

```toml filename="moose.config.toml"
[jwt]
# Your JWT public key (PEM-formatted RSA public key)
secret = """
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy...
-----END PUBLIC KEY-----
"""
# Expected JWT issuer
issuer = "https://my-auth-server.com/"
# Expected JWT audience
audience = "my-moose-app"
```

The `secret` field should contain your JWT **public key**, which is used to verify signatures with the RS256 algorithm.

#### Option B: Configure with Environment Variables

You can also set these values as environment variables:

```bash filename=".env" copy
MOOSE_JWT_PUBLIC_KEY=your_jwt_public_key # PEM-formatted RSA public key (overrides `secret` in `moose.config.toml`)
MOOSE_JWT_ISSUER=your_jwt_issuer # Expected JWT issuer (overrides `issuer` in `moose.config.toml`)
MOOSE_JWT_AUDIENCE=your_jwt_audience # Expected JWT audience (overrides `audience` in `moose.config.toml`)
```

### Step 2: Make Authenticated Requests

Send requests with the JWT token in the `Authorization` header:

```bash
# Using curl
curl -H "Authorization: Bearer your_jwt_token_here" \
  https://your-moose-instance.com/ingest/YourDataModel

# Using JavaScript
fetch('https://your-moose-instance.com/api/endpoint', {
  headers: {
    'Authorization': 'Bearer your_jwt_token_here'
  }
})
```

## Want to use both? Here are the caveats

You can configure both JWT and API Key authentication simultaneously. When both are configured, Moose's authentication behavior depends on the `enforce_on_all_*` flags.

### Understanding Authentication Priority

#### Default Behavior (No Enforcement)

By default, when both JWT and API Keys are configured, Moose tries JWT validation first, then falls back to API Key validation:

```toml filename="moose.config.toml"
[jwt]
# JWT configuration
secret = "..."
issuer = "https://my-auth-server.com/"
audience = "my-moose-app"
# enforce flags default to false
```

```bash filename=".env"
# API Key configuration
MOOSE_INGEST_API_KEY='your_pbkdf2_hmac_sha256_hashed_key'
MOOSE_CONSUMPTION_API_KEY='your_pbkdf2_hmac_sha256_hashed_key'
```

**For Ingest Endpoints (`/ingest/*`)**:
- Attempts JWT validation first (RS256 signature check)
- Falls back to API Key validation (PBKDF2 HMAC SHA256) if JWT fails

**For Analytics Endpoints (`/api/*`)**:
- Same fallback behavior as ingest endpoints

This allows you to use either authentication method for your clients.

#### Enforcing JWT Only

If you want to **only** accept JWT tokens (no API key fallback), set the enforcement flags:

```toml filename="moose.config.toml"
[jwt]
secret = "..."
issuer = "https://my-auth-server.com/"
audience = "my-moose-app"
# Only accept JWT, no API key fallback
enforce_on_all_ingest_apis = true
enforce_on_all_consumptions_apis = true
```

**Result**: When enforcement is enabled, API Key authentication is disabled even if the environment variables are set. Only valid JWT tokens will be accepted.

### Common Use Cases

#### Use Case 1: Different Auth for Different Endpoints

Configure JWT for user-facing analytics endpoints, API keys for internal ingestion:

```toml filename="moose.config.toml"
[jwt]
secret = "..."
issuer = "https://my-auth-server.com/"
audience = "my-moose-app"
enforce_on_all_consumptions_apis = true # JWT only for /api/*
enforce_on_all_ingest_apis = false # Allow fallback for /ingest/*
```

```bash filename=".env"
MOOSE_INGEST_API_KEY='hashed_key_for_internal_services'
```

#### Use Case 2: Migration from API Keys to JWT

Start with both configured, no enforcement. Gradually migrate clients to JWT. Once complete, enable enforcement:

```toml filename="moose.config.toml"
[jwt]
secret = "..."
issuer = "https://my-auth-server.com/"
audience = "my-moose-app"
# Start with both allowed during migration
enforce_on_all_ingest_apis = false
enforce_on_all_consumptions_apis = false

# Later, enable to complete migration
# enforce_on_all_ingest_apis = true
# enforce_on_all_consumptions_apis = true
```

### Admin Endpoints

Admin endpoints use API key authentication exclusively (configured separately from ingest/analytics endpoints).

**Configuration precedence** (highest to lowest):
1. `--token` CLI parameter (plain-text token)
2. `MOOSE_ADMIN_TOKEN` environment variable (plain-text token)
3. `admin_api_key` in `moose.config.toml` (hashed token)

**Example:**

```bash
# Option 1: CLI parameter
moose remote plan --token your_plain_text_token

# Option 2: Environment variable
export MOOSE_ADMIN_TOKEN='your_plain_text_token'
moose remote plan

# Option 3: Config file
# In moose.config.toml:
# [authentication]
# admin_api_key = "your_pbkdf2_hmac_sha256_hashed_key"
```

## Security Best Practices

- **Never commit plain-text tokens to version control** - Always use hashed keys in configuration files
- **Use environment variables for production** - Keep secrets out of your codebase
- **Generate unique tokens for different environments** - Separate development, staging, and production credentials
- **Rotate tokens regularly** - Especially for long-running production deployments
- **Choose the right method for your use case**:
  - Use **API Keys** for internal services and getting started
  - Use **JWT** when integrating with identity providers or when you need user-level auth
- **Store hashed keys safely** - The PBKDF2 HMAC SHA256 hash in `moose.config.toml` is safe to version control, but the plain-text token should only exist in secure environment variables or secret management systems

Never commit plain-text tokens to version control. Use hashed keys in configuration files and environment variables for production.
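The fallback rules described under Understanding Authentication Priority can be summarized as a small decision function. This is a model of the documented behavior, not Moose's implementation: `is_authorized` is a hypothetical name, and its boolean inputs stand in for the results of JWT validation, API key validation, and the relevant `enforce_on_all_*` flag.

```python
def is_authorized(jwt_valid: bool, api_key_valid: bool, enforce_jwt: bool) -> bool:
    """Model the documented priority: JWT first, API key as fallback unless enforced."""
    if jwt_valid:
        return True
    if enforce_jwt:
        # With enforcement enabled, the API key fallback is disabled.
        return False
    return api_key_valid


# Enumerate the outcomes for a client that presents only a valid API key.
for enforce in (False, True):
    outcome = is_authorized(jwt_valid=False, api_key_valid=True, enforce_jwt=enforce)
    print(f"enforce_jwt={enforce}: authorized={outcome}")
```

Walking through the table this way is a quick check before enabling enforcement during a migration: any client still relying on an API key will start failing once the flag is set.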
---

## Ingestion APIs
Source: moose/apis/ingest-api.mdx
Ingestion APIs for Moose

# Ingestion APIs

## Overview

Moose Ingestion APIs are the entry point for getting data into your Moose application. They provide a fast, reliable, and type-safe way to move data from your sources into streams and tables for analytics and processing.

## When to Use Ingestion APIs

Ingestion APIs are most useful when you want to implement a push-based pattern for getting data from your data sources into your streams and tables. Common use cases include:
- Instrumenting external client applications
- Receiving webhooks from third-party services
- Integrating with ETL or data pipeline tools that push data

## Why Use Moose's APIs Over Your Own?

Moose's ingestion APIs are purpose-built for high-throughput data pipelines, offering key advantages over other more general-purpose frameworks:
- **Built-in schema validation:** Ensures only valid data enters your pipeline.
- **Direct connection to streams/tables:** Instantly link HTTP endpoints to Moose data infrastructure to route incoming data to your streams and tables without any glue code.
- **Dead Letter Queue (DLQ) support:** Invalid records are automatically captured for review and recovery.
- **OpenAPI auto-generation:** Instantly generate client SDKs and docs for all endpoints, including example data.
- **Rust-powered performance:** Far higher throughput and lower latency than typical Node.js or Python APIs.

## Validation

Moose validates all incoming data against your Pydantic model. If a record fails validation, Moose can automatically route it to a Dead Letter Queue (DLQ) for later inspection and recovery.
```python filename="ValidationExample.py" copy
from datetime import datetime
from typing import Optional

from moose_lib import DeadLetterQueue, IngestApi, IngestConfig, Stream
from pydantic import BaseModel

class Properties(BaseModel):
    device: Optional[str] = None
    version: Optional[int] = None

class ExampleModel(BaseModel):
    id: str
    userId: str
    timestamp: datetime
    properties: Properties

api = IngestApi[ExampleModel]("your-api-route", IngestConfig(
    destination=Stream[ExampleModel]("your-stream-name"),
    dead_letter_queue=DeadLetterQueue[ExampleModel]("your-dlq-name")
))
```

If your IngestPipeline’s schema marks a field as optional but annotates a ClickHouse default, Moose treats the field as follows:
- API request and Stream message: field is optional (you may omit it)
- ClickHouse table storage: field is required with a DEFAULT clause

Behavior: When the API/stream inserts into ClickHouse and the field is missing, ClickHouse sets it to the configured default value. This keeps request payloads simple while avoiding Nullable columns in storage.

Example: `Annotated[int, clickhouse_default("18")]` (or equivalent annotation)

Send a valid event - routed to the destination stream

```python filename="ValidEvent.py" copy
import requests

requests.post("http://localhost:4000/ingest/your-api-route", json={
    "id": "event1",
    "userId": "user1",
    "timestamp": "2023-05-10T15:30:00Z",
    "properties": {"device": "mobile"}
})
# ✅ Accepted and routed to the destination stream
# API returns 200 and { success: true }
```

Send an invalid event (missing required field) - routed to the DLQ

```python filename="InvalidEventMissingField.py" copy
import requests

requests.post("http://localhost:4000/ingest/your-api-route", json={
    "id": "event1",
})
# ❌ Routed to DLQ, because it's missing a required field
# API returns 400 response
```

Send an invalid event (bad date format) - routed to the DLQ

```python filename="InvalidEventBadDate.py" copy
import requests

requests.post("http://localhost:4000/ingest/your-api-route", json={
    "id": "event1",
    "userId": "user1",
    "timestamp": "not-a-date"
})
# ❌ Routed to DLQ, because the timestamp is not a valid date
# API returns 400 response
```

## Creating Ingestion APIs

You can create ingestion APIs in two ways:
- **High-level:** Using the `IngestPipeline` class (recommended for most use cases)
- **Low-level:** Manually configuring the `IngestApi` component for more granular control

### High-level: IngestPipeline (Recommended)

The `IngestPipeline` class provides a convenient way to set up ingestion endpoints, streams, and tables with a single declaration:

```python filename="IngestPipeline.py" copy
from datetime import datetime

from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel

class ExampleSchema(BaseModel):
    id: Key[str]
    name: str
    value: int
    timestamp: datetime

example_pipeline = IngestPipeline[ExampleSchema](
    name="example-name",
    config=IngestPipelineConfig(
        ingest_api=True,
        stream=True,
        table=True
    )
)
```

### Low-level: Standalone IngestApi

For more granular control, you can manually configure the `IngestApi` component, as in `ValidationExample.py` above.

The types of the destination `Stream` and `Table` must match the type of the `IngestApi`.

## Configuration Reference

Configuration options for both high-level and low-level ingestion APIs are provided below.

```python filename="IngestPipelineConfig.py" copy
class IngestPipelineConfig(BaseModel):
    table: bool | OlapConfig = True
    stream: bool | StreamConfig = True
    ingest_api: bool | IngestConfig = True
    dead_letter_queue: bool | StreamConfig = True
    version: Optional[str] = None
    metadata: Optional[dict] = None
    life_cycle: Optional[LifeCycle] = None
```

```python filename="IngestConfig.py" copy
@dataclass
class IngestConfigWithDestination[T: BaseModel]:
    destination: Stream[T]
    dead_letter_queue: Optional[DeadLetterQueue[T]] = None
    version: Optional[str] = None
    metadata: Optional[dict] = None
```

---

## OpenAPI SDK Generation
Source: moose/apis/openapi-sdk.mdx
Generate type-safe client SDKs from your Moose APIs

# OpenAPI SDK Generation

Moose automatically generates OpenAPI specifications for all your APIs, enabling you to create type-safe client SDKs in any language.
This allows you to integrate your Moose APIs into any application with full type safety and IntelliSense support.

## Overview

While `moose dev` is running, Moose emits an OpenAPI spec at `.moose/openapi.yaml` covering:

- **Ingestion endpoints** with request/response schemas
- **Analytics APIs** with query parameters and response types

Every time you make a change to your Moose APIs, the OpenAPI spec is updated automatically.

## Generating Typed SDKs from OpenAPI

You can use your preferred generator to create a client from that spec. Below is a minimal example you can copy into your project scripts.

### Setup

The following example uses `openapi-python-client` to generate the SDK. Follow the setup instructions here: [openapi-python-client on PyPI](https://pypi.org/project/openapi-python-client/).

Add a generation script in your repository:

```bash filename="scripts/generate_python_sdk.sh" copy
#!/usr/bin/env bash
set -euo pipefail

openapi-python-client generate --path .moose/openapi.yaml --output-path ./generated/python --overwrite
```

Then configure Moose to run it after each dev reload:

```toml filename="moose.config.toml" copy
[http_server_config]
on_reload_complete_script = "bash scripts/generate_python_sdk.sh"
```

This will regenerate the Python client from the live spec on every reload.

### Hooks for automatic SDK generation

The `on_reload_complete_script` hook is available in your `moose.config.toml` file. It runs after each dev server reload when code/infra changes have been fully applied. This allows you to keep your SDKs continuously up to date as you make changes to your Moose APIs.

Notes:
- The script runs in your project root using your `$SHELL` (falls back to `/bin/sh`).
- Paths like `.moose/openapi.yaml` and `./generated/...` are relative to the project root.
- You can combine multiple generators with `&&` or split them into a shell script if preferred.

These hooks only affect local development (`moose dev`).
The reload hook runs after Moose finishes applying your changes, ensuring `.moose/openapi.yaml` is fresh before regeneration.

## Integration

Import from the output path your generator writes to (see your chosen example repo). The Moose side is unchanged: the spec lives at `.moose/openapi.yaml` during `moose dev`.

## Generators

Use any OpenAPI-compatible generator:

### TypeScript projects

- [OpenAPI Generator (typescript-fetch)](https://github.com/OpenAPITools/openapi-generator): mature, broad options; generates Fetch-based client
- [Kubb](https://github.com/kubb-project/kubb): generates types + fetch client with simple config
- [Orval](https://orval.dev/): flexible output (client + schemas), good DX
- [openapi-typescript](https://github.com/openapi-ts/openapi-typescript): generates types only (pair with your own client)
- [swagger-typescript-api](https://github.com/acacode/swagger-typescript-api): codegen for TS clients from OpenAPI
- [openapi-typescript-codegen](https://github.com/ferdikoomen/openapi-typescript-codegen): TS client + models
- [oazapfts](https://github.com/oazapfts/oazapfts): minimal TS client based on fetch
- [openapi-zod-client](https://github.com/astahmer/openapi-zod-client): Zod schema-first client generation

### Python projects

- [openapi-python-client](https://pypi.org/project/openapi-python-client/): modern typed client for OpenAPI 3.0/3.1
- [OpenAPI Generator (python)](https://github.com/OpenAPITools/openapi-generator): multiple Python generators (python, python-nextgen)

---

## Trigger APIs

Source: moose/apis/trigger-api.mdx

Create APIs that trigger workflows and other processes

# Trigger APIs

## Overview

You can create APIs to initiate workflows, data processing jobs, or other automated processes.
## Basic Usage

```python filename="app/apis/trigger_workflow.py" copy
from moose_lib import MooseClient, Api
from pydantic import BaseModel, Field
from datetime import datetime

class WorkflowParams(BaseModel):
    input_value: str
    priority: str = Field(default="normal")

class WorkflowResponse(BaseModel):
    workflow_id: str
    status: str

# Handler arguments follow the (client, params) order used by the other API examples
def run(client: MooseClient, params: WorkflowParams) -> WorkflowResponse:
    # Trigger the workflow with input parameters
    workflow_execution = client.workflow.execute(
        workflow="data-processing",
        params={
            "input_value": params.input_value,
            "priority": params.priority,
            "triggered_at": datetime.now().isoformat()
        }
    )

    return WorkflowResponse(
        workflow_id=workflow_execution.id,
        status="started"
    )

api = Api[WorkflowParams, WorkflowResponse]("trigger-workflow", run)
```

## Using the Trigger API

Once deployed, you can trigger workflows via HTTP requests. The query parameter names match the fields of `WorkflowParams`:

```bash filename="Terminal" copy
curl "http://localhost:4000/api/trigger-workflow?input_value=process-user-data&priority=high"
```

Response (keys match the fields of `WorkflowResponse`):

```json
{
  "workflow_id": "workflow-12345",
  "status": "started"
}
```
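The same request can also be issued from Python. The sketch below is a minimal, standard-library-only client and not part of Moose itself: `build_trigger_url` and `trigger_workflow` are hypothetical helper names, the base URL is taken from the curl example, and it assumes the query parameter names match the `WorkflowParams` field names.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

# Assumption: local dev server address, as in the curl example.
BASE_URL = "http://localhost:4000"


def build_trigger_url(base_url: str, input_value: str, priority: str = "normal") -> str:
    """Build the GET URL for the trigger-workflow endpoint."""
    # Assumption: parameter names mirror the WorkflowParams fields.
    # urlencode escapes spaces and reserved characters in the values.
    query = urlencode({"input_value": input_value, "priority": priority})
    return f"{base_url}/api/trigger-workflow?{query}"


def trigger_workflow(input_value: str, priority: str = "normal", timeout: float = 10.0) -> dict:
    """Send the GET request and return the decoded JSON response body."""
    url = build_trigger_url(BASE_URL, input_value, priority)
    with urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the dev server running and the API registered, calling `trigger_workflow("process-user-data", priority="high")` sends the same request as the curl command. Building the query string with `urlencode` rather than string concatenation keeps values containing spaces or `&` from corrupting the URL.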