From 4a599bbf5547b8f6199f6d35bd213233ffbfee1d Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Wed, 11 Sep 2024 21:56:38 -0700 Subject: [PATCH] Ykeremy/downloaded filename prefix (#814) --- skyvern/forge/agent.py | 38 +++++++++++++--------- skyvern/forge/sdk/api/files.py | 29 ++++++++++++++--- skyvern/forge/sdk/workflow/models/block.py | 16 ++++++++- skyvern/forge/sdk/workflow/models/yaml.py | 1 + skyvern/forge/sdk/workflow/service.py | 1 + 5 files changed, 64 insertions(+), 21 deletions(-) diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 61252e22..b41f7279 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -1,6 +1,8 @@ import asyncio import json +import os import random +import string from asyncio.exceptions import CancelledError from datetime import datetime from typing import Any, Tuple @@ -30,7 +32,7 @@ from skyvern.exceptions import ( from skyvern.forge import app from skyvern.forge.async_operations import AgentPhase, AsyncOperationPool from skyvern.forge.prompts import prompt_engine -from skyvern.forge.sdk.api.files import get_number_of_files_in_directory, get_path_for_workflow_download_directory +from skyvern.forge.sdk.api.files import get_path_for_workflow_download_directory, list_files_in_directory, rename_file from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.security import generate_skyvern_signature @@ -209,9 +211,7 @@ class ForgeAgent: api_key: str | None = None, workflow_run: WorkflowRun | None = None, close_browser_on_completion: bool = True, - # If complete_on_download is True and there is a workflow run, the task will be marked as completed - # if a download happens during the step execution. - complete_on_download: bool = False, + task_block: TaskBlock | None = None, ) -> Tuple[Step, DetailedAgentStepOutput | None, Step | None]: refreshed_task = await app.DATABASE.get_task(task_id=task.task_id, organization_id=organization.organization_id) if refreshed_task: @@ -247,10 +247,10 @@ class ForgeAgent: ) next_step: Step | None = None detailed_output: DetailedAgentStepOutput | None = None - num_files_before = 0 + list_files_before: list[str] = [] try: if task.workflow_run_id: - num_files_before = get_number_of_files_in_directory( + list_files_before = list_files_in_directory( get_path_for_workflow_download_directory(task.workflow_run_id) ) # Check some conditions before executing the step, throw an exception if the step can't be executed @@ -269,16 +269,24 @@ class ForgeAgent: task = await self.update_task_errors_from_detailed_output(task, detailed_output) retry = False - if complete_on_download and task.workflow_run_id: - num_files_after = get_number_of_files_in_directory( - get_path_for_workflow_download_directory(task.workflow_run_id) - ) - if num_files_after > num_files_before: + if task_block and task_block.complete_on_download and task.workflow_run_id: + workflow_download_directory = get_path_for_workflow_download_directory(task.workflow_run_id) + list_files_after = list_files_in_directory(workflow_download_directory) + if len(list_files_after) > len(list_files_before): + files_to_rename = list(set(list_files_after) - set(list_files_before)) + for file in files_to_rename: + random_file_id = "".join(random.choices(string.ascii_uppercase + string.digits, k=4)) + random_file_name = f"download-{datetime.now().strftime('%Y%m%d%H%M%S%f')}-{random_file_id}" + if task_block.download_suffix: + random_file_name = f"{random_file_name}-{task_block.download_suffix}" + rename_file(os.path.join(workflow_download_directory, file), random_file_name) + LOG.info( "Task marked as completed due to download", task_id=task.task_id, - num_files_before=num_files_before, - num_files_after=num_files_after, + num_files_before=len(list_files_before), + num_files_after=len(list_files_after), + new_files=files_to_rename, ) last_step = await self.update_step(step, is_last=True) completed_task = await self.update_task( @@ -352,7 +360,7 @@ class ForgeAgent: next_step, api_key=api_key, close_browser_on_completion=close_browser_on_completion, - complete_on_download=complete_on_download, + task_block=task_block, ) elif SettingsManager.get_settings().execute_all_steps() and next_step: return await self.execute_step( @@ -361,7 +369,7 @@ class ForgeAgent: next_step, api_key=api_key, close_browser_on_completion=close_browser_on_completion, - complete_on_download=complete_on_download, + task_block=task_block, ) else: LOG.info( diff --git a/skyvern/forge/sdk/api/files.py b/skyvern/forge/sdk/api/files.py index 406a97eb..88905886 100644 --- a/skyvern/forge/sdk/api/files.py +++ b/skyvern/forge/sdk/api/files.py @@ -84,14 +84,33 @@ def get_path_for_workflow_download_directory(workflow_run_id: str) -> Path: return Path(f"{REPO_ROOT_DIR}/downloads/{workflow_run_id}/") -def get_number_of_files_in_directory(directory: Path, recursive: bool = False) -> int: - count = 0 +def list_files_in_directory(directory: Path, recursive: bool = False) -> list[str]: + listed_files: list[str] = [] for root, dirs, files in os.walk(directory): + listed_files.extend([os.path.join(root, file) for file in files]) if not recursive: - count += len(files) break - count += len(files) - return count + + return listed_files + + +def get_number_of_files_in_directory(directory: Path, recursive: bool = False) -> int: + return len(list_files_in_directory(directory, recursive)) + + +def sanitize_filename(filename: str) -> str: + return "".join(c for c in filename if c.isalnum() or c in ["-", "_", "."]) + + +def rename_file(file_path: str, new_file_name: str) -> str: + try: + new_file_name = sanitize_filename(new_file_name) + new_file_path = os.path.join(os.path.dirname(file_path), new_file_name) + os.rename(file_path, new_file_path) + return new_file_path + except Exception: + LOG.exception(f"Failed to rename file {file_path} to {new_file_name}") + return file_path def calculate_sha256(file_path: str) -> str: diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 1b3cdf82..d39e0e56 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -176,6 +176,7 @@ class TaskBlock(Block): max_steps_per_run: int | None = None parameters: list[PARAMETER_TYPE] = [] complete_on_download: bool = False + download_suffix: str | None = None totp_verification_url: str | None = None totp_identifier: str | None = None @@ -249,6 +250,16 @@ class TaskBlock(Block): ) self.url = task_url_parameter_value + if self.download_suffix and workflow_run_context.has_parameter(self.download_suffix): + download_suffix_parameter_value = workflow_run_context.get_value(self.download_suffix) + if download_suffix_parameter_value: + LOG.info( + "Download prefix is parameterized, using parameter value", + download_suffix_parameter_value=download_suffix_parameter_value, + download_suffix_parameter_key=self.download_suffix, + ) + self.download_suffix = download_suffix_parameter_value + # TODO (kerem) we should always retry on terminated. We should make a distinction between retriable and # non-retryable terminations while will_retry: @@ -306,7 +317,7 @@ class TaskBlock(Block): task=task, step=step, workflow_run=workflow_run, - complete_on_download=self.complete_on_download, + task_block=self, ) except Exception as e: # Make sure the task is marked as failed in the database before raising the exception @@ -472,7 +483,10 @@ class ForLoopBlock(Block): workflow_run_context.set_value(context_parameter.key, context_parameter.value) block_outputs = [] for block_idx, loop_block in enumerate(self.loop_blocks): + original_loop_block = loop_block + loop_block = loop_block.copy() block_output = await loop_block.execute_safe(workflow_run_id=workflow_run_id) + loop_block = original_loop_block block_outputs.append(block_output) if not block_output.success and not loop_block.continue_on_failure: LOG.info( diff --git a/skyvern/forge/sdk/workflow/models/yaml.py b/skyvern/forge/sdk/workflow/models/yaml.py index b98f198a..5457b688 100644 --- a/skyvern/forge/sdk/workflow/models/yaml.py +++ b/skyvern/forge/sdk/workflow/models/yaml.py @@ -111,6 +111,7 @@ class TaskBlockYAML(BlockYAML): max_steps_per_run: int | None = None parameter_keys: list[str] | None = None complete_on_download: bool = False + download_suffix: str | None = None totp_verification_url: str | None = None totp_identifier: str | None = None diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index c1b90621..3e95f6fd 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -1048,6 +1048,7 @@ class WorkflowService: max_steps_per_run=block_yaml.max_steps_per_run, max_retries=block_yaml.max_retries, complete_on_download=block_yaml.complete_on_download, + download_suffix=block_yaml.download_suffix, continue_on_failure=block_yaml.continue_on_failure, totp_verification_url=block_yaml.totp_verification_url, totp_identifier=block_yaml.totp_identifier,