Moose Ingestion APIs are the entry point for getting data into your Moose application. They provide a fast, reliable, and type-safe way to move data from your sources into streams and tables for analytics and processing.
Ingestion APIs are most useful when you want to implement a push-based pattern for getting data from your data sources into your streams and tables. Common use cases include:
Moose's ingestion APIs are purpose-built for high-throughput data pipelines and offer key advantages over more general-purpose frameworks:
Moose validates all incoming data against your interface (TypeScript) or Pydantic model (Python). If a record fails validation, Moose can automatically route it to a Dead Letter Queue (DLQ) for later inspection and recovery.
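The validate-then-route pattern described above can be sketched conceptually in plain TypeScript. This is an illustrative stand-in, not Moose's validator or DLQ implementation: the `validate` and `route` functions and the `DeadLetter` shape are hypothetical names introduced only for this example.

```typescript
// Illustrative stand-in only: this is NOT Moose's validator or DLQ implementation.
interface ExampleModel {
  id: string;
  userId: string;
  timestamp: string; // ISO 8601 string, as it would arrive in a JSON payload
}

// Hypothetical dead-letter record: the rejected payload plus the failure reason.
interface DeadLetter {
  original: unknown;
  reason: string;
}

// Hypothetical hand-written check; returns an error message, or null if valid.
function validate(payload: unknown): string | null {
  if (typeof payload !== "object" || payload === null) {
    return "payload is not an object";
  }
  const p = payload as Record<string, unknown>;
  for (const field of ["id", "userId", "timestamp"]) {
    if (typeof p[field] !== "string") return `field "${field}" must be a string`;
  }
  if (Number.isNaN(Date.parse(p.timestamp as string))) {
    return "timestamp is not a valid date";
  }
  return null;
}

// Valid records go to `accepted`; invalid ones are kept, with a reason, for later recovery.
function route(payloads: unknown[]): { accepted: ExampleModel[]; deadLetters: DeadLetter[] } {
  const accepted: ExampleModel[] = [];
  const deadLetters: DeadLetter[] = [];
  for (const payload of payloads) {
    const reason = validate(payload);
    if (reason === null) accepted.push(payload as ExampleModel);
    else deadLetters.push({ original: payload, reason });
  }
  return { accepted, deadLetters };
}

const result = route([
  { id: "1", userId: "u1", timestamp: "2024-01-01T00:00:00Z" },
  { id: "2", userId: 42, timestamp: "2024-01-01T00:00:00Z" }, // userId has the wrong type
]);
```

In Moose itself this routing is derived from your type definition and configured declaratively rather than written by hand.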
    import { IngestApi, Stream, DeadLetterQueue } from "@514labs/moose-lib";

    interface ExampleModel {
      id: string;
      userId: string;
      timestamp: Date;
      properties?: {
        device?: string;
        version?: number;
      };
    }

    export const api = new IngestApi<ExampleModel>("your-api-route", {
      destination: new Stream<ExampleModel>("your-stream-name"),
      // Records that fail validation are routed here
      deadLetterQueue: new DeadLetterQueue<ExampleModel>("your-dlq-name"),
    });

If your IngestPipeline's schema marks a field as optional but annotates a ClickHouse default, Moose treats:
Behavior: When the API/stream inserts into ClickHouse and the field is missing, ClickHouse sets it to the configured default value. This keeps request payloads simple while avoiding Nullable columns in storage.
Example:
field?: number & ClickHouseDefault<"18"> or WithDefault<number, "18">
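To make the optional-with-default behavior concrete, the sketch below simulates it in plain TypeScript. It does not use moose-lib: the `age` field, the `AGE_DEFAULT` constant, and the `toStoredRow` function are hypothetical names for illustration, and in a real deployment ClickHouse applies the default on insert rather than application code.

```typescript
// Shape accepted by the API and stream: `age` may be omitted from the payload.
interface UserPayload {
  id: string;
  age?: number;
}

// Shape of the stored row: `age` is always present, so no Nullable column is needed.
interface UserRow {
  id: string;
  age: number;
}

// Hypothetical constant mirroring the "18" default from the example above.
const AGE_DEFAULT = 18;

// Simulates what the storage layer does on insert when the field is missing.
function toStoredRow(payload: UserPayload): UserRow {
  return { id: payload.id, age: payload.age ?? AGE_DEFAULT };
}

const withAge = toStoredRow({ id: "a", age: 30 }); // explicit value is kept
const withoutAge = toStoredRow({ id: "b" });       // missing value falls back to the default
```

The point of the design is that producers can send the smaller payload while every stored row still has a concrete value.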
In some scenarios, you may need to accept payloads with arbitrary additional fields beyond your defined schema. This is useful when:
TypeScript's index signatures allow you to define types that accept additional properties. When used with IngestApi or Stream, the API will accept payloads with extra fields without returning validation errors:
    import { IngestApi, Stream, Key, DateTime } from "@514labs/moose-lib";

    // Input type with known fields + index signature for flexibility
    type UserEventInput = {
      timestamp: DateTime;
      eventName: string;
      userId: Key<string>;
      orgId?: string;
      // Index signature: accept any additional properties
      [key: string]: any;
    };

    const inputStream = new Stream<UserEventInput>("UserEventInput");

    // IngestApi accepts payloads with extra fields without validation errors
    const ingestApi = new IngestApi<UserEventInput>("user-events", {
      destination: inputStream,
    });

How it works:
- Known fields (timestamp, eventName, etc.) are validated against their declared types

In your streaming function, use destructuring to separate known fields from extra fields:
    // outputStream is assumed to be a Stream declared elsewhere in your project
    inputStream.addTransform(outputStream, (input) => {
      const { timestamp, eventName, userId, ...extraFields } = input;
      return { timestamp, eventName, userId, properties: extraFields };
    });

You can create ingestion APIs in two ways:
- IngestPipeline class (recommended for most use cases)
- IngestApi component for more granular control

The IngestPipeline class provides a convenient way to set up ingestion endpoints, streams, and tables with a single declaration:
    import { IngestPipeline } from "@514labs/moose-lib";

    interface ExampleModel {
      id: string;
      name: string;
      value: number;
      timestamp: Date;
    }

    const examplePipeline = new IngestPipeline<ExampleModel>("example-name", {
      ingestApi: true, // Creates a REST API endpoint
      stream: true, // Connects to a stream
      table: true
    });

For more granular control, you can manually configure the IngestApi component:
    import { IngestApi, OlapTable, Stream } from "@514labs/moose-lib";

    interface ExampleRecord {
      id: string;
      name: string;
      value: number;
      timestamp: Date;
    }

    // Create the ClickHouse table
    const exampleTable = new OlapTable<ExampleRecord>("example-table-name");

    // Create the stream with specific settings
    const exampleStream = new Stream<ExampleRecord>("example-stream-name", {
      destination: exampleTable // Connect stream to table
    });

    // Create the ingestion API
    const exampleApi = new IngestApi<ExampleRecord>("example-api-route", {
      destination: exampleStream, // Connect API to stream
    });

The types of the destination Stream and Table must match the type of the IngestApi.
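Once an ingestion API is declared, data sources push records to it over HTTP. The sketch below builds such a request for the `example-api-route` API. It assumes a local dev server at `http://localhost:4000` and that ingestion routes are served under `/ingest/<route>`; both are assumptions to verify against your own deployment, and `buildIngestRequest` is a hypothetical helper, not part of moose-lib.

```typescript
interface ExampleRecord {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

// Hypothetical helper: builds the URL and fetch options for posting one record.
function buildIngestRequest(baseUrl: string, route: string, record: ExampleRecord) {
  return {
    // Assumed URL layout: <baseUrl>/ingest/<route>
    url: `${baseUrl.replace(/\/+$/, "")}/ingest/${encodeURIComponent(route)}`,
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      // JSON.stringify serializes Date values as ISO 8601 strings
      body: JSON.stringify(record),
    },
  };
}

const request = buildIngestRequest("http://localhost:4000", "example-api-route", {
  id: "rec-1",
  name: "example",
  value: 42,
  timestamp: new Date("2024-01-01T00:00:00Z"),
});

// To actually send it (requires a running server):
// const response = await fetch(request.url, request.init);
```

Keeping request construction separate from the network call makes the payload shape easy to check before anything is sent.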
Configuration options for both high-level and low-level ingestion APIs are provided below.