Sync to Table
Overview
Moose automatically handles batch writes between streams and OLAP tables through a destination configuration. When you specify a destination
OLAP table for a stream, Moose provisions a background synchronization process that batches and writes data from the stream to the table.
Basic Usage
import { Stream, OlapTable, Key } from "@514labs/moose-lib";

interface Event {
  id: Key<string>;
  userId: string;
  timestamp: Date;
  eventType: string;
}

const eventsTable = new OlapTable<Event>("events");

const eventsStream = new Stream<Event>("events", {
  destination: eventsTable // This configures automatic batching
});
from moose_lib import Stream, OlapTable, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

events_table = OlapTable[Event]("events")

events_stream = Stream[Event]("events", StreamConfig(
    destination=events_table  # This configures automatic batching
))
What is Automatic Sync?
- ClickHouse Optimized Batching: Automatically batches inserts according to ClickHouse-recommended best practices.
- At-least-once Delivery: Guarantees that data is delivered at least once, even in the face of transient errors (a read-time deduplication sketch follows this list).
- 1-line Setup: Works automatically once destination is set to a valid OLAP table reference.
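Because delivery is at-least-once, the same record can occasionally be written twice (for example, after a retried flush). If a query needs exactly one row per key, one hypothetical way to tolerate duplicates at read time is to collapse rows on the key. The sketch below uses ClickHouse's LIMIT 1 BY clause against the events table from the Basic Usage example; adjust column names to your own schema.

// Hypothetical read-time deduplication: keep the newest row per id.
// LIMIT 1 BY is standard ClickHouse SQL; `sql` is the tagged template used elsewhere in these docs.
sql`
  SELECT *
  FROM events
  ORDER BY timestamp DESC
  LIMIT 1 BY id
`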
Setting Up Automatic Sync
Using IngestPipeline (Easiest)
The simplest way to set up automatic syncing is with an IngestPipeline, which creates all components and wires them together:
import { IngestPipeline, Key } from "@514labs/moose-lib";

interface Event {
  id: Key<string>;
  userId: string;
  timestamp: Date;
  eventType: string;
}

// Creates stream, table, API, and automatic sync
const eventsPipeline = new IngestPipeline<Event>("events", {
  ingest: true, // Creates HTTP endpoint at POST /ingest/events
  stream: true, // Creates buffering stream
  table: true   // Creates destination table + auto-sync process
});
from moose_lib import IngestPipeline, IngestPipelineConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Creates stream, table, API, and automatic sync
events_pipeline = IngestPipeline[Event]("events", IngestPipelineConfig(
    ingest=True,
    stream=True,  # Creates stream
    table=True    # Creates destination table + auto-sync process
))
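Shown in TypeScript to match the first example: if you later need direct references to the components the pipeline created (for instance, to reuse the table elsewhere), the pipeline object is assumed here to expose them as properties. Treat the property names as a sketch and confirm them against your version of @514labs/moose-lib.

// Assumed accessors on IngestPipeline; verify against your moose-lib version.
const table = eventsPipeline.table;    // OlapTable created by `table: true`
const stream = eventsPipeline.stream;  // Stream created by `stream: true`
const api = eventsPipeline.ingestApi;  // IngestApi created by `ingest: true`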
Standalone Components
For more granular control, you can configure components individually:
import { Stream, OlapTable, IngestApi, Key } from "@514labs/moose-lib";

interface Event {
  id: Key<string>;
  userId: string;
  timestamp: Date;
  eventType: string;
}

// Create table first
const eventsTable = new OlapTable<Event>("events");

// Create stream with destination table (enables auto-sync)
const eventsStream = new Stream<Event>("events", {
  destination: eventsTable // This configures automatic batching
});

// Create API that writes to the stream
const eventsApi = new IngestApi<Event>("events", {
  destination: eventsStream
});
from moose_lib import Stream, OlapTable, IngestApi, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class Event(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Create table first
events_table = OlapTable[Event]("events")

# Create stream with destination table (enables auto-sync)
events_stream = Stream[Event]("events", StreamConfig(
    destination=events_table  # This configures automatic batching
))

# Create API that writes to the stream
events_api = IngestApi[Event]("events", {
    "destination": events_stream
})
How Automatic Syncing Works
When you configure a stream with a destination table, Moose handles the synchronization automatically by managing a Rust process in the background.
ClickHouse Requires Batched Inserts
ClickHouse inserts need to be batched for optimal performance. Moose automatically handles this optimization internally, ensuring your data is efficiently written to ClickHouse without any configuration required.
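To make that concrete, here is a conceptual sketch of the batching policy in TypeScript. The real sync process is a managed Rust component, so the names and structure below are illustrative only; the thresholds mirror the defaults described in the data flow example that follows.

// Conceptual sketch of the sync loop (illustrative only; the actual implementation is Rust).
interface EventRecord {
  id: string;
  userId: string;
  timestamp: string;
  eventType: string;
}

const MAX_BATCH_SIZE = 100_000; // flush when the batch reaches 100,000 records...
const MAX_BATCH_AGE_MS = 1_000; // ...or when 1 second has elapsed since the last flush

let batch: EventRecord[] = [];
let lastFlush = Date.now();

// Stand-in for the real writer: issues one multi-row INSERT per batch.
function flushToClickHouse(rows: EventRecord[]): void {
  console.log(`flushing ${rows.length} rows as a single batched insert`);
}

function onStreamRecord(record: EventRecord): void {
  batch.push(record);
  const batchIsFull = batch.length >= MAX_BATCH_SIZE;
  const batchIsStale = Date.now() - lastFlush >= MAX_BATCH_AGE_MS;
  if (batchIsFull || batchIsStale) {
    flushToClickHouse(batch);
    batch = [];
    lastFlush = Date.now();
  }
}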
Data Flow Example
Here’s how data flows through the automatic sync process:
// 1. Data sent to ingestion API
fetch('http://localhost:4000/ingest/events', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({
    id: 'evt_123',
    userId: 'user_456',
    timestamp: '2024-01-15T10:30:00Z',
    eventType: 'click'
  })
})

// 2. API validates and writes to stream
// 3. Background sync process batches stream data
// 4. Batch automatically written to ClickHouse table when:
//    - Batch reaches 100,000 records, OR
//    - 1 second has elapsed since last flush

// 5. Data available for queries in events table
sql`SELECT * FROM events WHERE userId = 'user_456';`
import requests

# 1. Data sent to ingestion API
requests.post('http://localhost:4000/ingest/events', json={
    "id": "evt_123",
    "user_id": "user_456",
    "timestamp": "2024-01-15T10:30:00Z",
    "event_type": "click"
})

# 2. API validates and writes to stream
# 3. Background sync process batches stream data
# 4. Batch automatically written to ClickHouse table when:
#    - Batch reaches 100,000 records, OR
#    - 1 second has elapsed since last flush

# 5. Data available for queries in events table
# SELECT * FROM events WHERE user_id = 'user_456';
Monitoring and Observability
The sync process provides built-in observability within the Moose runtime:
- Batch Insert Logs: Records successful batch insertions with sizes and offsets
- Error Handling: Logs transient failures with retry information
- Metrics: Tracks throughput, batch sizes, and error rates
- Offset Tracking: Maintains Kafka consumer group offsets for reliability
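Beyond the runtime's own logs and metrics, a quick hypothetical sanity check is to count the rows that have landed in the destination table after sending some test events, again using the sql helper shown earlier:

// Hypothetical spot check that batches are reaching the table.
sql`SELECT count() AS rows_synced FROM events;`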