script gen - loop block support (#3455)

This commit is contained in:
Shuchang Zheng
2025-09-18 00:27:49 -07:00
committed by GitHub
parent 598c06cdd1
commit 1cb84f13b5
6 changed files with 611 additions and 314 deletions

View File

@@ -39,6 +39,7 @@ from skyvern.services.script_service import ( # noqa: E402
login, # noqa: E402
parse_file, # noqa: E402
prompt, # noqa: E402
render_list, # noqa: E402
render_template, # noqa: E402
run_code, # noqa: E402
run_script, # noqa: E402
@@ -63,6 +64,7 @@ __all__ = [
"login",
"parse_file",
"prompt",
"render_list",
"render_template",
"run_code",
"run_script",

View File

@@ -151,15 +151,26 @@ def _value(value: Any) -> cst.BaseExpression:
return cst.SimpleString(repr(str(value)))
def _render_value(prompt_text: str | None = None) -> cst.BaseExpression:
def _render_value(
prompt_text: str | None = None,
data_variable_name: str | None = None,
render_func_name: str = "render_template",
) -> cst.BaseExpression:
"""Create a prompt value with template rendering logic if needed."""
if not prompt_text:
return cst.SimpleString("")
if "{{" in prompt_text and "}}" in prompt_text:
# Generate code for: render_template(prompt_text)
args = [cst.Arg(value=_value(prompt_text))]
if data_variable_name:
args.append(
cst.Arg(
keyword=cst.Name("data"),
value=cst.Name(data_variable_name),
)
)
return cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("render_template")),
args=[cst.Arg(value=_value(prompt_text))],
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name(render_func_name)),
args=args,
)
else:
# Return the prompt as a simple string value
@@ -537,6 +548,34 @@ def _build_block_fn(block: dict[str, Any], actions: list[dict[str, Any]]) -> Fun
)
def _build_task_v2_block_fn(block: dict[str, Any], child_blocks: list[dict[str, Any]]) -> FunctionDef:
"""Build a cached function for task_v2 blocks that calls child workflow sub-tasks."""
name = block.get("label") or _safe_name(block.get("title") or f"block_{block.get('workflow_run_block_id')}")
body_stmts: list[cst.BaseStatement] = []
# Add calls to child workflow sub-tasks
for child_block in child_blocks:
stmt = _build_block_statement(child_block)
body_stmts.append(stmt)
if not body_stmts:
body_stmts.append(cst.parse_statement("return None"))
return FunctionDef(
name=Name(name),
params=cst.Parameters(
params=[
Param(name=Name("page"), annotation=cst.Annotation(cst.Name("SkyvernPage"))),
Param(name=Name("context"), annotation=cst.Annotation(cst.Name("RunContext"))),
]
),
decorators=[_make_decorator(name, block)],
body=cst.IndentedBlock(body_stmts),
returns=None,
asynchronous=cst.Asynchronous(),
)
def _build_model(workflow: dict[str, Any]) -> cst.ClassDef:
"""
class WorkflowParameters(BaseModel):
@@ -592,9 +631,11 @@ def _build_generated_model_from_schema(schema_code: str) -> cst.ClassDef | None:
# --------------------------------------------------------------------- #
def _build_run_task_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
def _build_run_task_statement(
block_title: str, block: dict[str, Any], data_variable_name: str | None = None
) -> cst.SimpleStatementLine:
"""Build a skyvern.run_task statement."""
args = __build_base_task_statement(block_title, block)
args = __build_base_task_statement(block_title, block, data_variable_name)
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("run_task")),
args=args,
@@ -607,20 +648,14 @@ def _build_run_task_statement(block_title: str, block: dict[str, Any]) -> cst.Si
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_download_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
def _build_download_statement(
block_title: str, block: dict[str, Any], data_variable_name: str | None = None
) -> cst.SimpleStatementLine:
"""Build a skyvern.download statement."""
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(block.get("navigation_goal") or ""),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("complete_on_download"),
value=_value(block.get("complete_on_download", False)),
value=_render_value(block.get("navigation_goal") or "", data_variable_name=data_variable_name),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -631,7 +666,7 @@ def _build_download_statement(block_title: str, block: dict[str, Any]) -> cst.Si
args.append(
cst.Arg(
keyword=cst.Name("download_suffix"),
value=_render_value(block.get("download_suffix")),
value=_render_value(block.get("download_suffix"), data_variable_name=data_variable_name),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -661,19 +696,21 @@ def _build_download_statement(block_title: str, block: dict[str, Any]) -> cst.Si
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_action_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
def _build_action_statement(
block_title: str, block: dict[str, Any], data_variable_name: str | None = None
) -> cst.SimpleStatementLine:
"""Build a skyvern.action statement."""
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(block.get("navigation_goal", "")),
value=_render_value(block.get("navigation_goal", ""), data_variable_name=data_variable_name),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("cache_key"),
keyword=cst.Name("label"),
value=_value(block_title),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
@@ -694,9 +731,11 @@ def _build_action_statement(block_title: str, block: dict[str, Any]) -> cst.Simp
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_login_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
def _build_login_statement(
block_title: str, block: dict[str, Any], data_variable_name: str | None = None
) -> cst.SimpleStatementLine:
"""Build a skyvern.login statement."""
args = __build_base_task_statement(block_title, block)
args = __build_base_task_statement(block_title, block, data_variable_name)
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("login")),
args=args,
@@ -709,12 +748,14 @@ def _build_login_statement(block_title: str, block: dict[str, Any]) -> cst.Simpl
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_extract_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
def _build_extract_statement(
block_title: str, block: dict[str, Any], data_variable_name: str | None = None
) -> cst.SimpleStatementLine:
"""Build a skyvern.extract statement."""
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(block.get("data_extraction_goal", "")),
value=_render_value(block.get("data_extraction_goal", ""), data_variable_name=data_variable_name),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -729,7 +770,7 @@ def _build_extract_statement(block_title: str, block: dict[str, Any]) -> cst.Sim
),
),
cst.Arg(
keyword=cst.Name("cache_key"),
keyword=cst.Name("label"),
value=_value(block_title),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
@@ -750,43 +791,11 @@ def _build_extract_statement(block_title: str, block: dict[str, Any]) -> cst.Sim
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_navigate_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
def _build_navigate_statement(
block_title: str, block: dict[str, Any], data_variable_name: str | None = None
) -> cst.SimpleStatementLine:
"""Build a skyvern.navigate statement."""
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("url"),
value=_value(block.get("url", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("max_steps"),
value=_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("cache_key"),
value=_value(block_title),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
args = __build_base_task_statement(block_title, block, data_variable_name)
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("run_task")),
args=args,
@@ -915,45 +924,12 @@ def _build_wait_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_for_loop_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.for_loop statement."""
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("max_steps"),
value=_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("for_loop")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_goto_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
def _build_goto_statement(block: dict[str, Any], data_variable_name: str | None = None) -> cst.SimpleStatementLine:
"""Build a skyvern.goto statement."""
args = [
cst.Arg(
keyword=cst.Name("url"),
value=_value(block.get("url", "")),
value=_render_value(block.get("url", ""), data_variable_name=data_variable_name),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -1301,6 +1277,119 @@ def _build_prompt_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_for_loop_statement(block_title: str, block: dict[str, Any]) -> cst.For:
"""
Build a for loop statement.
All the blocks within the for loop block statement will run without cache_key.
An example of a for loop statement:
```
for current_value in context.parameters["urls"]:
await skyvern.goto(
url=current_value,
label="block_4",
)
await skyvern.extract(
prompt="Get a summary of the page",
schema={
"type": "object",
"properties": {
"summary": {
"type": "string",
"description": "A concise summary of the main content or purpose of the page"
}
},
"required": [
"summary"
]
},
label="block_5",
)
```
"""
# Extract loop configuration
loop_over_parameter_key = block.get("loop_variable_reference", "")
loop_blocks = block.get("loop_blocks", [])
# Create the for loop target (current_value)
target = cst.Name("current_value")
# Build body statements from loop_blocks
body_statements = []
# Add loop_data assignment as the first statement
loop_data_variable_name = "loop_data"
loop_data_assignment = cst.SimpleStatementLine(
[
cst.Assign(
targets=[cst.AssignTarget(target=cst.Name(loop_data_variable_name))],
value=cst.Dict(
[cst.DictElement(key=cst.SimpleString('"current_value"'), value=cst.Name("current_value"))]
),
)
]
)
body_statements.append(loop_data_assignment)
for loop_block in loop_blocks:
stmt = _build_block_statement(loop_block, data_variable_name=loop_data_variable_name)
body_statements.append(stmt)
# Create the for loop
for_loop = cst.For(
target=target,
iter=_render_value(loop_over_parameter_key, render_func_name="render_list"),
body=cst.IndentedBlock(body=body_statements),
whitespace_after_for=cst.SimpleWhitespace(" "),
whitespace_before_in=cst.SimpleWhitespace(" "),
whitespace_after_in=cst.SimpleWhitespace(" "),
whitespace_before_colon=cst.SimpleWhitespace(""),
)
return for_loop
def _build_goto_statement_for_loop(block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.goto statement for use within loops, handling current_value template."""
url_value = block.get("url", "")
# Handle {{current_value}} template by replacing it with the current_value variable
if url_value == "{{current_value}}":
url_expr = cst.Name("current_value")
else:
url_expr = _value(url_value)
args = [
cst.Arg(
keyword=cst.Name("url"),
value=url_expr,
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("label"),
value=_value(block.get("label") or block.get("title") or f"block_{block.get('workflow_run_block_id')}"),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("goto")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _mark_last_arg_as_comma(args: list[cst.Arg]) -> None:
if not args:
return
@@ -1317,11 +1406,15 @@ def _mark_last_arg_as_comma(args: list[cst.Arg]) -> None:
args.append(new_arg)
def __build_base_task_statement(block_title: str, block: dict[str, Any]) -> list[cst.Arg]:
def __build_base_task_statement(
block_title: str, block: dict[str, Any], data_variable_name: str | None = None
) -> list[cst.Arg]:
block_type = block.get("block_type")
prompt = block.get("prompt") if block_type == "task_v2" else block.get("navigation_goal")
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_render_value(block.get("navigation_goal", "")),
value=_render_value(prompt, data_variable_name=data_variable_name),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -1339,11 +1432,12 @@ def __build_base_task_statement(block_title: str, block: dict[str, Any]) -> list
),
)
)
if block.get("max_steps_per_run"):
max_steps = block.get("max_steps") if block_type == "task_v2" else block.get("max_steps_per_run")
if max_steps:
args.append(
cst.Arg(
keyword=cst.Name("max_steps"),
value=_render_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)),
value=_value(max_steps or settings.MAX_STEPS_PER_RUN),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
@@ -1372,9 +1466,20 @@ def __build_base_task_statement(block_title: str, block: dict[str, Any]) -> list
),
)
)
if block.get("block_type") == "task_v2":
args.append(
cst.Arg(
keyword=cst.Name("engine"),
value=_value("skyvern-2.0"),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
)
args.append(
cst.Arg(
keyword=cst.Name("cache_key"),
keyword=cst.Name("label"),
value=_value(block_title),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
@@ -1390,6 +1495,52 @@ def __build_base_task_statement(block_title: str, block: dict[str, Any]) -> list
# --------------------------------------------------------------------- #
def _build_block_statement(block: dict[str, Any], data_variable_name: str | None = None) -> cst.SimpleStatementLine:
"""Build a block statement."""
block_type = block.get("block_type")
block_title = block.get("label") or block.get("title") or f"block_{block.get('workflow_run_block_id')}"
if block_type in SCRIPT_TASK_BLOCKS:
# For task blocks, call the custom function with cache_key
if block_type == "task":
stmt = _build_run_task_statement(block_title, block, data_variable_name)
elif block_type == "file_download":
stmt = _build_download_statement(block_title, block, data_variable_name)
elif block_type == "action":
stmt = _build_action_statement(block_title, block, data_variable_name)
elif block_type == "login":
stmt = _build_login_statement(block_title, block, data_variable_name)
elif block_type == "extraction":
stmt = _build_extract_statement(block_title, block, data_variable_name)
elif block_type == "navigation":
stmt = _build_navigate_statement(block_title, block, data_variable_name)
elif block_type == "task_v2":
stmt = _build_run_task_statement(block_title, block, data_variable_name)
elif block_type == "send_email":
stmt = _build_send_email_statement(block)
elif block_type == "text_prompt":
stmt = _build_prompt_statement(block)
elif block_type == "wait":
stmt = _build_wait_statement(block)
elif block_type == "for_loop":
stmt = _build_for_loop_statement(block_title, block)
elif block_type == "goto_url":
stmt = _build_goto_statement(block, data_variable_name)
elif block_type == "code":
stmt = _build_code_statement(block)
elif block_type == "file_upload":
stmt = _build_file_upload_statement(block)
elif block_type == "file_url_parser":
stmt = _build_file_url_parser_statement(block)
elif block_type == "http_request":
stmt = _build_http_request_statement(block)
else:
# Default case for unknown block types
stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"# Unknown block type: {block_type}"))])
return stmt
def _build_run_fn(blocks: list[dict[str, Any]], wf_req: dict[str, Any]) -> FunctionDef:
body = [
cst.parse_statement(
@@ -1399,69 +1550,9 @@ def _build_run_fn(blocks: list[dict[str, Any]], wf_req: dict[str, Any]) -> Funct
]
for block in blocks:
block_type = block.get("block_type")
block_title = block.get("label") or block.get("title") or f"block_{block.get('workflow_run_block_id')}"
if block_type in SCRIPT_TASK_BLOCKS:
# For task blocks, call the custom function with cache_key
if block_type == "task":
stmt = _build_run_task_statement(block_title, block)
elif block_type == "file_download":
stmt = _build_download_statement(block_title, block)
elif block_type == "action":
stmt = _build_action_statement(block_title, block)
elif block_type == "login":
stmt = _build_login_statement(block_title, block)
elif block_type == "extraction":
stmt = _build_extract_statement(block_title, block)
elif block_type == "navigation":
stmt = _build_navigate_statement(block_title, block)
elif block_type == "send_email":
stmt = _build_send_email_statement(block)
elif block_type == "text_prompt":
stmt = _build_prompt_statement(block)
elif block_type == "wait":
stmt = _build_wait_statement(block)
elif block_type == "for_loop":
stmt = _build_for_loop_statement(block_title, block)
elif block_type == "goto_url":
stmt = _build_goto_statement(block)
elif block_type == "code":
stmt = _build_code_statement(block)
elif block_type == "file_upload":
stmt = _build_file_upload_statement(block)
elif block_type == "file_url_parser":
stmt = _build_file_url_parser_statement(block)
elif block_type == "http_request":
stmt = _build_http_request_statement(block)
else:
# Default case for unknown block types
stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"# Unknown block type: {block_type}"))])
stmt = _build_block_statement(block)
body.append(stmt)
# Add a final validation step if not already present
# has_validation = any(block.get("block_type") == "text_prompt" for block in blocks)
# has_task_blocks = any(block.get("block_type") in SCRIPT_TASK_BLOCKS for block in blocks)
# if not has_validation and not has_task_blocks:
# # Build the final validation statement using LibCST components
# args = [
# cst.Arg(
# keyword=cst.Name("prompt"),
# value=cst.SimpleString(
# '"Your goal is to validate that the workflow completed successfully. COMPLETE if successful, TERMINATE if there are issues."'
# ),
# ),
# ]
# call = cst.Call(
# func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("validate")),
# args=args,
# )
# validation_stmt = cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
# body.append(validation_stmt)
params = cst.Parameters(
params=[
Param(
@@ -1518,6 +1609,7 @@ async def generate_workflow_script(
workflow: dict[str, Any],
blocks: list[dict[str, Any]],
actions_by_task: dict[str, list[dict[str, Any]]],
task_v2_child_blocks: dict[str, list[dict[str, Any]]] | None = None,
organization_id: str | None = None,
run_id: str | None = None,
script_id: str | None = None,
@@ -1575,11 +1667,18 @@ async def generate_workflow_script(
# --- blocks ---------------------------------------------------------
block_fns = []
task_blocks = [block for block in blocks if block["block_type"] in SCRIPT_TASK_BLOCKS]
length_of_tasks = len(task_blocks)
task_v1_blocks = [block for block in blocks if block["block_type"] in SCRIPT_TASK_BLOCKS]
task_v2_blocks = [block for block in blocks if block["block_type"] == "task_v2"]
if task_v2_child_blocks is None:
task_v2_child_blocks = {}
# Handle task v1 blocks (excluding child blocks of task_v2)
for idx, task in enumerate(task_v1_blocks):
# Skip if this is a child block of a task_v2 block
if task.get("parent_task_v2_label"):
continue
# Create script first if organization_id is provided
for idx, task in enumerate(task_blocks):
block_fn_def = _build_block_fn(task, actions_by_task.get(task.get("task_id", ""), []))
# Create script block if we have script context
@@ -1602,7 +1701,67 @@ async def generate_workflow_script(
# Continue without script block creation if it fails
block_fns.append(block_fn_def)
if idx < length_of_tasks - 1:
if idx < len(task_v1_blocks) - 1:
block_fns.append(cst.EmptyLine())
block_fns.append(cst.EmptyLine())
# Handle task_v2 blocks
for idx, task_v2 in enumerate(task_v2_blocks):
task_v2_label = task_v2.get("label") or f"task_v2_{task_v2.get('workflow_run_block_id')}"
child_blocks = task_v2_child_blocks.get(task_v2_label, [])
# Create the task_v2 function
task_v2_fn_def = _build_task_v2_block_fn(task_v2, child_blocks)
# Create script block for task_v2 that includes both the main function and child functions
if script_id and script_revision_id and organization_id:
try:
# Build the complete module for this task_v2 block
task_v2_block_body = [task_v2_fn_def]
# Add child block functions
for child_block in child_blocks:
if (
child_block.get("block_type") in SCRIPT_TASK_BLOCKS
and child_block.get("block_type") != "task_v2"
):
child_fn_def = _build_block_fn(
child_block, actions_by_task.get(child_block.get("task_id", ""), [])
)
task_v2_block_body.append(cst.EmptyLine())
task_v2_block_body.append(cst.EmptyLine())
task_v2_block_body.append(child_fn_def)
# Create the complete module for this task_v2 block
temp_module = cst.Module(body=task_v2_block_body)
task_v2_block_code = temp_module.code
block_name = task_v2.get("label") or task_v2.get("title") or f"task_v2_{idx}"
block_description = f"Generated task_v2 block with child functions: {block_name}"
await create_script_block(
block_code=task_v2_block_code,
script_revision_id=script_revision_id,
script_id=script_id,
organization_id=organization_id,
block_name=block_name,
block_description=block_description,
)
except Exception as e:
LOG.error("Failed to create task_v2 script block", error=str(e), exc_info=True)
# Continue without script block creation if it fails
block_fns.append(task_v2_fn_def)
# Create individual functions for child blocks
for child_block in child_blocks:
if child_block.get("block_type") in SCRIPT_TASK_BLOCKS and child_block.get("block_type") != "task_v2":
child_fn_def = _build_block_fn(child_block, actions_by_task.get(child_block.get("task_id", ""), []))
block_fns.append(cst.EmptyLine())
block_fns.append(cst.EmptyLine())
block_fns.append(child_fn_def)
if idx < len(task_v2_blocks) - 1:
block_fns.append(cst.EmptyLine())
block_fns.append(cst.EmptyLine())

View File

@@ -9,6 +9,7 @@ from enum import StrEnum
from typing import Any, Callable, Literal
import structlog
from jinja2.sandbox import SandboxedEnvironment
from playwright.async_api import Page
from skyvern.config import settings
@@ -29,6 +30,7 @@ from skyvern.webeye.actions.parse_actions import parse_actions
from skyvern.webeye.browser_factory import BrowserState
from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website
jinja_sandbox_env = SandboxedEnvironment()
LOG = structlog.get_logger()
SELECT_OPTION_GOAL = """- The intention to select an option: {intention}.
- The overall goal that the user wants to achieve: {prompt}."""
@@ -62,6 +64,25 @@ async def _get_element_id_by_xpath(xpath: str, page: Page) -> str | None:
return element_id
def render_template(template: str, data: dict[str, Any] | None = None) -> str:
"""
Refer to Block.format_block_parameter_template_from_workflow_run_context
TODO: complete this function so that block code shares the same template rendering logic
"""
template_data = data or {}
jinja_template = jinja_sandbox_env.from_string(template)
context = skyvern_context.current()
if context and context.workflow_run_id:
workflow_run_id = context.workflow_run_id
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
template_data.update(workflow_run_context.values)
if template in template_data:
return template_data[template]
return jinja_template.render(template_data)
class SkyvernPage:
"""
A minimal adapter around the chosen driver that:
@@ -195,6 +216,7 @@ class SkyvernPage:
return decorator
async def goto(self, url: str, timeout: float = settings.BROWSER_LOADING_TIMEOUT_MS) -> None:
url = render_template(url)
await self.page.goto(
url,
timeout=timeout,
@@ -779,8 +801,10 @@ class ScriptRunContextManager:
def set_cached_fn(self, cache_key: str, fn: Callable) -> None:
self.cached_fns[cache_key] = fn
def get_cached_fn(self, cache_key: str) -> Callable | None:
return self.cached_fns.get(cache_key)
def get_cached_fn(self, cache_key: str | None = None) -> Callable | None:
if cache_key:
return self.cached_fns.get(cache_key)
return None
script_run_context_manager = ScriptRunContextManager()

View File

@@ -19,6 +19,7 @@ class CodeGenInput:
workflow: dict[str, Any]
workflow_blocks: list[dict[str, Any]]
actions_by_task: dict[str, list[dict[str, Any]]]
task_v2_child_blocks: dict[str, list[dict[str, Any]]] # task_v2_label -> list of child blocks
async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organization_id: str) -> CodeGenInput:
@@ -51,26 +52,26 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
workflow_run_blocks.sort(key=lambda x: x.created_at)
# Create mapping from definition blocks by label for quick lookup
definition_blocks_by_label = {block.label: block for block in workflow_definition_blocks if block.label}
workflow_run_blocks_by_label = {block.label: block for block in workflow_run_blocks if block.label}
workflow_block_dump = []
actions_by_task = {}
task_v2_child_blocks = {}
# Loop through workflow run blocks and match to original definition blocks by label
for run_block in workflow_run_blocks:
if run_block.block_type == BlockType.TaskV2:
raise ValueError("TaskV2 blocks are not supported yet")
for definition_block in workflow_definition_blocks:
# if definition_block.block_type == BlockType.TaskV2:
# raise ValueError("TaskV2 blocks are not supported yet")
# Find corresponding definition block by label to get templated information
definition_block = definition_blocks_by_label.get(run_block.label) if run_block.label else None
run_block = workflow_run_blocks_by_label.get(definition_block.label) if definition_block.label else None
if definition_block:
final_dump = {}
if run_block:
# Start with the original templated definition block
final_dump = definition_block.model_dump()
else:
# Fallback to run block data if no matching definition block found
final_dump = run_block.model_dump()
LOG.warning(f"No matching definition block found for run block with label: {run_block.label}")
# the run_block is not a top level block - for now we will skip non top level blocks, like any blocks defined inside a loop block
continue
# For task blocks, add execution data while preserving templated information
if run_block.block_type in SCRIPT_TASK_BLOCKS and run_block.task_id:
@@ -119,6 +120,41 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
else:
LOG.warning(f"Task {run_block.task_id} not found")
if run_block.block_type == BlockType.TaskV2:
# Merge child workflow run data for task v2 blocks
if run_block.block_workflow_run_id:
try:
# Recursively get child workflow run data
child_code_gen_input = await transform_workflow_run_to_code_gen_input(
workflow_run_id=run_block.block_workflow_run_id, organization_id=organization_id
)
# Store child blocks for this task_v2 block
task_v2_label = run_block.label or f"task_v2_{run_block.workflow_run_block_id}"
task_v2_child_blocks[task_v2_label] = child_code_gen_input.workflow_blocks
# Merge child workflow blocks into the current workflow_block_dump (but mark them as child blocks)
# for child_block in child_code_gen_input.workflow_blocks:
# child_block["parent_task_v2_label"] = task_v2_label
# workflow_block_dump.append(child_block)
# Merge child actions_by_task into current actions_by_task
for task_id, child_actions in child_code_gen_input.actions_by_task.items():
actions_by_task[task_id] = child_actions
# Also merge nested task_v2 child blocks if any
for nested_label, nested_blocks in child_code_gen_input.task_v2_child_blocks.items():
task_v2_child_blocks[nested_label] = nested_blocks
except Exception as e:
LOG.warning(
"Failed to merge child workflow run data for task v2 block",
task_v2_workflow_run_id=run_block.block_workflow_run_id,
error=str(e),
)
else:
LOG.warning(f"Task v2 block {run_block.label} does not have a child workflow run id")
workflow_block_dump.append(final_dump)
return CodeGenInput(
@@ -127,4 +163,5 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
workflow=workflow_json,
workflow_blocks=workflow_block_dump,
actions_by_task=actions_by_task,
task_v2_child_blocks=task_v2_child_blocks,
)

View File

@@ -2624,6 +2624,7 @@ class WorkflowService:
workflow=codegen_input.workflow,
blocks=codegen_input.workflow_blocks,
actions_by_task=codegen_input.actions_by_task,
task_v2_child_blocks=codegen_input.task_v2_child_blocks,
organization_id=workflow.organization_id,
script_id=created_script.script_id,
script_revision_id=created_script.script_revision_id,

View File

@@ -7,7 +7,7 @@ import os
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Any, cast
from typing import Any, Callable, cast
import libcst as cst
import structlog
@@ -29,16 +29,20 @@ from skyvern.forge.sdk.schemas.files import FileInfo
from skyvern.forge.sdk.schemas.tasks import Task, TaskOutput, TaskStatus
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
from skyvern.forge.sdk.workflow.models.block import (
ActionBlock,
CodeBlock,
ExtractionBlock,
FileDownloadBlock,
FileParserBlock,
FileUploadBlock,
HttpRequestBlock,
LoginBlock,
SendEmailBlock,
TaskBlock,
TextPromptBlock,
UrlBlock,
)
from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, OutputParameter
from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, OutputParameter, ParameterType
from skyvern.forge.sdk.workflow.models.workflow import Workflow
from skyvern.schemas.runs import RunEngine
from skyvern.schemas.scripts import CreateScriptResponse, FileEncoding, FileNode, ScriptFileCreate
@@ -438,7 +442,16 @@ async def _record_output_parameter_value(
# get the output_paramter
output_parameter = workflow.get_output_parameter(label)
if not output_parameter:
return
# NOT sure if this is legit hack to create output parameter like this
label = label or f"block_{uuid.uuid4()}"
output_parameter = OutputParameter(
output_parameter_id=str(uuid.uuid4()),
key=f"{label}_output",
workflow_id=workflow_id,
created_at=datetime.now(),
modified_at=datetime.now(),
parameter_type=ParameterType.OUTPUT,
)
await workflow_run_context.register_output_parameter_value_post_execution(
parameter=output_parameter,
@@ -527,14 +540,9 @@ async def _update_workflow_block(
)
async def _run_cached_function(cache_key: str) -> Any:
cached_fn = script_run_context_manager.get_cached_fn(cache_key)
if cached_fn:
# TODO: handle exceptions here and fall back to AI run in case of error
run_context = script_run_context_manager.ensure_run_context()
return await cached_fn(page=run_context.page, context=run_context)
else:
raise Exception(f"Cache key {cache_key} not found")
async def _run_cached_function(cached_fn: Callable) -> Any:
run_context = script_run_context_manager.ensure_run_context()
return await cached_fn(page=run_context.page, context=run_context)
async def _fallback_to_ai_run(
@@ -623,13 +631,15 @@ async def _fallback_to_ai_run(
# get the output_paramter
output_parameter = workflow.get_output_parameter(cache_key)
if not output_parameter:
LOG.exception(
"Output parameter not found for the workflow",
# NOT sure if this is legit hack to create output parameter like this
output_parameter = OutputParameter(
output_parameter_id=str(uuid.uuid4()),
key=f"{cache_key}_output",
workflow_id=workflow_id,
workflow_permanent_id=workflow_permanent_id,
workflow_run_id=workflow_run_id,
created_at=datetime.now(),
modified_at=datetime.now(),
parameter_type=ParameterType.OUTPUT,
)
return
LOG.info(
"Script starting to fallback to AI run",
cache_key=cache_key,
@@ -1026,21 +1036,27 @@ async def run_task(
max_steps: int | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
label: str | None = None,
cache_key: str | None = None,
engine: RunEngine = RunEngine.skyvern_v1,
model: dict[str, Any] | None = None,
) -> None:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.TASK,
prompt=prompt,
url=url,
)
# set the prompt in the RunContext
context = skyvern_context.ensure_context()
context.prompt = prompt
cache_key = cache_key or label
cached_fn = script_run_context_manager.get_cached_fn(cache_key)
if cache_key:
context: skyvern_context.SkyvernContext | None = None
if cache_key and cached_fn:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.TASK,
prompt=prompt,
url=url,
)
# set the prompt in the RunContext
context = skyvern_context.ensure_context()
context.prompt = prompt
try:
await _run_cached_function(cache_key)
await _run_cached_function(cached_fn)
# Update block status to completed if workflow block was created
if workflow_run_block_id:
@@ -1069,39 +1085,52 @@ async def run_task(
# clear the prompt in the RunContext
context.prompt = None
else:
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id,
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
step_id=step_id,
step_status=StepStatus.failed,
failure_reason="Cache key is required",
)
context.prompt = None
raise Exception("Cache key is required to run task block in a script")
block_validation_output = await _validate_and_get_output_parameter(label)
task_block = TaskBlock(
label=block_validation_output.label,
output_parameter=block_validation_output.output_parameter,
url=url,
navigation_goal=prompt,
max_steps_per_run=max_steps,
totp_identifier=totp_identifier,
totp_verification_url=totp_url,
include_action_history_in_verification=True,
engine=RunEngine.skyvern_v1,
)
await task_block.execute_safe(
workflow_run_id=block_validation_output.workflow_run_id,
organization_id=block_validation_output.organization_id,
browser_session_id=block_validation_output.browser_session_id,
)
async def download(
prompt: str,
url: str | None = None,
complete_on_download: bool = True,
max_steps: int | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
label: str | None = None,
cache_key: str | None = None,
) -> None:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.FILE_DOWNLOAD,
prompt=prompt,
url=url,
)
# set the prompt in the RunContext
context = skyvern_context.ensure_context()
context.prompt = prompt
cache_key = cache_key or label
cached_fn = script_run_context_manager.get_cached_fn(cache_key)
context: skyvern_context.SkyvernContext | None
if cache_key and cached_fn:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.FILE_DOWNLOAD,
prompt=prompt,
url=url,
)
# set the prompt in the RunContext
context = skyvern_context.ensure_context()
context.prompt = prompt
if cache_key:
try:
await _run_cached_function(cache_key)
await _run_cached_function(cached_fn)
# Update block status to completed if workflow block was created
if workflow_run_block_id:
@@ -1121,25 +1150,31 @@ async def download(
prompt=prompt,
url=url,
max_steps=max_steps,
complete_on_download=True,
complete_on_download=complete_on_download,
error=e,
workflow_run_block_id=workflow_run_block_id,
)
finally:
context.prompt = None
else:
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id,
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
step_id=step_id,
step_status=StepStatus.failed,
failure_reason="Cache key is required",
)
context.prompt = None
raise Exception("Cache key is required to run task block in a script")
block_validation_output = await _validate_and_get_output_parameter(label)
file_download_block = FileDownloadBlock(
label=block_validation_output.label,
output_parameter=block_validation_output.output_parameter,
url=url,
complete_on_download=complete_on_download,
navigation_goal=prompt,
max_steps_per_run=max_steps,
totp_identifier=totp_identifier,
totp_verification_url=totp_url,
include_action_history_in_verification=True,
engine=RunEngine.skyvern_v1,
)
await file_download_block.execute_safe(
workflow_run_id=block_validation_output.workflow_run_id,
organization_id=block_validation_output.organization_id,
browser_session_id=block_validation_output.browser_session_id,
)
async def action(
@@ -1148,21 +1183,25 @@ async def action(
max_steps: int | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
label: str | None = None,
cache_key: str | None = None,
) -> None:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.ACTION,
prompt=prompt,
url=url,
)
# set the prompt in the RunContext
context = skyvern_context.ensure_context()
context.prompt = prompt
context: skyvern_context.SkyvernContext | None
cache_key = cache_key or label
cached_fn = script_run_context_manager.get_cached_fn(cache_key)
if cache_key and cached_fn:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.ACTION,
prompt=prompt,
url=url,
)
# set the prompt in the RunContext
context = skyvern_context.ensure_context()
context.prompt = prompt
if cache_key:
try:
await _run_cached_function(cache_key)
await _run_cached_function(cached_fn)
# Update block status to completed if workflow block was created
if workflow_run_block_id:
@@ -1190,18 +1229,21 @@ async def action(
finally:
context.prompt = None
else:
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id,
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
step_id=step_id,
step_status=StepStatus.failed,
failure_reason="Cache key is required",
)
context.prompt = None
raise Exception("Cache key is required to run task block in a script")
block_validation_output = await _validate_and_get_output_parameter(label)
action_block = ActionBlock(
label=block_validation_output.label,
output_parameter=block_validation_output.output_parameter,
url=url,
navigation_goal=prompt,
max_steps_per_run=max_steps,
totp_identifier=totp_identifier,
totp_verification_url=totp_url,
)
await action_block.execute_safe(
workflow_run_id=block_validation_output.workflow_run_id,
organization_id=block_validation_output.organization_id,
browser_session_id=block_validation_output.browser_session_id,
)
async def login(
@@ -1210,21 +1252,24 @@ async def login(
max_steps: int | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
label: str | None = None,
cache_key: str | None = None,
) -> None:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.LOGIN,
prompt=prompt,
url=url,
)
# set the prompt in the RunContext
context = skyvern_context.ensure_context()
context.prompt = prompt
if cache_key:
context: skyvern_context.SkyvernContext | None
cache_key = cache_key or label
cached_fn = script_run_context_manager.get_cached_fn(cache_key)
if cache_key and cached_fn:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.LOGIN,
prompt=prompt,
url=url,
)
# set the prompt in the RunContext
context = skyvern_context.ensure_context()
context.prompt = prompt
try:
await _run_cached_function(cache_key)
await _run_cached_function(cached_fn)
# Update block status to completed if workflow block was created
if workflow_run_block_id:
@@ -1252,18 +1297,21 @@ async def login(
finally:
context.prompt = None
else:
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id,
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
step_id=step_id,
step_status=StepStatus.failed,
failure_reason="Cache key is required",
)
context.prompt = None
raise Exception("Cache key is required to run task block in a script")
block_validation_output = await _validate_and_get_output_parameter(label)
login_block = LoginBlock(
label=block_validation_output.label,
output_parameter=block_validation_output.output_parameter,
url=url,
navigation_goal=prompt,
max_steps_per_run=max_steps,
totp_identifier=totp_identifier,
totp_verification_url=totp_url,
)
await login_block.execute_safe(
workflow_run_id=block_validation_output.workflow_run_id,
organization_id=block_validation_output.organization_id,
browser_session_id=block_validation_output.browser_session_id,
)
async def extract(
@@ -1271,23 +1319,27 @@ async def extract(
schema: dict[str, Any] | list | str | None = None,
url: str | None = None,
max_steps: int | None = None,
label: str | None = None,
cache_key: str | None = None,
) -> dict[str, Any] | list | str | None:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.EXTRACTION,
prompt=prompt,
schema=schema,
url=url,
)
# set the prompt in the RunContext
context = skyvern_context.ensure_context()
context.prompt = prompt
output: dict[str, Any] | list | str | None = None
if cache_key:
context: skyvern_context.SkyvernContext | None
cache_key = cache_key or label
cached_fn = script_run_context_manager.get_cached_fn(cache_key)
if cache_key and cached_fn:
# Auto-create workflow block run and task if workflow_run_id is available
workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
block_type=BlockType.EXTRACTION,
prompt=prompt,
schema=schema,
url=url,
)
# set the prompt in the RunContext
context = skyvern_context.ensure_context()
context.prompt = prompt
try:
output = cast(dict[str, Any] | list | str | None, await _run_cached_function(cache_key))
output = cast(dict[str, Any] | list | str | None, await _run_cached_function(cached_fn))
# Update block status to completed if workflow block was created
if workflow_run_block_id:
@@ -1318,18 +1370,21 @@ async def extract(
finally:
context.prompt = None
else:
if workflow_run_block_id:
await _update_workflow_block(
workflow_run_block_id,
BlockStatus.failed,
task_id=task_id,
task_status=TaskStatus.failed,
step_id=step_id,
step_status=StepStatus.failed,
failure_reason="Cache key is required",
)
context.prompt = None
raise Exception("Cache key is required to run task block in a script")
block_validation_output = await _validate_and_get_output_parameter(label)
extraction_block = ExtractionBlock(
label=block_validation_output.label,
url=url,
data_extraction_goal=prompt,
max_steps_per_run=max_steps,
data_schema=schema,
output_parameter=block_validation_output.output_parameter,
)
block_result = await extraction_block.execute_safe(
workflow_run_id=block_validation_output.workflow_run_id,
organization_id=block_validation_output.organization_id,
browser_session_id=block_validation_output.browser_session_id,
)
return block_result.output_parameter_value
async def wait(seconds: int) -> None:
@@ -1441,10 +1496,21 @@ def render_template(template: str, data: dict[str, Any] | None = None) -> str:
workflow_run_id = context.workflow_run_id
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
template_data.update(workflow_run_context.values)
if template in template_data:
return template_data[template]
return jinja_template.render(template_data)
def render_list(template: str, data: dict[str, Any] | None = None) -> list[str]:
rendered_value = render_template(template, data)
list_value = eval(rendered_value)
if isinstance(list_value, list):
return list_value
else:
return [list_value]
# Non-task-based blocks
## Non-task-based block helpers
@dataclass
@@ -1476,7 +1542,15 @@ async def _validate_and_get_output_parameter(label: str | None = None) -> BlockV
label = label or f"block_{uuid.uuid4()}"
output_parameter = workflow.get_output_parameter(label)
if not output_parameter:
raise Exception("Output parameter not found")
# NOT sure if this is legit hack to create output parameter like this
output_parameter = OutputParameter(
output_parameter_id=str(uuid.uuid4()),
key=f"{label}_output",
workflow_id=workflow_id,
created_at=datetime.now(),
modified_at=datetime.now(),
parameter_type=ParameterType.OUTPUT,
)
return BlockValidationOutput(
label=label,
output_parameter=output_parameter,