From 1cb84f13b54aa2dcae403f279c7a50491559c752 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Thu, 18 Sep 2025 00:27:49 -0700 Subject: [PATCH] script gen - loop block support (#3455) --- skyvern/__init__.py | 2 + .../script_generations/generate_script.py | 493 ++++++++++++------ .../core/script_generations/skyvern_page.py | 28 +- .../transform_workflow_run.py | 57 +- skyvern/forge/sdk/workflow/service.py | 1 + skyvern/services/script_service.py | 344 +++++++----- 6 files changed, 611 insertions(+), 314 deletions(-) diff --git a/skyvern/__init__.py b/skyvern/__init__.py index 75f22af0..d9671fbe 100644 --- a/skyvern/__init__.py +++ b/skyvern/__init__.py @@ -39,6 +39,7 @@ from skyvern.services.script_service import ( # noqa: E402 login, # noqa: E402 parse_file, # noqa: E402 prompt, # noqa: E402 + render_list, # noqa: E402 render_template, # noqa: E402 run_code, # noqa: E402 run_script, # noqa: E402 @@ -63,6 +64,7 @@ __all__ = [ "login", "parse_file", "prompt", + "render_list", "render_template", "run_code", "run_script", diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index bb83fb8a..29c6a875 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -151,15 +151,26 @@ def _value(value: Any) -> cst.BaseExpression: return cst.SimpleString(repr(str(value))) -def _render_value(prompt_text: str | None = None) -> cst.BaseExpression: +def _render_value( + prompt_text: str | None = None, + data_variable_name: str | None = None, + render_func_name: str = "render_template", +) -> cst.BaseExpression: """Create a prompt value with template rendering logic if needed.""" if not prompt_text: return cst.SimpleString("") if "{{" in prompt_text and "}}" in prompt_text: - # Generate code for: render_template(prompt_text) + args = [cst.Arg(value=_value(prompt_text))] + if data_variable_name: + args.append( + cst.Arg( + keyword=cst.Name("data"), + value=cst.Name(data_variable_name), + ) + ) return cst.Call( - func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("render_template")), - args=[cst.Arg(value=_value(prompt_text))], + func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name(render_func_name)), + args=args, ) else: # Return the prompt as a simple string value @@ -537,6 +548,34 @@ def _build_block_fn(block: dict[str, Any], actions: list[dict[str, Any]]) -> Fun ) +def _build_task_v2_block_fn(block: dict[str, Any], child_blocks: list[dict[str, Any]]) -> FunctionDef: + """Build a cached function for task_v2 blocks that calls child workflow sub-tasks.""" + name = block.get("label") or _safe_name(block.get("title") or f"block_{block.get('workflow_run_block_id')}") + body_stmts: list[cst.BaseStatement] = [] + + # Add calls to child workflow sub-tasks + for child_block in child_blocks: + stmt = _build_block_statement(child_block) + body_stmts.append(stmt) + + if not body_stmts: + body_stmts.append(cst.parse_statement("return None")) + + return FunctionDef( + name=Name(name), + params=cst.Parameters( + params=[ + Param(name=Name("page"), annotation=cst.Annotation(cst.Name("SkyvernPage"))), + Param(name=Name("context"), annotation=cst.Annotation(cst.Name("RunContext"))), + ] + ), + decorators=[_make_decorator(name, block)], + body=cst.IndentedBlock(body_stmts), + returns=None, + asynchronous=cst.Asynchronous(), + ) + + def _build_model(workflow: dict[str, Any]) -> cst.ClassDef: """ class WorkflowParameters(BaseModel): @@ -592,9 +631,11 @@ def _build_generated_model_from_schema(schema_code: str) -> cst.ClassDef | None: # --------------------------------------------------------------------- # -def _build_run_task_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine: +def _build_run_task_statement( + block_title: str, block: dict[str, Any], data_variable_name: str | None = None +) -> cst.SimpleStatementLine: """Build a skyvern.run_task statement.""" - args = __build_base_task_statement(block_title, block) + args = __build_base_task_statement(block_title, block, data_variable_name) call = cst.Call( func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("run_task")), args=args, @@ -607,20 +648,14 @@ def _build_run_task_statement(block_title: str, block: dict[str, Any]) -> cst.Si return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) -def _build_download_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine: +def _build_download_statement( + block_title: str, block: dict[str, Any], data_variable_name: str | None = None +) -> cst.SimpleStatementLine: """Build a skyvern.download statement.""" args = [ cst.Arg( keyword=cst.Name("prompt"), - value=_render_value(block.get("navigation_goal") or ""), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("complete_on_download"), - value=_value(block.get("complete_on_download", False)), + value=_render_value(block.get("navigation_goal") or "", data_variable_name=data_variable_name), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -631,7 +666,7 @@ def _build_download_statement(block_title: str, block: dict[str, Any]) -> cst.Si args.append( cst.Arg( keyword=cst.Name("download_suffix"), - value=_render_value(block.get("download_suffix")), + value=_render_value(block.get("download_suffix"), data_variable_name=data_variable_name), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -661,19 +696,21 @@ def _build_download_statement(block_title: str, block: dict[str, Any]) -> cst.Si return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) -def _build_action_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine: +def _build_action_statement( + block_title: str, block: dict[str, Any], data_variable_name: str | None = None +) -> cst.SimpleStatementLine: """Build a skyvern.action statement.""" args = [ cst.Arg( keyword=cst.Name("prompt"), - value=_render_value(block.get("navigation_goal", "")), + value=_render_value(block.get("navigation_goal", ""), data_variable_name=data_variable_name), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), ), ), cst.Arg( - keyword=cst.Name("cache_key"), + keyword=cst.Name("label"), value=_value(block_title), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, @@ -694,9 +731,11 @@ def _build_action_statement(block_title: str, block: dict[str, Any]) -> cst.Simp return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) -def _build_login_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine: +def _build_login_statement( + block_title: str, block: dict[str, Any], data_variable_name: str | None = None +) -> cst.SimpleStatementLine: """Build a skyvern.login statement.""" - args = __build_base_task_statement(block_title, block) + args = __build_base_task_statement(block_title, block, data_variable_name) call = cst.Call( func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("login")), args=args, @@ -709,12 +748,14 @@ def _build_login_statement(block_title: str, block: dict[str, Any]) -> cst.Simpl return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) -def _build_extract_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine: +def _build_extract_statement( + block_title: str, block: dict[str, Any], data_variable_name: str | None = None +) -> cst.SimpleStatementLine: """Build a skyvern.extract statement.""" args = [ cst.Arg( keyword=cst.Name("prompt"), - value=_render_value(block.get("data_extraction_goal", "")), + value=_render_value(block.get("data_extraction_goal", ""), data_variable_name=data_variable_name), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -729,7 +770,7 @@ def _build_extract_statement(block_title: str, block: dict[str, Any]) -> cst.Sim ), ), cst.Arg( - keyword=cst.Name("cache_key"), + keyword=cst.Name("label"), value=_value(block_title), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, @@ -750,43 +791,11 @@ def _build_extract_statement(block_title: str, block: dict[str, Any]) -> cst.Sim return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) -def _build_navigate_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine: +def _build_navigate_statement( + block_title: str, block: dict[str, Any], data_variable_name: str | None = None +) -> cst.SimpleStatementLine: """Build a skyvern.navigate statement.""" - args = [ - cst.Arg( - keyword=cst.Name("prompt"), - value=_render_value(block.get("navigation_goal", "")), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("url"), - value=_value(block.get("url", "")), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("max_steps"), - value=_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("cache_key"), - value=_value(block_title), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - ), - comma=cst.Comma(), - ), - ] - + args = __build_base_task_statement(block_title, block, data_variable_name) call = cst.Call( func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("run_task")), args=args, @@ -915,45 +924,12 @@ def _build_wait_statement(block: dict[str, Any]) -> cst.SimpleStatementLine: return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) -def _build_for_loop_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine: - """Build a skyvern.for_loop statement.""" - args = [ - cst.Arg( - keyword=cst.Name("prompt"), - value=_render_value(block.get("navigation_goal", "")), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - cst.Arg( - keyword=cst.Name("max_steps"), - value=_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - ), - comma=cst.Comma(), - ), - ] - - call = cst.Call( - func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("for_loop")), - args=args, - whitespace_before_args=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ) - - return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) - - -def _build_goto_statement(block: dict[str, Any]) -> cst.SimpleStatementLine: +def _build_goto_statement(block: dict[str, Any], data_variable_name: str | None = None) -> cst.SimpleStatementLine: """Build a skyvern.goto statement.""" args = [ cst.Arg( keyword=cst.Name("url"), - value=_value(block.get("url", "")), + value=_render_value(block.get("url", ""), data_variable_name=data_variable_name), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -1301,6 +1277,119 @@ def _build_prompt_statement(block: dict[str, Any]) -> cst.SimpleStatementLine: return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) +def _build_for_loop_statement(block_title: str, block: dict[str, Any]) -> cst.For: + """ + Build a for loop statement. + All the blocks within the for loop block statement will run without cache_key. + + An example of a for loop statement: + ``` + for current_value in context.parameters["urls"]: + await skyvern.goto( + url=current_value, + label="block_4", + ) + await skyvern.extract( + prompt="Get a summary of the page", + schema={ + "type": "object", + "properties": { + "summary": { + "type": "string", + "description": "A concise summary of the main content or purpose of the page" + } + }, + "required": [ + "summary" + ] + }, + label="block_5", + ) + ``` + """ + # Extract loop configuration + loop_over_parameter_key = block.get("loop_variable_reference", "") + loop_blocks = block.get("loop_blocks", []) + + # Create the for loop target (current_value) + target = cst.Name("current_value") + + # Build body statements from loop_blocks + body_statements = [] + + # Add loop_data assignment as the first statement + loop_data_variable_name = "loop_data" + loop_data_assignment = cst.SimpleStatementLine( + [ + cst.Assign( + targets=[cst.AssignTarget(target=cst.Name(loop_data_variable_name))], + value=cst.Dict( + [cst.DictElement(key=cst.SimpleString('"current_value"'), value=cst.Name("current_value"))] + ), + ) + ] + ) + body_statements.append(loop_data_assignment) + + for loop_block in loop_blocks: + stmt = _build_block_statement(loop_block, data_variable_name=loop_data_variable_name) + body_statements.append(stmt) + + # Create the for loop + for_loop = cst.For( + target=target, + iter=_render_value(loop_over_parameter_key, render_func_name="render_list"), + body=cst.IndentedBlock(body=body_statements), + whitespace_after_for=cst.SimpleWhitespace(" "), + whitespace_before_in=cst.SimpleWhitespace(" "), + whitespace_after_in=cst.SimpleWhitespace(" "), + whitespace_before_colon=cst.SimpleWhitespace(""), + ) + + return for_loop + + +def _build_goto_statement_for_loop(block: dict[str, Any]) -> cst.SimpleStatementLine: + """Build a skyvern.goto statement for use within loops, handling current_value template.""" + url_value = block.get("url", "") + + # Handle {{current_value}} template by replacing it with the current_value variable + if url_value == "{{current_value}}": + url_expr = cst.Name("current_value") + else: + url_expr = _value(url_value) + + args = [ + cst.Arg( + keyword=cst.Name("url"), + value=url_expr, + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ), + cst.Arg( + keyword=cst.Name("label"), + value=_value(block.get("label") or block.get("title") or f"block_{block.get('workflow_run_block_id')}"), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + ), + comma=cst.Comma(), + ), + ] + + call = cst.Call( + func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("goto")), + args=args, + whitespace_before_args=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + + return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) + + def _mark_last_arg_as_comma(args: list[cst.Arg]) -> None: if not args: return @@ -1317,11 +1406,15 @@ def _mark_last_arg_as_comma(args: list[cst.Arg]) -> None: args.append(new_arg) -def __build_base_task_statement(block_title: str, block: dict[str, Any]) -> list[cst.Arg]: +def __build_base_task_statement( + block_title: str, block: dict[str, Any], data_variable_name: str | None = None +) -> list[cst.Arg]: + block_type = block.get("block_type") + prompt = block.get("prompt") if block_type == "task_v2" else block.get("navigation_goal") args = [ cst.Arg( keyword=cst.Name("prompt"), - value=_render_value(block.get("navigation_goal", "")), + value=_render_value(prompt, data_variable_name=data_variable_name), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -1339,11 +1432,12 @@ def __build_base_task_statement(block_title: str, block: dict[str, Any]) -> list ), ) ) - if block.get("max_steps_per_run"): + max_steps = block.get("max_steps") if block_type == "task_v2" else block.get("max_steps_per_run") + if max_steps: args.append( cst.Arg( keyword=cst.Name("max_steps"), - value=_render_value(block.get("max_steps_per_run", settings.MAX_STEPS_PER_RUN)), + value=_value(max_steps or settings.MAX_STEPS_PER_RUN), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -1372,9 +1466,20 @@ def __build_base_task_statement(block_title: str, block: dict[str, Any]) -> list ), ) ) + if block.get("block_type") == "task_v2": + args.append( + cst.Arg( + keyword=cst.Name("engine"), + value=_value("skyvern-2.0"), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + ) args.append( cst.Arg( - keyword=cst.Name("cache_key"), + keyword=cst.Name("label"), value=_value(block_title), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, @@ -1390,6 +1495,52 @@ def __build_base_task_statement(block_title: str, block: dict[str, Any]) -> list # --------------------------------------------------------------------- # +def _build_block_statement(block: dict[str, Any], data_variable_name: str | None = None) -> cst.SimpleStatementLine: + """Build a block statement.""" + block_type = block.get("block_type") + block_title = block.get("label") or block.get("title") or f"block_{block.get('workflow_run_block_id')}" + + if block_type in SCRIPT_TASK_BLOCKS: + # For task blocks, call the custom function with cache_key + if block_type == "task": + stmt = _build_run_task_statement(block_title, block, data_variable_name) + elif block_type == "file_download": + stmt = _build_download_statement(block_title, block, data_variable_name) + elif block_type == "action": + stmt = _build_action_statement(block_title, block, data_variable_name) + elif block_type == "login": + stmt = _build_login_statement(block_title, block, data_variable_name) + elif block_type == "extraction": + stmt = _build_extract_statement(block_title, block, data_variable_name) + elif block_type == "navigation": + stmt = _build_navigate_statement(block_title, block, data_variable_name) + elif block_type == "task_v2": + stmt = _build_run_task_statement(block_title, block, data_variable_name) + elif block_type == "send_email": + stmt = _build_send_email_statement(block) + elif block_type == "text_prompt": + stmt = _build_prompt_statement(block) + elif block_type == "wait": + stmt = _build_wait_statement(block) + elif block_type == "for_loop": + stmt = _build_for_loop_statement(block_title, block) + elif block_type == "goto_url": + stmt = _build_goto_statement(block, data_variable_name) + elif block_type == "code": + stmt = _build_code_statement(block) + elif block_type == "file_upload": + stmt = _build_file_upload_statement(block) + elif block_type == "file_url_parser": + stmt = _build_file_url_parser_statement(block) + elif block_type == "http_request": + stmt = _build_http_request_statement(block) + else: + # Default case for unknown block types + stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"# Unknown block type: {block_type}"))]) + + return stmt + + def _build_run_fn(blocks: list[dict[str, Any]], wf_req: dict[str, Any]) -> FunctionDef: body = [ cst.parse_statement( @@ -1399,69 +1550,9 @@ def _build_run_fn(blocks: list[dict[str, Any]], wf_req: dict[str, Any]) -> Funct ] for block in blocks: - block_type = block.get("block_type") - block_title = block.get("label") or block.get("title") or f"block_{block.get('workflow_run_block_id')}" - - if block_type in SCRIPT_TASK_BLOCKS: - # For task blocks, call the custom function with cache_key - if block_type == "task": - stmt = _build_run_task_statement(block_title, block) - elif block_type == "file_download": - stmt = _build_download_statement(block_title, block) - elif block_type == "action": - stmt = _build_action_statement(block_title, block) - elif block_type == "login": - stmt = _build_login_statement(block_title, block) - elif block_type == "extraction": - stmt = _build_extract_statement(block_title, block) - elif block_type == "navigation": - stmt = _build_navigate_statement(block_title, block) - elif block_type == "send_email": - stmt = _build_send_email_statement(block) - elif block_type == "text_prompt": - stmt = _build_prompt_statement(block) - elif block_type == "wait": - stmt = _build_wait_statement(block) - elif block_type == "for_loop": - stmt = _build_for_loop_statement(block_title, block) - elif block_type == "goto_url": - stmt = _build_goto_statement(block) - elif block_type == "code": - stmt = _build_code_statement(block) - elif block_type == "file_upload": - stmt = _build_file_upload_statement(block) - elif block_type == "file_url_parser": - stmt = _build_file_url_parser_statement(block) - elif block_type == "http_request": - stmt = _build_http_request_statement(block) - else: - # Default case for unknown block types - stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"# Unknown block type: {block_type}"))]) - + stmt = _build_block_statement(block) body.append(stmt) - # Add a final validation step if not already present - # has_validation = any(block.get("block_type") == "text_prompt" for block in blocks) - # has_task_blocks = any(block.get("block_type") in SCRIPT_TASK_BLOCKS for block in blocks) - # if not has_validation and not has_task_blocks: - # # Build the final validation statement using LibCST components - # args = [ - # cst.Arg( - # keyword=cst.Name("prompt"), - # value=cst.SimpleString( - # '"Your goal is to validate that the workflow completed successfully. COMPLETE if successful, TERMINATE if there are issues."' - # ), - # ), - # ] - - # call = cst.Call( - # func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("validate")), - # args=args, - # ) - - # validation_stmt = cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) - # body.append(validation_stmt) - params = cst.Parameters( params=[ Param( @@ -1518,6 +1609,7 @@ async def generate_workflow_script( workflow: dict[str, Any], blocks: list[dict[str, Any]], actions_by_task: dict[str, list[dict[str, Any]]], + task_v2_child_blocks: dict[str, list[dict[str, Any]]] | None = None, organization_id: str | None = None, run_id: str | None = None, script_id: str | None = None, @@ -1575,11 +1667,18 @@ async def generate_workflow_script( # --- blocks --------------------------------------------------------- block_fns = [] - task_blocks = [block for block in blocks if block["block_type"] in SCRIPT_TASK_BLOCKS] - length_of_tasks = len(task_blocks) + task_v1_blocks = [block for block in blocks if block["block_type"] in SCRIPT_TASK_BLOCKS] + task_v2_blocks = [block for block in blocks if block["block_type"] == "task_v2"] + + if task_v2_child_blocks is None: + task_v2_child_blocks = {} + + # Handle task v1 blocks (excluding child blocks of task_v2) + for idx, task in enumerate(task_v1_blocks): + # Skip if this is a child block of a task_v2 block + if task.get("parent_task_v2_label"): + continue - # Create script first if organization_id is provided - for idx, task in enumerate(task_blocks): block_fn_def = _build_block_fn(task, actions_by_task.get(task.get("task_id", ""), [])) # Create script block if we have script context @@ -1602,7 +1701,67 @@ async def generate_workflow_script( # Continue without script block creation if it fails block_fns.append(block_fn_def) - if idx < length_of_tasks - 1: + if idx < len(task_v1_blocks) - 1: + block_fns.append(cst.EmptyLine()) + block_fns.append(cst.EmptyLine()) + + # Handle task_v2 blocks + for idx, task_v2 in enumerate(task_v2_blocks): + task_v2_label = task_v2.get("label") or f"task_v2_{task_v2.get('workflow_run_block_id')}" + child_blocks = task_v2_child_blocks.get(task_v2_label, []) + + # Create the task_v2 function + task_v2_fn_def = _build_task_v2_block_fn(task_v2, child_blocks) + + # Create script block for task_v2 that includes both the main function and child functions + if script_id and script_revision_id and organization_id: + try: + # Build the complete module for this task_v2 block + task_v2_block_body = [task_v2_fn_def] + + # Add child block functions + for child_block in child_blocks: + if ( + child_block.get("block_type") in SCRIPT_TASK_BLOCKS + and child_block.get("block_type") != "task_v2" + ): + child_fn_def = _build_block_fn( + child_block, actions_by_task.get(child_block.get("task_id", ""), []) + ) + task_v2_block_body.append(cst.EmptyLine()) + task_v2_block_body.append(cst.EmptyLine()) + task_v2_block_body.append(child_fn_def) + + # Create the complete module for this task_v2 block + temp_module = cst.Module(body=task_v2_block_body) + task_v2_block_code = temp_module.code + + block_name = task_v2.get("label") or task_v2.get("title") or f"task_v2_{idx}" + block_description = f"Generated task_v2 block with child functions: {block_name}" + + await create_script_block( + block_code=task_v2_block_code, + script_revision_id=script_revision_id, + script_id=script_id, + organization_id=organization_id, + block_name=block_name, + block_description=block_description, + ) + except Exception as e: + LOG.error("Failed to create task_v2 script block", error=str(e), exc_info=True) + # Continue without script block creation if it fails + + block_fns.append(task_v2_fn_def) + + # Create individual functions for child blocks + for child_block in child_blocks: + if child_block.get("block_type") in SCRIPT_TASK_BLOCKS and child_block.get("block_type") != "task_v2": + child_fn_def = _build_block_fn(child_block, actions_by_task.get(child_block.get("task_id", ""), [])) + block_fns.append(cst.EmptyLine()) + block_fns.append(cst.EmptyLine()) + block_fns.append(child_fn_def) + + if idx < len(task_v2_blocks) - 1: block_fns.append(cst.EmptyLine()) block_fns.append(cst.EmptyLine()) diff --git a/skyvern/core/script_generations/skyvern_page.py b/skyvern/core/script_generations/skyvern_page.py index c65d9669..c6a1cfa0 100644 --- a/skyvern/core/script_generations/skyvern_page.py +++ b/skyvern/core/script_generations/skyvern_page.py @@ -9,6 +9,7 @@ from enum import StrEnum from typing import Any, Callable, Literal import structlog +from jinja2.sandbox import SandboxedEnvironment from playwright.async_api import Page from skyvern.config import settings @@ -29,6 +30,7 @@ from skyvern.webeye.actions.parse_actions import parse_actions from skyvern.webeye.browser_factory import BrowserState from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website +jinja_sandbox_env = SandboxedEnvironment() LOG = structlog.get_logger() SELECT_OPTION_GOAL = """- The intention to select an option: {intention}. - The overall goal that the user wants to achieve: {prompt}.""" @@ -62,6 +64,25 @@ async def _get_element_id_by_xpath(xpath: str, page: Page) -> str | None: return element_id +def render_template(template: str, data: dict[str, Any] | None = None) -> str: + """ + Refer to Block.format_block_parameter_template_from_workflow_run_context + + TODO: complete this function so that block code shares the same template rendering logic + """ + template_data = data or {} + jinja_template = jinja_sandbox_env.from_string(template) + context = skyvern_context.current() + if context and context.workflow_run_id: + workflow_run_id = context.workflow_run_id + workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id) + template_data.update(workflow_run_context.values) + if template in template_data: + return template_data[template] + + return jinja_template.render(template_data) + + class SkyvernPage: """ A minimal adapter around the chosen driver that: @@ -195,6 +216,7 @@ class SkyvernPage: return decorator async def goto(self, url: str, timeout: float = settings.BROWSER_LOADING_TIMEOUT_MS) -> None: + url = render_template(url) await self.page.goto( url, timeout=timeout, @@ -779,8 +801,10 @@ class ScriptRunContextManager: def set_cached_fn(self, cache_key: str, fn: Callable) -> None: self.cached_fns[cache_key] = fn - def get_cached_fn(self, cache_key: str) -> Callable | None: - return self.cached_fns.get(cache_key) + def get_cached_fn(self, cache_key: str | None = None) -> Callable | None: + if cache_key: + return self.cached_fns.get(cache_key) + return None script_run_context_manager = ScriptRunContextManager() diff --git a/skyvern/core/script_generations/transform_workflow_run.py b/skyvern/core/script_generations/transform_workflow_run.py index b10a6795..8b46ee63 100644 --- a/skyvern/core/script_generations/transform_workflow_run.py +++ b/skyvern/core/script_generations/transform_workflow_run.py @@ -19,6 +19,7 @@ class CodeGenInput: workflow: dict[str, Any] workflow_blocks: list[dict[str, Any]] actions_by_task: dict[str, list[dict[str, Any]]] + task_v2_child_blocks: dict[str, list[dict[str, Any]]] # task_v2_label -> list of child blocks async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organization_id: str) -> CodeGenInput: @@ -51,26 +52,26 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz workflow_run_blocks.sort(key=lambda x: x.created_at) # Create mapping from definition blocks by label for quick lookup - definition_blocks_by_label = {block.label: block for block in workflow_definition_blocks if block.label} + workflow_run_blocks_by_label = {block.label: block for block in workflow_run_blocks if block.label} workflow_block_dump = [] actions_by_task = {} + task_v2_child_blocks = {} # Loop through workflow run blocks and match to original definition blocks by label - for run_block in workflow_run_blocks: - if run_block.block_type == BlockType.TaskV2: - raise ValueError("TaskV2 blocks are not supported yet") + for definition_block in workflow_definition_blocks: + # if definition_block.block_type == BlockType.TaskV2: + # raise ValueError("TaskV2 blocks are not supported yet") - # Find corresponding definition block by label to get templated information - definition_block = definition_blocks_by_label.get(run_block.label) if run_block.label else None + run_block = workflow_run_blocks_by_label.get(definition_block.label) if definition_block.label else None - if definition_block: + final_dump = {} + if run_block: # Start with the original templated definition block final_dump = definition_block.model_dump() else: - # Fallback to run block data if no matching definition block found - final_dump = run_block.model_dump() - LOG.warning(f"No matching definition block found for run block with label: {run_block.label}") + # the run_block is not a top level block - for now we will skip non top level blocks, like any blocks defined inside a loop block + continue # For task blocks, add execution data while preserving templated information if run_block.block_type in SCRIPT_TASK_BLOCKS and run_block.task_id: @@ -119,6 +120,41 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz else: LOG.warning(f"Task {run_block.task_id} not found") + if run_block.block_type == BlockType.TaskV2: + # Merge child workflow run data for task v2 blocks + if run_block.block_workflow_run_id: + try: + # Recursively get child workflow run data + child_code_gen_input = await transform_workflow_run_to_code_gen_input( + workflow_run_id=run_block.block_workflow_run_id, organization_id=organization_id + ) + + # Store child blocks for this task_v2 block + task_v2_label = run_block.label or f"task_v2_{run_block.workflow_run_block_id}" + task_v2_child_blocks[task_v2_label] = child_code_gen_input.workflow_blocks + + # Merge child workflow blocks into the current workflow_block_dump (but mark them as child blocks) + # for child_block in child_code_gen_input.workflow_blocks: + # child_block["parent_task_v2_label"] = task_v2_label + # workflow_block_dump.append(child_block) + + # Merge child actions_by_task into current actions_by_task + for task_id, child_actions in child_code_gen_input.actions_by_task.items(): + actions_by_task[task_id] = child_actions + + # Also merge nested task_v2 child blocks if any + for nested_label, nested_blocks in child_code_gen_input.task_v2_child_blocks.items(): + task_v2_child_blocks[nested_label] = nested_blocks + + except Exception as e: + LOG.warning( + "Failed to merge child workflow run data for task v2 block", + task_v2_workflow_run_id=run_block.block_workflow_run_id, + error=str(e), + ) + else: + LOG.warning(f"Task v2 block {run_block.label} does not have a child workflow run id") + workflow_block_dump.append(final_dump) return CodeGenInput( @@ -127,4 +163,5 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz workflow=workflow_json, workflow_blocks=workflow_block_dump, actions_by_task=actions_by_task, + task_v2_child_blocks=task_v2_child_blocks, ) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 769ffe8c..aa5e5be2 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -2624,6 +2624,7 @@ class WorkflowService: workflow=codegen_input.workflow, blocks=codegen_input.workflow_blocks, actions_by_task=codegen_input.actions_by_task, + task_v2_child_blocks=codegen_input.task_v2_child_blocks, organization_id=workflow.organization_id, script_id=created_script.script_id, script_revision_id=created_script.script_revision_id, diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index 71c664bb..73aa9da9 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -7,7 +7,7 @@ import os import uuid from dataclasses import dataclass from datetime import datetime -from typing import Any, cast +from typing import Any, Callable, cast import libcst as cst import structlog @@ -29,16 +29,20 @@ from skyvern.forge.sdk.schemas.files import FileInfo from skyvern.forge.sdk.schemas.tasks import Task, TaskOutput, TaskStatus from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock from skyvern.forge.sdk.workflow.models.block import ( + ActionBlock, CodeBlock, + ExtractionBlock, + FileDownloadBlock, FileParserBlock, FileUploadBlock, HttpRequestBlock, + LoginBlock, SendEmailBlock, TaskBlock, TextPromptBlock, UrlBlock, ) -from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, OutputParameter +from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, OutputParameter, ParameterType from skyvern.forge.sdk.workflow.models.workflow import Workflow from skyvern.schemas.runs import RunEngine from skyvern.schemas.scripts import CreateScriptResponse, FileEncoding, FileNode, ScriptFileCreate @@ -438,7 +442,16 @@ async def _record_output_parameter_value( # get the output_paramter output_parameter = workflow.get_output_parameter(label) if not output_parameter: - return + # NOT sure if this is legit hack to create output parameter like this + label = label or f"block_{uuid.uuid4()}" + output_parameter = OutputParameter( + output_parameter_id=str(uuid.uuid4()), + key=f"{label}_output", + workflow_id=workflow_id, + created_at=datetime.now(), + modified_at=datetime.now(), + parameter_type=ParameterType.OUTPUT, + ) await workflow_run_context.register_output_parameter_value_post_execution( parameter=output_parameter, @@ -527,14 +540,9 @@ async def _update_workflow_block( ) -async def _run_cached_function(cache_key: str) -> Any: - cached_fn = script_run_context_manager.get_cached_fn(cache_key) - if cached_fn: - # TODO: handle exceptions here and fall back to AI run in case of error - run_context = script_run_context_manager.ensure_run_context() - return await cached_fn(page=run_context.page, context=run_context) - else: - raise Exception(f"Cache key {cache_key} not found") +async def _run_cached_function(cached_fn: Callable) -> Any: + run_context = script_run_context_manager.ensure_run_context() + return await cached_fn(page=run_context.page, context=run_context) async def _fallback_to_ai_run( @@ -623,13 +631,15 @@ async def _fallback_to_ai_run( # get the output_paramter output_parameter = workflow.get_output_parameter(cache_key) if not output_parameter: - LOG.exception( - "Output parameter not found for the workflow", + # NOT sure if this is legit hack to create output parameter like this + output_parameter = OutputParameter( + output_parameter_id=str(uuid.uuid4()), + key=f"{cache_key}_output", workflow_id=workflow_id, - workflow_permanent_id=workflow_permanent_id, - workflow_run_id=workflow_run_id, + created_at=datetime.now(), + modified_at=datetime.now(), + parameter_type=ParameterType.OUTPUT, ) - return LOG.info( "Script starting to fallback to AI run", cache_key=cache_key, @@ -1026,21 +1036,27 @@ async def run_task( max_steps: int | None = None, totp_identifier: str | None = None, totp_url: str | None = None, + label: str | None = None, cache_key: str | None = None, + engine: RunEngine = RunEngine.skyvern_v1, + model: dict[str, Any] | None = None, ) -> None: - # Auto-create workflow block run and task if workflow_run_id is available - workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( - block_type=BlockType.TASK, - prompt=prompt, - url=url, - ) - # set the prompt in the RunContext - context = skyvern_context.ensure_context() - context.prompt = prompt + cache_key = cache_key or label + cached_fn = script_run_context_manager.get_cached_fn(cache_key) - if cache_key: + context: skyvern_context.SkyvernContext | None = None + if cache_key and cached_fn: + # Auto-create workflow block run and task if workflow_run_id is available + workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( + block_type=BlockType.TASK, + prompt=prompt, + url=url, + ) + # set the prompt in the RunContext + context = skyvern_context.ensure_context() + context.prompt = prompt try: - await _run_cached_function(cache_key) + await _run_cached_function(cached_fn) # Update block status to completed if workflow block was created if workflow_run_block_id: @@ -1069,39 +1085,52 @@ async def run_task( # clear the prompt in the RunContext context.prompt = None else: - if workflow_run_block_id: - await _update_workflow_block( - workflow_run_block_id, - BlockStatus.failed, - task_id=task_id, - task_status=TaskStatus.failed, - step_id=step_id, - step_status=StepStatus.failed, - failure_reason="Cache key is required", - ) - context.prompt = None - raise Exception("Cache key is required to run task block in a script") + block_validation_output = await _validate_and_get_output_parameter(label) + task_block = TaskBlock( + label=block_validation_output.label, + output_parameter=block_validation_output.output_parameter, + url=url, + navigation_goal=prompt, + max_steps_per_run=max_steps, + totp_identifier=totp_identifier, + totp_verification_url=totp_url, + include_action_history_in_verification=True, + engine=RunEngine.skyvern_v1, + ) + await task_block.execute_safe( + workflow_run_id=block_validation_output.workflow_run_id, + organization_id=block_validation_output.organization_id, + browser_session_id=block_validation_output.browser_session_id, + ) async def download( prompt: str, url: str | None = None, + complete_on_download: bool = True, max_steps: int | None = None, + totp_identifier: str | None = None, + totp_url: str | None = None, + label: str | None = None, cache_key: str | None = None, ) -> None: - # Auto-create workflow block run and task if workflow_run_id is available - workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( - block_type=BlockType.FILE_DOWNLOAD, - prompt=prompt, - url=url, - ) - # set the prompt in the RunContext - context = skyvern_context.ensure_context() - context.prompt = prompt + cache_key = cache_key or label + cached_fn = script_run_context_manager.get_cached_fn(cache_key) + + context: skyvern_context.SkyvernContext | None + if cache_key and cached_fn: + # Auto-create workflow block run and task if workflow_run_id is available + workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( + block_type=BlockType.FILE_DOWNLOAD, + prompt=prompt, + url=url, + ) + # set the prompt in the RunContext + context = skyvern_context.ensure_context() + context.prompt = prompt - if cache_key: try: - await _run_cached_function(cache_key) + await _run_cached_function(cached_fn) # Update block status to completed if workflow block was created if workflow_run_block_id: @@ -1121,25 +1150,31 @@ async def download( prompt=prompt, url=url, max_steps=max_steps, - complete_on_download=True, + complete_on_download=complete_on_download, error=e, workflow_run_block_id=workflow_run_block_id, ) finally: context.prompt = None else: - if workflow_run_block_id: - await _update_workflow_block( - workflow_run_block_id, - BlockStatus.failed, - task_id=task_id, - task_status=TaskStatus.failed, - step_id=step_id, - step_status=StepStatus.failed, - failure_reason="Cache key is required", - ) - context.prompt = None - raise Exception("Cache key is required to run task block in a script") + block_validation_output = await _validate_and_get_output_parameter(label) + file_download_block = FileDownloadBlock( + label=block_validation_output.label, + output_parameter=block_validation_output.output_parameter, + url=url, + complete_on_download=complete_on_download, + navigation_goal=prompt, + max_steps_per_run=max_steps, + totp_identifier=totp_identifier, + totp_verification_url=totp_url, + include_action_history_in_verification=True, + engine=RunEngine.skyvern_v1, + ) + await file_download_block.execute_safe( + workflow_run_id=block_validation_output.workflow_run_id, + organization_id=block_validation_output.organization_id, + browser_session_id=block_validation_output.browser_session_id, + ) async def action( @@ -1148,21 +1183,25 @@ async def action( max_steps: int | None = None, totp_identifier: str | None = None, totp_url: str | None = None, + label: str | None = None, cache_key: str | None = None, ) -> None: - # Auto-create workflow block run and task if workflow_run_id is available - workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( - block_type=BlockType.ACTION, - prompt=prompt, - url=url, - ) - # set the prompt in the RunContext - context = skyvern_context.ensure_context() - context.prompt = prompt + context: skyvern_context.SkyvernContext | None + cache_key = cache_key or label + cached_fn = script_run_context_manager.get_cached_fn(cache_key) + if cache_key and cached_fn: + # Auto-create workflow block run and task if workflow_run_id is available + workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( + block_type=BlockType.ACTION, + prompt=prompt, + url=url, + ) + # set the prompt in the RunContext + context = skyvern_context.ensure_context() + context.prompt = prompt - if cache_key: try: - await _run_cached_function(cache_key) + await _run_cached_function(cached_fn) # Update block status to completed if workflow block was created if workflow_run_block_id: @@ -1190,18 +1229,21 @@ async def action( finally: context.prompt = None else: - if workflow_run_block_id: - await _update_workflow_block( - workflow_run_block_id, - BlockStatus.failed, - task_id=task_id, - task_status=TaskStatus.failed, - step_id=step_id, - step_status=StepStatus.failed, - failure_reason="Cache key is required", - ) - context.prompt = None - raise Exception("Cache key is required to run task block in a script") + block_validation_output = await _validate_and_get_output_parameter(label) + action_block = ActionBlock( + label=block_validation_output.label, + output_parameter=block_validation_output.output_parameter, + url=url, + navigation_goal=prompt, + max_steps_per_run=max_steps, + totp_identifier=totp_identifier, + totp_verification_url=totp_url, + ) + await action_block.execute_safe( + workflow_run_id=block_validation_output.workflow_run_id, + organization_id=block_validation_output.organization_id, + browser_session_id=block_validation_output.browser_session_id, + ) async def login( @@ -1210,21 +1252,24 @@ async def login( max_steps: int | None = None, totp_identifier: str | None = None, totp_url: str | None = None, + label: str | None = None, cache_key: str | None = None, ) -> None: - # Auto-create workflow block run and task if workflow_run_id is available - workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( - block_type=BlockType.LOGIN, - prompt=prompt, - url=url, - ) - # set the prompt in the RunContext - context = skyvern_context.ensure_context() - context.prompt = prompt - - if cache_key: + context: skyvern_context.SkyvernContext | None + cache_key = cache_key or label + cached_fn = script_run_context_manager.get_cached_fn(cache_key) + if cache_key and cached_fn: + # Auto-create workflow block run and task if workflow_run_id is available + workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( + block_type=BlockType.LOGIN, + prompt=prompt, + url=url, + ) + # set the prompt in the RunContext + context = skyvern_context.ensure_context() + context.prompt = prompt try: - await _run_cached_function(cache_key) + await _run_cached_function(cached_fn) # Update block status to completed if workflow block was created if workflow_run_block_id: @@ -1252,18 +1297,21 @@ async def login( finally: context.prompt = None else: - if workflow_run_block_id: - await _update_workflow_block( - workflow_run_block_id, - BlockStatus.failed, - task_id=task_id, - task_status=TaskStatus.failed, - step_id=step_id, - step_status=StepStatus.failed, - failure_reason="Cache key is required", - ) - context.prompt = None - raise Exception("Cache key is required to run task block in a script") + block_validation_output = await _validate_and_get_output_parameter(label) + login_block = LoginBlock( + label=block_validation_output.label, + output_parameter=block_validation_output.output_parameter, + url=url, + navigation_goal=prompt, + max_steps_per_run=max_steps, + totp_identifier=totp_identifier, + totp_verification_url=totp_url, + ) + await login_block.execute_safe( + workflow_run_id=block_validation_output.workflow_run_id, + organization_id=block_validation_output.organization_id, + browser_session_id=block_validation_output.browser_session_id, + ) async def extract( @@ -1271,23 +1319,27 @@ async def extract( schema: dict[str, Any] | list | str | None = None, url: str | None = None, max_steps: int | None = None, + label: str | None = None, cache_key: str | None = None, ) -> dict[str, Any] | list | str | None: - # Auto-create workflow block run and task if workflow_run_id is available - workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( - block_type=BlockType.EXTRACTION, - prompt=prompt, - schema=schema, - url=url, - ) - # set the prompt in the RunContext - context = skyvern_context.ensure_context() - context.prompt = prompt output: dict[str, Any] | list | str | None = None - if cache_key: + context: skyvern_context.SkyvernContext | None + cache_key = cache_key or label + cached_fn = script_run_context_manager.get_cached_fn(cache_key) + if cache_key and cached_fn: + # Auto-create workflow block run and task if workflow_run_id is available + workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( + block_type=BlockType.EXTRACTION, + prompt=prompt, + schema=schema, + url=url, + ) + # set the prompt in the RunContext + context = skyvern_context.ensure_context() + context.prompt = prompt try: - output = cast(dict[str, Any] | list | str | None, await _run_cached_function(cache_key)) + output = cast(dict[str, Any] | list | str | None, await _run_cached_function(cached_fn)) # Update block status to completed if workflow block was created if workflow_run_block_id: @@ -1318,18 +1370,21 @@ async def extract( finally: context.prompt = None else: - if workflow_run_block_id: - await _update_workflow_block( - workflow_run_block_id, - BlockStatus.failed, - task_id=task_id, - task_status=TaskStatus.failed, - step_id=step_id, - step_status=StepStatus.failed, - failure_reason="Cache key is required", - ) - context.prompt = None - raise Exception("Cache key is required to run task block in a script") + block_validation_output = await _validate_and_get_output_parameter(label) + extraction_block = ExtractionBlock( + label=block_validation_output.label, + url=url, + data_extraction_goal=prompt, + max_steps_per_run=max_steps, + data_schema=schema, + output_parameter=block_validation_output.output_parameter, + ) + block_result = await extraction_block.execute_safe( + workflow_run_id=block_validation_output.workflow_run_id, + organization_id=block_validation_output.organization_id, + browser_session_id=block_validation_output.browser_session_id, + ) + return block_result.output_parameter_value async def wait(seconds: int) -> None: @@ -1441,10 +1496,21 @@ def render_template(template: str, data: dict[str, Any] | None = None) -> str: workflow_run_id = context.workflow_run_id workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id) template_data.update(workflow_run_context.values) + if template in template_data: + return template_data[template] return jinja_template.render(template_data) +def render_list(template: str, data: dict[str, Any] | None = None) -> list[str]: + rendered_value = render_template(template, data) + list_value = eval(rendered_value) + if isinstance(list_value, list): + return list_value + else: + return [list_value] + + # Non-task-based blocks ## Non-task-based block helpers @dataclass @@ -1476,7 +1542,15 @@ async def _validate_and_get_output_parameter(label: str | None = None) -> BlockV label = label or f"block_{uuid.uuid4()}" output_parameter = workflow.get_output_parameter(label) if not output_parameter: - raise Exception("Output parameter not found") + # NOT sure if this is legit hack to create output parameter like this + output_parameter = OutputParameter( + output_parameter_id=str(uuid.uuid4()), + key=f"{label}_output", + workflow_id=workflow_id, + created_at=datetime.now(), + modified_at=datetime.now(), + parameter_type=ParameterType.OUTPUT, + ) return BlockValidationOutput( label=label, output_parameter=output_parameter,