wait files fully downloaded before complete task (#1707)

This commit is contained in:
Shuchang Zheng
2025-02-03 23:49:46 +08:00
committed by GitHub
parent 1ba225002b
commit b43f1bfec2
5 changed files with 81 additions and 24 deletions

View File

@@ -18,6 +18,7 @@ GET_DOWNLOADED_FILES_TIMEOUT = 30
NAVIGATION_MAX_RETRY_TIME = 5 NAVIGATION_MAX_RETRY_TIME = 5
AUTO_COMPLETION_POTENTIAL_VALUES_COUNT = 5 AUTO_COMPLETION_POTENTIAL_VALUES_COUNT = 5
DROPDOWN_MENU_MAX_DISTANCE = 100 DROPDOWN_MENU_MAX_DISTANCE = 100
BROWSER_DOWNLOADING_SUFFIX = ".crdownload"
# reserved fields for navigation payload # reserved fields for navigation payload
SPECIAL_FIELD_VERIFICATION_CODE = "verification_code" SPECIAL_FIELD_VERIFICATION_CODE = "verification_code"

View File

@@ -1,3 +1,5 @@
from pathlib import Path
from fastapi import status from fastapi import status
@@ -252,6 +254,12 @@ class DownloadFileMaxSizeExceeded(SkyvernException):
super().__init__(f"Download file size exceeded the maximum allowed size of {max_size} MB.") super().__init__(f"Download file size exceeded the maximum allowed size of {max_size} MB.")
class DownloadFileMaxWaitingTime(SkyvernException):
def __init__(self, downloading_files: list[Path]) -> None:
self.downloading_files = downloading_files
super().__init__(f"Long-time downloading files [{downloading_files}].")
class NoFileDownloadTriggered(SkyvernException): class NoFileDownloadTriggered(SkyvernException):
def __init__(self, element_id: str) -> None: def __init__(self, element_id: str) -> None:
super().__init__(f"Clicking on element doesn't trigger the file download. element_id={element_id}") super().__init__(f"Clicking on element doesn't trigger the file download. element_id={element_id}")

View File

@@ -16,6 +16,7 @@ from playwright.async_api import Page
from skyvern import analytics from skyvern import analytics
from skyvern.config import settings from skyvern.config import settings
from skyvern.constants import ( from skyvern.constants import (
BROWSER_DOWNLOADING_SUFFIX,
GET_DOWNLOADED_FILES_TIMEOUT, GET_DOWNLOADED_FILES_TIMEOUT,
SAVE_DOWNLOADED_FILES_TIMEOUT, SAVE_DOWNLOADED_FILES_TIMEOUT,
SCRAPE_TYPE_ORDER, SCRAPE_TYPE_ORDER,
@@ -24,6 +25,7 @@ from skyvern.constants import (
) )
from skyvern.exceptions import ( from skyvern.exceptions import (
BrowserStateMissingPage, BrowserStateMissingPage,
DownloadFileMaxWaitingTime,
EmptyScrapePage, EmptyScrapePage,
FailedToNavigateToUrl, FailedToNavigateToUrl,
FailedToParseActionInstruction, FailedToParseActionInstruction,
@@ -45,7 +47,13 @@ from skyvern.exceptions import (
from skyvern.forge import app from skyvern.forge import app
from skyvern.forge.async_operations import AgentPhase, AsyncOperationPool from skyvern.forge.async_operations import AgentPhase, AsyncOperationPool
from skyvern.forge.prompts import prompt_engine from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.files import get_path_for_workflow_download_directory, list_files_in_directory, rename_file from skyvern.forge.sdk.api.files import (
get_path_for_workflow_download_directory,
list_downloading_files_in_directory,
list_files_in_directory,
rename_file,
wait_for_download_finished,
)
from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.artifact.models import ArtifactType
from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.core.security import generate_skyvern_webhook_headers from skyvern.forge.sdk.core.security import generate_skyvern_webhook_headers
@@ -375,12 +383,31 @@ class ForgeAgent:
if task_block and task_block.complete_on_download and task.workflow_run_id: if task_block and task_block.complete_on_download and task.workflow_run_id:
workflow_download_directory = get_path_for_workflow_download_directory(task.workflow_run_id) workflow_download_directory = get_path_for_workflow_download_directory(task.workflow_run_id)
downloading_files: list[Path] = list_downloading_files_in_directory(workflow_download_directory)
if len(downloading_files) > 0:
LOG.info(
"Detecting files are still downloading, waiting for files to be completely downloaded.",
downloading_files=downloading_files,
step_id=step.step_id,
)
try:
await wait_for_download_finished(downloading_files=downloading_files)
except DownloadFileMaxWaitingTime as e:
LOG.warning(
"There're several long-time downloading files, these files might be broken",
downloading_files=e.downloading_files,
task_id=task.task_id,
step_id=step.step_id,
workflow_run_id=task.workflow_run_id,
)
list_files_after = list_files_in_directory(workflow_download_directory) list_files_after = list_files_in_directory(workflow_download_directory)
if len(list_files_after) > len(list_files_before): if len(list_files_after) > len(list_files_before):
files_to_rename = list(set(list_files_after) - set(list_files_before)) files_to_rename = list(set(list_files_after) - set(list_files_before))
for file in files_to_rename: for file in files_to_rename:
file_extension = Path(file).suffix file_extension = Path(file).suffix
if file_extension == ".crdownload": if file_extension == BROWSER_DOWNLOADING_SUFFIX:
LOG.warning( LOG.warning(
"Detecting incompleted download file, skip the rename", "Detecting incompleted download file, skip the rename",
file=file, file=file,

View File

@@ -1,3 +1,4 @@
import asyncio
import hashlib import hashlib
import mimetypes import mimetypes
import os import os
@@ -13,8 +14,8 @@ import structlog
from multidict import CIMultiDictProxy from multidict import CIMultiDictProxy
from skyvern.config import settings from skyvern.config import settings
from skyvern.constants import REPO_ROOT_DIR from skyvern.constants import BROWSER_DOWNLOAD_TIMEOUT, BROWSER_DOWNLOADING_SUFFIX, REPO_ROOT_DIR
from skyvern.exceptions import DownloadFileMaxSizeExceeded from skyvern.exceptions import DownloadFileMaxSizeExceeded, DownloadFileMaxWaitingTime
from skyvern.forge.sdk.api.aws import AsyncAWSClient from skyvern.forge.sdk.api.aws import AsyncAWSClient
LOG = structlog.get_logger() LOG = structlog.get_logger()
@@ -158,6 +159,34 @@ def list_files_in_directory(directory: Path, recursive: bool = False) -> list[st
return listed_files return listed_files
def list_downloading_files_in_directory(
directory: Path, downloading_suffix: str = BROWSER_DOWNLOADING_SUFFIX
) -> list[Path]:
# check if there's any file is still downloading
downloading_files: list[Path] = []
for file in list_files_in_directory(directory):
path = Path(file)
if path.suffix == downloading_suffix:
downloading_files.append(path)
return downloading_files
async def wait_for_download_finished(downloading_files: list[Path], timeout: float = BROWSER_DOWNLOAD_TIMEOUT) -> None:
cur_downloading_files = downloading_files
try:
async with asyncio.timeout(timeout):
while len(cur_downloading_files) > 0:
new_downloading_files: list[Path] = []
for path in cur_downloading_files:
if not path.exists():
continue
new_downloading_files.append(path)
cur_downloading_files = new_downloading_files
await asyncio.sleep(1)
except asyncio.TimeoutError:
raise DownloadFileMaxWaitingTime(downloading_files=cur_downloading_files)
def get_number_of_files_in_directory(directory: Path, recursive: bool = False) -> int: def get_number_of_files_in_directory(directory: Path, recursive: bool = False) -> int:
return len(list_files_in_directory(directory, recursive)) return len(list_files_in_directory(directory, recursive))

View File

@@ -17,12 +17,12 @@ from skyvern.config import settings
from skyvern.constants import ( from skyvern.constants import (
AUTO_COMPLETION_POTENTIAL_VALUES_COUNT, AUTO_COMPLETION_POTENTIAL_VALUES_COUNT,
BROWSER_DOWNLOAD_MAX_WAIT_TIME, BROWSER_DOWNLOAD_MAX_WAIT_TIME,
BROWSER_DOWNLOAD_TIMEOUT,
DROPDOWN_MENU_MAX_DISTANCE, DROPDOWN_MENU_MAX_DISTANCE,
REPO_ROOT_DIR, REPO_ROOT_DIR,
SKYVERN_ID_ATTR, SKYVERN_ID_ATTR,
) )
from skyvern.exceptions import ( from skyvern.exceptions import (
DownloadFileMaxWaitingTime,
EmptySelect, EmptySelect,
ErrEmptyTweakValue, ErrEmptyTweakValue,
ErrFoundSelectableElement, ErrFoundSelectableElement,
@@ -52,7 +52,13 @@ from skyvern.exceptions import (
) )
from skyvern.forge import app from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.files import download_file, get_download_dir, list_files_in_directory from skyvern.forge.sdk.api.files import (
download_file,
get_download_dir,
list_downloading_files_in_directory,
list_files_in_directory,
wait_for_download_finished,
)
from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError
from skyvern.forge.sdk.core.aiohttp_helper import aiohttp_post from skyvern.forge.sdk.core.aiohttp_helper import aiohttp_post
from skyvern.forge.sdk.core.security import generate_skyvern_signature from skyvern.forge.sdk.core.security import generate_skyvern_signature
@@ -505,12 +511,7 @@ async def handle_click_to_download_file_action(
return [ActionFailure(exception=NoFileDownloadTriggered(action.element_id))] return [ActionFailure(exception=NoFileDownloadTriggered(action.element_id))]
# check if there's any file is still downloading # check if there's any file is still downloading
downloading_files: list[Path] = [] downloading_files = list_downloading_files_in_directory(download_dir)
for file in list_files_after:
path = Path(file)
if path.suffix == ".crdownload":
downloading_files.append(path)
if len(downloading_files) == 0: if len(downloading_files) == 0:
return [ActionSuccess(download_triggered=True)] return [ActionSuccess(download_triggered=True)]
@@ -522,20 +523,11 @@ async def handle_click_to_download_file_action(
workflow_run_id=task.workflow_run_id, workflow_run_id=task.workflow_run_id,
) )
try: try:
async with asyncio.timeout(BROWSER_DOWNLOAD_TIMEOUT): await wait_for_download_finished(downloading_files=downloading_files)
while len(downloading_files) > 0: except DownloadFileMaxWaitingTime as e:
new_downloading_files: list[Path] = []
for path in downloading_files:
if not path.exists():
continue
new_downloading_files.append(path)
downloading_files = new_downloading_files
await asyncio.sleep(1)
except asyncio.TimeoutError:
LOG.warning( LOG.warning(
"There're several long-time downloading files, these files might be broken", "There're several long-time downloading files, these files might be broken",
downloading_files=downloading_files, downloading_files=e.downloading_files,
task_id=task.task_id, task_id=task.task_id,
step_id=step.step_id, step_id=step.step_id,
workflow_run_id=task.workflow_run_id, workflow_run_id=task.workflow_run_id,