add extracted info to observer task history (#1488)

This commit is contained in:
Shuchang Zheng
2025-01-05 09:06:20 -08:00
committed by GitHub
parent 240b75ca4c
commit 466f20918b

View File

@@ -394,6 +394,7 @@ async def run_observer_cruise_helper(
break
block: BlockTypeVar | None = None
task_history_record: dict[str, Any] = {}
if task_type == "extract":
block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task(
observer_cruise=observer_cruise,
@@ -405,7 +406,7 @@ async def run_observer_cruise_helper(
data_extraction_goal=plan,
task_history=task_history,
)
task_history.append({"type": task_type, "task": plan})
task_history_record = {"type": task_type, "task": plan}
elif task_type == "navigate":
original_url = url if i == 0 else None
block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task(
@@ -415,7 +416,7 @@ async def run_observer_cruise_helper(
original_url=original_url,
navigation_goal=plan,
)
task_history.append({"type": task_type, "task": plan})
task_history_record = {"type": task_type, "task": plan}
elif task_type == "loop":
try:
block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task(
@@ -428,14 +429,12 @@ async def run_observer_cruise_helper(
original_url=url,
scraped_page=scraped_page,
)
task_history.append(
{
"type": task_type,
"task": plan,
"loop_over_values": extraction_obj.loop_values,
"task_inside_the_loop": inner_task,
}
)
task_history_record = {
"type": task_type,
"task": plan,
"loop_over_values": extraction_obj.loop_values,
"task_inside_the_loop": inner_task,
}
except Exception:
LOG.exception("Failed to generate loop task")
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
@@ -453,6 +452,15 @@ async def run_observer_cruise_helper(
# generate the extraction task
block_result = await block.execute_safe(workflow_run_id=workflow_run_id, organization_id=organization_id)
extracted_data = _get_extracted_data_from_block_result(
block_result,
task_type,
observer_cruise_id=observer_cruise_id,
workflow_run_id=workflow_run_id,
)
if extracted_data is not None:
task_history_record["extracted_data"] = extracted_data
task_history.append(task_history_record)
# refresh workflow
yaml_blocks.extend(block_yaml_list)
yaml_parameters.extend(parameter_yaml_list)
@@ -1018,3 +1026,62 @@ async def mark_observer_cruise_as_failed(
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id, failure_reason=failure_reason or "Observer cruise failed"
)
def _get_extracted_data_from_block_result(
block_result: BlockResult,
task_type: str,
observer_cruise_id: str | None = None,
workflow_run_id: str | None = None,
) -> Any | None:
"""Extract data from block result based on task type.
Args:
block_result: The result from block execution
task_type: Type of task ("extract" or "loop")
observer_cruise_id: Optional ID for logging
workflow_run_id: Optional ID for logging
Returns:
Extracted data if available, None otherwise
"""
if task_type == "extract":
if (
isinstance(block_result.output_parameter_value, dict)
and "extracted_information" in block_result.output_parameter_value
and block_result.output_parameter_value["extracted_information"]
):
return block_result.output_parameter_value["extracted_information"]
elif task_type == "loop":
# if loop task has data extraction, add it to the task history
# WARNING: the assumption here is that the output_paremeter_value is a list of list of dicts
# output_parameter_value data structure is not consistent across all the blocks
if block_result.output_parameter_value and isinstance(block_result.output_parameter_value, list):
loop_output_overall = []
for inner_loop_output in block_result.output_parameter_value:
inner_loop_output_overall = []
if not isinstance(inner_loop_output, list):
LOG.warning(
"Inner loop output is not a list",
inner_loop_output=inner_loop_output,
observer_cruise_id=observer_cruise_id,
workflow_run_id=workflow_run_id,
workflow_run_block_id=block_result.workflow_run_block_id,
)
continue
for inner_output in inner_loop_output:
if not isinstance(inner_output, dict):
LOG.warning(
"inner output is not a dict",
inner_output=inner_output,
observer_cruise_id=observer_cruise_id,
workflow_run_id=workflow_run_id,
workflow_run_block_id=block_result.workflow_run_block_id,
)
continue
output_value = inner_output.get("output_value", {})
if "extracted_information" in output_value and output_value["extracted_information"]:
inner_loop_output_overall.append(output_value["extracted_information"])
loop_output_overall.append(inner_loop_output_overall)
return loop_output_overall if loop_output_overall else None
return None