Inserting Data
Inserting data into your database is a common task. MooseStack provides several ways to do it, depending on where your data comes from.
From a Stream (Streaming Ingest)
When you need to stream data into your ClickHouse tables, you can set the Stream.destination as a reference to the OlapTable you want to insert into. This will automatically provision a synchronization process that batches and inserts data into the table.
import { Key, OlapTable, Stream } from "@514labs/moose-lib";

interface Event {
  id: Key<string>;
  userId: string;
  timestamp: Date;
  eventType: string;
}

const eventsTable = new OlapTable<Event>("Events");

const stream = new Stream<Event>("Events", {
  destination: eventsTable // automatically syncs the stream to the table in ClickHouse-optimized batches
});
from moose_lib import Key, OlapTable, Stream, StreamConfig
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[Event]("user_events")

events_pipeline = Stream[Event]("user_events", StreamConfig(
    destination=events_table  # Automatically syncs the stream to the table in ClickHouse-optimized batches
))
ClickHouse Requires Batched Inserts
ClickHouse inserts need to be batched for optimal performance. Moose automatically groups your data into ClickHouse-optimized batches of up to 100,000 records, with automatic flushing every second. It also handles at-least-once delivery and retries on connection errors so your data is never lost.
From a Workflow (Batch Insert)
If you have a data source better suited to batch patterns, use a workflow and the direct insert() method to land data in your tables:
import { Key, OlapTable, Task, Workflow } from "@514labs/moose-lib";

interface Event {
  id: Key<string>;
  userId: string;
  timestamp: Date;
  eventType: string;
}

const eventsTable = new OlapTable<Event>("user_events");

const etlTask = new Task<null, void>({
  name: "ETL",
  run: async () => {
    const result = await eventsTable.insert([
      { id: "evt_1", userId: "user_123", timestamp: new Date(), eventType: "click" },
      { id: "evt_2", userId: "user_456", timestamp: new Date(), eventType: "view" }
      // ... more records of type Event
    ]);
  }
});

export const etlWorkflow = new Workflow({
  name: "ETL",
  startingTask: [etlTask]
});
from moose_lib import OlapTable, Key, InsertOptions
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[UserEvent]("user_events")

# Direct insertion for ETL workflows
result = events_table.insert([
    {"id": "evt_1", "user_id": "user_123", "timestamp": datetime.now(), "event_type": "click"},
    {"id": "evt_2", "user_id": "user_456", "timestamp": datetime.now(), "event_type": "view"}
])
print(f"Successfully inserted: {result.successful} records")
print(f"Failed: {result.failed} records")
From a Client App
Via REST API
In your Moose code, you can use the built-in MooseAPI module to place a POST REST API endpoint in front of your streams and tables, allowing you to insert data from external applications.
import { IngestApi } from "@514labs/moose-lib";

const ingestApi = new IngestApi<Event>("user_events", {
  destination: eventsStream // the Stream<Event> that feeds your table
});
from moose_lib import IngestApi, IngestConfig

ingest_api = IngestApi[Event]("user_events", IngestConfig(
    destination=events_stream  # the Stream[Event] that feeds your table
))
OpenAPI Client Integration
With these APIs, you can use the built-in OpenAPI client integration to generate API clients in the language of your choice and connect to your pipelines from external applications.
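As an illustration, here is a hypothetical sketch of calling a client generated from the OpenAPI spec with openapi-generator's typescript-fetch template. The import path, class name, and method name below are assumptions that depend on your spec and generator settings, not the exact names Moose produces.

// Hypothetical usage of a generated typescript-fetch client; the names below are
// placeholders that depend on your OpenAPI spec and generator configuration.
import { Configuration, DefaultApi } from "./generated/moose-client";

const client = new DefaultApi(
  new Configuration({ basePath: "http://localhost:4000" }) // assumed local dev server address
);

// Assumed generated method for POST /ingest/user_events
await client.ingestUserEventsPost({
  id: "evt_1",
  userId: "user_123",
  timestamp: new Date(),
  eventType: "click"
});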
Coming Soon: MooseClient
We’re working on a new client library that you can use to interact with your Moose pipelines from external applications.
Want to get involved?
Join the community Slack to stay updated and let us know if you're interested in helping us build it.
Direct Data Insertion
The OlapTable provides an insert() method that allows you to directly insert data into ClickHouse tables with validation and error handling.
Inserting Arrays of Records
import { OlapTable, Key } from "@514labs/moose-lib";

interface UserEvent {
  id: Key<string>;
  userId: string;
  timestamp: Date;
  eventType: string;
}

const eventsTable = new OlapTable<UserEvent>("user_events");

// Insert single record or array of records
const result = await eventsTable.insert([
  { id: "evt_1", userId: "user_123", timestamp: new Date(), eventType: "click" },
  { id: "evt_2", userId: "user_456", timestamp: new Date(), eventType: "view" }
]);
console.log(`Successfully inserted: ${result.successful} records`);
console.log(`Failed: ${result.failed} records`);
from moose_lib import OlapTable, Key, InsertOptions
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[UserEvent]("user_events")

# Insert single record or array of records
result = events_table.insert([
    {"id": "evt_1", "user_id": "user_123", "timestamp": datetime.now(), "event_type": "click"},
    {"id": "evt_2", "user_id": "user_456", "timestamp": datetime.now(), "event_type": "view"}
])
print(f"Successfully inserted: {result.successful} records")
print(f"Failed: {result.failed} records")
Best Practice: Use Batching
ClickHouse strongly recommends batching inserts. Avoid inserting single records into tables; if your data source sends events as individual records, consider using Moose Streams and Ingest Pipelines instead.
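If your source hands you events one at a time but you still want to call insert() directly, a small client-side buffer that flushes on a size or time threshold keeps your inserts batched. The helper below is only a sketch with assumed thresholds, not a Moose API.

// Sketch of client-side batching before insert() (not a Moose API).
// MAX_BATCH and FLUSH_INTERVAL_MS are assumed thresholds — tune them for your workload.
const buffer: UserEvent[] = [];
const MAX_BATCH = 1000;
const FLUSH_INTERVAL_MS = 1000;

async function flush() {
  if (buffer.length === 0) return;
  const batch = buffer.splice(0, buffer.length); // drain the buffer
  await eventsTable.insert(batch);
}

function record(event: UserEvent) {
  buffer.push(event);
  if (buffer.length >= MAX_BATCH) void flush();
}

// Flush whatever has accumulated at a regular interval
setInterval(() => void flush(), FLUSH_INTERVAL_MS);

// Usage: hand each incoming event to the buffer instead of inserting it directly
record({ id: "evt_1", userId: "user_123", timestamp: new Date(), eventType: "click" });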
Handling Large Batch Inserts
For large datasets, use Node.js streams for memory-efficient processing:
import { Readable } from 'node:stream';

// Generate 10,000 UserEvent records without holding them all in memory
let i = 0;
const dataStream = new Readable({
  objectMode: true,
  read() {
    if (i < 10_000) {
      this.push({ id: `evt_${i}`, userId: `user_${i % 100}`, timestamp: new Date(), eventType: i % 2 === 0 ? "click" : "view" });
      i++;
    } else {
      this.push(null); // signal end of stream
    }
  }
});

const result = await eventsTable.insert(dataStream, {
  strategy: 'fail-fast' // Note: 'isolate' not supported with streams
});
For large datasets, use Python generators for memory-efficient processing:
def user_event_generator():
    """Generate user events for memory-efficient processing."""
    for i in range(10000):
        yield {
            "id": f"evt_{i}",
            "user_id": f"user_{i % 100}",
            "timestamp": datetime.now(),
            "event_type": "click" if i % 2 == 0 else "view"
        }

# Insert from generator (validation not available for streams)
result = events_table.insert(user_event_generator(), InsertOptions(strategy="fail-fast"))
Validation Methods
Before inserting data, you can validate it using the following methods:
// Type guard with compile-time type narrowing
if (eventsTable.isValidRecord(unknownData)) {
  // TypeScript now knows unknownData is UserEvent
  console.log(unknownData.userId); // Type-safe access
}

// Detailed validation with error reporting
const validationResult = eventsTable.validateRecord(unknownData);
if (validationResult.success) {
  console.log("Valid data:", validationResult.data);
} else {
  console.log("Validation errors:", validationResult.errors);
}

// Assert validation (throws on failure)
try {
  const validData = eventsTable.assertValidRecord(unknownData);
  // Use validData with full type safety
} catch (error) {
  console.log("Validation failed:", error.message);
}
from moose_lib import OlapTable, Key
from pydantic import BaseModel

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    event_type: str

events_table = OlapTable[UserEvent]("user_events")

# Validate a single record
validated_data, error = events_table.validate_record(unknown_data)
if validated_data is not None:
    print("Valid data:", validated_data)
else:
    print("Validation error:", error)

# Validate multiple records with detailed error reporting
validation_result = events_table.validate_records(data_array)
print(f"Valid records: {len(validation_result.valid)}")
print(f"Invalid records: {len(validation_result.invalid)}")
for error in validation_result.invalid:
    print(f"Record {error.index} failed: {error.error}")
Error Handling Strategies
Choose from three error handling strategies based on your reliability requirements:
Fail-Fast Strategy (Default)
// Stops immediately on any error
const result = await eventsTable.insert(data, {
  strategy: 'fail-fast'
});
from moose_lib import InsertOptions
# Stops immediately on any error
result = events_table.insert(data, InsertOptions(strategy="fail-fast"))
Discard Strategy
// Discards invalid records, continues with valid ones
const result = await eventsTable.insert(data, {
  strategy: 'discard',
  allowErrors: 10, // Allow up to 10 failed records
  allowErrorsRatio: 0.05 // Allow up to 5% failure rate
});
from moose_lib import InsertOptions
# Discards invalid records, continues with valid ones
result = events_table.insert(data, InsertOptions(
    strategy="discard",
    allow_errors=10,  # Allow up to 10 failed records
    allow_errors_ratio=0.05  # Allow up to 5% failure rate
))
Isolate Strategy
// Retries individual records to isolate failures
const result = await eventsTable.insert(data, {
  strategy: 'isolate',
  allowErrorsRatio: 0.1
});

// Access detailed failure information
if (result.failedRecords) {
  result.failedRecords.forEach(failed => {
    console.log(`Record ${failed.index} failed: ${failed.error}`);
  });
}
from moose_lib import InsertOptions
# Retries individual records to isolate failures
result = events_table.insert(data, InsertOptions(
    strategy="isolate",
    allow_errors_ratio=0.1
))

# Access detailed failure information
if result.failed_records:
    for failed in result.failed_records:
        print(f"Record {failed.index} failed: {failed.error}")
Performance Optimization
The insert API includes several performance optimizations:
- Memoized connections: ClickHouse clients are reused across insert calls
- Batch processing: Optimized batch sizes for large datasets
- Async inserts: Automatic async insert mode for datasets > 1000 records
- Connection management: Call closeClient() (TypeScript) or close_client() (Python) when you are completely done with a table
// For high-throughput scenarios
const result = await eventsTable.insert(largeDataset, {
  validate: false, // Skip validation for performance
  strategy: 'discard'
});
// Clean up when completely done (optional)
await eventsTable.closeClient();
from moose_lib import InsertOptions
# For high-throughput scenarios
result = events_table.insert(large_dataset, InsertOptions(
    validate=False,  # Skip validation for performance
    strategy="discard"
))
# Clean up when completely done (optional)
events_table.close_client()
Best Practices
- Use streams for real-time data: use IngestPipeline with streams for continuous data ingestion from APIs and external sources
- Use direct insertion for batch processing: use OlapTable.insert() for ETL workflows and bulk data imports
- Validate data before insertion: use the validation methods to catch data quality issues early
- Choose appropriate error handling: use fail-fast for critical data, discard for high-volume scenarios, and isolate for debugging