diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 0d5084e1..85b619b4 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -4957,6 +4957,59 @@ def _is_pure_jinja_expression(expression: str) -> bool: return True +def _resolve_nested_path(value: Any, path: str) -> Any: + """ + Resolve a dotted/bracket access path on a nested value. + + Examples: + _resolve_nested_path({"a": {"b": 1}}, ".a.b") -> 1 + _resolve_nested_path([{"x": 2}], "[0].x") -> 2 + + Args: + value: The root value to traverse + path: The access path (e.g., ".field1.field2[0].field3") + + Returns: + The resolved leaf value + + Raises: + LookupError: If the path cannot be resolved + """ + segments = re.findall(r"\.([a-zA-Z_]\w*)|\[(\d+)\]", path) + current = value + for dot_key, bracket_idx in segments: + if dot_key: + if isinstance(current, dict): + if dot_key not in current: + raise LookupError(f"Key {dot_key!r} not found") + current = current[dot_key] + else: + raise LookupError(f"Cannot access .{dot_key} on {type(current).__name__}") + elif bracket_idx: + idx = int(bracket_idx) + if isinstance(current, (list, tuple)): + if idx >= len(current): + raise LookupError(f"Index [{idx}] out of range") + current = current[idx] + else: + raise LookupError(f"Cannot index [{idx}] on {type(current).__name__}") + return current + + +_JINJA_DISPLAY_FILTERS: dict[str, Callable[[Any], Any]] = { + "lower": lambda v: str(v).lower(), + "upper": lambda v: str(v).upper(), + "trim": lambda v: str(v).strip(), + "title": lambda v: str(v).title(), + "capitalize": lambda v: str(v).capitalize(), + "int": lambda v: int(v), + "float": lambda v: float(v), + "string": lambda v: str(v), + "length": lambda v: len(v), + "abs": lambda v: abs(v), +} + + def _render_jinja_expression_for_display( expression: str, context_values: dict[str, Any], @@ -4969,6 +5022,13 @@ def _render_jinja_expression_for_display( without actually evaluating the expression. For example: - Input: "{{ base_date == date_1 }}" with context {"base_date": "01-25-2026", "date_1": "01-25-2026"} - Output: '"01-25-2026" == "01-25-2026"' + - Input: "{{ output.extracted_information.field != None }}" with nested dict context + - Output: '"some_value" != None' + - Input: "{{ output.status|lower == 'active' }}" with context {"output": {"status": "Active"}} + - Output: '"active" == \'active\'' + + Known Jinja filters (lower, upper, trim, etc.) are applied to the resolved value. + Unknown filters are left as-is in the output. Returns the original expression if it's not a pure Jinja expression or if rendering fails. """ @@ -4980,15 +5040,48 @@ def _render_jinja_expression_for_display( inner_expr = expression.strip()[2:-2].strip() display_expr = inner_expr - # Substitute variable names with their values using word boundary regex - # This ensures we only match whole variable names, not substrings - # e.g., "date" won't match inside "validate_date" or "date_1" + # Substitute variable references (including dotted/bracket access paths and filters) + # with their values. + # Match var_name optionally followed by .field or [index] segments, + # then optionally followed by a |filter_name. + # Sort by key length (longest first) to avoid partial matches. for var_name in sorted(context_values.keys(), key=len, reverse=True): - pattern = r"\b" + re.escape(var_name) + r"\b" - var_value = context_values[var_name] - # Quote string values for clarity - replacement = f'"{var_value}"' if isinstance(var_value, str) else str(var_value) - display_expr = re.sub(pattern, replacement, display_expr) + pattern = r"\b" + re.escape(var_name) + r"((?:\.[a-zA-Z_]\w*|\[\d+\])*)(\|[a-zA-Z_]\w*)?" + + def _replacer(match: re.Match, _var_name: str = var_name) -> str: + access_path = match.group(1) # the dotted/bracket part after var_name + filter_expr = match.group(2) # e.g., "|lower" or None + var_value = context_values[_var_name] + + if access_path: + try: + var_value = _resolve_nested_path(var_value, access_path) + except LookupError: + # Path couldn't be resolved — return original text unchanged + return match.group(0) + + if filter_expr: + filter_name = filter_expr[1:] # strip the leading | + filter_fn = _JINJA_DISPLAY_FILTERS.get(filter_name) + if filter_fn is not None: + try: + var_value = filter_fn(var_value) + except Exception: + # Filter application failed — show value with filter text + if isinstance(var_value, str): + return f'"{var_value}"{filter_expr}' + return f"{var_value}{filter_expr}" + else: + # Unknown filter — show value with filter text preserved + if isinstance(var_value, str): + return f'"{var_value}"{filter_expr}' + return f"{var_value}{filter_expr}" + + if isinstance(var_value, str): + return f'"{var_value}"' + return str(var_value) + + display_expr = re.sub(pattern, _replacer, display_expr) return display_expr except Exception as exc: