From 897579c057edf5e212219259b29697e974bb5d8d Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Mon, 6 Jan 2025 16:42:42 -0800 Subject: [PATCH] fail observer and workerflow runs if observer max iterations reached (#1507) --- .../forge/sdk/services/observer_service.py | 51 +++++++++++++++---- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py index 3718a629..ef506c66 100644 --- a/skyvern/forge/sdk/services/observer_service.py +++ b/skyvern/forge/sdk/services/observer_service.py @@ -190,6 +190,7 @@ async def run_observer_cruise( LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id) return None + workflow, workflow_run = None, None try: workflow, workflow_run = await run_observer_cruise_helper( organization=organization, @@ -216,14 +217,13 @@ async def run_observer_cruise( organization_id=organization_id, ) return + finally: + if workflow and workflow_run: + await app.WORKFLOW_SERVICE.clean_up_workflow(workflow=workflow, workflow_run=workflow_run) + else: + LOG.warning("Workflow or workflow run not found") - await app.DATABASE.update_observer_cruise( - observer_cruise_id=observer_cruise_id, - organization_id=organization_id, - status=ObserverCruiseStatus.completed, - ) - if workflow and workflow_run: - await app.WORKFLOW_SERVICE.clean_up_workflow(workflow=workflow, workflow_run=workflow_run) + skyvern_context.reset() async def run_observer_cruise_helper( @@ -311,7 +311,8 @@ async def run_observer_cruise_helper( yaml_blocks: list[BLOCK_YAML_TYPES] = [] yaml_parameters: list[PARAMETER_YAML_TYPES] = [] - for i in range(int_max_iterations_override or DEFAULT_MAX_ITERATIONS): + max_iterations = int_max_iterations_override or DEFAULT_MAX_ITERATIONS + for i in range(max_iterations): LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url) browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( workflow_run=workflow_run, @@ -537,8 +538,26 @@ async def run_observer_cruise_helper( workflow_run_id=workflow_run_id, completion_resp=completion_resp, ) - await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id) + await mark_observer_cruise_as_completed( + observer_cruise_id=observer_cruise_id, + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) break + else: + LOG.info( + "Observer cruise failed - run out of iterations", + max_iterations=max_iterations, + workflow_run_id=workflow_run_id, + ) + await mark_observer_cruise_as_failed( + observer_cruise_id=observer_cruise_id, + workflow_run_id=workflow_run_id, + # TODO: add a better failure reason with LLM + failure_reason="Observer max iterations reached", + organization_id=organization_id, + ) + return workflow, workflow_run @@ -1028,6 +1047,20 @@ async def mark_observer_cruise_as_failed( ) +async def mark_observer_cruise_as_completed( + observer_cruise_id: str, + workflow_run_id: str | None = None, + organization_id: str | None = None, +) -> None: + await app.DATABASE.update_observer_cruise( + observer_cruise_id, + organization_id=organization_id, + status=ObserverCruiseStatus.completed, + ) + if workflow_run_id: + await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id) + + def _get_extracted_data_from_block_result( block_result: BlockResult, task_type: str,