Transformation Functions
Overview
Transformations allow you to process and reshape data as it flows between streams. You can filter, enrich, reshape, and combine data in-flight before it reaches its destination.
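At a high level, you register a transformation function on a source stream and point it at a destination stream; the function receives each record and returns the reshaped result, or nothing to drop the record. The following is a minimal sketch only, using illustrative PageView types and stream names that do not appear in the examples below:
import { Stream } from "@514labs/moose-lib";
// Illustrative types and stream names for this sketch only
interface PageView {
  url: string;
  viewedAt: string;
}
interface PageViewSummary {
  path: string;
  viewedAt: Date;
}
const pageViews = new Stream<PageView>("page_views");
const pageViewSummaries = new Stream<PageViewSummary>("page_view_summaries");
// Each record flowing through page_views is reshaped in-flight
// before it reaches page_view_summaries
pageViews.addTransform(pageViewSummaries, (view) => ({
  path: new URL(view.url).pathname,
  viewedAt: new Date(view.viewedAt)
}));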
Implementing Transformations
Reshape and Enrich Data
Transform data shape or enrich records:
import { Stream, Key } from "@514labs/moose-lib";
interface RawEvent {
id: Key<string>;
timestamp: string;
data: {
user_id: string;
platform: string;
app_version: string;
ip_address: string;
}
}
interface EnrichedEvent {
eventId: Key<string>;
timestamp: Date;
userId: Key<string>;
properties: {
platform: string;
version: string;
country: string;
};
metadata: {
originalTimestamp: string;
processedAt: Date;
}
}
const rawStream = new Stream<RawEvent>("raw_events");
const enrichedStream = new Stream<EnrichedEvent>("enriched_events");
// Reshape and enrich data
rawStream.addTransform(enrichedStream, async (record: RawEvent) => ({
eventId: record.id,
timestamp: new Date(record.timestamp),
userId: record.data.user_id,
properties: {
platform: record.data.platform || 'unknown',
version: record.data.app_version,
country: await lookupCountry(record.data.ip_address)
},
metadata: {
originalTimestamp: record.timestamp,
processedAt: new Date()
}
}));
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime
class EventProperties(BaseModel):
user_id: str
platform: str
app_version: str
ip_address: str
class RawEvent(BaseModel):
id: Key[str]
timestamp: str
data: EventProperties
class EnrichedEventProperties(BaseModel):
platform: str
version: str
country: str
class EnrichedEventMetadata(BaseModel):
originalTimestamp: str
processedAt: datetime
class EnrichedEvent(BaseModel):
eventId: Key[str]
timestamp: datetime
userId: Key[str]
properties: EnrichedEventProperties
metadata: EnrichedEventMetadata
raw_stream = Stream[RawEvent]("raw_events")
enriched_stream = Stream[EnrichedEvent]("enriched_events")
raw_stream.add_transform(destination=enriched_stream, transformation=lambda record: EnrichedEvent(
eventId=record.id,
timestamp=datetime.fromisoformat(record.timestamp),
userId=record.data.user_id,
properties=EnrichedEventProperties(
platform=record.data.platform,
version=record.data.app_version,
country=lookupCountry(record.data.ip_address)
),
metadata=EnrichedEventMetadata(
originalTimestamp=record.timestamp,
processedAt=datetime.now()
)
))
Filtering
Remove or filter records based on conditions:
import { Stream } from "@514labs/moose-lib";
interface MetricRecord {
id: string;
name: string;
value: number;
timestamp: Date;
}
const inputStream = new Stream<MetricRecord>("input_metrics");
const validMetrics = new Stream<MetricRecord>("valid_metrics");
// Multiple filtering conditions
inputStream.addTransform(validMetrics, (record) => {
// Filter out records with invalid values
if (isNaN(record.value) || record.value < 0) {
return undefined;
}
// Filter out old records
if (record.timestamp < getStartOfDay()) {
return undefined;
}
// Filter out specific metrics
if (record.name.startsWith('debug_')) {
return undefined;
}
return record;
});
from moose_lib import Stream, Key
from pydantic import BaseModel
from datetime import datetime
class MetricRecord(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime
class ValidMetrics(BaseModel):
    id: Key[str]
    name: str
    value: float
    timestamp: datetime
input_stream = Stream[MetricRecord]("input_metrics")
valid_metrics = Stream[ValidMetrics]("valid_metrics")
def filter_function(record: MetricRecord) -> ValidMetrics | None:
if record.value > 0 and record.timestamp > getStartOfDay() and not record.name.startswith('debug_'):
return ValidMetrics(
id=record.id,
name=record.name,
value=record.value,
timestamp=record.timestamp
)
return None
input_stream.add_transform(destination=valid_metrics, transformation=filter_function)
Fan Out (1:N)
Send data to multiple downstream processors:
import { Stream } from "@514labs/moose-lib";
interface Order {
orderId: string;
userId: string;
amount: number;
items: string[];
}
interface HighPriorityOrder extends Order {
priority: 'high';
}
interface ArchivedOrder extends Order {
archivedAt: Date;
}
// Define destination streams
const analyticsStream = new Stream<Order>("order_analytics");
const notificationStream = new Stream<HighPriorityOrder>("order_notifications");
const archiveStream = new Stream<ArchivedOrder>("order_archive");
// Source stream
const orderStream = new Stream<Order>("orders");
// Send all orders to analytics
orderStream.addTransform(analyticsStream, (order) => order);
// Send large orders to notifications
orderStream.addTransform(notificationStream, (order) => {
if (order.amount > 1000) {
return {
...order,
priority: 'high'
};
}
return undefined; // Skip small orders
});
// Archive all orders
orderStream.addTransform(archiveStream, (order) => ({
...order,
archivedAt: new Date()
}));
from moose_lib import Stream, Key
from pydantic import BaseModel
from typing import List
from datetime import datetime
# Define data models
class Order(BaseModel):
    orderId: Key[str]
    userId: Key[str]
    amount: float
    items: List[str]
class HighPriorityOrder(Order):
    priority: str = 'high'
class ArchivedOrder(Order):
    archivedAt: datetime
# Create source and destination streams
order_stream = Stream[Order]("orders")
analytics_stream = Stream[Order]("order_analytics")
notification_stream = Stream[HighPriorityOrder]("order_notifications")
archive_stream = Stream[ArchivedOrder]("order_archive")
# Send all orders to analytics
def analytics_transform(order: Order) -> Order:
return order
order_stream.add_transform(destination=analytics_stream, transformation=analytics_transform)
# Send large orders to notifications
def high_priority_transform(order: Order) -> HighPriorityOrder | None:
if order.amount > 1000:
return HighPriorityOrder(
orderId=order.orderId,
userId=order.userId,
amount=order.amount,
items=order.items,
priority='high'
)
return None # Skip small orders
order_stream.add_transform(destination=notification_stream, transformation=high_priority_transform)
# Archive all orders with timestamp
def archive_transform(order: Order) -> ArchivedOrder:
return ArchivedOrder(
orderId=order.orderId,
userId=order.userId,
amount=order.amount,
items=order.items,
archivedAt=datetime.now()
)
order_stream.add_transform(destination=archive_stream, transformation=archive_transform)
Fan In (N:1)
Combine data from multiple sources:
import { Stream, OlapTable, Key } from "@514labs/moose-lib";
interface UserEvent {
userId: Key<string>;
eventType: string;
timestamp: Date;
source: string;
}
// Source streams
const webEvents = new Stream<UserEvent>("web_events");
const mobileEvents = new Stream<UserEvent>("mobile_events");
const apiEvents = new Stream<UserEvent>("api_events");
// Create a stream and table for the combined events
const eventsTable = new OlapTable<UserEvent>("all_events");
const allEvents = new Stream<UserEvent>("all_events", {
destination: eventsTable
});
// Fan in from web
webEvents.addTransform(allEvents, (event) => ({
...event,
source: 'web',
timestamp: new Date()
}));
// Fan in from mobile
mobileEvents.addTransform(allEvents, (event) => ({
...event,
source: 'mobile',
timestamp: new Date()
}));
// Fan in from API
apiEvents.addTransform(allEvents, (event) => ({
...event,
source: 'api',
timestamp: new Date()
}));
from moose_lib import Stream, OlapTable, Key, StreamConfig
from pydantic import BaseModel
from datetime import datetime
class UserEvent(BaseModel):
    userId: Key[str]
    eventType: str
    timestamp: datetime
    source: str
# Create source and destination streams
web_events = Stream[UserEvent]("web_events")
mobile_events = Stream[UserEvent]("mobile_events")
api_events = Stream[UserEvent]("api_events")
# Create a stream and table for the combined events
events_table = OlapTable[UserEvent]("all_events")
all_events = Stream[UserEvent]("all_events", StreamConfig(
destination=events_table
))
# Fan in from web
def web_transform(event: UserEvent) -> UserEvent:
return UserEvent(
userId=event.userId,
eventType=event.eventType,
timestamp=event.timestamp,
source='web'
)
web_events.add_transform(destination=all_events, transformation=web_transform)
# Fan in from mobile
def mobile_transform(event: UserEvent) -> UserEvent:
return UserEvent(
userId=event.userId,
eventType=event.eventType,
timestamp=event.timestamp,
source='mobile'
)
mobile_events.add_transform(destination=all_events, transformation=mobile_transform)
# Fan in from API
def api_transform(event: UserEvent) -> UserEvent:
return UserEvent(
userId=event.userId,
eventType=event.eventType,
timestamp=event.timestamp,
source='api'
)
api_events.add_transform(destination=all_events, transformation=api_transform)
Unnesting
Flatten nested records:
import { Stream, Key } from "@514labs/moose-lib";
interface NestedRecord {
id: Key<string>;
nested: {
value: number;
}[];
}
interface FlattenedRecord {
id: Key<string>;
value: number;
}
const nestedStream = new Stream<NestedRecord>("nested_records");
const flattenedStream = new Stream<FlattenedRecord>("flattened_records");
nestedStream.addTransform(flattenedStream, (record) => record.nested.map((n) => ({
id: record.id,
value: n.value
})));
from moose_lib import Stream, Key
from pydantic import BaseModel
from typing import List
class NestedValue(BaseModel):
    value: int
class NestedRecord(BaseModel):
    id: Key[str]
    nested: List[NestedValue]
class FlattenedRecord(BaseModel):
    id: Key[str]
    value: int
nested_stream = Stream[NestedRecord]("nested_records")
flattened_stream = Stream[FlattenedRecord]("flattened_records")
def unnest_transform(record: NestedRecord) -> List[FlattenedRecord]:
result = []
for nested in record.nested:
result.append(FlattenedRecord(
id=record.id,
value=nested.value
))
return result
nested_stream.add_transform(destination=flattened_stream, transformation=unnest_transform)
Multiple Transformation Routes Limitation
You cannot have multiple transforms between the same source and destination stream. If you need multiple transformation routes, you must either:
- Use conditional logic inside a single streaming function to handle the different cases (see the sketch after this list), or
- Implement a fan-out/fan-in pattern, where you route records to different intermediate streams and then merge them back into the destination stream.
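For example, rather than registering two transforms from the same source to the same destination, a single transform can branch on each record. This is a minimal sketch using the addTransform API shown above; the TaggedOrder type, the tagged_orders stream name, and the tagging rules are illustrative assumptions, not part of the examples in this section:
import { Stream } from "@514labs/moose-lib";
interface Order {
  orderId: string;
  userId: string;
  amount: number;
  items: string[];
}
// Hypothetical destination type for this sketch
interface TaggedOrder extends Order {
  tier: string;
}
const orderStream = new Stream<Order>("orders");
const taggedOrders = new Stream<TaggedOrder>("tagged_orders");
// One transform between this source and destination; conditional
// logic inside the function decides how each record is handled
orderStream.addTransform(taggedOrders, (order) => {
  // Tag large orders
  if (order.amount > 1000) {
    return { ...order, tier: 'high' };
  }
  // Drop orders with no items
  if (order.items.length === 0) {
    return undefined;
  }
  // Everything else gets a default tag
  return { ...order, tier: 'standard' };
});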
Error Handling with Dead Letter Queues
When stream processing fails, you can configure dead letter queues to capture failed messages for later analysis and recovery. This prevents single message failures from stopping your entire pipeline.
import { DeadLetterQueue, IngestPipeline } from "@514labs/moose-lib";
interface UserEvent {
userId: string;
action: string;
timestamp: number;
}
interface ProcessedEvent {
userId: string;
action: string;
processedAt: Date;
isValid: boolean;
}
// Create pipelines
const rawEvents = new IngestPipeline<UserEvent>("raw_events", {
ingest: true,
stream: true,
table: false
});
const processedEvents = new IngestPipeline<ProcessedEvent>("processed_events", {
ingest: false,
stream: true,
table: true
});
// Create dead letter queue for failed transformations
const eventDLQ = new DeadLetterQueue<UserEvent>("EventDLQ");
// Add transform with error handling
rawEvents.stream!.addTransform(
processedEvents.stream!,
(event: UserEvent): ProcessedEvent => {
// This might fail for invalid data
if (!event.userId || event.userId.length === 0) {
throw new Error("Invalid userId: cannot be empty");
}
return {
userId: event.userId,
action: event.action,
processedAt: new Date(),
isValid: true
};
},
{
deadLetterQueue: eventDLQ // Failed messages go here
}
);
// Monitor dead letter messages
eventDLQ.addConsumer((deadLetter) => {
console.log(`Error: ${deadLetter.errorMessage}`);
console.log(`Failed at: ${deadLetter.failedAt}`);
// Access original typed data
const originalEvent: UserEvent = deadLetter.asTyped();
console.log(`Original User ID: ${originalEvent.userId}`);
});
from moose_lib import DeadLetterQueue, IngestPipeline, TransformConfig, DeadLetterModel
from pydantic import BaseModel
from datetime import datetime
class UserEvent(BaseModel):
user_id: str
action: str
timestamp: float
class ProcessedEvent(BaseModel):
user_id: str
action: str
processed_at: datetime
is_valid: bool
# Create pipelines
raw_events = IngestPipeline[UserEvent]("raw_events", {
"ingest": True,
"stream": True,
"table": False
})
processed_events = IngestPipeline[ProcessedEvent]("processed_events", {
"ingest": False,
"stream": True,
"table": True
})
# Create dead letter queue for failed transformations
event_dlq = DeadLetterQueue[UserEvent](name="EventDLQ")
def process_event(event: UserEvent) -> ProcessedEvent:
# This might fail for invalid data
if not event.user_id or len(event.user_id) == 0:
raise ValueError("Invalid user_id: cannot be empty")
return ProcessedEvent(
user_id=event.user_id,
action=event.action,
processed_at=datetime.now(),
is_valid=True
)
# Add transform with error handling
raw_events.get_stream().add_transform(
destination=processed_events.get_stream(),
transformation=process_event,
config=TransformConfig(
dead_letter_queue=event_dlq # Failed messages go here
)
)
def monitor_dead_letters(dead_letter: DeadLetterModel[UserEvent]) -> None:
print(f"Error: {dead_letter.error_message}")
print(f"Failed at: {dead_letter.failed_at}")
# Access original typed data
original_event: UserEvent = dead_letter.as_typed()
print(f"Original User ID: {original_event.user_id}")
# Monitor dead letter messages
event_dlq.add_consumer(monitor_dead_letters)
Learn More
For comprehensive dead letter queue patterns, recovery strategies, and best practices, see the Dead Letter Queues guide.