# Moose / Streaming Documentation – TypeScript ## Included Files 1. moose/streaming/connect-cdc.mdx 2. moose/streaming/consumer-functions.mdx 3. moose/streaming/create-stream.mdx 4. moose/streaming/dead-letter-queues.mdx 5. moose/streaming/from-your-code.mdx 6. moose/streaming/schema-registry.mdx 7. moose/streaming/sync-to-table.mdx 8. moose/streaming/transform-functions.mdx ## connect-cdc Source: moose/streaming/connect-cdc.mdx # Connect to CDC Services Coming Soon! --- ## Streaming Consumer Functions Source: moose/streaming/consumer-functions.mdx Read and process data from streams with consumers and processors # Streaming Consumer Functions ## Overview Consuming data from streams allows you to read and process data from Kafka/Redpanda topics. This is essential for building real-time applications, analytics, and event-driven architectures. ## Basic Usage Consumers are just functions that are called when new data is available in a stream. You add them to a stream like this: ```typescript filename="StreamConsumer.ts" interface UserEvent { id: string; userId: string; timestamp: Date; eventType: string; } const userEventsStream = new Stream("user-events"); // Add a consumer to process events userEventsStream.addConsumer((event: UserEvent) => { console.log(`Processing event: ${event.id}`); console.log(`User: ${event.userId}, Type: ${event.eventType}`); // Your processing logic here // e.g., update analytics, send notifications, etc. }); // Add multiple consumers for different purposes userEventsStream.addConsumer((event: UserEvent) => { // Analytics processing if (event.eventType === 'purchase') { updatePurchaseAnalytics(event); } }); userEventsStream.addConsumer((event: UserEvent) => { // Notification processing if (event.eventType === 'signup') { sendWelcomeEmail(event.userId); } }); ``` ## Processing Patterns ### Stateful Processing with MooseCache Maintain state across event processing using MooseCache for distributed state management: ```typescript filename="StatefulProcessing.ts" // State container for accumulating data interface AccumulatorState { id: string; counter: number; sum: number; lastModified: Date; attributes: Record; } // Input message structure interface InputMessage { id: string; groupId: string; numericValue: number; messageType: string; timestamp: Date; payload: Record; } const messageStream = new Stream("input-stream"); messageStream.addConsumer(async (message: InputMessage) => { // Get distributed cache instance const cache = await MooseCache.get(); const cacheKey = `state:${message.groupId}`; // Load existing state or create new one let state: AccumulatorState | null = await cache.get(cacheKey); if (!state) { // Initialize new state state = { id: message.groupId, counter: 0, sum: 0, lastModified: new Date(), attributes: {} }; } // Apply message to state state.counter += 1; state.sum += message.numericValue; state.lastModified = message.timestamp; state.attributes = { ...state.attributes, ...message.payload }; // Determine cache lifetime based on message type const ttlSeconds = message.messageType === 'complete' ? 
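// Keep completed groups only briefly (60s); in-flight groups persist for up to an hour: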
60 : 3600; if (message.messageType === 'complete' || shouldFinalize(state)) { // Finalize and remove state await finalizeState(state); await cache.delete(cacheKey); } else { // Persist updated state await cache.set(cacheKey, state, ttlSeconds); } }); // Condition for automatic state finalization function shouldFinalize(state: AccumulatorState): boolean { const threshold = 100; const timeLimit = 30 * 60 * 1000; // 30 minutes const elapsed = new Date().getTime() - state.lastModified.getTime(); return state.counter >= threshold || elapsed > timeLimit; } async function finalizeState(state: AccumulatorState): Promise { console.log(`Finalizing state ${state.id}: counter=${state.counter}, sum=${state.sum}`); } ``` ## Propagating Events to External Systems You can use consumer functions to trigger actions across external systems - send notifications, sync databases, update caches, or integrate with any other service when events occur: ### HTTP API Calls Send processed data to external APIs: ```typescript filename="HttpIntegration.ts" interface WebhookPayload { id: string; data: Record; timestamp: Date; } const webhookStream = new Stream("webhook-events"); webhookStream.addConsumer(async (payload: WebhookPayload) => { try { // Send to external webhook const response = await fetch('https://external-api.com/webhook', { method: 'POST', headers: { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + process.env.API_TOKEN }, body: JSON.stringify({ eventId: payload.id, eventData: payload.data, processedAt: new Date().toISOString() }) }); if (!response.ok) { throw new Error(`HTTP ${response.status}: ${response.statusText}`); } console.log(`Successfully sent event ${payload.id} to external API`); } catch (error) { console.error(`Failed to send event ${payload.id}:`, error); // Could implement retry logic or dead letter queue here } }); ``` #### Database Operations Write processed data to external databases: ```typescript filename="DatabaseIntegration.ts" interface DatabaseRecord { id: string; category: string; value: number; metadata: Record; timestamp: Date; } const dbStream = new Stream("database-events"); // Initialize database connection const dbConfig = { host: process.env.DB_HOST, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME }; dbStream.addConsumer(async (record: DatabaseRecord) => { const connection = await createConnection(dbConfig); try { // Insert record into external database await connection.execute( 'INSERT INTO processed_events (id, category, value, metadata, created_at) VALUES (?, ?, ?, ?, ?)', [ record.id, record.category, record.value, JSON.stringify(record.metadata), record.timestamp ] ); console.log(`Inserted record ${record.id} into database`); } catch (error) { console.error(`Database insert failed for record ${record.id}:`, error); } finally { await connection.end(); } }); ``` #### File System Operations Write processed data to files or cloud storage: ```typescript filename="FileSystemIntegration.ts" interface FileOutput { id: string; filename: string; content: string; directory: string; format: 'json' | 'csv' | 'txt'; } const fileStream = new Stream("file-events"); fileStream.addConsumer(async (output: FileOutput) => { try { // Ensure directory exists await mkdir(output.directory, { recursive: true }); // Format content based on type let formattedContent: string; switch (output.format) { case 'json': formattedContent = JSON.stringify(JSON.parse(output.content), null, 2); break; case 'csv': formattedContent = output.content; 
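        // (a production pipeline might validate or re-serialize the CSV before writing it)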
// Assume already CSV formatted
        break;
      default:
        formattedContent = output.content;
    }

    // Write file with timestamp
    const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
    const filename = `${output.filename}_${timestamp}.${output.format}`;
    const filepath = join(output.directory, filename);

    await writeFile(filepath, formattedContent, 'utf8');
    console.log(`Written file: ${filepath}`);
  } catch (error) {
    console.error(`Failed to write file for output ${output.id}:`, error);
  }
});
```

#### Email and Notifications

Send alerts and notifications based on processed events:

```typescript filename="NotificationIntegration.ts"
interface NotificationEvent {
  id: string;
  type: 'email' | 'slack' | 'webhook';
  recipient: string;
  subject: string;
  message: string;
  priority: 'low' | 'medium' | 'high';
  metadata: Record<string, any>;
}

const notificationStream = new Stream<NotificationEvent>("notifications");

// Configure email transporter
const emailTransporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: parseInt(process.env.SMTP_PORT || '587'),
  secure: false,
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS
  }
});

notificationStream.addConsumer(async (notification: NotificationEvent) => {
  try {
    switch (notification.type) {
      case 'email':
        await emailTransporter.sendMail({
          from: process.env.SMTP_FROM,
          to: notification.recipient,
          subject: notification.subject,
          text: notification.message,
          html: `
            <div>
              <h2>${notification.subject}</h2>
              <p>${notification.message}</p>
              <p>Priority: ${notification.priority}</p>
            </div>
          
` }); break; case 'slack': await fetch(`https://hooks.slack.com/services/${process.env.SLACK_WEBHOOK}`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ text: notification.message, channel: notification.recipient, username: 'Moose Alert', icon_emoji: notification.priority === 'high' ? ':warning:' : ':information_source:' }) }); break; case 'webhook': await fetch(notification.recipient, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ id: notification.id, subject: notification.subject, message: notification.message, priority: notification.priority, metadata: notification.metadata }) }); break; } console.log(`Sent ${notification.type} notification ${notification.id}`); } catch (error) { console.error(`Failed to send notification ${notification.id}:`, error); } }); ``` --- ## Create Streams Source: moose/streaming/create-stream.mdx Define and create Kafka/Redpanda topics with type-safe schemas # Creating Streams ## Overview Streams serve as the transport layer between your data sources and database tables. Built on Kafka/Redpanda topics, they provide a way to implement real-time pipelines for ingesting and processing incoming data. ## Creating Streams You can create streams in two ways: - High-level: Using the `IngestPipeline` class (recommended) - Low-level: Manually configuring the `Stream` component ### Streams for Ingestion The `IngestPipeline` class provides a convenient way to set up streams with ingestion APIs and tables. This is the recommended way to create streams for ingestion: ```ts filename="IngestionStream.ts" copy {10} interface RawData { id: Key; value: number; } ); ``` While the `IngestPipeline` provides a convenient way to set up streams with ingestion APIs and tables, you can also configure these components individually for more granular control: ```ts filename="StreamObject.ts" copy {8-12} interface RawData { id: string; value: number; } // Create a table for the raw data ); // Create an ingestion API for the raw data ); ``` ### Streams for Transformations If the raw data needs to be transformed before landing in the database, you can define a transform destination stream and a transform function to process the data: #### Single Stream Transformation ```ts filename="TransformDestinationStream.ts" copy interface RawData { id: Key; value: number; } interface TransformedData { id: Key; transformedValue: number; transformedAt: Date; } // Configure components for raw data ingestion & buffering const rawData = new IngestPipeline("raw_data", { ingestApi: true, stream: true, // Buffers data between the ingestion API and the database table table: false // Don't create a table for the raw data }); // Create a table for the transformed data const transformedData = new IngestPipeline("transformed_data", { ingestApi: false, // Don't create an ingestion API for the transformed data stream: true, // Create destination stream for the transformed data table: true // Create a table for the transformed data }); rawData.stream.addTransform(transformedData.stream, (record) => ({ id: record.id, transformedValue: record.value * 2, transformedAt: new Date() })); ``` ```ts filename="TransformDestinationStream.ts" copy interface RawData { id: Key; value: number; } interface TransformedData { id: Key; transformedValue: number; transformedAt: Date; } // Configure components for raw data ingestion & buffering ); // Configure components for transformed data stream & storage ); // Add a transform to the raw data stream to 
// transform the data
rawDataStream.addTransform(transformedStream, (record) => ({
  id: record.id,
  transformedValue: record.value * 2,
  transformedAt: new Date()
}));
```

#### Chaining Transformations

For more complex transformations, you can chain multiple transformations together. This is a use case where using a standalone Stream for intermediate stages of your pipeline may be useful:

```ts filename="ChainedTransformations.ts" copy
// Define the schema for raw input data
interface RawData {
  id: Key<string>;
  value: number;
}

// Define the schema for intermediate transformed data
interface IntermediateData {
  id: Key<string>;
  transformedValue: number;
  transformedAt: Date;
}

// Define the schema for final transformed data
interface FinalData {
  id: Key<string>;
  transformedValue: number;
  anotherTransformedValue: number;
  transformedAt: Date;
}

// Create the first pipeline for raw data ingestion
// Only create an API and a stream (no table) since we're ingesting the raw data
const rawData = new IngestPipeline<RawData>("raw_data", {
  ingestApi: true, // Enable HTTP ingestion endpoint
  stream: true,    // Create a stream to buffer data
  table: false     // Don't store raw data in a table
});

// Create an intermediate stream to hold data between transformations (no api or table needed)
const intermediateStream = new Stream<IntermediateData>("intermediate_stream");

// First transformation: reshape the raw data into the intermediate schema
rawData.stream.addTransform(intermediateStream, (record) => ({
  id: record.id,
  transformedValue: record.value * 2, // Double the raw value
  transformedAt: new Date()
}));

// Create the final pipeline that will store the fully transformed data
const finalData = new IngestPipeline<FinalData>("final_stream", {
  ingestApi: false, // No direct ingestion to this pipeline
  stream: true,     // Create a stream for processing
  table: true       // Store final results in a table
});

// Second transformation: further transform the intermediate data
intermediateStream.addTransform(finalData.stream, (record) => ({
  id: record.id,
  transformedValue: record.transformedValue * 2,        // Double the intermediate value
  anotherTransformedValue: record.transformedValue * 3, // Triple the intermediate value
  transformedAt: new Date() // Update timestamp
}));
```

## Stream Configurations

### Parallelism and Retention

```typescript filename="StreamConfig.ts"
interface Data {
  id: Key<string>;
  value: number;
}

// Tune partition count and retention per stream
// (see the Stream API reference linked below for all available options)
const highThroughputStream = new Stream<Data>("high_throughput", {
  parallelism: 4,          // Number of Kafka/Redpanda partitions
  retentionPeriod: 86400   // Retention in seconds (1 day)
});
```

### LifeCycle Management

Control how Moose manages your stream resources when your code changes. See the [LifeCycle Management guide](./lifecycle) for detailed information.

```typescript filename="LifeCycleStreamConfig.ts"
// Production stream with external management
const prodStream = new Stream<Data>("prod_events", {
  lifeCycle: LifeCycle.EXTERNALLY_MANAGED
});

// Development stream with full management
const devStream = new Stream<Data>("dev_events", {
  lifeCycle: LifeCycle.FULLY_MANAGED
});
```

See the [API Reference](/moose/reference/ts-moose-lib#stream) for complete configuration options.

---

## Dead Letter Queues

Source: moose/streaming/dead-letter-queues.mdx
Handle failed stream processing with dead letter queues

# Dead Letter Queues

## Overview

Dead Letter Queues (DLQs) provide a robust error handling mechanism for stream processing in Moose. When streaming functions fail during transformation or consumption, failed messages are automatically routed to a configured dead letter queue for later analysis and recovery.
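At a glance, the wiring looks like this (a minimal sketch; the names are illustrative, imports are omitted to match the other snippets on this page, and each piece is covered in detail in the sections below):

```ts filename="dlq-at-a-glance.ts" copy
interface UserEvent {
  userId: string;
  action: string;
  timestamp: number;
}

// A queue that collects UserEvent records that fail processing
const userEventDLQ = new DeadLetterQueue<UserEvent>("UserEventDLQ");

// Any transform or consumer opts in by passing the queue in its config,
// e.g. `{ deadLetterQueue: userEventDLQ }` (shown in full below).

// Failed records can then be inspected like any other stream
userEventDLQ.addConsumer((deadLetter) => {
  console.log(`${deadLetter.errorType}: ${deadLetter.errorMessage}`);
  const original: UserEvent = deadLetter.asTyped();
  console.log(`Original user: ${original.userId}`);
});
```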
## Dead Letter Record Structure When a message fails processing, Moose creates a dead letter record with the following structure: ```ts interface DeadLetterModel { originalRecord: Record; // The original message that failed errorMessage: string; // Error description errorType: string; // Error class/type name failedAt: Date; // Timestamp when failure occurred source: "api" | "transform" | "table"; // Where the failure happened } interface DeadLetter extends DeadLetterModel { asTyped: () => T; // Type-safe access to original record } ``` ## Creating Dead Letter Queues ### Basic Setup ```ts filename="dead-letter-setup.ts" copy // Define your data model interface UserEvent { userId: string; action: string; timestamp: number; } // Create a dead letter queue for UserEvent failures const userEventDLQ = new DeadLetterQueue("UserEventDLQ"); ``` ### Configuring Transformations with Dead Letter Queues Add a dead letter queue to your Transformation Function configuration, and any errors thrown in the transformation will trigger the event to be routed to the dead letter queue. ```ts filename="transform-with-dlq.ts" copy // Create dead letter queue const eventDLQ = new DeadLetterQueue("EventDLQ"); // Add transform with errors to trigger DLQ, and DLQ configuration rawEvents.stream!.addTransform( processedEvents.stream!, (event: RawEvent): ProcessedEvent => { // This transform might fail for invalid data if (!event.userId || event.userId.length === 0) { throw new Error("Invalid userId: cannot be empty"); } if (event.timestamp < 0) { throw new Error("Invalid timestamp: cannot be negative"); } return { userId: event.userId, action: event.action, processedAt: new Date(), isValid: true }; }, { deadLetterQueue: eventDLQ // Configure DLQ for this transform } ); ``` ### Configuring Consumers with Dead Letter Queues Add a dead letter queue to your Consumer Function configuration, and any errors thrown in the function will trigger the event to be routed to the dead letter queue. ```ts filename="consumer-with-dlq.ts" copy // Add consumer with errors to trigger DLQ, and DLQ configuration rawEvents.stream!.addConsumer( (event: RawEvent): void => { // This consumer might fail for certain events if (event.action === "forbidden_action") { throw new Error("Forbidden action detected"); } // Process the event (e.g., send to external API) console.log(`Processing event for user ${event.userId}`); }, { deadLetterQueue: eventDLQ // Configure DLQ for this consumer } ); ``` ### Configuring Ingest APIs with Dead Letter Queues Add a dead letter queue to your Ingest API configuration, and any runtime data validation failures at the API will trigger the event to be routed to the dead letter queue. 
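For example, using the `IngestPipeline` shorthand (a minimal sketch with an illustrative pipeline name; `deadLetterQueue: true` is described further under "Using Dead Letter Queues in Ingestion Pipelines" below):

```ts filename="ingest-with-dlq.ts" copy
interface ExampleModel {
  id: string;
  userId: string;
  timestamp: Date;
  properties?: {
    device?: string;
    version?: number;
  }
}

// Requests to POST /ingest/example_events that fail validation against
// ExampleModel are routed to the pipeline's dead letter queue.
const examplePipeline = new IngestPipeline<ExampleModel>("example_events", {
  ingestApi: true,
  stream: true,
  table: true,
  deadLetterQueue: true,
});
```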
```typescript filename="ValidationExample.ts" copy interface ExampleModel { id: string; userId: string; timestamp: Date; properties?: { device?: string; version?: number; } } ); ``` ## Processing Dead Letter Messages ### Monitoring Dead Letter Queues ```ts filename="dlq-monitoring.ts" copy // Add a consumer to monitor dead letter messages eventDLQ.addConsumer((deadLetter) => { console.log("Dead letter received:"); console.log(`Error: ${deadLetter.errorMessage}`); console.log(`Error Type: ${deadLetter.errorType}`); console.log(`Failed At: ${deadLetter.failedAt}`); console.log(`Source: ${deadLetter.source}`); // Access the original typed data const originalEvent: RawEvent = deadLetter.asTyped(); console.log(`Original User ID: ${originalEvent.userId}`); }); ``` ### Recovery and Retry Logic ```ts filename="dlq-recovery.ts" copy // Create a recovery stream for fixed messages const recoveredEvents = new Stream("recovered_events", { destination: processedEvents.table // Send recovered data to main table }); // Add recovery logic to the DLQ eventDLQ.addTransform( recoveredEvents, (deadLetter): ProcessedEvent | null => { try { const originalEvent = deadLetter.asTyped(); // Apply fixes based on error type if (deadLetter.errorMessage.includes("Invalid userId")) { // Skip events with invalid user IDs return null; } if (deadLetter.errorMessage.includes("Invalid timestamp")) { // Fix negative timestamps const fixedEvent = { ...originalEvent, timestamp: Math.abs(originalEvent.timestamp) }; return { userId: fixedEvent.userId, action: fixedEvent.action, processedAt: new Date(), isValid: true }; } return null; // Skip other errors } catch (error) { console.error("Recovery failed:", error); return null; } } ); ``` ## Best Practices ## Common Patterns ### Circuit Breaker Pattern ```ts filename="circuit-breaker.ts" copy let failureCount = 0; const maxFailures = 5; const resetTimeout = 60000; // 1 minute rawEvents.stream!.addTransform( processedEvents.stream!, (event: RawEvent): ProcessedEvent => { if (failureCount >= maxFailures) { throw new Error("Circuit breaker open - too many failures"); } try { // Your processing logic here const result = processEvent(event); failureCount = 0; // Reset on success return result; } catch (error) { failureCount++; if (failureCount >= maxFailures) { setTimeout(() => { failureCount = 0; }, resetTimeout); } throw error; } }, { deadLetterQueue: eventDLQ } ); ``` ### Retry with Exponential Backoff ```ts filename="retry-backoff.ts" copy // Create a retry DLQ with delay processing const retryDLQ = new DeadLetterQueue("RetryDLQ"); retryDLQ.addTransform( processedEvents.stream!, (deadLetter): ProcessedEvent | null => { const retryCount = deadLetter.originalRecord.retryCount || 0; const maxRetries = 3; if (retryCount >= maxRetries) { console.log("Max retries exceeded, giving up"); return null; } // Calculate delay (exponential backoff) const delay = Math.pow(2, retryCount) * 1000; // 1s, 2s, 4s setTimeout(() => { try { const originalEvent = deadLetter.asTyped(); // Add retry count to track attempts const eventWithRetry = { ...originalEvent, retryCount: retryCount + 1 }; // Retry the original processing logic processEvent(eventWithRetry); } catch (error) { // Will go back to DLQ with incremented retry count throw error; } }, delay); return null; // Don't emit immediately, wait for retry } ); ``` Dead letter queues add overhead to stream processing. Use them judiciously and monitor their impact on throughput. 
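One lightweight way to watch DLQ volume from inside Moose is a counting consumer on the queue itself, such as the `eventDLQ` defined above (a sketch; the in-memory counters are illustrative and could be exported to whichever metrics system you use):

```ts filename="dlq-metrics.ts" copy
// Illustrative in-memory counters; export them to your metrics system of choice.
const dlqCountsByErrorType: Record<string, number> = {};

eventDLQ.addConsumer((deadLetter) => {
  const count = (dlqCountsByErrorType[deadLetter.errorType] ?? 0) + 1;
  dlqCountsByErrorType[deadLetter.errorType] = count;

  // Emit a warning every 100 failures of the same type
  if (count % 100 === 0) {
    console.warn(`DLQ: ${count} failures of type ${deadLetter.errorType}`);
  }
});
```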
Consider implementing sampling for high-volume streams where occasional message loss is acceptable. Dead letter queue events can be integrated with monitoring systems like Prometheus, DataDog, or CloudWatch for alerting and dashboards. Consider tracking metrics like DLQ message rate, error types, and recovery success rates. ## Using Dead Letter Queues in Ingestion Pipelines Dead Letter Queues (DLQs) can be directly integrated with your ingestion pipelines to capture records that fail validation or processing at the API entry point. This ensures that no data is lost, even if it cannot be immediately processed. ```typescript filename="IngestPipelineWithDLQ.ts" copy interface ExampleSchema { id: string; name: string; value: number; timestamp: Date; } const pipeline = new IngestPipeline("example", { ingestApi: true, stream: true, table: true, deadLetterQueue: true, // Route failed ingestions to DLQ }); ``` See the [Ingestion API documentation](/moose/apis/ingest-api#validation) for more details and best practices on configuring DLQs for ingestion. --- ## Publish Data Source: moose/streaming/from-your-code.mdx Write data to streams from applications, APIs, or external sources # Publishing Data to Streams ## Overview Publishing data to streams allows you to write data from various sources into your Kafka/Redpanda topics. This is the first step in building real-time data pipelines. ## Publishing Methods ### Using REST APIs The most common way to publish data is through Moose's built-in ingestion APIs. These are configured to automatically sit in front of your streams and publish data to them whenever a request is made to the endpoint: ```typescript filename="PublishViaAPI.ts" // When you create an IngestPipeline with ingestApi: true, Moose automatically creates an API endpoint const rawData = new IngestPipeline("raw_data", { ingestApi: true, // Creates POST /ingest/raw_data endpoint stream: true, table: true }); // You can then publish data via HTTP POST requests const response = await fetch('/ingest/raw_data', { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ id: '123', value: 42 }) }); ``` See the [OpenAPI documentation](/stack/open-api) to learn more about how to generate type-safe client SDKs in your language of choice for all of your Moose APIs. ### Direct Stream Publishing You can publish directly to a stream from your Moose code using the stream's `send` method. This is useful when emitting events from workflows or other backend logic. `send` accepts a single record or an array of records. If your `Stream` is configured with `schemaConfig.kind = "JSON"`, Moose produces using the Confluent envelope automatically (0x00 + schema id + JSON). No code changes are needed beyond setting `schemaConfig`. See the [Schema Registry guide](/moose/streaming/schema-registry). 
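For example, a stream declared with a JSON `schemaConfig` produces Schema Registry-framed messages on `send` (a sketch; the event shape and subject name are illustrative, and the reference options are covered in the Schema Registry guide):

```ts filename="PublishWithSchemaRegistry.ts" copy
interface OrderEvent {
  id: string;
  amount: number;
}

const schemaConfig: KafkaSchemaConfig = {
  kind: "JSON",
  reference: { subjectLatest: "order-event-value" }, // illustrative subject name
};

const orders = new Stream<OrderEvent>("orders", { schemaConfig });

// send() now produces Confluent-framed JSON (0x00 + schema id + payload) automatically
await orders.send({ id: "ord_1", amount: 99.5 });
```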
```ts filename="DirectPublish.ts" copy interface UserEvent { id: Key; userId: string; timestamp: Date; eventType: string; } // Create a stream (optionally configure destination to sync to a table) const events = new Stream("user-events"); // Publish a single record await events.send({ id: "evt_1", userId: "user_123", timestamp: new Date(), eventType: "click", }); // Publish multiple records await events.send([ { id: "evt_2", userId: "user_456", timestamp: new Date(), eventType: "view" }, { id: "evt_3", userId: "user_789", timestamp: new Date(), eventType: "signup" }, ]); ``` Moose builds the Kafka topic name from your stream name, optional namespace, and optional version (dots become underscores). For example, a stream named `events` with version `1.2.0` becomes `events_1_2_0` (or `my_ns.events_1_2_0` when the namespace is `"my_ns"`). ### Using the Kafka/Redpanda Client from External Applications You can also publish to streams from external applications using Kafka/Redpanda clients: See the [Kafka.js documentation](https://kafka.js.org/docs/getting-started) for more information on how to use the Kafka.js client to publish to streams. ```typescript filename="ExternalPublish.ts" const { Kafka } = KafkaJS; const kafka = new Kafka({ kafkaJS: { clientId: 'my-app', brokers: ['localhost:19092'] } }); const producer = kafka.producer(); await producer.connect(); // Publish to the stream topic await producer.send({ topic: 'user-events', // Stream name becomes the topic name messages: [ { key: 'event-123', value: JSON.stringify({ id: 'event-123', userId: 'user-456', timestamp: new Date().toISOString(), eventType: 'page_view' }) } ] }); ``` #### Locating Redpanda Connection Details When running your Moose backend within your local dev environment, you can find the connection details for your Redpanda cluster in the `moose.config.toml` file in the root of your project: ```toml filename="moose.config.toml" copy [redpanda_config] broker = "localhost:19092" message_timeout_ms = 1000 retention_ms = 30000 replication_factor = 1 ``` --- ## Schema Registry Source: moose/streaming/schema-registry.mdx Use Confluent Schema Registry with Moose streams (JSON Schema first) # Schema Registry Integration The first supported encoding is JSON Schema. Avro and Protobuf are planned. ## Overview Moose can publish and consume Kafka/Redpanda messages using Confluent Schema Registry. The first supported encoding is JSON Schema; Avro and Protobuf are planned. ## Configure Schema Registry URL Set the Schema Registry URL in `moose.config.toml` under `redpanda_config` (aliased as `kafka_config`). You can also override with environment variables. ```toml filename="moose.config.toml" copy [redpanda_config] broker = "localhost:19092" schema_registry_url = "http://localhost:8081" ``` Environment overrides (either key works): ```bash filename="Terminal" copy export MOOSE_REDPANDA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081 # or export MOOSE_KAFKA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081 ``` ## Referencing Schemas You can attach a Schema Registry reference to any `Stream` via `schemaConfig`. 
Use one of:

- Subject latest: `{ subjectLatest: string }`
- Subject and version: `{ subject: string, version: number }`
- Schema id: `{ id: number }`

```ts filename="sr-stream.ts" copy
interface Event {
  id: string;
  value: number;
}

const schemaConfig: KafkaSchemaConfig = {
  kind: "JSON",
  reference: { subjectLatest: "event-value" },
};

const events = new Stream<Event>("events", {
  schemaConfig,
});

// Producing uses Schema Registry envelope automatically
await events.send({ id: "e1", value: 42 });
```

## Consuming SR JSON in Runners

Moose streaming runners automatically detect the Confluent JSON envelope when consuming and strip the header before parsing the JSON. Your transformation code continues to work unchanged.

## Ingestion APIs and SR

When an Ingest API routes to a topic that has a `schemaConfig` of kind JSON, Moose resolves the schema id and publishes requests using the Schema Registry envelope. You can also set the reference to a fixed `id` to skip lookups.

## Discover existing topics and schemas

Use the CLI to pull external topics and optionally fetch JSON Schemas from Schema Registry to emit typed models.

```bash filename="Terminal" copy
moose kafka pull \
  --schema-registry http://localhost:8081 \
  --path app/external-topics \
  --include "*" \
  --exclude "{__consumer_offsets,_schemas}"
```

This writes external topic declarations under the provided path based on language (default path is inferred).

## Current limitations

- JSON Schema only (Avro/Protobuf planned)
- The Ingest API schema declared in code is not checked against the registry, so the two may drift apart.

---

## Sync to Table

Source: moose/streaming/sync-to-table.mdx
Automatically sync stream data to OLAP tables with intelligent batching

# Sync to Table

## Overview

Moose automatically handles batch writes between streams and OLAP tables through a **destination configuration**. When you specify a `destination` OLAP table for a stream, Moose provisions a background synchronization process that batches and writes data from the stream to the table.
### Basic Usage ```ts filename="SyncToTable.ts" copy {13} interface Event { id: Key; userId: string; timestamp: Date; eventType: string; } const eventsTable = new OlapTable("events"); const eventsStream = new Stream("events", { destination: eventsTable // This configures automatic batching }); ``` ## Setting Up Automatic Sync ### Using IngestPipeline (Easiest) The simplest way to set up automatic syncing is with an `IngestPipeline`, which creates all components and wires them together: ```ts filename="AutoSync.ts" copy interface Event { id: Key; userId: string; timestamp: Date; eventType: string; } // Creates stream, table, API, and automatic sync const eventsPipeline = new IngestPipeline("events", { ingestApi: true, // Creates HTTP endpoint at POST /ingest/events stream: true, // Creates buffering stream table: true // Creates destination table + auto-sync process }); ``` ### Standalone Components For more granular control, you can configure components individually: ```ts filename="ManualSync.ts" copy interface Event { id: Key; userId: string; timestamp: Date; eventType: string; } // Create table first const eventsTable = new OlapTable("events"); // Create stream with destination table (enables auto-sync) const eventsStream = new Stream("events", { destination: eventsTable // This configures automatic batching }); // Create API that writes to the stream const eventsApi = new IngestApi("events", { destination: eventsStream }); ``` ## How Automatic Syncing Works When you configure a stream with a `destination` table, Moose automatically handles the synchronization by managing a Rust process process in the background. Moose creates a **Rust background process** that: 1. **Consumes** messages from the stream (Kafka/Redpanda topic) 2. **Batches** records up to 100,000 or flushes every second (whichever comes first) 3. **Executes** optimized ClickHouse `INSERT` statements 4. **Commits** stream offsets after successful writes 5. **Retries** failed batches with exponential backoff Default batching parameters: | Parameter | Value | Description | |-----------|-------|-------------| | `MAX_BATCH_SIZE` | 100,000 records | Maximum records per batch insert | | `FLUSH_INTERVAL` | 1 second | Automatic flush regardless of batch size | Currently, you cannot configure the batching parameters, but we're interested in adding this feature. If you need this capability, let us know on slack! [ClickHouse inserts need to be batched for optimal performance](https://clickhouse.com/blog/asynchronous-data-inserts-in-clickhouse#data-needs-to-be-batched-for-optimal-performance). Moose automatically handles this optimization internally, ensuring your data is efficiently written to ClickHouse without any configuration required. ## Data Flow Example Here's how data flows through the automatic sync process: ```ts filename="DataFlow.ts" copy // 1. Data sent to ingestion API fetch('http://localhost:4000/ingest/events', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ id: 'evt_123', userId: 'user_456', timestamp: '2024-01-15T10:30:00Z', eventType: 'click' }) }) // 2. API validates and writes to stream // 3. Background sync process batches stream data // 4. Batch automatically written to ClickHouse table when: // - Batch reaches 100,000 records, OR // - 1 second has elapsed since last flush // 5. 
Data available for queries in events table sql`SELECT * FROM events WHERE userId = 'user_456';` ``` ## Monitoring and Observability The sync process provides built-in observability within the Moose runtime: - **Batch Insert Logs**: Records successful batch insertions with sizes and offsets - **Error Handling**: Logs transient failures with retry information - **Metrics**: Tracks throughput, batch sizes, and error rates - **Offset Tracking**: Maintains Kafka consumer group offsets for reliability --- ## Transformation Functions Source: moose/streaming/transform-functions.mdx Process and transform data in-flight between streams # Transformation Functions ## Overview Transformations allow you to process and reshape data as it flows between streams. You can filter, enrich, reshape, and combine data in-flight before it reaches its destination. ## Implementing Transformations ### Reshape and Enrich Data Transform data shape or enrich records: ```typescript filename="DataTransform.ts" interface RawEvent { id: Key; timestamp: string; data: { user_id: string; platform: string; app_version: string; ip_address: string; } } interface EnrichedEvent { eventId: Key; timestamp: Date; userId: Key; properties: { platform: string; version: string; country: string; }; metadata: { originalTimestamp: string; processedAt: Date; } } const rawStream = new Stream("raw_events"); const enrichedStream = new Stream("enriched_events"); // Reshape and enrich data rawStream.addTransform(enrichedStream, async (record: RawEvent) => ({ eventId: record.id, timestamp: new Date(record.timestamp), userId: record.data.user_id, properties: { platform: record.data.platform || 'unknown', version: record.data.app_version, country: await lookupCountry(record.data.ip_address) }, metadata: { originalTimestamp: record.timestamp, processedAt: new Date() } })); ``` ### Filtering Remove or filter records based on conditions: ```typescript filename="FilterStream.ts" interface MetricRecord { id: string; name: string; value: number; timestamp: Date; } const inputStream = new Stream("input_metrics"); const validMetrics = new Stream("valid_metrics"); // Multiple filtering conditions inputStream.addTransform(validMetrics, (record) => { // Filter out records with invalid values if (isNaN(record.value) || record.value < 0) { return undefined; } // Filter out old records if (record.timestamp < getStartOfDay()) { return undefined; } // Filter out specific metrics if (record.name.startsWith('debug_')) { return undefined; } return record; }); ``` ### Fan Out (1:N) Send data to multiple downstream processors: ```ts filename="FanOut.ts" copy interface Order { orderId: string; userId: string; amount: number; items: string[]; } interface HighPriorityOrder extends Order { priority: 'high'; } interface ArchivedOrder extends Order { archivedAt: Date; } // Define destination streams const analyticsStream = new Stream("order_analytics"); const notificationStream = new Stream("order_notifications"); const archiveStream = new Stream("order_archive"); // Source stream const orderStream = new Stream("orders"); // Send all orders to analytics orderStream.addTransform(analyticsStream, (order) => order); // Send large orders to notifications orderStream.addTransform(notificationStream, (order) => { if (order.amount > 1000) { return { ...order, priority: 'high' }; } return undefined; // Skip small orders }); // Archive all orders orderStream.addTransform(archiveStream, (order) => ({ ...order, archivedAt: new Date() })); ``` ### Fan In (N:1) Combine data from multiple 
sources: ```typescript filename="FanIn.ts" interface UserEvent { userId: Key; eventType: string; timestamp: Date; source: string; } // Source streams const webEvents = new Stream("web_events"); const mobileEvents = new Stream("mobile_events"); const apiEvents = new Stream("api_events"); // Create a stream and table for the combined events const eventsTable = new OlapTable("all_events"); const allEvents = new Stream("all_events", { destination: eventsTable }); // Fan in from web webEvents.addTransform(allEvents, (event) => ({ ...event, source: 'web', timestamp: new Date() })); // Fan in from mobile mobileEvents.addTransform(allEvents, (event) => ({ ...event, source: 'mobile', timestamp: new Date() })); // Fan in from API apiEvents.addTransform(allEvents, (event) => ({ ...event, source: 'api', timestamp: new Date() })); ``` ### Unnesting Flatten nested records: ```typescript filename="Unnest.ts" interface NestedRecord { id: Key; nested: { value: number; }[]; } interface FlattenedRecord { id: Key; value: number; } const nestedStream = new Stream("nested_records"); const flattenedStream = new Stream("flattened_records"); nestedStream.addTransform(flattenedStream, (record) => record.nested.map((n) => ({ id: record.id, value: n.value }))); ``` You cannot have multiple transforms between the same source and destination stream. If you need multiple transformation routes, you must either: - Use conditional logic inside a single streaming function to handle different cases, or - Implement a fan-out/fan-in pattern, where you route records to different intermediate streams and then merge them back into the destination stream. ## Error Handling with Dead Letter Queues When stream processing fails, you can configure dead letter queues to capture failed messages for later analysis and recovery. This prevents single message failures from stopping your entire pipeline. ```typescript filename="DeadLetterQueue.ts" copy interface UserEvent { userId: string; action: string; timestamp: number; } interface ProcessedEvent { userId: string; action: string; processedAt: Date; isValid: boolean; } // Create pipelines const rawEvents = new IngestPipeline("raw_events", { ingestApi: true, stream: true, table: false }); const processedEvents = new IngestPipeline("processed_events", { ingestApi: false, stream: true, table: true }); // Create dead letter queue for failed transformations const eventDLQ = new DeadLetterQueue("EventDLQ"); // Add transform with error handling rawEvents.stream!.addTransform( processedEvents.stream!, (event: UserEvent): ProcessedEvent => { // This might fail for invalid data if (!event.userId || event.userId.length === 0) { throw new Error("Invalid userId: cannot be empty"); } return { userId: event.userId, action: event.action, processedAt: new Date(), isValid: true }; }, { deadLetterQueue: eventDLQ // Failed messages go here } ); // Monitor dead letter messages eventDLQ.addConsumer((deadLetter) => { console.log(`Error: ${deadLetter.errorMessage}`); console.log(`Failed at: ${deadLetter.failedAt}`); // Access original typed data const originalEvent: UserEvent = deadLetter.asTyped(); console.log(`Original User ID: ${originalEvent.userId}`); }); ``` For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the [Dead Letter Queues guide](./dead-letter-queues).
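Related to the note above about multiple transforms between the same source and destination, the conditional-logic alternative can look like this (a minimal sketch with illustrative stream names and routing rules):

```ts filename="ConditionalRouting.ts" copy
interface InputRecord {
  id: string;
  kind: 'a' | 'b';
  value: number;
}

interface OutputRecord {
  id: string;
  normalizedValue: number;
}

const routedInput = new Stream<InputRecord>("routed_input");
const routedOutput = new Stream<OutputRecord>("routed_output");

// A single transform handles both cases instead of registering two
// transforms between the same source and destination.
routedInput.addTransform(routedOutput, (record) => {
  if (record.kind === 'a') {
    return { id: record.id, normalizedValue: record.value };
  }
  if (record.kind === 'b') {
    return { id: record.id, normalizedValue: record.value / 100 };
  }
  return undefined; // Drop anything else
});
```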