diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py index d94e25e9..3718a629 100644 --- a/skyvern/forge/sdk/services/observer_service.py +++ b/skyvern/forge/sdk/services/observer_service.py @@ -394,6 +394,7 @@ async def run_observer_cruise_helper( break block: BlockTypeVar | None = None + task_history_record: dict[str, Any] = {} if task_type == "extract": block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( observer_cruise=observer_cruise, @@ -405,7 +406,7 @@ async def run_observer_cruise_helper( data_extraction_goal=plan, task_history=task_history, ) - task_history.append({"type": task_type, "task": plan}) + task_history_record = {"type": task_type, "task": plan} elif task_type == "navigate": original_url = url if i == 0 else None block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task( @@ -415,7 +416,7 @@ async def run_observer_cruise_helper( original_url=original_url, navigation_goal=plan, ) - task_history.append({"type": task_type, "task": plan}) + task_history_record = {"type": task_type, "task": plan} elif task_type == "loop": try: block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( @@ -428,14 +429,12 @@ async def run_observer_cruise_helper( original_url=url, scraped_page=scraped_page, ) - task_history.append( - { - "type": task_type, - "task": plan, - "loop_over_values": extraction_obj.loop_values, - "task_inside_the_loop": inner_task, - } - ) + task_history_record = { + "type": task_type, + "task": plan, + "loop_over_values": extraction_obj.loop_values, + "task_inside_the_loop": inner_task, + } except Exception: LOG.exception("Failed to generate loop task") await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( @@ -453,6 +452,15 @@ async def run_observer_cruise_helper( # generate the extraction task block_result = await block.execute_safe(workflow_run_id=workflow_run_id, organization_id=organization_id) + 
def _get_extracted_data_from_block_result(
    block_result: BlockResult,
    task_type: str,
    observer_cruise_id: str | None = None,
    workflow_run_id: str | None = None,
) -> Any | None:
    """Return the data a block extracted, or None when it extracted nothing.

    Args:
        block_result: The result from block execution. Only its
            ``output_parameter_value`` (and, for logging,
            ``workflow_run_block_id``) attributes are read.
        task_type: Type of task. Only "extract" and "loop" can yield data;
            every other value returns None.
        observer_cruise_id: Optional ID attached to warning logs.
        workflow_run_id: Optional ID attached to warning logs.

    Returns:
        For "extract": the truthy ``extracted_information`` value of the
        output dict, else None.
        For "loop": a list with one entry per well-formed loop iteration,
        each entry being the list of truthy ``extracted_information`` values
        found in that iteration. None when no iteration extracted anything.
    """
    if task_type not in ("extract", "loop"):
        return None

    output = block_result.output_parameter_value

    if task_type == "extract":
        if not isinstance(output, dict):
            return None
        # Falsy payloads (None, {}, "", []) count as "nothing extracted".
        return output.get("extracted_information") or None

    # Loop task: if it contains data extraction, surface it for task history.
    # WARNING: the assumption here is that output_parameter_value is a list
    # (one item per iteration) of lists (one item per inner block) of dicts.
    # The output_parameter_value structure is not consistent across blocks,
    # so every level is type-checked before it is used.
    if not isinstance(output, list):
        return None

    loop_output_overall: list[list[Any]] = []
    for inner_loop_output in output:
        if not isinstance(inner_loop_output, list):
            LOG.warning(
                "Inner loop output is not a list",
                inner_loop_output=inner_loop_output,
                observer_cruise_id=observer_cruise_id,
                workflow_run_id=workflow_run_id,
                workflow_run_block_id=block_result.workflow_run_block_id,
            )
            continue

        inner_loop_output_overall: list[Any] = []
        for inner_output in inner_loop_output:
            if not isinstance(inner_output, dict):
                LOG.warning(
                    "inner output is not a dict",
                    inner_output=inner_output,
                    observer_cruise_id=observer_cruise_id,
                    workflow_run_id=workflow_run_id,
                    workflow_run_block_id=block_result.workflow_run_block_id,
                )
                continue

            # "output_value" may be absent, explicitly None, or a non-dict
            # payload; none of those can carry "extracted_information", and
            # probing them with `in` / `[...]` would raise TypeError.
            output_value = inner_output.get("output_value")
            if not isinstance(output_value, dict):
                continue

            extracted_information = output_value.get("extracted_information")
            if extracted_information:
                inner_loop_output_overall.append(extracted_information)

        # Keep one entry per iteration (even if empty) so positions stay
        # aligned with the loop iterations that produced them.
        loop_output_overall.append(inner_loop_output_overall)

    # A list made only of empty per-iteration lists holds no data at all;
    # report None so callers do not record an empty "extracted_data".
    return loop_output_overall if any(loop_output_overall) else None