Revert "File Download renaming reliability - customer bug fix" (#4311)
This commit is contained in:
@@ -68,7 +68,6 @@ from skyvern.forge.async_operations import AgentPhase, AsyncOperationPool
|
|||||||
from skyvern.forge.prompts import prompt_engine
|
from skyvern.forge.prompts import prompt_engine
|
||||||
from skyvern.forge.sdk.api.aws import aws_client
|
from skyvern.forge.sdk.api.aws import aws_client
|
||||||
from skyvern.forge.sdk.api.files import (
|
from skyvern.forge.sdk.api.files import (
|
||||||
get_effective_download_run_id,
|
|
||||||
get_path_for_workflow_download_directory,
|
get_path_for_workflow_download_directory,
|
||||||
list_downloading_files_in_directory,
|
list_downloading_files_in_directory,
|
||||||
list_files_in_directory,
|
list_files_in_directory,
|
||||||
@@ -417,12 +416,11 @@ class ForgeAgent:
|
|||||||
list_files_before: list[str] = []
|
list_files_before: list[str] = []
|
||||||
try:
|
try:
|
||||||
if task.workflow_run_id:
|
if task.workflow_run_id:
|
||||||
download_run_id = get_effective_download_run_id(
|
list_files_before = list_files_in_directory(
|
||||||
context_run_id=context.run_id if context else None,
|
get_path_for_workflow_download_directory(
|
||||||
workflow_run_id=task.workflow_run_id,
|
context.run_id if context and context.run_id else task.workflow_run_id
|
||||||
task_id=task.task_id,
|
)
|
||||||
)
|
)
|
||||||
list_files_before = list_files_in_directory(get_path_for_workflow_download_directory(download_run_id))
|
|
||||||
if task.browser_session_id:
|
if task.browser_session_id:
|
||||||
browser_session_downloaded_files = await app.STORAGE.list_downloaded_files_in_browser_session(
|
browser_session_downloaded_files = await app.STORAGE.list_downloaded_files_in_browser_session(
|
||||||
organization_id=organization.organization_id,
|
organization_id=organization.organization_id,
|
||||||
@@ -508,12 +506,9 @@ class ForgeAgent:
|
|||||||
retry = False
|
retry = False
|
||||||
|
|
||||||
if task_block and task_block.complete_on_download and task.workflow_run_id:
|
if task_block and task_block.complete_on_download and task.workflow_run_id:
|
||||||
download_run_id = get_effective_download_run_id(
|
workflow_download_directory = get_path_for_workflow_download_directory(
|
||||||
context_run_id=context.run_id if context else None,
|
context.run_id if context and context.run_id else task.workflow_run_id
|
||||||
workflow_run_id=task.workflow_run_id,
|
|
||||||
task_id=task.task_id,
|
|
||||||
)
|
)
|
||||||
workflow_download_directory = get_path_for_workflow_download_directory(download_run_id)
|
|
||||||
|
|
||||||
downloading_files = list_downloading_files_in_directory(workflow_download_directory)
|
downloading_files = list_downloading_files_in_directory(workflow_download_directory)
|
||||||
if task.browser_session_id:
|
if task.browser_session_id:
|
||||||
@@ -3234,14 +3229,9 @@ class ForgeAgent:
|
|||||||
try:
|
try:
|
||||||
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
|
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
|
||||||
context = skyvern_context.current()
|
context = skyvern_context.current()
|
||||||
download_run_id = get_effective_download_run_id(
|
|
||||||
context_run_id=context.run_id if context else None,
|
|
||||||
workflow_run_id=task.workflow_run_id,
|
|
||||||
task_id=task.task_id,
|
|
||||||
)
|
|
||||||
await app.STORAGE.save_downloaded_files(
|
await app.STORAGE.save_downloaded_files(
|
||||||
organization_id=task.organization_id,
|
organization_id=task.organization_id,
|
||||||
run_id=download_run_id,
|
run_id=context.run_id if context and context.run_id else task.workflow_run_id or task.task_id,
|
||||||
)
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
@@ -3432,14 +3422,9 @@ class ForgeAgent:
|
|||||||
try:
|
try:
|
||||||
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
|
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
|
||||||
context = skyvern_context.current()
|
context = skyvern_context.current()
|
||||||
download_run_id = get_effective_download_run_id(
|
|
||||||
context_run_id=context.run_id if context else None,
|
|
||||||
workflow_run_id=task.workflow_run_id,
|
|
||||||
task_id=task.task_id,
|
|
||||||
)
|
|
||||||
downloaded_files = await app.STORAGE.get_downloaded_files(
|
downloaded_files = await app.STORAGE.get_downloaded_files(
|
||||||
organization_id=task.organization_id,
|
organization_id=task.organization_id,
|
||||||
run_id=download_run_id,
|
run_id=context.run_id if context and context.run_id else task.workflow_run_id or task.task_id,
|
||||||
)
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
|
|||||||
@@ -233,33 +233,6 @@ def get_download_dir(run_id: str | None) -> str:
|
|||||||
return download_dir
|
return download_dir
|
||||||
|
|
||||||
|
|
||||||
def get_effective_download_run_id(
|
|
||||||
context_run_id: str | None = None,
|
|
||||||
workflow_run_id: str | None = None,
|
|
||||||
task_id: str | None = None,
|
|
||||||
) -> str:
|
|
||||||
"""
|
|
||||||
Get the effective run_id for download operations.
|
|
||||||
|
|
||||||
This ensures consistent run_id logic across all download-related operations
|
|
||||||
(save_downloaded_files, get_downloaded_files, get_download_dir).
|
|
||||||
|
|
||||||
Priority:
|
|
||||||
1. context_run_id (if set explicitly)
|
|
||||||
2. workflow_run_id (for workflow tasks)
|
|
||||||
3. task_id (for standalone tasks)
|
|
||||||
|
|
||||||
Raises ValueError if no valid run_id can be determined.
|
|
||||||
"""
|
|
||||||
if context_run_id:
|
|
||||||
return context_run_id
|
|
||||||
if workflow_run_id:
|
|
||||||
return workflow_run_id
|
|
||||||
if task_id:
|
|
||||||
return task_id
|
|
||||||
raise ValueError("Cannot determine run_id: no context_run_id, workflow_run_id, or task_id provided")
|
|
||||||
|
|
||||||
|
|
||||||
def list_files_in_directory(directory: Path, recursive: bool = False) -> list[str]:
|
def list_files_in_directory(directory: Path, recursive: bool = False) -> list[str]:
|
||||||
listed_files: list[str] = []
|
listed_files: list[str] = []
|
||||||
for root, dirs, files in os.walk(directory):
|
for root, dirs, files in os.walk(directory):
|
||||||
|
|||||||
@@ -33,7 +33,6 @@ from skyvern.constants import (
|
|||||||
AZURE_BLOB_STORAGE_MAX_UPLOAD_FILE_COUNT,
|
AZURE_BLOB_STORAGE_MAX_UPLOAD_FILE_COUNT,
|
||||||
GET_DOWNLOADED_FILES_TIMEOUT,
|
GET_DOWNLOADED_FILES_TIMEOUT,
|
||||||
MAX_UPLOAD_FILE_COUNT,
|
MAX_UPLOAD_FILE_COUNT,
|
||||||
SAVE_DOWNLOADED_FILES_TIMEOUT,
|
|
||||||
)
|
)
|
||||||
from skyvern.exceptions import (
|
from skyvern.exceptions import (
|
||||||
AzureConfigurationError,
|
AzureConfigurationError,
|
||||||
@@ -54,7 +53,6 @@ from skyvern.forge.sdk.api.files import (
|
|||||||
create_named_temporary_file,
|
create_named_temporary_file,
|
||||||
download_file,
|
download_file,
|
||||||
download_from_s3,
|
download_from_s3,
|
||||||
get_effective_download_run_id,
|
|
||||||
get_path_for_workflow_download_directory,
|
get_path_for_workflow_download_directory,
|
||||||
parse_uri_to_path,
|
parse_uri_to_path,
|
||||||
)
|
)
|
||||||
@@ -848,41 +846,14 @@ class BaseTaskBlock(Block):
|
|||||||
)
|
)
|
||||||
success = updated_task.status == TaskStatus.completed
|
success = updated_task.status == TaskStatus.completed
|
||||||
|
|
||||||
# Determine the run_id for download operations (consistent across save and get)
|
|
||||||
download_run_id = get_effective_download_run_id(
|
|
||||||
context_run_id=current_context.run_id if current_context else None,
|
|
||||||
workflow_run_id=workflow_run_id,
|
|
||||||
task_id=updated_task.task_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Ensure downloaded files are saved to storage before querying
|
|
||||||
# This fixes timing issue where files are in local dir but not yet in S3
|
|
||||||
try:
|
|
||||||
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
|
|
||||||
await app.STORAGE.save_downloaded_files(
|
|
||||||
organization_id=workflow_run.organization_id,
|
|
||||||
run_id=download_run_id,
|
|
||||||
)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
LOG.warning(
|
|
||||||
"Timeout saving downloaded files before building block output",
|
|
||||||
task_id=updated_task.task_id,
|
|
||||||
workflow_run_id=workflow_run_id,
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
LOG.warning(
|
|
||||||
"Failed to save downloaded files before building block output",
|
|
||||||
exc_info=True,
|
|
||||||
task_id=updated_task.task_id,
|
|
||||||
workflow_run_id=workflow_run_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
downloaded_files: list[FileInfo] = []
|
downloaded_files: list[FileInfo] = []
|
||||||
try:
|
try:
|
||||||
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
|
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
|
||||||
downloaded_files = await app.STORAGE.get_downloaded_files(
|
downloaded_files = await app.STORAGE.get_downloaded_files(
|
||||||
organization_id=workflow_run.organization_id,
|
organization_id=workflow_run.organization_id,
|
||||||
run_id=download_run_id,
|
run_id=current_context.run_id
|
||||||
|
if current_context and current_context.run_id
|
||||||
|
else workflow_run_id or updated_task.task_id,
|
||||||
)
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
|
LOG.warning("Timeout getting downloaded files", task_id=updated_task.task_id)
|
||||||
|
|||||||
@@ -36,7 +36,6 @@ from skyvern.exceptions import (
|
|||||||
)
|
)
|
||||||
from skyvern.forge import app
|
from skyvern.forge import app
|
||||||
from skyvern.forge.prompts import prompt_engine
|
from skyvern.forge.prompts import prompt_engine
|
||||||
from skyvern.forge.sdk.api.files import get_effective_download_run_id
|
|
||||||
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
|
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
|
||||||
from skyvern.forge.sdk.core import skyvern_context
|
from skyvern.forge.sdk.core import skyvern_context
|
||||||
from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature
|
from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature
|
||||||
@@ -2609,13 +2608,9 @@ class WorkflowService:
|
|||||||
try:
|
try:
|
||||||
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
|
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
|
||||||
context = skyvern_context.current()
|
context = skyvern_context.current()
|
||||||
download_run_id = get_effective_download_run_id(
|
|
||||||
context_run_id=context.run_id if context else None,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
)
|
|
||||||
downloaded_files = await app.STORAGE.get_downloaded_files(
|
downloaded_files = await app.STORAGE.get_downloaded_files(
|
||||||
organization_id=workflow_run.organization_id,
|
organization_id=workflow_run.organization_id,
|
||||||
run_id=download_run_id,
|
run_id=context.run_id if context and context.run_id else workflow_run.workflow_run_id,
|
||||||
)
|
)
|
||||||
if task_v2:
|
if task_v2:
|
||||||
task_v2_downloaded_files = await app.STORAGE.get_downloaded_files(
|
task_v2_downloaded_files = await app.STORAGE.get_downloaded_files(
|
||||||
@@ -2748,13 +2743,9 @@ class WorkflowService:
|
|||||||
try:
|
try:
|
||||||
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
|
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
|
||||||
context = skyvern_context.current()
|
context = skyvern_context.current()
|
||||||
download_run_id = get_effective_download_run_id(
|
|
||||||
context_run_id=context.run_id if context else None,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
)
|
|
||||||
await app.STORAGE.save_downloaded_files(
|
await app.STORAGE.save_downloaded_files(
|
||||||
organization_id=workflow_run.organization_id,
|
organization_id=workflow_run.organization_id,
|
||||||
run_id=download_run_id,
|
run_id=context.run_id if context and context.run_id else workflow_run.workflow_run_id,
|
||||||
)
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
|
|||||||
Reference in New Issue
Block a user