downloaded file debugger accessible (#3338)
This commit is contained in:
@@ -53,6 +53,7 @@ from skyvern.exceptions import (
|
||||
from skyvern.forge import app
|
||||
from skyvern.forge.async_operations import AgentPhase, AsyncOperationPool
|
||||
from skyvern.forge.prompts import prompt_engine
|
||||
from skyvern.forge.sdk.api.aws import aws_client
|
||||
from skyvern.forge.sdk.api.files import (
|
||||
get_path_for_workflow_download_directory,
|
||||
list_downloading_files_in_directory,
|
||||
@@ -189,6 +190,7 @@ class ForgeAgent:
|
||||
max_screenshot_scrolling_times=workflow_run.max_screenshot_scrolls,
|
||||
extra_http_headers=workflow_run.extra_http_headers,
|
||||
browser_address=workflow_run.browser_address,
|
||||
browser_session_id=workflow_run.browser_session_id,
|
||||
)
|
||||
LOG.info(
|
||||
"Created a new task for workflow run",
|
||||
@@ -387,6 +389,12 @@ class ForgeAgent:
|
||||
context.run_id if context and context.run_id else task.workflow_run_id
|
||||
)
|
||||
)
|
||||
if task.browser_session_id:
|
||||
browser_session_downloaded_files = await app.STORAGE.list_downloaded_files_in_browser_session(
|
||||
organization_id=organization.organization_id,
|
||||
browser_session_id=task.browser_session_id,
|
||||
)
|
||||
list_files_before = list_files_before + browser_session_downloaded_files
|
||||
# Check some conditions before executing the step, throw an exception if the step can't be executed
|
||||
await app.AGENT_FUNCTION.validate_step_execution(task, step)
|
||||
|
||||
@@ -470,7 +478,13 @@ class ForgeAgent:
|
||||
context.run_id if context and context.run_id else task.workflow_run_id
|
||||
)
|
||||
|
||||
downloading_files: list[Path] = list_downloading_files_in_directory(workflow_download_directory)
|
||||
downloading_files = list_downloading_files_in_directory(workflow_download_directory)
|
||||
if task.browser_session_id:
|
||||
browser_session_downloading_files = await app.STORAGE.list_downloading_files_in_browser_session(
|
||||
organization_id=organization.organization_id,
|
||||
browser_session_id=task.browser_session_id,
|
||||
)
|
||||
downloading_files = downloading_files + browser_session_downloading_files
|
||||
if len(downloading_files) > 0:
|
||||
LOG.info(
|
||||
"Detecting files are still downloading, waiting for files to be completely downloaded.",
|
||||
@@ -489,9 +503,23 @@ class ForgeAgent:
|
||||
)
|
||||
|
||||
list_files_after = list_files_in_directory(workflow_download_directory)
|
||||
if task.browser_session_id:
|
||||
browser_session_downloaded_files_after = await app.STORAGE.list_downloaded_files_in_browser_session(
|
||||
organization_id=organization.organization_id,
|
||||
browser_session_id=task.browser_session_id,
|
||||
)
|
||||
list_files_after = list_files_after + browser_session_downloaded_files_after
|
||||
if len(list_files_after) > len(list_files_before):
|
||||
files_to_rename = list(set(list_files_after) - set(list_files_before))
|
||||
for file in files_to_rename:
|
||||
if file.startswith("s3://"):
|
||||
file_data = await aws_client.download_file(file, log_exception=False)
|
||||
if not file_data:
|
||||
continue
|
||||
file = file.split("/")[-1] # Extract filename from the end of S3 URI
|
||||
with open(os.path.join(workflow_download_directory, file), "wb") as f:
|
||||
f.write(file_data)
|
||||
|
||||
file_extension = Path(file).suffix
|
||||
if file_extension == BROWSER_DOWNLOADING_SUFFIX:
|
||||
LOG.warning(
|
||||
|
||||
@@ -203,6 +203,16 @@ class AsyncAWSClient:
|
||||
LOG.exception("S3 download failed", uri=uri)
|
||||
return None
|
||||
|
||||
async def delete_file(self, uri: str, log_exception: bool = True) -> None:
|
||||
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/delete_object.html
|
||||
try:
|
||||
async with self._s3_client() as client:
|
||||
parsed_uri = S3Uri(uri)
|
||||
await client.delete_object(Bucket=parsed_uri.bucket, Key=parsed_uri.key)
|
||||
except Exception:
|
||||
if log_exception:
|
||||
LOG.exception("S3 delete failed", uri=uri)
|
||||
|
||||
async def get_object_info(self, uri: str) -> dict:
|
||||
async with self._s3_client() as client:
|
||||
parsed_uri = S3Uri(uri)
|
||||
|
||||
@@ -17,7 +17,7 @@ from yarl import URL
|
||||
from skyvern.config import settings
|
||||
from skyvern.constants import BROWSER_DOWNLOAD_TIMEOUT, BROWSER_DOWNLOADING_SUFFIX, REPO_ROOT_DIR
|
||||
from skyvern.exceptions import DownloadFileMaxSizeExceeded, DownloadFileMaxWaitingTime
|
||||
from skyvern.forge.sdk.api.aws import AsyncAWSClient
|
||||
from skyvern.forge.sdk.api.aws import AsyncAWSClient, aws_client
|
||||
from skyvern.utils.url_validators import encode_url
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
@@ -195,24 +195,28 @@ def list_files_in_directory(directory: Path, recursive: bool = False) -> list[st
|
||||
|
||||
def list_downloading_files_in_directory(
|
||||
directory: Path, downloading_suffix: str = BROWSER_DOWNLOADING_SUFFIX
|
||||
) -> list[Path]:
|
||||
) -> list[str]:
|
||||
# check if there's any file is still downloading
|
||||
downloading_files: list[Path] = []
|
||||
downloading_files: list[str] = []
|
||||
for file in list_files_in_directory(directory):
|
||||
path = Path(file)
|
||||
if path.suffix == downloading_suffix:
|
||||
downloading_files.append(path)
|
||||
downloading_files.append(file)
|
||||
return downloading_files
|
||||
|
||||
|
||||
async def wait_for_download_finished(downloading_files: list[Path], timeout: float = BROWSER_DOWNLOAD_TIMEOUT) -> None:
|
||||
async def wait_for_download_finished(downloading_files: list[str], timeout: float = BROWSER_DOWNLOAD_TIMEOUT) -> None:
|
||||
cur_downloading_files = downloading_files
|
||||
try:
|
||||
async with asyncio.timeout(timeout):
|
||||
while len(cur_downloading_files) > 0:
|
||||
new_downloading_files: list[Path] = []
|
||||
new_downloading_files: list[str] = []
|
||||
for path in cur_downloading_files:
|
||||
if not path.exists():
|
||||
if path.startswith("s3://"):
|
||||
metadata = await aws_client.get_file_metadata(path, log_exception=False)
|
||||
if not metadata:
|
||||
continue
|
||||
if not Path(path).exists():
|
||||
continue
|
||||
new_downloading_files.append(path)
|
||||
cur_downloading_files = new_downloading_files
|
||||
|
||||
@@ -123,6 +123,18 @@ class BaseStorage(ABC):
|
||||
async def retrieve_browser_session(self, organization_id: str, workflow_permanent_id: str) -> str | None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def list_downloaded_files_in_browser_session(
|
||||
self, organization_id: str, browser_session_id: str
|
||||
) -> list[str]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def list_downloading_files_in_browser_session(
|
||||
self, organization_id: str, browser_session_id: str
|
||||
) -> list[str]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def save_downloaded_files(self, organization_id: str, run_id: str | None) -> None:
|
||||
pass
|
||||
|
||||
@@ -210,6 +210,16 @@ class LocalStorage(BaseStorage):
|
||||
async def save_downloaded_files(self, organization_id: str, run_id: str | None) -> None:
|
||||
pass
|
||||
|
||||
async def list_downloaded_files_in_browser_session(
|
||||
self, organization_id: str, browser_session_id: str
|
||||
) -> list[str]:
|
||||
return []
|
||||
|
||||
async def list_downloading_files_in_browser_session(
|
||||
self, organization_id: str, browser_session_id: str
|
||||
) -> list[str]:
|
||||
return []
|
||||
|
||||
async def get_downloaded_files(self, organization_id: str, run_id: str | None) -> list[FileInfo]:
|
||||
download_dir = get_download_dir(run_id=run_id)
|
||||
file_infos: list[FileInfo] = []
|
||||
|
||||
@@ -7,7 +7,7 @@ from typing import BinaryIO
|
||||
import structlog
|
||||
|
||||
from skyvern.config import settings
|
||||
from skyvern.constants import DOWNLOAD_FILE_PREFIX
|
||||
from skyvern.constants import BROWSER_DOWNLOADING_SUFFIX, DOWNLOAD_FILE_PREFIX
|
||||
from skyvern.forge.sdk.api.aws import AsyncAWSClient, S3StorageClass
|
||||
from skyvern.forge.sdk.api.files import (
|
||||
calculate_sha256_for_file,
|
||||
@@ -195,6 +195,23 @@ class S3Storage(BaseStorage):
|
||||
temp_zip_file.close()
|
||||
return temp_dir
|
||||
|
||||
async def list_downloaded_files_in_browser_session(
|
||||
self, organization_id: str, browser_session_id: str
|
||||
) -> list[str]:
|
||||
uri = f"s3://{settings.AWS_S3_BUCKET_ARTIFACTS}/v1/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/downloads"
|
||||
return [
|
||||
f"s3://{settings.AWS_S3_BUCKET_ARTIFACTS}/{file}" for file in await self.async_client.list_files(uri=uri)
|
||||
]
|
||||
|
||||
async def list_downloading_files_in_browser_session(
|
||||
self, organization_id: str, browser_session_id: str
|
||||
) -> list[str]:
|
||||
uri = f"s3://{settings.AWS_S3_BUCKET_ARTIFACTS}/v1/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/downloads"
|
||||
files = [
|
||||
f"s3://{settings.AWS_S3_BUCKET_ARTIFACTS}/{file}" for file in await self.async_client.list_files(uri=uri)
|
||||
]
|
||||
return [file for file in files if file.endswith(BROWSER_DOWNLOADING_SUFFIX)]
|
||||
|
||||
async def save_downloaded_files(self, organization_id: str, run_id: str | None) -> None:
|
||||
download_dir = get_download_dir(run_id=run_id)
|
||||
files = os.listdir(download_dir)
|
||||
|
||||
Reference in New Issue
Block a user