File Download renaming reliability - customer bug fix (#4308)

This commit is contained in:
Marc Kelechava
2025-12-16 15:20:31 -08:00
committed by GitHub
parent 9aa490f475
commit 7557a130a3
4 changed files with 93 additions and 13 deletions

View File

@@ -68,6 +68,7 @@ from skyvern.forge.async_operations import AgentPhase, AsyncOperationPool
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.aws import aws_client
from skyvern.forge.sdk.api.files import (
get_effective_download_run_id,
get_path_for_workflow_download_directory,
list_downloading_files_in_directory,
list_files_in_directory,
@@ -416,11 +417,12 @@ class ForgeAgent:
list_files_before: list[str] = []
try:
if task.workflow_run_id:
list_files_before = list_files_in_directory(
get_path_for_workflow_download_directory(
context.run_id if context and context.run_id else task.workflow_run_id
)
download_run_id = get_effective_download_run_id(
context_run_id=context.run_id if context else None,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
)
list_files_before = list_files_in_directory(get_path_for_workflow_download_directory(download_run_id))
if task.browser_session_id:
browser_session_downloaded_files = await app.STORAGE.list_downloaded_files_in_browser_session(
organization_id=organization.organization_id,
@@ -506,9 +508,12 @@ class ForgeAgent:
retry = False
if task_block and task_block.complete_on_download and task.workflow_run_id:
workflow_download_directory = get_path_for_workflow_download_directory(
context.run_id if context and context.run_id else task.workflow_run_id
download_run_id = get_effective_download_run_id(
context_run_id=context.run_id if context else None,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
)
workflow_download_directory = get_path_for_workflow_download_directory(download_run_id)
downloading_files = list_downloading_files_in_directory(workflow_download_directory)
if task.browser_session_id:
@@ -3229,9 +3234,14 @@ class ForgeAgent:
try:
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
context = skyvern_context.current()
download_run_id = get_effective_download_run_id(
context_run_id=context.run_id if context else None,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
)
await app.STORAGE.save_downloaded_files(
organization_id=task.organization_id,
run_id=context.run_id if context and context.run_id else task.workflow_run_id or task.task_id,
run_id=download_run_id,
)
except asyncio.TimeoutError:
LOG.warning(
@@ -3422,9 +3432,14 @@ class ForgeAgent:
try:
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
context = skyvern_context.current()
download_run_id = get_effective_download_run_id(
context_run_id=context.run_id if context else None,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
)
downloaded_files = await app.STORAGE.get_downloaded_files(
organization_id=task.organization_id,
run_id=context.run_id if context and context.run_id else task.workflow_run_id or task.task_id,
run_id=download_run_id,
)
except asyncio.TimeoutError:
LOG.warning(

View File

@@ -233,6 +233,33 @@ def get_download_dir(run_id: str | None) -> str:
return download_dir
def get_effective_download_run_id(
context_run_id: str | None = None,
workflow_run_id: str | None = None,
task_id: str | None = None,
) -> str:
"""
Get the effective run_id for download operations.
This ensures consistent run_id logic across all download-related operations
(save_downloaded_files, get_downloaded_files, get_download_dir).
Priority:
1. context_run_id (if set explicitly)
2. workflow_run_id (for workflow tasks)
3. task_id (for standalone tasks)
Raises ValueError if no valid run_id can be determined.
"""
if context_run_id:
return context_run_id
if workflow_run_id:
return workflow_run_id
if task_id:
return task_id
raise ValueError("Cannot determine run_id: no context_run_id, workflow_run_id, or task_id provided")
def list_files_in_directory(directory: Path, recursive: bool = False) -> list[str]:
listed_files: list[str] = []
for root, dirs, files in os.walk(directory):

View File

@@ -33,6 +33,7 @@ from skyvern.constants import (
AZURE_BLOB_STORAGE_MAX_UPLOAD_FILE_COUNT,
GET_DOWNLOADED_FILES_TIMEOUT,
MAX_UPLOAD_FILE_COUNT,
SAVE_DOWNLOADED_FILES_TIMEOUT,
)
from skyvern.exceptions import (
AzureConfigurationError,
@@ -53,6 +54,7 @@ from skyvern.forge.sdk.api.files import (
create_named_temporary_file,
download_file,
download_from_s3,
get_effective_download_run_id,
get_path_for_workflow_download_directory,
parse_uri_to_path,
)
@@ -846,14 +848,41 @@ class BaseTaskBlock(Block):
)
success = updated_task.status == TaskStatus.completed
# Determine the run_id for download operations (consistent across save and get)
download_run_id = get_effective_download_run_id(
context_run_id=current_context.run_id if current_context else None,
workflow_run_id=workflow_run_id,
task_id=updated_task.task_id,
)
# Ensure downloaded files are saved to storage before querying
# This fixes timing issue where files are in local dir but not yet in S3
try:
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
await app.STORAGE.save_downloaded_files(
organization_id=workflow_run.organization_id,
run_id=download_run_id,
)
except asyncio.TimeoutError:
LOG.warning(
"Timeout saving downloaded files before building block output",
task_id=updated_task.task_id,
workflow_run_id=workflow_run_id,
)
except Exception:
LOG.warning(
"Failed to save downloaded files before building block output",
exc_info=True,
task_id=updated_task.task_id,
workflow_run_id=workflow_run_id,
)
downloaded_files: list[FileInfo] = []
try:
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
downloaded_files = await app.STORAGE.get_downloaded_files(
organization_id=workflow_run.organization_id,
run_id=current_context.run_id
if current_context and current_context.run_id
else workflow_run_id or updated_task.task_id,
run_id=download_run_id,
)
except asyncio.TimeoutError:
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)

View File

@@ -36,6 +36,7 @@ from skyvern.exceptions import (
)
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.files import get_effective_download_run_id
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature
@@ -2608,9 +2609,13 @@ class WorkflowService:
try:
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
context = skyvern_context.current()
download_run_id = get_effective_download_run_id(
context_run_id=context.run_id if context else None,
workflow_run_id=workflow_run.workflow_run_id,
)
downloaded_files = await app.STORAGE.get_downloaded_files(
organization_id=workflow_run.organization_id,
run_id=context.run_id if context and context.run_id else workflow_run.workflow_run_id,
run_id=download_run_id,
)
if task_v2:
task_v2_downloaded_files = await app.STORAGE.get_downloaded_files(
@@ -2743,9 +2748,13 @@ class WorkflowService:
try:
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
context = skyvern_context.current()
download_run_id = get_effective_download_run_id(
context_run_id=context.run_id if context else None,
workflow_run_id=workflow_run.workflow_run_id,
)
await app.STORAGE.save_downloaded_files(
organization_id=workflow_run.organization_id,
run_id=context.run_id if context and context.run_id else workflow_run.workflow_run_id,
run_id=download_run_id,
)
except asyncio.TimeoutError:
LOG.warning(