diff --git a/skyvern/forge/sdk/api/aws.py b/skyvern/forge/sdk/api/aws.py
index 6c8130e1..c0977933 100644
--- a/skyvern/forge/sdk/api/aws.py
+++ b/skyvern/forge/sdk/api/aws.py
@@ -10,6 +10,18 @@ from skyvern.config import settings
 
 LOG = structlog.get_logger()
 
+# We only include the storage classes that we want to use in our application.
+class S3StorageClass(StrEnum):
+    STANDARD = "STANDARD"
+    # REDUCED_REDUNDANCY = "REDUCED_REDUNDANCY"
+    # INTELLIGENT_TIERING = "INTELLIGENT_TIERING"
+    ONEZONE_IA = "ONEZONE_IA"
+    GLACIER = "GLACIER"
+    # DEEP_ARCHIVE = "DEEP_ARCHIVE"
+    # OUTPOSTS = "OUTPOSTS"
+    # STANDARD_IA = "STANDARD_IA"
+
+
 class AWSClientType(StrEnum):
     S3 = "s3"
     SECRETS_MANAGER = "secretsmanager"
@@ -68,21 +80,33 @@ class AsyncAWSClient:
             LOG.exception("Failed to delete secret.", secret_name=secret_name)
             raise e
 
-    async def upload_file(self, uri: str, data: bytes) -> str | None:
+    async def upload_file(
+        self, uri: str, data: bytes, storage_class: S3StorageClass = S3StorageClass.STANDARD
+    ) -> str | None:
+        if storage_class not in S3StorageClass:
+            raise ValueError(f"Invalid storage class: {storage_class}. Must be one of {list(S3StorageClass)}")
         try:
             async with self.session.client(AWSClientType.S3, region_name=self.region_name) as client:
                 parsed_uri = S3Uri(uri)
-                await client.put_object(Body=data, Bucket=parsed_uri.bucket, Key=parsed_uri.key)
+                await client.put_object(
+                    Body=data, Bucket=parsed_uri.bucket, Key=parsed_uri.key, StorageClass=str(storage_class)
+                )
             return uri
         except Exception:
             LOG.exception("S3 upload failed.", uri=uri)
             return None
 
-    async def upload_file_stream(self, uri: str, file_obj: IO[bytes]) -> str | None:
+    async def upload_file_stream(
+        self, uri: str, file_obj: IO[bytes], storage_class: S3StorageClass = S3StorageClass.STANDARD
+    ) -> str | None:
+        if storage_class not in S3StorageClass:
+            raise ValueError(f"Invalid storage class: {storage_class}. Must be one of {list(S3StorageClass)}")
         try:
             async with self.session.client(AWSClientType.S3, region_name=self.region_name) as client:
                 parsed_uri = S3Uri(uri)
-                await client.upload_fileobj(file_obj, parsed_uri.bucket, parsed_uri.key)
+                await client.upload_fileobj(
+                    file_obj, parsed_uri.bucket, parsed_uri.key, StorageClass=str(storage_class)
+                )
             LOG.debug("Upload file stream success", uri=uri)
             return uri
         except Exception:
@@ -93,22 +117,21 @@ class AsyncAWSClient:
         self,
         uri: str,
         file_path: str,
+        storage_class: S3StorageClass = S3StorageClass.STANDARD,
         metadata: dict | None = None,
         raise_exception: bool = False,
     ) -> None:
        try:
             async with self.session.client(AWSClientType.S3, region_name=self.region_name) as client:
                 parsed_uri = S3Uri(uri)
-                params: dict[str, Any] = {
-                    "Filename": file_path,
-                    "Bucket": parsed_uri.bucket,
-                    "Key": parsed_uri.key,
-                }
-
-                if metadata:
-                    params["ExtraArgs"] = {"Metadata": metadata}
-
-                await client.upload_file(**params)
+                extra_args: dict[str, Any] = {"ExtraArgs": {"Metadata": metadata}} if metadata else {}
+                await client.upload_file(
+                    Filename=file_path,
+                    Bucket=parsed_uri.bucket,
+                    Key=parsed_uri.key,
+                    StorageClass=str(storage_class),
+                    **extra_args,
+                )
         except Exception as e:
             LOG.exception("S3 upload failed.", uri=uri)
             if raise_exception:
diff --git a/skyvern/forge/sdk/artifact/storage/s3.py b/skyvern/forge/sdk/artifact/storage/s3.py
index a3d0b434..a0f89c4a 100644
--- a/skyvern/forge/sdk/artifact/storage/s3.py
+++ b/skyvern/forge/sdk/artifact/storage/s3.py
@@ -6,7 +6,7 @@ import structlog
 
 from skyvern.config import settings
 from skyvern.constants import DOWNLOAD_FILE_PREFIX
-from skyvern.forge.sdk.api.aws import AsyncAWSClient
+from skyvern.forge.sdk.api.aws import AsyncAWSClient, S3StorageClass
 from skyvern.forge.sdk.api.files import (
     calculate_sha256_for_file,
     create_named_temporary_file,
@@ -67,7 +67,18 @@ class S3Storage(BaseStorage):
         return f"s3://{self.bucket}/{settings.ENV}/ai_suggestions/{ai_suggestion.ai_suggestion_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}"
 
     async def store_artifact(self, artifact: Artifact, data: bytes) -> None:
-        await self.async_client.upload_file(artifact.uri, data)
+        sc = await self._get_storage_class_for_org(artifact.organization_id)
+        LOG.info(
+            "Storing artifact",
+            artifact_id=artifact.id,
+            organization_id=artifact.organization_id,
+            uri=artifact.uri,
+            storage_class=sc,
+        )
+        await self.async_client.upload_file(artifact.uri, data, storage_class=sc)
+
+    async def _get_storage_class_for_org(self, organization_id: str | None) -> S3StorageClass:
+        return S3StorageClass.STANDARD
 
     async def retrieve_artifact(self, artifact: Artifact) -> bytes | None:
         return await self.async_client.download_file(artifact.uri)
@@ -80,12 +91,30 @@ class S3Storage(BaseStorage):
         return await self.async_client.create_presigned_urls([artifact.uri for artifact in artifacts])
 
     async def store_artifact_from_path(self, artifact: Artifact, path: str) -> None:
-        await self.async_client.upload_file_from_path(artifact.uri, path)
+        sc = await self._get_storage_class_for_org(artifact.organization_id)
+        LOG.info(
+            "Storing artifact from path",
+            artifact_id=artifact.id,
+            organization_id=artifact.organization_id,
+            uri=artifact.uri,
+            storage_class=sc,
+            path=path,
+        )
+        await self.async_client.upload_file_from_path(artifact.uri, path, storage_class=sc)
 
     async def save_streaming_file(self, organization_id: str, file_name: str) -> None:
         from_path = f"{get_skyvern_temp_dir()}/{organization_id}/{file_name}"
         to_path = f"s3://{settings.AWS_S3_BUCKET_SCREENSHOTS}/{settings.ENV}/{organization_id}/{file_name}"
-        await self.async_client.upload_file_from_path(to_path, from_path)
+        sc = await self._get_storage_class_for_org(organization_id)
+        LOG.info(
+            "Saving streaming file",
+            organization_id=organization_id,
+            file_name=file_name,
+            from_path=from_path,
+            to_path=to_path,
+            storage_class=sc,
+        )
+        await self.async_client.upload_file_from_path(to_path, from_path, storage_class=sc)
 
     async def get_streaming_file(self, organization_id: str, file_name: str, use_default: bool = True) -> bytes | None:
         path = f"s3://{settings.AWS_S3_BUCKET_SCREENSHOTS}/{settings.ENV}/{organization_id}/{file_name}"
@@ -96,7 +125,16 @@ class S3Storage(BaseStorage):
         temp_zip_file = create_named_temporary_file()
         zip_file_path = shutil.make_archive(temp_zip_file.name, "zip", directory)
         browser_session_uri = f"s3://{settings.AWS_S3_BUCKET_BROWSER_SESSIONS}/{settings.ENV}/{organization_id}/{workflow_permanent_id}.zip"
-        await self.async_client.upload_file_from_path(browser_session_uri, zip_file_path)
+        sc = await self._get_storage_class_for_org(organization_id)
+        LOG.info(
+            "Storing browser session",
+            organization_id=organization_id,
+            workflow_permanent_id=workflow_permanent_id,
+            zip_file_path=zip_file_path,
+            browser_session_uri=browser_session_uri,
+            storage_class=sc,
+        )
+        await self.async_client.upload_file_from_path(browser_session_uri, zip_file_path, storage_class=sc)
 
     async def retrieve_browser_session(self, organization_id: str, workflow_permanent_id: str) -> str | None:
         browser_session_uri = f"s3://{settings.AWS_S3_BUCKET_BROWSER_SESSIONS}/{settings.ENV}/{organization_id}/{workflow_permanent_id}.zip"
@@ -117,6 +155,7 @@ class S3Storage(BaseStorage):
     ) -> None:
         download_dir = get_download_dir(workflow_run_id=workflow_run_id, task_id=task_id)
         files = os.listdir(download_dir)
+        sc = await self._get_storage_class_for_org(organization_id)
         for file in files:
             fpath = os.path.join(download_dir, file)
             if os.path.isfile(fpath):
@@ -124,11 +163,19 @@ class S3Storage(BaseStorage):
 
                 # Calculate SHA-256 checksum
                 checksum = calculate_sha256_for_file(fpath)
-                LOG.info("Calculated checksum for file", file=file, checksum=checksum)
-
+                LOG.info(
+                    "Calculated checksum for file",
+                    file=file,
+                    checksum=checksum,
+                    organization_id=organization_id,
+                    storage_class=sc,
+                )
                 # Upload file with checksum metadata
                 await self.async_client.upload_file_from_path(
-                    uri=uri, file_path=fpath, metadata={"sha256_checksum": checksum, "original_filename": file}
+                    uri=uri,
+                    file_path=fpath,
+                    metadata={"sha256_checksum": checksum, "original_filename": file},
+                    storage_class=sc,
                 )
 
     async def get_downloaded_files(