# Moose / Apis Documentation – Python
## Included Files
1. moose/apis/admin-api.mdx
2. moose/apis/analytics-api.mdx
3. moose/apis/auth.mdx
4. moose/apis/ingest-api.mdx
5. moose/apis/openapi-sdk.mdx
6. moose/apis/trigger-api.mdx
## admin-api
Source: moose/apis/admin-api.mdx
# Coming Soon
---
## APIs
Source: moose/apis/analytics-api.mdx
APIs for Moose
# APIs
## Overview
APIs are functions that run on your server and are automatically exposed as HTTP `GET` endpoints.
They are designed to read data from your OLAP database. Out of the box, these APIs provide:
- Automatic type validation and conversion for your query parameters (sent in the URL) and response body
- Managed database client connection
- Automatic OpenAPI documentation generation
Common use cases include:
- Powering user-facing analytics, dashboards and other front-end components
- Enabling AI tools to interact with your data
- Building custom APIs for your internal tools
### Enabling APIs
Analytics APIs are enabled by default. To explicitly control this feature in your `moose.config.toml`:
```toml filename="moose.config.toml" copy
[features]
apis = true
```
### Basic Usage
`execute` is the recommended way to execute queries. It provides a thin wrapper around the ClickHouse Python client so that you can safely pass `OlapTable` and `Column` objects to your query without needing to worry about ClickHouse identifiers:
```python filename="ExampleApi.py" copy
from moose_lib import Api, MooseClient
from pydantic import BaseModel

# Import the source pipeline
from app.path.to.SourcePipeline import SourcePipeline

# Define the query parameters
class QueryParams(BaseModel):
    filter_field: str
    max_results: int

# Define the response body
class ResponseBody(BaseModel):
    id: int
    name: str
    value: float

SourceTable = SourcePipeline.get_table()

# Define the route handler function (parameterized)
def run(client: MooseClient, params: QueryParams) -> list[ResponseBody]:
    query = """
        SELECT
            id,
            name,
            value
        FROM {table}
        WHERE category = {category}
        LIMIT {limit}
    """
    return client.query.execute(query, {"table": SourceTable, "category": params.filter_field, "limit": params.max_results})

# Create the API
example_api = Api[QueryParams, ResponseBody](name="example_endpoint", query_function=run)
```
Use `execute_raw` with parameter binding for safe, typed queries:
```python filename="ExampleApi.py" copy
from moose_lib import Api, MooseClient
from pydantic import BaseModel

# Import the source pipeline
from app.path.to.SourcePipeline import SourcePipeline

# Define the query parameters
class QueryParams(BaseModel):
    filterField: str
    maxResults: int

# Define the response body
class ResponseBody(BaseModel):
    id: int
    name: str
    value: float

SourceTable = SourcePipeline.get_table()

# Define the route handler function (using execute_raw with typed parameters)
def run(client: MooseClient, params: QueryParams) -> list[ResponseBody]:
    query = """
        SELECT
            id,
            name,
            value
        FROM Source
        WHERE category = {category:String}
        LIMIT {limit:UInt32}
    """
    return client.query.execute_raw(query, {"category": params.filterField, "limit": params.maxResults})

# Create the API
example_api = Api[QueryParams, ResponseBody](name="example_endpoint", query_function=run)
```
```python filename="SourcePipeline.py" copy
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel

class SourceSchema(BaseModel):
    id: Key[int]
    name: str
    value: float

SourcePipeline = IngestPipeline[SourceSchema]("Source", IngestPipelineConfig(
    ingest_api=False,
    stream=True,
    table=True,
))
```
The `Api` class takes:
- Route name: The URL path to access your API (e.g., `"example_endpoint"`)
- Handler function: Processes requests with typed parameters and returns the result
The generic type parameters specify:
- `QueryParams`: The structure of accepted URL parameters
- `ResponseBody`: The exact shape of your API's response data
You can name these types anything you want. The first type generates validation for query parameters, while the second defines the response structure for OpenAPI documentation.
## Type Validation
Model your query parameters and response body as Pydantic models. Moose uses these models to validate and convert the query parameters (sent in the URL) and to type the response body.
### Modeling Query Parameters
Define your API's parameters as a Pydantic model:
```python filename="ExampleQueryParams.py" copy
from pydantic import BaseModel, Field
from typing import Optional

class QueryParams(BaseModel):
    filterField: str = Field(..., description="The field to filter by")
    maxResults: int = Field(..., description="The maximum number of results to return")
    optionalParam: Optional[str] = Field(None, description="An optional parameter")
```
Moose automatically handles:
- Runtime validation
- Clear error messages for invalid parameters
- OpenAPI documentation generation
Complex nested objects and arrays are not supported. Analytics APIs are `GET` endpoints designed to be simple and lightweight.
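To see the kind of validation and coercion Moose applies, you can exercise the Pydantic model directly. This is a standalone sketch, independent of any Moose runtime:

```python
from pydantic import BaseModel, ValidationError

class QueryParams(BaseModel):
    filterField: str
    maxResults: int

# Query-string values arrive as text; Pydantic coerces "25" to the int 25
params = QueryParams(filterField="name", maxResults="25")
print(params.maxResults)  # 25

# Invalid input raises a structured error, which Moose surfaces as a 400 response
try:
    QueryParams(filterField="name", maxResults="not-a-number")
except ValidationError as err:
    print(err.errors()[0]["loc"])  # ('maxResults',)
```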
### Adding Advanced Type Validation
Moose uses Pydantic for runtime validation. Use Pydantic's `Field` class for more complex validation:
```python filename="ExampleQueryParams.py" copy
from pydantic import BaseModel, Field

class QueryParams(BaseModel):
    filterField: str = Field(pattern=r"^(id|name|email)$", description="The field to filter by")  # Only allow valid column names from the UserTable
    maxResults: int = Field(gt=0, description="The maximum number of results to return")  # Positive integer
```
### Common Validation Options
```python filename="ValidationExamples.py" copy
from datetime import date
from typing import Literal, Optional
from uuid import UUID

from pydantic import BaseModel, Field, IPvAnyAddress

class QueryParams(BaseModel):
    # Numeric validations
    id: int = Field(..., gt=0)
    age: int = Field(..., gt=0, lt=120)
    price: float = Field(..., gt=0, lt=1000)
    discount: float = Field(..., gt=0, multiple_of=0.5)
    # String validations
    username: str = Field(..., min_length=3, max_length=20)
    email: str = Field(..., pattern=r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
    zipCode: str = Field(..., pattern=r"^[0-9]{5}$")
    # Typed fields validate their formats automatically
    uuid: UUID
    ipAddress: IPvAnyAddress
    # Date validations
    startDate: date
    # Enum validation
    status: Literal["active", "pending", "inactive"]
    # Optional parameters
    limit: Optional[int] = Field(None, gt=0, lt=100)
```
For a full list of validation options, see the [Pydantic documentation](https://docs.pydantic.dev/latest/concepts/types/#customizing-validation-with-fields).
### Setting Default Values
You can set default values for parameters by setting values for each parameter in your Pydantic model:
```python filename="ExampleQueryParams.py" copy {9}
from pydantic import BaseModel

class QueryParams(BaseModel):
    filterField: str = "example"
    maxResults: int = 10
    optionalParam: str | None = "default"
```
## Implementing Route Handler
API route handlers are regular functions, so you can implement arbitrary logic inside them. Most of the time you will use APIs to expose your data to your front-end applications or other tools:
### Connecting to the Database
Moose provides a managed `MooseClient` to your function execution context. This client provides access to the database and other Moose resources, and handles connection pooling/lifecycle management for you:
```python filename="ExampleApi.py" copy
from moose_lib import Api, MooseClient
from app.UserTable import UserTable

def run(client: MooseClient, params: QueryParams):
    # A simple static query; you can pass the table object as a parameter
    query = """
        SELECT COUNT(*) FROM {table}
    """
    return client.query.execute(query, {"table": UserTable})

# Create the API
example_api = Api[QueryParams, ResponseBody](name="example_endpoint", query_function=run)
```
Use `execute_raw` with parameter binding:
```python filename="ExampleApi.py" copy
from moose_lib import Api, MooseClient
from app.UserTable import UserTable

def run(client: MooseClient, params: QueryParams):
    # Using execute_raw for safe queries
    query = """
        SELECT COUNT(*) FROM {table:Identifier}
    """
    # Must be the name of the table, not the table object
    return client.query.execute_raw(query, {"table": UserTable.name})

# Create the API
example_api = Api[QueryParams, ResponseBody](name="example_endpoint", query_function=run)
```
### Constructing Safe SQL Queries
```python filename="SafeQueries.py" copy
from moose_lib import MooseClient
from pydantic import BaseModel, Field

class QueryParams(BaseModel):
    min_age: int = Field(ge=0, le=150)
    status: str = Field(pattern=r"^(active|inactive)$")
    limit: int = Field(default=10, ge=1, le=1000)
    search_text: str = Field(pattern=r'^[a-zA-Z0-9\s]*$')

def run(client: MooseClient, params: QueryParams):
    query = """
        SELECT *
        FROM users
        WHERE age >= {min_age}
          AND status = {status}
          AND name ILIKE {search_pattern}
        LIMIT {limit}
    """
    return client.query.execute(query, {"min_age": params.min_age, "status": params.status, "search_pattern": f"%{params.search_text}%", "limit": params.limit})
```
```python filename="SafeQueries.py" copy
from moose_lib import MooseClient
from pydantic import BaseModel, Field

class QueryParams(BaseModel):
    min_age: int = Field(ge=0, le=150)
    status: str = Field(pattern=r"^(active|inactive)$")
    limit: int = Field(default=10, ge=1, le=1000)
    search_text: str = Field(pattern=r'^[a-zA-Z0-9\s]*$')

def run(client: MooseClient, params: QueryParams):
    query = """
        SELECT *
        FROM users
        WHERE age >= {minAge:UInt32}
          AND status = {status:String}
          AND name ILIKE {searchPattern:String}
        LIMIT {limit:UInt32}
    """
    return client.query.execute_raw(query, {
        "minAge": params.min_age,
        "status": params.status,
        "searchPattern": f"%{params.search_text}%",
        "limit": params.limit
    })
```
#### Table and Column References
```python filename="ValidatedQueries.py" copy
from moose_lib import Api, MooseClient
from pydantic import BaseModel, Field
from app.UserTable import UserTable

class QueryParams(BaseModel):
    # Anything interpolated into the query needs strict validation
    column: str = Field(pattern=r"^(id|name|email)$", description="Uses a regex pattern to only allow valid column names")
    search_term: str = Field(
        pattern=r'^[\w\s\'-]{1,50}$',  # Allows letters, numbers, spaces, hyphens, apostrophes; blocks characters that could be used for SQL injection
        min_length=1,
        max_length=50
    )
    limit: int = Field(
        default=10,
        ge=1,
        le=100,
        description="Number of results to return"
    )

def run(client: MooseClient, params: QueryParams):
    query = """
        SELECT {column}
        FROM {table}
        WHERE name ILIKE {search_pattern}
        LIMIT {limit}
    """
    return client.query.execute(query, {"column": UserTable.cols[params.column], "table": UserTable, "search_pattern": f"%{params.search_term}%", "limit": params.limit})
```
```python filename="UserTable.py" copy
from moose_lib import OlapTable, Key
from pydantic import BaseModel

class UserSchema(BaseModel):
    id: Key[int]
    name: str
    email: str

UserTable = OlapTable[UserSchema]("users")
```
### Advanced Query Patterns
#### Dynamic Column & Table Selection
```python filename="DynamicColumns.py" copy
from typing import Optional
from moose_lib import Api, MooseClient
from pydantic import BaseModel, Field
from app.UserTable import UserTable

class QueryParams(BaseModel):
    colName: str = Field(pattern=r"^(id|name|email)$", description="Uses a regex pattern to only allow valid column names from the UserTable")

class QueryResult(BaseModel):
    id: Optional[int]
    name: Optional[str]
    email: Optional[str]

def run(client: MooseClient, params: QueryParams):
    # Put the column and table objects in the parameters dict
    query = "SELECT {column} FROM {table}"
    return client.query.execute(query, {"column": UserTable.cols[params.colName], "table": UserTable})

# Create the API
bar = Api[QueryParams, QueryResult](name="bar", query_function=run)

# Call the API:
# HTTP request: GET http://localhost:4000/api/bar?colName=id
# Executed query: SELECT id FROM users
```
```python filename="UserTable.py" copy
from moose_lib import OlapTable, Key
from pydantic import BaseModel

class UserSchema(BaseModel):
    id: Key[int]
    name: str
    email: str

UserTable = OlapTable[UserSchema]("users")
```
#### Conditional `WHERE` Clauses
Build `WHERE` clauses based on provided parameters:
```python filename="ConditionalColumns.py" copy
from typing import Optional
from moose_lib import Api, MooseClient
from pydantic import BaseModel, Field

class FilterParams(BaseModel):
    min_age: Optional[int] = None
    status: Optional[str] = Field(None, pattern=r"^(active|inactive)$")
    search_text: Optional[str] = Field(None, pattern=r"^[a-zA-Z0-9\s]+$", description="Alphanumeric search text without special characters to prevent SQL injection")

class QueryResult(BaseModel):
    id: int
    name: str
    email: str

def build_query(client: MooseClient, params: FilterParams) -> list[QueryResult]:
    conditions = []
    parameters = {}
    if params.min_age is not None:
        conditions.append("age >= {min_age}")
        parameters["min_age"] = params.min_age
    if params.status:
        conditions.append("status = {status}")
        parameters["status"] = params.status
    if params.search_text:
        conditions.append("(name ILIKE {search_text} OR email ILIKE {search_text})")
        parameters["search_text"] = f"%{params.search_text}%"
    where_clause = f" WHERE {' AND '.join(conditions)}" if conditions else ""
    query = f"SELECT * FROM users{where_clause} ORDER BY created_at DESC"
    return client.query.execute(query, parameters)

# Create the API
bar = Api[FilterParams, QueryResult](name="bar", query_function=build_query)

# Call the API:
# HTTP request: GET http://localhost:4000/api/bar?min_age=20&status=active&search_text=John
# Executed query: SELECT * FROM users WHERE age >= 20 AND status = 'active' AND (name ILIKE '%John%' OR email ILIKE '%John%') ORDER BY created_at DESC
```
### Adding Authentication
Moose supports authentication via JSON web tokens (JWTs). When your client makes a request to your Analytics API, Moose will automatically parse the JWT and pass the **authenticated** payload to your handler function as the `jwt` object:
```python filename="Authentication.py" copy
def run(client: MooseClient, params: QueryParams, jwt: dict):
    # Use parameter binding with JWT data
    query = """SELECT * FROM userReports WHERE user_id = {user_id} LIMIT 5"""
    return client.query.execute(query, {"user_id": jwt["userId"]})
```
Moose validates the JWT signature and ensures the JWT is properly formatted. If JWT authentication fails, Moose returns a `401 Unauthorized` error.
## Understanding Response Codes
Moose automatically provides standard HTTP responses:
| Status Code | Meaning | Response Body |
|-------------|-------------------------|---------------------------------|
| 200 | Success | Your API's result data |
| 400 | Validation error | `{ "error": "Detailed message"}`|
| 401 | Unauthorized | `{ "error": "Unauthorized"}` |
| 500 | Internal server error | `{ "error": "Internal server error"}` |
## Post-Processing Query Results
After executing your database query, you can transform the data before returning it to the client. This lets you rename fields, compute derived values, and apply application-specific formatting:
```python filename="PostProcessingExample.py" copy
from datetime import datetime
from moose_lib import Api, MooseClient
from pydantic import BaseModel

class QueryParams(BaseModel):
    category: str
    max_results: int = 10

class ResponseItem(BaseModel):
    itemId: int
    displayName: str
    formattedValue: str
    isHighValue: bool
    date: str

def run(client: MooseClient, params: QueryParams):
    # 1. Fetch raw data using parameter binding
    query = """
        SELECT id, name, value, timestamp
        FROM data_table
        WHERE category = {category}
        LIMIT {limit}
    """
    raw_results = client.query.execute(query, {"category": params.category, "limit": params.max_results})

    # 2. Post-process the results
    processed_results = []
    for row in raw_results:
        processed_results.append(ResponseItem(
            # Transform field names
            itemId=row['id'],
            displayName=row['name'].upper(),
            # Add derived fields
            formattedValue=f"${row['value']:.2f}",
            isHighValue=row['value'] > 1000,
            # Format dates
            date=datetime.fromisoformat(row['timestamp']).date().isoformat()
        ))
    return processed_results

# Create the API
process_data_api = Api[QueryParams, ResponseItem](name="process_data_endpoint", query_function=run)
```
### Best Practices
While post-processing gives you flexibility, remember that database operations are typically more efficient for heavy data manipulation. Reserve post-processing for transformations that are difficult to express in SQL or that involve application-specific logic.
## Client Integration
By default, all API endpoints are automatically integrated with OpenAPI/Swagger documentation. You can integrate your OpenAPI SDK generator of choice to generate client libraries for your APIs.
Please refer to the [OpenAPI](/moose/apis/openapi-sdk) page for more information on how to integrate your APIs with OpenAPI.
---
## API Authentication & Security
Source: moose/apis/auth.mdx
Secure your Moose API endpoints with JWT tokens or API keys
# API Authentication & Security
Moose supports two authentication mechanisms for securing your API endpoints:
- **[API Keys](#api-key-authentication)** - Simple, static authentication for internal applications and getting started
- **[JWT (JSON Web Tokens)](#jwt-authentication)** - Token-based authentication for integration with existing identity providers
Choose the method that fits your use case, or use both together with custom configuration.
## Do you want to use API Keys?
API keys are the simplest way to secure your Moose endpoints. They're ideal for:
- Internal applications and microservices
- Getting started quickly with authentication
- Scenarios where you control both client and server
### How API Keys Work
API keys use PBKDF2 HMAC SHA256 hashing for secure storage. You generate a token pair (plain-text and hashed) using the Moose CLI, store the hashed version in environment variables, and send the plain-text version in your request headers.
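For intuition, the general shape of PBKDF2 verification can be sketched with Python's standard library. The salt handling, iteration count, and encoding below are illustrative assumptions, not Moose's exact key format:

```python
import hashlib
import hmac
import os

def hash_token(token: str, salt: bytes, iterations: int = 100_000) -> bytes:
    # Derive a PBKDF2 HMAC SHA256 hash from the plain-text token; only this hash is stored
    return hashlib.pbkdf2_hmac("sha256", token.encode(), salt, iterations)

def verify_token(presented: str, salt: bytes, stored_hash: bytes) -> bool:
    # Re-derive the hash from the presented token and compare in constant time
    return hmac.compare_digest(hash_token(presented, salt), stored_hash)

salt = os.urandom(16)
stored = hash_token("my-plain-text-token", salt)
print(verify_token("my-plain-text-token", salt, stored))  # True
print(verify_token("wrong-token", salt, stored))          # False
```

Because the stored value is a slow, salted hash, leaking it does not reveal the plain-text token.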
### Step 1: Generate API Keys
Generate tokens and hashed keys using the Moose CLI:
```bash
moose generate hash-token
```
**Output:**
- **ENV API Keys**: Hashed key for environment variables (use this in your server configuration)
- **Bearer Token**: Plain-text token for client applications (use this in `Authorization` headers)
Use the **hashed key** for environment variables and `moose.config.toml`. Use the **plain-text token** in your `Authorization: Bearer token` headers.
### Step 2: Configure API Keys with Environment Variables
Set environment variables with the **hashed** API keys you generated:
```bash
# For ingest endpoints
export MOOSE_INGEST_API_KEY='your_pbkdf2_hmac_sha256_hashed_key'
# For analytics endpoints
export MOOSE_CONSUMPTION_API_KEY='your_pbkdf2_hmac_sha256_hashed_key'
# For admin endpoints
export MOOSE_ADMIN_TOKEN='your_plain_text_token'
```
Or set the admin API key in `moose.config.toml`:
```toml filename="moose.config.toml"
[authentication]
admin_api_key = "your_pbkdf2_hmac_sha256_hashed_key"
```
Storing the `admin_api_key` (which is a PBKDF2 HMAC SHA256 hash) in your `moose.config.toml` file is an acceptable practice, even if the file is version-controlled. This is because the actual plain-text Bearer token (the secret) is not stored. The hash is computationally expensive to reverse, ensuring that your secret is not exposed in the codebase.
### Step 3: Make Authenticated Requests
All authenticated requests require the `Authorization` header with the **plain-text token**:
```bash
# Using curl
curl -H "Authorization: Bearer your_plain_text_token_here" \
https://your-moose-instance.com/ingest/YourDataModel
# Using JavaScript
fetch('https://your-moose-instance.com/api/endpoint', {
headers: {
'Authorization': 'Bearer your_plain_text_token_here'
}
})
```
## Do you want to use JWTs?
JWT authentication integrates with existing identity providers and follows standard token-based authentication patterns. Use JWTs when:
- You have an existing identity provider (Auth0, Okta, etc.)
- You need user-specific authentication and authorization
- You want standard OAuth 2.0 / OpenID Connect flows
### How JWT Works
Moose validates JWT tokens using the RS256 algorithm with your identity provider's public key. You configure the expected issuer and audience, and Moose handles token verification automatically.
### Step 1: Configure JWT Settings
#### Option A: Configure in `moose.config.toml`
```toml filename=moose.config.toml
[jwt]
# Your JWT public key (PEM-formatted RSA public key)
secret = """
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy...
-----END PUBLIC KEY-----
"""
# Expected JWT issuer
issuer = "https://my-auth-server.com/"
# Expected JWT audience
audience = "my-moose-app"
```
The `secret` field should contain your JWT **public key**, which is used to verify signatures with the RS256 algorithm.
#### Option B: Configure with Environment Variables
You can also set these values as environment variables:
```bash filename=".env" copy
MOOSE_JWT_PUBLIC_KEY=your_jwt_public_key # PEM-formatted RSA public key (overrides `secret` in `moose.config.toml`)
MOOSE_JWT_ISSUER=your_jwt_issuer # Expected JWT issuer (overrides `issuer` in `moose.config.toml`)
MOOSE_JWT_AUDIENCE=your_jwt_audience # Expected JWT audience (overrides `audience` in `moose.config.toml`)
```
### Step 2: Make Authenticated Requests
Send requests with the JWT token in the `Authorization` header:
```bash
# Using curl
curl -H "Authorization: Bearer your_jwt_token_here" \
https://your-moose-instance.com/ingest/YourDataModel
# Using JavaScript
fetch('https://your-moose-instance.com/api/endpoint', {
headers: {
'Authorization': 'Bearer your_jwt_token_here'
}
})
```
## Want to use both? Here are the caveats
You can configure both JWT and API Key authentication simultaneously. When both are configured, Moose's authentication behavior depends on the `enforce_on_all_*` flags.
### Understanding Authentication Priority
#### Default Behavior (No Enforcement)
By default, when both JWT and API Keys are configured, Moose tries JWT validation first, then falls back to API Key validation:
```toml filename="moose.config.toml"
[jwt]
# JWT configuration
secret = "..."
issuer = "https://my-auth-server.com/"
audience = "my-moose-app"
# enforce flags default to false
```
```bash filename=".env"
# API Key configuration
MOOSE_INGEST_API_KEY='your_pbkdf2_hmac_sha256_hashed_key'
MOOSE_CONSUMPTION_API_KEY='your_pbkdf2_hmac_sha256_hashed_key'
```
**For Ingest Endpoints (`/ingest/*`)**:
- Attempts JWT validation first (RS256 signature check)
- Falls back to API Key validation (PBKDF2 HMAC SHA256) if JWT fails
**For Analytics Endpoints (`/api/*`)**:
- Same fallback behavior as ingest endpoints
This allows you to use either authentication method for your clients.
#### Enforcing JWT Only
If you want to **only** accept JWT tokens (no API key fallback), set the enforcement flags:
```toml filename="moose.config.toml"
[jwt]
secret = "..."
issuer = "https://my-auth-server.com/"
audience = "my-moose-app"
# Only accept JWT, no API key fallback
enforce_on_all_ingest_apis = true
enforce_on_all_consumptions_apis = true
```
**Result**: When enforcement is enabled, API Key authentication is disabled even if the environment variables are set. Only valid JWT tokens will be accepted.
### Common Use Cases
#### Use Case 1: Different Auth for Different Endpoints
Configure JWT for user-facing analytics endpoints, API keys for internal ingestion:
```toml filename="moose.config.toml"
[jwt]
secret = "..."
issuer = "https://my-auth-server.com/"
audience = "my-moose-app"
enforce_on_all_consumptions_apis = true # JWT only for /api/*
enforce_on_all_ingest_apis = false # Allow fallback for /ingest/*
```
```bash filename=".env"
MOOSE_INGEST_API_KEY='hashed_key_for_internal_services'
```
#### Use Case 2: Migration from API Keys to JWT
Start with both configured, no enforcement. Gradually migrate clients to JWT. Once complete, enable enforcement:
```toml filename="moose.config.toml"
[jwt]
secret = "..."
issuer = "https://my-auth-server.com/"
audience = "my-moose-app"
# Start with both allowed during migration
enforce_on_all_ingest_apis = false
enforce_on_all_consumptions_apis = false
# Later, enable to complete migration
# enforce_on_all_ingest_apis = true
# enforce_on_all_consumptions_apis = true
```
### Admin Endpoints
Admin endpoints use API key authentication exclusively (configured separately from ingest/analytics endpoints).
**Configuration precedence** (highest to lowest):
1. `--token` CLI parameter (plain-text token)
2. `MOOSE_ADMIN_TOKEN` environment variable (plain-text token)
3. `admin_api_key` in `moose.config.toml` (hashed token)
**Example:**
```bash
# Option 1: CLI parameter
moose remote plan --token your_plain_text_token
# Option 2: Environment variable
export MOOSE_ADMIN_TOKEN='your_plain_text_token'
moose remote plan
# Option 3: Config file
# In moose.config.toml:
# [authentication]
# admin_api_key = "your_pbkdf2_hmac_sha256_hashed_key"
```
## Security Best Practices
- **Never commit plain-text tokens to version control** - Always use hashed keys in configuration files
- **Use environment variables for production** - Keep secrets out of your codebase
- **Generate unique tokens for different environments** - Separate development, staging, and production credentials
- **Rotate tokens regularly** - Especially for long-running production deployments
- **Choose the right method for your use case**:
- Use **API Keys** for internal services and getting started
- Use **JWT** when integrating with identity providers or need user-level auth
- **Store hashed keys safely** - The PBKDF2 HMAC SHA256 hash in `moose.config.toml` is safe to version control, but the plain-text token should only exist in secure environment variables or secret management systems
Never commit plain-text tokens to version control. Use hashed keys in configuration files and environment variables for production.
---
## Ingestion APIs
Source: moose/apis/ingest-api.mdx
Ingestion APIs for Moose
# Ingestion APIs
## Overview
Moose Ingestion APIs are the entry point for getting data into your Moose application. They provide a fast, reliable, and type-safe way to move data from your sources into streams and tables for analytics and processing.
## When to Use Ingestion APIs
Ingestion APIs are most useful when you want to implement a push-based pattern for getting data from your data sources into your streams and tables. Common use cases include:
- Instrumenting external client applications
- Receiving webhooks from third-party services
- Integrating with ETL or data pipeline tools that push data
## Why Use Moose's APIs Over Your Own?
Moose's ingestion APIs are purpose-built for high-throughput data pipelines, offering key advantages over other more general-purpose frameworks:
- **Built-in schema validation:** Ensures only valid data enters your pipeline.
- **Direct connection to streams/tables:** Instantly link HTTP endpoints to Moose data infrastructure to route incoming data to your streams and tables without any glue code.
- **Dead Letter Queue (DLQ) support:** Invalid records are automatically captured for review and recovery.
- **OpenAPI auto-generation:** Instantly generate client SDKs and docs for all endpoints, including example data.
- **Rust-powered performance:** Far higher throughput and lower latency than typical Node.js or Python APIs.
## Validation
Moose validates all incoming data against your Pydantic model. If a record fails validation, Moose can automatically route it to a Dead Letter Queue (DLQ) for later inspection and recovery.
```python filename="ValidationExample.py" copy
from datetime import datetime
from typing import Optional
from moose_lib import DeadLetterQueue, IngestApi, IngestConfig, Stream
from pydantic import BaseModel

class Properties(BaseModel):
    device: Optional[str] = None
    version: Optional[int] = None

class ExampleModel(BaseModel):
    id: str
    userId: str
    timestamp: datetime
    properties: Properties

api = IngestApi[ExampleModel]("your-api-route", IngestConfig(
    destination=Stream[ExampleModel]("your-stream-name"),
    dead_letter_queue=DeadLetterQueue[ExampleModel]("your-dlq-name")
))
```
If your IngestPipeline’s schema marks a field as optional but annotates a ClickHouse default, Moose treats it as follows:
- API request and Stream message: field is optional (you may omit it)
- ClickHouse table storage: field is required with a DEFAULT clause
Behavior: When the API/stream inserts into ClickHouse and the field is missing, ClickHouse sets it to the configured default value. This keeps request payloads simple while avoiding Nullable columns in storage.
Example:
`Annotated[int, clickhouse_default("18")]` (or equivalent annotation)
Send a valid event - routed to the destination stream
```python filename="ValidEvent.py" copy
import requests

requests.post("http://localhost:4000/ingest/your-api-route", json={
    "id": "event1",
    "userId": "user1",
    "timestamp": "2023-05-10T15:30:00Z"
})
# ✅ Accepted and routed to the destination stream
# API returns 200 and { success: true }
```
Send an invalid event (missing required field) - routed to the DLQ
```python filename="InvalidEventMissingField.py" copy
import requests

requests.post("http://localhost:4000/ingest/your-api-route", json={
    "id": "event1",
})
# ❌ Routed to DLQ, because it's missing a required field
# API returns a 400 response
```
Send an invalid event (bad date format) - routed to the DLQ
```python filename="InvalidEventBadDate.py" copy
import requests

requests.post("http://localhost:4000/ingest/your-api-route", json={
    "id": "event1",
    "userId": "user1",
    "timestamp": "not-a-date"
})
# ❌ Routed to DLQ, because the timestamp is not a valid date
# API returns a 400 response
```
## Creating Ingestion APIs
You can create ingestion APIs in two ways:
- **High-level:** Using the `IngestPipeline` class (recommended for most use cases)
- **Low-level:** Manually configuring the `IngestApi` component for more granular control
### High-level: IngestPipeline (Recommended)
The `IngestPipeline` class provides a convenient way to set up ingestion endpoints, streams, and tables with a single declaration:
```python filename="IngestPipeline.py" copy
from datetime import datetime
from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel

class ExampleSchema(BaseModel):
    id: Key[str]
    name: str
    value: int
    timestamp: datetime

example_pipeline = IngestPipeline[ExampleSchema](
    name="example-name",
    config=IngestPipelineConfig(
        ingest_api=True,
        stream=True,
        table=True
    )
)
```
### Low-level: Standalone IngestApi
For more granular control, you can manually configure the `IngestApi` component:
The types of the destination `Stream` and `Table` must match the type of the `IngestApi`.
## Configuration Reference
Configuration options for both high-level and low-level ingestion APIs are provided below.
```python filename="IngestPipelineConfig.py" copy
class IngestPipelineConfig(BaseModel):
    table: bool | OlapConfig = True
    stream: bool | StreamConfig = True
    ingest_api: bool | IngestConfig = True
    dead_letter_queue: bool | StreamConfig = True
    version: Optional[str] = None
    metadata: Optional[dict] = None
    life_cycle: Optional[LifeCycle] = None
```
```python filename="IngestConfig.py" copy
@dataclass
class IngestConfigWithDestination[T: BaseModel]:
    destination: Stream[T]
    dead_letter_queue: Optional[DeadLetterQueue[T]] = None
    version: Optional[str] = None
    metadata: Optional[dict] = None
```
---
## OpenAPI SDK Generation
Source: moose/apis/openapi-sdk.mdx
Generate type-safe client SDKs from your Moose APIs
# OpenAPI SDK Generation
Moose automatically generates OpenAPI specifications for all your APIs, enabling you to create type-safe client SDKs in any language. This allows you to integrate your Moose APIs into any application with full type safety and IntelliSense support.
## Overview
While `moose dev` is running, Moose emits an OpenAPI spec at `.moose/openapi.yaml` covering:
- **Ingestion endpoints** with request/response schemas
- **Analytics APIs** with query parameters and response types
Every time you make a change to your Moose APIs, the OpenAPI spec is updated automatically.
## Generating Typed SDKs from OpenAPI
You can use your preferred generator to create a client from that spec. Below are minimal, tool-agnostic examples you can copy into your project scripts.
### Setup
The following example uses `openapi-python-client` to generate the SDK. Follow the setup instructions here: [openapi-python-client on PyPI](https://pypi.org/project/openapi-python-client/).
Add a generation script in your repository:
```bash filename="scripts/generate_python_sdk.sh" copy
#!/usr/bin/env bash
set -euo pipefail
openapi-python-client generate --path .moose/openapi.yaml --output ./generated/python --overwrite
```
Then configure Moose to run it after each dev reload:
```toml filename="moose.config.toml" copy
[http_server_config]
on_reload_complete_script = "bash scripts/generate_python_sdk.sh"
```
This will regenerate the Python client from the live spec on every reload.
### Hooks for automatic SDK generation
The `on_reload_complete_script` hook is available in your `moose.config.toml` file. It runs after each dev server reload when code/infra changes have been fully applied. This allows you to keep your SDKs continuously up to date as you make changes to your Moose APIs.
Notes:
- The script runs in your project root using your `$SHELL` (falls back to `/bin/sh`).
- Paths like `.moose/openapi.yaml` and `./generated/...` are relative to the project root.
- You can combine multiple generators by chaining commands with `&&`, or split them into a separate shell script if preferred.
These hooks only affect local development (`moose dev`). The reload hook runs after Moose finishes applying your changes, ensuring `.moose/openapi.yaml` is fresh before regeneration.
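For instance, a single hook script could regenerate both a Python and a TypeScript client in one pass. This sketch assumes both generators are installed; swap in whichever tools your project uses:

```bash filename="scripts/generate_sdks.sh" copy
#!/usr/bin/env bash
set -euo pipefail

# Regenerate the Python client, then the TypeScript types, from the same spec
openapi-python-client generate --path .moose/openapi.yaml --output ./generated/python --overwrite && \
  npx openapi-typescript .moose/openapi.yaml -o ./generated/types.ts
```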
## Integration
Import from the output path your generator writes to (see your chosen generator's documentation for the exact layout). The Moose side is unchanged: the spec lives at `.moose/openapi.yaml` during `moose dev`.
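With `openapi-python-client`, for example, usage generally looks like the following. The package and module names here are hypothetical, since the real names are derived from your spec's title; check the generated `./generated/python` directory for the actual ones:

```python
# Hypothetical names: the real package/module names come from your generated output
from moose_api_client import Client
from moose_api_client.api.default import get_example_endpoint

client = Client(base_url="http://localhost:4000")

# Each generated endpoint module exposes sync/async call helpers
result = get_example_endpoint.sync(client=client, filter_field="example", max_results=10)
```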
## Generators
Use any OpenAPI-compatible generator:
### TypeScript projects
- [OpenAPI Generator (typescript-fetch)](https://github.com/OpenAPITools/openapi-generator) — mature, broad options; generates Fetch-based client
- [Kubb](https://github.com/kubb-project/kubb) — generates types + fetch client with simple config
- [Orval](https://orval.dev/) — flexible output (client + schemas), good DX
- [openapi-typescript](https://github.com/openapi-ts/openapi-typescript) — generates types only (pair with your own client)
- [swagger-typescript-api](https://github.com/acacode/swagger-typescript-api) — codegen for TS clients from OpenAPI
- [openapi-typescript-codegen](https://github.com/ferdikoomen/openapi-typescript-codegen) — TS client + models
- [oazapfts](https://github.com/oazapfts/oazapfts) — minimal TS client based on fetch
- [openapi-zod-client](https://github.com/astahmer/openapi-zod-client) — Zod schema-first client generation
### Python projects
- [openapi-python-client](https://pypi.org/project/openapi-python-client/) — modern typed client for OpenAPI 3.0/3.1
- [OpenAPI Generator (python)](https://github.com/OpenAPITools/openapi-generator) — multiple Python generators (python, python-nextgen)
---
## Trigger APIs
Source: moose/apis/trigger-api.mdx
Create APIs that trigger workflows and other processes
# Trigger APIs
## Overview
You can create APIs to initiate workflows, data processing jobs, or other automated processes.
## Basic Usage
```python filename="app/apis/trigger_workflow.py" copy
from moose_lib import MooseClient, Api
from pydantic import BaseModel, Field
from datetime import datetime
class WorkflowParams(BaseModel):
    input_value: str
    priority: str = Field(default="normal")

class WorkflowResponse(BaseModel):
    workflow_id: str
    status: str

def run(client: MooseClient, params: WorkflowParams) -> WorkflowResponse:
    # Trigger the workflow with input parameters
    workflow_execution = client.workflow.execute(
        workflow="data-processing",
        params={
            "input_value": params.input_value,
            "priority": params.priority,
            "triggered_at": datetime.now().isoformat(),
        },
    )
    return WorkflowResponse(
        workflow_id=workflow_execution.id,
        status="started",
    )

api = Api[WorkflowParams, WorkflowResponse]("trigger-workflow", run)
```
## Using the Trigger API
Once deployed, you can trigger workflows via HTTP requests:
```bash filename="Terminal" copy
curl "http://localhost:4000/api/trigger-workflow?input_value=process-user-data&priority=high"
```
Response:
```json
{
  "workflow_id": "workflow-12345",
  "status": "started"
}
```
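The query string maps one-to-one onto the fields of `WorkflowParams`. Conceptually, the server decodes it along these lines before validating against the Pydantic model (a simplified sketch using only the standard library):

```python
from urllib.parse import urlsplit, parse_qs

url = "http://localhost:4000/api/trigger-workflow?input_value=process-user-data&priority=high"

# Decode the query string into a flat dict of single parameter values
query = parse_qs(urlsplit(url).query)
params = {key: values[0] for key, values in query.items()}
print(params)  # {'input_value': 'process-user-data', 'priority': 'high'}
```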