API Reference
Viewing:
This is a comprehensive reference for @514labs/moose-lib moose_lib , detailing all exported components, types, and utilities.
Core Types
Key<T extends string | number | Date>
A type for marking fields as primary keys in data models.
// Example
interface MyModel {
id: Key<string>; // Marks 'id' as a primary key of type string
}JWT<T extends object>
A type for working with JSON Web Tokens.
// Example
type UserJWT = JWT<{ userId: string, role: string }>;ApiUtil
Interface providing utilities for analytics APIs.
interface ApiUtil {
client: MooseClient; // Client for interacting with the database
sql: typeof sql; // SQL template tag function
jwt: JWTPayload | undefined; // Current JWT if available
}Infrastructure Components
OlapTable<T>
Creates a ClickHouse table with the schema of type T.
// Basic usage with MergeTree (default)
export const myTable = new OlapTable<UserProfile>("user_profiles");
// With sorting configuration (fields)
export const myConfiguredTable = new OlapTable<UserProfile>("user_profiles", {
orderByFields: ["id", "timestamp"]
});
// With sorting configuration (expression)
export const myConfiguredTableExpr = new OlapTable<UserProfile>("user_profiles_expr", {
// Equivalent to orderByFields: ["id", "timestamp"]
orderByExpression: "(id, timestamp)"
});
// Disable sorting entirely
export const myUnsortedTable = new OlapTable<UserProfile>("user_profiles_unsorted", {
orderByExpression: "tuple()"
});
// For deduplication, explicitly set the ReplacingMergeTree engine
export const dedupTable = new OlapTable<UserProfile>("user_profiles", {
engine: ClickHouseEngines.ReplacingMergeTree,
orderByFields: ["id", "timestamp"],
ver: "updated_at", // Optional: version column for keeping latest
isDeleted: "deleted" // Optional: soft delete marker (requires ver)
});BaseOlapConfig<T>
Base configuration interface for OlapTable with common table configuration options.
interface BaseOlapConfig<T> {
// Optional database name (defaults to moose.config.toml clickhouse_config.db_name)
database?: string;
// Optional array of field names to order by
orderByFields?: (keyof T & string)[];
// Optional SQL expression for ORDER BY clause (alternative to orderByFields)
orderByExpression?: string;
// Optional table engine (defaults to MergeTree)
engine?: ClickHouseEngines;
// Optional settings for table configuration
settings?: { [key: string]: string };
// Optional lifecycle mode (defaults to MOOSE_MANAGED)
lifeCycle?: LifeCycle;
// Additional engine-specific fields (ver, isDeleted, keeperPath, etc.)
// depend on the engine type
}Example with database override:
// Table in custom database
export const analyticsTable = new OlapTable<UserEvent>("user_events", {
database: "analytics", // Override default database
orderByFields: ["id", "timestamp"]
});
// Default database (from moose.config.toml)
export const defaultTable = new OlapTable<UserEvent>("user_events", {
orderByFields: ["id", "timestamp"]
});Stream<T>
Creates a Redpanda topic with the schema of type T.
// Basic usage
export const myStream = new Stream<UserEvent>("user_events");
// With configuration
export const myConfiguredStream = new Stream<UserEvent>("user_events", {
parallelism: 3,
retentionPeriod: 86400 // 1 day in seconds
});
// Adding transformations
myConfiguredStream.addTransform(
destinationStream,
(record) => transformFunction(record)
);IngestApi<T>
Creates an HTTP endpoint for ingesting data of type T.
// Basic usage with destination stream
export const myIngestApi = new IngestApi<UserEvent>("user_events", {
destination: myUserEventStream
});Api<T, R>
Creates an HTTP endpoint for querying data with request type T and response type R.
// Basic usage
export const myApi = new Api<UserQuery, UserProfile[]>(
"getUserProfiles",
async (params, { client, sql }) => {
const result = await client.query.execute(
sql`SELECT * FROM user_profiles WHERE age > ${params.minAge} LIMIT 10`
);
return result;
}
);IngestPipeline<T>
Combines ingest API, stream, and table creation in a single component.
// Basic usage
export const pipeline = new IngestPipeline<UserEvent>("user_pipeline", {
ingestApi: true,
stream: true,
table: true
});
// With advanced configuration
export const advancedPipeline = new IngestPipeline<UserEvent>("advanced_pipeline", {
ingestApi: true,
stream: { parallelism: 3 },
table: {
orderByFields: ["id", "timestamp"]
}
});MaterializedView<T>
Creates a materialized view in ClickHouse.
// Basic usage
export const view = new MaterializedView<UserStatistics>({
selectStatement: "SELECT user_id, COUNT(*) as event_count FROM user_events GROUP BY user_id",
tableName: "user_events",
materializedViewName: "user_statistics",
orderByFields: ["user_id"]
});SQL Utilities
sql Template Tag
Template tag for creating type-safe SQL queries with parameters.
// Basic usage
const query = sql`SELECT * FROM users WHERE id = ${userId}`;
// With multiple parameters
const query = sql`
SELECT * FROM users
WHERE age > ${minAge}
AND country = ${country}
LIMIT ${limit}
`;MooseClient
Client for interacting with ClickHouse and Temporal.
class MooseClient {
query: QueryClient; // For database queries
workflow: WorkflowClient; // For workflow operations
}ClickHouse Utilities
Table Engine Configurations
ClickHouseEngines Enum
Available table engines:
enum ClickHouseEngines {
MergeTree = "MergeTree",
ReplacingMergeTree = "ReplacingMergeTree",
AggregatingMergeTree = "AggregatingMergeTree",
SummingMergeTree = "SummingMergeTree",
ReplicatedMergeTree = "ReplicatedMergeTree",
ReplicatedReplacingMergeTree = "ReplicatedReplacingMergeTree",
ReplicatedAggregatingMergeTree = "ReplicatedAggregatingMergeTree",
ReplicatedSummingMergeTree = "ReplicatedSummingMergeTree",
S3Queue = "S3Queue"
}ReplacingMergeTreeConfig<T>
Configuration for ReplacingMergeTree tables:
type ReplacingMergeTreeConfig<T> = {
engine: ClickHouseEngines.ReplacingMergeTree;
orderByFields?: (keyof T & string)[];
ver?: keyof T & string; // Optional: version column for keeping latest
isDeleted?: keyof T & string; // Optional: soft delete marker (requires ver)
settings?: { [key: string]: string };
}Replicated Engine Configurations
Configuration for replicated table engines:
// ReplicatedMergeTree
type ReplicatedMergeTreeConfig<T> = {
engine: ClickHouseEngines.ReplicatedMergeTree;
keeperPath?: string; // Optional: ZooKeeper/Keeper path (omit for Cloud)
replicaName?: string; // Optional: replica name (omit for Cloud)
orderByFields?: (keyof T & string)[];
settings?: { [key: string]: string };
}
// ReplicatedReplacingMergeTree
type ReplicatedReplacingMergeTreeConfig<T> = {
engine: ClickHouseEngines.ReplicatedReplacingMergeTree;
keeperPath?: string; // Optional: ZooKeeper/Keeper path (omit for Cloud)
replicaName?: string; // Optional: replica name (omit for Cloud)
ver?: keyof T & string; // Optional: version column
isDeleted?: keyof T & string; // Optional: soft delete marker
orderByFields?: (keyof T & string)[];
settings?: { [key: string]: string };
}
// ReplicatedAggregatingMergeTree
type ReplicatedAggregatingMergeTreeConfig<T> = {
engine: ClickHouseEngines.ReplicatedAggregatingMergeTree;
keeperPath?: string; // Optional: ZooKeeper/Keeper path (omit for Cloud)
replicaName?: string; // Optional: replica name (omit for Cloud)
orderByFields?: (keyof T & string)[];
settings?: { [key: string]: string };
}
// ReplicatedSummingMergeTree
type ReplicatedSummingMergeTreeConfig<T> = {
engine: ClickHouseEngines.ReplicatedSummingMergeTree;
keeperPath?: string; // Optional: ZooKeeper/Keeper path (omit for Cloud)
replicaName?: string; // Optional: replica name (omit for Cloud)
columns?: string[]; // Optional: columns to sum
orderByFields?: (keyof T & string)[];
settings?: { [key: string]: string };
}Note: The keeperPath and replicaName parameters are optional. When omitted, Moose uses smart defaults that work in both ClickHouse Cloud and self-managed environments (default path: /clickhouse/tables/{uuid}/{shard} with replica {replica}). You can still provide both parameters explicitly if you need custom replication paths.
S3QueueTableSettings
Type-safe interface for S3Queue-specific table settings (ClickHouse 24.7+).
interface S3QueueTableSettings {
mode?: "ordered" | "unordered"; // Processing mode
after_processing?: "keep" | "delete"; // File handling after processing
keeper_path?: string; // ZooKeeper path for coordination
loading_retries?: string; // Number of retry attempts
processing_threads_num?: string; // Parallel processing threads
// ... and many more settings
}S3Queue Configuration
Configure S3Queue tables for streaming data from S3 buckets (ORDER BY is not supported):
export const s3Events = new OlapTable<S3Event>("s3_events", {
engine: ClickHouseEngines.S3Queue,
s3Path: "s3://bucket/data/*.json",
format: "JSONEachRow",
settings: {
mode: "unordered",
keeper_path: "/clickhouse/s3queue/events"
}
});S3 Configuration
Configure S3 tables for direct read/write access to S3 storage:
export const s3Data = new OlapTable<DataRecord>("s3_data", {
engine: ClickHouseEngines.S3,
path: "s3://bucket/data/file.json",
format: "JSONEachRow",
awsAccessKeyId: mooseRuntimeEnv.get("AWS_ACCESS_KEY_ID"),
awsSecretAccessKey: mooseRuntimeEnv.get("AWS_SECRET_ACCESS_KEY"),
compression: "gzip"
});
// Public bucket (no authentication) - omit credentials for NOSIGN
export const publicS3 = new OlapTable<DataRecord>("public_s3", {
engine: ClickHouseEngines.S3,
path: "s3://public-bucket/data/*.parquet",
format: "Parquet",
});Buffer Configuration
Configure Buffer tables for high-throughput buffered writes (ORDER BY is not supported):
// First create destination table
export const destination = new OlapTable<Record>("destination", {
engine: ClickHouseEngines.MergeTree,
orderByFields: ["id", "timestamp"]
});
// Then create buffer table
export const buffer = new OlapTable<Record>("buffer", {
engine: ClickHouseEngines.Buffer,
targetDatabase: "local",
targetTable: "destination",
numLayers: 16,
minTime: 10,
maxTime: 100,
minRows: 10000,
maxRows: 1000000,
minBytes: 10485760,
maxBytes: 104857600
});Distributed Configuration
Configure Distributed tables for cluster-wide distributed queries (ORDER BY is not supported):
export const distributed = new OlapTable<Record>("distributed", {
engine: ClickHouseEngines.Distributed,
cluster: "my_cluster",
targetDatabase: "default",
targetTable: "local_table",
shardingKey: "cityHash64(id)"
});Task Management
Task<T, R>
A class that represents a single task within a workflow system.
// No input, no output
export const task1 = new Task<null, void>("task1", {
run: async () => {
console.log("No input/output");
},
retries: 3,
timeout: "30s"
});
// With input and output
export const task2 = new Task<InputType, OutputType>("task2", {
run: async (ctx) => {
return process(ctx.input);
},
onComplete: [nextTask]
});TaskContext<T>
A context object that includes input & state passed between the task’s run/cancel functions.
export type TaskContext<T> = T extends null ? { state: any; input?: null } : { state: any; input: T };TaskConfig<T, R>
Configuration options for tasks.
interface TaskConfig<T, R> {
// The main function that executes the task logic
run: (context: TaskContext<T>) => Promise<R>;
// Optional array of tasks to execute after this task completes
onComplete?: (Task<R extends void ? null : R, any> | Task<R extends void ? null : R, void>)[];
// Optional function that is called when the task is cancelled.
onCancel?: (context: TaskContext<T>) => Promise<void>;
// Optional timeout duration (e.g., "30s", "5m", "never")
timeout?: string;
// Optional number of retry attempts
retries?: number;
}Workflow
A class that represents a complete workflow composed of interconnected tasks.
const myWorkflow = new Workflow("getData", {
startingTask: callAPI,
schedule: "@every 5s", // Run every 5 seconds
timeout: "1h",
retries: 3
});
WorkflowConfig
Configuration options for defining a workflow.
interface WorkflowConfig {
// The initial task that begins the workflow execution
startingTask: Task<null, any> | Task<null, void> | Task<any, any> | Task<any, void>;
// Optional number of retry attempts
retries?: number;
// Optional timeout duration (e.g., "10m", "1h", "never")
timeout?: string;
// Optional cron-style schedule string
schedule?: string;
}Ensure your Infrastructure Components is correctly exported from your app/index.ts file.
Ensure your Infrastructure Components is correctly imported into your main.py file.
Learn more about export pattern: local development / hosted.
Important: The following components must be exported from your app/index.ts file for Moose to detect them:
OlapTableinstancesStreaminstancesIngestApiinstancesApiinstancesIngestPipelineinstancesMaterializedViewinstancesTaskinstancesWorkflowinstances
Configuration objects and utilities (like DeadLetterQueue, Key, sql) do not need to be exported as they are used as dependencies of the main components.
Registry Functions
The registry functions allow you to programmatically access all registered Moose resources at runtime. This is useful for inspecting resources, building dynamic tools, meta-programming, and testing.
getTables()
Get all registered OLAP tables.
import { getTables } from "@514labs/moose-lib";
const tables = getTables(); // Returns Map<string, OlapTable<any>>
for (const [name, table] of tables) {
console.log(`Table: ${name}`);
}getTable(name)
Get a specific OLAP table by name.
import { getTable } from "@514labs/moose-lib";
const userTable = getTable("Users");
if (userTable) {
console.log(`Found table: ${userTable.name}`);
}getStreams()
Get all registered streams.
import { getStreams } from "@514labs/moose-lib";
const streams = getStreams(); // Returns Map<string, Stream<any>>getStream(name)
Get a specific stream by name.
import { getStream } from "@514labs/moose-lib";
const eventStream = getStream("UserEvents");getIngestApis()
Get all registered ingestion APIs.
import { getIngestApis } from "@514labs/moose-lib";
const ingestApis = getIngestApis(); // Returns Map<string, IngestApi<any>>getIngestApi(name)
Get a specific ingestion API by name.
import { getIngestApi } from "@514labs/moose-lib";
const userApi = getIngestApi("IngestUsers");getApis()
Get all registered consumption/egress APIs.
import { getApis } from "@514labs/moose-lib";
const apis = getApis(); // Returns Map<string, Api<any>>getApi(nameOrPath)
Get a registered API by name, version, or custom path.
Supports multiple lookup strategies:
- Direct lookup by full key (
nameorname:version) - Alias lookup by base name when only one versioned API exists
- Lookup by custom path (if configured)
import { getApi } from "@514labs/moose-lib";
// Direct lookup
const api = getApi("QueryUsers");
// Version lookup
const apiV1 = getApi("QueryUsers:1.0");
// Path lookup
const apiByPath = getApi("/custom/query");
// Automatic aliasing: if only one version exists, unversioned lookup works
const api = getApi("QueryUsers"); // Returns QueryUsers:1.0 if it's the only versiongetSqlResources()
Get all registered SQL resources (views, etc.).
import { getSqlResources } from "@514labs/moose-lib";
const resources = getSqlResources(); // Returns Map<string, SqlResource>getSqlResource(name)
Get a specific SQL resource by name.
import { getSqlResource } from "@514labs/moose-lib";
const view = getSqlResource("UserSummaryView");getWorkflows()
Get all registered workflows.
import { getWorkflows } from "@514labs/moose-lib";
const workflows = getWorkflows(); // Returns Map<string, Workflow>getWorkflow(name)
Get a specific workflow by name.
import { getWorkflow } from "@514labs/moose-lib";
const etlWorkflow = getWorkflow("DailyETL");getWebApps()
Get all registered web apps.
import { getWebApps } from "@514labs/moose-lib";
const webApps = getWebApps(); // Returns Map<string, WebApp>getWebApp(name)
Get a specific web app by name.
import { getWebApp } from "@514labs/moose-lib";
const adminApp = getWebApp("AdminPanel");
if (adminApp) {
console.log(`Mounted at: ${adminApp.config.mountPath}`);
}