Moose Ingestion APIs are the entry point for getting data into your Moose application. They provide a fast, reliable, and type-safe way to move data from your sources into streams and tables for analytics and processing.
Ingestion APIs are most useful when you want to implement a push-based pattern for getting data from your data sources into your streams and tables. Common use cases include:
Moose's ingestion APIs are purpose-built for high-throughput data pipelines, offering key advantages over other more general-purpose frameworks:
Moose validates all incoming data against your interface (TypeScript) or Pydantic model (Python). If a record fails validation, Moose can automatically route it to a Dead Letter Queue (DLQ) for later inspection and recovery.
from datetime import datetime
from typing import Optional

from moose_lib import IngestApi, IngestConfig, Stream, DeadLetterQueue
from pydantic import BaseModel

class Properties(BaseModel):
    device: Optional[str]
    version: Optional[int]

class ExampleModel(BaseModel):
    id: str
    userId: str
    timestamp: datetime
    properties: Properties

# Records that fail validation against ExampleModel are routed to the DLQ
api = IngestApi[ExampleModel]("your-api-route", IngestConfig(
    destination=Stream[ExampleModel]("your-stream-name"),
    dead_letter_queue=DeadLetterQueue[ExampleModel]("your-dlq-name")
))

If your IngestPipeline's schema marks a field as optional but annotates a ClickHouse default, Moose treats:
Behavior: When the API/stream inserts into ClickHouse and the field is missing, ClickHouse sets it to the configured default value. This keeps request payloads simple while avoiding Nullable columns in storage.
Example:
Annotated[int, clickhouse_default("18")] (or equivalent annotation)
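To make the insert-time behavior concrete, here is a minimal stdlib-only sketch of the semantics described above. It is not Moose or ClickHouse code: `COLUMN_DEFAULTS` and `insert_row` are hypothetical names that stand in for the table's declared defaults and the insert step, and the sketch only covers fields that are absent from the payload.

```python
from typing import Any

# Hypothetical column defaults, standing in for what the ClickHouse table declares
COLUMN_DEFAULTS: dict[str, Any] = {"age": 18}


def insert_row(payload: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]:
    """Mimic the insert step: fields absent from the payload take the column default."""
    row = dict(defaults)   # start from the defaults
    row.update(payload)    # values present in the payload win
    return row


with_value = insert_row({"id": "a", "age": 42}, COLUMN_DEFAULTS)
without_value = insert_row({"id": "b"}, COLUMN_DEFAULTS)
```

In this sketch `with_value` keeps the supplied `age`, while `without_value` picks up the default, so the stored row always has a concrete value and the column does not need to be Nullable.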
In some scenarios, you may need to accept payloads with arbitrary additional fields beyond your defined schema. This is useful when:
TypeScript's index signatures allow you to define types that accept additional properties. When used with IngestApi or Stream, the API will accept payloads with extra fields without returning validation errors:
import { IngestApi, Stream, Key, DateTime } from "@514labs/moose-lib";

// Input type with known fields + index signature for flexibility
type UserEventInput = {
  timestamp: DateTime;
  eventName: string;
  userId: Key<string>;
  orgId?: string;
  // Index signature: accept any additional properties
  [key: string]: any;
};

const inputStream = new Stream<UserEventInput>("UserEventInput");

// IngestApi accepts payloads with extra fields without validation errors
const ingestApi = new IngestApi<UserEventInput>("user-events", {
  destination: inputStream,
});

How it works:
Known fields (timestamp, eventName, etc.) are validated against their declared types

In your streaming function, use destructuring to separate known fields from extra fields:
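// inputStream is the Stream<UserEventInput> declared above; outputStream is
// assumed to be a separately declared Stream whose type has a `properties` field
inputStream.addTransform(outputStream, (input) => {
  const { timestamp, eventName, userId, ...extraFields } = input;
  return { timestamp, eventName, userId, properties: extraFields };
});

You can create ingestion APIs in two ways: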
IngestPipeline class (recommended for most use cases)
IngestApi component for more granular control

The IngestPipeline class provides a convenient way to set up ingestion endpoints, streams, and tables with a single declaration:
from datetime import datetime

from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel

class ExampleSchema(BaseModel):
    id: Key[str]
    name: str
    value: int
    timestamp: datetime

# One declaration creates the ingest endpoint, the stream, and the table
example_pipeline = IngestPipeline[ExampleSchema](
    name="example-name",
    config=IngestPipelineConfig(
        ingest_api=True,
        stream=True,
        table=True
    )
)

For more granular control, you can manually configure the IngestApi component:
# Python example would go here

The types of the destination Stream and Table must match the type of the IngestApi.
Configuration options for both high-level and low-level ingestion APIs are provided below.
In Python, configure your Pydantic model to accept extra fields using model_config:
from datetime import datetime

from pydantic import BaseModel, ConfigDict
from moose_lib import IngestApi, IngestConfigWithDestination, Stream, Key

# Input model accepts extra fields
class UserEventInput(BaseModel):
    model_config = ConfigDict(extra='allow')  # Accept arbitrary fields

    timestamp: datetime
    event_name: str
    user_id: Key[str]
    org_id: str | None = None

input_stream = Stream[UserEventInput]("UserEventInput")

# IngestApi accepts payloads with extra fields
ingest_api = IngestApi[UserEventInput]("user-events", IngestConfigWithDestination(
    destination=input_stream
))

How it works:
model_config = ConfigDict(extra='allow') tells Pydantic to accept undeclared fields
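As a Python counterpart to the TypeScript destructuring shown earlier, the sketch below separates declared fields from extras using only Pydantic v2. It assumes Pydantic v2 is installed and deliberately leaves Moose out: `user_id` is a plain `str` rather than `Key[str]`, and `split_known_and_extra` is a hypothetical helper, not part of any library.

```python
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, ConfigDict


class UserEventInput(BaseModel):
    # Accept undeclared fields instead of rejecting them
    model_config = ConfigDict(extra="allow")

    timestamp: datetime
    event_name: str
    user_id: str  # plain str here; the example above uses Key[str] from moose_lib
    org_id: Optional[str] = None


def split_known_and_extra(event: UserEventInput) -> dict:
    """Return the declared fields plus a `properties` dict holding everything else."""
    known = {name: getattr(event, name) for name in type(event).model_fields}
    extra = dict(event.model_extra or {})  # undeclared fields captured by Pydantic
    return {**known, "properties": extra}


payload = {
    "timestamp": "2024-01-01T00:00:00Z",
    "event_name": "page_view",
    "user_id": "u-1",
    "plan": "pro",   # not declared on the model
    "retries": 3,    # not declared on the model
}
event = UserEventInput.model_validate(payload)
result = split_known_and_extra(event)
```

Declared fields are still validated and coerced (the timestamp string becomes a `datetime`), while the undeclared `plan` and `retries` values pass through unchanged and end up under `properties`.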