let observer continue exploring when task failed/terminated (#1520)

This commit is contained in:
Shuchang Zheng
2025-01-08 20:06:14 -08:00
committed by GitHub
parent d3b159da49
commit 52c188b5de
3 changed files with 18 additions and 26 deletions

View File

@@ -457,6 +457,9 @@ async def run_observer_cruise_helper(
# generate the extraction task
block_result = await block.execute_safe(workflow_run_id=workflow_run_id, organization_id=organization_id)
task_history_record["status"] = str(block_result.status)
if block_result.failure_reason:
task_history_record["reason"] = block_result.failure_reason
extracted_data = _get_extracted_data_from_block_result(
block_result,
@@ -499,8 +502,8 @@ async def run_observer_cruise_helper(
status=workflow_run.status,
)
break
if block_result.success is True:
# validate completion
if block_result.success is True and i == max_iterations - 1:
# validate completion only happens at the last iteration
observer_completion_prompt = prompt_engine.load_prompt(
"observer_check_completion",
user_goal=user_prompt,
@@ -610,20 +613,10 @@ async def handle_block_result(
block_type_var=block.block_type,
block_label=block.label,
)
else:
failure_reason = f"Block with type {block.block_type} failed. failure reason: {block_result.failure_reason}"
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
)
# TODO: add api_key
await app.WORKFLOW_SERVICE.clean_up_workflow(
workflow=workflow,
workflow_run=workflow_run,
)
# observer will continue running the workflow
elif block_result.status == BlockStatus.terminated:
LOG.info(
f"Block with type {block.block_type} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated",
f"Block with type {block.block_type} was terminated for workflow run {workflow_run_id}",
block_type=block.block_type,
workflow_run_id=workflow_run.workflow_run_id,
block_result=block_result,
@@ -640,15 +633,6 @@ async def handle_block_result(
block_type_var=block.block_type,
block_label=block.label,
)
else:
failure_reason = f"Block with type {block.block_type} terminated. Reason: {block_result.failure_reason}"
await app.WORKFLOW_SERVICE.mark_workflow_run_as_terminated(
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
)
await app.WORKFLOW_SERVICE.clean_up_workflow(
workflow=workflow,
workflow_run=workflow_run,
)
# refresh workflow run model
return await app.WORKFLOW_SERVICE.get_workflow_run(
workflow_run_id=workflow_run_id,