fail observer cruise and mark workflow_run failed with failure reason when db error happens (#1459)

This commit is contained in:
Shuchang Zheng
2024-12-31 15:03:36 -08:00
committed by GitHub
parent 7b058c224f
commit face695b5f

View File

@@ -6,6 +6,7 @@ from typing import Any
import structlog
from pydantic import BaseModel
from sqlalchemy.exc import OperationalError
from skyvern.exceptions import UrlGenerationFailure
from skyvern.forge import app
@@ -170,10 +171,65 @@ async def run_observer_cruise(
max_iterations_override: str | int | None = None,
) -> None:
organization_id = organization.organization_id
observer_cruise = await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id)
try:
observer_cruise = await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id)
except Exception:
LOG.error(
"Failed to get observer cruise",
observer_cruise_id=observer_cruise_id,
organization_id=organization_id,
exc_info=True,
)
await mark_observer_cruise_as_failed(observer_cruise_id, organization_id=organization_id)
return None
if not observer_cruise:
LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id)
return None
try:
workflow, workflow_run = await run_observer_cruise_helper(
organization=organization,
observer_cruise=observer_cruise,
request_id=request_id,
max_iterations_override=max_iterations_override,
)
except OperationalError:
LOG.error("Database error when running observer cruise", exc_info=True)
await mark_observer_cruise_as_failed(
observer_cruise_id,
workflow_run_id=observer_cruise.workflow_run_id,
failure_reason="Database error when running cruise",
organization_id=organization_id,
)
return
except Exception:
LOG.error("Failed to run observer cruise", exc_info=True)
await mark_observer_cruise_as_failed(
observer_cruise_id,
workflow_run_id=observer_cruise.workflow_run_id,
# TODO: add better failure reason
failure_reason="Failed to run observer cruise",
organization_id=organization_id,
)
return
await app.DATABASE.update_observer_cruise(
observer_cruise_id=observer_cruise_id,
organization_id=organization_id,
status=ObserverCruiseStatus.completed,
)
if workflow and workflow_run:
await app.WORKFLOW_SERVICE.clean_up_workflow(workflow=workflow, workflow_run=workflow_run)
async def run_observer_cruise_helper(
organization: Organization,
observer_cruise: ObserverCruise,
request_id: str | None = None,
max_iterations_override: str | int | None = None,
) -> tuple[Workflow, WorkflowRun] | tuple[None, None]:
organization_id = organization.organization_id
observer_cruise_id = observer_cruise.observer_cruise_id
if observer_cruise.status != ObserverCruiseStatus.queued:
LOG.error(
"Observer cruise is not queued. Duplicate observer cruise",
@@ -181,21 +237,21 @@ async def run_observer_cruise(
status=observer_cruise.status,
organization_id=organization_id,
)
return None
return None, None
if not observer_cruise.url or not observer_cruise.prompt:
LOG.error(
"Observer cruise url or prompt not found",
observer_cruise_id=observer_cruise_id,
organization_id=organization_id,
)
return None
return None, None
if not observer_cruise.workflow_run_id:
LOG.error(
"Workflow run id not found in observer cruise",
observer_cruise_id=observer_cruise_id,
organization_id=organization_id,
)
return None
return None, None
int_max_iterations_override = None
if max_iterations_override:
@@ -213,19 +269,19 @@ async def run_observer_cruise(
workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id, organization_id=organization_id)
if not workflow_run:
LOG.error("Workflow run not found", workflow_run_id=workflow_run_id)
return None
return None, None
else:
LOG.info("Workflow run found", workflow_run_id=workflow_run_id)
if workflow_run.status != WorkflowRunStatus.queued:
LOG.warning("Duplicate workflow run execution", workflow_run_id=workflow_run_id, status=workflow_run.status)
return None
return None, None
workflow_id = workflow_run.workflow_id
workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id, organization_id=organization_id)
if not workflow:
LOG.error("Workflow not found", workflow_id=workflow_id)
return None
return None, None
else:
LOG.info("Workflow found", workflow_id=workflow_id)
@@ -472,13 +528,7 @@ async def run_observer_cruise(
)
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id)
break
await app.DATABASE.update_observer_cruise(
observer_cruise_id=observer_cruise_id,
organization_id=organization_id,
status=ObserverCruiseStatus.completed,
)
await app.WORKFLOW_SERVICE.clean_up_workflow(workflow=workflow, workflow_run=workflow_run)
return workflow, workflow_run
async def handle_block_result(
@@ -946,3 +996,18 @@ async def _record_thought_screenshot(observer_thought: ObserverThought, workflow
async def get_observer_cruise(observer_cruise_id: str, organization_id: str | None = None) -> ObserverCruise | None:
return await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id)
async def mark_observer_cruise_as_failed(
observer_cruise_id: str,
workflow_run_id: str | None = None,
failure_reason: str | None = None,
organization_id: str | None = None,
) -> None:
await app.DATABASE.update_observer_cruise(
observer_cruise_id, organization_id=organization_id, status=ObserverCruiseStatus.failed
)
if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id, failure_reason=failure_reason or "Observer cruise failed"
)