From bd041ed52f112c07e7b28c882988d45c4d712e2c Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Sun, 18 Jan 2026 11:17:02 -0800 Subject: [PATCH] add action result, including downloaded files to step.output for both agent and caching runs (#4477) --- .../script_generations/script_skyvern_page.py | 198 ++++++++++++++++-- .../core/script_generations/skyvern_page.py | 8 +- skyvern/forge/sdk/db/agent_db.py | 2 +- skyvern/services/action_service.py | 9 +- skyvern/services/script_service.py | 23 +- skyvern/webeye/actions/actions.py | 2 + skyvern/webeye/actions/handler.py | 14 ++ skyvern/webeye/actions/responses.py | 5 + 8 files changed, 245 insertions(+), 16 deletions(-) diff --git a/skyvern/core/script_generations/script_skyvern_page.py b/skyvern/core/script_generations/script_skyvern_page.py index d545cdc8..9e4d6778 100644 --- a/skyvern/core/script_generations/script_skyvern_page.py +++ b/skyvern/core/script_generations/script_skyvern_page.py @@ -1,20 +1,30 @@ from __future__ import annotations import asyncio +import os +from pathlib import Path from typing import Any, Callable import structlog from playwright.async_api import Page from skyvern.config import settings +from skyvern.constants import BROWSER_DOWNLOAD_TIMEOUT from skyvern.core.script_generations.real_skyvern_page_ai import RealSkyvernPageAi, render_template from skyvern.core.script_generations.skyvern_page import ActionCall, ActionMetadata, RunContext, SkyvernPage from skyvern.core.script_generations.skyvern_page_ai import SkyvernPageAi +from skyvern.errors.errors import UserDefinedError from skyvern.exceptions import ScriptTerminationException, WorkflowRunNotFound from skyvern.forge import app from skyvern.forge.prompts import prompt_engine +from skyvern.forge.sdk.api.files import ( + check_downloading_files_and_wait_for_download_to_complete, + get_path_for_workflow_download_directory, + list_files_in_directory, +) from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context +from skyvern.schemas.steps import AgentStepOutput from skyvern.services.otp_service import poll_otp_value from skyvern.utils.url_validators import prepend_scheme_and_validate_url from skyvern.webeye.actions.action_types import ActionType @@ -22,6 +32,7 @@ from skyvern.webeye.actions.actions import ( Action, ActionStatus, CompleteAction, + DecisiveAction, ExtractAction, SelectOption, SolveCaptchaAction, @@ -32,6 +43,7 @@ from skyvern.webeye.actions.handler import ( get_actual_value_of_parameter_if_secret, handle_complete_action, ) +from skyvern.webeye.actions.responses import ActionFailure, ActionResult, ActionSuccess from skyvern.webeye.browser_state import BrowserState from skyvern.webeye.scraper.scraped_page import ScrapedPage @@ -122,6 +134,29 @@ class ScriptSkyvernPage(SkyvernPage): support_empty_page=True, ) + async def _ensure_download_to_complete( + self, + download_dir: Path, + browser_session_id: str | None = None, + ) -> None: + context = skyvern_context.current() + if not context or not context.organization_id: + return + if not download_dir.exists(): + return + organization_id = context.organization_id + download_timeout = BROWSER_DOWNLOAD_TIMEOUT + if context.task_id: + task = await app.DATABASE.get_task(context.task_id, organization_id=organization_id) + if task and task.download_timeout: + download_timeout = task.download_timeout + await check_downloading_files_and_wait_for_download_to_complete( + download_dir=download_dir, + organization_id=organization_id, + browser_session_id=browser_session_id, + timeout=download_timeout, + ) + async def _decorate_call( self, fn: Callable, @@ -176,6 +211,28 @@ class ScriptSkyvernPage(SkyvernPage): else: print() + # Download detection for click actions + download_triggered: bool | None = None + downloaded_files: list[str] | None = None + files_before: list[str] = [] + download_dir: Path | None = None + + # Capture files before click action for download detection + if action == ActionType.CLICK and context and context.workflow_run_id: + try: + download_dir = get_path_for_workflow_download_directory(context.workflow_run_id) + if download_dir.exists(): + files_before = list_files_in_directory(download_dir) + if context.browser_session_id and context.organization_id: + browser_session_downloaded_files = await app.STORAGE.list_downloaded_files_in_browser_session( + organization_id=context.organization_id, + browser_session_id=context.browser_session_id, + ) + files_before = files_before + browser_session_downloaded_files + + except Exception: + pass # Don't block action execution if file listing fails + try: call.result = await fn(self, *args, **kwargs) @@ -198,23 +255,58 @@ class ScriptSkyvernPage(SkyvernPage): # LLM fallback hook could go here ... raise finally: + # Add a small buffer between cached actions to give slow pages time to settle + if settings.CACHED_ACTION_DELAY_SECONDS > 0: + await asyncio.sleep(settings.CACHED_ACTION_DELAY_SECONDS) + + # Check for downloaded files after click action + if action == ActionType.CLICK and context and context.workflow_run_id and download_dir: + try: + if download_dir.exists(): + await self._ensure_download_to_complete( + download_dir=download_dir, + browser_session_id=context.browser_session_id, + ) + files_after = list_files_in_directory(download_dir) + if context.browser_session_id and context.organization_id: + browser_session_downloaded_files = ( + await app.STORAGE.list_downloaded_files_in_browser_session( + organization_id=context.organization_id, + browser_session_id=context.browser_session_id, + ) + ) + files_after = files_after + browser_session_downloaded_files + + new_file_paths = set(files_after) - set(files_before) + if new_file_paths: + download_triggered = True + downloaded_files = [os.path.basename(fp) for fp in new_file_paths] + LOG.info( + "Script click action detected download", + downloaded_files=downloaded_files, + workflow_run_id=context.workflow_run_id, + ) + else: + download_triggered = False + except Exception: + pass # Don't block if download detection fails + self._record(call) - # Auto-create action after execution - await self._create_action_after_execution( + # Auto-create action after execution and store result + await self._create_action_and_result_after_execution( action_type=action, intention=prompt, status=action_status, kwargs=kwargs, call_result=call.result, + call_error=call.error, + download_triggered=download_triggered, + downloaded_files=downloaded_files, ) # Auto-create screenshot artifact after execution await self._create_screenshot_after_execution() - # Add a small buffer between cached actions to give slow pages time to settle - if settings.CACHED_ACTION_DELAY_SECONDS > 0: - await asyncio.sleep(settings.CACHED_ACTION_DELAY_SECONDS) - async def _update_action_reasoning( self, action_id: str, @@ -265,20 +357,27 @@ class ScriptSkyvernPage(SkyvernPage): ) return reasoning - async def _create_action_after_execution( + async def _create_action_and_result_after_execution( self, action_type: ActionType, intention: str = "", status: ActionStatus = ActionStatus.pending, kwargs: dict[str, Any] | None = None, call_result: Any | None = None, - ) -> Action | None: - """Create an action record in the database before execution if task_id and step_id are available.""" + call_error: Exception | None = None, + download_triggered: bool | None = None, + downloaded_files: list[str] | None = None, + ) -> tuple[Action | None, list[ActionResult]]: + """Create an action record and result in the database after execution if task_id and step_id are available. + + Returns a tuple of (Action, list[ActionResult]) similar to how the agent stores actions and results. + """ + results: list[ActionResult] = [] try: context = skyvern_context.current() if not context or not context.task_id or not context.step_id: - return None + return None, results # Create action record. TODO: store more action fields kwargs = kwargs or {} @@ -320,6 +419,9 @@ class ScriptSkyvernPage(SkyvernPage): file_url=file_url, response=response, xpath=xpath, + download=download_triggered, + download_triggered=download_triggered, + downloaded_files=downloaded_files, created_by="script", ) data_extraction_goal = None @@ -363,11 +465,32 @@ class ScriptSkyvernPage(SkyvernPage): context.action_order += 1 - return created_action + # Create ActionResult based on success/failure + if call_error: + result = ActionFailure(exception=call_error, download_triggered=download_triggered) + else: + # For extract actions, include the extracted data in the result + result_data = None + if action_type == ActionType.EXTRACT and call_result: + result_data = call_result + result = ActionSuccess( + data=result_data, + download_triggered=download_triggered, + downloaded_files=downloaded_files, + ) + + results = [result] + + # Store action and results in RunContext for step output + run_context = script_run_context_manager.get_run_context() + if run_context: + run_context.actions_and_results.append((created_action, results)) + + return created_action, results except Exception: # If action creation fails, don't block the actual action execution - return None + return None, results @classmethod async def _create_screenshot_after_execution(cls) -> None: @@ -492,6 +615,15 @@ class ScriptSkyvernPage(SkyvernPage): task = await app.DATABASE.get_task(context.task_id, context.organization_id) step = await app.DATABASE.get_step(context.step_id, context.organization_id) if task and step: + # CRITICAL: Update step.output with actions_and_results BEFORE validation + # This ensures complete_verify() can access action history (including download info) + # when checking if the goal was achieved + await self._update_step_output_before_complete(context) + # Refresh step to get updated output for validation + step = await app.DATABASE.get_step(context.step_id, context.organization_id) + if not step: + return + action = CompleteAction( organization_id=context.organization_id, task_id=context.task_id, @@ -504,6 +636,48 @@ class ScriptSkyvernPage(SkyvernPage): if result and result[-1].success is False: raise ScriptTerminationException(result[-1].exception_message) + async def _update_step_output_before_complete(self, context: skyvern_context.SkyvernContext) -> None: + """Update step.output with actions_and_results before complete validation. + + This is critical for cached runs because complete_verify() reads action history + from step.output.actions_and_results to check if goals were achieved (e.g., file downloads). + Without this, the validation has no visibility into what actions were performed. + """ + + # Validate required context fields + if not context.step_id or not context.task_id or not context.organization_id: + return + + run_context = script_run_context_manager.get_run_context() + if not run_context or not run_context.actions_and_results: + return + + # Extract errors from DecisiveActions (similar to agent flow) + errors: list[UserDefinedError] = [] + for action, _ in run_context.actions_and_results: + if isinstance(action, DecisiveAction): + errors.extend(action.errors) + + # Create AgentStepOutput similar to how agent does it + step_output = AgentStepOutput( + actions_and_results=run_context.actions_and_results, + action_results=[result for _, results in run_context.actions_and_results for result in results], + errors=errors, + ) + + await app.DATABASE.update_step( + step_id=context.step_id, + task_id=context.task_id, + organization_id=context.organization_id, + output=step_output, + ) + LOG.info( + "Updated step output with cached actions before complete validation", + step_id=context.step_id, + task_id=context.task_id, + num_actions=len(run_context.actions_and_results), + ) + class ScriptRunContextManager: """ diff --git a/skyvern/core/script_generations/skyvern_page.py b/skyvern/core/script_generations/skyvern_page.py index 5938647d..56def805 100644 --- a/skyvern/core/script_generations/skyvern_page.py +++ b/skyvern/core/script_generations/skyvern_page.py @@ -3,7 +3,7 @@ from __future__ import annotations import asyncio import copy from dataclasses import dataclass -from typing import Any, Callable, Literal, overload +from typing import TYPE_CHECKING, Any, Callable, Literal, overload import structlog from playwright.async_api import Locator, Page @@ -16,6 +16,10 @@ from skyvern.library.ai_locator import AILocator from skyvern.webeye.actions import handler_utils from skyvern.webeye.actions.action_types import ActionType +if TYPE_CHECKING: + from skyvern.webeye.actions.actions import Action + from skyvern.webeye.actions.responses import ActionResult + LOG = structlog.get_logger() @@ -1007,3 +1011,5 @@ class RunContext: self.parameters[key] = value self.page = page self.trace: list[ActionCall] = [] + # Store actions and results for step output (similar to agent flow) + self.actions_and_results: list[tuple[Action, list[ActionResult]]] = [] diff --git a/skyvern/forge/sdk/db/agent_db.py b/skyvern/forge/sdk/db/agent_db.py index 369ebce5..4e4fd684 100644 --- a/skyvern/forge/sdk/db/agent_db.py +++ b/skyvern/forge/sdk/db/agent_db.py @@ -3859,7 +3859,7 @@ class AgentDB(BaseAlchemyDB): session.add(new_action) await session.commit() await session.refresh(new_action) - return Action.model_validate(new_action) + return hydrate_action(new_action) async def update_action_screenshot_artifact_id( self, *, organization_id: str, action_id: str, screenshot_artifact_id: str diff --git a/skyvern/services/action_service.py b/skyvern/services/action_service.py index d529f15c..d4b2fcd9 100644 --- a/skyvern/services/action_service.py +++ b/skyvern/services/action_service.py @@ -34,7 +34,14 @@ async def get_action_history( { "action": action.model_dump( exclude_none=True, - include={"action_type", "element_id", "status", "reasoning", "option", "download"}, + include={ + "action_type", + "element_id", + "status", + "reasoning", + "option", + "download", + }, ), # use the last result of the action, because some actions(like chain_click) # might have multiple results. Only the last one can represent the real result, diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index 5d0d3a9b..0ce4cd4d 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -60,9 +60,10 @@ from skyvern.schemas.scripts import ( ScriptFileCreate, ScriptStatus, ) +from skyvern.schemas.steps import AgentStepOutput from skyvern.schemas.workflows import BlockResult, BlockStatus, BlockType, FileStorageType, FileType from skyvern.webeye.actions.action_types import ActionType -from skyvern.webeye.actions.actions import Action +from skyvern.webeye.actions.actions import Action, DecisiveAction from skyvern.webeye.scraper.scraped_page import ElementTreeFormat LOG = structlog.get_logger() @@ -455,6 +456,8 @@ async def _create_workflow_block_run_and_task( organization_id=organization_id, workflow_run_id=workflow_run_id, model=model, + # always use the action history for validation in caching/script run + include_action_history_in_verification=True, ) task_id = task.task_id @@ -593,12 +596,30 @@ async def _update_workflow_block( final_output = output if task_id: if step_id: + # Build step output from script actions similar to agent flow + step_output = None + run_context = script_run_context_manager.get_run_context() + if run_context and run_context.actions_and_results: + # Extract errors from DecisiveActions (similar to agent flow) + errors: list[UserDefinedError] = [] + for action, _ in run_context.actions_and_results: + if isinstance(action, DecisiveAction): + errors.extend(action.errors) + + # Create AgentStepOutput similar to how agent does it + step_output = AgentStepOutput( + actions_and_results=run_context.actions_and_results, + action_results=[result for _, results in run_context.actions_and_results for result in results], + errors=errors, + ) + await app.DATABASE.update_step( step_id=step_id, task_id=task_id, organization_id=context.organization_id, status=step_status, is_last=is_last, + output=step_output, ) updated_task = await app.DATABASE.update_task( task_id=task_id, diff --git a/skyvern/webeye/actions/actions.py b/skyvern/webeye/actions/actions.py index c3e110f8..477c4d78 100644 --- a/skyvern/webeye/actions/actions.py +++ b/skyvern/webeye/actions/actions.py @@ -125,6 +125,8 @@ class Action(BaseModel): file_name: str | None = None file_url: str | None = None download: bool | None = None + download_triggered: bool | None = None # Whether a download was triggered by this action + downloaded_files: list[str] | None = None # List of file names downloaded by this action is_upload_file_tag: bool | None = None text: str | None = None input_or_select_context: InputOrSelectContext | None = None diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py index 2fd7891e..3ab5a96b 100644 --- a/skyvern/webeye/actions/handler.py +++ b/skyvern/webeye/actions/handler.py @@ -481,8 +481,22 @@ class ActionHandler: if not download_triggered: results[-1].download_triggered = False + action.download_triggered = False return results results[-1].download_triggered = True + action.download_triggered = True + + # Calculate newly downloaded file names + new_file_paths = set(list_files_after) - set(list_files_before) + downloaded_file_names = [os.path.basename(fp) for fp in new_file_paths] + if downloaded_file_names: + results[-1].downloaded_files = downloaded_file_names + action.downloaded_files = downloaded_file_names + LOG.info( + "Downloaded files captured", + downloaded_files=downloaded_file_names, + workflow_run_id=task.workflow_run_id, + ) await check_downloading_files_and_wait_for_download_to_complete( download_dir=download_dir, diff --git a/skyvern/webeye/actions/responses.py b/skyvern/webeye/actions/responses.py index 1fc1c445..07f473eb 100644 --- a/skyvern/webeye/actions/responses.py +++ b/skyvern/webeye/actions/responses.py @@ -14,6 +14,7 @@ class ActionResult(BaseModel): step_retry_number: int | None = None step_order: int | None = None download_triggered: bool | None = None + downloaded_files: list[str] | None = None # Actual file names that were downloaded # None is used for old data so that we can differentiate between old and new data which only has boolean interacted_with_sibling: bool | None = None interacted_with_parent: bool | None = None @@ -33,6 +34,8 @@ class ActionResult(BaseModel): results.append(f"step_retry_number={self.step_retry_number}") if self.download_triggered is not None: results.append(f"download_triggered={self.download_triggered}") + if self.downloaded_files is not None: + results.append(f"downloaded_files={self.downloaded_files}") if self.interacted_with_sibling is not None: results.append(f"interacted_with_sibling={self.interacted_with_sibling}") if self.interacted_with_parent is not None: @@ -51,6 +54,7 @@ class ActionSuccess(ActionResult): self, data: dict[str, Any] | list | str | None = None, download_triggered: bool | None = None, + downloaded_files: list[str] | None = None, interacted_with_sibling: bool = False, interacted_with_parent: bool = False, ): @@ -58,6 +62,7 @@ class ActionSuccess(ActionResult): success=True, data=data, download_triggered=download_triggered, + downloaded_files=downloaded_files, interacted_with_sibling=interacted_with_sibling, interacted_with_parent=interacted_with_parent, )