API Reference
This is a comprehensive reference for @514labs/moose-lib (TypeScript; the Python package is moose_lib), detailing all exported components, types, and utilities.
Core Types
Key<T extends string | number | Date>
A type for marking fields as primary keys in data models.
// Example
interface MyModel {
id: Key<string>; // Marks 'id' as a primary key of type string
}
JWT<T extends object>
A type for working with JSON Web Tokens.
// Example
type UserJWT = JWT<{ userId: string, role: string }>;
ApiUtil
Interface providing utilities for analytics APIs.
interface ApiUtil {
client: MooseClient; // Client for interacting with the database
sql: typeof sql; // SQL template tag function
jwt: JWTPayload | undefined; // Current JWT if available
}
Infrastructure Components
OlapTable<T>
Creates a ClickHouse table with the schema of type T.
// Basic usage with MergeTree (default)
export const myTable = new OlapTable<UserProfile>("user_profiles");
// With sorting configuration (fields)
export const myConfiguredTable = new OlapTable<UserProfile>("user_profiles_sorted", {
orderByFields: ["id", "timestamp"]
});
// With sorting configuration (expression)
export const myConfiguredTableExpr = new OlapTable<UserProfile>("user_profiles_expr", {
// Equivalent to orderByFields: ["id", "timestamp"]
orderByExpression: "(id, timestamp)"
});
// Disable sorting entirely
export const myUnsortedTable = new OlapTable<UserProfile>("user_profiles_unsorted", {
orderByExpression: "tuple()"
});
// For deduplication, explicitly set the ReplacingMergeTree engine
export const dedupTable = new OlapTable<UserProfile>("user_profiles_dedup", {
engine: ClickHouseEngines.ReplacingMergeTree,
orderByFields: ["id", "timestamp"],
ver: "updated_at", // Optional: version column for keeping latest
isDeleted: "deleted" // Optional: soft delete marker (requires ver)
});
Stream<T>
Creates a Redpanda topic with the schema of type T.
// Basic usage
export const myStream = new Stream<UserEvent>("user_events");
// With configuration
export const myConfiguredStream = new Stream<UserEvent>("user_events", {
parallelism: 3,
retentionPeriod: 86400 // 1 day in seconds
});
// Adding transformations
myConfiguredStream.addTransform(
destinationStream,
(record) => transformFunction(record)
);
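The function passed to addTransform maps a source record to a destination record. The sketch below shows one such function in isolation so it can be read and tested without any infrastructure. The UserEvent and EnrichedUserEvent shapes, their field names, and enrichUserEvent are assumptions made for illustration, not types or functions from the library.

```typescript
// Hypothetical record shapes; the field names are illustrative assumptions.
interface UserEvent {
  userId: string;
  action: string;
  timestamp: Date;
}

interface EnrichedUserEvent extends UserEvent {
  actionUpper: string;
  day: string; // ISO calendar date derived from the timestamp
}

// A pure function of the kind that could be supplied as the transform callback.
function enrichUserEvent(record: UserEvent): EnrichedUserEvent {
  return {
    ...record,
    actionUpper: record.action.toUpperCase(),
    // toISOString() always starts with YYYY-MM-DD, so the first 10 characters are the date.
    day: record.timestamp.toISOString().slice(0, 10),
  };
}

const enriched = enrichUserEvent({
  userId: "u1",
  action: "click",
  timestamp: new Date("2024-01-15T10:30:00Z"),
});
console.log(enriched.day, enriched.actionUpper);
```

Keeping the transform a pure function makes it easy to unit test before wiring it between two streams.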
IngestApi<T>
Creates an HTTP endpoint for ingesting data of type T.
// Basic usage with destination stream
export const myIngestApi = new IngestApi<UserEvent>("user_events", {
destination: myUserEventStream
});
Api<T, R>
Creates an HTTP endpoint for querying data with request type T and response type R.
// Basic usage
export const myApi = new Api<UserQuery, UserProfile[]>(
"getUserProfiles",
async (params, { client, sql }) => {
const result = await client.query.execute(
sql`SELECT * FROM user_profiles WHERE age > ${params.minAge} LIMIT 10`
);
return result;
}
);
IngestPipeline<T>
Combines ingest API, stream, and table creation in a single component.
// Basic usage
export const pipeline = new IngestPipeline<UserEvent>("user_pipeline", {
ingestApi: true,
stream: true,
table: true
});
// With advanced configuration
export const advancedPipeline = new IngestPipeline<UserEvent>("advanced_pipeline", {
ingestApi: true,
stream: { parallelism: 3 },
table: {
orderByFields: ["id", "timestamp"]
}
});
MaterializedView<T>
Creates a materialized view in ClickHouse.
// Basic usage
export const view = new MaterializedView<UserStatistics>({
selectStatement: "SELECT user_id, COUNT(*) as event_count FROM user_events GROUP BY user_id",
tableName: "user_events",
materializedViewName: "user_statistics",
orderByFields: ["user_id"]
});
SQL Utilities
sql Template Tag
Template tag for creating type-safe SQL queries with parameters.
// Basic usage
const query = sql`SELECT * FROM users WHERE id = ${userId}`;
// With multiple parameters
const filteredQuery = sql`
SELECT * FROM users
WHERE age > ${minAge}
AND country = ${country}
LIMIT ${limit}
`;
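To see why a template tag helps keep parameters separate from query text, it can be useful to look at how any tagged template receives its arguments. The following is a minimal stand-in, not the moose-lib implementation: sketchSql, ParameterizedQuery, and the $1-style placeholders are all assumptions chosen for illustration.

```typescript
// Stand-in for illustration only; NOT the moose-lib `sql` implementation.
interface ParameterizedQuery {
  text: string;      // query text with numbered placeholders
  values: unknown[]; // interpolated values, kept out of the text
}

// A tag function receives the literal chunks and the interpolated values separately.
function sketchSql(strings: TemplateStringsArray, ...values: unknown[]): ParameterizedQuery {
  let text = strings[0] ?? "";
  for (let i = 0; i < values.length; i++) {
    // Emit a placeholder instead of splicing the raw value into the SQL text.
    text += `$${i + 1}` + (strings[i + 1] ?? "");
  }
  return { text, values };
}

const suspiciousId = "42; DROP TABLE users";
const sketchQuery = sketchSql`SELECT * FROM users WHERE id = ${suspiciousId}`;
console.log(sketchQuery.text);   // the value does not appear in the query text
console.log(sketchQuery.values); // it travels separately as a parameter
```

Because the value never becomes part of the SQL string, a driver can bind it as data rather than parse it as SQL.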
MooseClient
Client for interacting with ClickHouse and Temporal.
class MooseClient {
query: QueryClient; // For database queries
workflow: WorkflowClient; // For workflow operations
}
ClickHouse Utilities
Table Engine Configurations
ClickHouseEngines Enum
Available table engines:
enum ClickHouseEngines {
MergeTree = "MergeTree",
ReplacingMergeTree = "ReplacingMergeTree",
AggregatingMergeTree = "AggregatingMergeTree",
SummingMergeTree = "SummingMergeTree",
ReplicatedMergeTree = "ReplicatedMergeTree",
ReplicatedReplacingMergeTree = "ReplicatedReplacingMergeTree",
ReplicatedAggregatingMergeTree = "ReplicatedAggregatingMergeTree",
ReplicatedSummingMergeTree = "ReplicatedSummingMergeTree",
S3Queue = "S3Queue"
}
ReplacingMergeTreeConfig<T>
Configuration for ReplacingMergeTree tables:
type ReplacingMergeTreeConfig<T> = {
engine: ClickHouseEngines.ReplacingMergeTree;
orderByFields?: (keyof T & string)[];
ver?: keyof T & string; // Optional: version column for keeping latest
isDeleted?: keyof T & string; // Optional: soft delete marker (requires ver)
settings?: { [key: string]: string };
}
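The comments above state that isDeleted requires ver, but the type as written does not enforce that. One way to catch the mistake early is a small check like the sketch below. The enum and config type are local copies of the definitions above so the block compiles on its own; checkReplacingConfig and the UserProfile field types are hypothetical and not part of the library.

```typescript
// Local copies of the definitions above so the sketch is self-contained.
enum ClickHouseEngines {
  MergeTree = "MergeTree",
  ReplacingMergeTree = "ReplacingMergeTree",
}

type ReplacingMergeTreeConfig<T> = {
  engine: ClickHouseEngines.ReplacingMergeTree;
  orderByFields?: (keyof T & string)[];
  ver?: keyof T & string;
  isDeleted?: keyof T & string;
  settings?: { [key: string]: string };
};

// Assumed model shape; the field types are illustrative.
interface UserProfile {
  id: string;
  timestamp: Date;
  updated_at: Date;
  deleted: number;
}

// Hypothetical helper: reports configuration problems instead of throwing.
function checkReplacingConfig<T>(config: ReplacingMergeTreeConfig<T>): string[] {
  const problems: string[] = [];
  if (config.isDeleted !== undefined && config.ver === undefined) {
    problems.push("isDeleted requires ver to be set");
  }
  return problems;
}

const validConfig: ReplacingMergeTreeConfig<UserProfile> = {
  engine: ClickHouseEngines.ReplacingMergeTree,
  orderByFields: ["id", "timestamp"],
  ver: "updated_at",
  isDeleted: "deleted",
};

const missingVer: ReplacingMergeTreeConfig<UserProfile> = {
  engine: ClickHouseEngines.ReplacingMergeTree,
  isDeleted: "deleted",
};

console.log(checkReplacingConfig(validConfig));
console.log(checkReplacingConfig(missingVer));
```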
Replicated Engine Configurations
Configuration for replicated table engines:
// ReplicatedMergeTree
type ReplicatedMergeTreeConfig<T> = {
engine: ClickHouseEngines.ReplicatedMergeTree;
keeperPath?: string; // Optional: ZooKeeper/Keeper path (omit for Cloud)
replicaName?: string; // Optional: replica name (omit for Cloud)
orderByFields?: (keyof T & string)[];
settings?: { [key: string]: string };
}
// ReplicatedReplacingMergeTree
type ReplicatedReplacingMergeTreeConfig<T> = {
engine: ClickHouseEngines.ReplicatedReplacingMergeTree;
keeperPath?: string; // Optional: ZooKeeper/Keeper path (omit for Cloud)
replicaName?: string; // Optional: replica name (omit for Cloud)
ver?: keyof T & string; // Optional: version column
isDeleted?: keyof T & string; // Optional: soft delete marker
orderByFields?: (keyof T & string)[];
settings?: { [key: string]: string };
}
// ReplicatedAggregatingMergeTree
type ReplicatedAggregatingMergeTreeConfig<T> = {
engine: ClickHouseEngines.ReplicatedAggregatingMergeTree;
keeperPath?: string; // Optional: ZooKeeper/Keeper path (omit for Cloud)
replicaName?: string; // Optional: replica name (omit for Cloud)
orderByFields?: (keyof T & string)[];
settings?: { [key: string]: string };
}
// ReplicatedSummingMergeTree
type ReplicatedSummingMergeTreeConfig<T> = {
engine: ClickHouseEngines.ReplicatedSummingMergeTree;
keeperPath?: string; // Optional: ZooKeeper/Keeper path (omit for Cloud)
replicaName?: string; // Optional: replica name (omit for Cloud)
columns?: string[]; // Optional: columns to sum
orderByFields?: (keyof T & string)[];
settings?: { [key: string]: string };
}
Note: The keeperPath and replicaName parameters are optional. When omitted, Moose uses smart defaults that work in both ClickHouse Cloud and self-managed environments (default path: /clickhouse/tables/{uuid}/{shard} with replica {replica}). You can still provide both parameters explicitly if you need custom replication paths.
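The defaulting described in the note can be pictured as a simple fallback. This is only a sketch of that idea using the default values quoted above. resolveReplication, ReplicationOptions, and the custom path shown are hypothetical and do not reflect how Moose implements it internally.

```typescript
// Hypothetical shape holding just the two replication parameters.
interface ReplicationOptions {
  keeperPath?: string;
  replicaName?: string;
}

// Default values as quoted in the note above.
const DEFAULT_KEEPER_PATH = "/clickhouse/tables/{uuid}/{shard}";
const DEFAULT_REPLICA_NAME = "{replica}";

// Hypothetical helper: fall back to the defaults when a parameter is omitted.
function resolveReplication(options: ReplicationOptions): Required<ReplicationOptions> {
  return {
    keeperPath: options.keeperPath ?? DEFAULT_KEEPER_PATH,
    replicaName: options.replicaName ?? DEFAULT_REPLICA_NAME,
  };
}

// Nothing supplied: both defaults apply.
console.log(resolveReplication({}));

// Custom path supplied (illustrative value): only the replica name falls back.
console.log(resolveReplication({ keeperPath: "/custom/tables/{shard}/events" }));
```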
S3QueueTableSettings
Type-safe interface for S3Queue-specific table settings (ClickHouse 24.7+).
interface S3QueueTableSettings {
mode?: "ordered" | "unordered"; // Processing mode
after_processing?: "keep" | "delete"; // File handling after processing
keeper_path?: string; // ZooKeeper path for coordination
loading_retries?: string; // Number of retry attempts
processing_threads_num?: string; // Parallel processing threads
// ... and many more settings
}
S3Queue Configuration
Configure S3Queue tables for streaming data from S3 buckets (ORDER BY is not supported):
export const s3Events = new OlapTable<S3Event>("s3_events", {
engine: ClickHouseEngines.S3Queue,
s3Path: "s3://bucket/data/*.json",
format: "JSONEachRow",
settings: {
mode: "unordered",
keeper_path: "/clickhouse/s3queue/events"
}
});
Task Management
Task<T, R>
A class that represents a single task within a workflow system.
// No input, no output
export const task1 = new Task<null, void>("task1", {
run: async () => {
console.log("No input/output");
},
retries: 3,
timeout: "30s"
});
// With input and output
export const task2 = new Task<InputType, OutputType>("task2", {
run: async (ctx) => {
return process(ctx.input);
},
onComplete: [nextTask]
});
TaskContext<T>
A context object that carries the input and state shared between a task's run and onCancel functions.
export type TaskContext<T> = T extends null ? { state: any; input?: null } : { state: any; input: T };
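The conditional type means that a task declared with a null input gets a context whose input is optional, while any other input type makes input required. The sketch below copies the type from above and exercises both branches; OrderInput and describeOrder are hypothetical names used only for illustration.

```typescript
// Copied from the definition above so the sketch compiles on its own.
type TaskContext<T> = T extends null ? { state: any; input?: null } : { state: any; input: T };

// Hypothetical input type for illustration.
interface OrderInput {
  orderId: string;
  quantity: number;
}

// With T = null, `input` may be omitted entirely.
const noInputCtx: TaskContext<null> = { state: {} };

// With a concrete T, `input` is required and fully typed.
const orderCtx: TaskContext<OrderInput> = {
  state: { attempts: 0 },
  input: { orderId: "A-1", quantity: 2 },
};

function describeOrder(ctx: TaskContext<OrderInput>): string {
  // No undefined check is needed here because `input` is not optional in this branch.
  return `${ctx.input.orderId} x${ctx.input.quantity}`;
}

console.log(describeOrder(orderCtx));
console.log(noInputCtx.input === undefined);
```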
TaskConfig<T, R>
Configuration options for tasks.
interface TaskConfig<T, R> {
// The main function that executes the task logic
run: (context: TaskContext<T>) => Promise<R>;
// Optional array of tasks to execute after this task completes
onComplete?: (Task<R extends void ? null : R, any> | Task<R extends void ? null : R, void>)[];
// Optional function that is called when the task is cancelled.
onCancel?: (context: TaskContext<T>) => Promise<void>;
// Optional timeout duration (e.g., "30s", "5m", "never")
timeout?: string;
// Optional number of retry attempts
retries?: number;
}
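The onComplete type ties each follow-up task's input type to the current task's output type R. The sketch below illustrates that data flow with simplified local stand-ins. SketchTask and runChain are hypothetical; in Moose the tasks are executed by the workflow runtime, not by a loop like this, and retries, timeouts, and cancellation are omitted here.

```typescript
// Simplified local stand-in for a task; not the library's Task class.
interface SketchTask<I, O> {
  name: string;
  run: (ctx: { state: any; input: I }) => Promise<O>;
  // Follow-up tasks must accept this task's output as their input.
  onComplete?: SketchTask<O, any>[];
}

// Hypothetical runner: runs a task, then feeds its output to each follow-up task.
async function runChain<I, O>(task: SketchTask<I, O>, input: I, log: string[]): Promise<void> {
  const output = await task.run({ state: {}, input });
  log.push(`${task.name} -> ${JSON.stringify(output)}`);
  for (const next of task.onComplete ?? []) {
    await runChain(next, output, log);
  }
}

const double: SketchTask<number, number> = {
  name: "double",
  run: async (ctx) => ctx.input * 2,
};

const parse: SketchTask<string, number> = {
  name: "parse",
  run: async (ctx) => Number(ctx.input),
  onComplete: [double], // type-checks because parse outputs a number
};

const log: string[] = [];
runChain(parse, "21", log).then(() => console.log(log));
```

Attempting to list a task with a mismatched input type in onComplete fails at compile time, which is the main benefit of the generic signature.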
Workflow
A class that represents a complete workflow composed of interconnected tasks.
const myWorkflow = new Workflow("getData", {
startingTask: callAPI,
schedule: "@every 5s", // Run every 5 seconds
timeout: "1h",
retries: 3
});
WorkflowConfig
Configuration options for defining a workflow.
interface WorkflowConfig {
// The initial task that begins the workflow execution
startingTask: Task<null, any> | Task<null, void> | Task<any, any> | Task<any, void>;
// Optional number of retry attempts
retries?: number;
// Optional timeout duration (e.g., "10m", "1h", "never")
timeout?: string;
// Optional cron-style schedule string
schedule?: string;
}
Ensure your infrastructure components are correctly exported from your app/index.ts file (TypeScript) or imported into your main.py file (Python).
Learn more about the export pattern: local development / hosted.
Important: The following components must be exported from your app/index.ts file for Moose to detect them:
OlapTable instances
Stream instances
IngestApi instances
Api instances
IngestPipeline instances
MaterializedView instances
Task instances
Workflow instances
Configuration objects and utilities (like DeadLetterQueue, Key, sql) do not need to be exported, as they are used as dependencies of the main components.