batch input field processing for input actions when caching and running workflows with code (#4250)

This commit is contained in:
Shuchang Zheng
2025-12-16 10:08:19 +08:00
committed by GitHub
parent 1e5b8b36c1
commit 65b12f7ade
7 changed files with 320 additions and 57 deletions

View File

@@ -39,6 +39,7 @@ class ScriptBlockSource:
run_signature: str | None
workflow_run_id: str | None
workflow_run_block_id: str | None
input_fields: list[str] | None
# --------------------------------------------------------------------- #
@@ -122,6 +123,26 @@ INDENT = " " * 4
DOUBLE_INDENT = " " * 8
def _requires_mini_agent(act: dict[str, Any]) -> bool:
"""
Determine whether an input/select action should be forced into proactive mode.
Mirrors runtime logic that treats some inputs as mini-agent flows or TOTP-sensitive.
"""
if act.get("has_mini_agent", False):
return True
# context = act.get("input_or_select_context") or {}
# if isinstance(context, dict) and any(
# context.get(flag) for flag in ("is_location_input", "is_date_related", "date_format")
# ):
# return True
if act.get("totp_timing_info") and act.get("totp_timing_info", {}).get("is_totp_sequence"):
return True
return False
def _safe_name(label: str) -> str:
s = "".join(c if c.isalnum() else "_" for c in label).lower()
if not s or s[0].isdigit() or keyword.iskeyword(s):
@@ -304,6 +325,10 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output:
else:
text_value = _value(act["text"])
ai_mode = GENERATE_CODE_AI_MODE_FALLBACK
if _requires_mini_agent(act):
ai_mode = GENERATE_CODE_AI_MODE_PROACTIVE
args.append(
cst.Arg(
keyword=cst.Name("value"),
@@ -317,7 +342,7 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output:
args.append(
cst.Arg(
keyword=cst.Name("ai"),
value=_value(GENERATE_CODE_AI_MODE_PROACTIVE),
value=_value(ai_mode),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -353,6 +378,10 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output:
label = option.get("label")
value = value or label
if value:
# TODO: consider supporting fallback mode for select_option actions
# ai_mode = GENERATE_CODE_AI_MODE_FALLBACK
# if _requires_mini_agent(act):
ai_mode = GENERATE_CODE_AI_MODE_PROACTIVE
if act.get("field_name"):
option_value = cst.Subscript(
value=cst.Attribute(
@@ -376,7 +405,7 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output:
args.append(
cst.Arg(
keyword=cst.Name("ai"),
value=_value(GENERATE_CODE_AI_MODE_PROACTIVE),
value=_value(ai_mode),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -509,6 +538,33 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output:
return cst.SimpleStatementLine([cst.Expr(await_expr)])
def _collect_block_input_fields(
block: dict[str, Any],
actions_by_task: dict[str, list[dict[str, Any]]],
) -> list[str]:
"""
Gather the sequence of workflow parameter field names referenced by input_text actions within a block.
"""
task_id = block.get("task_id")
if not task_id:
return []
all_fields: list[str] = []
for action in actions_by_task.get(task_id, []):
action_type = action.get("action_type")
# Only support input_text actions for now
if action_type not in {ActionType.INPUT_TEXT}:
continue
field_name = action.get("field_name")
if not field_name or not isinstance(field_name, str):
continue
all_fields.append(field_name)
return all_fields
def _build_block_fn(block: dict[str, Any], actions: list[dict[str, Any]]) -> FunctionDef:
name = _safe_name(block.get("label") or block.get("title") or f"block_{block.get('workflow_run_block_id')}")
cache_key = block.get("label") or block.get("title") or f"block_{block.get('workflow_run_block_id')}"
@@ -1908,6 +1964,9 @@ async def generate_workflow_script_python_code(
block_name = task.get("label") or task.get("title") or task.get("task_id") or f"task_{idx}"
cached_source = cached_blocks.get(block_name)
use_cached = cached_source is not None and block_name not in updated_block_labels
input_fields = _collect_block_input_fields(task, actions_by_task)
if not input_fields and cached_source and cached_source.input_fields:
input_fields = cached_source.input_fields
if use_cached:
assert cached_source is not None
@@ -1939,6 +1998,7 @@ async def generate_workflow_script_python_code(
run_signature=run_signature,
workflow_run_id=block_workflow_run_id,
workflow_run_block_id=block_workflow_run_block_id,
input_fields=input_fields,
)
except Exception as e:
LOG.error("Failed to create script block", error=str(e), exc_info=True)
@@ -1952,6 +2012,9 @@ async def generate_workflow_script_python_code(
cached_source = cached_blocks.get(task_v2_label)
use_cached = cached_source is not None and task_v2_label not in updated_block_labels
input_fields = _collect_block_input_fields(task_v2, actions_by_task)
if not input_fields and cached_source and cached_source.input_fields:
input_fields = cached_source.input_fields
block_code = ""
run_signature = None
@@ -1993,6 +2056,7 @@ async def generate_workflow_script_python_code(
run_signature=run_signature,
workflow_run_id=block_workflow_run_id,
workflow_run_block_id=block_workflow_run_block_id,
input_fields=input_fields,
)
except Exception as e:
LOG.error("Failed to create task_v2 script block", error=str(e), exc_info=True)
@@ -2071,6 +2135,7 @@ async def create_or_update_script_block(
run_signature: str | None = None,
workflow_run_id: str | None = None,
workflow_run_block_id: str | None = None,
input_fields: list[str] | None = None,
) -> None:
"""
Create a script block in the database and save the block code to a script file.
@@ -2086,6 +2151,7 @@ async def create_or_update_script_block(
run_signature: The function call code to execute this block (e.g., "await skyvern.action(...)")
workflow_run_id: The workflow run that generated this cached block
workflow_run_block_id: The workflow run block that generated this cached block
input_fields: Workflow parameter field names referenced by this block's cached actions
"""
block_code_bytes = block_code if isinstance(block_code, bytes) else block_code.encode("utf-8")
try:
@@ -2104,15 +2170,17 @@ async def create_or_update_script_block(
run_signature=run_signature,
workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id,
input_fields=input_fields,
)
elif run_signature:
# Update the run_signature if provided
elif any(value is not None for value in [run_signature, workflow_run_id, workflow_run_block_id, input_fields]):
# Update metadata when new values are provided
script_block = await app.DATABASE.update_script_block(
script_block_id=script_block.script_block_id,
organization_id=organization_id,
run_signature=run_signature,
workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id,
input_fields=input_fields,
)
# Step 4: Create script file for the block

View File

@@ -74,7 +74,22 @@ class SkyvernPage(Page):
*args: Any,
**kwargs: Any,
) -> Any:
return await fn(self, *args, **kwargs)
context = skyvern_context.current()
# label = self.current_label
# action_override = None
# if context and label:
# current_count = context.action_counters.get(label, 0) + 1
# context.action_counters[label] = current_count
# action_override = context.action_ai_overrides.get(label, {}).get(current_count)
# context.ai_mode_override = action_override
try:
return await fn(self, *args, **kwargs)
finally:
if context:
# Reset override after each action so defaults apply when no mapping is provided.
# context.ai_mode_override = None
pass
@staticmethod
def action_wrap(

View File

@@ -102,6 +102,7 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
for action in actions:
action_dump = action.model_dump()
action_dump["xpath"] = action.get_xpath()
action_dump["has_mini_agent"] = action.has_mini_agent
if (
"data_extraction_goal" in final_dump
and final_dump["data_extraction_goal"]