Define Workflows
Overview
Workflows automate task sequences with built-in reliability and monitoring. Tasks execute in order, passing data between steps.
Powered by Temporal
Built on Temporal for reliability, retries, and monitoring via GUI dashboard.
Writing Workflow Tasks
Tasks are objects with a run function. Return values automatically pass to the next task.
import { Task, Workflow } from "@514labs/moose-lib";

export interface Foo {
  name: string;
}

export const task1 = new Task<Foo, void>("task1", {
  run: async (input: Foo) => {
    const name = input.name ?? "world";
    const greeting = `hello, ${name}!`;
    console.log(greeting);
  },
});

export const myworkflow = new Workflow("myworkflow", {
  startingTask: task1,
});
Export Task and Workflow objects. Specify startingTask in the Workflow config.
from moose_lib import Task, TaskConfig, Workflow, WorkflowConfig
from pydantic import BaseModel


class Foo(BaseModel):
    name: str


def run_task1(input: Foo) -> None:
    name = input.name or "world"
    greeting = f"hello, {name}!"
    # Print the greeting, mirroring console.log in the TypeScript version.
    print(greeting)


task1 = Task[Foo, None](
    name="task1",
    config=TaskConfig(run=run_task1)
)

myworkflow = Workflow(
    name="myworkflow",
    config=WorkflowConfig(starting_task=task1)
)
Export Task and Workflow objects. Specify starting_task in the WorkflowConfig.
Data Flow Between Tasks
Tasks communicate through their return values. Each task can return an object that is automatically passed as input to the next task in the workflow.
- Only values inside the object are passed to the next task.
- The object must be JSON-serializable.
import { Task, Workflow } from "@514labs/moose-lib";

export interface Foo {
  name: string;
}

export interface Bar {
  name: string;
  greeting: string;
  counter: number;
}

export const task2 = new Task<Bar, void>("task2", {
  run: async (input: Bar) => {
    console.log(`task2 input: ${JSON.stringify(input)}`);
  }
});

export const task1 = new Task<Foo, Bar>("task1", {
  run: async (input: Foo) => {
    const name = input.name ?? "world";
    const greeting = `hello, ${name}!`;
    return {
      name: name,
      greeting: greeting,
      counter: 1
    };
  },
  onComplete: [task2],
});

export const myworkflow = new Workflow("myworkflow", {
  startingTask: task1,
});
from moose_lib import Task, TaskConfig, Workflow, WorkflowConfig, Logger
from pydantic import BaseModel


class Foo(BaseModel):
    name: str


class Bar(BaseModel):
    name: str
    greeting: str
    counter: int


def run_task2(input: Bar) -> None:
    logger = Logger(action="run_task2")
    logger.info(f"task2 input: {input.model_dump_json()}")


task2 = Task[Bar, None](
    name="task2",
    config=TaskConfig(run=run_task2)
)


def run_task1(input: Foo) -> Bar:
    name = input.name or "world"
    greeting = f"hello, {name}!"
    return Bar(
        name=name,
        greeting=greeting,
        counter=1
    )


task1 = Task[Foo, Bar](
    name="task1",
    config=TaskConfig(
        run=run_task1,
        on_complete=[task2]
    )
)

myworkflow = Workflow(
    name="myworkflow",
    config=WorkflowConfig(starting_task=task1)
)
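The JSON-serializable requirement above can be checked locally before a task ever runs. The sketch below is only an approximation: it uses the standard library json module to round-trip a payload, and it assumes (this page does not describe the actual mechanism) that a task hand-off preserves roughly what a JSON encode and decode preserves. The helper name json_round_trip is hypothetical and is not part of moose_lib.

```python
import json
from datetime import datetime, timezone
from typing import Any


def json_round_trip(value: Any) -> Any:
    """Encode a value to JSON and decode it again.

    Hypothetical helper: approximates what survives a task hand-off.
    Raises TypeError if the value contains something json cannot encode.
    """
    return json.loads(json.dumps(value))


# Plain data (strings, numbers, nested dicts and lists) survives unchanged.
payload = {"name": "world", "greeting": "hello, world!", "counter": 1}
assert json_round_trip(payload) == payload

# A datetime object is not JSON-serializable as-is.
bad_payload = {"name": "world", "created_at": datetime(2025, 2, 21, tzinfo=timezone.utc)}
try:
    json_round_trip(bad_payload)
    serializable = True
except TypeError:
    serializable = False
print(serializable)  # False

# Converting it to an ISO 8601 string first makes the payload safe to return.
good_payload = {**bad_payload, "created_at": bad_payload["created_at"].isoformat()}
print(json_round_trip(good_payload)["created_at"])  # 2025-02-21T00:00:00+00:00
```

A check like this fits naturally in a unit test for each task's return value, so that a non-serializable field is caught before the workflow is deployed.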
Debugging Workflows
The Temporal dashboard is a helpful debugging tool, but you can also monitor and debug workflows with the Moose CLI, without leaving your terminal.
Use the moose workflow status command to monitor a workflow:
moose workflow status example
This prints high-level information about the workflow run:
Workflow Workflow Status: example
Run ID: 446eab6e-663d-4913-93fe-f79d6109391f
Status: WORKFLOW_EXECUTION_STATUS_COMPLETED ✅
Execution Time: 66s
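If you want to act on this summary in a script (for example, to fail a CI step when a run did not complete), the fields can be pulled out of the captured text. The following is a minimal sketch that assumes the output keeps exactly the layout shown above; the function name parse_status_summary and the dictionary keys are hypothetical, and the CLI may change its format between versions.

```python
import re

# The summary exactly as shown above, captured as text.
SAMPLE = """\
Workflow Workflow Status: example
Run ID: 446eab6e-663d-4913-93fe-f79d6109391f
Status: WORKFLOW_EXECUTION_STATUS_COMPLETED ✅
Execution Time: 66s
"""

# One pattern per field; the layout is an assumption based on the sample.
PATTERNS = {
    "workflow": re.compile(r"Workflow Status:\s*(\S+)"),
    "run_id": re.compile(r"^Run ID:\s*(\S+)", re.MULTILINE),
    "status": re.compile(r"^Status:\s*(\w+)", re.MULTILINE),
    "execution_time": re.compile(r"^Execution Time:\s*(\S+)", re.MULTILINE),
}


def parse_status_summary(text: str) -> dict[str, str]:
    """Return the summary fields found in the text; missing fields are omitted."""
    summary = {}
    for key, pattern in PATTERNS.items():
        match = pattern.search(text)
        if match:
            summary[key] = match.group(1)
    return summary


summary = parse_status_summary(SAMPLE)
print(summary["workflow"])  # example
print(summary["status"])    # WORKFLOW_EXECUTION_STATUS_COMPLETED
print(summary["status"].endswith("_COMPLETED"))  # True
```

In practice the text would come from running the command and capturing its standard output rather than from a hard-coded string.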
For more detailed information about the workflow’s status, including task-level logs and inputs/outputs, use the --verbose flag:
moose workflow status example --verbose
Workflow Workflow Status: example
Run ID: 446eab6e-663d-4913-93fe-f79d6109391f
Status: WORKFLOW_EXECUTION_STATUS_COMPLETED ✅
Execution Time: 66s
Request: GetWorkflowExecutionHistoryRequest { namespace: "default", execution: Some(WorkflowExecution { workflow_id: "example", run_id: "446eab6e-663d-4913-93fe-f79d6109391f" }), maximum_page_size: 0, next_page_token: [], wait_new_event: false, history_event_filter_type: Unspecified, skip_archival: false }
Found 17 events
Event History:
• [2025-02-21T14:16:56.234808764+00:00] EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
• [2025-02-21T14:16:56.235132389+00:00] EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
• [2025-02-21T14:16:56.259341847+00:00] EVENT_TYPE_WORKFLOW_TASK_STARTED
• [2025-02-21T14:16:56.329856180+00:00] EVENT_TYPE_WORKFLOW_TASK_COMPLETED
• [2025-02-21T14:16:56.329951889+00:00] EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
    Activity: example/task1
• [2025-02-21T14:16:56.333761680+00:00] EVENT_TYPE_ACTIVITY_TASK_STARTED
• [2025-02-21T14:16:56.497156055+00:00] EVENT_TYPE_ACTIVITY_TASK_COMPLETED
    Result:
    {
      "counter": 1,
      "greeting": "hello, no name!",
      "name": "no name",
    }
The verbose output shows the exact sequence of events, along with the inputs and outputs of each task. Because the result of each task is included, you can inspect the data that was passed between tasks while debugging.
If your workflow fails due to some runtime error, you can use the event history timeline to identify the task that failed.
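Scanning the timeline for a failure can also be scripted. The sketch below rests on several assumptions that this page does not confirm: that a failed task appears in the history as EVENT_TYPE_ACTIVITY_TASK_FAILED or EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT (both are Temporal event type names, but the CLI output for a failed run is not shown here), that the "Activity:" line under a scheduled event names the task, and that tasks run one at a time so the most recent "Activity:" line belongs to the failing event. The function name find_failed_activities is hypothetical.

```python
import re

# Excerpt of the verbose event history shown above.
HISTORY = """\
• [2025-02-21T14:16:56.329951889+00:00] EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
Activity: example/task1
• [2025-02-21T14:16:56.333761680+00:00] EVENT_TYPE_ACTIVITY_TASK_STARTED
• [2025-02-21T14:16:56.497156055+00:00] EVENT_TYPE_ACTIVITY_TASK_COMPLETED
"""

EVENT_RE = re.compile(r"\[(?P<ts>[^\]]+)\]\s+(?P<type>EVENT_TYPE_\w+)")
ACTIVITY_RE = re.compile(r"^\s*Activity:\s*(?P<name>\S+)")

# Assumed failure markers; adjust to whatever the CLI actually prints.
FAILURE_TYPES = {
    "EVENT_TYPE_ACTIVITY_TASK_FAILED",
    "EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT",
}


def find_failed_activities(history: str) -> list[tuple[str, str]]:
    """Return (activity name, timestamp) pairs for each failure event found."""
    failures = []
    current_activity = None
    for line in history.splitlines():
        activity = ACTIVITY_RE.match(line)
        if activity:
            # Remember the most recently scheduled activity.
            current_activity = activity.group("name")
            continue
        event = EVENT_RE.search(line)
        if event and event.group("type") in FAILURE_TYPES:
            failures.append((current_activity or "<unknown>", event.group("ts")))
    return failures


# The real excerpt contains no failures.
print(find_failed_activities(HISTORY))  # []

# Synthetic input, NOT real CLI output: the completed event is rewritten as a
# failure purely to exercise the parser.
failing = HISTORY.replace(
    "EVENT_TYPE_ACTIVITY_TASK_COMPLETED", "EVENT_TYPE_ACTIVITY_TASK_FAILED"
)
print(find_failed_activities(failing))
# [('example/task1', '2025-02-21T14:16:56.497156055+00:00')]
```

Once the failing task is known, its input can be read from the result of the preceding task in the same verbose output.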