fail observer and workerflow runs if observer max iterations reached (#1507)
This commit is contained in:
@@ -190,6 +190,7 @@ async def run_observer_cruise(
|
|||||||
LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id)
|
LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
workflow, workflow_run = None, None
|
||||||
try:
|
try:
|
||||||
workflow, workflow_run = await run_observer_cruise_helper(
|
workflow, workflow_run = await run_observer_cruise_helper(
|
||||||
organization=organization,
|
organization=organization,
|
||||||
@@ -216,14 +217,13 @@ async def run_observer_cruise(
|
|||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
finally:
|
||||||
|
if workflow and workflow_run:
|
||||||
|
await app.WORKFLOW_SERVICE.clean_up_workflow(workflow=workflow, workflow_run=workflow_run)
|
||||||
|
else:
|
||||||
|
LOG.warning("Workflow or workflow run not found")
|
||||||
|
|
||||||
await app.DATABASE.update_observer_cruise(
|
skyvern_context.reset()
|
||||||
observer_cruise_id=observer_cruise_id,
|
|
||||||
organization_id=organization_id,
|
|
||||||
status=ObserverCruiseStatus.completed,
|
|
||||||
)
|
|
||||||
if workflow and workflow_run:
|
|
||||||
await app.WORKFLOW_SERVICE.clean_up_workflow(workflow=workflow, workflow_run=workflow_run)
|
|
||||||
|
|
||||||
|
|
||||||
async def run_observer_cruise_helper(
|
async def run_observer_cruise_helper(
|
||||||
@@ -311,7 +311,8 @@ async def run_observer_cruise_helper(
|
|||||||
yaml_blocks: list[BLOCK_YAML_TYPES] = []
|
yaml_blocks: list[BLOCK_YAML_TYPES] = []
|
||||||
yaml_parameters: list[PARAMETER_YAML_TYPES] = []
|
yaml_parameters: list[PARAMETER_YAML_TYPES] = []
|
||||||
|
|
||||||
for i in range(int_max_iterations_override or DEFAULT_MAX_ITERATIONS):
|
max_iterations = int_max_iterations_override or DEFAULT_MAX_ITERATIONS
|
||||||
|
for i in range(max_iterations):
|
||||||
LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url)
|
LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url)
|
||||||
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
|
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
|
||||||
workflow_run=workflow_run,
|
workflow_run=workflow_run,
|
||||||
@@ -537,8 +538,26 @@ async def run_observer_cruise_helper(
|
|||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
completion_resp=completion_resp,
|
completion_resp=completion_resp,
|
||||||
)
|
)
|
||||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id)
|
await mark_observer_cruise_as_completed(
|
||||||
|
observer_cruise_id=observer_cruise_id,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
organization_id=organization_id,
|
||||||
|
)
|
||||||
break
|
break
|
||||||
|
else:
|
||||||
|
LOG.info(
|
||||||
|
"Observer cruise failed - run out of iterations",
|
||||||
|
max_iterations=max_iterations,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
)
|
||||||
|
await mark_observer_cruise_as_failed(
|
||||||
|
observer_cruise_id=observer_cruise_id,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
# TODO: add a better failure reason with LLM
|
||||||
|
failure_reason="Observer max iterations reached",
|
||||||
|
organization_id=organization_id,
|
||||||
|
)
|
||||||
|
|
||||||
return workflow, workflow_run
|
return workflow, workflow_run
|
||||||
|
|
||||||
|
|
||||||
@@ -1028,6 +1047,20 @@ async def mark_observer_cruise_as_failed(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def mark_observer_cruise_as_completed(
|
||||||
|
observer_cruise_id: str,
|
||||||
|
workflow_run_id: str | None = None,
|
||||||
|
organization_id: str | None = None,
|
||||||
|
) -> None:
|
||||||
|
await app.DATABASE.update_observer_cruise(
|
||||||
|
observer_cruise_id,
|
||||||
|
organization_id=organization_id,
|
||||||
|
status=ObserverCruiseStatus.completed,
|
||||||
|
)
|
||||||
|
if workflow_run_id:
|
||||||
|
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
|
||||||
|
|
||||||
|
|
||||||
def _get_extracted_data_from_block_result(
|
def _get_extracted_data_from_block_result(
|
||||||
block_result: BlockResult,
|
block_result: BlockResult,
|
||||||
task_type: str,
|
task_type: str,
|
||||||
|
|||||||
Reference in New Issue
Block a user