Ingestion APIs
Viewing:
Overview
Moose Ingestion APIs are the entry point for getting data into your Moose application. They provide a fast, reliable, and type-safe way to move data from your sources into streams and tables for analytics and processing.
When to Use Ingestion APIs
Ingestion APIs are most useful when you want to implement a push-based pattern for getting data from your data sources into your streams and tables. Common use cases include:
- Instrumenting external client applications
- Receiving webhooks from third-party services
- Integrating with ETL or data pipeline tools that push data
Why Use Moose’s APIs Over Your Own?
Moose’s ingestion APIs are purpose-built for high-throughput data pipelines, offering key advantages over other more general-purpose frameworks:
- Built-in schema validation: Ensures only valid data enters your pipeline.
- Direct connection to streams/tables: Instantly link HTTP endpoints to Moose data infrastructure to route incoming data to your streams and tables without any glue code.
- Dead Letter Queue (DLQ) support: Invalid records are automatically captured for review and recovery.
- OpenAPI auto-generation: Instantly generate client SDKs and docs for all endpoints, including example data.
- Rust-powered performance: Far higher throughput and lower latency than typical Node.js or Python APIs.
How Moose Handles Data Integrity
Moose validates all incoming data against your TypeScript interface or Python model, ensuring only well-formed, expected data enters your pipeline. If a record fails validation, Moose can automatically route it to a Dead Letter Queue (DLQ) for later inspection and recovery.
interface UserEvent {
id: string;
userId: string;
timestamp: Date;
properties?: {
device?: string;
version?: number;
}
}
const api = new IngestApi<UserEvent>("user-events", {
destination: new Stream<UserEvent>("user-events"),
deadLetterQueue: new DeadLetterQueue<UserEvent>("user-events-dlq")
});
from moose_lib import IngestPipeline, IngestPipelineConfig, IngestConfig
from pydantic import BaseModel
class Properties(BaseModel):
device: Optional[str]
version: Optional[int]
class UserEvent(BaseModel):
id: str
userId: str
timestamp: datetime
properties: Properties
api = IngestApi[UserEvent]("user-events", IngestConfig(
destination=Stream[UserEvent]("user-events"),
dead_letter_queue=DeadLetterQueue[UserEvent]("user-events-dlq")
))
Send a valid event - routed to the destination stream
fetch("http://localhost:4000/ingest/user-events", {
method: "POST",
body: JSON.stringify({
id: "event1",
userId: "user1",
timestamp: "2023-05-10T15:30:00Z"
})
})
// ✅ Accepted and routed to the destination stream
// API returns 200 and { success: true }
requests.post("http://localhost:4000/ingest/user-events", json={
"id": "event1",
"userId": "user1",
"timestamp": "2023-05-10T15:30:00Z"
})
# ✅ Accepted and routed to the destination stream
# API returns 200 and { success: true }
Creating Ingestion APIs
You can create ingestion APIs in two ways:
- High-level: Using the
IngestPipeline
class (recommended for most use cases) - Low-level: Manually configuring the
IngestApi
component for more granular control
High-level: IngestPipeline (Recommended)
The IngestPipeline
class provides a convenient way to set up ingestion endpoints, streams, and tables with a single declaration:
import { IngestPipeline } from "@514labs/moose-lib";
interface ExampleSchema {
id: string;
name: string;
value: number;
timestamp: Date;
}
const examplePipeline = new IngestPipeline<ExampleSchema>("example", {
ingest: true, // Creates a REST API endpoint
stream: true, // Connects to a stream
table: true
});
from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel
class ExampleSchema(BaseModel):
id: Key[str]
name: str
value: int
timestamp: datetime
example_pipeline = IngestPipeline[ExampleSchema](
name="example",
config=IngestPipelineConfig(
ingest=True,
stream=True,
table=True
)
)
Low-level: Standalone IngestApi
For more granular control, you can manually configure the IngestApi
component:
interface ExampleRecord {
id: string;
name: string;
value: number;
timestamp: Date;
}
// Create the ClickHouse table
const exampleTable = new OlapTable<ExampleRecord>("example");
// Create the stream with specific settings
const exampleStream = new Stream<ExampleRecord>("example", {
destination: exampleTable // Connect stream to table
});
// Create the ingestion API
const exampleApi = new IngestApi<ExampleRecord>("example", {
destination: exampleStream, // Connect API to stream
});
Warning:
The types of the destination Stream
and Table
must match the type of the IngestApi
.
Ingestion Pipeline
IngestApi
You want to create a new ingestion endpoint, stream, and table
You have an existing Stream object that you want to connect to
You want to simplify configuration and reduce boilerplate
You want to manually configure the ingestion API
Configuration Reference
Configuration options for both high-level and low-level ingestion APIs are provided below.
interface IngestPipelineConfig<T> {
table?: boolean | OlapConfig<T>;
stream?: boolean | Omit<StreamConfig<T>, "destination">;
ingest?: boolean | Omit<IngestConfig<T>, "destination">;
deadLetterQueue?: boolean | Omit<StreamConfig<T>, "destination">;
version?: string;
metadata?: {
description?: string;
};
}
class IngestPipelineConfig(BaseModel):
table: bool | OlapConfig = True
stream: bool | StreamConfig = True
ingest: bool | IngestConfig = True
dead_letter_queue: bool | StreamConfig = True
version: Optional[str] = None
metadata: Optional[dict] = None