From 2b4829f87a414c8e63f2a6192aa0468ad57ed789 Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Sat, 25 May 2024 18:24:35 -0700 Subject: [PATCH] Introduce ActionFunction to make it easy to patch and do extra validations before step starts (#365) Co-authored-by: Shuchang Zheng --- skyvern/exceptions.py | 5 +++ skyvern/forge/agent.py | 57 ++++++++++++++------------------ skyvern/forge/agent_functions.py | 44 ++++++++++++++++++++++++ skyvern/forge/app.py | 6 ++-- 4 files changed, 76 insertions(+), 36 deletions(-) create mode 100644 skyvern/forge/agent_functions.py diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index 1199503d..7477c262 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -262,3 +262,8 @@ class BitwardenLogoutError(BitwardenBaseError): class UnknownElementTreeFormat(SkyvernException): def __init__(self, fmt: str) -> None: super().__init__(f"Unknown element tree format {fmt}") + + +class StepTerminationError(SkyvernException): + def __init__(self, step_id: str, reason: str) -> None: + super().__init__(f"Step {step_id} cannot be executed and task is terminated. Reason: {reason}") diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 1a94fb93..d2f0dd1e 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -17,6 +17,7 @@ from skyvern.exceptions import ( FailedToSendWebhook, InvalidWorkflowTaskURLState, MissingBrowserStatePage, + StepTerminationError, TaskNotFound, ) from skyvern.forge import app @@ -79,34 +80,6 @@ class ForgeAgent: ) self.async_operation_pool = AsyncOperationPool() - async def validate_step_execution( - self, - task: Task, - step: Step, - ) -> None: - """ - Checks if the step can be executed. - :return: A tuple of whether the step can be executed and a list of reasons why it can't be executed. - """ - reasons = [] - # can't execute if task status is not running - has_valid_task_status = task.status == TaskStatus.running - if not has_valid_task_status: - reasons.append(f"invalid_task_status:{task.status}") - # can't execute if the step is already running or completed - has_valid_step_status = step.status in [StepStatus.created, StepStatus.failed] - if not has_valid_step_status: - reasons.append(f"invalid_step_status:{step.status}") - # can't execute if the task has another step that is running - steps = await app.DATABASE.get_task_steps(task_id=task.task_id, organization_id=task.organization_id) - has_no_running_steps = not any(step.status == StepStatus.running for step in steps) - if not has_no_running_steps: - reasons.append(f"another_step_is_running_for_task:{task.task_id}") - - can_execute = has_valid_task_status and has_valid_step_status and has_no_running_steps - if not can_execute: - raise Exception(f"Cannot execute step. Reasons: {reasons}, Step: {step}") - async def create_task_and_step_from_block( self, task_block: TaskBlock, @@ -211,9 +184,7 @@ class ForgeAgent: return task def register_async_operations(self, organization: Organization, task: Task, page: Page) -> None: - if not app.generate_async_operations: - return - operations = app.generate_async_operations(organization, task, page) + operations = app.AGENT_FUNCTION.generate_async_operations(organization, task, page) self.async_operation_pool.add_operations(task.task_id, operations) async def execute_step( @@ -229,7 +200,7 @@ class ForgeAgent: detailed_output: DetailedAgentStepOutput | None = None try: # Check some conditions before executing the step, throw an exception if the step can't be executed - await self.validate_step_execution(task, step) + await app.AGENT_FUNCTION.validate_step_execution(task, step) ( step, browser_state, @@ -323,6 +294,28 @@ class ForgeAgent: return step, detailed_output, next_step # TODO (kerem): Let's add other exceptions that we know about here as custom exceptions as well + except StepTerminationError as e: + LOG.error( + "Step cannot be executed. Task terminated", + task_id=task.task_id, + step_id=step.step_id, + ) + await self.update_step( + step=step, + status=StepStatus.failed, + ) + task = await self.update_task( + task, + status=TaskStatus.failed, + failure_reason=e.message, + ) + await self.send_task_response( + task=task, + last_step=step, + api_key=api_key, + close_browser_on_completion=close_browser_on_completion, + ) + return step, detailed_output, next_step except FailedToSendWebhook: LOG.exception( "Failed to send webhook", diff --git a/skyvern/forge/agent_functions.py b/skyvern/forge/agent_functions.py new file mode 100644 index 00000000..bbdafe22 --- /dev/null +++ b/skyvern/forge/agent_functions.py @@ -0,0 +1,44 @@ +from playwright.async_api import Page + +from skyvern.forge import app +from skyvern.forge.async_operations import AsyncOperation +from skyvern.forge.sdk.models import Organization, Step, StepStatus +from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus + + +class AgentFunction: + async def validate_step_execution( + self, + task: Task, + step: Step, + ) -> None: + """ + Checks if the step can be executed. + :return: A tuple of whether the step can be executed and a list of reasons why it can't be executed. + """ + reasons = [] + # can't execute if task status is not running + has_valid_task_status = task.status == TaskStatus.running + if not has_valid_task_status: + reasons.append(f"invalid_task_status:{task.status}") + # can't execute if the step is already running or completed + has_valid_step_status = step.status in [StepStatus.created, StepStatus.failed] + if not has_valid_step_status: + reasons.append(f"invalid_step_status:{step.status}") + # can't execute if the task has another step that is running + steps = await app.DATABASE.get_task_steps(task_id=task.task_id, organization_id=task.organization_id) + has_no_running_steps = not any(step.status == StepStatus.running for step in steps) + if not has_no_running_steps: + reasons.append(f"another_step_is_running_for_task:{task.task_id}") + + can_execute = has_valid_task_status and has_valid_step_status and has_no_running_steps + if not can_execute: + raise Exception(f"Cannot execute step. Reasons: {reasons}, Step: {step}") + + def generate_async_operations( + self, + organization: Organization, + task: Task, + page: Page, + ) -> list[AsyncOperation]: + return [] diff --git a/skyvern/forge/app.py b/skyvern/forge/app.py index 76d07bf6..e9f54616 100644 --- a/skyvern/forge/app.py +++ b/skyvern/forge/app.py @@ -3,10 +3,9 @@ from typing import Awaitable, Callable from ddtrace import tracer from ddtrace.filters import FilterRequestsOnUrl from fastapi import FastAPI -from playwright.async_api import Page from skyvern.forge.agent import ForgeAgent -from skyvern.forge.async_operations import AsyncOperation +from skyvern.forge.agent_functions import AgentFunction from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory from skyvern.forge.sdk.artifact.manager import ArtifactManager from skyvern.forge.sdk.artifact.storage.factory import StorageFactory @@ -14,7 +13,6 @@ from skyvern.forge.sdk.db.client import AgentDB from skyvern.forge.sdk.experimentation.providers import BaseExperimentationProvider, NoOpExperimentationProvider from skyvern.forge.sdk.forge_log import setup_logger from skyvern.forge.sdk.models import Organization -from skyvern.forge.sdk.schemas.tasks import Task from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.forge.sdk.workflow.context_manager import WorkflowContextManager from skyvern.forge.sdk.workflow.service import WorkflowService @@ -41,7 +39,7 @@ EXPERIMENTATION_PROVIDER: BaseExperimentationProvider = NoOpExperimentationProvi LLM_API_HANDLER = LLMAPIHandlerFactory.get_llm_api_handler(SettingsManager.get_settings().LLM_KEY) WORKFLOW_CONTEXT_MANAGER = WorkflowContextManager() WORKFLOW_SERVICE = WorkflowService() -generate_async_operations: Callable[[Organization, Task, Page], list[AsyncOperation]] | None = None +AGENT_FUNCTION = AgentFunction() authentication_function: Callable[[str], Awaitable[Organization]] | None = None setup_api_app: Callable[[FastAPI], None] | None = None