Trigger Workflows
Viewing:
Overview
Moose workflows can be triggered programmatically from various sources including APIs, events, external systems, or manual execution. This enables you to build reactive data processing pipelines and on-demand task execution.
Manual Workflow Execution
The simplest way to trigger a workflow is using the Moose CLI:
Terminal
# Run a workflow manually
moose workflow run example
# Run with input parameters
moose workflow run example --input '{"name": "John", "email": "john@example.com"}'
Passing Input to Workflows
When triggering workflows, you can pass input data that will be passed to the starting task:
Terminal
moose workflow run data-processing --input '{
"sourceUrl": "https://api.example.com/data",
"apiKey": "your-api-key",
"batchSize": 100
}'
The input is parsed as JSON and passed to the workflow’s starting task.
API-Triggered Workflows
Create APIs that trigger workflows programmatically using the MooseClient
:
app/apis/trigger_workflow.ts
import { ConsumptionAPI } from "@514labs/moose-lib";
interface WorkflowParams {
inputValue: string;
priority?: string;
}
interface WorkflowResponse {
workflowId: string;
status: string;
}
const triggerApi = new ConsumptionAPI<WorkflowParams, WorkflowResponse>(
"trigger-workflow",
async ({ inputValue, priority = "normal" }: WorkflowParams, { client }) => {
// Trigger the workflow with input parameters
const workflowExecution = await client.workflow.execute("data-processing", {
inputValue,
priority,
triggeredAt: new Date().toISOString()
});
return {
workflowId: workflowExecution.id,
status: "started"
};
}
);
export default triggerApi;
app/apis/trigger_workflow.py
from moose_lib import MooseClient, ConsumptionAPI
from pydantic import BaseModel, Field
from datetime import datetime
class WorkflowParams(BaseModel):
input_value: str
priority: str = Field(default="normal")
class WorkflowResponse(BaseModel):
workflow_id: str
status: str
def run(params: WorkflowParams, client: MooseClient) -> WorkflowResponse:
# Trigger the workflow with input parameters
workflow_execution = client.workflows.execute(
workflow="data-processing",
params={
"input_value": params.input_value,
"priority": params.priority,
"triggered_at": datetime.now().isoformat()
}
)
return WorkflowResponse(
workflow_id=workflow_execution.id,
status="started"
)
api = ConsumptionAPI[WorkflowParams, WorkflowResponse]("trigger-workflow", run)
Using the API
Once deployed, you can trigger workflows via HTTP requests:
Terminal
curl -X POST http://localhost:4000/consumption/trigger-workflow \
-H "Content-Type: application/json" \
-d '{
"inputValue": "process-user-data",
"priority": "high"
}'
Response:
{
"workflowId": "workflow-12345",
"status": "started"
}