diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index 24da926d..c072c897 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -409,6 +409,11 @@ class TaskAlreadyCanceled(SkyvernHTTPException): ) +class TaskAlreadyTimeout(SkyvernException): + def __init__(self, task_id: str): + super().__init__(f"Task {task_id} is timed out") + + class InvalidTaskStatusTransition(SkyvernHTTPException): def __init__(self, old_status: str, new_status: str, task_id: str): super().__init__(f"Invalid task status transition from {old_status} to {new_status} for {task_id}") diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 421ad466..e339d07f 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -37,6 +37,7 @@ from skyvern.exceptions import ( StepTerminationError, StepUnableToExecuteError, TaskAlreadyCanceled, + TaskAlreadyTimeout, TaskNotFound, UnsupportedActionType, UnsupportedTaskType, @@ -266,6 +267,23 @@ class ForgeAgent: ) return step, None, None + if workflow_run and workflow_run.status == WorkflowRunStatus.timed_out: + LOG.info( + "Workflow run is timed out, stopping execution inside task", + workflow_run_id=workflow_run.workflow_run_id, + step_id=step.step_id, + ) + step = await self.update_step( + step, + status=StepStatus.canceled, + is_last=True, + ) + task = await self.update_task( + task, + status=TaskStatus.timed_out, + ) + return step, None, None + refreshed_task = await app.DATABASE.get_task(task_id=task.task_id, organization_id=organization.organization_id) if refreshed_task: task = refreshed_task @@ -473,6 +491,20 @@ class ForgeAgent: step_id=step.step_id, ) raise + except TaskAlreadyTimeout: + LOG.warning( + "Task is timed out, stopping execution", + task_id=task.task_id, + step=step.step_id, + ) + await self.clean_up_task( + task=task, + last_step=step, + api_key=api_key, + close_browser_on_completion=close_browser_on_completion, + browser_session_id=browser_session_id, + ) + return step, detailed_output, None except StepTerminationError as e: LOG.warning( "Step cannot be executed, marking task as failed", diff --git a/skyvern/forge/agent_functions.py b/skyvern/forge/agent_functions.py index e02cfec7..020b4bd8 100644 --- a/skyvern/forge/agent_functions.py +++ b/skyvern/forge/agent_functions.py @@ -9,7 +9,7 @@ from playwright.async_api import Frame, Page from skyvern.config import settings from skyvern.constants import SKYVERN_ID_ATTR -from skyvern.exceptions import StepUnableToExecuteError +from skyvern.exceptions import StepUnableToExecuteError, TaskAlreadyTimeout from skyvern.forge import app from skyvern.forge.async_operations import AsyncOperation from skyvern.forge.prompts import prompt_engine @@ -362,6 +362,9 @@ class AgentFunction: :return: A tuple of whether the step can be executed and a list of reasons why it can't be executed. """ reasons = [] + if task.status == TaskStatus.timed_out: + raise TaskAlreadyTimeout(task_id=task.task_id) + # can't execute if task status is not running has_valid_task_status = task.status == TaskStatus.running if not has_valid_task_status: diff --git a/skyvern/forge/sdk/schemas/tasks.py b/skyvern/forge/sdk/schemas/tasks.py index b5f6d3e8..47e0da2f 100644 --- a/skyvern/forge/sdk/schemas/tasks.py +++ b/skyvern/forge/sdk/schemas/tasks.py @@ -7,7 +7,7 @@ from zoneinfo import ZoneInfo from pydantic import BaseModel, Field, field_validator -from skyvern.exceptions import InvalidTaskStatusTransition, TaskAlreadyCanceled +from skyvern.exceptions import InvalidTaskStatusTransition, TaskAlreadyCanceled, TaskAlreadyTimeout from skyvern.forge.sdk.core.validators import validate_url from skyvern.forge.sdk.db.enums import TaskType @@ -277,6 +277,8 @@ class Task(TaskBase): if not old_status.can_update_to(status): if old_status == TaskStatus.canceled: raise TaskAlreadyCanceled(new_status=status, task_id=self.task_id) + if old_status == TaskStatus.timed_out: + raise TaskAlreadyTimeout(task_id=self.task_id) raise InvalidTaskStatusTransition(old_status=old_status, new_status=status, task_id=self.task_id) if status.requires_failure_reason() and failure_reason is None: diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 2b0f6354..ca160f40 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -95,6 +95,7 @@ class BlockStatus(StrEnum): failed = "failed" terminated = "terminated" canceled = "canceled" + timed_out = "timed_out" @dataclass(frozen=True) @@ -614,6 +615,7 @@ class BaseTaskBlock(Block): TaskStatus.terminated: BlockStatus.terminated, TaskStatus.failed: BlockStatus.failed, TaskStatus.canceled: BlockStatus.canceled, + TaskStatus.timed_out: BlockStatus.timed_out, } if updated_task.status == TaskStatus.completed or updated_task.status == TaskStatus.terminated: LOG.info( @@ -653,6 +655,23 @@ class BaseTaskBlock(Block): workflow_run_block_id=workflow_run_block_id, organization_id=organization_id, ) + elif updated_task.status == TaskStatus.timed_out: + LOG.info( + "Task timed out, making the block time out", + task_id=updated_task.task_id, + task_status=updated_task.status, + workflow_run_id=workflow_run_id, + workflow_id=workflow.workflow_id, + organization_id=workflow.organization_id, + ) + return await self.build_block_result( + success=False, + failure_reason=updated_task.failure_reason, + output_parameter_value=None, + status=block_status_mapping[updated_task.status], + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) else: current_retry += 1 will_retry = current_retry <= self.max_retries diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 1f713223..29e60bfb 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -247,6 +247,25 @@ class WorkflowService: browser_session_id=browser_session_id, ) return workflow_run + + if refreshed_workflow_run and refreshed_workflow_run.status == WorkflowRunStatus.timed_out: + LOG.info( + "Workflow run is timed out, stopping execution inside workflow execution loop", + workflow_run_id=workflow_run.workflow_run_id, + block_idx=block_idx, + block_type=block.block_type, + block_label=block.label, + ) + await self.clean_up_workflow( + workflow=workflow, + workflow_run=workflow_run, + api_key=api_key, + need_call_webhook=True, + close_browser_on_completion=browser_session_id is None, + browser_session_id=browser_session_id, + ) + return workflow_run + parameters = block.get_all_parameters(workflow_run_id) await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run( workflow_run_id, parameters, organization @@ -356,6 +375,42 @@ class WorkflowService: block_label=block.label, ) + elif block_result.status == BlockStatus.timed_out: + LOG.info( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, marking workflow run as failed", + block_type=block.block_type, + workflow_run_id=workflow_run.workflow_run_id, + block_idx=block_idx, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + + if not block.continue_on_failure: + failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out. Reason: {block_result.failure_reason}" + await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason + ) + await self.clean_up_workflow( + workflow=workflow, + workflow_run=workflow_run, + api_key=api_key, + close_browser_on_completion=browser_session_id is None, + browser_session_id=browser_session_id, + ) + return workflow_run + + LOG.warning( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, but will continue executing the workflow run", + block_type=block.block_type, + workflow_run_id=workflow_run.workflow_run_id, + block_idx=block_idx, + block_result=block_result, + continue_on_failure=block.continue_on_failure, + block_type_var=block.block_type, + block_label=block.label, + ) + except Exception as e: LOG.exception( f"Error while executing workflow run {workflow_run.workflow_run_id}", @@ -390,11 +445,12 @@ class WorkflowService: WorkflowRunStatus.canceled, WorkflowRunStatus.failed, WorkflowRunStatus.terminated, + WorkflowRunStatus.timed_out, ): await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id) else: LOG.info( - "Workflow run is already canceled, failed, or terminated, not marking as completed", + "Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed", workflow_run_id=workflow_run.workflow_run_id, workflow_run_status=refreshed_workflow_run.status if refreshed_workflow_run else None, )