Add persist_browser_session flag to workflows (#777)
This commit is contained in:
@@ -75,6 +75,11 @@ def zip_files(files_path: str, zip_file_path: str) -> str:
|
||||
return zip_file_path
|
||||
|
||||
|
||||
def unzip_files(zip_file_path: str, output_dir: str) -> None:
|
||||
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
|
||||
zip_ref.extractall(output_dir)
|
||||
|
||||
|
||||
def get_path_for_workflow_download_directory(workflow_run_id: str) -> Path:
|
||||
return Path(f"{REPO_ROOT_DIR}/downloads/{workflow_run_id}/")
|
||||
|
||||
|
||||
@@ -59,3 +59,11 @@ class BaseStorage(ABC):
|
||||
@abstractmethod
|
||||
async def get_streaming_file(self, organization_id: str, file_name: str, use_default: bool = True) -> bytes | None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def store_browser_session(self, organization_id: str, workflow_permanent_id: str, directory: str) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def retrieve_browser_session(self, organization_id: str, workflow_permanent_id: str) -> str | None:
|
||||
pass
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from urllib.parse import unquote, urlparse
|
||||
@@ -85,6 +87,38 @@ class LocalStorage(BaseStorage):
|
||||
)
|
||||
return None
|
||||
|
||||
async def store_browser_session(self, organization_id: str, workflow_permanent_id: str, directory: str) -> None:
|
||||
stored_folder_path = (
|
||||
Path(SettingsManager.get_settings().BROWSER_SESSION_BASE_PATH) / organization_id / workflow_permanent_id
|
||||
)
|
||||
if directory == str(stored_folder_path):
|
||||
return
|
||||
self._create_directories_if_not_exists(stored_folder_path)
|
||||
LOG.info(
|
||||
"Storing browser session locally",
|
||||
organization_id=organization_id,
|
||||
workflow_permanent_id=workflow_permanent_id,
|
||||
directory=directory,
|
||||
browser_session_path=stored_folder_path,
|
||||
)
|
||||
|
||||
# Copy all files from the directory to the stored folder
|
||||
for root, _, files in os.walk(directory):
|
||||
for file in files:
|
||||
source_file_path = Path(root) / file
|
||||
relative_path = source_file_path.relative_to(directory)
|
||||
target_file_path = stored_folder_path / relative_path
|
||||
self._create_directories_if_not_exists(target_file_path)
|
||||
shutil.copy2(source_file_path, target_file_path)
|
||||
|
||||
async def retrieve_browser_session(self, organization_id: str, workflow_permanent_id: str) -> str | None:
|
||||
stored_folder_path = (
|
||||
Path(SettingsManager.get_settings().BROWSER_SESSION_BASE_PATH) / organization_id / workflow_permanent_id
|
||||
)
|
||||
if not stored_folder_path.exists():
|
||||
return None
|
||||
return str(stored_folder_path)
|
||||
|
||||
@staticmethod
|
||||
def _parse_uri_to_path(uri: str) -> str:
|
||||
parsed_uri = urlparse(uri)
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
import shutil
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
|
||||
from skyvern.config import settings
|
||||
from skyvern.forge.sdk.api.aws import AsyncAWSClient
|
||||
from skyvern.forge.sdk.api.files import unzip_files
|
||||
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
|
||||
from skyvern.forge.sdk.artifact.storage.base import FILE_EXTENTSION_MAP, BaseStorage
|
||||
from skyvern.forge.sdk.models import Step
|
||||
@@ -40,3 +43,24 @@ class S3Storage(BaseStorage):
|
||||
async def get_streaming_file(self, organization_id: str, file_name: str, use_default: bool = True) -> bytes | None:
|
||||
path = f"s3://{settings.AWS_S3_BUCKET_SCREENSHOTS}/{settings.ENV}/{organization_id}/{file_name}"
|
||||
return await self.async_client.download_file(path, log_exception=False)
|
||||
|
||||
async def store_browser_session(self, organization_id: str, workflow_permanent_id: str, directory: str) -> None:
|
||||
# Zip the directory to a temp file
|
||||
temp_zip_file = tempfile.NamedTemporaryFile()
|
||||
zip_file_path = shutil.make_archive(temp_zip_file.name, "zip", directory)
|
||||
browser_session_uri = f"s3://{settings.AWS_S3_BUCKET_BROWSER_SESSIONS}/{settings.ENV}/{organization_id}/{workflow_permanent_id}.zip"
|
||||
await self.async_client.upload_file_from_path(browser_session_uri, zip_file_path)
|
||||
|
||||
async def retrieve_browser_session(self, organization_id: str, workflow_permanent_id: str) -> str | None:
|
||||
browser_session_uri = f"s3://{settings.AWS_S3_BUCKET_BROWSER_SESSIONS}/{settings.ENV}/{organization_id}/{workflow_permanent_id}.zip"
|
||||
downloaded_zip_bytes = await self.async_client.download_file(browser_session_uri, log_exception=True)
|
||||
if not downloaded_zip_bytes:
|
||||
return None
|
||||
temp_zip_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
temp_zip_file.write(downloaded_zip_bytes)
|
||||
temp_zip_file_path = temp_zip_file.name
|
||||
|
||||
temp_dir = tempfile.mkdtemp(prefix="skyvern_browser_session_")
|
||||
unzip_files(temp_zip_file_path, temp_dir)
|
||||
temp_zip_file.close()
|
||||
return temp_dir
|
||||
|
||||
@@ -819,6 +819,7 @@ class AgentDB:
|
||||
proxy_location: ProxyLocation | None = None,
|
||||
webhook_callback_url: str | None = None,
|
||||
totp_verification_url: str | None = None,
|
||||
persist_browser_session: bool = False,
|
||||
workflow_permanent_id: str | None = None,
|
||||
version: int | None = None,
|
||||
is_saved_task: bool = False,
|
||||
@@ -832,6 +833,7 @@ class AgentDB:
|
||||
proxy_location=proxy_location,
|
||||
webhook_callback_url=webhook_callback_url,
|
||||
totp_verification_url=totp_verification_url,
|
||||
persist_browser_session=persist_browser_session,
|
||||
is_saved_task=is_saved_task,
|
||||
)
|
||||
if workflow_permanent_id:
|
||||
|
||||
@@ -180,6 +180,7 @@ class WorkflowModel(Base):
|
||||
proxy_location = Column(Enum(ProxyLocation))
|
||||
webhook_callback_url = Column(String)
|
||||
totp_verification_url = Column(String)
|
||||
persist_browser_session = Column(Boolean, default=False, nullable=False)
|
||||
|
||||
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
|
||||
modified_at = Column(
|
||||
|
||||
@@ -162,6 +162,7 @@ def convert_to_workflow(workflow_model: WorkflowModel, debug_enabled: bool = Fal
|
||||
workflow_permanent_id=workflow_model.workflow_permanent_id,
|
||||
webhook_callback_url=workflow_model.webhook_callback_url,
|
||||
totp_verification_url=workflow_model.totp_verification_url,
|
||||
persist_browser_session=workflow_model.persist_browser_session,
|
||||
proxy_location=(ProxyLocation(workflow_model.proxy_location) if workflow_model.proxy_location else None),
|
||||
version=workflow_model.version,
|
||||
is_saved_task=workflow_model.is_saved_task,
|
||||
|
||||
@@ -51,6 +51,7 @@ class Workflow(BaseModel):
|
||||
proxy_location: ProxyLocation | None = None
|
||||
webhook_callback_url: str | None = None
|
||||
totp_verification_url: str | None = None
|
||||
persist_browser_session: bool = False
|
||||
|
||||
created_at: datetime
|
||||
modified_at: datetime
|
||||
|
||||
@@ -225,5 +225,6 @@ class WorkflowCreateYAMLRequest(BaseModel):
|
||||
proxy_location: ProxyLocation | None = None
|
||||
webhook_callback_url: str | None = None
|
||||
totp_verification_url: str | None = None
|
||||
persist_browser_session: bool = False
|
||||
workflow_definition: WorkflowDefinitionYAML
|
||||
is_saved_task: bool = False
|
||||
|
||||
@@ -286,6 +286,7 @@ class WorkflowService:
|
||||
proxy_location: ProxyLocation | None = None,
|
||||
webhook_callback_url: str | None = None,
|
||||
totp_verification_url: str | None = None,
|
||||
persist_browser_session: bool = False,
|
||||
workflow_permanent_id: str | None = None,
|
||||
version: int | None = None,
|
||||
is_saved_task: bool = False,
|
||||
@@ -298,6 +299,7 @@ class WorkflowService:
|
||||
proxy_location=proxy_location,
|
||||
webhook_callback_url=webhook_callback_url,
|
||||
totp_verification_url=totp_verification_url,
|
||||
persist_browser_session=persist_browser_session,
|
||||
workflow_permanent_id=workflow_permanent_id,
|
||||
version=version,
|
||||
is_saved_task=is_saved_task,
|
||||
@@ -657,6 +659,7 @@ class WorkflowService:
|
||||
tasks = await self.get_tasks_by_workflow_run_id(workflow_run.workflow_run_id)
|
||||
all_workflow_task_ids = [task.task_id for task in tasks]
|
||||
browser_state = await app.BROWSER_MANAGER.cleanup_for_workflow_run(
|
||||
workflow,
|
||||
workflow_run.workflow_run_id,
|
||||
all_workflow_task_ids,
|
||||
close_browser_on_completion,
|
||||
@@ -826,6 +829,7 @@ class WorkflowService:
|
||||
proxy_location=request.proxy_location,
|
||||
webhook_callback_url=request.webhook_callback_url,
|
||||
totp_verification_url=request.totp_verification_url,
|
||||
persist_browser_session=request.persist_browser_session,
|
||||
workflow_permanent_id=workflow_permanent_id,
|
||||
version=existing_version + 1,
|
||||
is_saved_task=request.is_saved_task,
|
||||
@@ -839,6 +843,7 @@ class WorkflowService:
|
||||
proxy_location=request.proxy_location,
|
||||
webhook_callback_url=request.webhook_callback_url,
|
||||
totp_verification_url=request.totp_verification_url,
|
||||
persist_browser_session=request.persist_browser_session,
|
||||
is_saved_task=request.is_saved_task,
|
||||
)
|
||||
# Create parameters from the request
|
||||
|
||||
Reference in New Issue
Block a user