call webhook when time out (#1612)
This commit is contained in:
@@ -409,6 +409,11 @@ class TaskAlreadyCanceled(SkyvernHTTPException):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TaskAlreadyTimeout(SkyvernException):
|
||||||
|
def __init__(self, task_id: str):
|
||||||
|
super().__init__(f"Task {task_id} is timed out")
|
||||||
|
|
||||||
|
|
||||||
class InvalidTaskStatusTransition(SkyvernHTTPException):
|
class InvalidTaskStatusTransition(SkyvernHTTPException):
|
||||||
def __init__(self, old_status: str, new_status: str, task_id: str):
|
def __init__(self, old_status: str, new_status: str, task_id: str):
|
||||||
super().__init__(f"Invalid task status transition from {old_status} to {new_status} for {task_id}")
|
super().__init__(f"Invalid task status transition from {old_status} to {new_status} for {task_id}")
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ from skyvern.exceptions import (
|
|||||||
StepTerminationError,
|
StepTerminationError,
|
||||||
StepUnableToExecuteError,
|
StepUnableToExecuteError,
|
||||||
TaskAlreadyCanceled,
|
TaskAlreadyCanceled,
|
||||||
|
TaskAlreadyTimeout,
|
||||||
TaskNotFound,
|
TaskNotFound,
|
||||||
UnsupportedActionType,
|
UnsupportedActionType,
|
||||||
UnsupportedTaskType,
|
UnsupportedTaskType,
|
||||||
@@ -266,6 +267,23 @@ class ForgeAgent:
|
|||||||
)
|
)
|
||||||
return step, None, None
|
return step, None, None
|
||||||
|
|
||||||
|
if workflow_run and workflow_run.status == WorkflowRunStatus.timed_out:
|
||||||
|
LOG.info(
|
||||||
|
"Workflow run is timed out, stopping execution inside task",
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
step_id=step.step_id,
|
||||||
|
)
|
||||||
|
step = await self.update_step(
|
||||||
|
step,
|
||||||
|
status=StepStatus.canceled,
|
||||||
|
is_last=True,
|
||||||
|
)
|
||||||
|
task = await self.update_task(
|
||||||
|
task,
|
||||||
|
status=TaskStatus.timed_out,
|
||||||
|
)
|
||||||
|
return step, None, None
|
||||||
|
|
||||||
refreshed_task = await app.DATABASE.get_task(task_id=task.task_id, organization_id=organization.organization_id)
|
refreshed_task = await app.DATABASE.get_task(task_id=task.task_id, organization_id=organization.organization_id)
|
||||||
if refreshed_task:
|
if refreshed_task:
|
||||||
task = refreshed_task
|
task = refreshed_task
|
||||||
@@ -473,6 +491,20 @@ class ForgeAgent:
|
|||||||
step_id=step.step_id,
|
step_id=step.step_id,
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
except TaskAlreadyTimeout:
|
||||||
|
LOG.warning(
|
||||||
|
"Task is timed out, stopping execution",
|
||||||
|
task_id=task.task_id,
|
||||||
|
step=step.step_id,
|
||||||
|
)
|
||||||
|
await self.clean_up_task(
|
||||||
|
task=task,
|
||||||
|
last_step=step,
|
||||||
|
api_key=api_key,
|
||||||
|
close_browser_on_completion=close_browser_on_completion,
|
||||||
|
browser_session_id=browser_session_id,
|
||||||
|
)
|
||||||
|
return step, detailed_output, None
|
||||||
except StepTerminationError as e:
|
except StepTerminationError as e:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
"Step cannot be executed, marking task as failed",
|
"Step cannot be executed, marking task as failed",
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ from playwright.async_api import Frame, Page
|
|||||||
|
|
||||||
from skyvern.config import settings
|
from skyvern.config import settings
|
||||||
from skyvern.constants import SKYVERN_ID_ATTR
|
from skyvern.constants import SKYVERN_ID_ATTR
|
||||||
from skyvern.exceptions import StepUnableToExecuteError
|
from skyvern.exceptions import StepUnableToExecuteError, TaskAlreadyTimeout
|
||||||
from skyvern.forge import app
|
from skyvern.forge import app
|
||||||
from skyvern.forge.async_operations import AsyncOperation
|
from skyvern.forge.async_operations import AsyncOperation
|
||||||
from skyvern.forge.prompts import prompt_engine
|
from skyvern.forge.prompts import prompt_engine
|
||||||
@@ -362,6 +362,9 @@ class AgentFunction:
|
|||||||
:return: A tuple of whether the step can be executed and a list of reasons why it can't be executed.
|
:return: A tuple of whether the step can be executed and a list of reasons why it can't be executed.
|
||||||
"""
|
"""
|
||||||
reasons = []
|
reasons = []
|
||||||
|
if task.status == TaskStatus.timed_out:
|
||||||
|
raise TaskAlreadyTimeout(task_id=task.task_id)
|
||||||
|
|
||||||
# can't execute if task status is not running
|
# can't execute if task status is not running
|
||||||
has_valid_task_status = task.status == TaskStatus.running
|
has_valid_task_status = task.status == TaskStatus.running
|
||||||
if not has_valid_task_status:
|
if not has_valid_task_status:
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from zoneinfo import ZoneInfo
|
|||||||
|
|
||||||
from pydantic import BaseModel, Field, field_validator
|
from pydantic import BaseModel, Field, field_validator
|
||||||
|
|
||||||
from skyvern.exceptions import InvalidTaskStatusTransition, TaskAlreadyCanceled
|
from skyvern.exceptions import InvalidTaskStatusTransition, TaskAlreadyCanceled, TaskAlreadyTimeout
|
||||||
from skyvern.forge.sdk.core.validators import validate_url
|
from skyvern.forge.sdk.core.validators import validate_url
|
||||||
from skyvern.forge.sdk.db.enums import TaskType
|
from skyvern.forge.sdk.db.enums import TaskType
|
||||||
|
|
||||||
@@ -277,6 +277,8 @@ class Task(TaskBase):
|
|||||||
if not old_status.can_update_to(status):
|
if not old_status.can_update_to(status):
|
||||||
if old_status == TaskStatus.canceled:
|
if old_status == TaskStatus.canceled:
|
||||||
raise TaskAlreadyCanceled(new_status=status, task_id=self.task_id)
|
raise TaskAlreadyCanceled(new_status=status, task_id=self.task_id)
|
||||||
|
if old_status == TaskStatus.timed_out:
|
||||||
|
raise TaskAlreadyTimeout(task_id=self.task_id)
|
||||||
raise InvalidTaskStatusTransition(old_status=old_status, new_status=status, task_id=self.task_id)
|
raise InvalidTaskStatusTransition(old_status=old_status, new_status=status, task_id=self.task_id)
|
||||||
|
|
||||||
if status.requires_failure_reason() and failure_reason is None:
|
if status.requires_failure_reason() and failure_reason is None:
|
||||||
|
|||||||
@@ -95,6 +95,7 @@ class BlockStatus(StrEnum):
|
|||||||
failed = "failed"
|
failed = "failed"
|
||||||
terminated = "terminated"
|
terminated = "terminated"
|
||||||
canceled = "canceled"
|
canceled = "canceled"
|
||||||
|
timed_out = "timed_out"
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
@@ -614,6 +615,7 @@ class BaseTaskBlock(Block):
|
|||||||
TaskStatus.terminated: BlockStatus.terminated,
|
TaskStatus.terminated: BlockStatus.terminated,
|
||||||
TaskStatus.failed: BlockStatus.failed,
|
TaskStatus.failed: BlockStatus.failed,
|
||||||
TaskStatus.canceled: BlockStatus.canceled,
|
TaskStatus.canceled: BlockStatus.canceled,
|
||||||
|
TaskStatus.timed_out: BlockStatus.timed_out,
|
||||||
}
|
}
|
||||||
if updated_task.status == TaskStatus.completed or updated_task.status == TaskStatus.terminated:
|
if updated_task.status == TaskStatus.completed or updated_task.status == TaskStatus.terminated:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
@@ -653,6 +655,23 @@ class BaseTaskBlock(Block):
|
|||||||
workflow_run_block_id=workflow_run_block_id,
|
workflow_run_block_id=workflow_run_block_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
)
|
)
|
||||||
|
elif updated_task.status == TaskStatus.timed_out:
|
||||||
|
LOG.info(
|
||||||
|
"Task timed out, making the block time out",
|
||||||
|
task_id=updated_task.task_id,
|
||||||
|
task_status=updated_task.status,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
workflow_id=workflow.workflow_id,
|
||||||
|
organization_id=workflow.organization_id,
|
||||||
|
)
|
||||||
|
return await self.build_block_result(
|
||||||
|
success=False,
|
||||||
|
failure_reason=updated_task.failure_reason,
|
||||||
|
output_parameter_value=None,
|
||||||
|
status=block_status_mapping[updated_task.status],
|
||||||
|
workflow_run_block_id=workflow_run_block_id,
|
||||||
|
organization_id=organization_id,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
current_retry += 1
|
current_retry += 1
|
||||||
will_retry = current_retry <= self.max_retries
|
will_retry = current_retry <= self.max_retries
|
||||||
|
|||||||
@@ -247,6 +247,25 @@ class WorkflowService:
|
|||||||
browser_session_id=browser_session_id,
|
browser_session_id=browser_session_id,
|
||||||
)
|
)
|
||||||
return workflow_run
|
return workflow_run
|
||||||
|
|
||||||
|
if refreshed_workflow_run and refreshed_workflow_run.status == WorkflowRunStatus.timed_out:
|
||||||
|
LOG.info(
|
||||||
|
"Workflow run is timed out, stopping execution inside workflow execution loop",
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_type=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
await self.clean_up_workflow(
|
||||||
|
workflow=workflow,
|
||||||
|
workflow_run=workflow_run,
|
||||||
|
api_key=api_key,
|
||||||
|
need_call_webhook=True,
|
||||||
|
close_browser_on_completion=browser_session_id is None,
|
||||||
|
browser_session_id=browser_session_id,
|
||||||
|
)
|
||||||
|
return workflow_run
|
||||||
|
|
||||||
parameters = block.get_all_parameters(workflow_run_id)
|
parameters = block.get_all_parameters(workflow_run_id)
|
||||||
await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run(
|
await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run(
|
||||||
workflow_run_id, parameters, organization
|
workflow_run_id, parameters, organization
|
||||||
@@ -356,6 +375,42 @@ class WorkflowService:
|
|||||||
block_label=block.label,
|
block_label=block.label,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
elif block_result.status == BlockStatus.timed_out:
|
||||||
|
LOG.info(
|
||||||
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, marking workflow run as failed",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_result=block_result,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not block.continue_on_failure:
|
||||||
|
failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out. Reason: {block_result.failure_reason}"
|
||||||
|
await self.mark_workflow_run_as_failed(
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
||||||
|
)
|
||||||
|
await self.clean_up_workflow(
|
||||||
|
workflow=workflow,
|
||||||
|
workflow_run=workflow_run,
|
||||||
|
api_key=api_key,
|
||||||
|
close_browser_on_completion=browser_session_id is None,
|
||||||
|
browser_session_id=browser_session_id,
|
||||||
|
)
|
||||||
|
return workflow_run
|
||||||
|
|
||||||
|
LOG.warning(
|
||||||
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, but will continue executing the workflow run",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_result=block_result,
|
||||||
|
continue_on_failure=block.continue_on_failure,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception(
|
LOG.exception(
|
||||||
f"Error while executing workflow run {workflow_run.workflow_run_id}",
|
f"Error while executing workflow run {workflow_run.workflow_run_id}",
|
||||||
@@ -390,11 +445,12 @@ class WorkflowService:
|
|||||||
WorkflowRunStatus.canceled,
|
WorkflowRunStatus.canceled,
|
||||||
WorkflowRunStatus.failed,
|
WorkflowRunStatus.failed,
|
||||||
WorkflowRunStatus.terminated,
|
WorkflowRunStatus.terminated,
|
||||||
|
WorkflowRunStatus.timed_out,
|
||||||
):
|
):
|
||||||
await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
|
await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
|
||||||
else:
|
else:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Workflow run is already canceled, failed, or terminated, not marking as completed",
|
"Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed",
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
workflow_run_status=refreshed_workflow_run.status if refreshed_workflow_run else None,
|
workflow_run_status=refreshed_workflow_run.status if refreshed_workflow_run else None,
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user