From f1d5a3a6876125d9a9b5f2c0cb320973e47dc92c Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Sun, 2 Jun 2024 23:24:30 -0700 Subject: [PATCH] complete_on_downloads for task block (#403) --- skyvern/forge/agent.py | 35 +++++++++++ skyvern/forge/sdk/api/files.py | 10 +++ skyvern/forge/sdk/workflow/context_manager.py | 2 +- skyvern/forge/sdk/workflow/models/block.py | 10 +++ skyvern/forge/sdk/workflow/models/yaml.py | 1 + skyvern/forge/sdk/workflow/service.py | 1 + skyvern/webeye/actions/handler.py | 62 ++++++++++++------- skyvern/webeye/actions/responses.py | 24 +++++-- skyvern/webeye/browser_factory.py | 3 +- 9 files changed, 118 insertions(+), 30 deletions(-) diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index ae976cab..2065c8e5 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -23,6 +23,7 @@ from skyvern.exceptions import ( from skyvern.forge import app from skyvern.forge.async_operations import AgentPhase, AsyncOperationPool from skyvern.forge.prompts import prompt_engine +from skyvern.forge.sdk.api.files import get_number_of_files_in_directory, get_path_for_workflow_download_directory from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.security import generate_skyvern_signature @@ -195,10 +196,18 @@ class ForgeAgent: api_key: str | None = None, workflow_run: WorkflowRun | None = None, close_browser_on_completion: bool = True, + # If complete_on_download is True and there is a workflow run, the task will be marked as completed + # if a download happens during the step execution. 
+ complete_on_download: bool = False, ) -> Tuple[Step, DetailedAgentStepOutput | None, Step | None]: next_step: Step | None = None detailed_output: DetailedAgentStepOutput | None = None + num_files_before = 0 try: + if task.workflow_run_id: + num_files_before = get_number_of_files_in_directory( + get_path_for_workflow_download_directory(task.workflow_run_id) + ) # Check some conditions before executing the step, throw an exception if the step can't be executed await app.AGENT_FUNCTION.validate_step_execution(task, step) ( @@ -214,6 +223,30 @@ class ForgeAgent: task = await self.update_task_errors_from_detailed_output(task, detailed_output) retry = False + if complete_on_download and task.workflow_run_id: + num_files_after = get_number_of_files_in_directory( + get_path_for_workflow_download_directory(task.workflow_run_id) + ) + if num_files_after > num_files_before: + LOG.info( + "Task marked as completed due to download", + task_id=task.task_id, + num_files_before=num_files_before, + num_files_after=num_files_after, + ) + last_step = await self.update_step(step, is_last=True) + completed_task = await self.update_task( + task, + status=TaskStatus.completed, + ) + await self.send_task_response( + task=completed_task, + last_step=last_step, + api_key=api_key, + close_browser_on_completion=close_browser_on_completion, + ) + return last_step, detailed_output, None + # If the step failed, mark the step as failed and retry if step.status == StepStatus.failed: maybe_next_step = await self.handle_failed_step(organization, task, step) @@ -273,6 +306,7 @@ class ForgeAgent: next_step, api_key=api_key, close_browser_on_completion=close_browser_on_completion, + complete_on_download=complete_on_download, ) elif SettingsManager.get_settings().execute_all_steps() and next_step: return await self.execute_step( @@ -281,6 +315,7 @@ class ForgeAgent: next_step, api_key=api_key, close_browser_on_completion=close_browser_on_completion, + complete_on_download=complete_on_download, ) else: 
LOG.info( diff --git a/skyvern/forge/sdk/api/files.py b/skyvern/forge/sdk/api/files.py index ae5d994a..403ef49e 100644 --- a/skyvern/forge/sdk/api/files.py +++ b/skyvern/forge/sdk/api/files.py @@ -68,3 +68,13 @@ def zip_files(files_path: str, zip_file_path: str) -> str: def get_path_for_workflow_download_directory(workflow_run_id: str) -> Path: return Path(f"{REPO_ROOT_DIR}/downloads/{workflow_run_id}/") + + +def get_number_of_files_in_directory(directory: Path, recursive: bool = False) -> int: + count = 0 + for root, dirs, files in os.walk(directory): + if not recursive: + count += len(files) + break + count += len(files) + return count diff --git a/skyvern/forge/sdk/workflow/context_manager.py b/skyvern/forge/sdk/workflow/context_manager.py index 7599bbe8..1b37fdc0 100644 --- a/skyvern/forge/sdk/workflow/context_manager.py +++ b/skyvern/forge/sdk/workflow/context_manager.py @@ -217,7 +217,7 @@ class WorkflowRunContext: self, parameter: OutputParameter, value: dict[str, Any] | list | str | None ) -> None: if parameter.key in self.values: - LOG.error(f"Output parameter {parameter.output_parameter_id} already has a registered value") + LOG.warning(f"Output parameter {parameter.output_parameter_id} already has a registered value") return self.values[parameter.key] = value diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index e7484ed1..73bcece3 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -71,6 +71,14 @@ class Block(BaseModel, abc.ABC): workflow_run_id: str, value: dict[str, Any] | list | str | None = None, ) -> None: + if workflow_run_context.has_value(self.output_parameter.key): + LOG.warning( + "Output parameter value already recorded", + output_parameter_id=self.output_parameter.output_parameter_id, + workflow_run_id=workflow_run_id, + ) + return + await workflow_run_context.register_output_parameter_value_post_execution( 
parameter=self.output_parameter, value=value, @@ -150,6 +158,7 @@ class TaskBlock(Block): max_retries: int = 0 max_steps_per_run: int | None = None parameters: list[PARAMETER_TYPE] = [] + complete_on_download: bool = False def get_all_parameters( self, @@ -265,6 +274,7 @@ class TaskBlock(Block): task=task, step=step, workflow_run=workflow_run, + complete_on_download=self.complete_on_download, ) except Exception as e: # Make sure the task is marked as failed in the database before raising the exception diff --git a/skyvern/forge/sdk/workflow/models/yaml.py b/skyvern/forge/sdk/workflow/models/yaml.py index 81ba222b..97dca26e 100644 --- a/skyvern/forge/sdk/workflow/models/yaml.py +++ b/skyvern/forge/sdk/workflow/models/yaml.py @@ -87,6 +87,7 @@ class TaskBlockYAML(BlockYAML): max_retries: int = 0 max_steps_per_run: int | None = None parameter_keys: list[str] | None = None + complete_on_download: bool = False class ForLoopBlockYAML(BlockYAML): diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 7b94dfe5..9405bdcb 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -969,6 +969,7 @@ class WorkflowService: error_code_mapping=block_yaml.error_code_mapping, max_steps_per_run=block_yaml.max_steps_per_run, max_retries=block_yaml.max_retries, + complete_on_download=block_yaml.complete_on_download, ) elif block_yaml.block_type == BlockType.FOR_LOOP: loop_blocks = [ diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py index 1c4fdf4b..af87f3b0 100644 --- a/skyvern/webeye/actions/handler.py +++ b/skyvern/webeye/actions/handler.py @@ -13,7 +13,11 @@ from skyvern.constants import REPO_ROOT_DIR from skyvern.exceptions import ImaginaryFileUrl, MissingElement, MissingFileUrl, MultipleElementsFound from skyvern.forge import app from skyvern.forge.prompts import prompt_engine -from skyvern.forge.sdk.api.files import download_file +from skyvern.forge.sdk.api.files 
import ( + download_file, + get_number_of_files_in_directory, + get_path_for_workflow_download_directory, +) from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.tasks import Task from skyvern.forge.sdk.services.bitwarden import BitwardenConstants @@ -162,17 +166,42 @@ async def handle_click_action( task: Task, step: Step, ) -> list[ActionResult]: + num_downloaded_files_before = 0 + download_dir = None + if task.workflow_run_id: + download_dir = get_path_for_workflow_download_directory(task.workflow_run_id) + num_downloaded_files_before = get_number_of_files_in_directory(download_dir) + LOG.info( + "Number of files in download directory before click", + num_downloaded_files_before=num_downloaded_files_before, + download_dir=download_dir, + ) xpath = await validate_actions_in_dom(action, page, scraped_page) await asyncio.sleep(0.3) if action.download: - return await handle_click_to_download_file_action(action, page, scraped_page) - return await chain_click( - task, - page, - action, - xpath, - timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS, - ) + results = await handle_click_to_download_file_action(action, page, scraped_page) + else: + results = await chain_click( + task, + page, + action, + xpath, + timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS, + ) + + if results and task.workflow_run_id and download_dir: + LOG.info("Sleeping for 5 seconds to let the download finish") + await asyncio.sleep(5) + num_downloaded_files_after = get_number_of_files_in_directory(download_dir) + LOG.info( + "Number of files in download directory after click", + num_downloaded_files_after=num_downloaded_files_after, + download_dir=download_dir, + ) + if num_downloaded_files_after > num_downloaded_files_before: + results[-1].download_triggered = True + + return results async def handle_click_to_download_file_action( @@ -677,16 +706,6 @@ async def chain_click( page.on("filechooser", fc_func) LOG.info("Registered file chooser listener", 
action=action, path=file) - # If a download is triggered due to the click, we need to let LLM know in action_results - download_triggered = False - - def download_func(download: Any) -> None: - nonlocal download_triggered - download_triggered = True - - page.on("download", download_func) - LOG.info("Registered download listener", action=action) - """ Clicks on an element identified by the xpath and its parent if failed. :param xpath: xpath of the element to click @@ -698,7 +717,6 @@ async def chain_click( return [ ActionSuccess( javascript_triggered=javascript_triggered, - download_triggered=download_triggered, ) ] except Exception as e: @@ -706,7 +724,6 @@ async def chain_click( ActionFailure( e, javascript_triggered=javascript_triggered, - download_triggered=download_triggered, ) ] if await is_input_element(page.locator(xpath)): @@ -716,7 +733,6 @@ async def chain_click( xpath=xpath, ) sibling_action_result = await click_sibling_of_input(page.locator(xpath), timeout=timeout) - sibling_action_result.download_triggered = download_triggered action_results.append(sibling_action_result) if type(sibling_action_result) == ActionSuccess: return action_results @@ -736,7 +752,6 @@ async def chain_click( ActionSuccess( javascript_triggered=javascript_triggered, interacted_with_parent=True, - download_triggered=download_triggered, ) ) except Exception as pe: @@ -765,7 +780,6 @@ async def chain_click( if file: await asyncio.sleep(10) page.remove_listener("filechooser", fc_func) - page.remove_listener("download", download_func) def get_anchor_to_click(scraped_page: ScrapedPage, element_id: int) -> str | None: diff --git a/skyvern/webeye/actions/responses.py b/skyvern/webeye/actions/responses.py index 60f5f684..eec15918 100644 --- a/skyvern/webeye/actions/responses.py +++ b/skyvern/webeye/actions/responses.py @@ -19,10 +19,26 @@ class ActionResult(BaseModel): interacted_with_parent: bool | None = None def __str__(self) -> str: - return ( - f"ActionResult(success={self.success}, 
exception_type={self.exception_type}, " - f"exception_message={self.exception_message}), data={self.data}" - ) + results = [f"ActionResult(success={self.success}"] + if self.exception_type or self.exception_message: + results.append(f"exception_type={self.exception_type}") + results.append(f"exception_message={self.exception_message}") + if self.data: + results.append(f"data={self.data}") + if self.step_order: + results.append(f"step_order={self.step_order}") + if self.step_retry_number: + results.append(f"step_retry_number={self.step_retry_number}") + if self.javascript_triggered: + results.append(f"javascript_triggered={self.javascript_triggered}") + if self.download_triggered is not None: + results.append(f"download_triggered={self.download_triggered}") + if self.interacted_with_sibling is not None: + results.append(f"interacted_with_sibling={self.interacted_with_sibling}") + if self.interacted_with_parent is not None: + results.append(f"interacted_with_parent={self.interacted_with_parent}") + + return ", ".join(results) + ")" def __repr__(self) -> str: return self.__str__() diff --git a/skyvern/webeye/browser_factory.py b/skyvern/webeye/browser_factory.py index 2cc5b162..369046fa 100644 --- a/skyvern/webeye/browser_factory.py +++ b/skyvern/webeye/browser_factory.py @@ -183,7 +183,7 @@ class BrowserState: await self._close_all_other_pages() LOG.info("A new page is created") if url: - LOG.info(f"Navigating page to {url} and waiting for 3 seconds") + LOG.info(f"Navigating page to {url} and waiting for 5 seconds") try: start_time = time.time() await self.page.goto(url, timeout=settings.BROWSER_LOADING_TIMEOUT_MS) @@ -193,6 +193,7 @@ class BrowserState: loading_time=end_time - start_time, url=url, ) + await asyncio.sleep(5) except Error as playright_error: LOG.exception(f"Error while navigating to url: {str(playright_error)}") raise FailedToNavigateToUrl(url=url, error_message=str(playright_error))