
Inserting Data

MooseStack provides several ways to insert data into your database, depending on whether your data arrives as a continuous stream, in batches from a workflow, or from an external client application.

Optional fields with ClickHouse defaults

If a table column is modeled as optional in your app type but has a ClickHouse default, Moose treats the field as optional at the API/stream boundary, while the ClickHouse table stores the column as required with a DEFAULT clause. If you omit the field in the payload, ClickHouse fills it with the default at insert time. For example, a field can be annotated as:

Annotated[int, clickhouse_default("18")]
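A minimal sketch of this pattern in a Pydantic model (the User model and users table here are hypothetical, and clickhouse_default is assumed to be importable from moose_lib):

UserModel.py
from typing import Annotated, Optional
from moose_lib import Key, OlapTable, clickhouse_default
from pydantic import BaseModel

class User(BaseModel):
    id: Key[str]
    # Optional at the API/stream boundary, but stored as a required
    # column with DEFAULT 18 in ClickHouse
    age: Annotated[Optional[int], clickhouse_default("18")] = None

users_table = OlapTable[User]("users")
# A payload that omits "age" is accepted; ClickHouse fills in 18 at insert time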

From a Stream (Streaming Ingest)

When you need to stream data into your ClickHouse tables, set Stream.destination to a reference to the OlapTable you want to insert into. Moose then automatically provisions a synchronization process that batches and inserts the stream's data into the table.

StreamInsert.ts
import { Key, OlapTable, Stream } from "@514labs/moose-lib";

interface Event {
  id: Key<string>;
  userId: string;
  timestamp: Date;
  eventType: string;
}

const eventsTable = new OlapTable<Event>("Events");

const stream = new Stream<Event>("Events", {
  destination: eventsTable, // automatically syncs the stream to the table in ClickHouse-optimized batches
});
StreamInsert.py
from moose_lib import OlapTable, Stream, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[Event]("user_events")

events_pipeline = Stream[Event]("user_events", StreamConfig(
    destination=events_table  # Automatically syncs the stream to the table in ClickHouse-optimized batches
))
ClickHouse Requires Batched Inserts

ClickHouse inserts perform best when batched. Moose automatically groups your data into ClickHouse-optimized batches of up to 100,000 records, with automatic flushing every second. It also handles at-least-once delivery and retries on connection errors to ensure your data is never lost.

From a Workflow (Batch Insert)

If you have a data source better suited to batch patterns, use a workflow and the direct insert() method to land data in your tables:

WorkflowInsert.py
from moose_lib import OlapTable, Key, InsertOptions
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[UserEvent]("user_events")

# Direct insertion for ETL workflows
result = events_table.insert([
    {"id": "evt_1", "user_id": "user_123", "timestamp": datetime.now(), "event_type": "click"},
    {"id": "evt_2", "user_id": "user_456", "timestamp": datetime.now(), "event_type": "view"}
])

print(f"Successfully inserted: {result.successful} records")
print(f"Failed: {result.failed} records")
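To run this kind of batch insert on a schedule or as part of a larger ETL job, you can wrap it in a Moose workflow. A minimal sketch, assuming moose_lib's Task and Workflow classes (the task and workflow names here are hypothetical, and exact signatures may vary by moose_lib version):

EtlWorkflow.py
from moose_lib import Task, TaskConfig, Workflow, WorkflowConfig

def load_events() -> None:
    # Reuses events_table and datetime from the example above
    events_table.insert([
        {"id": "evt_3", "user_id": "user_789", "timestamp": datetime.now(), "event_type": "click"},
    ])

load_task = Task[None, None](
    name="load_events",
    config=TaskConfig(run=load_events),
)

etl_workflow = Workflow(
    name="etl_workflow",
    config=WorkflowConfig(starting_task=load_task),
)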

From a Client App

Via REST API

In your Moose code, you can use the built-in MooseAPI module to place a POST REST endpoint in front of your streams and tables, allowing external applications to insert data.

IngestApi.py
from moose_lib import IngestApi, IngestConfig

# Assumes an Event model and an events_stream Stream defined elsewhere in your app
ingest_api = IngestApi[Event]("user_events", IngestConfig(
    destination=events_stream
))
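Once the API is running, an external application inserts data by POSTing JSON to the ingest endpoint. A sketch using Python's requests library (the URL assumes Moose's local dev defaults; check your running app for the actual host, port, and path):

ClientPost.py
import requests

# Hypothetical endpoint based on local dev defaults
response = requests.post(
    "http://localhost:4000/ingest/user_events",
    json={
        "id": "evt_1",
        "user_id": "user_123",
        "timestamp": "2024-01-01T00:00:00Z",
        "event_type": "click",
    },
)
response.raise_for_status()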
OpenAPI Client Integration

These APIs include built-in OpenAPI client integration, which you can use to generate clients in your language of choice and connect to your pipelines from external applications.

Coming Soon: MooseClient

We're working on a new client library that you can use to interact with your Moose pipelines from external applications.

Want to get involved?

Join the community Slack to stay updated and let us know if you're interested in helping us build it.

Direct Data Insertion

The OlapTable provides an insert() method that allows you to directly insert data into ClickHouse tables with validation and error handling.

Inserting Arrays of Records

DirectInsert.py
from moose_lib import OlapTable, Key, InsertOptions
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[UserEvent]("user_events")

# Insert a single record or an array of records
result = events_table.insert([
    {"id": "evt_1", "user_id": "user_123", "timestamp": datetime.now(), "event_type": "click"},
    {"id": "evt_2", "user_id": "user_456", "timestamp": datetime.now(), "event_type": "view"}
])

print(f"Successfully inserted: {result.successful} records")
print(f"Failed: {result.failed} records")
Best Practice: Use Batching

ClickHouse strongly recommends batching inserts. Avoid inserting single records into tables, and consider using Moose Streams and Ingest Pipelines if your data source sends events as individual records.
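Alternatively, if you control the producing code, a small client-side buffer can convert record-at-a-time producers into batched inserts. A minimal sketch (the batch size is an arbitrary illustration, not a tuned recommendation):

BufferedInsert.py
BATCH_SIZE = 1000
_buffer: list[dict] = []

def record_event(event: dict) -> None:
    # events_table as defined in the examples above
    _buffer.append(event)
    if len(_buffer) >= BATCH_SIZE:
        flush_events()

def flush_events() -> None:
    # One batched insert instead of one insert per record
    if _buffer:
        events_table.insert(_buffer)
        _buffer.clear()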

Handling Large Batch Inserts

For large datasets, use Python generators for memory-efficient processing:

StreamInsert.py
def user_event_generator():
    """Generate user events for memory-efficient processing."""
    for i in range(10000):
        yield {
            "id": f"evt_{i}",
            "user_id": f"user_{i % 100}",
            "timestamp": datetime.now(),
            "event_type": "click" if i % 2 == 0 else "view"
        }

# Insert from generator (validation not available for streams)
result = events_table.insert(user_event_generator(), InsertOptions(strategy="fail-fast"))

Validation Methods

Before inserting data, you can validate it using the following methods:

ValidationMethods.py
from moose_lib import OlapTable, Key
from pydantic import BaseModel

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    event_type: str

events_table = OlapTable[UserEvent]("user_events")

# Validate a single record
validated_data, error = events_table.validate_record(unknown_data)
if validated_data is not None:
    print("Valid data:", validated_data)
else:
    print("Validation error:", error)

# Validate multiple records with detailed error reporting
validation_result = events_table.validate_records(data_array)
print(f"Valid records: {len(validation_result.valid)}")
print(f"Invalid records: {len(validation_result.invalid)}")
for error in validation_result.invalid:
    print(f"Record {error.index} failed: {error.error}")

Error Handling Strategies

Choose from three error handling strategies based on your reliability requirements:

Fail-Fast Strategy (Default)

FailFast.py
from moose_lib import InsertOptions

# Stops immediately on any error
result = events_table.insert(data, InsertOptions(strategy="fail-fast"))

Discard Strategy

Discard.py
from moose_lib import InsertOptions

# Discards invalid records and continues with valid ones
result = events_table.insert(data, InsertOptions(
    strategy="discard",
    allow_errors=10,           # Allow up to 10 failed records
    allow_errors_ratio=0.05    # Allow up to a 5% failure rate
))

Isolate Strategy

Isolate.py
from moose_lib import InsertOptions

# Retries individual records to isolate failures
result = events_table.insert(data, InsertOptions(
    strategy="isolate",
    allow_errors_ratio=0.1
))

# Access detailed failure information
if result.failed_records:
    for failed in result.failed_records:
        print(f"Record {failed.index} failed: {failed.error}")

Performance Optimization

The insert API includes several performance optimizations:

  • Memoized connections: ClickHouse clients are reused across insert calls
  • Batch processing: Optimized batch sizes for large datasets
  • Async inserts: Automatic async insert mode for datasets > 1000 records
  • Connection management: Use close_client() when completely done
Performance.py
from moose_lib import InsertOptions

# For high-throughput scenarios
result = events_table.insert(large_dataset, InsertOptions(
    validate=False,  # Skip validation for performance
    strategy="discard"
))

# Clean up when completely done (optional)
events_table.close_client()

Best Practices

  • Use streams for real-time data: Use IngestPipeline with streams for continuous data ingestion from APIs and external sources
  • Use direct insertion for batch processing: Use OlapTable.insert() for ETL workflows and bulk data imports
  • Validate data before insertion: Use validation methods to catch data quality issues early
  • Choose appropriate error handling: Use fail-fast for critical data, discard for high-volume scenarios, and isolate for debugging
