# Moose / Workflows Documentation – Python

## Included Files

1. moose/workflows/cancel-workflow.mdx
2. moose/workflows/define-workflow.mdx
3. moose/workflows/retries-and-timeouts.mdx
4. moose/workflows/schedule-workflow.mdx
5. moose/workflows/trigger-workflow.mdx

## cancel-workflow

Source: moose/workflows/cancel-workflow.mdx

# Cancel a Running Workflow

To stop a workflow before it has finished running, use the `workflow cancel` command:

```bash filename="Terminal" copy
moose workflow cancel <name>
```

### Implementing Cancellation Callbacks

For running workflows that have cleanup operations to perform, you can implement a termination callback. This is especially useful for long-running tasks that hold open connections or subscriptions to other services that need to be closed. You may also use the `state` within the run/cancel context to supplement your business logic.

```python filename="workflows/workflows.py" copy
from moose_lib import Task, TaskConfig, TaskContext, Workflow, WorkflowConfig

# Foo is the task's pydantic input model; "connection" stands in for any
# long-lived resource your task opens.
def run_task1(ctx: TaskContext[Foo]) -> None:
    connection.open()

def on_cancel(ctx: TaskContext[Foo]) -> None:
    # Clean up any resources
    connection.close()

task1 = Task[Foo, None](
    name="task1",
    config=TaskConfig(run=run_task1, on_cancel=on_cancel)
)

myworkflow = Workflow(
    name="myworkflow",
    config=WorkflowConfig(starting_task=task1, retries=3)
)
```

---

## Define Workflows

Source: moose/workflows/define-workflow.mdx

Create workflow definitions with task sequences and data flow

# Define Workflows

## Overview

Workflows automate task sequences with built-in reliability and monitoring. Tasks execute in order, passing data between steps. Workflows are built on Temporal for reliability, retries, and monitoring via a GUI dashboard.

## Writing Workflow Tasks

```python filename="app/main.py" copy
from moose_lib import Task, TaskConfig, TaskContext, Workflow, WorkflowConfig
from pydantic import BaseModel

class Foo(BaseModel):
    name: str

def run_task1(ctx: TaskContext[Foo]) -> None:
    name = ctx.input.name or "world"
    greeting = f"hello, {name}!"

task1 = Task[Foo, None](
    name="task1",
    config=TaskConfig(run=run_task1)
)

myworkflow = Workflow(
    name="myworkflow",
    config=WorkflowConfig(starting_task=task1)
)
```

Export `Task` and `Workflow` objects. Specify `starting_task` in the `WorkflowConfig`.

## Data Flow Between Tasks

Tasks communicate through their return values. Each task can return an object that is automatically passed as input to the next task in the workflow.

- Only values inside the object are passed to the next task.
- The object must be JSON-serializable.

```python filename="app/main.py" copy
from moose_lib import Task, TaskConfig, TaskContext, Workflow, WorkflowConfig, Logger
from pydantic import BaseModel

class Foo(BaseModel):
    name: str

class Bar(BaseModel):
    name: str
    greeting: str
    counter: int

def run_task2(ctx: TaskContext[Bar]) -> None:
    logger = Logger(action="run_task2")
    logger.info(f"task2 input: {ctx.input.model_dump_json()}")

task2 = Task[Bar, None](
    name="task2",
    config=TaskConfig(run=run_task2)
)

def run_task1(ctx: TaskContext[Foo]) -> Bar:
    name = ctx.input.name or "world"
    greeting = f"hello, {name}!"
    return Bar(
        name=name,
        greeting=greeting,
        counter=1
    )

task1 = Task[Foo, Bar](
    name="task1",
    config=TaskConfig(
        run=run_task1,
        on_complete=[task2]
    )
)

myworkflow = Workflow(
    name="myworkflow",
    config=WorkflowConfig(starting_task=task1)
)
```

## Debugging Workflows

While the Temporal dashboard is a helpful tool for debugging, you can also leverage the Moose CLI to monitor and debug workflows. This is useful if you want to monitor a workflow without having to leave your terminal.
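Task-level logs and return values surface in the `--verbose` status output shown below, so it often pays to log what each task receives and produces. A minimal sketch, reusing the `Logger` pattern and the `Foo`/`Bar` models from the data-flow example above (the `action` label is arbitrary):

```python filename="app/main.py" copy
from moose_lib import Logger, TaskContext

# Foo and Bar are the pydantic models defined in the data-flow example above
def run_task1(ctx: TaskContext[Foo]) -> Bar:
    logger = Logger(action="run_task1")
    # Log the input this task received
    logger.info(f"task1 input: {ctx.input.model_dump_json()}")
    name = ctx.input.name or "world"
    result = Bar(name=name, greeting=f"hello, {name}!", counter=1)
    # Log the value handed to the next task so it can be compared against
    # the event history printed by the status command
    logger.info(f"task1 output: {result.model_dump_json()}")
    return result
```
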
Use the `moose workflow status` command to monitor a workflow:

```bash filename="Terminal" copy
moose workflow status example
```

This will print high-level information about the workflow run:

```txt filename="Terminal"
Workflow Workflow Status: example
Run ID: 446eab6e-663d-4913-93fe-f79d6109391f
Status: WORKFLOW_EXECUTION_STATUS_COMPLETED ✅
Execution Time: 66s
```

If you want more detailed information about the workflow's status, including task-level logs and inputs/outputs, you can use the `--verbose` flag:

```bash filename="Terminal" copy
moose workflow status example --verbose
```

```txt filename="Terminal"
Workflow Workflow Status: example
Run ID: 446eab6e-663d-4913-93fe-f79d6109391f
Status: WORKFLOW_EXECUTION_STATUS_COMPLETED ✅
Execution Time: 66s
Request: GetWorkflowExecutionHistoryRequest { namespace: "default", execution: Some(WorkflowExecution { workflow_id: "example", run_id: "446eab6e-663d-4913-93fe-f79d6109391f" }), maximum_page_size: 0, next_page_token: [], wait_new_event: false, history_event_filter_type: Unspecified, skip_archival: false }
Found 17 events
Event History:
  • [2025-02-21T14:16:56.234808764+00:00] EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
  • [2025-02-21T14:16:56.235132389+00:00] EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
  • [2025-02-21T14:16:56.259341847+00:00] EVENT_TYPE_WORKFLOW_TASK_STARTED
  • [2025-02-21T14:16:56.329856180+00:00] EVENT_TYPE_WORKFLOW_TASK_COMPLETED
  • [2025-02-21T14:16:56.329951889+00:00] EVENT_TYPE_ACTIVITY_TASK_SCHEDULED Activity: example/task1
  • [2025-02-21T14:16:56.333761680+00:00] EVENT_TYPE_ACTIVITY_TASK_STARTED
  • [2025-02-21T14:16:56.497156055+00:00] EVENT_TYPE_ACTIVITY_TASK_COMPLETED
    Result: {
      "counter": 1,
      "greeting": "hello, no name!",
      "name": "no name",
    }
```

With this more detailed output, you can see the exact sequence of events and the inputs and outputs of each task. This is useful for debugging and understanding the workflow's behavior.

The result of each task is included in the output, allowing you to inspect the data that was passed between tasks for debugging purposes. If your workflow fails due to a runtime error, you can use the event history timeline to identify the task that failed.

---

## retries-and-timeouts

Source: moose/workflows/retries-and-timeouts.mdx

# Error Detection and Handling

Moose provides multiple layers of error protection, both at the workflow and task level:

### Workflow-Level Retries and Timeouts

Moose automatically catches any runtime errors during workflow execution. Errors are logged for debugging, and the orchestrator will retry failed tasks according to the `retries` option.

In your `Workflow`, you can configure the following options to control workflow behavior, including timeouts and retries:

```python filename="app/main.py" {5} copy
from moose_lib import Task, TaskConfig, Workflow, WorkflowConfig

myworkflow = Workflow(
    name="myworkflow",
    config=WorkflowConfig(starting_task=task1, retries=1, timeout="10m")
)
```

### Task-Level Errors and Retries

For more granular control over task-level errors and retries, you can configure individual tasks with their own retry behavior. For workflows and tasks that may not have a predefined timeout, you can set `never` as the timeout.
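For example, a task that waits on an external system with no predictable completion time might disable its timeout entirely. A minimal sketch, assuming the `Foo` model from the earlier examples; the task and function names here are hypothetical:

```python filename="app/main.py" copy
from moose_lib import Task, TaskConfig, TaskContext

def run_poll_task(ctx: TaskContext[Foo]) -> None:
    # Long-polling work with no predictable completion time
    ...

poll_task = Task[Foo, None](
    name="poll_task",
    # "never" disables the task timeout; retries still apply
    config=TaskConfig(run=run_poll_task, retries=1, timeout="never")
)
```

The configuration below shows the more common case of bounded retries and timeouts at both the task and workflow level: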
```python filename="app/main.py" {8} copy
from moose_lib import Task, TaskConfig, TaskContext, Workflow, WorkflowConfig

def run_task1(ctx: TaskContext[Foo]) -> None:
    pass

task1 = Task[Foo, None](
    name="task1",
    config=TaskConfig(run=run_task1, retries=1, timeout="5m")
)

myworkflow = Workflow(
    name="myworkflow",
    config=WorkflowConfig(starting_task=task1, retries=2, timeout="10m")
)
```

### Example: Workflow and Task Retry Interplay

When configuring retries, it's important to understand how workflow-level and task-level retries interact. Consider the following scenario:

```python filename="app/main.py" {8,13} copy
from moose_lib import Task, TaskConfig, TaskContext, Workflow, WorkflowConfig

def run_task1(ctx: TaskContext[Foo]) -> None:
    pass

task1 = Task[Foo, None](
    name="task1",
    config=TaskConfig(run=run_task1, retries=3)
)

myworkflow = Workflow(
    name="myworkflow",
    config=WorkflowConfig(starting_task=task1, retries=2)
)
```

If the execution of the workflow encounters an error, the retry sequence would proceed as follows:

1. **Workflow Attempt 1**
   - **Task Attempt 1**: Task fails
   - **Task Attempt 2**: Task fails
   - **Task Attempt 3**: Task fails
   - Workflow attempt fails after exhausting task retries
2. **Workflow Attempt 2**
   - **Task Attempt 1**: Task fails
   - **Task Attempt 2**: Task fails
   - **Task Attempt 3**: Task fails
   - Workflow attempt fails after exhausting task retries

In this example, the workflow will make a total of 2 attempts, and each task within those attempts will retry up to 3 times before the workflow itself retries.

---

## Schedule Workflows

Source: moose/workflows/schedule-workflow.mdx

Set up recurring and scheduled workflow execution

# Schedule Workflows

## Overview

Moose workflows can be configured to run automatically on a schedule using cron expressions or interval-based scheduling. This enables you to automate recurring tasks, data processing jobs, and maintenance operations.

## Scheduling Workflows

Workflows can be configured to run on a schedule using the `schedule` field in `Workflow`. This field is optional and blank by default.

### Cron Expressions

```python filename="app/scheduled_workflow.py" copy
from moose_lib import Task, TaskConfig, Workflow, WorkflowConfig

myworkflow = Workflow(
    name="myworkflow",
    config=WorkflowConfig(starting_task=task1, schedule="0 12 * * *")  # Runs at 12:00 PM every day
)
```

#### Cron Expression Format

```text
|------------------------------- Minute (0-59)
|     |------------------------- Hour (0-23)
|     |     |------------------- Day of the month (1-31)
|     |     |     |------------- Month (1-12; or JAN to DEC)
|     |     |     |     |------- Day of the week (0-6; or SUN to SAT; or 7 for Sunday)
|     |     |     |     |
|     |     |     |     |
*     *     *     *     *
```

#### Common Cron Examples

| Cron Expression | Description |
|-----------------|-------------|
| `0 12 * * *` | Runs at 12:00 PM every day |
| `0 0 * * 0` | Runs at 12:00 AM every Sunday |
| `0 8 * * 1-5` | Runs at 8:00 AM on weekdays (Monday to Friday) |
| `* * * * *` | Runs every minute |
| `0 */6 * * *` | Runs every 6 hours |
| `0 9 1 * *` | Runs at 9:00 AM on the first day of every month |
| `0 0 1 1 *` | Runs at midnight on January 1st every year |

Use an online cron expression visualizer like [crontab.guru](https://crontab.guru/) to help you understand how a cron expression will schedule your workflow.

### Interval Schedules

Interval schedules can be specified as a string of the form `"@every <interval>"`.
The interval follows the standard duration format:

```python filename="app/interval_workflow.py" copy
from moose_lib import Task, TaskConfig, Workflow, WorkflowConfig

myworkflow = Workflow(
    name="myworkflow",
    config=WorkflowConfig(starting_task=task1, schedule="@every 1h")  # Runs every hour
)
```

#### Interval Examples

| Interval | Description |
|----------|-------------|
| `@every 30s` | Every 30 seconds |
| `@every 5m` | Every 5 minutes |
| `@every 1h` | Every hour |
| `@every 12h` | Every 12 hours |
| `@every 24h` | Every 24 hours |
| `@every 7d` | Every 7 days |

## Practical Scheduling Examples

### Daily Data Processing

```python filename="app/daily_etl.py" copy
from moose_lib import Workflow, WorkflowConfig

daily_data_processing = Workflow(
    name="daily-data-processing",
    config=WorkflowConfig(
        starting_task=extract_data_task,
        schedule="0 2 * * *",  # Run at 2 AM every day
        retries=2,
        timeout="2h"
    )
)
```

### Weekly Reports

```python filename="app/weekly_reports.py" copy
from moose_lib import Workflow, WorkflowConfig

weekly_reports = Workflow(
    name="weekly-reports",
    config=WorkflowConfig(
        starting_task=generate_report_task,
        schedule="0 9 * * 1",  # Run at 9 AM every Monday
        retries=1,
        timeout="1h"
    )
)
```

### High-Frequency Monitoring

```python filename="app/monitoring.py" copy
from moose_lib import Workflow, WorkflowConfig

system_monitoring = Workflow(
    name="system-monitoring",
    config=WorkflowConfig(
        starting_task=check_system_health_task,
        schedule="@every 5m",  # Check every 5 minutes
        retries=0,             # Don't retry monitoring checks
        timeout="30s"
    )
)
```

## Monitoring Scheduled Workflows

### Development Environment

If your dev server is running, you should see logs in the terminal when your scheduled workflow is executed:

```bash filename="Terminal" copy
moose dev
```

```txt filename="Terminal"
[2024-01-15 12:00:00] Scheduled workflow 'daily-data-processing' started
[2024-01-15 12:00:01] Task 'extract' completed successfully
[2024-01-15 12:00:15] Task 'transform' completed successfully
[2024-01-15 12:00:30] Task 'load' completed successfully
[2024-01-15 12:00:30] Workflow 'daily-data-processing' completed successfully
```

### Checking Workflow Status

You can check the status of scheduled workflows using the CLI:

```bash filename="Terminal" copy
# List all workflows defined in your project
moose workflow list

# Alternative command to list all workflows
moose ls --type workflows

# View workflow execution history
moose workflow history

# Check specific workflow status
moose workflow status daily-data-processing

# Get detailed execution history
moose workflow status daily-data-processing --verbose
```

### Temporal Dashboard

Access the Temporal dashboard to view scheduled workflow executions:

```bash filename="Terminal" copy
# Open Temporal dashboard (typically at http://localhost:8080)
open http://localhost:8080
```

The dashboard shows:

- Scheduled workflow definitions
- Execution history and timing
- Success/failure rates
- Retry attempts and errors

## Best Practices for Scheduled Workflows

### Timeout and Retry Configuration

Configure appropriate timeouts and retries for scheduled workflows:

```python filename="app/robust_scheduled_workflow.py" copy
from moose_lib import Task, TaskConfig, Workflow, WorkflowConfig

def run_main_task() -> None:
    # Long-running task logic
    pass

main_task = Task[None, None](
    name="main",
    config=TaskConfig(
        run=run_main_task,
        retries=3,      # Retry individual tasks
        timeout="1h"    # Task-level timeout
    )
)

robust_scheduled_workflow = Workflow(
    name="robust-scheduled",
    config=WorkflowConfig(
        starting_task=main_task,
        schedule="0 3 * * *",  # Run at 3 AM daily
        retries=2,             # Retry failed workflows
        timeout="4h"           # Allow sufficient time
    )
)
```
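A related practice is to make scheduled failures visible before they are retried: log enough context inside the task, then re-raise so the retry settings above still apply. A minimal sketch extending `run_main_task` with the `Logger` shown in the data-flow example (the exception handling here is illustrative, not a prescribed pattern):

```python filename="app/robust_scheduled_workflow.py" copy
from moose_lib import Logger

def run_main_task() -> None:
    logger = Logger(action="run_main_task")
    try:
        # Long-running task logic goes here
        ...
    except Exception as exc:
        # Record the failure with context, then re-raise so the task- and
        # workflow-level retries configured above take over
        logger.info(f"scheduled run failed: {exc}")
        raise
```
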
## Troubleshooting Scheduled Workflows

### Common Issues

- **Timezone considerations**: Cron schedules use UTC by default
- **Resource conflicts**: Ensure scheduled workflows don't compete for resources
- **Long-running tasks**: Set appropriate timeouts for lengthy operations
- **Error handling**: Implement proper error handling and logging

---

## Trigger Workflows

Source: moose/workflows/trigger-workflow.mdx

Start workflows from events, APIs, or external triggers

# Trigger Workflows

## Overview

Moose workflows can be triggered programmatically from various sources, including APIs, events, external systems, or manual execution. This enables you to build reactive data processing pipelines and on-demand task execution.

## Manual Workflow Execution

The simplest way to trigger a workflow is using the Moose CLI:

```bash filename="Terminal" copy
# Run a workflow manually
moose workflow run example

# Run with input parameters
moose workflow run example --input '{"name": "John", "email": "john@example.com"}'
```

### Passing Input to Workflows

When triggering workflows, you can pass input data that will be passed to the starting task:

```bash filename="Terminal" copy
moose workflow run data-processing --input '{
  "sourceUrl": "https://api.example.com/data",
  "apiKey": "your-api-key",
  "batchSize": 100
}'
```

The input is parsed as JSON and passed to the workflow's starting task.

## API-Triggered Workflows

Trigger workflows directly via an HTTP POST endpoint exposed by the webserver.

- Endpoint: `/workflows/{workflowName}/trigger`

### Request

- Body: optional JSON payload passed to the workflow's starting task.

Example:

```bash filename="Terminal" copy
curl -X POST 'http://localhost:4000/workflows/data-processing/trigger' \
  -H 'Content-Type: application/json' \
  -d '{
    "inputValue": "process-user-data",
    "priority": "high"
  }'
```

### Authentication

- Local development: no auth required.
- Production: protect the endpoint using an API key. Follow these steps:

1. Generate a token and hashed key (see the Token Generation section in the API Auth docs):

```bash filename="Terminal" copy
moose generate hash-token
# Outputs:
# - ENV API Key (hashed) → for environment/config
# - Bearer Token (plain) → for Authorization header
```

2. Configure the server with the hashed key:

```bash copy
MOOSE_CONSUMPTION_API_KEY="<hashed-api-key>"
```

3. Call the endpoint using the plain Bearer token from step 1:

```bash filename="Terminal" copy
curl -X POST 'https://your-host/workflows/data-processing/trigger' \
  -H 'Authorization: Bearer <bearer-token>' \
  -H 'Content-Type: application/json' \
  -d '{"inputValue":"process-user-data"}'
```

For details, see the API Auth page under “Token Generation” and “API Endpoints”.

### Response

```json filename="Response"
{
  "workflowId": "data-processing-<id>",
  "runId": "<run-id>"
}
```

In local development, the response also includes a `dashboardUrl` to the Temporal UI:

```json filename="Response (dev)"
{
  "workflowId": "data-processing-<id>",
  "runId": "<run-id>",
  "dashboardUrl": "http://localhost:8080/namespaces/<namespace>/workflows/data-processing-<id>/<run-id>/history"
}
```

## Terminate a Running Workflow

After triggering a workflow, you can terminate it via an HTTP endpoint.

- Endpoint: `POST /workflows/{workflowId}/terminate`

### Request

- Local development (no auth):

```bash filename="Terminal" copy
curl -X POST 'http://localhost:4000/workflows/data-processing-<id>/terminate'
```

- Production (Bearer token required):

```bash filename="Terminal" copy
curl -X POST 'https://your-host/workflows/data-processing-<id>/terminate' \
  -H 'Authorization: Bearer <bearer-token>'
```
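If you are triggering workflows from another Python service rather than curl, the same endpoints can be called with any HTTP client. A minimal sketch using the third-party `requests` library; the host, token handling, helper name `trigger_workflow`, and payload are placeholders you would adapt to your setup:

```python filename="scripts/trigger_workflow.py" copy
import requests

MOOSE_HOST = "http://localhost:4000"  # your Moose webserver
BEARER_TOKEN = None                   # set to the plain bearer token in production

def trigger_workflow(name: str, payload: dict) -> dict:
    """POST to /workflows/{name}/trigger and return the JSON response."""
    headers = {"Content-Type": "application/json"}
    if BEARER_TOKEN:
        headers["Authorization"] = f"Bearer {BEARER_TOKEN}"
    response = requests.post(
        f"{MOOSE_HOST}/workflows/{name}/trigger",
        json=payload,
        headers=headers,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()

if __name__ == "__main__":
    result = trigger_workflow("data-processing", {"inputValue": "process-user-data"})
    # The response includes workflowId and runId (plus dashboardUrl in local dev)
    print(result)
```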