From b43f1bfec24940eb9ff6707136f7b337926683bc Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Mon, 3 Feb 2025 23:49:46 +0800 Subject: [PATCH] wait files fully downloaded before complete task (#1707) --- skyvern/constants.py | 1 + skyvern/exceptions.py | 8 ++++++++ skyvern/forge/agent.py | 31 +++++++++++++++++++++++++++-- skyvern/forge/sdk/api/files.py | 33 +++++++++++++++++++++++++++++-- skyvern/webeye/actions/handler.py | 32 +++++++++++------------------- 5 files changed, 81 insertions(+), 24 deletions(-) diff --git a/skyvern/constants.py b/skyvern/constants.py index 5e6455db..b786ac8d 100644 --- a/skyvern/constants.py +++ b/skyvern/constants.py @@ -18,6 +18,7 @@ GET_DOWNLOADED_FILES_TIMEOUT = 30 NAVIGATION_MAX_RETRY_TIME = 5 AUTO_COMPLETION_POTENTIAL_VALUES_COUNT = 5 DROPDOWN_MENU_MAX_DISTANCE = 100 +BROWSER_DOWNLOADING_SUFFIX = ".crdownload" # reserved fields for navigation payload SPECIAL_FIELD_VERIFICATION_CODE = "verification_code" diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index 2d10234c..2b0c8f93 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -1,3 +1,5 @@ +from pathlib import Path + from fastapi import status @@ -252,6 +254,12 @@ class DownloadFileMaxSizeExceeded(SkyvernException): super().__init__(f"Download file size exceeded the maximum allowed size of {max_size} MB.") +class DownloadFileMaxWaitingTime(SkyvernException): + def __init__(self, downloading_files: list[Path]) -> None: + self.downloading_files = downloading_files + super().__init__(f"Long-time downloading files [{downloading_files}].") + + class NoFileDownloadTriggered(SkyvernException): def __init__(self, element_id: str) -> None: super().__init__(f"Clicking on element doesn't trigger the file download. element_id={element_id}") diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 3d257f78..c9ef1ff4 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -16,6 +16,7 @@ from playwright.async_api import Page from skyvern import analytics from skyvern.config import settings from skyvern.constants import ( + BROWSER_DOWNLOADING_SUFFIX, GET_DOWNLOADED_FILES_TIMEOUT, SAVE_DOWNLOADED_FILES_TIMEOUT, SCRAPE_TYPE_ORDER, @@ -24,6 +25,7 @@ from skyvern.constants import ( ) from skyvern.exceptions import ( BrowserStateMissingPage, + DownloadFileMaxWaitingTime, EmptyScrapePage, FailedToNavigateToUrl, FailedToParseActionInstruction, @@ -45,7 +47,13 @@ from skyvern.exceptions import ( from skyvern.forge import app from skyvern.forge.async_operations import AgentPhase, AsyncOperationPool from skyvern.forge.prompts import prompt_engine -from skyvern.forge.sdk.api.files import get_path_for_workflow_download_directory, list_files_in_directory, rename_file +from skyvern.forge.sdk.api.files import ( + get_path_for_workflow_download_directory, + list_downloading_files_in_directory, + list_files_in_directory, + rename_file, + wait_for_download_finished, +) from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.security import generate_skyvern_webhook_headers @@ -375,12 +383,31 @@ class ForgeAgent: if task_block and task_block.complete_on_download and task.workflow_run_id: workflow_download_directory = get_path_for_workflow_download_directory(task.workflow_run_id) + + downloading_files: list[Path] = list_downloading_files_in_directory(workflow_download_directory) + if len(downloading_files) > 0: + LOG.info( + "Detecting files are still downloading, waiting for files to be completely downloaded.", + downloading_files=downloading_files, + step_id=step.step_id, + ) + try: + await wait_for_download_finished(downloading_files=downloading_files) + except DownloadFileMaxWaitingTime as e: + LOG.warning( + "There're several long-time downloading files, these files might be broken", + downloading_files=e.downloading_files, + task_id=task.task_id, + step_id=step.step_id, + workflow_run_id=task.workflow_run_id, + ) + list_files_after = list_files_in_directory(workflow_download_directory) if len(list_files_after) > len(list_files_before): files_to_rename = list(set(list_files_after) - set(list_files_before)) for file in files_to_rename: file_extension = Path(file).suffix - if file_extension == ".crdownload": + if file_extension == BROWSER_DOWNLOADING_SUFFIX: LOG.warning( "Detecting incompleted download file, skip the rename", file=file, diff --git a/skyvern/forge/sdk/api/files.py b/skyvern/forge/sdk/api/files.py index d88aa25e..116417f2 100644 --- a/skyvern/forge/sdk/api/files.py +++ b/skyvern/forge/sdk/api/files.py @@ -1,3 +1,4 @@ +import asyncio import hashlib import mimetypes import os @@ -13,8 +14,8 @@ import structlog from multidict import CIMultiDictProxy from skyvern.config import settings -from skyvern.constants import REPO_ROOT_DIR -from skyvern.exceptions import DownloadFileMaxSizeExceeded +from skyvern.constants import BROWSER_DOWNLOAD_TIMEOUT, BROWSER_DOWNLOADING_SUFFIX, REPO_ROOT_DIR +from skyvern.exceptions import DownloadFileMaxSizeExceeded, DownloadFileMaxWaitingTime from skyvern.forge.sdk.api.aws import AsyncAWSClient LOG = structlog.get_logger() @@ -158,6 +159,34 @@ def list_files_in_directory(directory: Path, recursive: bool = False) -> list[st return listed_files +def list_downloading_files_in_directory( + directory: Path, downloading_suffix: str = BROWSER_DOWNLOADING_SUFFIX +) -> list[Path]: + # check if there's any file is still downloading + downloading_files: list[Path] = [] + for file in list_files_in_directory(directory): + path = Path(file) + if path.suffix == downloading_suffix: + downloading_files.append(path) + return downloading_files + + +async def wait_for_download_finished(downloading_files: list[Path], timeout: float = BROWSER_DOWNLOAD_TIMEOUT) -> None: + cur_downloading_files = downloading_files + try: + async with asyncio.timeout(timeout): + while len(cur_downloading_files) > 0: + new_downloading_files: list[Path] = [] + for path in cur_downloading_files: + if not path.exists(): + continue + new_downloading_files.append(path) + cur_downloading_files = new_downloading_files + await asyncio.sleep(1) + except asyncio.TimeoutError: + raise DownloadFileMaxWaitingTime(downloading_files=cur_downloading_files) + + def get_number_of_files_in_directory(directory: Path, recursive: bool = False) -> int: return len(list_files_in_directory(directory, recursive)) diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py index 0725a02c..21e09de0 100644 --- a/skyvern/webeye/actions/handler.py +++ b/skyvern/webeye/actions/handler.py @@ -17,12 +17,12 @@ from skyvern.config import settings from skyvern.constants import ( AUTO_COMPLETION_POTENTIAL_VALUES_COUNT, BROWSER_DOWNLOAD_MAX_WAIT_TIME, - BROWSER_DOWNLOAD_TIMEOUT, DROPDOWN_MENU_MAX_DISTANCE, REPO_ROOT_DIR, SKYVERN_ID_ATTR, ) from skyvern.exceptions import ( + DownloadFileMaxWaitingTime, EmptySelect, ErrEmptyTweakValue, ErrFoundSelectableElement, @@ -52,7 +52,13 @@ from skyvern.exceptions import ( ) from skyvern.forge import app from skyvern.forge.prompts import prompt_engine -from skyvern.forge.sdk.api.files import download_file, get_download_dir, list_files_in_directory +from skyvern.forge.sdk.api.files import ( + download_file, + get_download_dir, + list_downloading_files_in_directory, + list_files_in_directory, + wait_for_download_finished, +) from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError from skyvern.forge.sdk.core.aiohttp_helper import aiohttp_post from skyvern.forge.sdk.core.security import generate_skyvern_signature @@ -505,12 +511,7 @@ async def handle_click_to_download_file_action( return [ActionFailure(exception=NoFileDownloadTriggered(action.element_id))] # check if there's any file is still downloading - downloading_files: list[Path] = [] - for file in list_files_after: - path = Path(file) - if path.suffix == ".crdownload": - downloading_files.append(path) - + downloading_files = list_downloading_files_in_directory(download_dir) if len(downloading_files) == 0: return [ActionSuccess(download_triggered=True)] @@ -522,20 +523,11 @@ async def handle_click_to_download_file_action( workflow_run_id=task.workflow_run_id, ) try: - async with asyncio.timeout(BROWSER_DOWNLOAD_TIMEOUT): - while len(downloading_files) > 0: - new_downloading_files: list[Path] = [] - for path in downloading_files: - if not path.exists(): - continue - new_downloading_files.append(path) - downloading_files = new_downloading_files - await asyncio.sleep(1) - - except asyncio.TimeoutError: + await wait_for_download_finished(downloading_files=downloading_files) + except DownloadFileMaxWaitingTime as e: LOG.warning( "There're several long-time downloading files, these files might be broken", - downloading_files=downloading_files, + downloading_files=e.downloading_files, task_id=task.task_id, step_id=step.step_id, workflow_run_id=task.workflow_run_id,