Axis Unblock - slimmer workflow run overview payload (#4501)
This commit is contained in:
@@ -284,6 +284,62 @@ class WorkflowRunContext:
|
||||
return [self.mask_secrets_in_data(item, mask) for item in data]
|
||||
return data
|
||||
|
||||
def build_workflow_run_summary(self) -> dict[str, Any]:
|
||||
"""
|
||||
Build a workflow-level summary from per-block outputs.
|
||||
|
||||
Aggregates data across all blocks into a single workflow-level structure
|
||||
suitable for webhook payloads.
|
||||
|
||||
Returns a dict with:
|
||||
- workflow_run_id: The workflow run ID
|
||||
- status: Status from the last block
|
||||
- output.extracted_information: Merged extracted_information from all blocks
|
||||
- downloaded_files: Aggregated list from all blocks
|
||||
- errors: Aggregated list from all blocks
|
||||
- failure_reason: First non-null failure reason found
|
||||
"""
|
||||
last_status: str | None = None
|
||||
merged_extracted_information: dict[str, Any] = {}
|
||||
aggregated_downloaded_files: list[Any] = []
|
||||
aggregated_errors: list[Any] = []
|
||||
aggregated_failure_reason: str | None = None
|
||||
|
||||
for _, block_output in self.workflow_run_outputs.items():
|
||||
if not isinstance(block_output, dict):
|
||||
continue
|
||||
|
||||
block_status = block_output.get("status")
|
||||
if block_status:
|
||||
last_status = str(block_status)
|
||||
|
||||
extracted_info = block_output.get("extracted_information")
|
||||
if extracted_info is not None and isinstance(extracted_info, dict):
|
||||
# Merge extracted_information from all blocks together
|
||||
merged_extracted_information.update(extracted_info)
|
||||
|
||||
downloaded_files = block_output.get("downloaded_files")
|
||||
if downloaded_files:
|
||||
aggregated_downloaded_files.extend(downloaded_files)
|
||||
|
||||
errors = block_output.get("errors")
|
||||
if errors:
|
||||
aggregated_errors.extend(errors)
|
||||
|
||||
if aggregated_failure_reason is None:
|
||||
failure_reason = block_output.get("failure_reason")
|
||||
if failure_reason:
|
||||
aggregated_failure_reason = failure_reason
|
||||
|
||||
return {
|
||||
"workflow_run_id": self.workflow_run_id,
|
||||
"status": last_status,
|
||||
"output": {"extracted_information": merged_extracted_information},
|
||||
"downloaded_files": aggregated_downloaded_files,
|
||||
"errors": aggregated_errors,
|
||||
"failure_reason": aggregated_failure_reason,
|
||||
}
|
||||
|
||||
async def get_secrets_from_password_manager(self) -> dict[str, Any]:
|
||||
"""
|
||||
Get the secrets from the password manager. The secrets dict will contain the actual secret values.
|
||||
|
||||
@@ -125,10 +125,9 @@ def _json_type_filter(value: Any) -> str:
|
||||
When _render_templates_in_json() detects these markers, it unwraps and
|
||||
parses the JSON to get the native typed value (bool, int, list, etc.).
|
||||
|
||||
Args:
|
||||
value: Any JSON-serializable value (bool, int, float, str, list, dict, None).
|
||||
Uses default=str to handle non-JSON-serializable types (datetime, Enum, etc.)
|
||||
"""
|
||||
return f"{_JSON_TYPE_MARKER}{json.dumps(value)}{_JSON_TYPE_MARKER}"
|
||||
return f"{_JSON_TYPE_MARKER}{json.dumps(value, default=str)}{_JSON_TYPE_MARKER}"
|
||||
|
||||
|
||||
jinja_sandbox_env.filters["json"] = _json_type_filter
|
||||
@@ -397,6 +396,7 @@ class Block(BaseModel, abc.ABC):
|
||||
template_data["workflow_run_id"] = workflow_run_context.workflow_run_id
|
||||
|
||||
template_data["workflow_run_outputs"] = workflow_run_context.workflow_run_outputs
|
||||
template_data["workflow_run_summary"] = workflow_run_context.build_workflow_run_summary()
|
||||
|
||||
if settings.WORKFLOW_TEMPLATING_STRICTNESS == "strict":
|
||||
if missing_variables := get_missing_variables(potential_template, template_data):
|
||||
|
||||
Reference in New Issue
Block a user