Project Structure
Overview
A Moose project has two main levels of organization:
Project Root
Contains configuration and generated artifacts
File/Directory | Description
---|---
app/ | Where all your application code lives
.moose/ | Generated artifacts and infrastructure mappings
moose.config.toml | Project configuration
package.json or setup.py | Package management files
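moose.config.toml holds project-level settings. The exact keys depend on your Moose version and language; as a rough sketch only (your generated file is authoritative):

# moose.config.toml (illustrative — check the file Moose generated for you)
language = "typescript"

[http_server_config]
host = "localhost"
port = 4000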
Application Code
All your code lives in the app directory, where Moose provides a flexible yet opinionated structure. The key principle is that only objects exported from your root index.ts or main.py file are mapped to infrastructure, giving you freedom in how you organize your implementation details.
How to Organize Your App Code
Required: Export Infrastructure Objects from Root
All infrastructure components (data models, pipelines, views, APIs) MUST be exported from index.ts/main.py to be deployed. Internal implementation details can live anywhere.
Recommended: Use Standard Directory Structure
Organize your code into ingest/, views/, apis/, and utils/ directories, giving each component type its own dedicated location for better maintainability.
Optional: Group Related Components
Within each directory, you can group related components into subdirectories or single files based on your domain (e.g., analytics/, monitoring/).
Best Practice: Clear Component Dependencies
Keep clear import paths between components: models → ingest → views → apis. This makes data flow and dependencies easy to understand; the sketch below shows the pattern.
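To make the export rule and the dependency chain concrete, here is a minimal sketch. The Event model, Events pipeline, and normalizeStatus helper are illustrative names, not part of any Moose template:

// app/ingest/records.ts
import { IngestPipeline, Key } from "@514labs/moose-lib";

interface Event {
  id: Key<string>;
  status: string;
}

// Internal helper: free to live anywhere; it never becomes infrastructure
// because it is not exported from app/index.ts.
export function normalizeStatus(s: string): string {
  return s.trim().toLowerCase();
}

// Infrastructure object: deployed only because app/index.ts re-exports it.
export const Events = new IngestPipeline<Event>("events", {
  ingest: true,
  stream: true,
  table: true,
});

// app/index.ts
export { Events } from "./ingest/records";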
Core Concept: Infrastructure Mapping
The most important file in your project is the root index.ts. This file acts as the single source of truth for what gets deployed to your infrastructure:
// app/index.ts
export { RawRecords, ProcessedRecords } from './ingest/records';
export { RecordMetricsView } from './views/aggregations';
export { MetricsAPI } from './apis/metrics-api';
In Python, the root main.py plays the same role and acts as the single source of truth for what gets deployed to your infrastructure:
# app/main.py
import app.ingest
import app.views
import app.apis
Recommended Project Structure
While you have complete flexibility in organizing your implementation, here’s a simplified structure that clearly separates ingestion from transformations:
app/
  index.ts
  ingest/
    records.ts
  views/
    aggregations.ts
  apis/
    metrics-api.ts
  utils/
    helpers.ts
package.json
moose.config.toml
app/
  main.py
  ingest/
    records.py
  views/
    aggregations.py
  apis/
    metrics_api.py
  utils/
    helpers.py
moose.config.toml
Directory Purposes
File/Directory | Purpose
---|---
app/index.ts | The critical file that exports all resources to be mapped to infrastructure
ingest/ | Create IngestPipeline data models and objects that combine ingest APIs, streams, and tables
views/ | Define in-database transformations using Materialized Views
apis/ | Build Consumption APIs to expose your data to clients
utils/ | Helper functions and utilities
File/Directory | Purpose
---|---
app/main.py | The critical file whose imports determine which resources are mapped to infrastructure
ingest/ | Create IngestPipeline data models and objects that combine ingest APIs, streams, and tables
views/ | Define in-database transformations using Materialized Views
apis/ | Build Consumption APIs to expose your data to clients
scripts/ | Create Workflows to run ad-hoc tasks
utils/ | Helper functions and utilities
Example
Here’s how you might organize a typical data flow:
Create Ingestion Pipelines in app/ingest
// app/ingest/records.ts
import { IngestPipeline, Key } from "@514labs/moose-lib";
export interface RawRecord {
id: Key<string>;
sourceId: string;
timestamp: Date;
status: string;
}
export interface ProcessedRecord extends RawRecord {
processedAt: Date;
metadata: {
version: string;
processingTime: number;
};
}
export const RawRecords = new IngestPipeline<RawRecord>("raw_records", {
ingest: true, // Creates a REST API endpoint
stream: true, // Creates Kafka/Redpanda topic
table: true // Creates a table to store records
});
export const ProcessedRecords = new IngestPipeline<ProcessedRecord>("processed_records", {
ingest: false,
stream: true,
table: true
});
RawRecords.stream!.addTransform(ProcessedRecords.stream!, (record) => {
return {
...record,
processedAt: new Date(),
metadata: {
version: "1.0",
processingTime: Date.now() - record.timestamp.getTime()
}
};
});
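Because RawRecords sets ingest: true, Moose serves an HTTP ingestion endpoint for it. As a rough local smoke test, assuming the dev server's default address (http://localhost:4000) and its /ingest/<table> route:

// Post a test record to the raw_records pipeline (endpoint path assumed).
await fetch("http://localhost:4000/ingest/raw_records", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    id: "rec_1",
    sourceId: "sensor-42",
    timestamp: new Date().toISOString(),
    status: "new",
  }),
});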
# app/ingest/records.py
from pydantic import BaseModel
from moose_lib import Key, IngestPipeline, IngestPipelineConfig
from datetime import datetime
class RawRecord(BaseModel):
id: Key[str]
source_id: str
timestamp: datetime
status: str
class Metadata(BaseModel):
version: str
processing_time: float
class ProcessedRecord(RawRecord):
processed_at: datetime
metadata: Metadata
RawRecords = IngestPipeline[RawRecord]("raw_records", IngestPipelineConfig(
    ingest=True,   # Creates a REST API endpoint
    stream=True,   # Creates Kafka/Redpanda topic
    table=True     # Creates a table to store records
))
ProcessedRecords = IngestPipeline[ProcessedRecord]("processed_records", IngestPipelineConfig(
ingest=False,
stream=True,
table=True
))
def transform_record(record: RawRecord) -> ProcessedRecord:
    now = datetime.now()
    return ProcessedRecord(
        **record.model_dump(),
        processed_at=now,
        metadata=Metadata(
            version="1.0",
            processing_time=(now - record.timestamp).total_seconds()
        )
    )
RawRecords.get_stream().add_transform(destination=ProcessedRecords.get_stream(), transformation=transform_record)
Define Materialized Views in app/views
// app/views/aggregations.ts
import { ProcessedRecords } from '../ingest/records';
import { sql, MaterializedView } from "@514labs/moose-lib";
interface RecordMetricsSchema {
sourceId: string;
recordCount: number;
avgProcessingTime: number;
lastProcessed: Date;
}
export const RecordMetricsView = new MaterializedView<RecordMetricsSchema>({
selectStatement: sql`
SELECT
sourceId,
COUNT(*) as recordCount,
AVG(metadata.processingTime) as avgProcessingTime,
MAX(processedAt) as lastProcessed
FROM ${ProcessedRecords.table}
GROUP BY sourceId
`,
tableName: "record_metrics",
materializedViewName: "record_metrics_mv"
});
# app/views/aggregations.py
from app.ingest.records import ProcessedRecords
from moose_lib import MaterializedView, sql
from pydantic import BaseModel
from datetime import datetime
class RecordMetricsSchema(BaseModel):
source_id: str
record_count: int
avg_processing_time: float
last_processed: datetime
RecordMetricsView = MaterializedView[RecordMetricsSchema](
select_statement=sql(f"""
SELECT
source_id,
COUNT(*) as record_count,
AVG(metadata.processing_time) as avg_processing_time,
MAX(processed_at) as last_processed
FROM {ProcessedRecords.table}
GROUP BY source_id
"""),
table_name="record_metrics",
materialized_view_name="record_metrics_mv"
)
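In both variants, ClickHouse populates the record_metrics target table at insert time: each batch written to processed_records passes through the view's SELECT, so consumers read precomputed results instead of re-scanning the source table.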
Define Consumption APIs in app/apis
// app/apis/metrics-api.ts
import { RecordMetricsView } from '../views/aggregations';
import { ConsumptionApi } from "@514labs/moose-lib";
interface QueryParams {
sourceId: string;
}
interface ResponseBody {
sourceId: string;
recordCount: number;
avgProcessingTime: number;
lastProcessed: Date;
}
export const MetricsAPI = new ConsumptionApi<QueryParams, ResponseBody>(
"metrics",
async ({ sourceId }, { client, sql }) => {
const query = sql`
SELECT * FROM ${RecordMetricsView}
WHERE sourceId = ${sourceId}
`;
return client.query.execute(query);
}
);
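During local development the consumption API is reachable over HTTP; assuming the default dev server address and /consumption/<name> routing, you could call it like this:

// Query the metrics API for a single source (URL layout assumed).
const res = await fetch(
  "http://localhost:4000/consumption/metrics?sourceId=sensor-42"
);
const metrics = await res.json();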
# app/apis/metrics_api.py
from app.views.aggregations import RecordMetricsView
from moose_lib import ConsumptionApi
from pydantic import BaseModel
from datetime import datetime
class QueryParams(BaseModel):
source_id: str
class ResponseBody(BaseModel):
source_id: str
record_count: int
avg_processing_time: float
last_processed: datetime
def handler(params: QueryParams, client):
    # {source_id} is left as a bound query parameter (the doubled braces
    # survive the f-string); only the view name is interpolated directly.
    query = f"""
        SELECT * FROM {RecordMetricsView}
        WHERE source_id = {{source_id}}
    """
    return client.query.execute(query, {"source_id": params.source_id})
MetricsAPI = ConsumptionApi[QueryParams, ResponseBody](
"metrics",
handler
)
Export everything from the root file
// app/index.ts
export { RawRecords, ProcessedRecords } from './ingest/records';
export { RecordMetricsView } from './views/aggregations';
export { MetricsAPI } from './apis/metrics-api';
# app/main.py
from app.ingest.records import RawRecords, ProcessedRecords
from app.views.aggregations import RecordMetricsView
from app.apis.metrics_api import MetricsAPI
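Note the asymmetry: in TypeScript the root file re-exports objects, while in Python simply importing them in app/main.py is what registers them. Either way, anything not reachable from the root file is not mapped to infrastructure.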
Alternative: Simpler Structure for Small Projects
For smaller projects, you might prefer an even simpler structure:
app/
  index.ts
  ingestion.ts   # All ingestion pipelines
  views.ts       # All materialized views
  apis.ts        # All consumption APIs
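With this flat layout, index.ts re-exports straight from the sibling modules (symbol names carried over from the example above):

// app/index.ts
export { RawRecords, ProcessedRecords } from "./ingestion";
export { RecordMetricsView } from "./views";
export { MetricsAPI } from "./apis";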
app/
  main.py
  ingestion.py   # All ingestion pipelines
  views.py       # All materialized views
  apis.py        # All consumption APIs