Ingestion APIs

Overview

Ingestion APIs are the entry point for data flowing into your Moose application.

Working with Ingestion APIs

  1. Model data for API validation: create a type definition that validates incoming data.
  2. Set up ingestion endpoints: configure HTTP REST APIs based on your data models.
  3. Connect to destination streams: direct validated data to streams for buffering and processing.

Creating Ingestion APIs

You can create ingestion APIs in two ways:

  • High-level: Using the IngestPipeline class (recommended)
  • Low-level: Manually configuring the IngestApi component for more granular control

Basic Ingestion Pipeline

The IngestPipeline class provides a convenient way to set up ingestion endpoints, streams, and tables with a single declaration:

AnalyticsPipeline.ts
import { IngestPipeline } from "@514labs/moose-lib";
 
interface ExampleSchema {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}
 
const examplePipeline = new IngestPipeline<ExampleSchema>("example", {
  ingest: true, // Creates a REST API endpoint
  stream: true, // Connects to a stream
  table: true   // Creates a table to store the data
});
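For the low-level alternative, the same resources can be wired together manually. The sketch below assumes the `IngestApi`, `Stream`, and `OlapTable` exports from `@514labs/moose-lib` and a `destination` option on each constructor; check the API reference for the exact constructor signatures in your version:

```typescript
import { IngestApi, Stream, OlapTable } from "@514labs/moose-lib";

interface ExampleSchema {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

// Create each resource explicitly instead of letting IngestPipeline do it.
const exampleTable = new OlapTable<ExampleSchema>("example");
const exampleStream = new Stream<ExampleSchema>("example", {
  destination: exampleTable, // buffered records are synced to the table
});
const exampleApi = new IngestApi<ExampleSchema>("example", {
  destination: exampleStream, // validated records are written to the stream
});
```

This is what `IngestPipeline` generates for you; the manual form is useful when you want to connect an ingestion endpoint to a stream that already exists.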

Single Record vs Batch Ingestion

Ingestion APIs accept either a single JSON record or a batched array of JSON records in one request.

Ingest Pipeline Data Flow:

  1. Received data is validated against your data model type.
  2. Moose writes validated data to the associated stream (if configured).
  3. Moose automatically syncs buffered data to the configured table (if enabled).

Using Ingestion APIs

Moose operates a webserver on port 4000 by default. All ingestion endpoints are available at POST localhost:4000/ingest/<name>:

Ingestion API Endpoint

The ingestion API endpoint is POST localhost:4000/ingest/<name>, where <name> is the string value you provided as the first argument (“name”) to the IngestPipeline or IngestApi constructor.

Record.ts
import { IngestPipeline } from "@514labs/moose-lib";
 
interface ExampleRecord {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}
 
const simplePipeline = new IngestPipeline<ExampleRecord>("example", {
  ingest: true, // Creates a REST API endpoint at POST localhost:4000/ingest/example
  stream: true,
  table: true
});
# Single record ingestion
curl -X POST http://localhost:4000/ingest/example \
  -H "Content-Type: application/json" \
  -d '{
    "id": "pv_123",
    "name": "John Doe",
    "value": 100,
    "timestamp": "2024-03-24T10:30:00Z"
  }'
 
# Batch ingestion
curl -X POST http://localhost:4000/ingest/example \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "123",
      "name": "John Doe",
      "value": 100,
      "timestamp": "2024-03-24T10:30:00Z"
    },
    {
      "id": "456",
      "name": "Jane Doe",
      "value": 200,
      "timestamp": "2024-03-24T10:30:00Z"
    }
  ]'

Creating Ingestion Workflows

If your data originates inside your Moose application rather than from an external client, you can create a workflow that pushes data into your pipeline.

In your TypeScript workflow, you can use the fetch function to hit the ingestion API:

IngestionWorkflow.ts
 
await fetch("http://localhost:4000/ingest/example", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    id: "pv_123",
    name: "John Doe",
    value: 100,
    timestamp: "2024-03-24T10:30:00Z"
  })
});

View the Workflows page for more information.

Response Codes

Moose automatically provides standard HTTP responses:

| Status Code | Meaning          | Response Body                     |
|-------------|------------------|-----------------------------------|
| 200         | Success          | `{ "success": true }`             |
| 400         | Validation error | `{ "error": "Detailed message" }` |

Client Generation with OpenAPI

Moose automatically generates OpenAPI documentation for all ingestion endpoints:

Locating OpenAPI Spec

  • Development: http://localhost:5001/openapi.yaml
  • Production: https://your-domain.com/openapi.yaml
  • Project file: .moose/openapi.yaml

Using OpenAPI UI Client

Use an OpenAPI viewer IDE extension:

We recommend the OpenAPI Viewer extension for VS Code.

The OpenAPI spec includes example data for each data model schema, which you can use to construct requests. In the Swagger UI, click the Try it out button to send a request pre-populated with the example data.

Generate SDKs with OpenAPI Generator

Install the OpenAPI Generator:

npm install -g @openapitools/openapi-generator-cli
# Generate TypeScript client
openapi-generator-cli generate \
  -i .moose/openapi.yaml \
  -g typescript-fetch \
  -o ./sdk
 
# Generate Python client
openapi-generator-cli generate \
  -i .moose/openapi.yaml \
  -g python-requests \
  -o ./sdk

Configuration Options

IngestPipeline Options

PipelineConfig.ts
interface PipelineConfig<T> {
  ingest?: boolean;
  stream?: boolean | {
    parallelism?: number;      // Default: 1
    retentionPeriod?: number;  // Retention in seconds (default: 24 hours)
  };
  table?: boolean | {
    orderByFields?: (keyof T)[];
    deduplicate?: boolean;
  };
}
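As a concrete example, here is a configuration that enables all three resources with tuned stream and table settings, typed against the interface above (the parallelism, retention, and ordering values are illustrative choices, not defaults):

```typescript
// The PipelineConfig interface, as documented above.
interface PipelineConfig<T> {
  ingest?: boolean;
  stream?: boolean | { parallelism?: number; retentionPeriod?: number };
  table?: boolean | { orderByFields?: (keyof T)[]; deduplicate?: boolean };
}

interface ExampleSchema {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}

// Illustrative values: 4 stream partitions, 7-day retention,
// table ordered by time then id, with deduplication enabled.
const config: PipelineConfig<ExampleSchema> = {
  ingest: true,
  stream: { parallelism: 4, retentionPeriod: 7 * 24 * 60 * 60 },
  table: { orderByFields: ["timestamp", "id"], deduplicate: true },
};
```

Because `orderByFields` is typed as `(keyof T)[]`, the compiler rejects field names that are not part of your schema.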

Using IngestPipeline vs. IngestApi

Use IngestPipeline when:

  • You want to create a new ingestion endpoint, stream, and table together
  • You want to simplify configuration and reduce boilerplate

Use IngestApi when:

  • You have an existing Stream object that you want to connect to

Validation Layer

Moose’s ingestion APIs automatically validate all incoming data against your TypeScript interface:

ValidationExample.ts
interface UserEvent {
  id: string;             // Required string
  userId: string;         // Required string
  timestamp: Date;        // Required date (ISO format)
  properties?: {          // Optional object
    device?: string;      // Optional string
    version?: number;     // Optional number
  }
}
 
// This will be validated against the interface:
// ✅ Valid: { "id": "event1", "userId": "user1", "timestamp": "2023-05-10T15:30:00Z" }
// ❌ Invalid: { "id": "event1" } // Missing required fields
// ❌ Invalid: { "id": "event1", "userId": "user1", "timestamp": "not-a-date" } // Invalid date format
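Moose performs this validation server-side, but it can be useful to pre-check records in the client before sending a batch, so that one malformed record does not fail the whole request. A minimal hand-rolled check mirroring the `UserEvent` interface (this helper is illustrative, not a Moose API):

```typescript
// Returns true if `input` would plausibly pass validation for UserEvent:
// required string id and userId, plus a parseable ISO-8601 timestamp.
function isValidUserEvent(input: any): boolean {
  return (
    typeof input?.id === "string" &&
    typeof input?.userId === "string" &&
    // Over the wire, Date fields arrive as ISO-8601 strings.
    typeof input?.timestamp === "string" &&
    !Number.isNaN(Date.parse(input.timestamp))
  );
}
```

Optional fields like `properties` need no check here, since their absence is valid.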

Best Practices

API Usage

  • Use batch ingestion for high-volume data
  • Implement client-side retry logic
  • Consider rate limiting implications
  • Handle validation errors gracefully
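The retry advice above can be sketched as a small wrapper around `fetch`. The backoff schedule and attempt count are illustrative defaults, not Moose recommendations:

```typescript
// Delay before retry attempt `attempt` (1-based): exponential backoff.
function backoffDelayMs(attempt: number, baseMs = 250): number {
  return baseMs * 2 ** (attempt - 1); // 250, 500, 1000, ...
}

// Retry transient failures (network errors, 5xx). A 400 means the payload
// is invalid and will fail again, so it is returned without retrying.
async function ingestWithRetry(
  url: string,
  payload: unknown,
  maxAttempts = 3
): Promise<Response> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const res = await fetch(url, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(payload),
      });
      if (res.status < 500) return res; // success, or a non-retryable client error
      lastError = new Error(`Server error ${res.status}`);
    } catch (err) {
      lastError = err; // network failure: retry
    }
    await new Promise((resolve) => setTimeout(resolve, backoffDelayMs(attempt)));
  }
  throw lastError;
}
```

Adding jitter to the delay is a common refinement when many clients retry against the same endpoint.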

Performance Considerations

  • Configure stream parallelism appropriate to your load
  • Use batch ingestion for high-throughput scenarios
  • Monitor API response times
  • Set appropriate retention periods
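For the batching advice, a small helper that splits records into fixed-size batches before POSTing them can keep individual request bodies bounded (the batch size of 500 in the usage comment is an illustrative choice; tune it for your payload sizes):

```typescript
// Split an array of records into batches of at most `size` elements.
function chunkRecords<T>(records: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < records.length; i += size) {
    batches.push(records.slice(i, i + size));
  }
  return batches;
}

// Each batch can then be sent as one request, e.g.:
// for (const batch of chunkRecords(events, 500)) {
//   await fetch("http://localhost:4000/ingest/example", {
//     method: "POST",
//     headers: { "Content-Type": "application/json" },
//     body: JSON.stringify(batch),
//   });
// }
```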

See the API Reference for complete configuration options.