script gen - support skyvern.loop & cleaner interfaces for generated code (no need to pass context.parameters, implicit template rendering) (#3542)

This commit is contained in:
Shuchang Zheng
2025-09-26 23:27:29 -07:00
committed by GitHub
parent 8c54475fda
commit 90096bc453
7 changed files with 336 additions and 161 deletions

View File

@@ -168,57 +168,6 @@ def _render_value(
return _value(prompt_text)
def _generate_text_call(text_value: str, intention: str, parameter_key: str) -> cst.BaseExpression:
"""Create a generate_text function call CST expression."""
return cst.Await(
expression=cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("generate_text")),
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(DOUBLE_INDENT),
),
args=[
# First positional argument: context.parameters['parameter_key']
cst.Arg(
value=cst.Subscript(
value=cst.Attribute(
value=cst.Name("context"),
attr=cst.Name("parameters"),
),
slice=[cst.SubscriptElement(slice=cst.Index(value=_value(parameter_key)))],
),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(DOUBLE_INDENT),
),
),
# intention keyword argument
cst.Arg(
keyword=cst.Name("intention"),
value=_value(intention),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(DOUBLE_INDENT),
),
),
# data keyword argument
cst.Arg(
keyword=cst.Name("data"),
value=cst.Attribute(
value=cst.Name("context"),
attr=cst.Name("parameters"),
),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
comma=cst.Comma(),
),
],
)
)
# --------------------------------------------------------------------- #
# 2. utility builders #
# --------------------------------------------------------------------- #
@@ -434,7 +383,7 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output:
args.append(
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(act["data_extraction_goal"]),
value=_value(act["data_extraction_goal"]),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -459,14 +408,6 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output:
cst.Arg(
keyword=cst.Name("intention"),
value=_value(act.get("intention") or act.get("reasoning") or ""),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("data"),
value=cst.Attribute(value=cst.Name("context"), attr=cst.Name("parameters")),
whitespace_after_arg=cst.ParenthesizedWhitespace(indent=True),
comma=cst.Comma(),
),
@@ -646,7 +587,7 @@ def _build_download_statement(
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(block.get("navigation_goal") or "", data_variable_name=data_variable_name),
value=_value(block.get("navigation_goal") or ""),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -657,7 +598,7 @@ def _build_download_statement(
args.append(
cst.Arg(
keyword=cst.Name("download_suffix"),
value=_render_value(block.get("download_suffix"), data_variable_name=data_variable_name),
value=_value(block.get("download_suffix")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -694,7 +635,7 @@ def _build_action_statement(
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(block.get("navigation_goal", ""), data_variable_name=data_variable_name),
value=_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -746,7 +687,7 @@ def _build_extract_statement(
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(block.get("data_extraction_goal", ""), data_variable_name=data_variable_name),
value=_value(block.get("data_extraction_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -870,7 +811,7 @@ def _build_validate_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(block.get("navigation_goal", "")),
value=_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
@@ -896,6 +837,14 @@ def _build_wait_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
cst.Arg(
keyword=cst.Name("seconds"),
value=_value(block.get("wait_sec", 1)),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("label"),
value=_value(block.get("label")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
@@ -920,7 +869,7 @@ def _build_goto_statement(block: dict[str, Any], data_variable_name: str | None
args = [
cst.Arg(
keyword=cst.Name("url"),
value=_render_value(block.get("url", ""), data_variable_name=data_variable_name),
value=_value(block.get("url", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -1212,7 +1161,7 @@ def _build_prompt_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(block.get("prompt", "")),
value=_value(block.get("prompt", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -1275,7 +1224,7 @@ def _build_for_loop_statement(block_title: str, block: dict[str, Any]) -> cst.Fo
An example of a for loop statement:
```
for current_value in context.parameters["urls"]:
async for current_value in skyvern.loop(context.parameters["urls"]):
await skyvern.goto(
url=current_value,
label="block_4",
@@ -1309,28 +1258,28 @@ def _build_for_loop_statement(block_title: str, block: dict[str, Any]) -> cst.Fo
body_statements = []
# Add loop_data assignment as the first statement
loop_data_variable_name = "loop_data"
loop_data_assignment = cst.SimpleStatementLine(
[
cst.Assign(
targets=[cst.AssignTarget(target=cst.Name(loop_data_variable_name))],
value=cst.Dict(
[cst.DictElement(key=cst.SimpleString('"current_value"'), value=cst.Name("current_value"))]
),
)
]
)
body_statements.append(loop_data_assignment)
for loop_block in loop_blocks:
stmt = _build_block_statement(loop_block, data_variable_name=loop_data_variable_name)
stmt = _build_block_statement(loop_block)
body_statements.append(stmt)
# Create the for loop
# create skyvern.loop(loop_over_parameter_key, label=block_title)
loop_call_args = [cst.Arg(keyword=cst.Name("values"), value=_value(loop_over_parameter_key))]
if block.get("complete_if_empty"):
loop_call_args.append(
cst.Arg(keyword=cst.Name("complete_if_empty"), value=_value(block.get("complete_if_empty")))
)
loop_call_args.append(cst.Arg(keyword=cst.Name("label"), value=_value(block_title)))
loop_call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("loop")),
args=loop_call_args,
)
# Create the async for loop
for_loop = cst.For(
target=target,
iter=_render_value(loop_over_parameter_key, render_func_name="render_list"),
iter=loop_call,
body=cst.IndentedBlock(body=body_statements),
asynchronous=cst.Asynchronous(),
whitespace_after_for=cst.SimpleWhitespace(" "),
whitespace_before_in=cst.SimpleWhitespace(" "),
whitespace_after_in=cst.SimpleWhitespace(" "),
@@ -1405,7 +1354,7 @@ def __build_base_task_statement(
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(prompt, data_variable_name=data_variable_name),
value=_value(prompt),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -1416,7 +1365,7 @@ def __build_base_task_statement(
args.append(
cst.Arg(
keyword=cst.Name("url"),
value=_render_value(block.get("url", "")),
value=_value(block.get("url", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -1439,7 +1388,7 @@ def __build_base_task_statement(
args.append(
cst.Arg(
keyword=cst.Name("totp_identifier"),
value=_render_value(block.get("totp_identifier", "")),
value=_value(block.get("totp_identifier", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -1450,7 +1399,7 @@ def __build_base_task_statement(
args.append(
cst.Arg(
keyword=cst.Name("totp_url"),
value=_render_value(block.get("totp_verification_url", "")),
value=_value(block.get("totp_verification_url", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),

View File

@@ -26,6 +26,7 @@ async def setup(
parameter = parameters_in_workflow_context[key]
if parameter.workflow_parameter_type == WorkflowParameterType.CREDENTIAL_ID:
parameters[key] = workflow_run_context.values[key]
context.script_run_parameters.update(parameters)
skyvern_page = await SkyvernPage.create(browser_session_id=browser_session_id)
run_context = RunContext(
parameters=parameters,

View File

@@ -64,13 +64,29 @@ async def _get_element_id_by_xpath(xpath: str, page: Page) -> str | None:
return element_id
def _get_context_data(data: str | dict[str, Any] | None = None) -> dict[str, Any] | str | None:
context = skyvern_context.current()
global_context_data = context.script_run_parameters if context else None
if not data:
return global_context_data
result: dict[str, Any] | str | None
if isinstance(data, dict):
result = {k: v for k, v in data.items() if v}
if global_context_data:
result.update(global_context_data)
else:
global_context_data_str = json.dumps(global_context_data) if global_context_data else ""
result = f"{data}\n{global_context_data_str}"
return result
def render_template(template: str, data: dict[str, Any] | None = None) -> str:
"""
Refer to Block.format_block_parameter_template_from_workflow_run_context
TODO: complete this function so that block code shares the same template rendering logic
"""
template_data = data or {}
template_data = data.copy() if data else {}
jinja_template = jinja_sandbox_env.from_string(template)
context = skyvern_context.current()
if context and context.workflow_run_id:
@@ -355,7 +371,7 @@ class SkyvernPage:
try:
# Build the element tree of the current page for the prompt
context = skyvern_context.ensure_context()
payload_str = json.dumps(data) if isinstance(data, (dict, list)) else (data or "")
payload_str = _get_context_data(data)
refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots()
element_tree = refreshed_page.build_element_tree()
single_click_prompt = prompt_engine.load_prompt(
@@ -463,9 +479,7 @@ class SkyvernPage:
if ai_infer and intention:
try:
prompt = context.prompt if context else None
# Build the element tree of the current page for the prompt
# clean up empty data values
data = {k: v for k, v in data.items() if v} if isinstance(data, dict) else (data or "")
data = _get_context_data(data)
if (totp_identifier or totp_url) and context and organization_id and task_id:
verification_code = await poll_verification_code(
organization_id=organization_id,
@@ -488,11 +502,10 @@ class SkyvernPage:
self.scraped_page = refreshed_page
# get the element_id by the xpath
element_id = await _get_element_id_by_xpath(xpath, self.page)
payload_str = json.dumps(data) if isinstance(data, (dict, list)) else (data or "")
script_generation_input_text_prompt = prompt_engine.load_prompt(
template="script-generation-input-text-generatiion",
intention=intention,
data=payload_str,
data=data,
goal=prompt,
)
json_response = await app.SINGLE_INPUT_AGENT_LLM_API_HANDLER(
@@ -539,12 +552,11 @@ class SkyvernPage:
try:
context = skyvern_context.current()
prompt = context.prompt if context else None
data = {k: v for k, v in data.items() if v} if isinstance(data, dict) else (data or "")
payload_str = json.dumps(data) if isinstance(data, (dict, list)) else (data or "")
data = _get_context_data(data)
script_generation_file_url_prompt = prompt_engine.load_prompt(
template="script-generation-file-url-generation",
intention=intention,
data=payload_str,
data=data,
goal=prompt,
)
json_response = await app.SINGLE_INPUT_AGENT_LLM_API_HANDLER(
@@ -578,15 +590,14 @@ class SkyvernPage:
if ai_infer and intention and task and step:
try:
prompt = context.prompt if context else None
data = {k: v for k, v in data.items() if v} if isinstance(data, dict) else (data or "")
payload_str = json.dumps(data) if isinstance(data, (dict, list)) else (data or "")
data = _get_context_data(data)
refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots()
self.scraped_page = refreshed_page
element_tree = refreshed_page.build_element_tree()
merged_goal = SELECT_OPTION_GOAL.format(intention=intention, prompt=prompt)
single_select_prompt = prompt_engine.load_prompt(
template="single-select-action",
navigation_payload_str=payload_str,
navigation_payload_str=data,
navigation_goal=merged_goal,
current_url=self.page.url,
elements=element_tree,