From face695b5fcdba462246aeb45e9791c515739236 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Tue, 31 Dec 2024 15:03:36 -0800 Subject: [PATCH] fail observer cruise and mark workflow_run failed with failure reason when db error happens (#1459) --- .../forge/sdk/services/observer_service.py | 93 ++++++++++++++++--- 1 file changed, 79 insertions(+), 14 deletions(-) diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py index d74e630c..b506e8eb 100644 --- a/skyvern/forge/sdk/services/observer_service.py +++ b/skyvern/forge/sdk/services/observer_service.py @@ -6,6 +6,7 @@ from typing import Any import structlog from pydantic import BaseModel +from sqlalchemy.exc import OperationalError from skyvern.exceptions import UrlGenerationFailure from skyvern.forge import app @@ -170,10 +171,65 @@ async def run_observer_cruise( max_iterations_override: str | int | None = None, ) -> None: organization_id = organization.organization_id - observer_cruise = await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id) + try: + observer_cruise = await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id) + except Exception: + LOG.error( + "Failed to get observer cruise", + observer_cruise_id=observer_cruise_id, + organization_id=organization_id, + exc_info=True, + ) + await mark_observer_cruise_as_failed(observer_cruise_id, organization_id=organization_id) + return None if not observer_cruise: LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id) return None + + try: + workflow, workflow_run = await run_observer_cruise_helper( + organization=organization, + observer_cruise=observer_cruise, + request_id=request_id, + max_iterations_override=max_iterations_override, + ) + except OperationalError: + LOG.error("Database error when running observer cruise", exc_info=True) + await mark_observer_cruise_as_failed( + observer_cruise_id, + workflow_run_id=observer_cruise.workflow_run_id, + failure_reason="Database error when running cruise", + organization_id=organization_id, + ) + return + except Exception: + LOG.error("Failed to run observer cruise", exc_info=True) + await mark_observer_cruise_as_failed( + observer_cruise_id, + workflow_run_id=observer_cruise.workflow_run_id, + # TODO: add better failure reason + failure_reason="Failed to run observer cruise", + organization_id=organization_id, + ) + return + + await app.DATABASE.update_observer_cruise( + observer_cruise_id=observer_cruise_id, + organization_id=organization_id, + status=ObserverCruiseStatus.completed, + ) + if workflow and workflow_run: + await app.WORKFLOW_SERVICE.clean_up_workflow(workflow=workflow, workflow_run=workflow_run) + + +async def run_observer_cruise_helper( + organization: Organization, + observer_cruise: ObserverCruise, + request_id: str | None = None, + max_iterations_override: str | int | None = None, +) -> tuple[Workflow, WorkflowRun] | tuple[None, None]: + organization_id = organization.organization_id + observer_cruise_id = observer_cruise.observer_cruise_id if observer_cruise.status != ObserverCruiseStatus.queued: LOG.error( "Observer cruise is not queued. Duplicate observer cruise", @@ -181,21 +237,21 @@ async def run_observer_cruise( status=observer_cruise.status, organization_id=organization_id, ) - return None + return None, None if not observer_cruise.url or not observer_cruise.prompt: LOG.error( "Observer cruise url or prompt not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id, ) - return None + return None, None if not observer_cruise.workflow_run_id: LOG.error( "Workflow run id not found in observer cruise", observer_cruise_id=observer_cruise_id, organization_id=organization_id, ) - return None + return None, None int_max_iterations_override = None if max_iterations_override: @@ -213,19 +269,19 @@ async def run_observer_cruise( workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id, organization_id=organization_id) if not workflow_run: LOG.error("Workflow run not found", workflow_run_id=workflow_run_id) - return None + return None, None else: LOG.info("Workflow run found", workflow_run_id=workflow_run_id) if workflow_run.status != WorkflowRunStatus.queued: LOG.warning("Duplicate workflow run execution", workflow_run_id=workflow_run_id, status=workflow_run.status) - return None + return None, None workflow_id = workflow_run.workflow_id workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id, organization_id=organization_id) if not workflow: LOG.error("Workflow not found", workflow_id=workflow_id) - return None + return None, None else: LOG.info("Workflow found", workflow_id=workflow_id) @@ -472,13 +528,7 @@ async def run_observer_cruise( ) await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id) break - - await app.DATABASE.update_observer_cruise( - observer_cruise_id=observer_cruise_id, - organization_id=organization_id, - status=ObserverCruiseStatus.completed, - ) - await app.WORKFLOW_SERVICE.clean_up_workflow(workflow=workflow, workflow_run=workflow_run) + return workflow, workflow_run async def handle_block_result( @@ -946,3 +996,18 @@ async def _record_thought_screenshot(observer_thought: ObserverThought, workflow async def get_observer_cruise(observer_cruise_id: str, organization_id: str | None = None) -> ObserverCruise | None: return await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id) + + +async def mark_observer_cruise_as_failed( + observer_cruise_id: str, + workflow_run_id: str | None = None, + failure_reason: str | None = None, + organization_id: str | None = None, +) -> None: + await app.DATABASE.update_observer_cruise( + observer_cruise_id, organization_id=organization_id, status=ObserverCruiseStatus.failed + ) + if workflow_run_id: + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id, failure_reason=failure_reason or "Observer cruise failed" + )