# Moose / Streaming / Sync To Table Documentation – Python

## Included Files

1. moose/streaming/sync-to-table/sync-to-table.mdx

## Sync to Table

Source: moose/streaming/sync-to-table/sync-to-table.mdx

Automatically sync stream data to OLAP tables with intelligent batching

# Sync to Table

## Overview

Moose automatically handles batch writes between streams and OLAP tables through a **destination configuration**. When you specify a `destination` OLAP table for a stream, Moose provisions a background synchronization process that batches and writes data from the stream to the table.

### Basic Usage

```py filename="SyncToTable.py" copy {14}
from moose_lib import Stream, OlapTable, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[Event]("events")

events_stream = Stream[Event]("events", StreamConfig(
    destination=events_table  # This configures automatic batching
))
```

## Setting Up Automatic Sync

### Using IngestPipeline (Easiest)

The simplest way to set up automatic syncing is with an `IngestPipeline`, which creates all components and wires them together:

```py filename="AutoSync.py" copy {14-15}
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Creates stream, table, API, and automatic sync
events_pipeline = IngestPipeline[Event]("events", IngestPipelineConfig(
    ingest_api=True,
    stream=True,   # Creates stream
    table=True     # Creates destination table + auto-sync process
))
```

### Standalone Components

For more granular control, you can configure components individually:

```py filename="ManualSync.py" copy
from moose_lib import Stream, OlapTable, IngestApi, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Create table first
events_table = OlapTable[Event]("events")

# Create stream with destination table (enables auto-sync)
events_stream = Stream[Event]("events", StreamConfig(
    destination=events_table  # This configures automatic batching
))

# Create API that writes to the stream
events_api = IngestApi[Event]("events", {
    "destination": events_stream
})
```

## How Automatic Syncing Works

When you configure a stream with a `destination` table, Moose handles the synchronization by managing a **Rust background process** that:

1. **Consumes** messages from the stream (Kafka/Redpanda topic)
2. **Batches** records up to 100,000 or flushes every second (whichever comes first)
3. **Executes** optimized ClickHouse `INSERT` statements
4. **Commits** stream offsets after successful writes
5. **Retries** failed batches with exponential backoff

Default batching parameters:

| Parameter | Value | Description |
|-----------|-------|-------------|
| `MAX_BATCH_SIZE` | 100,000 records | Maximum records per batch insert |
| `FLUSH_INTERVAL` | 1 second | Automatic flush regardless of batch size |

Currently, you cannot configure the batching parameters, but we're interested in adding this feature. If you need this capability, let us know on Slack!

[ClickHouse inserts need to be batched for optimal performance](https://clickhouse.com/blog/asynchronous-data-inserts-in-clickhouse#data-needs-to-be-batched-for-optimal-performance). Moose automatically handles this optimization internally, ensuring your data is efficiently written to ClickHouse without any configuration required.
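To make the batching behavior concrete, here is a minimal, hypothetical Python sketch of a consume–batch–flush loop. The actual sync process is a Rust service managed by Moose; the names below (`sync_loop`, `insert_batch`) are illustrative only and not part of the Moose API:

```py filename="BatchingSketch.py" copy
import time
from typing import Iterable

MAX_BATCH_SIZE = 100_000   # maximum records per batch insert
FLUSH_INTERVAL = 1.0       # flush at least once per second

def insert_batch(records: list) -> None:
    """Stand-in for a batched ClickHouse INSERT (illustrative only)."""
    print(f"inserted {len(records)} records")

def sync_loop(stream: Iterable[dict]) -> None:
    batch: list[dict] = []
    last_flush = time.monotonic()
    for record in stream:
        batch.append(record)
        # Flush when the batch is full OR the flush interval has elapsed
        if len(batch) >= MAX_BATCH_SIZE or time.monotonic() - last_flush >= FLUSH_INTERVAL:
            insert_batch(batch)
            batch.clear()
            last_flush = time.monotonic()
    if batch:  # flush any remaining records
        insert_batch(batch)
```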
## Data Flow Example

Here's how data flows through the automatic sync process:

```py filename="DataFlow.py" copy
import requests

# 1. Data sent to ingestion API
requests.post('http://localhost:4000/ingest/events', json={
    "id": "evt_123",
    "user_id": "user_456",
    "timestamp": "2024-01-15T10:30:00Z",
    "event_type": "click"
})

# 2. API validates and writes to stream

# 3. Background sync process batches stream data

# 4. Batch automatically written to ClickHouse table when:
#    - Batch reaches 100,000 records, OR
#    - 1 second has elapsed since last flush

# 5. Data available for queries in events table
#    SELECT * FROM events WHERE user_id = 'user_456';
```

## Monitoring and Observability

The sync process provides built-in observability within the Moose runtime:

- **Batch Insert Logs**: Records successful batch insertions with sizes and offsets
- **Error Handling**: Logs transient failures with retry information
- **Metrics**: Tracks throughput, batch sizes, and error rates
- **Offset Tracking**: Maintains Kafka consumer group offsets for reliability
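As a quick sanity check, once the flush interval has elapsed you can confirm that records reached the destination table by querying ClickHouse directly. Below is a minimal sketch using the `clickhouse-connect` client; the host, port, and credentials are placeholders for your local ClickHouse instance, not Moose-provided defaults:

```py filename="VerifySync.py" copy
import clickhouse_connect

# Placeholder connection details -- adjust to match your ClickHouse instance
client = clickhouse_connect.get_client(
    host="localhost",
    port=8123,
    username="default",
    password="",
)

# Count the synced rows for a given user
result = client.query("SELECT count() FROM events WHERE user_id = 'user_456'")
print(result.result_rows)
```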