From 53d8c69e08816eec11f6786eb10260737c819491 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Sun, 24 Aug 2025 13:45:00 -0700 Subject: [PATCH] script skyvern fallback (#3285) --- skyvern/forge/agent.py | 5 +- skyvern/forge/sdk/db/client.py | 5 +- skyvern/forge/sdk/models.py | 2 +- skyvern/schemas/steps.py | 24 +++ skyvern/services/script_service.py | 273 ++++++++++++++++++++++------- skyvern/webeye/actions/models.py | 19 +- 6 files changed, 246 insertions(+), 82 deletions(-) create mode 100644 skyvern/schemas/steps.py diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 0cc044c7..d65e5114 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -76,6 +76,7 @@ from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus from skyvern.schemas.runs import CUA_ENGINES, RunEngine +from skyvern.schemas.steps import AgentStepOutput from skyvern.services import run_service from skyvern.services.task_v1_service import is_cua_task from skyvern.utils.image_resizer import Resolution @@ -95,7 +96,7 @@ from skyvern.webeye.actions.actions import ( ) from skyvern.webeye.actions.caching import retrieve_action_plan from skyvern.webeye.actions.handler import ActionHandler, poll_verification_code -from skyvern.webeye.actions.models import AgentStepOutput, DetailedAgentStepOutput +from skyvern.webeye.actions.models import DetailedAgentStepOutput from skyvern.webeye.actions.parse_actions import ( parse_actions, parse_anthropic_actions, @@ -461,7 +462,7 @@ class ForgeAgent: llm_caller=llm_caller, ) await app.AGENT_FUNCTION.post_step_execution(task, step) - task = await self.update_task_errors_from_detailed_output(task, detailed_output) + task = await self.update_task_errors_from_detailed_output(task, detailed_output) # type: ignore retry = False if task_block 
and task_block.complete_on_download and task.workflow_run_id: diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 7cb1a6f4..c3c97094 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -105,9 +105,9 @@ from skyvern.forge.sdk.workflow.models.workflow import ( ) from skyvern.schemas.runs import ProxyLocation, RunEngine, RunType from skyvern.schemas.scripts import Script, ScriptBlock, ScriptFile +from skyvern.schemas.steps import AgentStepOutput from skyvern.schemas.workflows import BlockStatus, BlockType, WorkflowStatus from skyvern.webeye.actions.actions import Action -from skyvern.webeye.actions.models import AgentStepOutput LOG = structlog.get_logger() @@ -213,6 +213,7 @@ class AgentDB: order: int, retry_index: int, organization_id: str | None = None, + status: StepStatus = StepStatus.created, ) -> Step: try: async with self.Session() as session: @@ -220,7 +221,7 @@ class AgentDB: task_id=task_id, order=order, retry_index=retry_index, - status="created", + status=status, organization_id=organization_id, ) session.add(new_step) diff --git a/skyvern/forge/sdk/models.py b/skyvern/forge/sdk/models.py index 066c8e7c..e792c779 100644 --- a/skyvern/forge/sdk/models.py +++ b/skyvern/forge/sdk/models.py @@ -5,8 +5,8 @@ from enum import StrEnum from pydantic import BaseModel +from skyvern.schemas.steps import AgentStepOutput from skyvern.webeye.actions.action_types import ActionType -from skyvern.webeye.actions.models import AgentStepOutput class StepStatus(StrEnum): diff --git a/skyvern/schemas/steps.py b/skyvern/schemas/steps.py new file mode 100644 index 00000000..3cf49850 --- /dev/null +++ b/skyvern/schemas/steps.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from pydantic import BaseModel + +from skyvern.webeye.actions.actions import Action, UserDefinedError +from skyvern.webeye.actions.responses import ActionResult + + +class AgentStepOutput(BaseModel): + """ + Output of the agent step, this is 
recorded in the database. + """ + + # Will be deprecated once we move to the new format below + action_results: list[ActionResult] | None = None + # Nullable for backwards compatibility, once backfill is done, this won't be nullable anymore + actions_and_results: list[tuple[Action, list[ActionResult]]] | None = None + errors: list[UserDefinedError] = [] + + def __repr__(self) -> str: + return f"AgentStepOutput({self.model_dump()})" + + def __str__(self) -> str: + return self.__repr__() diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index da206eaf..6297f739 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -18,8 +18,11 @@ from skyvern.exceptions import ScriptNotFound, WorkflowRunNotFound from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.core import skyvern_context +from skyvern.forge.sdk.models import StepStatus from skyvern.forge.sdk.schemas.files import FileInfo from skyvern.forge.sdk.schemas.tasks import TaskOutput, TaskStatus +from skyvern.forge.sdk.workflow.models.block import TaskBlock +from skyvern.schemas.runs import RunEngine from skyvern.schemas.scripts import CreateScriptResponse, FileNode, ScriptFileCreate from skyvern.schemas.workflows import BlockStatus, BlockType @@ -245,14 +248,14 @@ async def _create_workflow_block_run_and_task( block_type: BlockType, prompt: str | None = None, url: str | None = None, -) -> tuple[str | None, str | None]: +) -> tuple[str | None, str | None, str | None]: """ Create a workflow block run and optionally a task if workflow_run_id is available in context. Returns (workflow_run_block_id, task_id) tuple. 
""" context = skyvern_context.current() if not context or not context.workflow_run_id or not context.organization_id: - return None, None + return None, None, None workflow_run_id = context.workflow_run_id organization_id = context.organization_id @@ -294,6 +297,7 @@ async def _create_workflow_block_run_and_task( order=0, retry_index=0, organization_id=organization_id, + status=StepStatus.running, ) step_id = step.step_id @@ -307,7 +311,7 @@ async def _create_workflow_block_run_and_task( context.step_id = step_id context.task_id = task_id - return workflow_run_block_id, task_id + return workflow_run_block_id, task_id, step_id except Exception as e: LOG.warning( @@ -317,7 +321,7 @@ async def _create_workflow_block_run_and_task( workflow_run_id=context.workflow_run_id, exc_info=True, ) - return None, None + return None, None, None async def _record_output_parameter_value( @@ -357,6 +361,9 @@ async def _update_workflow_block( status: BlockStatus, task_id: str | None = None, task_status: TaskStatus = TaskStatus.completed, + step_id: str | None = None, + step_status: StepStatus = StepStatus.completed, + is_last: bool | None = True, label: str | None = None, failure_reason: str | None = None, output: dict[str, Any] | list | str | None = None, @@ -368,6 +375,14 @@ async def _update_workflow_block( return final_output = output if task_id: + if step_id: + await app.DATABASE.update_step( + step_id=step_id, + task_id=task_id, + organization_id=context.organization_id, + status=step_status, + is_last=is_last, + ) updated_task = await app.DATABASE.update_task( task_id=task_id, organization_id=context.organization_id, @@ -430,6 +445,129 @@ async def _run_cached_function(cache_key: str) -> Any: raise Exception(f"Cache key {cache_key} not found") +async def _fallback_to_ai_run( + cache_key: str, + prompt: str | None = None, + url: str | None = None, + engine: RunEngine = RunEngine.skyvern_v1, + complete_criterion: str | None = None, + terminate_criterion: str | None = None, + 
data_extraction_goal: str | None = None, + schema: dict[str, Any] | list | str | None = None, + error_code_mapping: dict[str, str] | None = None, + max_steps: int | None = None, + complete_on_download: bool = False, + download_suffix: str | None = None, + totp_verification_url: str | None = None, + totp_identifier: str | None = None, + complete_verification: bool = True, + include_action_history_in_verification: bool = False, + error: Exception | None = None, + workflow_run_block_id: str | None = None, +) -> None: + context = skyvern_context.current() + if not ( + context + and context.organization_id + and context.workflow_run_id + and context.workflow_id + and context.task_id + and context.step_id + ): + return + try: + organization_id = context.organization_id + LOG.info( + "Script fallback to AI run", + cache_key=cache_key, + organization_id=organization_id, + workflow_id=context.workflow_id, + workflow_run_id=context.workflow_run_id, + task_id=context.task_id, + step_id=context.step_id, + ) + # 1. fail the previous step + previous_step = await app.DATABASE.update_step( + step_id=context.step_id, + task_id=context.task_id, + organization_id=organization_id, + status=StepStatus.failed, + ) + # 2. create a new step for ai run + ai_step = await app.DATABASE.create_step( + task_id=context.task_id, + organization_id=organization_id, + order=previous_step.order + 1, + retry_index=0, + ) + context.step_id = ai_step.step_id + # 3. build the task block + # 4. 
run execute_step + organization = await app.DATABASE.get_organization(organization_id=organization_id) + if not organization: + raise Exception(f"Organization is missing organization_id={organization_id}") + task = await app.DATABASE.get_task(task_id=context.task_id, organization_id=organization_id) + if not task: + raise Exception(f"Task is missing task_id={context.task_id}") + workflow = await app.DATABASE.get_workflow(workflow_id=context.workflow_id, organization_id=organization_id) + if not workflow: + return + + # get the output_parameter + output_parameter = workflow.get_output_parameter(cache_key) + if not output_parameter: + return + + task_block = TaskBlock( + label=cache_key, + url=task.url, + navigation_goal=prompt, + output_parameter=output_parameter, + title=cache_key, + engine=engine, + complete_criterion=complete_criterion, + terminate_criterion=terminate_criterion, + data_extraction_goal=data_extraction_goal, + data_schema=schema, + error_code_mapping=error_code_mapping, + max_steps_per_run=max_steps, + complete_on_download=complete_on_download, + download_suffix=download_suffix, + totp_verification_url=totp_verification_url, + totp_identifier=totp_identifier, + complete_verification=complete_verification, + include_action_history_in_verification=include_action_history_in_verification, + ) + await app.agent.execute_step( + organization=organization, + task=task, + step=ai_step, + task_block=task_block, + ) + # Update block status to completed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block( + workflow_run_block_id, + BlockStatus.completed, + task_id=context.task_id, + step_id=context.step_id, + label=cache_key, + ) + except Exception as e: + LOG.warning("Failed to fallback to AI run", cache_key=cache_key, exc_info=True) + # Update block status to failed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block( + workflow_run_block_id, + BlockStatus.failed, + 
task_id=context.task_id, + task_status=TaskStatus.failed, + label=cache_key, + failure_reason=str(e), + ) + raise e + + async def run_task( prompt: str, url: str | None = None, @@ -437,7 +575,7 @@ async def run_task( cache_key: str | None = None, ) -> None: # Auto-create workflow block run and task if workflow_run_id is available - workflow_run_block_id, task_id = await _create_workflow_block_run_and_task( + workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( block_type=BlockType.TASK, prompt=prompt, url=url, @@ -453,22 +591,22 @@ async def run_task( # Update block status to completed if workflow block was created if workflow_run_block_id: await _update_workflow_block( - workflow_run_block_id, BlockStatus.completed, task_id=task_id, label=cache_key + workflow_run_block_id, + BlockStatus.completed, + task_id=task_id, + step_id=step_id, + label=cache_key, ) except Exception as e: - # TODO: fallback to AI run in case of error - # Update block status to failed if workflow block was created - if workflow_run_block_id: - await _update_workflow_block( - workflow_run_block_id, - BlockStatus.failed, - task_id=task_id, - task_status=TaskStatus.failed, - label=cache_key, - failure_reason=str(e), - ) - raise + await _fallback_to_ai_run( + cache_key=cache_key, + prompt=prompt, + url=url, + max_steps=max_steps, + error=e, + workflow_run_block_id=workflow_run_block_id, + ) finally: # clear the prompt in the RunContext run_context.prompt = None @@ -479,6 +617,8 @@ async def run_task( BlockStatus.failed, task_id=task_id, task_status=TaskStatus.failed, + step_id=step_id, + step_status=StepStatus.failed, failure_reason="Cache key is required", ) run_context.prompt = None @@ -492,7 +632,7 @@ async def download( cache_key: str | None = None, ) -> None: # Auto-create workflow block run and task if workflow_run_id is available - workflow_run_block_id, task_id = await _create_workflow_block_run_and_task( + workflow_run_block_id, task_id, step_id = await 
_create_workflow_block_run_and_task( block_type=BlockType.FILE_DOWNLOAD, prompt=prompt, url=url, @@ -508,21 +648,23 @@ async def download( # Update block status to completed if workflow block was created if workflow_run_block_id: await _update_workflow_block( - workflow_run_block_id, BlockStatus.completed, task_id=task_id, label=cache_key + workflow_run_block_id, + BlockStatus.completed, + task_id=task_id, + step_id=step_id, + label=cache_key, ) except Exception as e: - # Update block status to failed if workflow block was created - if workflow_run_block_id: - await _update_workflow_block( - workflow_run_block_id, - BlockStatus.failed, - task_id=task_id, - task_status=TaskStatus.failed, - label=cache_key, - failure_reason=str(e), - ) - raise + await _fallback_to_ai_run( + cache_key=cache_key, + prompt=prompt, + url=url, + max_steps=max_steps, + complete_on_download=True, + error=e, + workflow_run_block_id=workflow_run_block_id, + ) finally: run_context.prompt = None else: @@ -532,6 +674,8 @@ async def download( BlockStatus.failed, task_id=task_id, task_status=TaskStatus.failed, + step_id=step_id, + step_status=StepStatus.failed, failure_reason="Cache key is required", ) run_context.prompt = None @@ -545,7 +689,7 @@ async def action( cache_key: str | None = None, ) -> None: # Auto-create workflow block run and task if workflow_run_id is available - workflow_run_block_id, task_id = await _create_workflow_block_run_and_task( + workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( block_type=BlockType.ACTION, prompt=prompt, url=url, @@ -561,21 +705,22 @@ async def action( # Update block status to completed if workflow block was created if workflow_run_block_id: await _update_workflow_block( - workflow_run_block_id, BlockStatus.completed, task_id=task_id, label=cache_key + workflow_run_block_id, + BlockStatus.completed, + task_id=task_id, + step_id=step_id, + label=cache_key, ) except Exception as e: - # Update block status to failed if 
workflow block was created - if workflow_run_block_id: - await _update_workflow_block( - workflow_run_block_id, - BlockStatus.failed, - task_id=task_id, - task_status=TaskStatus.failed, - label=cache_key, - failure_reason=str(e), - ) - raise + await _fallback_to_ai_run( + cache_key=cache_key, + prompt=prompt, + url=url, + max_steps=max_steps, + error=e, + workflow_run_block_id=workflow_run_block_id, + ) finally: run_context.prompt = None else: @@ -585,6 +730,8 @@ async def action( BlockStatus.failed, task_id=task_id, task_status=TaskStatus.failed, + step_id=step_id, + step_status=StepStatus.failed, failure_reason="Cache key is required", ) run_context.prompt = None @@ -598,7 +745,7 @@ async def login( cache_key: str | None = None, ) -> None: # Auto-create workflow block run and task if workflow_run_id is available - workflow_run_block_id, task_id = await _create_workflow_block_run_and_task( + workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( block_type=BlockType.LOGIN, prompt=prompt, url=url, @@ -614,21 +761,22 @@ async def login( # Update block status to completed if workflow block was created if workflow_run_block_id: await _update_workflow_block( - workflow_run_block_id, BlockStatus.completed, task_id=task_id, label=cache_key + workflow_run_block_id, + BlockStatus.completed, + task_id=task_id, + step_id=step_id, + label=cache_key, ) except Exception as e: - # Update block status to failed if workflow block was created - if workflow_run_block_id: - await _update_workflow_block( - workflow_run_block_id, - BlockStatus.failed, - task_id=task_id, - task_status=TaskStatus.failed, - label=cache_key, - failure_reason=str(e), - ) - raise + await _fallback_to_ai_run( + cache_key=cache_key, + prompt=prompt, + url=url, + max_steps=max_steps, + error=e, + workflow_run_block_id=workflow_run_block_id, + ) finally: run_context.prompt = None else: @@ -638,6 +786,8 @@ async def login( BlockStatus.failed, task_id=task_id, 
task_status=TaskStatus.failed, + step_id=step_id, + step_status=StepStatus.failed, failure_reason="Cache key is required", ) run_context.prompt = None @@ -651,7 +801,7 @@ async def extract( cache_key: str | None = None, ) -> dict[str, Any] | list | str | None: # Auto-create workflow block run and task if workflow_run_id is available - workflow_run_block_id, task_id = await _create_workflow_block_run_and_task( + workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( block_type=BlockType.EXTRACTION, prompt=prompt, url=url, @@ -671,6 +821,7 @@ async def extract( workflow_run_block_id, BlockStatus.completed, task_id=task_id, + step_id=step_id, output=output, label=cache_key, ) @@ -683,6 +834,8 @@ async def extract( BlockStatus.failed, task_id=task_id, task_status=TaskStatus.failed, + step_id=step_id, + step_status=StepStatus.failed, failure_reason=str(e), output=output, label=cache_key, @@ -697,6 +850,8 @@ async def extract( BlockStatus.failed, task_id=task_id, task_status=TaskStatus.failed, + step_id=step_id, + step_status=StepStatus.failed, failure_reason="Cache key is required", ) run_context.prompt = None @@ -705,7 +860,7 @@ async def extract( async def wait(seconds: int) -> None: # Auto-create workflow block run if workflow_run_id is available (wait block doesn't create tasks) - workflow_run_block_id, _ = await _create_workflow_block_run_and_task(block_type=BlockType.WAIT) + workflow_run_block_id, _, _ = await _create_workflow_block_run_and_task(block_type=BlockType.WAIT) try: await asyncio.sleep(seconds) diff --git a/skyvern/webeye/actions/models.py b/skyvern/webeye/actions/models.py index b9c8eba6..65f810b2 100644 --- a/skyvern/webeye/actions/models.py +++ b/skyvern/webeye/actions/models.py @@ -6,29 +6,12 @@ from openai.types.responses.response import Response as OpenAIResponse from pydantic import BaseModel, ConfigDict from skyvern.config import settings +from skyvern.schemas.steps import AgentStepOutput from 
skyvern.webeye.actions.actions import Action, DecisiveAction, UserDefinedError from skyvern.webeye.actions.responses import ActionResult from skyvern.webeye.scraper.scraper import ScrapedPage -class AgentStepOutput(BaseModel): - """ - Output of the agent step, this is recorded in the database. - """ - - # Will be deprecated once we move to the new format below - action_results: list[ActionResult] | None = None - # Nullable for backwards compatibility, once backfill is done, this won't be nullable anymore - actions_and_results: list[tuple[Action, list[ActionResult]]] | None = None - errors: list[UserDefinedError] = [] - - def __repr__(self) -> str: - return f"AgentStepOutput({self.model_dump()})" - - def __str__(self) -> str: - return self.__repr__() - - class DetailedAgentStepOutput(BaseModel): """ Output of the agent step, this is not recorded in the database, only used for debugging in the Jupyter notebook.