Ingestion APIs
Overview
Ingestion APIs are the entry point for data flowing into your Moose application.
Working with Ingestion APIs
- Model data for API validation: create a type definition that validates incoming data
- Set up ingestion endpoints: configure how data enters your pipeline via HTTP REST APIs, as single records or batches
- Connect to destination streams: direct validated data to streams for buffering and processing
Creating Ingestion APIs
You can create ingestion APIs in two ways:
- High-level: using the `IngestPipeline` class (recommended)
- Low-level: manually configuring the `IngestApi` component for more granular control
Basic Ingestion Pipeline
The `IngestPipeline` class provides a convenient way to set up ingestion endpoints, streams, and tables with a single declaration:
```typescript
import { IngestPipeline } from "@514labs/moose-lib";

interface ExampleSchema {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

const examplePipeline = new IngestPipeline<ExampleSchema>("example", {
  ingest: true, // Creates a REST API endpoint
  stream: true, // Connects to a stream
  table: true   // Syncs stream data into a table
});
```
```python
from datetime import datetime
from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from pydantic import BaseModel

class ExampleSchema(BaseModel):
    id: Key[str]
    name: str
    value: int
    timestamp: datetime

example_pipeline = IngestPipeline[ExampleSchema](
    name="example",
    config=IngestPipelineConfig(
        ingest=True,  # Creates a REST API endpoint
        stream=True,  # Connects to a stream
        table=True,   # Syncs stream data into a table
    ),
)
```
Single Record Ingestion
By default, ingestion APIs accept a single JSON record per request.
Batch Ingestion Pipeline
Use the `format` option to configure the ingestion API to accept arrays of records:
```typescript
import { IngestPipeline, IngestionFormat } from "@514labs/moose-lib";

interface BatchRecord {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

const batchPipeline = new IngestPipeline<BatchRecord>("batch_records", {
  ingest: {
    format: IngestionFormat.JSON_ARRAY // Accept arrays of records
  },
  stream: true,
  table: true,
});
```
Ingest Pipeline Data Flow:
1. Received data is validated against your data model type
2. Moose writes validated data to the associated stream (if configured)
3. Moose automatically syncs buffered data into the configured table (if enabled)
Using Ingestion APIs
Moose runs a webserver on port 4000 by default. All ingestion endpoints are available at `POST localhost:4000/ingest/<name>`, where `<name>` is the string value you provided as the first argument to the `IngestPipeline` or `IngestApi` constructor.
```typescript
import { IngestPipeline, IngestionFormat } from "@514labs/moose-lib";

interface Record {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

// Creates a REST API endpoint at POST localhost:4000/ingest/record
const simplePipeline = new IngestPipeline<Record>("record", {
  ingest: true,
  stream: true,
  table: true
});

// Creates a REST API endpoint at POST localhost:4000/ingest/batch_records
const batchPipeline = new IngestPipeline<Record>("batch_records", {
  ingest: {
    format: IngestionFormat.JSON_ARRAY // Accept a batch of records
  },
  stream: true,
  table: true
});
```
```python
from datetime import datetime
from moose_lib import Key, IngestPipeline, IngestPipelineConfig, IngestConfig, IngestionFormat
from pydantic import BaseModel

class Record(BaseModel):
    id: Key[str]
    name: str
    value: int
    timestamp: datetime

# Ingestion API endpoint is POST localhost:4000/ingest/record
simple_pipeline = IngestPipeline[Record](
    name="record",
    config=IngestPipelineConfig(
        ingest=True,
        stream=True,
        table=True,
    ),
)

# Ingestion API endpoint is POST localhost:4000/ingest/batch_records
batch_pipeline = IngestPipeline[Record](
    name="batch_records",
    config=IngestPipelineConfig(
        ingest=IngestConfig(
            format=IngestionFormat.JSON_ARRAY  # Accept a batch of records
        ),
        stream=True,
        table=True,
    ),
)
```
```bash
# Single record ingestion
curl -X POST http://localhost:4000/ingest/example \
  -H "Content-Type: application/json" \
  -d '{
    "id": "pv_123",
    "name": "John Doe",
    "value": 100,
    "timestamp": "2024-03-24T10:30:00Z"
  }'

# Batch ingestion
curl -X POST http://localhost:4000/ingest/batch_records \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "123",
      "name": "John Doe",
      "value": 100,
      "timestamp": "2024-03-24T10:30:00Z"
    },
    {
      "id": "456",
      "name": "Jane Doe",
      "value": 200,
      "timestamp": "2024-03-24T10:30:00Z"
    }
  ]'
```
Response Codes
Moose automatically provides standard HTTP responses:
| Status Code | Meaning | Response Body |
|---|---|---|
| 200 | Success | `{ "success": true }` |
| 400 | Validation error | `{ "error": "Detailed message" }` |
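A client can branch on these status codes to surface validation failures. The helper below is an illustrative sketch (it is not part of moose-lib), assuming the default port and the `example` pipeline defined earlier:

```typescript
// Builds the fetch arguments for a Moose ingestion endpoint without sending.
function buildIngestRequest(
  pipelineName: string,
  payload: unknown,
  baseUrl = "http://localhost:4000"
) {
  return {
    url: `${baseUrl}/ingest/${pipelineName}`,
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(payload),
    },
  };
}

// Sends a record and surfaces Moose's 400 validation message as an error.
async function ingest(pipelineName: string, payload: unknown): Promise<void> {
  const { url, init } = buildIngestRequest(pipelineName, payload);
  const res = await fetch(url, init);
  if (res.status === 400) {
    // Moose returns { "error": "Detailed message" } on validation failure
    const { error } = await res.json();
    throw new Error(`Validation failed: ${error}`);
  }
}
```

Usage: `await ingest("example", { id: "pv_123", name: "John Doe", value: 100, timestamp: "2024-03-24T10:30:00Z" })`.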
Client Generation with OpenAPI
Moose automatically generates OpenAPI documentation for all ingestion endpoints:
Locating OpenAPI Spec
- Development: `http://localhost:5001/openapi.yaml`
- Production: `https://your-domain.com/openapi.yaml`
- Project file: `.moose/openapi.yaml`
Using OpenAPI UI Client
We recommend the OpenAPI Viewer extension for VS Code.

The OpenAPI spec includes example data for each data model schema, which you can use to construct requests. In the Swagger UI, click the Try it out button to build a request from the example data.
Generate SDKs with OpenAPI Generator
Install the OpenAPI Generator:
```bash
npm install -g @openapitools/openapi-generator-cli

# Generate TypeScript client
openapi-generator-cli generate \
  -i .moose/openapi.yaml \
  -g typescript-fetch \
  -o ./sdk

# Generate Python client
openapi-generator-cli generate \
  -i .moose/openapi.yaml \
  -g python \
  -o ./sdk
```
Configuration Options
IngestPipeline Options
```typescript
interface PipelineConfig<T> {
  ingest?: boolean | {
    format?: IngestionFormat; // JSON (default) or JSON_ARRAY
  };
  stream?: boolean | {
    parallelism?: number;     // Default: 1
    retentionPeriod?: number; // In seconds; default: 24 hours
  };
  table?: boolean | {
    orderByFields?: (keyof T)[];
    deduplicate?: boolean;
  };
}
```
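A pipeline that tunes these options might look like the following sketch. The field values here are illustrative, not recommended defaults; check the API Reference for exact semantics:

```typescript
import { IngestPipeline, IngestionFormat } from "@514labs/moose-lib";

interface Metric {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

const tunedPipeline = new IngestPipeline<Metric>("metrics", {
  ingest: { format: IngestionFormat.JSON_ARRAY }, // Accept batches of records
  stream: {
    parallelism: 4,        // Spread load across 4 stream partitions
    retentionPeriod: 3600, // Keep buffered data for 1 hour (seconds)
  },
  table: {
    orderByFields: ["id", "timestamp"], // Sort key for the table
    deduplicate: true,                  // Drop duplicate rows
  },
});
```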
Data Formats
```typescript
import { IngestPipeline, IngestionFormat } from "@514labs/moose-lib";

// Single-record ingestion (the default)
const singleRecordPipeline = new IngestPipeline<Record>("single_records", {
  ingest: {
    format: IngestionFormat.JSON // Default
  }
});

// Batch ingestion
const batchPipeline = new IngestPipeline<Record>("batch_records", {
  ingest: {
    format: IngestionFormat.JSON_ARRAY
  }
});
```
Using IngestPipeline vs. IngestApi

Use `IngestPipeline` when:
- You want to create a new ingestion endpoint, stream, and table together
- You want to simplify configuration and reduce boilerplate

Use `IngestApi` when:
- You have an existing `Stream` object that you want to connect to
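The low-level form might look like the sketch below, which assumes moose-lib's `Stream` component and a `destination` option on `IngestApi`; consult the API Reference for the exact constructor parameters:

```typescript
import { IngestApi, Stream } from "@514labs/moose-lib";

interface Event {
  id: string;
  value: number;
}

// An existing stream, perhaps shared with other producers
const eventStream = new Stream<Event>("events");

// A standalone ingestion endpoint that writes into that stream
const eventApi = new IngestApi<Event>("events", {
  destination: eventStream,
});
```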
Validation Layer
Moose’s ingestion APIs automatically validate all incoming data against your TypeScript interface:
```typescript
interface UserEvent {
  id: string;      // Required string
  userId: string;  // Required string
  timestamp: Date; // Required date (ISO 8601 format)
  properties?: {   // Optional object
    device?: string;  // Optional string
    version?: number; // Optional number
  }
}

// Payloads are validated against the interface:
// ✅ Valid:   { "id": "event1", "userId": "user1", "timestamp": "2023-05-10T15:30:00Z" }
// ❌ Invalid: { "id": "event1" } // Missing required fields
// ❌ Invalid: { "id": "event1", "userId": "user1", "timestamp": "not-a-date" } // Invalid date format
```
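Conceptually, the checks Moose performs resemble this hand-rolled validator (a simplified sketch, not the actual moose-lib implementation):

```typescript
type ValidationResult = { ok: true } | { ok: false; error: string };

// Simplified stand-in for Moose's schema validation of a UserEvent payload
function validateUserEvent(payload: { [key: string]: unknown }): ValidationResult {
  // Required fields must be present
  for (const field of ["id", "userId", "timestamp"]) {
    if (payload[field] === undefined) {
      return { ok: false, error: `Missing required field: ${field}` };
    }
  }
  // Fields must have the declared types
  if (typeof payload.id !== "string" || typeof payload.userId !== "string") {
    return { ok: false, error: "id and userId must be strings" };
  }
  // Dates must parse as ISO 8601 strings
  if (Number.isNaN(Date.parse(String(payload.timestamp)))) {
    return { ok: false, error: "timestamp must be an ISO date string" };
  }
  return { ok: true };
}
```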
Best Practices

API Usage
- Use batch ingestion for high-volume data
- Implement client-side retry logic
- Consider rate-limiting implications
- Handle validation errors gracefully
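For client-side retries, a common pattern is exponential backoff between attempts. The helper below is an illustrative sketch, not part of moose-lib; note that it retries only network failures, since a 400 validation error will not succeed on retry:

```typescript
// Compute exponential backoff delays (in ms) for up to maxRetries attempts
function backoffDelays(maxRetries: number, baseMs: number): number[] {
  return Array.from({ length: maxRetries }, (_, i) => baseMs * 2 ** i);
}

// Attempt the request once, then retry with backoff on thrown (network) errors
async function ingestWithRetry(
  url: string,
  body: string,
  maxRetries = 3
): Promise<Response> {
  let lastError: unknown;
  for (const delay of [0, ...backoffDelays(maxRetries, 200)]) {
    if (delay > 0) await new Promise((resolve) => setTimeout(resolve, delay));
    try {
      return await fetch(url, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body,
      });
    } catch (err) {
      lastError = err; // Network failure; back off and try again
    }
  }
  throw lastError;
}
```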
Performance Considerations
- Configure stream parallelism appropriately for your expected load
- Use batch ingestion for high-throughput scenarios
- Monitor API response times
- Set appropriate retention periods
See the API Reference for complete configuration options.