Preserve field names for unchanged blocks during schema regeneration (#SKY-7434) (#4535)
This commit is contained in:
@@ -20,6 +20,7 @@ from libcst import Attribute, Call, Dict, DictElement, FunctionDef, Name, Param
|
||||
from skyvern.config import settings
|
||||
from skyvern.core.script_generations.constants import SCRIPT_TASK_BLOCKS, SCRIPT_TASK_BLOCKS_WITH_COMPLETE_ACTION
|
||||
from skyvern.core.script_generations.generate_workflow_parameters import (
|
||||
CUSTOM_FIELD_ACTIONS,
|
||||
generate_workflow_parameters_schema,
|
||||
hydrate_input_text_actions_with_field_names,
|
||||
)
|
||||
@@ -42,6 +43,79 @@ class ScriptBlockSource:
|
||||
input_fields: list[str] | None
|
||||
|
||||
|
||||
def _build_existing_field_assignments(
|
||||
blocks: list[dict[str, Any]],
|
||||
actions_by_task: dict[str, list[dict[str, Any]]],
|
||||
cached_blocks: dict[str, ScriptBlockSource],
|
||||
updated_block_labels: set[str],
|
||||
) -> dict[int, str]:
|
||||
"""
|
||||
Build a mapping of action index (1-based) to existing field names for unchanged blocks.
|
||||
|
||||
This is used to tell the LLM which field names must be preserved when regenerating
|
||||
the workflow parameters schema, preventing schema mismatches with cached block code.
|
||||
|
||||
Args:
|
||||
blocks: List of block dictionaries from the workflow
|
||||
actions_by_task: Dictionary mapping task IDs to lists of action dictionaries
|
||||
cached_blocks: Dictionary mapping block labels to their cached ScriptBlockSource
|
||||
updated_block_labels: Set of block labels that have been updated (should not preserve)
|
||||
|
||||
Returns:
|
||||
Dictionary mapping action index (1-based) to the existing field name that must be preserved
|
||||
"""
|
||||
# Build mapping of block label -> task_id
|
||||
block_label_to_task_id: dict[str, str] = {}
|
||||
for idx, block in enumerate(blocks):
|
||||
if block.get("block_type") not in SCRIPT_TASK_BLOCKS:
|
||||
continue
|
||||
label = block.get("label") or block.get("title") or block.get("task_id") or f"task_{idx}"
|
||||
task_id = block.get("task_id")
|
||||
if task_id:
|
||||
block_label_to_task_id[label] = task_id
|
||||
|
||||
# Build mapping of task_id -> list of existing field names (for unchanged blocks)
|
||||
task_id_to_existing_fields: dict[str, list[str]] = {}
|
||||
for label, cached_source in cached_blocks.items():
|
||||
# Skip blocks that have been updated - they need new field names
|
||||
if label in updated_block_labels:
|
||||
continue
|
||||
# Skip blocks without input_fields
|
||||
if not cached_source.input_fields:
|
||||
continue
|
||||
# Find the task_id for this block
|
||||
task_id = block_label_to_task_id.get(label)
|
||||
if task_id:
|
||||
task_id_to_existing_fields[task_id] = list(cached_source.input_fields)
|
||||
|
||||
# Now iterate through actions in the same order as generate_workflow_parameters_schema
|
||||
# to build the action index -> field name mapping
|
||||
existing_field_assignments: dict[int, str] = {}
|
||||
action_counter = 1
|
||||
|
||||
# Track position within each task's field list
|
||||
task_field_position: dict[str, int] = {}
|
||||
|
||||
for task_id, actions in actions_by_task.items():
|
||||
for action in actions:
|
||||
action_type = action.get("action_type", "")
|
||||
if action_type not in CUSTOM_FIELD_ACTIONS:
|
||||
continue
|
||||
|
||||
# Check if this task has existing field names to preserve
|
||||
if task_id in task_id_to_existing_fields:
|
||||
existing_fields = task_id_to_existing_fields[task_id]
|
||||
position = task_field_position.get(task_id, 0)
|
||||
|
||||
if position < len(existing_fields):
|
||||
existing_field_assignments[action_counter] = existing_fields[position]
|
||||
task_field_position[task_id] = position + 1
|
||||
|
||||
action_counter += 1
|
||||
|
||||
return existing_field_assignments
|
||||
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# 1. helpers #
|
||||
# --------------------------------------------------------------------- #
|
||||
@@ -543,7 +617,7 @@ def _collect_block_input_fields(
|
||||
actions_by_task: dict[str, list[dict[str, Any]]],
|
||||
) -> list[str]:
|
||||
"""
|
||||
Gather the sequence of workflow parameter field names referenced by input_text actions within a block.
|
||||
Gather the sequence of workflow parameter field names referenced by custom field actions within a block.
|
||||
"""
|
||||
task_id = block.get("task_id")
|
||||
if not task_id:
|
||||
@@ -554,8 +628,8 @@ def _collect_block_input_fields(
|
||||
for action in actions_by_task.get(task_id, []):
|
||||
action_type = action.get("action_type")
|
||||
|
||||
# Only support input_text actions for now
|
||||
if action_type not in {ActionType.INPUT_TEXT}:
|
||||
# Keep in sync with CUSTOM_FIELD_ACTIONS used for schema generation
|
||||
if action_type not in CUSTOM_FIELD_ACTIONS:
|
||||
continue
|
||||
field_name = action.get("field_name")
|
||||
if not field_name or not isinstance(field_name, str):
|
||||
@@ -1936,7 +2010,17 @@ async def generate_workflow_script_python_code(
|
||||
]
|
||||
|
||||
# --- generate schema and hydrate actions ---------------------------
|
||||
generated_schema, field_mappings = await generate_workflow_parameters_schema(actions_by_task)
|
||||
# Build existing field assignments from cached blocks to preserve field names
|
||||
# for unchanged blocks, preventing schema mismatches with cached code
|
||||
existing_field_assignments = _build_existing_field_assignments(
|
||||
blocks=blocks,
|
||||
actions_by_task=actions_by_task,
|
||||
cached_blocks=cached_blocks,
|
||||
updated_block_labels=updated_block_labels,
|
||||
)
|
||||
generated_schema, field_mappings = await generate_workflow_parameters_schema(
|
||||
actions_by_task, existing_field_assignments
|
||||
)
|
||||
actions_by_task = hydrate_input_text_actions_with_field_names(actions_by_task, field_mappings)
|
||||
|
||||
# --- class + cached params -----------------------------------------
|
||||
|
||||
@@ -27,18 +27,24 @@ class GeneratedFieldMapping(BaseModel):
|
||||
|
||||
async def generate_workflow_parameters_schema(
|
||||
actions_by_task: Dict[str, List[Dict[str, Any]]],
|
||||
existing_field_assignments: Dict[int, str] | None = None,
|
||||
) -> Tuple[str, Dict[str, str]]:
|
||||
"""
|
||||
Generate a GeneratedWorkflowParameters Pydantic schema based on input_text actions.
|
||||
|
||||
Args:
|
||||
actions_by_task: Dictionary mapping task IDs to lists of action dictionaries
|
||||
existing_field_assignments: Optional dictionary mapping action index (1-based) to
|
||||
existing field names that must be preserved. Used when regenerating schemas
|
||||
to maintain compatibility with cached block code.
|
||||
|
||||
Returns:
|
||||
Tuple of (schema_code, field_mappings) where:
|
||||
- schema_code: Python code for the GeneratedWorkflowParameters class
|
||||
- field_mappings: Dictionary mapping action indices to field names for hydration
|
||||
"""
|
||||
existing_field_assignments = existing_field_assignments or {}
|
||||
|
||||
# Extract all input_text actions
|
||||
custom_field_actions = []
|
||||
action_index_map = {}
|
||||
@@ -57,6 +63,10 @@ async def generate_workflow_parameters_schema(
|
||||
value = action.get("file_url", "")
|
||||
elif action_type == ActionType.SELECT_OPTION:
|
||||
value = action.get("option", "")
|
||||
|
||||
# Check if this action has an existing field name that must be preserved
|
||||
existing_field_name = existing_field_assignments.get(action_counter)
|
||||
|
||||
custom_field_actions.append(
|
||||
{
|
||||
"action_type": action_type,
|
||||
@@ -64,6 +74,7 @@ async def generate_workflow_parameters_schema(
|
||||
"intention": action.get("intention", ""),
|
||||
"task_id": task_id,
|
||||
"action_id": action.get("action_id", ""),
|
||||
"existing_field_name": existing_field_name,
|
||||
}
|
||||
)
|
||||
action_index_map[f"action_index_{action_counter}"] = {
|
||||
@@ -98,18 +109,27 @@ async def generate_workflow_parameters_schema(
|
||||
return _generate_empty_schema(), {}
|
||||
|
||||
|
||||
async def _generate_field_names_with_llm(custom_field_actions: List[Dict[str, Any]]) -> GeneratedFieldMapping:
|
||||
async def _generate_field_names_with_llm(
|
||||
custom_field_actions: List[Dict[str, Any]],
|
||||
) -> GeneratedFieldMapping:
|
||||
"""
|
||||
Use LLM to generate field names from input actions.
|
||||
|
||||
Args:
|
||||
input_actions: List of input_text action dictionaries
|
||||
custom_field_actions: List of action dictionaries with action details.
|
||||
Each action may include an "existing_field_name" key if the field
|
||||
name must be preserved from a cached block.
|
||||
|
||||
Returns:
|
||||
GeneratedFieldMapping with field mappings and schema definitions
|
||||
"""
|
||||
# Check if any actions have existing field names that must be preserved
|
||||
has_existing_fields = any(action.get("existing_field_name") for action in custom_field_actions)
|
||||
|
||||
prompt = prompt_engine.load_prompt(
|
||||
template="generate-workflow-parameters", custom_field_actions=custom_field_actions
|
||||
template="generate-workflow-parameters",
|
||||
custom_field_actions=custom_field_actions,
|
||||
has_existing_fields=has_existing_fields,
|
||||
)
|
||||
|
||||
response = await app.SCRIPT_GENERATION_LLM_API_HANDLER(prompt=prompt, prompt_name="generate-workflow-parameters")
|
||||
|
||||
Reference in New Issue
Block a user