diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index 3d472741..0586255a 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -185,6 +185,114 @@ ACTIONS_OPT_OUT_INTENTION_FOR_PROMPT = ["extract"] INDENT = " " * 4 DOUBLE_INDENT = " " * 8 +# Minimum length for a parameter value to be eligible for substitution in click prompts. +# Short values (e.g. "1", "No", "CA") cause too many false-positive replacements. +MIN_PARAM_VALUE_LENGTH_FOR_PROMPT_SUB = 4 + + +def _build_value_to_param_lookup( + actions_by_task: dict[str, list[dict[str, Any]]], +) -> dict[str, str]: + """Build a mapping from literal action values to their parameter field names. + + After ``hydrate_input_text_actions_with_field_names`` runs, custom-field actions + carry both the original literal value (``text``, ``option``, or ``file_url``) and + the ``field_name`` assigned by the LLM. This function collects those pairs so + that click-prompt generation can replace matching literals with + ``context.parameters['field_name']`` f-string references. + + The returned dict is sorted by *descending value length* so that callers who + iterate in order will replace longer matches first, avoiding partial collisions. + """ + raw: dict[str, str] = {} + for _task_id, actions in actions_by_task.items(): + for action in actions: + field_name = action.get("field_name") + if not field_name: + continue + action_type = action.get("action_type", "") + if action_type == ActionType.INPUT_TEXT: + value = action.get("text", "") + elif action_type == ActionType.UPLOAD_FILE: + value = action.get("file_url", "") + elif action_type == ActionType.SELECT_OPTION: + value = action.get("option", "") + else: + continue + if value and len(value) >= MIN_PARAM_VALUE_LENGTH_FOR_PROMPT_SUB: + # First writer wins — the field-level name is more specific than a + # workflow-level parameter that may happen to share the same value. + if value not in raw: + raw[value] = field_name + + # Sort by descending value length so longer matches are attempted first. + return dict(sorted(raw.items(), key=lambda kv: len(kv[0]), reverse=True)) + + +def _build_parameterized_prompt_cst( + intention: str, + value_to_param: dict[str, str], +) -> cst.BaseExpression | None: + """If *intention* contains any literal parameter values, return a ``FormattedString`` + CST node (an f-string) with those values replaced by + ``context.parameters['field_name']`` expressions. + + Returns ``None`` when no substitution is needed (the caller should fall back to + emitting a plain string literal). + """ + # Identify all non-overlapping matches, preferring longer values (dict is + # already sorted by descending length). + # Each match is (start, end, field_name). + matches: list[tuple[int, int, str]] = [] + for value, field_name in value_to_param.items(): + start = 0 + while True: + idx = intention.find(value, start) + if idx == -1: + break + end = idx + len(value) + # Check overlap with already-accepted matches. + overlaps = any(not (end <= ms or idx >= me) for ms, me, _ in matches) + if not overlaps: + matches.append((idx, end, field_name)) + start = end + + if not matches: + return None + + # Sort matches by position so we can build the f-string left-to-right. + matches.sort(key=lambda m: m[0]) + + parts: list[cst.BaseFormattedStringContent] = [] + cursor = 0 + for start, end, field_name in matches: + # Text segment before this match. + if start > cursor: + parts.append(cst.FormattedStringText(intention[cursor:start])) + # The {context.parameters['field_name']} expression. + parts.append( + cst.FormattedStringExpression( + expression=cst.Subscript( + value=cst.Attribute( + value=cst.Name("context"), + attr=cst.Name("parameters"), + ), + slice=[ + cst.SubscriptElement( + slice=cst.Index(value=_value(field_name)), + ) + ], + ) + ) + ) + cursor = end + + # Trailing text after last match. + if cursor < len(intention): + parts.append(cst.FormattedStringText(intention[cursor:])) + + return cst.FormattedString(parts=parts) + def _requires_mini_agent(act: dict[str, Any]) -> bool: """ @@ -431,7 +539,12 @@ def _make_decorator(block_label: str, block: dict[str, Any]) -> cst.Decorator: ) -def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output: bool = False) -> cst.BaseStatement: +def _action_to_stmt( + act: dict[str, Any], + task: dict[str, Any], + assign_to_output: bool = False, + value_to_param: dict[str, str] | None = None, +) -> cst.BaseStatement: """ Turn one Action dict into: @@ -440,6 +553,10 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output: Or if assign_to_output is True for extract actions: output = await page.extract(...) + + When *value_to_param* is provided, click prompt strings that contain literal + parameter values will be emitted as f-strings referencing + ``context.parameters['field_name']`` instead of hardcoded text. """ method = ACTION_MAP[act["action_type"]] @@ -740,11 +857,19 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output: intention = act.get("intention") or act.get("reasoning") or "" if intention and method not in ACTIONS_OPT_OUT_INTENTION_FOR_PROMPT: + # Try to parameterize the prompt for click actions so cached scripts + # don't embed recording-specific values (e.g. a patient ID). + prompt_value: cst.BaseExpression | None = None + if value_to_param: + prompt_value = _build_parameterized_prompt_cst(intention, value_to_param) + if prompt_value is None: + prompt_value = _value(intention) + args.extend( [ cst.Arg( keyword=cst.Name("prompt"), - value=_value(intention), + value=prompt_value, whitespace_after_arg=cst.ParenthesizedWhitespace(indent=True), comma=cst.Comma(), ), @@ -811,7 +936,11 @@ def _collect_block_input_fields( return all_fields -def _build_block_fn(block: dict[str, Any], actions: list[dict[str, Any]]) -> FunctionDef: +def _build_block_fn( + block: dict[str, Any], + actions: list[dict[str, Any]], + value_to_param: dict[str, str] | None = None, +) -> FunctionDef: name = _safe_name(block.get("label") or block.get("title") or f"block_{block.get('workflow_run_block_id')}") cache_key = block.get("label") or block.get("title") or f"block_{block.get('workflow_run_block_id')}" body_stmts: list[cst.BaseStatement] = [] @@ -832,7 +961,7 @@ def _build_block_fn(block: dict[str, Any], actions: list[dict[str, Any]]) -> Fun # For extraction blocks, assign extract action results to output variable assign_to_output = act["action_type"] == "extract" - body_stmts.append(_action_to_stmt(act, block, assign_to_output=assign_to_output)) + body_stmts.append(_action_to_stmt(act, block, assign_to_output=assign_to_output, value_to_param=value_to_param)) # add complete action block_type = block.get("block_type") @@ -2223,6 +2352,10 @@ async def generate_workflow_script_python_code( ) actions_by_task = hydrate_input_text_actions_with_field_names(actions_by_task, field_mappings) + # Build a lookup from literal parameter values to field names so that click + # prompt strings can be parameterized (e.g. patient ID → context.parameters[...]). + value_to_param = _build_value_to_param_lookup(actions_by_task) + # --- class + cached params ----------------------------------------- model_cls = _build_model(workflow) generated_model_cls = _build_generated_model_from_schema(generated_schema) @@ -2259,7 +2392,9 @@ async def generate_workflow_script_python_code( block_workflow_run_id = cached_source.workflow_run_id block_workflow_run_block_id = cached_source.workflow_run_block_id else: - block_fn_def = _build_block_fn(task, actions_by_task.get(task.get("task_id", ""), [])) + block_fn_def = _build_block_fn( + task, actions_by_task.get(task.get("task_id", ""), []), value_to_param=value_to_param + ) temp_module = cst.Module(body=[block_fn_def]) block_code = temp_module.code @@ -2317,7 +2452,11 @@ async def generate_workflow_script_python_code( for child_block in child_blocks: if child_block.get("block_type") in SCRIPT_TASK_BLOCKS and child_block.get("block_type") != "task_v2": - child_fn_def = _build_block_fn(child_block, actions_by_task.get(child_block.get("task_id", ""), [])) + child_fn_def = _build_block_fn( + child_block, + actions_by_task.get(child_block.get("task_id", ""), []), + value_to_param=value_to_param, + ) task_v2_block_body.append(cst.EmptyLine()) task_v2_block_body.append(cst.EmptyLine()) task_v2_block_body.append(child_fn_def)