# Moose / Olap / Insert Data Documentation – Python

## Included Files

1. moose/olap/insert-data/insert-data.mdx

## Inserting Data

Source: moose/olap/insert-data/insert-data.mdx

Insert data into OLAP tables using various methods

# Inserting Data

Inserting data into your database is a common task. MooseStack provides a few different ways to insert data into your database.

If a table column is modeled as optional in your app type but has a ClickHouse default, Moose treats incoming records as optional at the API/stream boundary, but the ClickHouse table stores the column as required with a `DEFAULT` clause. If you omit the field in the payload, ClickHouse fills it with the default at insert time. For example: `Annotated[int, clickhouse_default("18")]`.

## From a Stream (Streaming Ingest)

When you need to stream data into your ClickHouse tables, you can set the `Stream.destination` as a reference to the `OlapTable` you want to insert into. This will automatically provision a synchronization process that batches and inserts data into the table.

```py filename="StreamInsert.py" copy
from moose_lib import OlapTable, Stream, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[Event]("user_events")

events_pipeline = Stream[Event]("user_events", StreamConfig(
    destination=events_table  # Automatically syncs the stream to the table in ClickHouse-optimized batches
))
```

[ClickHouse inserts need to be batched for optimal performance](https://clickhouse.com/blog/asynchronous-data-inserts-in-clickhouse#data-needs-to-be-batched-for-optimal-performance). Moose automatically batches your data into ClickHouse-optimized batches of up to 100,000 records, with automatic flushing every second. It also handles at-least-once delivery and retries on connection errors to ensure your data is never lost.

## From a Workflow (Batch Insert)

If you have a data source better suited to batch patterns, use a workflow and the direct `insert()` method to land data into your tables:

```py filename="WorkflowInsert.py" copy
from moose_lib import OlapTable, Key, InsertOptions
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[UserEvent]("user_events")

# Direct insertion for ETL workflows
result = events_table.insert([
    {"id": "evt_1", "user_id": "user_123", "timestamp": datetime.now(), "event_type": "click"},
    {"id": "evt_2", "user_id": "user_456", "timestamp": datetime.now(), "event_type": "view"}
])

print(f"Successfully inserted: {result.successful} records")
print(f"Failed: {result.failed} records")
```

## From a Client App

### Via REST API

In your Moose code, you can leverage the built-in [MooseAPI module](/moose/apis) to place a `POST` REST API endpoint in front of your streams and tables, allowing you to insert data from external applications.
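The `IngestApi` example below points its destination at a stream. As a minimal sketch, assuming the `Event` model from the streaming example above, the stream referenced as `events_stream` in the next snippet could be declared like this (the `IngestSetup.py` filename and the `events_stream` name are illustrative):

```py filename="IngestSetup.py" copy
from moose_lib import OlapTable, Stream, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Destination table and the stream the ingest API below will publish to
events_table = OlapTable[Event]("user_events")
events_stream = Stream[Event]("user_events", StreamConfig(destination=events_table))
```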
```py filename="IngestApi.py" copy
from moose_lib import IngestApi, IngestConfig

ingest_api = IngestApi[Event]("user_events", IngestConfig(
    destination=events_stream
))
```

Alternatively, use an `IngestPipeline` instead of standalone `IngestApi`, `Stream`, and `OlapTable` components:

```py filename="IngestPipeline.py" copy
from moose_lib import IngestPipeline, IngestPipelineConfig

ingest_pipeline = IngestPipeline[Event]("user_events", IngestPipelineConfig(
    ingest_api=True,
    stream=True,
    table=True,
))
```

With these APIs you can leverage the built-in OpenAPI client integration to generate API clients in your own language to connect to your pipelines from external applications.

### Coming Soon: MooseClient

We're working on a new client library that you can use to interact with your Moose pipelines from external applications. Join the community Slack to stay updated and let us know if you're interested in helping us build it.

## Direct Data Insertion

The `OlapTable` provides an `insert()` method that allows you to directly insert data into ClickHouse tables with validation and error handling.

### Inserting Arrays of Records

```py filename="DirectInsert.py" copy
from moose_lib import OlapTable, Key, InsertOptions
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[UserEvent]("user_events")

# Insert a single record or an array of records
result = events_table.insert([
    {"id": "evt_1", "user_id": "user_123", "timestamp": datetime.now(), "event_type": "click"},
    {"id": "evt_2", "user_id": "user_456", "timestamp": datetime.now(), "event_type": "view"}
])

print(f"Successfully inserted: {result.successful} records")
print(f"Failed: {result.failed} records")
```

ClickHouse strongly recommends batching inserts. Avoid inserting single records into tables, and consider using Moose Streams and Ingest Pipelines if your data source sends events as individual records.
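If your upstream code produces events one at a time but you still want to call `insert()` directly, buffering records and flushing them in batches keeps inserts ClickHouse-friendly. The following is a minimal sketch; the `BATCH_SIZE` value and the `record_event`/`flush` helpers are illustrative, not part of the Moose API:

```py filename="BufferedInsert.py" copy
from moose_lib import OlapTable, Key
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[UserEvent]("user_events")

BATCH_SIZE = 1000  # Illustrative threshold; tune for your workload
_buffer: list[dict] = []

def record_event(event: dict) -> None:
    """Accumulate individual events and flush them in ClickHouse-friendly batches."""
    _buffer.append(event)
    if len(_buffer) >= BATCH_SIZE:
        flush()

def flush() -> None:
    """Insert all buffered records with a single batched insert() call."""
    if _buffer:
        events_table.insert(_buffer)
        _buffer.clear()

# Single events are buffered, not inserted one by one
record_event({"id": "evt_3", "user_id": "user_789", "timestamp": datetime.now(), "event_type": "click"})
```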
### Handling Large Batch Inserts

For large datasets, use Python generators for memory-efficient processing:

```py filename="StreamInsert.py" copy
from moose_lib import InsertOptions
from datetime import datetime

def user_event_generator():
    """Generate user events for memory-efficient processing."""
    for i in range(10000):
        yield {
            "id": f"evt_{i}",
            "user_id": f"user_{i % 100}",
            "timestamp": datetime.now(),
            "event_type": "click" if i % 2 == 0 else "view"
        }

# Insert from generator (validation not available for streams)
result = events_table.insert(user_event_generator(), InsertOptions(strategy="fail-fast"))
```

### Validation Methods

Before inserting data, you can validate it using the following methods:

```py filename="ValidationMethods.py" copy
from moose_lib import OlapTable, Key
from pydantic import BaseModel

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    event_type: str

events_table = OlapTable[UserEvent]("user_events")

# Validate a single record
validated_data, error = events_table.validate_record(unknown_data)
if validated_data is not None:
    print("Valid data:", validated_data)
else:
    print("Validation error:", error)

# Validate multiple records with detailed error reporting
validation_result = events_table.validate_records(data_array)
print(f"Valid records: {len(validation_result.valid)}")
print(f"Invalid records: {len(validation_result.invalid)}")
for error in validation_result.invalid:
    print(f"Record {error.index} failed: {error.error}")
```

### Error Handling Strategies

Choose from three error handling strategies based on your reliability requirements:

#### Fail-Fast Strategy (Default)

```py filename="FailFast.py" copy
from moose_lib import InsertOptions

# Stops immediately on any error
result = events_table.insert(data, InsertOptions(strategy="fail-fast"))
```

#### Discard Strategy

```py filename="Discard.py" copy
from moose_lib import InsertOptions

# Discards invalid records, continues with valid ones
result = events_table.insert(data, InsertOptions(
    strategy="discard",
    allow_errors=10,         # Allow up to 10 failed records
    allow_errors_ratio=0.05  # Allow up to a 5% failure rate
))
```

#### Isolate Strategy

```py filename="Isolate.py" copy
from moose_lib import InsertOptions

# Retries individual records to isolate failures
result = events_table.insert(data, InsertOptions(
    strategy="isolate",
    allow_errors_ratio=0.1
))

# Access detailed failure information
if result.failed_records:
    for failed in result.failed_records:
        print(f"Record {failed.index} failed: {failed.error}")
```

### Performance Optimization

The insert API includes several performance optimizations:

- **Memoized connections**: ClickHouse clients are reused across insert calls
- **Batch processing**: Optimized batch sizes for large datasets
- **Async inserts**: Automatic async insert mode for datasets > 1000 records
- **Connection management**: Use `close_client()` when completely done

```py filename="Performance.py" copy
from moose_lib import InsertOptions

# For high-throughput scenarios
result = events_table.insert(large_dataset, InsertOptions(
    validate=False,     # Skip validation for performance
    strategy="discard"
))

# Clean up when completely done (optional)
events_table.close_client()
```

## Best Practices