Workflows
Viewing:
Overview
The Workflows capability provides standalone task orchestration and automation. You can use this capability independently to build ETL pipelines, run scheduled jobs, trigger background tasks, and manage long-running tasks without requiring other MooseStack components like databases or streams.
Basic Usage
DataFlow.ts
import { Task, Workflow } from "@514labs/moose-lib";
export interface Foo {
name: string;
}
export interface Bar {
name: string;
greeting: string;
counter: number;
}
export const task1 = new Task<Foo, Bar>("task1", {
run: async (input: Foo) => {
const greeting = `hello, ${input.name}!`;
return {
name: input.name,
greeting,
counter: 1
};
},
});
export const task2 = new Task<Bar, void>("task2", {
run: async (input: Bar) => {
console.log(`${input.greeting} (count: ${input.counter})`);
},
});
export const myworkflow = new Workflow("myworkflow", {
startingTask: task1
});
DataFlow.py
from moose_lib import Task, TaskConfig, Workflow, WorkflowConfig
from pydantic import BaseModel
class Foo(BaseModel):
name: str
class Bar(BaseModel):
name: str
greeting: str
counter: int
def run_task1(input: Foo) -> Bar:
greeting = f"hello, {input.name}!"
return Bar(
name=input.name,
greeting=greeting,
counter=1
)
def run_task2(input: Bar) -> None:
print(f"{input.greeting} (count: {input.counter})")
task1 = Task[Foo, Bar](
name="task1",
config=TaskConfig(run=run_task1, on_complete=[task2])
)
task2 = Task[Bar, None](
name="task2",
config=TaskConfig(run=run_task2)
)
myworkflow = Workflow(
name="myworkflow",
config=WorkflowConfig(starting_task=task1)
Enabling Workflows
To enable workflows, you need to add the workflows
feature to your moose.config.toml
file:
moose.config.toml
[features]
workflows = true
Core Capabilities
Integration with Other Capabilities
While the Workflows capability works independently, it is designed to be used in conjunction with other MooseStack capabilities: