Moose automatically handles batch writes between streams and OLAP tables through a destination configuration. When you specify a destination OLAP table for a stream, Moose provisions a background synchronization process that batches and writes data from the stream to the table.
```python
from moose_lib import Stream, OlapTable, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[Event]("events")

events_stream = Stream[Event]("events", StreamConfig(
    destination=events_table  # This configures automatic batching
))
```

This setup:

- Automatically batches inserts according to ClickHouse-recommended best practices
- Guarantees that data is delivered at least once, even in the face of transient errors
- Works automatically once `destination` is set to a valid OLAP table reference
The simplest way to set up automatic syncing is with an IngestPipeline, which creates all components and wires them together:
```python
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Creates stream, table, API, and automatic sync
events_pipeline = IngestPipeline[Event]("events", IngestPipelineConfig(
    ingest_api=True,
    stream=True,  # Creates stream
    table=True    # Creates destination table + auto-sync process
))
```

For more granular control, you can configure components individually:
```python
from moose_lib import Stream, OlapTable, IngestApi, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Create table first
events_table = OlapTable[Event]("events")

# Create stream with destination table (enables auto-sync)
events_stream = Stream[Event]("events", StreamConfig(
    destination=events_table  # This configures automatic batching
))

# Create API that writes to the stream
events_api = IngestApi[Event]("events", {
    "destination": events_stream
})
```

When you configure a stream with a destination table, Moose handles the synchronization automatically by managing a Rust process in the background.
ClickHouse inserts need to be batched for optimal performance. Moose automatically handles this optimization internally, ensuring your data is efficiently written to ClickHouse without any configuration required.
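As a rough mental model, the background sync behaves like a size-or-time flush policy. The sketch below is illustrative only: the real process runs in Rust inside the Moose runtime, and the names (`MAX_BATCH_SIZE`, `FLUSH_INTERVAL_SECS`, `sync_loop`, and the reader/writer callbacks) are hypothetical, not Moose APIs. The thresholds match the flow described in the next example.

```python
import time

# Illustrative flush policy, not Moose's actual implementation.
MAX_BATCH_SIZE = 100_000   # flush when the batch reaches 100,000 records
FLUSH_INTERVAL_SECS = 1.0  # ...or when 1 second has passed since the last flush

def sync_loop(read_from_stream, write_batch_to_clickhouse):
    """Accumulate stream records and flush them to ClickHouse in batches."""
    batch = []
    last_flush = time.monotonic()
    while True:
        record = read_from_stream(timeout=0.1)  # hypothetical stream reader
        if record is not None:
            batch.append(record)
        size_reached = len(batch) >= MAX_BATCH_SIZE
        interval_elapsed = time.monotonic() - last_flush >= FLUSH_INTERVAL_SECS
        if batch and (size_reached or interval_elapsed):
            write_batch_to_clickhouse(batch)  # one batched INSERT per flush
            batch = []
            last_flush = time.monotonic()
```

Batching like this keeps the number of ClickHouse INSERT statements low while bounding how long a record waits before it becomes queryable; Moose applies this policy for you, along with the retry handling that backs the at-least-once guarantee.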
Here's how data flows through the automatic sync process:
```python
import requests

# 1. Data sent to ingestion API
requests.post('http://localhost:4000/ingest/events', json={
    "id": "evt_123",
    "user_id": "user_456",
    "timestamp": "2024-01-15T10:30:00Z",
    "event_type": "click"
})

# 2. API validates and writes to stream
# 3. Background sync process batches stream data
# 4. Batch automatically written to ClickHouse table when:
#    - Batch reaches 100,000 records, OR
#    - 1 second has elapsed since last flush

# 5. Data available for queries in events table
# SELECT * FROM events WHERE user_id = 'user_456';
```

The sync process provides built-in observability within the Moose runtime: