Move save legacy file logic to S3Storage + add storage class support + add logging (#2624)
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import BinaryIO
|
||||
|
||||
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityType
|
||||
from skyvern.forge.sdk.models import Step
|
||||
@@ -116,3 +117,9 @@ class BaseStorage(ABC):
|
||||
self, organization_id: str, task_id: str | None, workflow_run_id: str | None
|
||||
) -> list[FileInfo]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def save_legacy_file(
|
||||
self, *, organization_id: str, filename: str, fileObj: BinaryIO
|
||||
) -> tuple[str, str] | None:
|
||||
pass
|
||||
|
||||
@@ -2,6 +2,7 @@ import os
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import BinaryIO
|
||||
|
||||
import structlog
|
||||
|
||||
@@ -178,3 +179,10 @@ class LocalStorage(BaseStorage):
|
||||
def _create_directories_if_not_exists(path_including_file_name: Path) -> None:
|
||||
path = path_including_file_name.parent
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
async def save_legacy_file(
|
||||
self, *, organization_id: str, filename: str, fileObj: BinaryIO
|
||||
) -> tuple[str, str] | None:
|
||||
raise NotImplementedError(
|
||||
"Legacy file storage is not implemented for LocalStorage. Please use a different storage backend."
|
||||
)
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from typing import BinaryIO
|
||||
|
||||
import structlog
|
||||
|
||||
@@ -210,3 +212,55 @@ class S3Storage(BaseStorage):
|
||||
file_infos.append(file_info)
|
||||
|
||||
return file_infos
|
||||
|
||||
async def save_legacy_file(
|
||||
self, *, organization_id: str, filename: str, fileObj: BinaryIO
|
||||
) -> tuple[str, str] | None:
|
||||
todays_date = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
|
||||
bucket = settings.AWS_S3_BUCKET_UPLOADS
|
||||
sc = await self._get_storage_class_for_org(organization_id)
|
||||
# First try uploading with original filename
|
||||
try:
|
||||
sanitized_filename = os.path.basename(filename) # Remove any path components
|
||||
s3_uri = f"s3://{bucket}/{settings.ENV}/{organization_id}/{todays_date}/{sanitized_filename}"
|
||||
uploaded_s3_uri = await self.async_client.upload_file_stream(s3_uri, fileObj, storage_class=sc)
|
||||
except Exception:
|
||||
LOG.error("Failed to upload file to S3", exc_info=True)
|
||||
uploaded_s3_uri = None
|
||||
|
||||
# If upload fails, try again with UUID prefix
|
||||
if not uploaded_s3_uri:
|
||||
uuid_prefixed_filename = f"{str(uuid.uuid4())}_{filename}"
|
||||
s3_uri = f"s3://{bucket}/{settings.ENV}/{organization_id}/{todays_date}/{uuid_prefixed_filename}"
|
||||
fileObj.seek(0) # Reset file pointer
|
||||
uploaded_s3_uri = await self.async_client.upload_file_stream(s3_uri, fileObj, storage_class=sc)
|
||||
|
||||
if not uploaded_s3_uri:
|
||||
LOG.error(
|
||||
"Failed to upload file to S3 after retrying with UUID prefix",
|
||||
organization_id=organization_id,
|
||||
storage_class=sc,
|
||||
filename=filename,
|
||||
exc_info=True,
|
||||
)
|
||||
return None
|
||||
LOG.debug(
|
||||
"Legacy file upload",
|
||||
organization_id=organization_id,
|
||||
storage_class=sc,
|
||||
filename=filename,
|
||||
uploaded_s3_uri=uploaded_s3_uri,
|
||||
)
|
||||
# Generate a presigned URL for the uploaded file
|
||||
presigned_urls = await self.async_client.create_presigned_urls([uploaded_s3_uri])
|
||||
if not presigned_urls:
|
||||
LOG.error(
|
||||
"Failed to create presigned URL for uploaded file",
|
||||
organization_id=organization_id,
|
||||
storage_class=sc,
|
||||
uploaded_s3_uri=uploaded_s3_uri,
|
||||
filename=filename,
|
||||
exc_info=True,
|
||||
)
|
||||
return None
|
||||
return presigned_urls[0], uploaded_s3_uri
|
||||
|
||||
@@ -1,6 +1,3 @@
|
||||
import datetime
|
||||
import os
|
||||
import uuid
|
||||
from enum import Enum
|
||||
from typing import Annotated, Any
|
||||
|
||||
@@ -14,7 +11,6 @@ from skyvern._version import __version__
|
||||
from skyvern.config import settings
|
||||
from skyvern.forge import app
|
||||
from skyvern.forge.prompts import prompt_engine
|
||||
from skyvern.forge.sdk.api.aws import aws_client
|
||||
from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError
|
||||
from skyvern.forge.sdk.artifact.models import Artifact
|
||||
from skyvern.forge.sdk.core import skyvern_context
|
||||
@@ -1696,36 +1692,12 @@ async def upload_file(
|
||||
file: UploadFile = Depends(_validate_file_size),
|
||||
current_org: Organization = Depends(org_auth_service.get_current_org),
|
||||
) -> Response:
|
||||
bucket = app.SETTINGS_MANAGER.AWS_S3_BUCKET_UPLOADS
|
||||
todays_date = datetime.datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
# First try uploading with original filename
|
||||
try:
|
||||
sanitized_filename = os.path.basename(file.filename) # Remove any path components
|
||||
s3_uri = (
|
||||
f"s3://{bucket}/{app.SETTINGS_MANAGER.ENV}/{current_org.organization_id}/{todays_date}/{sanitized_filename}"
|
||||
)
|
||||
uploaded_s3_uri = await aws_client.upload_file_stream(s3_uri, file.file)
|
||||
except Exception:
|
||||
LOG.error("Failed to upload file to S3", exc_info=True)
|
||||
uploaded_s3_uri = None
|
||||
|
||||
# If upload fails, try again with UUID prefix
|
||||
if not uploaded_s3_uri:
|
||||
uuid_prefixed_filename = f"{str(uuid.uuid4())}_{file.filename}"
|
||||
s3_uri = f"s3://{bucket}/{app.SETTINGS_MANAGER.ENV}/{current_org.organization_id}/{todays_date}/{uuid_prefixed_filename}"
|
||||
file.file.seek(0) # Reset file pointer
|
||||
uploaded_s3_uri = await aws_client.upload_file_stream(s3_uri, file.file)
|
||||
|
||||
if not uploaded_s3_uri:
|
||||
uris = await app.STORAGE.save_legacy_file(
|
||||
organization_id=current_org.organization_id, filename=file.filename, fileObj=file.file
|
||||
)
|
||||
if not uris:
|
||||
raise HTTPException(status_code=500, detail="Failed to upload file to S3.")
|
||||
|
||||
# Generate a presigned URL for the uploaded file
|
||||
presigned_urls = await aws_client.create_presigned_urls([uploaded_s3_uri])
|
||||
if not presigned_urls:
|
||||
raise HTTPException(status_code=500, detail="Failed to generate presigned URL.")
|
||||
|
||||
presigned_url = presigned_urls[0]
|
||||
presigned_url, uploaded_s3_uri = uris
|
||||
return ORJSONResponse(
|
||||
content={"s3_uri": uploaded_s3_uri, "presigned_url": presigned_url},
|
||||
status_code=200,
|
||||
|
||||
Reference in New Issue
Block a user