Inserting data into your database is a common task. MooseStack provides a few different ways to insert data into your database.
If a table column is modeled as optional in your app type but has a ClickHouse default, Moose treats incoming records as optional at the API/stream boundary, but the ClickHouse table stores the column as required with a DEFAULT clause. If you omit the field in the payload, ClickHouse fills it with the default at insert time.
For example: field?: number & ClickHouseDefault<"18"> or WithDefault<number, "18">
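To make the behavior concrete, here is a minimal sketch in plain TypeScript that mimics what happens to an omitted field. It does not use Moose or ClickHouse: IncomingUser, StoredUser, AGE_DEFAULT, and applyColumnDefault are hypothetical names chosen for illustration, and in a real deployment ClickHouse itself applies the DEFAULT clause at insert time.

```typescript
// Shape accepted at the API/stream boundary: `age` may be omitted.
interface IncomingUser {
  id: string;
  age?: number;
}

// Shape of the stored row: `age` is always present.
interface StoredUser {
  id: string;
  age: number;
}

// Hypothetical stand-in for a column declared with DEFAULT 18.
const AGE_DEFAULT = 18;

// Mimics the effect of the DEFAULT clause: fill the value only when it is omitted.
function applyColumnDefault(record: IncomingUser): StoredUser {
  return {
    id: record.id,
    age: record.age === undefined ? AGE_DEFAULT : record.age,
  };
}

const withoutAge = applyColumnDefault({ id: "u1" });
const withAge = applyColumnDefault({ id: "u2", age: 42 });
console.log(withoutAge, withAge);
```

The two interfaces mirror the split described above: the field is optional where data enters the system and required where it is stored.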
When you need to stream data into your ClickHouse tables, you can set the Stream.destination as a reference to the OlapTable you want to insert into. This will automatically provision a synchronization process that batches and inserts data into the table.
    import { Key, OlapTable, Stream } from "@514labs/moose-lib";

    interface Event {
      id: Key<string>;
      userId: string;
      timestamp: Date;
      eventType: string;
    }

    const eventsTable = new OlapTable<Event>("Events");

    const stream = new Stream<Event>("Events", {
      // Automatically syncs the stream to the table in ClickHouse-optimized batches
      destination: eventsTable
    });

ClickHouse inserts need to be batched for optimal performance. Moose automatically groups your data into ClickHouse-optimized batches of up to 100,000 records and flushes them every second. It also provides at-least-once delivery and retries on connection errors, so that records are not lost when a connection fails.
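The batching behavior described above can be pictured with a small, self-contained sketch. This is not Moose's implementation: the Batcher class, its constructor parameters, and the onFlush callback are hypothetical, and the limits from the text (100,000 records, one second) are passed in as plain numbers. The clock is supplied by the caller so the example stays deterministic.

```typescript
// Collects records and hands them to `onFlush` when either limit is reached.
class Batcher<T> {
  private buffer: T[] = [];
  private lastFlushMs: number;
  private readonly maxRecords: number;
  private readonly maxAgeMs: number;
  private readonly onFlush: (batch: T[]) => void;

  constructor(
    maxRecords: number,
    maxAgeMs: number,
    onFlush: (batch: T[]) => void,
    startMs = 0,
  ) {
    this.maxRecords = maxRecords;
    this.maxAgeMs = maxAgeMs;
    this.onFlush = onFlush;
    this.lastFlushMs = startMs;
  }

  // Add one record, then flush if the size or age limit has been reached.
  add(record: T, nowMs: number): void {
    this.buffer.push(record);
    const full = this.buffer.length >= this.maxRecords;
    const stale = nowMs - this.lastFlushMs >= this.maxAgeMs;
    if (full || stale) this.flush(nowMs);
  }

  // Emit the buffered records (if any) and restart the age clock.
  flush(nowMs: number): void {
    if (this.buffer.length > 0) {
      this.onFlush(this.buffer);
      this.buffer = [];
    }
    this.lastFlushMs = nowMs;
  }
}

const flushed: number[][] = [];
const batcher = new Batcher<number>(3, 1000, (batch) => flushed.push(batch));
batcher.add(1, 0); // buffered
batcher.add(2, 10); // buffered
batcher.add(3, 20); // size limit reached
batcher.add(4, 30); // buffered
batcher.add(5, 1500); // age limit reached
console.log(flushed);
```

A production batcher would also flush on a timer rather than only when a new record arrives; the sketch checks the age on add() purely to keep the example short.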
If you have a data source better suited to batch patterns, use a workflow and the direct insert() method to land data in your tables:
    import { OlapTable, Key, Task, Workflow } from "@514labs/moose-lib";

    interface Event {
      id: Key<string>;
      userId: string;
      timestamp: Date;
      eventType: string;
    }

    const eventsTable = new OlapTable<Event>("user_events");

    const etlTask = new Task<null, void>({
      name: "ETL",
      run: async () => {
        const result = await eventsTable.insert([
          { id: "evt_1", userId: "user_123", timestamp: new Date(), eventType: "click" },
          { id: "evt_2", userId: "user_456", timestamp: new Date(), eventType: "view" }
          // ... more records of type Event
        ]);
      }
    })

    export const etlWorkflow = new Workflow({
      name: "ETL",
      startingTask: [etlTask]
    })

In your Moose code, you can use the built-in MooseAPI module to place a POST REST API endpoint in front of your streams and tables, so that external applications can insert data.
    import { IngestApi } from "@514labs/moose-lib";

    const ingestApi = new IngestApi("user_events", {
      destination: events_stream // assumed to be a Stream defined elsewhere in your app
    });

We're working on a new client library that you can use to interact with your Moose pipelines from external applications.
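As a rough sketch of what an external caller might do against such an endpoint, the snippet below builds a JSON POST request for one record without sending it. The base URL, the /ingest/user_events path, and the buildIngestRequest helper are assumptions made for illustration, so check the route your own deployment exposes before relying on them.

```typescript
// Hypothetical payload shape, mirroring the Event interface used earlier.
interface EventPayload {
  id: string;
  userId: string;
  timestamp: Date;
  eventType: string;
}

// Builds the URL and fetch options for a JSON POST; nothing is sent here.
function buildIngestRequest(baseUrl: string, apiName: string, record: EventPayload) {
  const trimmedBase = baseUrl.replace(/\/+$/, ""); // drop trailing slashes
  return {
    url: `${trimmedBase}/ingest/${encodeURIComponent(apiName)}`, // assumed route
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(record), // Date values serialize as ISO 8601 strings
    },
  };
}

const request = buildIngestRequest("http://localhost:4000/", "user_events", {
  id: "evt_1",
  userId: "user_123",
  timestamp: new Date("2024-01-01T00:00:00Z"),
  eventType: "click",
});
console.log(request.url);
// To actually send it: await fetch(request.url, request.init);
```

Keeping request construction separate from the network call makes the payload easy to inspect and test before anything reaches the server.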
The OlapTable provides an insert() method that allows you to directly insert data into ClickHouse tables with validation and error handling.
    import { OlapTable, Key } from "@514labs/moose-lib";

    interface UserEvent {
      id: Key<string>;
      userId: string;
      timestamp: Date;
      eventType: string;
    }

    const eventsTable = new OlapTable<UserEvent>("user_events");

    // Insert a single record or an array of records
    const result = await eventsTable.insert([
      { id: "evt_1", userId: "user_123", timestamp: new Date(), eventType: "click" },
      { id: "evt_2", userId: "user_456", timestamp: new Date(), eventType: "view" }
    ]);

    console.log(`Successfully inserted: ${result.successful} records`);
    console.log(`Failed: ${result.failed} records`);

ClickHouse strongly recommends batching inserts. Avoid inserting single records into tables, and consider using Moose Streams and Ingest Pipelines if your data source sends events as individual records.
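When a source hands you one large array, one way to keep each insert call reasonably sized is to split it into chunks and sum the per-call results. The sketch below assumes only that the insert function resolves to an object with successful and failed counts, as in the example above. The chunk and insertInChunks helpers and the fake insert function are hypothetical and not part of Moose.

```typescript
interface InsertCounts {
  successful: number;
  failed: number;
}

// Split an array into consecutive slices of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer");
  }
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Insert chunk by chunk and add up the counts reported by each call.
async function insertInChunks<T>(
  items: T[],
  size: number,
  insert: (batch: T[]) => Promise<InsertCounts>,
): Promise<InsertCounts> {
  const total: InsertCounts = { successful: 0, failed: 0 };
  for (const batch of chunk(items, size)) {
    const result = await insert(batch);
    total.successful += result.successful;
    total.failed += result.failed;
  }
  return total;
}

// Stand-in for a real insert call: records batch sizes and reports success.
const batchSizes: number[] = [];
const fakeInsert = async (batch: number[]): Promise<InsertCounts> => {
  batchSizes.push(batch.length);
  return { successful: batch.length, failed: 0 };
};

const totalsPromise = insertInChunks([1, 2, 3, 4, 5, 6, 7], 3, fakeInsert);
totalsPromise.then((totals) => console.log(batchSizes, totals));
```

In real code the third argument would be something like a function that forwards each batch to eventsTable.insert(); the chunk size is a tuning choice rather than a value prescribed by the text.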
For large datasets, use Node.js streams for memory-efficient processing:
    import { Readable } from 'node:stream';

    const dataStream = new Readable({
      objectMode: true,
      read() {
        // Stream implementation
      }
    });

    const result = await eventsTable.insert(dataStream, {
      strategy: 'fail-fast' // Note: 'isolate' is not supported with streams
    });

Before inserting data, you can validate it using the following methods:
    // Type guard with compile-time type narrowing
    if (eventsTable.isValidRecord(unknownData)) {
      // TypeScript now knows unknownData is UserEvent
      console.log(unknownData.userId); // Type-safe access
    }

    // Detailed validation with error reporting
    const validationResult = eventsTable.validateRecord(unknownData);
    if (validationResult.success) {
      console.log("Valid data:", validationResult.data);
    } else {
      console.log("Validation errors:", validationResult.errors);
    }

    // Assert validation (throws on failure)
    try {
      const validData = eventsTable.assertValidRecord(unknownData);
      // Use validData with full type safety
    } catch (error) {
      // `error` is `unknown` in strict TypeScript, so narrow it before reading `.message`
      console.log("Validation failed:", error instanceof Error ? error.message : error);
    }

Choose from three error handling strategies based on your reliability requirements:
    // Stops immediately on any error
    const failFastResult = await eventsTable.insert(data, {
      strategy: 'fail-fast'
    });

    // Discards invalid records, continues with valid ones
    const discardResult = await eventsTable.insert(data, {
      strategy: 'discard',
      allowErrors: 10, // Allow up to 10 failed records
      allowErrorsRatio: 0.05 // Allow up to 5% failure rate
    });

    // Retries individual records to isolate failures
    const isolateResult = await eventsTable.insert(data, {
      strategy: 'isolate',
      allowErrorsRatio: 0.1
    });

    // Access detailed failure information
    if (isolateResult.failedRecords) {
      isolateResult.failedRecords.forEach(failed => {
        console.log(`Record ${failed.index} failed: ${failed.error}`);
      });
    }

The insert API includes several performance optimizations:
Call closeClient() when completely done.

    // For high-throughput scenarios
    const result = await eventsTable.insert(largeDataset, {
      validate: false, // Skip validation for performance
      strategy: 'discard'
    });

    // Clean up when completely done (optional)
    await eventsTable.closeClient();

Use IngestPipeline with streams for continuous data ingestion from APIs and external sources
Use OlapTable.insert() for ETL workflows and bulk data imports
Use validation methods to catch data quality issues early
Use fail-fast for critical data, discard for high-volume scenarios, and isolate for debugging
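To compare the strategies side by side, here is a simplified, in-memory model of fail-fast and discard. It is a sketch of the general idea under stated assumptions, not the OlapTable.insert() implementation: the partition function, the isValid callback, and the way allowErrors and allowErrorsRatio are applied as upper bounds are all assumptions made for illustration. The isolate strategy is left out because it depends on retrying against the database.

```typescript
type Strategy = "fail-fast" | "discard";

interface Limits {
  allowErrors?: number; // assumed: maximum number of failed records
  allowErrorsRatio?: number; // assumed: maximum failed / total ratio
}

interface Outcome<T> {
  accepted: T[];
  failedIndexes: number[];
}

function partition<T>(
  records: T[],
  isValid: (record: T) => boolean,
  strategy: Strategy,
  limits: Limits = {},
): Outcome<T> {
  const accepted: T[] = [];
  const failedIndexes: number[] = [];

  records.forEach((record, index) => {
    if (isValid(record)) {
      accepted.push(record);
      return;
    }
    // fail-fast: abort on the first invalid record
    if (strategy === "fail-fast") throw new Error(`Record ${index} is invalid`);
    // discard: remember the failure and keep going
    failedIndexes.push(index);
  });

  const failed = failedIndexes.length;
  if (limits.allowErrors !== undefined && failed > limits.allowErrors) {
    throw new Error(`${failed} failures exceed allowErrors=${limits.allowErrors}`);
  }
  if (
    limits.allowErrorsRatio !== undefined &&
    records.length > 0 &&
    failed / records.length > limits.allowErrorsRatio
  ) {
    throw new Error(`Failure ratio exceeds allowErrorsRatio=${limits.allowErrorsRatio}`);
  }
  return { accepted, failedIndexes };
}

const data = [1, -2, 3, 4];
const isPositive = (n: number) => n > 0;

// discard: one bad record is tolerated because allowErrors is 1
const discarded = partition(data, isPositive, "discard", { allowErrors: 1 });

// fail-fast: the same input aborts at the first bad record
let failFastMessage = "";
try {
  partition(data, isPositive, "fail-fast");
} catch (e) {
  failFastMessage = e instanceof Error ? e.message : String(e);
}
console.log(discarded, failFastMessage);
```

The trade-off is visible in the two calls: discard keeps the three valid records and reports the index of the bad one, while fail-fast accepts nothing once it meets a problem.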