From 7557a130a3a51efc6808051c06cbd4917884eb2e Mon Sep 17 00:00:00 2001 From: Marc Kelechava Date: Tue, 16 Dec 2025 15:20:31 -0800 Subject: [PATCH] File Download renaming reliability - customer bug fix (#4308) --- skyvern/forge/agent.py | 31 ++++++++++++++----- skyvern/forge/sdk/api/files.py | 27 +++++++++++++++++ skyvern/forge/sdk/workflow/models/block.py | 35 ++++++++++++++++++++-- skyvern/forge/sdk/workflow/service.py | 13 ++++++-- 4 files changed, 93 insertions(+), 13 deletions(-) diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 7aef2181..6a1cbba3 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -68,6 +68,7 @@ from skyvern.forge.async_operations import AgentPhase, AsyncOperationPool from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.aws import aws_client from skyvern.forge.sdk.api.files import ( + get_effective_download_run_id, get_path_for_workflow_download_directory, list_downloading_files_in_directory, list_files_in_directory, @@ -416,11 +417,12 @@ class ForgeAgent: list_files_before: list[str] = [] try: if task.workflow_run_id: - list_files_before = list_files_in_directory( - get_path_for_workflow_download_directory( - context.run_id if context and context.run_id else task.workflow_run_id - ) + download_run_id = get_effective_download_run_id( + context_run_id=context.run_id if context else None, + workflow_run_id=task.workflow_run_id, + task_id=task.task_id, ) + list_files_before = list_files_in_directory(get_path_for_workflow_download_directory(download_run_id)) if task.browser_session_id: browser_session_downloaded_files = await app.STORAGE.list_downloaded_files_in_browser_session( organization_id=organization.organization_id, @@ -506,9 +508,12 @@ class ForgeAgent: retry = False if task_block and task_block.complete_on_download and task.workflow_run_id: - workflow_download_directory = get_path_for_workflow_download_directory( - context.run_id if context and context.run_id else task.workflow_run_id + download_run_id = get_effective_download_run_id( + context_run_id=context.run_id if context else None, + workflow_run_id=task.workflow_run_id, + task_id=task.task_id, ) + workflow_download_directory = get_path_for_workflow_download_directory(download_run_id) downloading_files = list_downloading_files_in_directory(workflow_download_directory) if task.browser_session_id: @@ -3229,9 +3234,14 @@ class ForgeAgent: try: async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT): context = skyvern_context.current() + download_run_id = get_effective_download_run_id( + context_run_id=context.run_id if context else None, + workflow_run_id=task.workflow_run_id, + task_id=task.task_id, + ) await app.STORAGE.save_downloaded_files( organization_id=task.organization_id, - run_id=context.run_id if context and context.run_id else task.workflow_run_id or task.task_id, + run_id=download_run_id, ) except asyncio.TimeoutError: LOG.warning( @@ -3422,9 +3432,14 @@ class ForgeAgent: try: async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT): context = skyvern_context.current() + download_run_id = get_effective_download_run_id( + context_run_id=context.run_id if context else None, + workflow_run_id=task.workflow_run_id, + task_id=task.task_id, + ) downloaded_files = await app.STORAGE.get_downloaded_files( organization_id=task.organization_id, - run_id=context.run_id if context and context.run_id else task.workflow_run_id or task.task_id, + run_id=download_run_id, ) except asyncio.TimeoutError: LOG.warning( diff --git a/skyvern/forge/sdk/api/files.py b/skyvern/forge/sdk/api/files.py index 75866c51..1324a958 100644 --- a/skyvern/forge/sdk/api/files.py +++ b/skyvern/forge/sdk/api/files.py @@ -233,6 +233,33 @@ def get_download_dir(run_id: str | None) -> str: return download_dir +def get_effective_download_run_id( + context_run_id: str | None = None, + workflow_run_id: str | None = None, + task_id: str | None = None, +) -> str: + """ + Get the effective run_id for download operations. + + This ensures consistent run_id logic across all download-related operations + (save_downloaded_files, get_downloaded_files, get_download_dir). + + Priority: + 1. context_run_id (if set explicitly) + 2. workflow_run_id (for workflow tasks) + 3. task_id (for standalone tasks) + + Raises ValueError if no valid run_id can be determined. + """ + if context_run_id: + return context_run_id + if workflow_run_id: + return workflow_run_id + if task_id: + return task_id + raise ValueError("Cannot determine run_id: no context_run_id, workflow_run_id, or task_id provided") + + def list_files_in_directory(directory: Path, recursive: bool = False) -> list[str]: listed_files: list[str] = [] for root, dirs, files in os.walk(directory): diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 75cda342..7bdd19f7 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -33,6 +33,7 @@ from skyvern.constants import ( AZURE_BLOB_STORAGE_MAX_UPLOAD_FILE_COUNT, GET_DOWNLOADED_FILES_TIMEOUT, MAX_UPLOAD_FILE_COUNT, + SAVE_DOWNLOADED_FILES_TIMEOUT, ) from skyvern.exceptions import ( AzureConfigurationError, @@ -53,6 +54,7 @@ from skyvern.forge.sdk.api.files import ( create_named_temporary_file, download_file, download_from_s3, + get_effective_download_run_id, get_path_for_workflow_download_directory, parse_uri_to_path, ) @@ -846,14 +848,41 @@ class BaseTaskBlock(Block): ) success = updated_task.status == TaskStatus.completed + # Determine the run_id for download operations (consistent across save and get) + download_run_id = get_effective_download_run_id( + context_run_id=current_context.run_id if current_context else None, + workflow_run_id=workflow_run_id, + task_id=updated_task.task_id, + ) + + # Ensure downloaded files are saved to storage before querying + # This fixes timing issue where files are in local dir but not yet in S3 + try: + async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT): + await app.STORAGE.save_downloaded_files( + organization_id=workflow_run.organization_id, + run_id=download_run_id, + ) + except asyncio.TimeoutError: + LOG.warning( + "Timeout saving downloaded files before building block output", + task_id=updated_task.task_id, + workflow_run_id=workflow_run_id, + ) + except Exception: + LOG.warning( + "Failed to save downloaded files before building block output", + exc_info=True, + task_id=updated_task.task_id, + workflow_run_id=workflow_run_id, + ) + downloaded_files: list[FileInfo] = [] try: async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT): downloaded_files = await app.STORAGE.get_downloaded_files( organization_id=workflow_run.organization_id, - run_id=current_context.run_id - if current_context and current_context.run_id - else workflow_run_id or updated_task.task_id, + run_id=download_run_id, ) except asyncio.TimeoutError: LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index bb9930eb..8f61ba27 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -36,6 +36,7 @@ from skyvern.exceptions import ( ) from skyvern.forge import app from skyvern.forge.prompts import prompt_engine +from skyvern.forge.sdk.api.files import get_effective_download_run_id from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature @@ -2608,9 +2609,13 @@ class WorkflowService: try: async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT): context = skyvern_context.current() + download_run_id = get_effective_download_run_id( + context_run_id=context.run_id if context else None, + workflow_run_id=workflow_run.workflow_run_id, + ) downloaded_files = await app.STORAGE.get_downloaded_files( organization_id=workflow_run.organization_id, - run_id=context.run_id if context and context.run_id else workflow_run.workflow_run_id, + run_id=download_run_id, ) if task_v2: task_v2_downloaded_files = await app.STORAGE.get_downloaded_files( @@ -2743,9 +2748,13 @@ class WorkflowService: try: async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT): context = skyvern_context.current() + download_run_id = get_effective_download_run_id( + context_run_id=context.run_id if context else None, + workflow_run_id=workflow_run.workflow_run_id, + ) await app.STORAGE.save_downloaded_files( organization_id=workflow_run.organization_id, - run_id=context.run_id if context and context.run_id else workflow_run.workflow_run_id, + run_id=download_run_id, ) except asyncio.TimeoutError: LOG.warning(