Streaming
Overview
The Streaming capability provides standalone real-time data processing with Kafka/Redpanda topics. You can use this capability independently to build event-driven architectures, data transformations, and real-time pipelines without requiring other MooseStack components.
Basic Usage
Stream.ts
import { OlapTable, Stream } from "@514labs/moose-lib";

interface ExampleEvent {
  id: string;
  userId: string;
  timestamp: Date;
  eventType: string;
}

// Create a standalone stream for events
const exampleStream = new Stream<ExampleEvent>("streaming-topic-name", {
  // Optional: specify destination table
  destination: new OlapTable<ExampleEvent>("table-name")
});

// Add consumers for real-time processing
exampleStream.addConsumer((event) => {
  console.log("Processing event:", event);
  // Custom processing logic here
});
Stream.py
from moose_lib import Stream
from pydantic import BaseModel
from datetime import datetime

class ExampleEvent(BaseModel):
    id: str
    user_id: str
    timestamp: datetime
    event_type: str

# Create a standalone stream for user events
example_stream = Stream[ExampleEvent]("streaming-topic-name")

# Add consumers for real-time processing
def process_event(event: ExampleEvent):
    print(f"Processing event: {event}")
    # Custom processing logic here

example_stream.add_consumer(process_event)
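The "custom processing logic" placeholder in the consumer can hold any ordinary function body. The sketch below is one possible illustration, not part of the library: it tallies events by type and calls the handler directly, so it runs without a broker. The `ExampleEvent` dataclass is a stand-in for the pydantic model above, and `event_counts` is a hypothetical in-memory store introduced only for this example.

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class ExampleEvent:
    # Stand-in for the pydantic model above, so the sketch needs no extra packages
    id: str
    user_id: str
    timestamp: datetime
    event_type: str


# Hypothetical in-memory tally (assumption for illustration only)
event_counts: Counter = Counter()


def process_event(event: ExampleEvent) -> None:
    """Count events by type; the kind of logic a consumer callback could hold."""
    event_counts[event.event_type] += 1


# Exercise the handler directly, without a running stream
sample_events = [
    ExampleEvent("1", "u1", datetime.now(timezone.utc), "click"),
    ExampleEvent("2", "u2", datetime.now(timezone.utc), "view"),
    ExampleEvent("3", "u1", datetime.now(timezone.utc), "click"),
]
for sample in sample_events:
    process_event(sample)

print(dict(event_counts))
```

Keeping the handler a plain function like this makes it possible to unit test the processing logic separately from the stream it is later registered on.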
Enabling Streaming
To enable streaming, set the streaming_engine feature flag to true in your moose.config.toml file:
[features]
streaming_engine = true
Core Capabilities
Integration with Other Capabilities
The Streaming capability can be used independently, or in conjunction with other MooseStack modules: