diff --git a/skyvern/forge/sdk/workflow/context_manager.py b/skyvern/forge/sdk/workflow/context_manager.py index 56c8e97f..cb914e21 100644 --- a/skyvern/forge/sdk/workflow/context_manager.py +++ b/skyvern/forge/sdk/workflow/context_manager.py @@ -284,6 +284,62 @@ class WorkflowRunContext: return [self.mask_secrets_in_data(item, mask) for item in data] return data + def build_workflow_run_summary(self) -> dict[str, Any]: + """ + Build a workflow-level summary from per-block outputs. + + Aggregates data across all blocks into a single workflow-level structure + suitable for webhook payloads. + + Returns a dict with: + - workflow_run_id: The workflow run ID + - status: Status from the last block + - output.extracted_information: Merged extracted_information from all blocks + - downloaded_files: Aggregated list from all blocks + - errors: Aggregated list from all blocks + - failure_reason: First non-null failure reason found + """ + last_status: str | None = None + merged_extracted_information: dict[str, Any] = {} + aggregated_downloaded_files: list[Any] = [] + aggregated_errors: list[Any] = [] + aggregated_failure_reason: str | None = None + + for _, block_output in self.workflow_run_outputs.items(): + if not isinstance(block_output, dict): + continue + + block_status = block_output.get("status") + if block_status: + last_status = str(block_status) + + extracted_info = block_output.get("extracted_information") + if extracted_info is not None and isinstance(extracted_info, dict): + # Merge extracted_information from all blocks together + merged_extracted_information.update(extracted_info) + + downloaded_files = block_output.get("downloaded_files") + if downloaded_files: + aggregated_downloaded_files.extend(downloaded_files) + + errors = block_output.get("errors") + if errors: + aggregated_errors.extend(errors) + + if aggregated_failure_reason is None: + failure_reason = block_output.get("failure_reason") + if failure_reason: + aggregated_failure_reason = failure_reason + + return { + "workflow_run_id": self.workflow_run_id, + "status": last_status, + "output": {"extracted_information": merged_extracted_information}, + "downloaded_files": aggregated_downloaded_files, + "errors": aggregated_errors, + "failure_reason": aggregated_failure_reason, + } + async def get_secrets_from_password_manager(self) -> dict[str, Any]: """ Get the secrets from the password manager. The secrets dict will contain the actual secret values. diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 2dddbcbe..7a54bd51 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -125,10 +125,9 @@ def _json_type_filter(value: Any) -> str: When _render_templates_in_json() detects these markers, it unwraps and parses the JSON to get the native typed value (bool, int, list, etc.). - Args: - value: Any JSON-serializable value (bool, int, float, str, list, dict, None). + Uses default=str to handle non-JSON-serializable types (datetime, Enum, etc.) """ - return f"{_JSON_TYPE_MARKER}{json.dumps(value)}{_JSON_TYPE_MARKER}" + return f"{_JSON_TYPE_MARKER}{json.dumps(value, default=str)}{_JSON_TYPE_MARKER}" jinja_sandbox_env.filters["json"] = _json_type_filter @@ -397,6 +396,7 @@ class Block(BaseModel, abc.ABC): template_data["workflow_run_id"] = workflow_run_context.workflow_run_id template_data["workflow_run_outputs"] = workflow_run_context.workflow_run_outputs + template_data["workflow_run_summary"] = workflow_run_context.build_workflow_run_summary() if settings.WORKFLOW_TEMPLATING_STRICTNESS == "strict": if missing_variables := get_missing_variables(potential_template, template_data):