From 3cdb10a7f23945da3eb4af1936240e6aec04365d Mon Sep 17 00:00:00 2001 From: PHSB <36764637+pedrodebruin@users.noreply.github.com> Date: Fri, 25 Jul 2025 11:58:02 -0700 Subject: [PATCH] Natural language support for Loop Block (#3027) Co-authored-by: Pedro Henrique Sales de Bruin --- .../routes/workflows/editor/helpContent.ts | 2 +- skyvern/forge/sdk/workflow/models/block.py | 356 +++++++++++++++--- 2 files changed, 313 insertions(+), 45 deletions(-) diff --git a/skyvern-frontend/src/routes/workflows/editor/helpContent.ts b/skyvern-frontend/src/routes/workflows/editor/helpContent.ts index c17713ba..86215fe9 100644 --- a/skyvern-frontend/src/routes/workflows/editor/helpContent.ts +++ b/skyvern-frontend/src/routes/workflows/editor/helpContent.ts @@ -72,7 +72,7 @@ export const helpTooltips = { loop: { ...baseHelpTooltipContent, loopValue: - "Define this parameterized field with a parameter key to let Skyvern know the core value you're iterating over.", + "Define what to iterate over using either a parameter key (e.g., my_extraction_output.extracted_information.results) or a clear, specific natural language prompt (e.g., 'Extract URLs of top 3 articles and visit each one').", }, sendEmail: { ...baseHelpTooltipContent, diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 2bfe8ed3..e064bda3 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -11,6 +11,7 @@ import textwrap import uuid from collections import defaultdict from dataclasses import dataclass +from datetime import datetime from email.message import EmailMessage from enum import StrEnum from pathlib import Path @@ -58,11 +59,9 @@ from skyvern.forge.sdk.trace import TraceManager from skyvern.forge.sdk.workflow.context_manager import BlockMetadata, WorkflowRunContext from skyvern.forge.sdk.workflow.exceptions import ( CustomizedCodeException, - FailedToFormatJinjaStyleParameter, InsecureCodeDetected, InvalidEmailClientConfiguration, InvalidFileType, - NoIterableValueFound, NoValidEmailRecipient, ) from skyvern.forge.sdk.workflow.models.constants import FileStorageType @@ -71,7 +70,7 @@ from skyvern.forge.sdk.workflow.models.parameter import ( AWSSecretParameter, ContextParameter, OutputParameter, - WorkflowParameter, + ParameterType, ) from skyvern.schemas.runs import RunEngine from skyvern.utils.url_validators import prepend_scheme_and_validate_url @@ -900,6 +899,13 @@ class ForLoopBlock(Block): if isinstance(parameter, ContextParameter): context_parameters.append(parameter) + # Always add current_value to context parameters + workflow_run_context = self.get_workflow_run_context(workflow_run_id) + current_value_param = ContextParameter( + key="current_value", value=loop_data, source=workflow_run_context.get_parameter(self.output_parameter.key) + ) + context_parameters.append(current_value_param) + if self.loop_over is None: return context_parameters @@ -922,54 +928,309 @@ class ForLoopBlock(Block): return context_parameters - def get_loop_over_parameter_values(self, workflow_run_context: WorkflowRunContext) -> list[Any]: - # parse the value from self.loop_variable_reference and then from self.loop_over + def _create_block_from_sequence( + self, + block_type: str, + parameters: dict[str, Any], + idx: int, + ) -> BlockTypeVar | None: + """Create a block instance based on type and parameters. + + Args: + block_type: The type of block to create (goto_url, extraction, etc.) + parameters: Block-specific parameters + idx: Index in the sequence (used for labeling) + + Returns: + BlockTypeVar | None: The created block or None if type not supported + """ + # Create output parameter for this block + output_param = OutputParameter( + output_parameter_id=str(uuid.uuid4()), + key=f"{self.label}_block_{idx}_output", + workflow_id=self.output_parameter.workflow_id, + created_at=datetime.now(), + modified_at=datetime.now(), + parameter_type=ParameterType.OUTPUT, + description=f"Output parameter for {block_type} block {idx}", + ) + + # Create the block with its parameters + if block_type == "goto_url": + return UrlBlock(label=f"{self.label}_block_{idx}", url=parameters["url"], output_parameter=output_param) + elif block_type == "extraction": + return ExtractionBlock( + label=f"{self.label}_block_{idx}", + data_extraction_goal=parameters["data_extraction_goal"], + data_schema=parameters.get("data_schema"), + engine=RunEngine.skyvern_v1, + output_parameter=output_param, + ) + elif block_type == "file_download": + return FileDownloadBlock( + label=f"{self.label}_block_{idx}", url=parameters["url"], output_parameter=output_param + ) + elif block_type == "task_v2": + return TaskV2Block( + label=f"{self.label}_block_{idx}", prompt=parameters["prompt"], output_parameter=output_param + ) + elif block_type == "action": + return ActionBlock( + label=f"{self.label}_block_{idx}", action=parameters["action"], output_parameter=output_param + ) + else: + LOG.warning(f"Unsupported block type: {block_type}") + return None + + def _create_initial_extraction_block(self, natural_language_prompt: str) -> ExtractionBlock: + """Create the initial extraction block that determines what to iterate over. + + Args: + natural_language_prompt: The user's natural language description + + Returns: + ExtractionBlock: Configured to extract items and determine block sequence + """ + return ExtractionBlock( + label=f"{self.label}_extraction", + data_extraction_goal=f"Extract the items to iterate over and determine the sequence of blocks needed. Natural language prompt: {natural_language_prompt}", + data_schema={ + "type": "object", + "properties": { + "results": { + "type": "array", + "items": { + "type": "object", + "properties": { + "data": {"type": "object", "description": "The data to iterate over (URLs, IDs, etc.)"} + }, + "required": ["data"], + }, + }, + "block_sequence": { + "type": "array", + "items": { + "type": "object", + "properties": { + "block_type": { + "type": "string", + "description": "Type of block to execute (goto_url, extraction, etc.)", + }, + "parameters": { + "type": "object", + "description": "Parameters for this block (url, data_extraction_goal, etc.)", + "properties": { + "url": { + "type": "string", + "description": "For URL-based blocks, use current_value.data.X (wrapped by 2 curly brackets) to reference extracted data ", + }, + "data_extraction_goal": { + "type": "string", + "description": "For extraction blocks, what to extract from the page", + }, + }, + }, + }, + "required": ["block_type", "parameters"], + }, + }, + }, + "required": ["results", "block_sequence"], + }, + engine=RunEngine.skyvern_v1, + output_parameter=self.output_parameter, + ) + + async def get_loop_over_parameter_values( + self, + workflow_run_context: WorkflowRunContext, + workflow_run_id: str, + workflow_run_block_id: str, + organization_id: str | None = None, + ) -> list[Any]: + parameter_value = None if self.loop_variable_reference: - value_template = f"{{{{ {self.loop_variable_reference.strip(' {}')} | tojson }}}}" + LOG.debug("Processing loop variable reference", loop_variable_reference=self.loop_variable_reference) + + # Check if this looks like a parameter path (contains dots and/or _output) + is_likely_parameter_path = "." in self.loop_variable_reference or "_output" in self.loop_variable_reference + + # Try parsing as Jinja template + parameter_value = self.try_parse_jinja_template(workflow_run_context) + + if parameter_value is None and not is_likely_parameter_path: + try: + # Create and execute extraction block + extraction_block = self._create_initial_extraction_block(self.loop_variable_reference) + + LOG.info( + "Processing natural language loop input", + prompt=self.loop_variable_reference, + extraction_goal=extraction_block.data_extraction_goal, + ) + + extraction_result = await extraction_block.execute( + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) + + if not extraction_result.success: + LOG.error("Extraction block failed", failure_reason=extraction_result.failure_reason) + raise ValueError(f"Extraction block failed: {extraction_result.failure_reason}") + + LOG.debug("Extraction block succeeded", output=extraction_result.output_parameter_value) + + # Store the extraction result in the workflow context + await extraction_block.record_output_parameter_value( + workflow_run_context=workflow_run_context, + workflow_run_id=workflow_run_id, + value=extraction_result.output_parameter_value, + ) + + # Get the extracted information and create blocks + if not isinstance(extraction_result.output_parameter_value, dict): + LOG.error( + "Extraction result output_parameter_value is not a dict", + output_parameter_value=extraction_result.output_parameter_value, + ) + raise ValueError("Extraction result output_parameter_value is not a dictionary") + + if "extracted_information" not in extraction_result.output_parameter_value: + LOG.error( + "Extraction result missing extracted_information key", + output_parameter_value=extraction_result.output_parameter_value, + ) + raise ValueError("Extraction result missing extracted_information key") + + extracted_info = extraction_result.output_parameter_value["extracted_information"] + + # Handle different possible structures of extracted_info + if isinstance(extracted_info, list): + # If it's a list, take the first element + if len(extracted_info) > 0: + extracted_info = extracted_info[0] + else: + LOG.error("Extracted information list is empty") + raise ValueError("Extracted information list is empty") + + # At this point, extracted_info should be a dict + if not isinstance(extracted_info, dict): + LOG.error("Invalid extraction result structure - not a dict", extracted_info=extracted_info) + raise ValueError("Extraction result is not a dictionary") + + if "block_sequence" not in extracted_info: + LOG.error( + "Invalid extraction result structure - missing block_sequence", + extracted_info=extracted_info, + ) + raise ValueError("Extraction result does not contain expected block_sequence") + + # At this point, extracted_info is guaranteed to be a dict with block_sequence + block_sequence = extracted_info["block_sequence"] + self.loop_blocks = [] + + # Create blocks based on the sequence + for idx, block_info in enumerate(block_sequence): + block_type = block_info["block_type"].lower() + block = self._create_block_from_sequence(block_type, block_info["parameters"], idx) + if block: + self.loop_blocks.append(block) + + LOG.info( + "Generated loop blocks from extraction output", + num_blocks=len(self.loop_blocks), + block_types=[b.block_type for b in self.loop_blocks], + ) + + # Update the loop variable reference to point to the extraction results + self.loop_variable_reference = f"{self.label}_output.extracted_information.results" + + # Try parsing again with the new reference + parameter_value = self.try_parse_jinja_template(workflow_run_context) + if parameter_value is None: + raise ValueError("Failed to get values after extraction") + except Exception as e: + LOG.error("Error during extraction", error=str(e), exc_info=True) + raise + elif parameter_value is None and is_likely_parameter_path: + # If this looks like a parameter path but parsing failed, raise a more specific error + LOG.error("Failed to parse parameter path", loop_variable_reference=self.loop_variable_reference) + raise ValueError(f"Failed to parse parameter path: {self.loop_variable_reference}") + + if parameter_value is None: + raise ValueError("No parameter value found") + + if not isinstance(parameter_value, list): + raise ValueError("Parameter value must be a list") + + LOG.debug("Successfully got loop values", num_values=len(parameter_value)) + return parameter_value + + def try_parse_jinja_template(self, workflow_run_context: WorkflowRunContext) -> Any | None: + if not self.loop_variable_reference: + return None + try: + # Log available parameters in context for debugging + available_params = { + key: value + for key, value in workflow_run_context.values.items() + if isinstance(key, str) and "_output" in key + } + LOG.debug( + "Available output parameters in context", + params=list(available_params.keys()), + values=available_params, + loop_variable_reference=self.loop_variable_reference, + ) + + # Loop variable reference is critical to the extraction block, so we add multiple access patterns + # to handle different cases + # 1. Try direct access to the parameter (for cases where it's already a list) try: + value_template = f"{{{{ {self.loop_variable_reference.split('.')[0]} | tojson }}}}" value_json = self.format_block_parameter_template_from_workflow_run_context( value_template, workflow_run_context ) + value = json.loads(value_json) + if isinstance(value, list): + LOG.debug("Successfully parsed as direct list", value=value) + return value except Exception as e: - raise FailedToFormatJinjaStyleParameter(value_template, str(e)) - parameter_value = json.loads(value_json) + LOG.debug("Direct list access failed", error=str(e)) - elif self.loop_over is not None: - if isinstance(self.loop_over, WorkflowParameter): - parameter_value = workflow_run_context.get_value(self.loop_over.key) - elif isinstance(self.loop_over, OutputParameter): - # If the output parameter is for a TaskBlock, it will be a TaskOutput object. We need to extract the - # value from the TaskOutput object's extracted_information field. - output_parameter_value = workflow_run_context.get_value(self.loop_over.key) - if isinstance(output_parameter_value, dict) and "extracted_information" in output_parameter_value: - parameter_value = output_parameter_value["extracted_information"] - else: - parameter_value = output_parameter_value - elif isinstance(self.loop_over, ContextParameter): - parameter_value = self.loop_over.value - if not parameter_value: - source_parameter_value = workflow_run_context.get_value(self.loop_over.source.key) - if isinstance(source_parameter_value, dict): - if "extracted_information" in source_parameter_value: - parameter_value = source_parameter_value["extracted_information"].get(self.loop_over.key) - else: - parameter_value = source_parameter_value.get(self.loop_over.key) - else: - raise ValueError("ContextParameter source value should be a dict") - else: - raise NotImplementedError() + # 2. Try accessing through extracted_information.results + try: + value_template = ( + f"{{{{ {self.loop_variable_reference.strip(' {}')}.extracted_information.results | tojson }}}}" + ) + value_json = self.format_block_parameter_template_from_workflow_run_context( + value_template, workflow_run_context + ) + value = json.loads(value_json) + if isinstance(value, list): + LOG.debug("Successfully parsed through extracted_information.results", value=value) + return value + except Exception as e: + LOG.debug("Extracted information access failed", error=str(e)) - else: - if self.complete_if_empty: - return [] - else: - raise NoIterableValueFound() + # 3. Try the exact path as given (fallback) + try: + value_template = f"{{{{ {self.loop_variable_reference.strip(' {}')} | tojson }}}}" + value_json = self.format_block_parameter_template_from_workflow_run_context( + value_template, workflow_run_context + ) + value = json.loads(value_json) + if isinstance(value, list): + LOG.debug("Successfully parsed using exact path", value=value) + return value + except Exception as e: + LOG.debug("Exact path access failed", error=str(e)) - if isinstance(parameter_value, list): - return parameter_value - else: - # TODO (kerem): Should we raise an error here? - return [parameter_value] + return None + except Exception as e: + LOG.debug("Template parsing failed", error=str(e)) + return None async def execute_loop_helper( self, @@ -1088,12 +1349,19 @@ class ForLoopBlock(Block): **kwargs: dict, ) -> BlockResult: workflow_run_context = self.get_workflow_run_context(workflow_run_id) + + # Get loop values to iterate over - this now handles both Jinja and natural language try: - loop_over_values = self.get_loop_over_parameter_values(workflow_run_context) + loop_over_values = await self.get_loop_over_parameter_values( + workflow_run_context, + workflow_run_id=workflow_run_id, + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) except Exception as e: return await self.build_block_result( success=False, - failure_reason=f"failed to get loop values: {str(e)}", + failure_reason=f"Failed to get loop values: {str(e)}", status=BlockStatus.failed, workflow_run_block_id=workflow_run_block_id, organization_id=organization_id,