Fix Jinja2 template errors from invalid parameter/block names with special characters (SKY-7356) (#4644)

This commit is contained in:
Celal Zamanoglu
2026-02-06 00:58:36 +03:00
committed by GitHub
parent c35a744e27
commit 7bf1c721e4
8 changed files with 514 additions and 40 deletions

View File

@@ -104,6 +104,42 @@ function header(
return `${prefix} Context Parameter`; return `${prefix} Context Parameter`;
} }
/**
* Validates that a parameter key is a valid Python/Jinja2 identifier.
* Parameter keys are used in Jinja2 templates, so they must be valid identifiers.
* Returns an error message if invalid, or null if valid.
*/
function validateParameterKey(key: string): string | null {
if (!key) return null; // Empty key is handled separately
// Check for whitespace
if (/\s/.test(key)) {
return "Key cannot contain whitespace characters. Consider using underscores (_) instead.";
}
// Check if it's a valid Python identifier:
// - Must start with a letter (a-z, A-Z) or underscore (_)
// - Can only contain letters, digits (0-9), and underscores
const validIdentifierRegex = /^[a-zA-Z_][a-zA-Z0-9_]*$/;
if (!validIdentifierRegex.test(key)) {
if (/^[0-9]/.test(key)) {
return "Key cannot start with a digit. Parameter keys must start with a letter or underscore.";
}
if (key.includes("/")) {
return "Key cannot contain '/' characters. Use underscores instead (e.g., 'State_or_Province' instead of 'State/Province').";
}
if (key.includes("-")) {
return "Key cannot contain '-' characters. Use underscores instead (e.g., 'my_parameter' instead of 'my-parameter').";
}
if (key.includes(".")) {
return "Key cannot contain '.' characters. Use underscores instead.";
}
return "Key must be a valid identifier (only letters, digits, and underscores; cannot start with a digit).";
}
return null;
}
// Helper to detect initial credential data type from existing parameter // Helper to detect initial credential data type from existing parameter
function detectInitialCredentialDataType( function detectInitialCredentialDataType(
initialValues: ParametersState[number] | undefined, initialValues: ParametersState[number] | undefined,
@@ -149,7 +185,7 @@ function WorkflowParameterEditPanel({
const isCloud = useContext(CloudContext); const isCloud = useContext(CloudContext);
const isEditMode = !!initialValues; const isEditMode = !!initialValues;
const [key, setKey] = useState(initialValues?.key ?? ""); const [key, setKey] = useState(initialValues?.key ?? "");
const hasWhitespace = /\s/.test(key); const keyValidationError = validateParameterKey(key);
// Detect initial values for backward compatibility // Detect initial values for backward compatibility
const isBitwardenCredential = const isBitwardenCredential =
@@ -314,10 +350,8 @@ function WorkflowParameterEditPanel({
<div className="space-y-1"> <div className="space-y-1">
<Label className="text-xs text-slate-300">Key</Label> <Label className="text-xs text-slate-300">Key</Label>
<Input value={key} onChange={(e) => setKey(e.target.value)} /> <Input value={key} onChange={(e) => setKey(e.target.value)} />
{hasWhitespace && ( {keyValidationError && (
<p className="text-xs text-destructive"> <p className="text-xs text-destructive">{keyValidationError}</p>
Spaces are not allowed, consider using _
</p>
)} )}
</div> </div>
<div className="space-y-1"> <div className="space-y-1">
@@ -769,12 +803,11 @@ function WorkflowParameterEditPanel({
}); });
return; return;
} }
if (hasWhitespace) { if (keyValidationError) {
toast({ toast({
variant: "destructive", variant: "destructive",
title: "Failed to save parameter", title: "Failed to save parameter",
description: description: keyValidationError,
"Key cannot contain whitespace characters. Consider using underscores (_) instead.",
}); });
return; return;
} }

View File

@@ -7,6 +7,40 @@ import {
} from "../editor/workflowEditorUtils"; } from "../editor/workflowEditorUtils";
import { useState } from "react"; import { useState } from "react";
import { useWorkflowParametersStore } from "@/store/WorkflowParametersStore"; import { useWorkflowParametersStore } from "@/store/WorkflowParametersStore";
import { toast } from "@/components/ui/use-toast";
/**
* Sanitizes a block label to be a valid Python/Jinja2 identifier.
* Block labels are used to create output parameter keys (e.g., '{label}_output')
* which are then used as Jinja2 template variable names.
*/
function sanitizeBlockLabel(value: string): {
sanitized: string;
wasModified: boolean;
} {
const original = value;
// Replace any character that's not a letter, digit, or underscore with underscore
let sanitized = value.replace(/[^a-zA-Z0-9_]/g, "_");
// Collapse multiple consecutive underscores into one
sanitized = sanitized.replace(/_+/g, "_");
// Remove leading/trailing underscores for cleaner labels
sanitized = sanitized.replace(/^_+|_+$/g, "");
// If starts with a digit (after cleanup), prepend an underscore
if (/^[0-9]/.test(sanitized)) {
sanitized = "_" + sanitized;
}
// If everything was stripped, provide a default
if (!sanitized) {
sanitized = "block";
}
return { sanitized, wasModified: original !== sanitized };
}
type Props = { type Props = {
id: string; id: string;
@@ -24,13 +58,22 @@ function useNodeLabelChangeHandler({ id, initialValue }: Props) {
function handleLabelChange(value: string) { function handleLabelChange(value: string) {
const existingLabels = nodes const existingLabels = nodes
.filter(isWorkflowBlockNode) .filter((n) => isWorkflowBlockNode(n) && n.id !== id)
.map((n) => n.data.label); .map((n) => n.data.label);
const labelWithoutWhitespace = value.replace(/\s+/g, "_");
const newLabel = getUniqueLabelForExistingNode( // Sanitize the label to be a valid Python identifier
labelWithoutWhitespace, const { sanitized, wasModified } = sanitizeBlockLabel(value);
existingLabels,
); // Show a toast if characters were modified
if (wasModified) {
toast({
title: "Block label adjusted",
description:
"Block labels can only contain letters, numbers, and underscores. Invalid characters have been replaced.",
});
}
const newLabel = getUniqueLabelForExistingNode(sanitized, existingLabels);
setLabel(newLabel); setLabel(newLabel);
setNodes( setNodes(
getUpdatedNodesAfterLabelUpdateForParameterKeys( getUpdatedNodesAfterLabelUpdateForParameterKeys(

View File

@@ -26,6 +26,7 @@ from skyvern.core.script_generations.generate_workflow_parameters import (
) )
from skyvern.forge import app from skyvern.forge import app
from skyvern.schemas.workflows import FileStorageType from skyvern.schemas.workflows import FileStorageType
from skyvern.utils.strings import sanitize_identifier
from skyvern.webeye.actions.action_types import ActionType from skyvern.webeye.actions.action_types import ActionType
LOG = structlog.get_logger(__name__) LOG = structlog.get_logger(__name__)
@@ -126,36 +127,23 @@ def sanitize_variable_name(name: str) -> str:
Sanitize a string to be a valid Python variable name. Sanitize a string to be a valid Python variable name.
- Converts to snake_case - Converts to snake_case
- Removes invalid characters - Removes invalid characters (via shared sanitize_identifier)
- Ensures it doesn't start with a number - Ensures it doesn't start with a number
- Handles Python keywords by appending underscore - Handles Python keywords by appending underscore
- Removes empty spaces - Converts to lowercase
""" """
# Remove leading/trailing whitespace and replace internal spaces with underscores
name = name.strip().replace(" ", "_")
# Convert to snake_case: handle camelCase and PascalCase # Convert to snake_case: handle camelCase and PascalCase
name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name) name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
# Remove any characters that aren't alphanumeric or underscore # Convert to lowercase before sanitizing
name = re.sub(r"[^a-zA-Z0-9_]", "_", name)
# Convert to lowercase
name = name.lower() name = name.lower()
# Remove consecutive underscores # Use shared sanitize_identifier for core cleanup (uses "_" prefix for digit-leading names)
name = re.sub(r"_+", "_", name) name = sanitize_identifier(name, default="param")
# Remove leading/trailing underscores # For script variable names, use "param_" prefix instead of bare "_" for digit-leading names
name = name.strip("_") if name.startswith("_") and len(name) > 1 and name[1].isdigit():
name = f"param{name}"
# Ensure it doesn't start with a number
if name and name[0].isdigit():
name = f"param_{name}"
# Handle empty string or invalid names
if not name or name == "_":
name = "param"
# Handle Python keywords # Handle Python keywords
if keyword.iskeyword(name): if keyword.iskeyword(name):

View File

@@ -118,7 +118,13 @@ from skyvern.schemas.runs import (
WorkflowRunResponse, WorkflowRunResponse,
) )
from skyvern.schemas.webhooks import RetryRunWebhookRequest from skyvern.schemas.webhooks import RetryRunWebhookRequest
from skyvern.schemas.workflows import BlockType, WorkflowCreateYAMLRequest, WorkflowRequest, WorkflowStatus from skyvern.schemas.workflows import (
BlockType,
WorkflowCreateYAMLRequest,
WorkflowRequest,
WorkflowStatus,
sanitize_workflow_yaml_with_references,
)
from skyvern.services import block_service, run_service, task_v1_service, task_v2_service, workflow_service from skyvern.services import block_service, run_service, task_v1_service, task_v2_service, workflow_service
from skyvern.services.pdf_import_service import pdf_import_service from skyvern.services.pdf_import_service import pdf_import_service
from skyvern.webeye.actions.actions import Action from skyvern.webeye.actions.actions import Action
@@ -536,6 +542,9 @@ async def create_workflow_legacy(
except yaml.YAMLError: except yaml.YAMLError:
raise HTTPException(status_code=422, detail="Invalid YAML") raise HTTPException(status_code=422, detail="Invalid YAML")
# Auto-sanitize block labels and update references for imports
workflow_yaml = sanitize_workflow_yaml_with_references(workflow_yaml)
try: try:
workflow_create_request = WorkflowCreateYAMLRequest.model_validate(workflow_yaml) workflow_create_request = WorkflowCreateYAMLRequest.model_validate(workflow_yaml)
# Override folder_id if provided as query parameter # Override folder_id if provided as query parameter
@@ -588,6 +597,8 @@ async def create_workflow(
try: try:
if data.yaml_definition: if data.yaml_definition:
workflow_json_from_yaml = yaml.safe_load(data.yaml_definition) workflow_json_from_yaml = yaml.safe_load(data.yaml_definition)
# Auto-sanitize block labels and update references for imports
workflow_json_from_yaml = sanitize_workflow_yaml_with_references(workflow_json_from_yaml)
workflow_definition = WorkflowCreateYAMLRequest.model_validate(workflow_json_from_yaml) workflow_definition = WorkflowCreateYAMLRequest.model_validate(workflow_json_from_yaml)
elif data.json_definition: elif data.json_definition:
workflow_definition = data.json_definition workflow_definition = data.json_definition

View File

@@ -166,7 +166,10 @@ class InvalidWaitBlockTime(SkyvernException):
class FailedToFormatJinjaStyleParameter(SkyvernException): class FailedToFormatJinjaStyleParameter(SkyvernException):
def __init__(self, template: str, msg: str) -> None: def __init__(self, template: str, msg: str) -> None:
super().__init__( super().__init__(
f"Failed to format Jinja style parameter {template}. Please make sure the variable reference is correct. reason: {msg}" f"Failed to format Jinja style parameter '{template}'. "
f"Reason: {msg}. "
"If your block labels or parameter keys contain characters like '/', '-', or '.', "
"please rename them to use only letters, numbers, and underscores (e.g., 'State_Province' instead of 'State/Province')."
) )
@@ -175,7 +178,9 @@ class MissingJinjaVariables(SkyvernException):
self.variables = variables self.variables = variables
super().__init__( super().__init__(
f"There are missing variables for '{template}'. Please make sure the variables are supplied. Missing variables: {variables}" f"Missing variables for '{template}'. Missing: {variables}. "
"If your block labels or parameter keys contain characters like '/', '-', or '.', "
"please rename them to use only letters, numbers, and underscores (e.g., 'State_Province' instead of 'State/Province')."
) )

View File

@@ -3,11 +3,325 @@ from dataclasses import dataclass
from enum import StrEnum from enum import StrEnum
from typing import Annotated, Any, Literal from typing import Annotated, Any, Literal
import structlog
from pydantic import BaseModel, Field, field_validator, model_validator from pydantic import BaseModel, Field, field_validator, model_validator
from skyvern.config import settings from skyvern.config import settings
from skyvern.forge.sdk.workflow.models.parameter import OutputParameter, ParameterType, WorkflowParameterType from skyvern.forge.sdk.workflow.models.parameter import OutputParameter, ParameterType, WorkflowParameterType
from skyvern.schemas.runs import GeoTarget, ProxyLocation, RunEngine from skyvern.schemas.runs import GeoTarget, ProxyLocation, RunEngine
from skyvern.utils.strings import sanitize_identifier
from skyvern.utils.templating import replace_jinja_reference
LOG = structlog.get_logger()
def sanitize_block_label(value: str) -> str:
"""Sanitizes a block label to be a valid Python/Jinja2 identifier.
Block labels are used to create output parameter keys (e.g., '{label}_output')
which are then used as Jinja2 template variable names.
Args:
value: The raw label value to sanitize
Returns:
A sanitized label that is a valid Python identifier
"""
return sanitize_identifier(value, default="block")
def sanitize_parameter_key(value: str) -> str:
"""Sanitizes a parameter key to be a valid Python/Jinja2 identifier.
Parameter keys are used as Jinja2 template variable names.
Args:
value: The raw key value to sanitize
Returns:
A sanitized key that is a valid Python identifier
"""
return sanitize_identifier(value, default="parameter")
def _replace_references_in_value(value: Any, old_key: str, new_key: str) -> Any:
"""Recursively replaces Jinja references in a value (string, dict, or list)."""
if isinstance(value, str):
return replace_jinja_reference(value, old_key, new_key)
elif isinstance(value, dict):
return {k: _replace_references_in_value(v, old_key, new_key) for k, v in value.items()}
elif isinstance(value, list):
return [_replace_references_in_value(item, old_key, new_key) for item in value]
return value
def _replace_direct_string_in_value(value: Any, old_key: str, new_key: str) -> Any:
"""Recursively replaces exact string matches in a value (for fields like source_parameter_key)."""
if isinstance(value, str):
return new_key if value == old_key else value
elif isinstance(value, dict):
return {k: _replace_direct_string_in_value(v, old_key, new_key) for k, v in value.items()}
elif isinstance(value, list):
return [_replace_direct_string_in_value(item, old_key, new_key) for item in value]
return value
def _make_unique(candidate: str, seen: set[str]) -> str:
"""Appends a numeric suffix to make candidate unique within the seen set.
Args:
candidate: The candidate identifier
seen: Set of already-used identifiers (mutated -- the chosen name is added)
Returns:
A unique identifier, either candidate itself or candidate_N
"""
if candidate not in seen:
seen.add(candidate)
return candidate
counter = 2
while f"{candidate}_{counter}" in seen:
counter += 1
unique = f"{candidate}_{counter}"
seen.add(unique)
return unique
def _sanitize_blocks_recursive(
blocks: list[dict[str, Any]],
label_mapping: dict[str, str],
seen_labels: set[str],
) -> list[dict[str, Any]]:
"""Recursively sanitizes block labels and collects the label mapping.
Args:
blocks: List of block dictionaries
label_mapping: Dictionary to store old_label -> new_label mappings (mutated)
seen_labels: Set of already-used labels for collision avoidance (mutated)
Returns:
List of blocks with sanitized labels
"""
sanitized_blocks = []
for block in blocks:
block = dict(block) # Make a copy to avoid mutating the original
# Sanitize the block's label
if "label" in block and isinstance(block["label"], str):
old_label = block["label"]
new_label = sanitize_block_label(old_label)
new_label = _make_unique(new_label, seen_labels)
if old_label != new_label:
label_mapping[old_label] = new_label
block["label"] = new_label
else:
# Even if unchanged, track it as seen for collision avoidance
seen_labels.add(old_label)
# Handle nested blocks in for_loop
if "loop_blocks" in block and isinstance(block["loop_blocks"], list):
block["loop_blocks"] = _sanitize_blocks_recursive(block["loop_blocks"], label_mapping, seen_labels)
sanitized_blocks.append(block)
return sanitized_blocks
def _sanitize_parameters(
parameters: list[dict[str, Any]],
key_mapping: dict[str, str],
seen_keys: set[str],
) -> list[dict[str, Any]]:
"""Sanitizes parameter keys and collects the key mapping.
Args:
parameters: List of parameter dictionaries
key_mapping: Dictionary to store old_key -> new_key mappings (mutated)
seen_keys: Set of already-used keys for collision avoidance (mutated)
Returns:
List of parameters with sanitized keys
"""
sanitized_params = []
for param in parameters:
param = dict(param) # Make a copy
if "key" in param and isinstance(param["key"], str):
old_key = param["key"]
new_key = sanitize_parameter_key(old_key)
new_key = _make_unique(new_key, seen_keys)
if old_key != new_key:
key_mapping[old_key] = new_key
param["key"] = new_key
else:
# Even if unchanged, track it as seen for collision avoidance
seen_keys.add(old_key)
sanitized_params.append(param)
return sanitized_params
def _update_parameter_keys_in_blocks(
blocks: list[dict[str, Any]],
key_mapping: dict[str, str],
) -> list[dict[str, Any]]:
"""Updates parameter_keys arrays in blocks to use new parameter key names.
Args:
blocks: List of block dictionaries
key_mapping: Dictionary of old_key -> new_key mappings
Returns:
List of blocks with updated parameter_keys
"""
updated_blocks = []
for block in blocks:
block = dict(block)
# Update parameter_keys array if present
if "parameter_keys" in block and isinstance(block["parameter_keys"], list):
block["parameter_keys"] = [
key_mapping.get(key, key) if isinstance(key, str) else key for key in block["parameter_keys"]
]
# Handle nested blocks in for_loop
if "loop_blocks" in block and isinstance(block["loop_blocks"], list):
block["loop_blocks"] = _update_parameter_keys_in_blocks(block["loop_blocks"], key_mapping)
updated_blocks.append(block)
return updated_blocks
def sanitize_workflow_yaml_with_references(workflow_yaml: dict[str, Any]) -> dict[str, Any]:
"""Sanitizes block labels and parameter keys, and updates all references throughout the workflow.
This function:
1. Sanitizes all block labels to be valid Python identifiers
2. Sanitizes all parameter keys to be valid Python identifiers
3. Updates all Jinja references from {old_key} to {new_key}
4. Updates next_block_label if it references an old label
5. Updates finally_block_label if it references an old label
6. Updates parameter_keys arrays in blocks
7. Updates source_parameter_key and other direct references
Args:
workflow_yaml: The parsed workflow YAML dictionary
Returns:
The workflow YAML with sanitized identifiers and updated references
"""
workflow_yaml = dict(workflow_yaml) # Make a copy
workflow_definition = workflow_yaml.get("workflow_definition")
if not workflow_definition or not isinstance(workflow_definition, dict):
return workflow_yaml
workflow_definition = dict(workflow_definition) # Make a copy
workflow_yaml["workflow_definition"] = workflow_definition
# Step 1: Sanitize all block labels and collect the mapping
label_mapping: dict[str, str] = {} # old_label -> new_label
seen_labels: set[str] = set()
blocks = workflow_definition.get("blocks")
if blocks and isinstance(blocks, list):
workflow_definition["blocks"] = _sanitize_blocks_recursive(blocks, label_mapping, seen_labels)
# Step 2: Sanitize all parameter keys and collect the mapping
param_key_mapping: dict[str, str] = {} # old_key -> new_key
seen_keys: set[str] = set()
parameters = workflow_definition.get("parameters")
if parameters and isinstance(parameters, list):
workflow_definition["parameters"] = _sanitize_parameters(parameters, param_key_mapping, seen_keys)
# If nothing was changed, return early
if not label_mapping and not param_key_mapping:
return workflow_yaml
LOG.info(
"Auto-sanitized workflow identifiers during import",
sanitized_labels=label_mapping if label_mapping else None,
sanitized_parameter_keys=param_key_mapping if param_key_mapping else None,
)
# Step 3: Update all block label references
for old_label, new_label in label_mapping.items():
old_output_key = f"{old_label}_output"
new_output_key = f"{new_label}_output"
# Update Jinja references in blocks for {label}_output pattern
if "blocks" in workflow_definition:
workflow_definition["blocks"] = _replace_references_in_value(
workflow_definition["blocks"], old_output_key, new_output_key
)
# Also update shorthand {{ label }} references (must be done after _output to avoid partial matches)
workflow_definition["blocks"] = _replace_references_in_value(
workflow_definition["blocks"], old_label, new_label
)
# Update Jinja references in parameters for {label}_output pattern
if "parameters" in workflow_definition:
workflow_definition["parameters"] = _replace_references_in_value(
workflow_definition["parameters"], old_output_key, new_output_key
)
# Also update shorthand {{ label }} references
workflow_definition["parameters"] = _replace_references_in_value(
workflow_definition["parameters"], old_label, new_label
)
# Also update direct string references (e.g., source_parameter_key)
workflow_definition["parameters"] = _replace_direct_string_in_value(
workflow_definition["parameters"], old_output_key, new_output_key
)
# Step 4: Update all parameter key references
for old_key, new_key in param_key_mapping.items():
# Update Jinja references in blocks (e.g., {{ old_key }})
if "blocks" in workflow_definition:
workflow_definition["blocks"] = _replace_references_in_value(
workflow_definition["blocks"], old_key, new_key
)
# Update Jinja references in parameters (e.g., default values that reference other params)
if "parameters" in workflow_definition:
workflow_definition["parameters"] = _replace_references_in_value(
workflow_definition["parameters"], old_key, new_key
)
# Also update direct string references (e.g., source_parameter_key)
workflow_definition["parameters"] = _replace_direct_string_in_value(
workflow_definition["parameters"], old_key, new_key
)
# Step 5: Update parameter_keys arrays in blocks
if param_key_mapping and "blocks" in workflow_definition:
workflow_definition["blocks"] = _update_parameter_keys_in_blocks(
workflow_definition["blocks"], param_key_mapping
)
# Step 6: Update finally_block_label if it references an old label
if "finally_block_label" in workflow_definition:
finally_label = workflow_definition["finally_block_label"]
if finally_label in label_mapping:
workflow_definition["finally_block_label"] = label_mapping[finally_label]
# Step 7: Update next_block_label in all blocks
def update_next_block_label(blocks: list[dict[str, Any]]) -> list[dict[str, Any]]:
updated_blocks = []
for block in blocks:
block = dict(block)
if "next_block_label" in block:
next_label = block["next_block_label"]
if next_label in label_mapping:
block["next_block_label"] = label_mapping[next_label]
if "loop_blocks" in block and isinstance(block["loop_blocks"], list):
block["loop_blocks"] = update_next_block_label(block["loop_blocks"])
updated_blocks.append(block)
return updated_blocks
if label_mapping and "blocks" in workflow_definition:
workflow_definition["blocks"] = update_next_block_label(workflow_definition["blocks"])
return workflow_yaml
class WorkflowStatus(StrEnum): class WorkflowStatus(StrEnum):
@@ -89,9 +403,26 @@ class ParameterYAML(BaseModel, abc.ABC):
@field_validator("key") @field_validator("key")
@classmethod @classmethod
def validate_no_whitespace(cls, v: str) -> str: def validate_key_is_valid_identifier(cls, v: str) -> str:
"""Validate that parameter key is a valid Jinja2/Python identifier.
Parameter keys are used as Jinja2 template variable names. Jinja2 variable names
must be valid Python identifiers (letters, digits, underscores; cannot start with digit).
Characters like '/', '-', '.', etc. are NOT allowed because they are interpreted as
operators in Jinja2 templates, causing parsing errors like "'State_' is undefined"
when using a key like "State_/_Province".
"""
if any(char in v for char in [" ", "\t", "\n", "\r"]): if any(char in v for char in [" ", "\t", "\n", "\r"]):
raise ValueError("Key cannot contain whitespaces") raise ValueError("Key cannot contain whitespace characters")
if not v.isidentifier():
raise ValueError(
f"Key '{v}' is not a valid parameter name. "
"Parameter keys must be valid Python identifiers "
"(only letters, digits, and underscores; cannot start with a digit). "
"Characters like '/', '-', '.', etc. are not allowed because they conflict with Jinja2 template syntax."
)
return v return v
@@ -222,8 +553,21 @@ class BlockYAML(BaseModel, abc.ABC):
@field_validator("label") @field_validator("label")
@classmethod @classmethod
def validate_label(cls, value: str) -> str: def validate_label(cls, value: str) -> str:
"""Validate that block label is a valid Python identifier.
Block labels are used to create output parameter keys (e.g., '{label}_output')
which are then used as Jinja2 template variable names. Therefore, block labels
must be valid Python identifiers.
"""
if not value or not value.strip(): if not value or not value.strip():
raise ValueError("Block labels cannot be empty.") raise ValueError("Block labels cannot be empty.")
if not value.isidentifier():
raise ValueError(
f"Block label '{value}' is not a valid label. "
"Block labels must be valid Python identifiers "
"(only letters, digits, and underscores; cannot start with a digit). "
"Characters like '/', '-', '.', etc. are not allowed because they conflict with Jinja2 template syntax."
)
return value return value

View File

@@ -1,5 +1,6 @@
import os import os
import random import random
import re
import string import string
import uuid import uuid
@@ -18,3 +19,30 @@ def is_uuid(string: str) -> bool:
return True return True
except ValueError: except ValueError:
return False return False
def sanitize_identifier(value: str, default: str = "identifier") -> str:
"""Sanitizes a string to be a valid Python/Jinja2 identifier.
Replaces non-alphanumeric characters (except underscores) with underscores,
collapses consecutive underscores, strips leading/trailing underscores,
and prepends an underscore if the result starts with a digit.
Args:
value: The raw value to sanitize.
default: Fallback value if everything is stripped.
Returns:
A sanitized string that is a valid Python identifier.
"""
sanitized = re.sub(r"[^a-zA-Z0-9_]", "_", value)
sanitized = re.sub(r"_+", "_", sanitized)
sanitized = sanitized.strip("_")
if sanitized and sanitized[0].isdigit():
sanitized = "_" + sanitized
if not sanitized:
sanitized = default
return sanitized

View File

@@ -7,6 +7,28 @@ class Constants:
MissingVariablePattern = var_pattern = r"\{\{\s*([a-zA-Z_][a-zA-Z0-9_.\[\]'\"]*)\s*\}\}" MissingVariablePattern = var_pattern = r"\{\{\s*([a-zA-Z_][a-zA-Z0-9_.\[\]'\"]*)\s*\}\}"
def replace_jinja_reference(text: str, old_key: str, new_key: str) -> str:
"""Replaces jinja-style references in a string.
Handles patterns like {{oldKey}}, {{oldKey.field}}, {{oldKey | filter}}, {{oldKey[0]}}
Args:
text: The text to search in
old_key: The key to replace (without braces)
new_key: The new key to use (without braces)
Returns:
The text with references replaced
"""
# Match {{oldKey}} or {{oldKey.something}} or {{oldKey | filter}} or {{oldKey[0]}} etc.
# Use negative lookahead to ensure key is not followed by identifier characters,
# which prevents matching {{keyOther}} when searching for {{key}}
# Capture whitespace after {{ to preserve formatting (e.g., "{{ key }}" stays "{{ newKey }}")
escaped_old_key = re.escape(old_key)
pattern = rf"\{{\{{(\s*){escaped_old_key}(?![a-zA-Z0-9_])"
return re.sub(pattern, rf"{{{{\1{new_key}", text)
def get_missing_variables(template_source: str, template_data: dict) -> set[str]: def get_missing_variables(template_source: str, template_data: dict) -> set[str]:
# quick check - catch top-level undefineds # quick check - catch top-level undefineds
env = Environment(undefined=StrictUndefined) env = Environment(undefined=StrictUndefined)