Ingestion APIs

Overview

Ingestion APIs are the entry point for data flowing into your Moose application.

Working with Ingestion APIs

  • Model data for API validation: Create a type definition that validates incoming data.
  • Set up ingestion endpoints: Configure how data enters your pipeline through HTTP REST APIs, one record at a time or in batches.
  • Connect to destination streams: Direct validated data to streams for buffering and processing.

Creating Ingestion APIs

You can create ingestion APIs in two ways:

  • High-level: Using the IngestPipeline class (recommended)
  • Low-level: Manually configuring the IngestApi component for more granular control

Basic Ingestion Pipeline

The IngestPipeline class provides a convenient way to set up ingestion endpoints, streams, and tables with a single declaration:

AnalyticsPipeline.ts
import { IngestPipeline, IngestionFormat } from "@514labs/moose-lib";
 
interface ExampleSchema {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}
 
const examplePipeline = new IngestPipeline<ExampleSchema>("example", {
  ingest: true, // Creates a REST API endpoint
  stream: true, // Connects to a stream
  table: true
});

Single Record Ingestion

By default, ingestion APIs accept a single JSON record at a time.

Batch Ingestion Pipeline

Use the format option to configure the ingestion API to accept arrays of records:

BatchPipeline.ts
import { IngestPipeline, IngestionFormat } from "@514labs/moose-lib";
 
interface BatchRecord {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}
 
const batchPipeline = new IngestPipeline<BatchRecord>("batch_records", {
  ingest: {
    format: IngestionFormat.JSON_ARRAY  // Accept arrays of records
  },
  stream: true,
  table: true,
});

Ingest Pipeline Data Flow:

  • Moose validates received data against your data model type.
  • Moose writes validated data to the associated stream (if configured).
  • Moose automatically syncs buffered data to the configured table (if enabled).

Using Ingestion APIs

Moose runs a web server on port 4000 by default. All ingestion endpoints are available at POST localhost:4000/ingest/<name>.

Ingestion API Endpoint

The ingestion API endpoint is POST localhost:4000/ingest/<name>, where <name> is the string value you provided as the first argument to the IngestPipeline or IngestApi constructor.

Record.ts
import { IngestPipeline, IngestionFormat } from "@514labs/moose-lib";
 
interface Record {
  id: string;
  name: string;
  value: number;
  timestamp: Date;
}
 
const simplePipeline = new IngestPipeline<Record>("record", {
  ingest: true, // Creates a REST API endpoint at POST localhost:4000/ingest/record
  stream: true,
  table: true
});
 
// Creates a REST API endpoint at POST localhost:4000/ingest/batch_records
const batchPipeline = new IngestPipeline<Record>("batch_records", {
  ingest: {
    format: IngestionFormat.JSON_ARRAY // Accept batch of records
  },
  stream: true,
  table: true
});

Send requests to these endpoints with curl:

# Single record ingestion
curl -X POST http://localhost:4000/ingest/record \
  -H "Content-Type: application/json" \
  -d '{
    "id": "pv_123",
    "name": "John Doe",
    "value": 100,
    "timestamp": "2024-03-24T10:30:00Z"
  }'
 
# Batch ingestion
curl -X POST http://localhost:4000/ingest/batch_records \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "123",
      "name": "John Doe",
      "value": 100,
      "timestamp": "2024-03-24T10:30:00Z"
    },
    {
      "id": "456",
      "name": "Jane Doe",
      "value": 200,
      "timestamp": "2024-03-24T10:30:00Z"
    }
  ]'
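
The same requests can be issued from application code. The sketch below is one way to do that in TypeScript, assuming the default local server described above. `buildIngestRequest`, `sendIngest`, and the `HttpPost` type are hypothetical helpers written for this example, not part of moose-lib. The HTTP function is passed in as an argument so the sketch does not depend on a particular runtime.

```typescript
// Hypothetical client helpers; not part of @514labs/moose-lib.
// Assumes the default local server at http://localhost:4000.

interface IngestRequest {
  url: string;
  init: { method: string; headers: { [name: string]: string }; body: string };
}

// Minimal shape of an HTTP function; the global fetch (Node 18+, browsers) fits it.
type HttpPost = (
  url: string,
  init: IngestRequest["init"]
) => Promise<{ status: number }>;

// Build the POST request for an ingestion endpoint.
// `payload` is one record for a JSON endpoint, or an array for a JSON_ARRAY endpoint.
function buildIngestRequest(
  name: string,
  payload: unknown,
  baseUrl: string = "http://localhost:4000"
): IngestRequest {
  return {
    url: `${baseUrl}/ingest/${encodeURIComponent(name)}`,
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(payload),
    },
  };
}

// Send the request and resolve to the HTTP status code.
async function sendIngest(post: HttpPost, name: string, payload: unknown): Promise<number> {
  const { url, init } = buildIngestRequest(name, payload);
  const response = await post(url, init);
  return response.status;
}
```

With a runtime that provides a global `fetch`, a call would look like `sendIngest(fetch, "record", { id: "pv_123", name: "John Doe", value: 100, timestamp: new Date() })`. `JSON.stringify` serializes `Date` values as ISO 8601 strings, which matches the timestamp format used in the curl examples.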

Response Codes

Moose automatically provides standard HTTP responses:

Status Code   Meaning            Response Body
200           Success            { "success": true }
400           Validation error   { "error": "Detailed message" }
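
A client can turn these two documented responses into a typed result. The following is a minimal sketch; `interpretIngestResponse` and `IngestResult` are hypothetical names, and the handling of any status other than 200 or 400 is an assumption, since only those two codes are listed here.

```typescript
// Hypothetical helper; not part of @514labs/moose-lib.
type IngestResult =
  | { ok: true }
  | { ok: false; kind: "validation"; message: string }
  | { ok: false; kind: "unexpected"; status: number };

// Map a status code and parsed JSON body onto the documented outcomes.
function interpretIngestResponse(status: number, body: unknown): IngestResult {
  if (status === 200) {
    return { ok: true };
  }
  if (status === 400) {
    // The documented body is { "error": "Detailed message" }; fall back if it is absent.
    const error =
      typeof body === "object" && body !== null
        ? (body as { error?: unknown }).error
        : undefined;
    return {
      ok: false,
      kind: "validation",
      message: typeof error === "string" ? error : "Validation error",
    };
  }
  // Any other status is outside the documented responses; surface it unchanged.
  return { ok: false, kind: "unexpected", status };
}
```

Returning a discriminated union rather than throwing lets callers separate validation failures, which need a corrected payload, from everything else.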

Client Generation with OpenAPI

Moose automatically generates OpenAPI documentation for all ingestion endpoints:

Locating OpenAPI Spec

Development

http://localhost:5001/openapi.yaml

Production

https://your-domain.com/openapi.yaml

Project file

.moose/openapi.yaml

Using OpenAPI UI Client

Use an OpenAPI viewer IDE extension:

We recommend the OpenAPI Viewer extension for VS Code.

The OpenAPI spec includes example data for each Data Model schema. In Swagger UI, click the Try it out button to construct requests from that example data.

Generate SDKs with OpenAPI Generator

Install the OpenAPI Generator:

npm install -g @openapitools/openapi-generator-cli

# Generate TypeScript client
openapi-generator-cli generate \
  -i .moose/openapi.yaml \
  -g typescript-fetch \
  -o ./sdk
 
# Generate Python client
openapi-generator-cli generate \
  -i .moose/openapi.yaml \
  -g python \
  -o ./sdk

Configuration Options

IngestPipeline Options

PipelineConfig.ts
interface PipelineConfig<T> {
  ingest?: boolean | {
    format?: IngestionFormat;  // JSON (default) or JSON_ARRAY
  };
  stream?: boolean | {
    parallelism?: number;      // Default: 1
    retentionPeriod?: number;  // In seconds, default: 24h
  };
  table?: boolean | {
    orderByFields?: (keyof T)[];
    deduplicate?: boolean;
  };
}

Data Formats

FormatConfig.ts
import { IngestPipeline, IngestionFormat } from "@514labs/moose-lib";
 
// Record is the interface defined in Record.ts above
// Single record ingestion
const singleRecordPipeline = new IngestPipeline<Record>("single_records", {
  ingest: {
    format: IngestionFormat.JSON  // Default
  }
});
 
// Batch record ingestion
const batchPipeline = new IngestPipeline<Record>("batch_records", {
  ingest: {
    format: IngestionFormat.JSON_ARRAY
  }
});

Using IngestPipeline vs. IngestApi

Use IngestPipeline when:

  • You want to create a new ingestion endpoint, stream, and table
  • You want to simplify configuration and reduce boilerplate

Use IngestApi when:

  • You have an existing Stream object that you want to connect to

Validation Layer

Moose’s ingestion APIs automatically validate all incoming data against your TypeScript interface:

ValidationExample.ts
interface UserEvent {
  id: string;             // Required string
  userId: string;         // Required string
  timestamp: Date;        // Required date (ISO format)
  properties?: {          // Optional object
    device?: string;      // Optional string
    version?: number;     // Optional number
  }
}
 
// This will be validated against the interface:
// ✅ Valid: { "id": "event1", "userId": "user1", "timestamp": "2023-05-10T15:30:00Z" }
// ❌ Invalid: { "id": "event1" } // Missing required fields
// ❌ Invalid: { "id": "event1", "userId": "user1", "timestamp": "not-a-date" } // Invalid date format
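
Clients can catch the most obvious mistakes before sending by checking payloads against the same shape. The sketch below is a hand-written type guard for the `UserEvent` example; `UserEventPayload` and `isUserEventPayload` are hypothetical names, and the check only approximates the server's behavior. Moose's own validation remains authoritative.

```typescript
// Hypothetical pre-flight check mirroring the UserEvent interface.
// JSON carries the timestamp as a string, so the wire shape differs from the interface.
interface UserEventPayload {
  id: string;
  userId: string;
  timestamp: string; // ISO 8601 date string
  properties?: { device?: string; version?: number };
}

function isPlainObject(value: unknown): value is { [key: string]: unknown } {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

function isUserEventPayload(value: unknown): value is UserEventPayload {
  if (!isPlainObject(value)) return false;
  if (typeof value.id !== "string" || typeof value.userId !== "string") return false;
  // Date.parse is more lenient than a strict ISO 8601 check;
  // it only rules out strings that cannot be parsed as a date at all.
  if (typeof value.timestamp !== "string" || isNaN(Date.parse(value.timestamp))) return false;

  const props = value.properties;
  if (props === undefined) return true;
  if (!isPlainObject(props)) return false;
  if (props.device !== undefined && typeof props.device !== "string") return false;
  if (props.version !== undefined && typeof props.version !== "number") return false;
  return true;
}
```

A schema library could replace the manual checks. The hand-written version is used here only to keep the example free of dependencies.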

Best Practices

API Usage

  • Use batch ingestion for high-volume data
  • Implement client-side retry logic
  • Consider rate limiting implications
  • Handle validation errors gracefully
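
Client-side retry logic with validation-aware handling might look like the sketch below. All names are hypothetical. Treating 429 and 5xx responses as transient is an assumption, since this page documents only 200 and 400. A 400 is never retried, because resending a payload that failed validation cannot succeed.

```typescript
// Hypothetical retry helpers; not part of @514labs/moose-lib.

// 400 means the payload failed validation, so it is not retried.
// Treating 429 and 5xx as transient is an assumption; only 200 and 400 are documented.
function shouldRetry(status: number): boolean {
  return status === 429 || (status >= 500 && status <= 599);
}

// Exponential backoff: baseMs, 2x baseMs, 4x baseMs, ... capped at maxDelayMs.
function backoffDelayMs(attempt: number, baseMs: number = 200, maxDelayMs: number = 5000): number {
  return Math.min(baseMs * Math.pow(2, attempt), maxDelayMs);
}

function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

// Call `send` until it returns a non-retryable status or attempts run out.
// `send` performs one POST and resolves to the HTTP status code.
async function sendWithRetry(send: () => Promise<number>, maxAttempts: number = 4): Promise<number> {
  let lastError: unknown = new Error("maxAttempts must be at least 1");
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      const status = await send();
      if (!shouldRetry(status) || attempt === maxAttempts - 1) return status;
    } catch (error) {
      // Network failures are treated as transient.
      lastError = error;
      if (attempt === maxAttempts - 1) break;
    }
    await sleep(backoffDelayMs(attempt));
  }
  throw lastError;
}
```

Note that retrying a request whose first attempt actually reached the server can deliver the same record twice, so retries pair best with a table configuration or downstream logic that tolerates duplicates.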

Performance Considerations

  • Configure proper stream parallelism based on load
  • Use batch ingestion for high-throughput scenarios
  • Monitor API response times
  • Set appropriate retention periods
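
For batch ingestion, a client usually needs to split a large set of records into request-sized arrays. The helper below is a minimal sketch of that step; `chunkRecords` is a hypothetical name, and the right batch size is an assumption to tune against your own payload sizes and response times, since this page does not state a limit.

```typescript
// Hypothetical helper; not part of @514labs/moose-lib.
// Split records into arrays of at most `size` items,
// each suitable as the body of one JSON_ARRAY request.
function chunkRecords<T>(records: T[], size: number): T[][] {
  if (!(size >= 1) || Math.floor(size) !== size) {
    throw new RangeError("size must be a positive integer");
  }
  const batches: T[][] = [];
  for (let start = 0; start < records.length; start += size) {
    batches.push(records.slice(start, start + size));
  }
  return batches;
}
```

Each returned array can then be posted to an endpoint configured with IngestionFormat.JSON_ARRAY, such as /ingest/batch_records in the examples above.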

See the API Reference for complete configuration options.