From 54f27a860106a1bb8c75741640bb8457883ae0e3 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Mon, 18 Aug 2025 13:25:54 -0700 Subject: [PATCH] run block using run_workflow interface (#3224) --- skyvern/forge/sdk/executor/async_executor.py | 18 ++++++++++------ skyvern/forge/sdk/routes/agent_protocol.py | 19 ++++++++++------- skyvern/forge/sdk/workflow/service.py | 1 + skyvern/services/block_service.py | 22 +++++++++++++------- skyvern/services/workflow_service.py | 2 ++ 5 files changed, 40 insertions(+), 22 deletions(-) diff --git a/skyvern/forge/sdk/executor/async_executor.py b/skyvern/forge/sdk/executor/async_executor.py index 4eb27555..86a3a52f 100644 --- a/skyvern/forge/sdk/executor/async_executor.py +++ b/skyvern/forge/sdk/executor/async_executor.py @@ -38,13 +38,14 @@ class AsyncExecutor(abc.ABC): async def execute_workflow( self, request: Request | None, - background_tasks: BackgroundTasks, + background_tasks: BackgroundTasks | None, organization: Organization, workflow_id: str, workflow_run_id: str, max_steps_override: int | None, api_key: str | None, browser_session_id: str | None, + block_labels: list[str] | None, **kwargs: dict, ) -> None: pass @@ -147,24 +148,29 @@ class BackgroundTaskExecutor(AsyncExecutor): max_steps_override: int | None, api_key: str | None, browser_session_id: str | None, + block_labels: list[str] | None, **kwargs: dict, ) -> None: - LOG.info( - "Executing workflow using background task executor", - workflow_run_id=workflow_run_id, - ) - if background_tasks: + LOG.info( + "Executing workflow using background task executor", + workflow_run_id=workflow_run_id, + ) + await initialize_skyvern_state_file( workflow_run_id=workflow_run_id, organization_id=organization.organization_id ) + background_tasks.add_task( app.WORKFLOW_SERVICE.execute_workflow, workflow_run_id=workflow_run_id, api_key=api_key, organization=organization, browser_session_id=browser_session_id, + block_labels=block_labels, ) + else: + LOG.warning("Background tasks not enabled, skipping workflow execution") async def execute_task_v2( self, diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index bd04a886..061b3d82 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -850,6 +850,8 @@ async def retry_run_webhook( response_model=BlockRunResponse, ) async def run_block( + request: Request, + background_tasks: BackgroundTasks, block_run_request: BlockRunRequest, organization: Organization = Depends(org_auth_service.get_current_org), template: bool = Query(False), @@ -869,14 +871,15 @@ async def run_block( browser_session_id = block_run_request.browser_session_id - asyncio.create_task( - block_service.execute_blocks( - api_key=x_api_key or "", - block_labels=block_run_request.block_labels, - workflow_run_id=workflow_run.workflow_run_id, - organization=organization, - browser_session_id=browser_session_id, - ) + await block_service.execute_blocks( + request=request, + background_tasks=background_tasks, + api_key=x_api_key or "", + block_labels=block_run_request.block_labels, + workflow_id=block_run_request.workflow_id, + workflow_run_id=workflow_run.workflow_run_id, + organization=organization, + browser_session_id=browser_session_id, ) return BlockRunResponse( diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 4bf76aaa..594fe2c4 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -272,6 +272,7 @@ class WorkflowService: workflow_run_id=workflow_run_id, organization_id=organization_id, browser_session_id=browser_session_id, + block_labels=block_labels, ) workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id, organization_id=organization_id) workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id=workflow_run.workflow_permanent_id) diff --git a/skyvern/services/block_service.py b/skyvern/services/block_service.py index 5f24361a..dcf05c1d 100644 --- a/skyvern/services/block_service.py +++ b/skyvern/services/block_service.py @@ -1,7 +1,8 @@ import structlog +from fastapi import BackgroundTasks, Request -from skyvern.forge import app from skyvern.forge.sdk.core import skyvern_context +from skyvern.forge.sdk.executor.factory import AsyncExecutorFactory from skyvern.forge.sdk.schemas.organizations import Organization from skyvern.forge.sdk.workflow.models.workflow import WorkflowRequestBody, WorkflowRun from skyvern.schemas.runs import WorkflowRunRequest @@ -44,12 +45,15 @@ async def ensure_workflow_run( async def execute_blocks( + request: Request, + background_tasks: BackgroundTasks, api_key: str, block_labels: list[str], + workflow_id: str, workflow_run_id: str, organization: Organization, browser_session_id: str | None = None, -) -> WorkflowRun: +) -> None: """ Runs one or more blocks of a workflow. """ @@ -61,12 +65,14 @@ async def execute_blocks( block_labels=block_labels, ) - workflow_run = await app.WORKFLOW_SERVICE.execute_workflow( - workflow_run_id=workflow_run_id, - api_key=api_key, + await AsyncExecutorFactory.get_executor().execute_workflow( + request=request, + background_tasks=background_tasks, organization=organization, - block_labels=block_labels, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + max_steps_override=None, browser_session_id=browser_session_id, + api_key=api_key, + block_labels=block_labels, ) - - return workflow_run diff --git a/skyvern/services/workflow_service.py b/skyvern/services/workflow_service.py index e403672d..c79d0d31 100644 --- a/skyvern/services/workflow_service.py +++ b/skyvern/services/workflow_service.py @@ -68,6 +68,7 @@ async def run_workflow( request_id: str | None = None, request: Request | None = None, background_tasks: BackgroundTasks | None = None, + block_labels: list[str] | None = None, ) -> WorkflowRun: workflow_run = await prepare_workflow( workflow_id=workflow_id, @@ -88,6 +89,7 @@ async def run_workflow( max_steps_override=max_steps, browser_session_id=workflow_request.browser_session_id, api_key=api_key, + block_labels=block_labels, ) return workflow_run