From 56b4d828c15c57e8a995be0a45b925067e411893 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Sat, 27 Sep 2025 11:18:17 -0700 Subject: [PATCH] fix workflow_run.run_with override (#3543) --- .../core/script_generations/skyvern_page.py | 2 +- skyvern/forge/sdk/db/client.py | 2 + skyvern/forge/sdk/workflow/models/workflow.py | 1 + skyvern/forge/sdk/workflow/service.py | 49 ++++++++++++------- 4 files changed, 36 insertions(+), 18 deletions(-) diff --git a/skyvern/core/script_generations/skyvern_page.py b/skyvern/core/script_generations/skyvern_page.py index eb2cc7c6..7c2f2525 100644 --- a/skyvern/core/script_generations/skyvern_page.py +++ b/skyvern/core/script_generations/skyvern_page.py @@ -367,7 +367,7 @@ class SkyvernPage: """ new_xpath = xpath - if intention and data: + if intention: try: # Build the element tree of the current page for the prompt context = skyvern_context.ensure_context() diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index e2fa1e72..e08aaab5 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1697,6 +1697,7 @@ class AgentDB: extra_http_headers: dict[str, str] | None = None, browser_address: str | None = None, sequential_key: str | None = None, + run_with: str | None = None, ) -> WorkflowRun: try: async with self.Session() as session: @@ -1715,6 +1716,7 @@ class AgentDB: extra_http_headers=extra_http_headers, browser_address=browser_address, sequential_key=sequential_key, + run_with=run_with, ) session.add(workflow_run) await session.commit() diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index edb5608f..762fa074 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -26,6 +26,7 @@ class WorkflowRequestBody(BaseModel): max_screenshot_scrolls: int | None = None extra_http_headers: dict[str, str] | None = None browser_address: str | None = None + run_with: str | None = None @field_validator("webhook_callback_url", "totp_verification_url") @classmethod diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index c2a78e50..fb185fde 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -388,7 +388,7 @@ class WorkflowService: WorkflowRunStatus.timed_out, ): workflow_run = await self.mark_workflow_run_as_completed( - workflow_run_id=workflow_run_id, is_script_run=is_script_run + workflow_run_id=workflow_run_id, ) await self.generate_script_if_needed( workflow=workflow, @@ -424,6 +424,7 @@ class WorkflowService: workflow_run_id = workflow_run.workflow_run_id top_level_blocks = workflow.workflow_definition.blocks all_blocks = get_all_blocks(top_level_blocks) + await self.mark_workflow_run_as_running(workflow_run_id=workflow_run_id, run_with="agent") if block_labels and len(block_labels): blocks: list[BlockTypeVar] = [] @@ -970,6 +971,7 @@ class WorkflowService: extra_http_headers=workflow_request.extra_http_headers, browser_address=workflow_request.browser_address, sequential_key=sequential_key, + run_with=workflow_request.run_with, ) async def _update_workflow_run_status( @@ -977,9 +979,8 @@ class WorkflowService: workflow_run_id: str, status: WorkflowRunStatus, failure_reason: str | None = None, - is_script_run: bool = False, + run_with: str | None = None, ) -> WorkflowRun: - run_with = "code" if is_script_run else "agent" workflow_run = await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=status, @@ -1006,7 +1007,7 @@ class WorkflowService: ) return workflow_run - async def mark_workflow_run_as_completed(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun: + async def mark_workflow_run_as_completed(self, workflow_run_id: str, run_with: str | None = None) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as completed", workflow_run_id=workflow_run_id, @@ -1015,11 +1016,14 @@ class WorkflowService: return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.completed, - is_script_run=is_script_run, + run_with=run_with, ) async def mark_workflow_run_as_failed( - self, workflow_run_id: str, failure_reason: str | None, is_script_run: bool = False + self, + workflow_run_id: str, + failure_reason: str | None, + run_with: str | None = None, ) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as failed", @@ -1031,23 +1035,27 @@ class WorkflowService: workflow_run_id=workflow_run_id, status=WorkflowRunStatus.failed, failure_reason=failure_reason, - is_script_run=is_script_run, + run_with=run_with, ) - async def mark_workflow_run_as_running(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun: + async def mark_workflow_run_as_running(self, workflow_run_id: str, run_with: str | None = None) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as running", workflow_run_id=workflow_run_id, workflow_status="running", + run_with=run_with, ) return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.running, - is_script_run=is_script_run, + run_with=run_with, ) async def mark_workflow_run_as_terminated( - self, workflow_run_id: str, failure_reason: str | None, is_script_run: bool = False + self, + workflow_run_id: str, + failure_reason: str | None, + run_with: str | None = None, ) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as terminated", @@ -1059,10 +1067,10 @@ class WorkflowService: workflow_run_id=workflow_run_id, status=WorkflowRunStatus.terminated, failure_reason=failure_reason, - is_script_run=is_script_run, + run_with=run_with, ) - async def mark_workflow_run_as_canceled(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun: + async def mark_workflow_run_as_canceled(self, workflow_run_id: str, run_with: str | None = None) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as canceled", workflow_run_id=workflow_run_id, @@ -1071,11 +1079,14 @@ class WorkflowService: return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.canceled, - is_script_run=is_script_run, + run_with=run_with, ) async def mark_workflow_run_as_timed_out( - self, workflow_run_id: str, failure_reason: str | None = None, is_script_run: bool = False + self, + workflow_run_id: str, + failure_reason: str | None = None, + run_with: str | None = None, ) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as timed out", @@ -1086,7 +1097,7 @@ class WorkflowService: workflow_run_id=workflow_run_id, status=WorkflowRunStatus.timed_out, failure_reason=failure_reason, - is_script_run=is_script_run, + run_with=run_with, ) async def get_workflow_run(self, workflow_run_id: str, organization_id: str | None = None) -> WorkflowRun: @@ -2494,6 +2505,7 @@ class WorkflowService: Execute the related workflow script instead of running the workflow blocks. """ LOG.info("Start to execute workflow script", workflow_run_id=workflow_run.workflow_run_id) + await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id, run_with="code") try: # Render the cache_key_value to find the right script @@ -2514,7 +2526,7 @@ class WorkflowService: # Mark workflow run as completed workflow_run = await self.mark_workflow_run_as_completed( - workflow_run_id=workflow_run.workflow_run_id, is_script_run=True + workflow_run_id=workflow_run.workflow_run_id, ) LOG.info( @@ -2537,7 +2549,8 @@ class WorkflowService: # Mark workflow run as failed failure_reason = f"Failed to execute workflow script: {str(e)}" workflow_run = await self.mark_workflow_run_as_failed( - workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason, is_script_run=True + workflow_run_id=workflow_run.workflow_run_id, + failure_reason=failure_reason, ) return workflow_run @@ -2587,6 +2600,8 @@ class WorkflowService: ) -> bool: if workflow_run.run_with == "code": return True + if workflow_run.run_with == "agent": + return False if workflow.generate_script: return True return False