Consumption APIs
Overview
Consumption APIs make it easy to build type-safe HTTP GET
endpoints for surfacing data from your OLAP database. These APIs can help power user-facing analytics, dashboards and other front-end components, or even enable AI tools to interact with your data.
Working with Consumption APIs
Model your API parameters
Define the names and types of the parameters your API will accept
Write a route handler function
Write a simple, strongly-typed function that constructs a SQL query and returns the results
Moose Generates Validation & Docs
Moose automatically generates validation, type conversion, and OpenAPI documentation for your API request parameters and response body
Integrate with your API client
Use the generated API spec for your frontend applications or use other tools to interact with your API
Creating API Endpoints
import { ConsumptionApi } from "@514labs/moose-lib";
import { SourcePipeline } from "path/to/SourcePipeline";
// Define the query parameters
interface QueryParams {
filterField: string;
maxResults: number;
}
// Model the query result type
interface ResultItem {
id: number;
name: string;
value: number;
}
const SourceTable = SourcePipeline.table!; // Use `!` to assert that the table is not null
const cols = SourceTable.columns;
// Define the result type as an array of the result item type
const exampleApi = new ConsumptionApi<QueryParams, ResultItem[]>("example_endpoint",
async ({ filterField, maxResults }: QueryParams, { client, sql }) => {
const query = sql`
SELECT
${cols.id},
${cols.name},
${cols.value}
FROM ${SourceTable}
WHERE category = ${filterField}
LIMIT ${maxResults}`;
// Set the result type to the type of each row in the result set
const resultSet = await client.query.execute<ResultItem>(query);
// Return the result set as an array of the result item type
return await resultSet.json();
});
from moose_lib import ConsumptionApi, MooseClient
from pydantic import BaseModel
## Import the source pipeline
from app.path.to.SourcePipeline import SourcePipeline
# Define the query parameters
class QueryParams(BaseModel):
filterField: str
maxResults: int
# Define the response body
class ResponseBody(BaseModel):
id: int
name: str
value: float
SourceTable = SourcePipeline.get_table()
# Define the route handler function
def run(client: MooseClient, params: QueryParams) -> list[ResponseBody]:
query = """
SELECT
id,
name,
value
FROM {table: Identifier}
WHERE category = {filterField: String}
LIMIT {maxResults: UInt32}
"""
return client.query.execute(query, {"table": SourceTable.name, "filterField": params.filterField, "maxResults": params.maxResults})
# Create the API
example_api = ConsumptionApi[QueryParams, ResponseBody](name="example_endpoint", query_function=run)
The ConsumptionApi class takes:
- Route name: The URL path used to access your API (e.g., "example_endpoint")
- Handler function: Processes requests with typed parameters and returns the result
The generic type parameters specify:
- QueryParams: The structure of accepted URL parameters
- ResponseBody: The exact shape of your API’s response data
MooseTip:
You can name these types anything you want. The first type generates validation for query parameters, while the second defines the response structure for OpenAPI documentation.
Moose automatically handles:
URL parameter validation and type conversion
SQL query interpolation and execution
Response formatting
Automated OpenAPI documentation
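For instance, a client can call the generated endpoint with plain query-string parameters. A minimal Python sketch (the endpoint name and the local dev server address at port 4000 follow the examples on this page and may differ in your deployment):

```python
from urllib.parse import urlencode

# Hypothetical local dev server; adjust host/port for your deployment
base = "http://localhost:4000/consumption/example_endpoint"

# Parameters matching the QueryParams model above; Moose validates
# and type-converts them before your handler runs
params = {"filterField": "books", "maxResults": 10}
url = f"{base}?{urlencode(params)}"
print(url)
# http://localhost:4000/consumption/example_endpoint?filterField=books&maxResults=10
```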
Defining Query Parameters
Define your API’s parameters as a TypeScript interface:
interface QueryParams {
filterField: string;
maxResults: number;
optionalParam?: string; // Not required for client to provide
}
Define your API’s parameters as a Pydantic model:
from pydantic import BaseModel, Field
from typing import Optional
class QueryParams(BaseModel):
filterField: str = Field(..., description="The field to filter by")
maxResults: int = Field(..., description="The maximum number of results to return")
optionalParam: Optional[str] = Field(None, description="An optional parameter")
Moose automatically handles:
- Runtime validation
- Clear error messages for invalid parameters
- OpenAPI documentation generation
Warning:
Complex nested objects and arrays are not supported. Consumption APIs are GET
endpoints designed to be simple and lightweight.
Adding Advanced Type Validation
Moose uses Typia to extract type definitions and provide runtime validation. Use Typia’s tags for more complex validation:
import { tags } from "typia";

interface QueryParams {
  filterField: string;
  // Ensure maxResults is a positive integer
  maxResults: number & tags.Type<"int64"> & tags.Minimum<1>;
}
Moose uses Pydantic for runtime validation. Use Pydantic’s Field
class for more complex validation:
from pydantic import BaseModel, Field
class QueryParams(BaseModel):
filterField: str = Field(pattern=r"^(id|name|email)$", description="The field to filter by") ## Only allow valid column names from the UserTable
maxResults: int = Field(gt=0, description="The maximum number of results to return") ## Positive integer
Common Validation Options
interface QueryParams {
// Numeric validations
id: number & tags.Type<"uint32">; // Positive integer (0 to 4,294,967,295)
age: number & tags.Minimum<18> & tags.Maximum<120>; // Range: 18 <= age <= 120
price: number & tags.ExclusiveMinimum<0> & tags.ExclusiveMaximum<1000>; // Range: 0 < price < 1000
discount: number & tags.MultipleOf<0.5>; // Must be multiple of 0.5
// String validations
username: string & tags.MinLength<3> & tags.MaxLength<20>; // Length between 3-20 characters
email: string & tags.Format<"email">; // Valid email format
zipCode: string & tags.Pattern<"^[0-9]{5}$">; // 5 digits
uuid: string & tags.Format<"uuid">; // Valid UUID
ipAddress: string & tags.Format<"ipv4">; // Valid IPv4 address
// Date validations
startDate: string & tags.Format<"date">; // YYYY-MM-DD format
// Enum validation
status: string & tags.Enum<"active" | "pending" | "inactive">; // Must be one of these values
// Optional parameters
limit?: number & tags.Type<"uint32"> & tags.Maximum<100>; // Optional, if provided: positive integer <= 100
// Combined validations
searchTerm?: (string & tags.MinLength<3>) | null; // Either null or string with ≥3 characters
}
Notice these are just regular TypeScript types combined with Typia tags. For a full list of validation options, see the Typia documentation.
from datetime import date
from uuid import UUID
from typing import Literal, Optional
from pydantic import BaseModel, Field, EmailStr, IPvAnyAddress

class QueryParams(BaseModel):
    # Numeric validations
    id: int = Field(..., gt=0)  # Positive integer
    age: int = Field(..., ge=18, le=120)  # Range: 18 <= age <= 120
    price: float = Field(..., gt=0, lt=1000)  # Range: 0 < price < 1000
    discount: float = Field(..., gt=0, multiple_of=0.5)  # Must be a multiple of 0.5
    # String validations
    username: str = Field(..., min_length=3, max_length=20)  # Length between 3-20 characters
    email: EmailStr  # Valid email format (requires the email-validator package)
    zipCode: str = Field(..., pattern=r"^[0-9]{5}$")  # 5 digits
    uuid: UUID  # Valid UUID
    ipAddress: IPvAnyAddress  # Valid IP address
    # Date validations
    startDate: date  # YYYY-MM-DD format
    # Enum validation
    status: Literal["active", "pending", "inactive"]  # Must be one of these values
    # Optional parameters
    limit: Optional[int] = Field(None, gt=0, le=100)  # Optional; if provided: positive integer <= 100
For a full list of validation options, see the Pydantic documentation.
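To see what these constraints do at runtime, you can exercise a model directly; invalid input raises a ValidationError before your handler code ever runs (a standalone sketch using only Pydantic):

```python
from pydantic import BaseModel, Field, ValidationError

class QueryParams(BaseModel):
    username: str = Field(..., min_length=3, max_length=20)
    limit: int = Field(10, gt=0, le=100)

# Valid input passes, and compatible types are coerced (e.g. "25" -> 25)
ok = QueryParams(username="alice", limit="25")
print(ok.limit)  # 25

# Invalid input raises ValidationError with one entry per failing field
try:
    QueryParams(username="ab", limit=500)
except ValidationError as e:
    print(len(e.errors()))  # 2
```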
Setting Default Values
You can set default values for parameters by setting values for each parameter in the API route handler function signature:
interface QueryParams {
filterField: string;
maxResults: number;
optionalParam?: string; // Not required for client to provide
}
const api = new ConsumptionApi<QueryParams, ResponseBody>("example_endpoint",
async ({ filterField = "example", maxResults = 10, optionalParam = "default" }, { client, sql }) => {
// Your logic here...
}
);
You can set default values for parameters by setting values for each parameter in your Pydantic model:
from pydantic import BaseModel
class QueryParams(BaseModel):
filterField: str = "example"
maxResults: int = 10
optionalParam: str | None = "default"
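With defaults in place, any field the client omits is filled in before your handler runs (a standalone sketch of the model above):

```python
from typing import Optional
from pydantic import BaseModel

class QueryParams(BaseModel):
    filterField: str = "example"
    maxResults: int = 10
    optionalParam: Optional[str] = "default"

# Omitted fields fall back to their defaults; provided ones override them
p = QueryParams()
print(p.filterField, p.maxResults)    # example 10

q = QueryParams(maxResults=50)
print(q.maxResults, q.optionalParam)  # 50 default
```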
Implementing Route Handler
Consumption API route handlers are regular functions, so you can implement arbitrary logic inside them. The primary purpose of the handler function is to construct a SQL query and return the results to the client:
import { ConsumptionApi } from "@514labs/moose-lib";
import { BarAggregatedMV } from "../views/barAggregated";
import { tags } from "typia";
// This file is where you can define your APIs to consume your data
interface QueryParams {
orderBy?: "totalRows" | "rowsWithText" | "maxTextLength" | "totalTextLength";
limit?: number;
startDay?: number & tags.Type<"int32">;
endDay?: number & tags.Type<"int32">;
}
interface ResponseData {
dayOfMonth: number;
totalRows?: number;
rowsWithText?: number;
maxTextLength?: number;
totalTextLength?: number;
}
// Utility to get the columns of the target table
const cols = BarAggregatedMV.targetTable.columns
export const BarApi = new ConsumptionApi<QueryParams, ResponseData[]>(
"bar",
async (
{ orderBy = "totalRows", limit = 5, startDay = 1, endDay = 31 },
{ client, sql },
) => {
const query = sql`
SELECT
${cols.dayOfMonth},
${cols[orderBy]}
FROM ${BarAggregatedMV.targetTable}
WHERE
dayOfMonth >= ${startDay}
AND dayOfMonth <= ${endDay}
ORDER BY ${cols[orderBy]} DESC
LIMIT ${limit}
`;
const data = await client.query.execute<ResponseData>(query);
return await data.json();
},
);
from moose_lib import MooseClient, ConsumptionApi
from pydantic import BaseModel, Field
from typing import Optional
from app.views.bar_aggregated import barAggregatedMV
# Query params are defined as Pydantic models and are validated automatically
class QueryParams(BaseModel):
order_by: Optional[str] = Field(
default="total_rows",
pattern=r"^(total_rows|rows_with_text|max_text_length|total_text_length)$",
description="Must be one of: total_rows, rows_with_text, max_text_length, total_text_length"
)
limit: Optional[int] = Field(
default=5,
gt=0,
le=100,
description="Must be between 1 and 100"
)
start_day: Optional[int] = Field(
default=1,
gt=0,
le=31,
description="Must be between 1 and 31"
)
end_day: Optional[int] = Field(
default=31,
gt=0,
le=31,
description="Must be between 1 and 31"
)
class QueryResult(BaseModel):
day_of_month: int
total_rows: int
rows_with_text: int
max_text_length: int
total_text_length: int
## The run function is where you can define your API logic
def run(client: MooseClient, params: QueryParams):
start_day = params.start_day
end_day = params.end_day
limit = params.limit
order_by = params.order_by
query = f"""
SELECT
day_of_month,
{order_by}
FROM {barAggregatedMV.target_table.name}
WHERE day_of_month >= {start_day}
AND day_of_month <= {end_day}
ORDER BY {order_by} DESC
LIMIT {limit}
"""
return client.query.execute(query, {})
bar = ConsumptionApi[QueryParams, QueryResult](name="bar", query_function=run)
Connecting to the Database
Moose provides an instance of a MooseClient
to your route handler function. This client provides access to the database and other Moose resources:
import { ConsumptionUtil } from "@514labs/moose-lib";
import { UserTable } from "./UserTable";
async function handler({ client, sql }: ConsumptionUtil) {
const query = sql`SELECT * FROM ${UserTable}`;
const data = await client.query.execute<UserSchema>(query);
}
Pass the type of the result to the client.query.execute<T>()
method to ensure type safety.
from moose_lib import MooseClient
def run(client: MooseClient, params: QueryParams):
query = """
SELECT * FROM UserTable
"""
return client.query.execute(query)
## Create the API
example_api = ConsumptionApi[QueryParams, ResponseBody](name="example_endpoint", query_function=run)
Constructing Safe SQL Queries
The sql template literal in Moose provides type-safe query construction with protection against SQL injection. There are two recommended patterns for building safe queries:
Pattern 1: ClickHouse Native Parameter Binding
This pattern follows the official ClickHouse Python client approach using parameter binding:
def run(client: MooseClient, params: QueryParams):
# Use named parameters with {:type} syntax for safe parameter binding
query = """
SELECT *
FROM users
WHERE age >= {min_age:Int32}
AND status = {status:String}
LIMIT {limit:UInt32}
"""
# Parameters passed as a dictionary
result = client.query.execute(
query,
parameters={
"min_age": params.min_age,
"status": params.status,
"limit": params.limit
}
)
return result
This approach:
- Uses ClickHouse’s native parameter binding with the {param:Type} syntax
- Explicitly specifies ClickHouse types for parameters
- Provides protection against SQL injection by default
- Handles type conversion automatically
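To make the injection risk concrete, compare naive f-string interpolation with binding. With binding, the SQL text is fixed and the untrusted value only ever travels in the parameters dict (a standalone sketch; no database connection is needed to see the difference):

```python
# Untrusted input a client might send
malicious = "x'; DROP TABLE users; --"

# Naive f-string interpolation splices the payload into the SQL text itself
unsafe = f"SELECT * FROM users WHERE status = '{malicious}'"
print(unsafe)

# With ClickHouse-style binding the SQL text never changes; the payload is
# delivered separately and treated strictly as a String value
bound_query = "SELECT * FROM users WHERE status = {status:String}"
parameters = {"status": malicious}
```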
Warning:
When using Pattern 2 (f-strings), you MUST be extremely careful with your Pydantic model definitions:
- Use strict field validations with Field() constraints
- Consider using Literal types for fixed value sets
- Use constr with patterns for string validation
- Add bounds checking for numeric values
- Never allow arbitrary string input without validation
Improper validation could lead to SQL injection vulnerabilities.
For most cases, Pattern 1 (ClickHouse Parameter Binding) is recommended as it provides better safety guarantees. Only use Pattern 2 when you have complete confidence in your Pydantic validation rules and understand the security implications.
Basic Query Parameter Interpolation
import { UserTable } from "./UserTable";
const minAge = 18;
const userRole = "admin";
const query = sql`
SELECT * FROM ${UserTable}
WHERE age > ${minAge}
AND role = ${userRole}
`;
// MooseClient handles type conversion and escaping
const data = await client.query.execute<UserSchema>(query);
// EXECUTION: SELECT * FROM users WHERE age > 18 AND role = 'admin'
Table and Column References
Reference tables and columns directly from your Moose objects as variables in your sql
template literals:
import { UserTable } from "../tables/userTable";
const query = sql`
SELECT
${UserTable.columns.id},
${UserTable.columns.name},
${UserTable.columns.email}
FROM ${UserTable}
WHERE ${UserTable.columns.isActive} = true
`;
// EXECUTION: SELECT id, name, email FROM users WHERE is_active = true
Type Safety
Static type checking ensures you only reference columns that actually exist.
from moose_lib import ConsumptionApi, MooseClient
from pydantic import BaseModel, Field
from typing import Literal, Optional, List
from enum import Enum
from app.UserTable import UserTable
class QueryParams(BaseModel):
# With parameter binding, this validation is an extra safeguard rather than a requirement
column: str = Field(pattern=r"^(id|name|email)$", description="Uses a regex pattern to only allow valid column names")
search_term: str
limit: int = 10
def run(client: MooseClient, params: QueryParams):
# Use proper ClickHouse parameter binding
query = """
SELECT {column:Identifier}
FROM {tableName:Identifier}
WHERE name ILIKE {search_term:String}
LIMIT {limit:UInt32}
"""
# Parameters are safely bound with proper type handling;
# wildcards belong in the bound value, not the SQL text
result = client.query.execute(
query,
parameters={
"column": params.column,
"tableName": UserTable.name,
"search_term": f"%{params.search_term}%",
"limit": params.limit
}
)
return result
# Example usage:
# GET /api/search?column=name&search_term=O'Connor&limit=10
# Safely handles special characters through parameter binding
This approach is recommended because:
- Parameter binding handles all escaping and type conversion
- No need for complex validation rules
- Handles special characters and SQL injection attempts automatically
- Uses a regex pattern to restrict column names to a known whitelist
Advanced Query Patterns
Dynamic Column & Table Selection
Use ConsumptionHelpers
to handle dynamic column and table references in your queries:
import { ConsumptionHelpers as CH } from "@514labs/moose-lib";
interface QueryParams {
sortBy: string; // Column to sort by
fields: string; // Comma-separated list of columns to select (e.g., "id,name,email")
}
const queryHandler = async ({ sortBy = "id", fields = "id,name" }: QueryParams, { client, sql }) => {
// Split the comma-separated string into individual fields
const fieldList = fields.split(',').map(f => f.trim());
// Build the query by selecting each column individually
const query = sql`
SELECT
${fieldList.map(field => sql`${CH.column(field)}`).join(', ')}
FROM ${userTable}
ORDER BY ${CH.column(sortBy)}
`;
// MooseClient converts fieldList to valid ClickHouse identifiers
return client.query.execute(query);
// EXECUTION: `SELECT id, name FROM users ORDER BY id`
};
import { ConsumptionHelpers as CH } from "@514labs/moose-lib";
interface QueryParams {
tableName: string;
}
const queryHandler = async ({ tableName = "users" }: QueryParams, { client, sql }) => {
const query = sql`
SELECT * FROM ${CH.table(tableName)}
`;
// MooseClient converts tableName to a valid ClickHouse identifier
return client.query.execute(query);
// EXECUTION: `SELECT * FROM users`
};
from app.UserTable import UserTable
class QueryParams(BaseModel):
colName: str = Field(pattern=r"^(id|name|email)$", description="Uses a regex pattern to only allow valid column names from the UserTable")
class QueryResult(BaseModel):
id: Optional[int]
name: Optional[str]
email: Optional[str]
def run(client: MooseClient, params: QueryParams):
query = f"""SELECT {params.colName} FROM {UserTable.name}"""
data = client.query.execute(query, {}) ## No parameter binding is done because we are using f-strings
return data
## Create the API
bar = ConsumptionApi[QueryParams, QueryResult](name="bar", query_function=run)
## Call the API
## HTTP Request: GET http://localhost:4000/consumption/bar?colName=id
## EXECUTED QUERY: SELECT id FROM users
Warning:
When using f-strings, be careful to validate your query parameters to prevent SQL injection:
- Use strict field validations with Field() constraints
- Consider using pattern validation for strings
- Never allow arbitrary string input without validation
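For example, a colName pattern like the one above stops a malicious value before it can reach the f-string (a standalone sketch using only Pydantic):

```python
from pydantic import BaseModel, Field, ValidationError

class QueryParams(BaseModel):
    # Whitelist of known column names; anything else is rejected
    # before it can reach an f-string query
    colName: str = Field(pattern=r"^(id|name|email)$")

print(QueryParams(colName="id").colName)  # id

# An injection attempt fails validation and never reaches the database
try:
    QueryParams(colName="id; DROP TABLE users")
    print("accepted")
except ValidationError:
    print("rejected")
```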
Conditional WHERE Clauses
Build WHERE clauses based on provided parameters:
interface FilterParams {
minAge?: number;
status?: "active" | "inactive";
searchText?: string;
}
const buildQuery = ({ minAge, status, searchText }: FilterParams, { sql }) => {
let conditions = [];
if (minAge !== undefined) {
conditions.push(sql`age >= ${minAge}`);
}
if (status) {
conditions.push(sql`status = ${status}`);
}
if (searchText) {
conditions.push(sql`(name ILIKE ${'%' + searchText + '%'} OR email ILIKE ${'%' + searchText + '%'})`);
}
// Build the full query with conditional WHERE clause
let query = sql`SELECT * FROM ${userTable}`;
if (conditions.length > 0) {
// Join conditions with AND operator
let whereClause = conditions.join(' AND ');
query = sql`${query} WHERE ${whereClause}`;
}
query = sql`${query} ORDER BY created_at DESC`;
return query;
};
class FilterParams(BaseModel):
min_age: Optional[int] = None
status: Optional[str] = Field(None, pattern=r"^(active|inactive)$")
search_text: Optional[str] = Field(None, pattern=r"^[a-zA-Z0-9\s]+$", description="Alphanumeric search text without special characters to prevent SQL injection")
class QueryResult(BaseModel):
id: int
name: str
email: str
def build_query(client: MooseClient, params: FilterParams) -> QueryResult:
conditions = []
interpolated = {}
if params.min_age is not None:
conditions.append(f"age >= {params.min_age}")
interpolated["min_age"] = params.min_age
if params.status:
conditions.append(f"status = '{params.status}'")
interpolated["status"] = params.status
if params.search_text:
conditions.append(f"(name ILIKE '%{params.search_text}%' OR email ILIKE '%{params.search_text}%')")
interpolated["search_text"] = params.search_text
where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
query = f"""SELECT * FROM users {where_clause} ORDER BY created_at DESC"""
data = client.query.execute(query, interpolated)
return data
## Create the API
bar = ConsumptionApi[FilterParams, QueryResult](name="bar", query_function=build_query)
## Call the API
## HTTP Request: GET http://localhost:4000/consumption/bar?min_age=20&status=active&search_text=John
## EXECUTED QUERY: SELECT * FROM users WHERE age >= 20 AND status = 'active' AND (name ILIKE '%John%' OR email ILIKE '%John%') ORDER BY created_at DESC
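The same conditional logic can be expressed with ClickHouse parameter binding instead of f-strings, so only placeholder text is concatenated and the values travel through the parameters dict. A standalone sketch of the condition-assembly step (the helper name is illustrative):

```python
from typing import Optional

def build_where(min_age: Optional[int] = None,
                status: Optional[str] = None,
                search_text: Optional[str] = None):
    """Assemble a WHERE clause from {name:Type} placeholders plus a
    matching parameters dict, so values are bound rather than spliced."""
    conditions = []
    parameters = {}
    if min_age is not None:
        conditions.append("age >= {min_age:Int32}")
        parameters["min_age"] = min_age
    if status is not None:
        conditions.append("status = {status:String}")
        parameters["status"] = status
    if search_text is not None:
        conditions.append("(name ILIKE {search:String} OR email ILIKE {search:String})")
        parameters["search"] = f"%{search_text}%"
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    return where, parameters

where, parameters = build_where(min_age=20, status="active")
print(where)
# WHERE age >= {min_age:Int32} AND status = {status:String}
```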
Adding Authentication
Moose supports authentication via JSON web tokens (JWTs). When your client makes a request to your Consumption API, Moose will automatically parse the JWT and pass the authenticated payload to your handler function as the jwt
object:
async (
{ orderBy = "totalRows", limit = 5 },
{ client, sql, jwt }
) => {
// Use jwt.userId to filter data for the current user
const query = sql`
SELECT * FROM userReports
WHERE user_id = ${jwt.userId}
LIMIT ${limit}
`;
return client.query.execute(query);
}
def run(params: QueryParams, client: MooseClient, jwt: dict):
query = f"""SELECT * FROM userReports WHERE user_id = {jwt['userId']} LIMIT {params.limit}"""
data = client.query.execute(query)
return data
JWT Error Handling
Moose validates the JWT signature and ensures the JWT is properly formatted. If JWT authentication fails, Moose returns a 401 Unauthorized error.
Understanding Response Codes
Moose automatically provides standard HTTP responses:
| Status Code | Meaning | Response Body |
|---|---|---|
| 200 | Success | Your API’s result data |
| 400 | Validation error | { "error": "Detailed message" } |
| 401 | Unauthorized | { "error": "Unauthorized" } |
| 500 | Internal server error | { "error": "Internal server error" } |
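On the client side, these codes can be handled uniformly: anything other than 200 carries a JSON body with an error field (a standalone sketch; the response bodies below are illustrative):

```python
import json

def handle_response(status: int, body: str):
    """Return parsed data on success; raise with the server's message otherwise."""
    if status == 200:
        return json.loads(body)
    detail = json.loads(body).get("error", "unknown error")
    raise RuntimeError(f"{status}: {detail}")

print(handle_response(200, '[{"id": 1}]'))  # [{'id': 1}]

try:
    handle_response(400, '{"error": "maxResults must be a number"}')
except RuntimeError as e:
    print(e)  # 400: maxResults must be a number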
Post-Processing Query Results
After executing your database query, you can transform the data before returning it to the client. Common post-processing operations include:
Transform field names or data formats
Calculate derived values
Filter or sort results
Aggregate or group data
Apply business logic
interface QueryParams {
category: string;
maxResults: number;
}
interface ResponseBody {
itemId: number;
displayName: string;
formattedValue: string;
isHighValue: boolean;
date: string;
}
const processDataApi = new ConsumptionApi<QueryParams, ResponseBody>(
"process_data_endpoint",
async ({ category, maxResults = 10 }, { client, sql }) => {
// 1. Fetch raw data
const query = sql`
SELECT id, name, value, timestamp
FROM data_table
WHERE category = ${category}
LIMIT ${maxResults}
`;
const rawResults = await client.query.execute<{
id: number;
name: string;
value: number;
timestamp: string;
}>(query);
const rows = await rawResults.json();
// 2. Post-process the results
return rows.map(row => ({
// Transform field names
itemId: row.id,
displayName: row.name.toUpperCase(),
// Add derived fields
formattedValue: `$${row.value.toFixed(2)}`,
isHighValue: row.value > 1000,
// Format dates
date: new Date(row.timestamp).toISOString().split('T')[0]
}));
}
);
from datetime import datetime
from moose_lib import ConsumptionApi, MooseClient
from pydantic import BaseModel
class QueryParams(BaseModel):
category: str
maxResults: int = 10
class ResponseItem(BaseModel):
itemId: int
displayName: str
formattedValue: str
isHighValue: bool
date: str
def run(client: MooseClient, params: QueryParams):
# 1. Fetch raw data
query = """
SELECT id, name, value, timestamp
FROM data_table
WHERE category = {category:String}
LIMIT {maxResults:UInt32}
"""
raw_results = client.query.execute(query, {"category": params.category, "maxResults": params.maxResults})
# 2. Post-process the results
processed_results = []
for row in raw_results:
processed_results.append(ResponseItem(
# Transform field names
itemId=row['id'],
displayName=row['name'].upper(),
# Add derived fields
formattedValue=f"${row['value']:.2f}",
isHighValue=row['value'] > 1000,
# Format dates
date=datetime.fromisoformat(row['timestamp']).date().isoformat()
))
return processed_results
# Create the API
process_data_api = ConsumptionApi[QueryParams, ResponseItem](name="process_data_endpoint", query_function=run)
Best Practices
Post-Processing Best Practices
Prefer database processing for large datasets
When working with large amounts of data, perform as much filtering, grouping, and aggregation as possible in your SQL query
Keep response size reasonable
Post-process to reduce response size when needed, especially for user-facing APIs
Format dates and numbers consistently
Ensure consistent formatting for dates, currencies, and other values in your responses
Handle sensitive data appropriately
Use post-processing to remove or mask sensitive information before returning data to clients
Add clear error handling
Include appropriate error handling in your post-processing logic
MooseTip:
While post-processing gives you flexibility, remember that database operations are typically more efficient for heavy data manipulation. Reserve post-processing for transformations that are difficult to express in SQL or that involve application-specific logic.
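As a concrete illustration of that trade-off, the grouping below is trivial to express either way, but the SQL form (SELECT category, sum(value) ... GROUP BY category) ships only the aggregates out of the database, while the application-side version must first pull every row (a standalone sketch over an in-memory sample):

```python
from collections import defaultdict

# Rows as they might come back from a naive `SELECT category, value ...`
rows = [
    {"category": "a", "value": 10.0},
    {"category": "b", "value": 5.0},
    {"category": "a", "value": 2.5},
]

# Application-side equivalent of `GROUP BY category` with `sum(value)`
totals = defaultdict(float)
for row in rows:
    totals[row["category"]] += row["value"]
print(dict(totals))  # {'a': 12.5, 'b': 5.0}
```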
Next Steps: Frontend Integration Examples
Ready to integrate your Consumption APIs with frontend applications? Check out our Templates page for examples of how to integrate Moose Consumption APIs with frontend applications.