diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 3cf43505..e0c87110 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -491,7 +491,6 @@ async def get_workflow_run( return await app.WORKFLOW_SERVICE.build_workflow_run_status_response( workflow_id=workflow_id, workflow_run_id=workflow_run_id, - last_block_result=None, organization_id=current_org.organization_id, ) diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index 3aa9dd77..25c0b8ec 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -6,7 +6,7 @@ from pydantic import BaseModel from skyvern.forge.sdk.schemas.tasks import ProxyLocation from skyvern.forge.sdk.workflow.exceptions import WorkflowDefinitionHasDuplicateBlockLabels -from skyvern.forge.sdk.workflow.models.block import BlockResult, BlockTypeVar +from skyvern.forge.sdk.workflow.models.block import BlockTypeVar from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE @@ -98,5 +98,4 @@ class WorkflowRunStatusResponse(BaseModel): parameters: dict[str, Any] screenshot_urls: list[str] | None = None recording_url: str | None = None - payload: dict[str, Any] | None = None - output: BlockResult | None = None + outputs: dict[str, Any] | None = None diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 3c078b4d..c7b96889 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -27,7 +27,6 @@ from skyvern.forge.sdk.workflow.exceptions import ( WorkflowDefinitionHasReservedParameterKeys, ) from skyvern.forge.sdk.workflow.models.block import ( - BlockResult, BlockType, BlockTypeVar, CodeBlock, @@ -204,27 +203,20 @@ class WorkflowService: ) else: await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) - break + await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key) + return workflow_run - except Exception as e: + except Exception: LOG.exception( f"Error while executing workflow run {workflow_run.workflow_run_id}", workflow_run_id=workflow_run.workflow_run_id, ) await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) - raise e + await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key) + return workflow_run - tasks = await self.get_tasks_by_workflow_run_id(workflow_run.workflow_run_id) await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id) - - await self.send_workflow_response( - workflow=workflow, - workflow_run=workflow_run, - tasks=tasks, - # TODO: We don't persist the block result for now, but we should in the case the users want to get it later - last_block_result=block_result, - api_key=api_key, - ) + await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key) return workflow_run async def handle_workflow_status(self, workflow_run: WorkflowRun, tasks: list[Task]) -> WorkflowRun: @@ -363,7 +355,9 @@ class WorkflowService: async def mark_workflow_run_as_completed(self, workflow_run_id: str) -> None: LOG.info( - f"Marking workflow run {workflow_run_id} as completed", workflow_run_id=workflow_run_id, status="completed" + f"Marking workflow run {workflow_run_id} as completed", + workflow_run_id=workflow_run_id, + workflow_status="completed", ) await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, @@ -371,7 +365,11 @@ class WorkflowService: ) async def mark_workflow_run_as_failed(self, workflow_run_id: str) -> None: - LOG.info(f"Marking workflow run {workflow_run_id} as failed", workflow_run_id=workflow_run_id, status="failed") + LOG.info( + f"Marking workflow run {workflow_run_id} as failed", + workflow_run_id=workflow_run_id, + workflow_status="failed", + ) await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.failed, @@ -379,7 +377,9 @@ class WorkflowService: async def mark_workflow_run_as_running(self, workflow_run_id: str) -> None: LOG.info( - f"Marking workflow run {workflow_run_id} as running", workflow_run_id=workflow_run_id, status="running" + f"Marking workflow run {workflow_run_id} as running", + workflow_run_id=workflow_run_id, + workflow_status="running", ) await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, @@ -390,7 +390,7 @@ class WorkflowService: LOG.info( f"Marking workflow run {workflow_run_id} as terminated", workflow_run_id=workflow_run_id, - status="terminated", + workflow_status="terminated", ) await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, @@ -511,7 +511,6 @@ class WorkflowService: self, workflow_id: str, workflow_run_id: str, - last_block_result: BlockResult | None, organization_id: str, ) -> WorkflowRunStatusResponse: workflow = await self.get_workflow(workflow_id=workflow_id, organization_id=organization_id) @@ -555,21 +554,14 @@ class WorkflowService: workflow_id=workflow_id, workflow_run_id=workflow_run_id ) if output_parameter_tuples: - payload = { - output_parameter.key: wfrp.value - for output_parameter, wfrp in output_parameter_tuples - if wfrp.value is not None - } + outputs = {output_parameter.key: output.value for output_parameter, output in output_parameter_tuples} else: - payload = { - task.task_id: { - "title": task.title, - "extracted_information": task.extracted_information, - "navigation_payload": task.navigation_payload, - "errors": await app.agent.get_task_errors(task=task), - } - for task in workflow_run_tasks - } + LOG.error( + "No output parameters found for workflow run", + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + ) + outputs = None return WorkflowRunStatusResponse( workflow_id=workflow_id, workflow_run_id=workflow_run_id, @@ -581,20 +573,18 @@ class WorkflowService: parameters=parameters_with_value, screenshot_urls=screenshot_urls, recording_url=recording_url, - payload=payload, - output=last_block_result, + outputs=outputs, ) async def send_workflow_response( self, workflow: Workflow, workflow_run: WorkflowRun, - tasks: list[Task], - last_block_result: BlockResult | None, api_key: str | None = None, close_browser_on_completion: bool = True, ) -> None: analytics.capture("skyvern-oss-agent-workflow-status", {"status": workflow_run.status}) + tasks = await self.get_tasks_by_workflow_run_id(workflow_run.workflow_run_id) all_workflow_task_ids = [task.task_id for task in tasks] browser_state = await app.BROWSER_MANAGER.cleanup_for_workflow_run( workflow_run.workflow_run_id, all_workflow_task_ids, close_browser_on_completion @@ -608,7 +598,6 @@ class WorkflowService: workflow_run_status_response = await self.build_workflow_run_status_response( workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id, - last_block_result=last_block_result, organization_id=workflow.organization_id, ) LOG.info("Built workflow run status response", workflow_run_status_response=workflow_run_status_response) @@ -629,7 +618,7 @@ class WorkflowService: ) return - # send task_response to the webhook callback url + # send webhook to the webhook callback url # TODO: use async requests (httpx) timestamp = str(int(datetime.utcnow().timestamp())) payload = workflow_run_status_response.model_dump_json()