From d3c1e744c98ee017e950c859436df7645dcfad90 Mon Sep 17 00:00:00 2001 From: LawyZheng Date: Thu, 4 Dec 2025 00:25:35 +0800 Subject: [PATCH] browser session sequential workflow run (#4181) --- skyvern/forge/sdk/db/client.py | 32 +++++++++++++++ skyvern/forge/sdk/db/utils.py | 1 + skyvern/forge/sdk/workflow/service.py | 57 +++++++++++++-------------- 3 files changed, 61 insertions(+), 29 deletions(-) diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 05e54c49..0578c86a 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -2518,6 +2518,7 @@ class AgentDB: try: async with self.Session() as session: query = select(WorkflowRunModel).filter_by(workflow_permanent_id=workflow_permanent_id) + query = query.filter(WorkflowRunModel.browser_session_id.is_(None)) if organization_id: query = query.filter_by(organization_id=organization_id) query = query.filter_by(status=WorkflowRunStatus.queued) @@ -2558,6 +2559,7 @@ class AgentDB: try: async with self.Session() as session: query = select(WorkflowRunModel).filter_by(workflow_permanent_id=workflow_permanent_id) + query = query.filter(WorkflowRunModel.browser_session_id.is_(None)) if organization_id: query = query.filter_by(organization_id=organization_id) query = query.filter_by(status=WorkflowRunStatus.running) @@ -2573,6 +2575,36 @@ class AgentDB: LOG.error("SQLAlchemyError", exc_info=True) raise + async def get_last_workflow_run_for_browser_session( + self, + browser_session_id: str, + organization_id: str | None = None, + ) -> WorkflowRun | None: + try: + async with self.Session() as session: + # check if there's a queued run + query = select(WorkflowRunModel).filter_by(browser_session_id=browser_session_id) + if organization_id: + query = query.filter_by(organization_id=organization_id) + + queue_query = query.filter_by(status=WorkflowRunStatus.queued) + queue_query = queue_query.order_by(WorkflowRunModel.modified_at.desc()) + workflow_run = (await session.scalars(queue_query)).first() + if workflow_run: + return convert_to_workflow_run(workflow_run) + + # check if there's a running run + running_query = query.filter_by(status=WorkflowRunStatus.running) + running_query = running_query.filter(WorkflowRunModel.started_at.isnot(None)) + running_query = running_query.order_by(WorkflowRunModel.started_at.desc()) + workflow_run = (await session.scalars(running_query)).first() + if workflow_run: + return convert_to_workflow_run(workflow_run) + return None + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + async def get_workflows_depending_on( self, workflow_run_id: str, diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index 24260dd3..baf9de2b 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -359,6 +359,7 @@ def convert_to_workflow_run( workflow_id=workflow_run_model.workflow_id, organization_id=workflow_run_model.organization_id, browser_session_id=workflow_run_model.browser_session_id, + debug_session_id=workflow_run_model.debug_session_id, browser_profile_id=workflow_run_model.browser_profile_id, status=WorkflowRunStatus[workflow_run_model.status], failure_reason=workflow_run_model.failure_reason, diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 27bd2ea8..d297f316 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -23,7 +23,6 @@ from skyvern.constants import GET_DOWNLOADED_FILES_TIMEOUT, SAVE_DOWNLOADED_FILE from skyvern.exceptions import ( BlockNotFound, BrowserProfileNotFound, - BrowserSessionAlreadyOccupiedError, BrowserSessionNotFound, CannotUpdateWorkflowDueToCodeCache, FailedToSendWebhook, @@ -415,26 +414,6 @@ class WorkflowService: if workflow_request.webhook_callback_url is None and workflow.webhook_callback_url is not None: workflow_request.webhook_callback_url = workflow.webhook_callback_url - if workflow_request.browser_session_id: - persistent_browser_session = await app.DATABASE.get_persistent_browser_session( - workflow_request.browser_session_id, organization.organization_id - ) - if persistent_browser_session is None: - LOG.warning( - "Failed to create workflow run, browser sesssion not found", - browser_session_id=workflow_request.browser_session_id, - ) - raise BrowserSessionNotFound(workflow_request.browser_session_id) - - if persistent_browser_session.runnable_id: - LOG.warning( - "Failed to create workflow run, browser session is already occupied", - browser_session_id=workflow_request.browser_session_id, - ) - raise BrowserSessionAlreadyOccupiedError( - workflow_request.browser_session_id, persistent_browser_session.runnable_id - ) - # Create the workflow run and set skyvern context workflow_run = await self.create_workflow_run( workflow_request=workflow_request, @@ -537,14 +516,6 @@ class WorkflowService: ) raise e - if workflow_request.browser_session_id: - await app.PERSISTENT_SESSIONS_MANAGER.begin_session( - browser_session_id=workflow_request.browser_session_id, - runnable_type="workflow_run", - runnable_id=workflow_run.workflow_run_id, - organization_id=organization.organization_id, - ) - return workflow_run @staticmethod @@ -693,6 +664,34 @@ class WorkflowService: browser_session_id=browser_session_id, ) + if browser_session_id: + try: + await app.PERSISTENT_SESSIONS_MANAGER.begin_session( + browser_session_id=browser_session_id, + runnable_type="workflow_run", + runnable_id=workflow_run_id, + organization_id=organization.organization_id, + ) + except Exception as e: + LOG.exception( + "Failed to begin browser session for workflow run", + browser_session_id=browser_session_id, + workflow_run_id=workflow_run_id, + ) + failure_reason = f"Failed to begin browser session for workflow run: {str(e)}" + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, + failure_reason=failure_reason, + ) + await self.clean_up_workflow( + workflow=workflow, + workflow_run=workflow_run, + api_key=api_key, + browser_session_id=browser_session_id, + close_browser_on_completion=close_browser_on_completion, + ) + return workflow_run + # Check if there's a related workflow script that should be used instead workflow_script, _ = await workflow_script_service.get_workflow_script(workflow, workflow_run, block_labels) current_context = skyvern_context.current()