From c9c66398c4f32d936b0d634d69cccff056f28ca7 Mon Sep 17 00:00:00 2001 From: LawyZheng Date: Wed, 17 Dec 2025 14:54:13 +0800 Subject: [PATCH] Revert "File Download renaming reliability - customer bug fix" (#4311) --- skyvern/forge/agent.py | 31 +++++-------------- skyvern/forge/sdk/api/files.py | 27 ----------------- skyvern/forge/sdk/workflow/models/block.py | 35 ++-------------------- skyvern/forge/sdk/workflow/service.py | 13 ++------ 4 files changed, 13 insertions(+), 93 deletions(-) diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 6a1cbba3..7aef2181 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -68,7 +68,6 @@ from skyvern.forge.async_operations import AgentPhase, AsyncOperationPool from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.aws import aws_client from skyvern.forge.sdk.api.files import ( - get_effective_download_run_id, get_path_for_workflow_download_directory, list_downloading_files_in_directory, list_files_in_directory, @@ -417,12 +416,11 @@ class ForgeAgent: list_files_before: list[str] = [] try: if task.workflow_run_id: - download_run_id = get_effective_download_run_id( - context_run_id=context.run_id if context else None, - workflow_run_id=task.workflow_run_id, - task_id=task.task_id, + list_files_before = list_files_in_directory( + get_path_for_workflow_download_directory( + context.run_id if context and context.run_id else task.workflow_run_id + ) ) - list_files_before = list_files_in_directory(get_path_for_workflow_download_directory(download_run_id)) if task.browser_session_id: browser_session_downloaded_files = await app.STORAGE.list_downloaded_files_in_browser_session( organization_id=organization.organization_id, @@ -508,12 +506,9 @@ class ForgeAgent: retry = False if task_block and task_block.complete_on_download and task.workflow_run_id: - download_run_id = get_effective_download_run_id( - context_run_id=context.run_id if context else None, - workflow_run_id=task.workflow_run_id, - task_id=task.task_id, + workflow_download_directory = get_path_for_workflow_download_directory( + context.run_id if context and context.run_id else task.workflow_run_id ) - workflow_download_directory = get_path_for_workflow_download_directory(download_run_id) downloading_files = list_downloading_files_in_directory(workflow_download_directory) if task.browser_session_id: @@ -3234,14 +3229,9 @@ class ForgeAgent: try: async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT): context = skyvern_context.current() - download_run_id = get_effective_download_run_id( - context_run_id=context.run_id if context else None, - workflow_run_id=task.workflow_run_id, - task_id=task.task_id, - ) await app.STORAGE.save_downloaded_files( organization_id=task.organization_id, - run_id=download_run_id, + run_id=context.run_id if context and context.run_id else task.workflow_run_id or task.task_id, ) except asyncio.TimeoutError: LOG.warning( @@ -3432,14 +3422,9 @@ class ForgeAgent: try: async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT): context = skyvern_context.current() - download_run_id = get_effective_download_run_id( - context_run_id=context.run_id if context else None, - workflow_run_id=task.workflow_run_id, - task_id=task.task_id, - ) downloaded_files = await app.STORAGE.get_downloaded_files( organization_id=task.organization_id, - run_id=download_run_id, + run_id=context.run_id if context and context.run_id else task.workflow_run_id or task.task_id, ) except asyncio.TimeoutError: LOG.warning( diff --git a/skyvern/forge/sdk/api/files.py b/skyvern/forge/sdk/api/files.py index 1324a958..75866c51 100644 --- a/skyvern/forge/sdk/api/files.py +++ b/skyvern/forge/sdk/api/files.py @@ -233,33 +233,6 @@ def get_download_dir(run_id: str | None) -> str: return download_dir -def get_effective_download_run_id( - context_run_id: str | None = None, - workflow_run_id: str | None = None, - task_id: str | None = None, -) -> str: - """ - Get the effective run_id for download operations. - - This ensures consistent run_id logic across all download-related operations - (save_downloaded_files, get_downloaded_files, get_download_dir). - - Priority: - 1. context_run_id (if set explicitly) - 2. workflow_run_id (for workflow tasks) - 3. task_id (for standalone tasks) - - Raises ValueError if no valid run_id can be determined. - """ - if context_run_id: - return context_run_id - if workflow_run_id: - return workflow_run_id - if task_id: - return task_id - raise ValueError("Cannot determine run_id: no context_run_id, workflow_run_id, or task_id provided") - - def list_files_in_directory(directory: Path, recursive: bool = False) -> list[str]: listed_files: list[str] = [] for root, dirs, files in os.walk(directory): diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 7bdd19f7..75cda342 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -33,7 +33,6 @@ from skyvern.constants import ( AZURE_BLOB_STORAGE_MAX_UPLOAD_FILE_COUNT, GET_DOWNLOADED_FILES_TIMEOUT, MAX_UPLOAD_FILE_COUNT, - SAVE_DOWNLOADED_FILES_TIMEOUT, ) from skyvern.exceptions import ( AzureConfigurationError, @@ -54,7 +53,6 @@ from skyvern.forge.sdk.api.files import ( create_named_temporary_file, download_file, download_from_s3, - get_effective_download_run_id, get_path_for_workflow_download_directory, parse_uri_to_path, ) @@ -848,41 +846,14 @@ class BaseTaskBlock(Block): ) success = updated_task.status == TaskStatus.completed - # Determine the run_id for download operations (consistent across save and get) - download_run_id = get_effective_download_run_id( - context_run_id=current_context.run_id if current_context else None, - workflow_run_id=workflow_run_id, - task_id=updated_task.task_id, - ) - - # Ensure downloaded files are saved to storage before querying - # This fixes timing issue where files are in local dir but not yet in S3 - try: - async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT): - await app.STORAGE.save_downloaded_files( - organization_id=workflow_run.organization_id, - run_id=download_run_id, - ) - except asyncio.TimeoutError: - LOG.warning( - "Timeout saving downloaded files before building block output", - task_id=updated_task.task_id, - workflow_run_id=workflow_run_id, - ) - except Exception: - LOG.warning( - "Failed to save downloaded files before building block output", - exc_info=True, - task_id=updated_task.task_id, - workflow_run_id=workflow_run_id, - ) - downloaded_files: list[FileInfo] = [] try: async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT): downloaded_files = await app.STORAGE.get_downloaded_files( organization_id=workflow_run.organization_id, - run_id=download_run_id, + run_id=current_context.run_id + if current_context and current_context.run_id + else workflow_run_id or updated_task.task_id, ) except asyncio.TimeoutError: LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 8f61ba27..bb9930eb 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -36,7 +36,6 @@ from skyvern.exceptions import ( ) from skyvern.forge import app from skyvern.forge.prompts import prompt_engine -from skyvern.forge.sdk.api.files import get_effective_download_run_id from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature @@ -2609,13 +2608,9 @@ class WorkflowService: try: async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT): context = skyvern_context.current() - download_run_id = get_effective_download_run_id( - context_run_id=context.run_id if context else None, - workflow_run_id=workflow_run.workflow_run_id, - ) downloaded_files = await app.STORAGE.get_downloaded_files( organization_id=workflow_run.organization_id, - run_id=download_run_id, + run_id=context.run_id if context and context.run_id else workflow_run.workflow_run_id, ) if task_v2: task_v2_downloaded_files = await app.STORAGE.get_downloaded_files( @@ -2748,13 +2743,9 @@ class WorkflowService: try: async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT): context = skyvern_context.current() - download_run_id = get_effective_download_run_id( - context_run_id=context.run_id if context else None, - workflow_run_id=workflow_run.workflow_run_id, - ) await app.STORAGE.save_downloaded_files( organization_id=workflow_run.organization_id, - run_id=download_run_id, + run_id=context.run_id if context and context.run_id else workflow_run.workflow_run_id, ) except asyncio.TimeoutError: LOG.warning(