
Ingestion APIs

Overview

Moose Ingestion APIs are the entry point for getting data into your Moose application. They provide a fast, reliable, and type-safe way to move data from your sources into streams and tables for analytics and processing.

When to Use Ingestion APIs

Ingestion APIs are most useful when you want to implement a push-based pattern for getting data from your data sources into your streams and tables. Common use cases include:

  • Instrumenting external client applications
  • Receiving webhooks from third-party services
  • Integrating with ETL or data pipeline tools that push data

Why Use Moose's APIs Over Your Own?

Moose's ingestion APIs are purpose-built for high-throughput data pipelines, offering key advantages over hand-rolled endpoints in general-purpose web frameworks:

  • Built-in schema validation: Ensures only valid data enters your pipeline.
  • Direct connection to streams/tables: Instantly link HTTP endpoints to Moose data infrastructure to route incoming data to your streams and tables without any glue code.
  • Dead Letter Queue (DLQ) support: Invalid records are automatically captured for review and recovery.
  • OpenAPI auto-generation: Instantly generate client SDKs and docs for all endpoints, including example data.
  • Rust-powered performance: Far higher throughput and lower latency than typical Node.js or Python APIs.

Validation

Moose validates all incoming data against your interface (TypeScript) or Pydantic model (Python). If a record fails validation, Moose can automatically route it to a Dead Letter Queue (DLQ) for later inspection and recovery.

ValidationExample.py
from moose_lib import IngestApi, IngestConfig, Stream, DeadLetterQueue
from pydantic import BaseModel
from typing import Optional
from datetime import datetime

class Properties(BaseModel):
    device: Optional[str]
    version: Optional[int]

class ExampleModel(BaseModel):
    id: str
    userId: str
    timestamp: datetime
    properties: Properties

api = IngestApi[ExampleModel]("your-api-route", IngestConfig(
    destination=Stream[ExampleModel]("your-stream-name"),
    dead_letter_queue=DeadLetterQueue[ExampleModel]("your-dlq-name")
))
Optional fields with ClickHouse defaults in IngestPipeline

If your IngestPipeline's schema marks a field as optional but annotates it with a ClickHouse default, Moose treats it as follows:

  • API request and Stream message: field is optional (you may omit it)
  • ClickHouse table storage: field is required with a DEFAULT clause

Behavior: When the API/stream inserts into ClickHouse and the field is missing, ClickHouse sets it to the configured default value. This keeps request payloads simple while avoiding Nullable columns in storage.

Example:

Annotated[int, clickhouse_default("18")] (or equivalent annotation)
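For instance, the sketch below applies the annotation inside an IngestPipeline model. The model, field names, and default value are illustrative, and the import location of clickhouse_default is assumed:

from typing import Annotated, Optional
from datetime import datetime
from pydantic import BaseModel
from moose_lib import Key, IngestPipeline, IngestPipelineConfig, clickhouse_default  # clickhouse_default import path assumed

class UserRecord(BaseModel):
    id: Key[str]
    timestamp: datetime
    # Optional in API requests and stream messages; stored in ClickHouse as a
    # non-Nullable column with DEFAULT 18
    age: Annotated[Optional[int], clickhouse_default("18")] = None

user_records = IngestPipeline[UserRecord](
    name="user-records",
    config=IngestPipelineConfig(
        ingest_api=True,
        stream=True,
        table=True
    )
)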

Accepting Arbitrary Fields

In some scenarios, you may need to accept payloads with arbitrary additional fields beyond your defined schema. This is useful when:

  • You don't control the payload structure from an upstream service
  • You want to gracefully accept extra fields without validation errors

Using Index Signatures (TypeScript)

TypeScript's index signatures allow you to define types that accept additional properties. When used with IngestApi or Stream, the API will accept payloads with extra fields without returning validation errors:

FlexiblePayload.ts
import { IngestApi, Stream, Key, DateTime } from "@514labs/moose-lib";

// Input type with known fields + index signature for flexibility
type UserEventInput = {
  timestamp: DateTime;
  eventName: string;
  userId: Key<string>;
  orgId?: string;
  // Index signature: accept any additional properties
  [key: string]: any;
};

const inputStream = new Stream<UserEventInput>("UserEventInput");

// IngestApi accepts payloads with extra fields without validation errors
const ingestApi = new IngestApi<UserEventInput>("user-events", {
  destination: inputStream,
});

How it works:

  • Known fields (timestamp, eventName, etc.) are validated against their declared types
  • Additional fields matching the index signature are accepted by the API (no validation error returned)
  • All fields (known and extra) are passed through to streaming functions for processing
  • Extra fields can be extracted in your streaming function and stored in a JSON column

Extracting Extra Fields

In your streaming function, use destructuring to separate known fields from extra fields:

// inputStream is the Stream<UserEventInput> declared above; outputStream is your destination Stream
inputStream.addTransform(outputStream, (input) => {
  const { timestamp, eventName, userId, ...extraFields } = input;
  return { timestamp, eventName, userId, properties: extraFields };
});

Creating Ingestion APIs

You can create ingestion APIs in two ways:

  • High-level: Using the IngestPipeline class (recommended for most use cases)
  • Low-level: Manually configuring the IngestApi component for more granular control

High-level: IngestPipeline (Recommended)

The IngestPipeline class provides a convenient way to set up ingestion endpoints, streams, and tables with a single declaration:

IngestPipeline.py
from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel
from datetime import datetime

class ExampleSchema(BaseModel):
    id: Key[str]
    name: str
    value: int
    timestamp: datetime

example_pipeline = IngestPipeline[ExampleSchema](
    name="example-name",
    config=IngestPipelineConfig(
        ingest_api=True,
        stream=True,
        table=True
    )
)
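Once the dev server is running, clients can push records to the endpoint the pipeline creates. The call below is a hypothetical example that assumes the default local address and an /ingest/<name> route; adjust the URL to match your environment:

import requests  # any HTTP client works; requests is used here for brevity

# Hypothetical record matching ExampleSchema
record = {
    "id": "abc-123",
    "name": "page_view",
    "value": 42,
    "timestamp": "2025-01-01T00:00:00Z",
}

# URL assumes the local dev server default and the /ingest/<name> route convention
response = requests.post("http://localhost:4000/ingest/example-name", json=record)
response.raise_for_status()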

Low-level: Standalone IngestApi

For more granular control, you can manually configure the IngestApi component yourself. The example below is a minimal sketch that mirrors the validation example above; the model, route, stream, and DLQ names are illustrative:

AnalyticsPipelineManual.py
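from moose_lib import IngestApi, IngestConfig, Stream, DeadLetterQueue, Key
from pydantic import BaseModel
from datetime import datetime

class AnalyticsEvent(BaseModel):
    id: Key[str]
    name: str
    timestamp: datetime

# Declare the stream and dead letter queue yourself (names are illustrative)
events_stream = Stream[AnalyticsEvent]("analytics-events")
events_dlq = DeadLetterQueue[AnalyticsEvent]("analytics-events-dlq")

# Point the ingestion API at the existing stream, with the DLQ capturing invalid records
events_api = IngestApi[AnalyticsEvent]("analytics-events", IngestConfig(
    destination=events_stream,
    dead_letter_queue=events_dlq
))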
Warning:

The types of the destination Stream and Table must match the type of the IngestApi.

Use an IngestPipeline when:

  • You want to create a new ingestion endpoint, stream, and table
  • You want to simplify configuration and reduce boilerplate

Use a standalone IngestApi when:

  • You have an existing Stream object that you want to connect to
  • You want to manually configure the ingestion API

Configuration Reference

Configuration options for both high-level and low-level ingestion APIs are provided below.

Using Extra Fields (Python)

In Python, configure your Pydantic model to accept extra fields using model_config:

FlexiblePayload.py
from pydantic import BaseModel, ConfigDict
from moose_lib import IngestApi, IngestConfigWithDestination, Stream, Key
from datetime import datetime

# Input model accepts extra fields
class UserEventInput(BaseModel):
    model_config = ConfigDict(extra='allow')  # Accept arbitrary fields

    timestamp: datetime
    event_name: str
    user_id: Key[str]
    org_id: str | None = None

input_stream = Stream[UserEventInput]("UserEventInput")

# IngestApi accepts payloads with extra fields without validation errors
ingest_api = IngestApi[UserEventInput]("user-events", IngestConfigWithDestination(
    destination=input_stream
))

How it works:

  • model_config = ConfigDict(extra='allow') tells Pydantic to accept undeclared fields
  • The API accepts payloads with extra fields without validation errors
  • Known fields are validated and passed through to streaming functions
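
To mirror the TypeScript "Extracting Extra Fields" pattern in Python, you can pull undeclared fields out of model_extra (available in Pydantic v2 when extra='allow'). The sketch below continues from FlexiblePayload.py above; the output model, stream name, and the add_transform call are assumptions modeled on the TypeScript example rather than verbatim library usage:

from datetime import datetime
from pydantic import BaseModel
from moose_lib import Stream, Key

# Continues from FlexiblePayload.py above (UserEventInput, input_stream)

# Hypothetical output model with a catch-all properties field
class UserEventOutput(BaseModel):
    timestamp: datetime
    event_name: str
    user_id: Key[str]
    properties: dict

output_stream = Stream[UserEventOutput]("UserEventOutput")

def route_extras(event: UserEventInput) -> UserEventOutput:
    # model_extra holds any fields not declared on the model (Pydantic v2, extra='allow')
    return UserEventOutput(
        timestamp=event.timestamp,
        event_name=event.event_name,
        user_id=event.user_id,
        properties=event.model_extra or {},
    )

# add_transform is assumed to be the Python counterpart of the TypeScript addTransform shown earlier
input_stream.add_transform(destination=output_stream, transformation=route_extras)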