From d2a57400f4418ab7bf8dcfa74116a905a6530205 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Sun, 27 Jul 2025 21:24:53 -0700 Subject: [PATCH] Revert "Natural language support for Loop Block (#3027)" (#3043) --- .../routes/workflows/editor/helpContent.ts | 2 +- skyvern/forge/sdk/workflow/models/block.py | 364 +++--------------- 2 files changed, 49 insertions(+), 317 deletions(-) diff --git a/skyvern-frontend/src/routes/workflows/editor/helpContent.ts b/skyvern-frontend/src/routes/workflows/editor/helpContent.ts index 86215fe9..c17713ba 100644 --- a/skyvern-frontend/src/routes/workflows/editor/helpContent.ts +++ b/skyvern-frontend/src/routes/workflows/editor/helpContent.ts @@ -72,7 +72,7 @@ export const helpTooltips = { loop: { ...baseHelpTooltipContent, loopValue: - "Define what to iterate over using either a parameter key (e.g., my_extraction_output.extracted_information.results) or a clear, specific natural language prompt (e.g., 'Extract URLs of top 3 articles and visit each one').", + "Define this parameterized field with a parameter key to let Skyvern know the core value you're iterating over.", }, sendEmail: { ...baseHelpTooltipContent, diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index e064bda3..2bfe8ed3 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -11,7 +11,6 @@ import textwrap import uuid from collections import defaultdict from dataclasses import dataclass -from datetime import datetime from email.message import EmailMessage from enum import StrEnum from pathlib import Path @@ -59,9 +58,11 @@ from skyvern.forge.sdk.trace import TraceManager from skyvern.forge.sdk.workflow.context_manager import BlockMetadata, WorkflowRunContext from skyvern.forge.sdk.workflow.exceptions import ( CustomizedCodeException, + FailedToFormatJinjaStyleParameter, InsecureCodeDetected, InvalidEmailClientConfiguration, InvalidFileType, + NoIterableValueFound, NoValidEmailRecipient, ) from skyvern.forge.sdk.workflow.models.constants import FileStorageType @@ -70,7 +71,7 @@ from skyvern.forge.sdk.workflow.models.parameter import ( AWSSecretParameter, ContextParameter, OutputParameter, - ParameterType, + WorkflowParameter, ) from skyvern.schemas.runs import RunEngine from skyvern.utils.url_validators import prepend_scheme_and_validate_url @@ -899,13 +900,6 @@ class ForLoopBlock(Block): if isinstance(parameter, ContextParameter): context_parameters.append(parameter) - # Always add current_value to context parameters - workflow_run_context = self.get_workflow_run_context(workflow_run_id) - current_value_param = ContextParameter( - key="current_value", value=loop_data, source=workflow_run_context.get_parameter(self.output_parameter.key) - ) - context_parameters.append(current_value_param) - if self.loop_over is None: return context_parameters @@ -928,309 +922,54 @@ class ForLoopBlock(Block): return context_parameters - def _create_block_from_sequence( - self, - block_type: str, - parameters: dict[str, Any], - idx: int, - ) -> BlockTypeVar | None: - """Create a block instance based on type and parameters. - - Args: - block_type: The type of block to create (goto_url, extraction, etc.) - parameters: Block-specific parameters - idx: Index in the sequence (used for labeling) - - Returns: - BlockTypeVar | None: The created block or None if type not supported - """ - # Create output parameter for this block - output_param = OutputParameter( - output_parameter_id=str(uuid.uuid4()), - key=f"{self.label}_block_{idx}_output", - workflow_id=self.output_parameter.workflow_id, - created_at=datetime.now(), - modified_at=datetime.now(), - parameter_type=ParameterType.OUTPUT, - description=f"Output parameter for {block_type} block {idx}", - ) - - # Create the block with its parameters - if block_type == "goto_url": - return UrlBlock(label=f"{self.label}_block_{idx}", url=parameters["url"], output_parameter=output_param) - elif block_type == "extraction": - return ExtractionBlock( - label=f"{self.label}_block_{idx}", - data_extraction_goal=parameters["data_extraction_goal"], - data_schema=parameters.get("data_schema"), - engine=RunEngine.skyvern_v1, - output_parameter=output_param, - ) - elif block_type == "file_download": - return FileDownloadBlock( - label=f"{self.label}_block_{idx}", url=parameters["url"], output_parameter=output_param - ) - elif block_type == "task_v2": - return TaskV2Block( - label=f"{self.label}_block_{idx}", prompt=parameters["prompt"], output_parameter=output_param - ) - elif block_type == "action": - return ActionBlock( - label=f"{self.label}_block_{idx}", action=parameters["action"], output_parameter=output_param - ) - else: - LOG.warning(f"Unsupported block type: {block_type}") - return None - - def _create_initial_extraction_block(self, natural_language_prompt: str) -> ExtractionBlock: - """Create the initial extraction block that determines what to iterate over. - - Args: - natural_language_prompt: The user's natural language description - - Returns: - ExtractionBlock: Configured to extract items and determine block sequence - """ - return ExtractionBlock( - label=f"{self.label}_extraction", - data_extraction_goal=f"Extract the items to iterate over and determine the sequence of blocks needed. Natural language prompt: {natural_language_prompt}", - data_schema={ - "type": "object", - "properties": { - "results": { - "type": "array", - "items": { - "type": "object", - "properties": { - "data": {"type": "object", "description": "The data to iterate over (URLs, IDs, etc.)"} - }, - "required": ["data"], - }, - }, - "block_sequence": { - "type": "array", - "items": { - "type": "object", - "properties": { - "block_type": { - "type": "string", - "description": "Type of block to execute (goto_url, extraction, etc.)", - }, - "parameters": { - "type": "object", - "description": "Parameters for this block (url, data_extraction_goal, etc.)", - "properties": { - "url": { - "type": "string", - "description": "For URL-based blocks, use current_value.data.X (wrapped by 2 curly brackets) to reference extracted data ", - }, - "data_extraction_goal": { - "type": "string", - "description": "For extraction blocks, what to extract from the page", - }, - }, - }, - }, - "required": ["block_type", "parameters"], - }, - }, - }, - "required": ["results", "block_sequence"], - }, - engine=RunEngine.skyvern_v1, - output_parameter=self.output_parameter, - ) - - async def get_loop_over_parameter_values( - self, - workflow_run_context: WorkflowRunContext, - workflow_run_id: str, - workflow_run_block_id: str, - organization_id: str | None = None, - ) -> list[Any]: - parameter_value = None + def get_loop_over_parameter_values(self, workflow_run_context: WorkflowRunContext) -> list[Any]: + # parse the value from self.loop_variable_reference and then from self.loop_over if self.loop_variable_reference: - LOG.debug("Processing loop variable reference", loop_variable_reference=self.loop_variable_reference) + value_template = f"{{{{ {self.loop_variable_reference.strip(' {}')} | tojson }}}}" + try: + value_json = self.format_block_parameter_template_from_workflow_run_context( + value_template, workflow_run_context + ) + except Exception as e: + raise FailedToFormatJinjaStyleParameter(value_template, str(e)) + parameter_value = json.loads(value_json) - # Check if this looks like a parameter path (contains dots and/or _output) - is_likely_parameter_path = "." in self.loop_variable_reference or "_output" in self.loop_variable_reference - - # Try parsing as Jinja template - parameter_value = self.try_parse_jinja_template(workflow_run_context) - - if parameter_value is None and not is_likely_parameter_path: - try: - # Create and execute extraction block - extraction_block = self._create_initial_extraction_block(self.loop_variable_reference) - - LOG.info( - "Processing natural language loop input", - prompt=self.loop_variable_reference, - extraction_goal=extraction_block.data_extraction_goal, - ) - - extraction_result = await extraction_block.execute( - workflow_run_id=workflow_run_id, - workflow_run_block_id=workflow_run_block_id, - organization_id=organization_id, - ) - - if not extraction_result.success: - LOG.error("Extraction block failed", failure_reason=extraction_result.failure_reason) - raise ValueError(f"Extraction block failed: {extraction_result.failure_reason}") - - LOG.debug("Extraction block succeeded", output=extraction_result.output_parameter_value) - - # Store the extraction result in the workflow context - await extraction_block.record_output_parameter_value( - workflow_run_context=workflow_run_context, - workflow_run_id=workflow_run_id, - value=extraction_result.output_parameter_value, - ) - - # Get the extracted information and create blocks - if not isinstance(extraction_result.output_parameter_value, dict): - LOG.error( - "Extraction result output_parameter_value is not a dict", - output_parameter_value=extraction_result.output_parameter_value, - ) - raise ValueError("Extraction result output_parameter_value is not a dictionary") - - if "extracted_information" not in extraction_result.output_parameter_value: - LOG.error( - "Extraction result missing extracted_information key", - output_parameter_value=extraction_result.output_parameter_value, - ) - raise ValueError("Extraction result missing extracted_information key") - - extracted_info = extraction_result.output_parameter_value["extracted_information"] - - # Handle different possible structures of extracted_info - if isinstance(extracted_info, list): - # If it's a list, take the first element - if len(extracted_info) > 0: - extracted_info = extracted_info[0] + elif self.loop_over is not None: + if isinstance(self.loop_over, WorkflowParameter): + parameter_value = workflow_run_context.get_value(self.loop_over.key) + elif isinstance(self.loop_over, OutputParameter): + # If the output parameter is for a TaskBlock, it will be a TaskOutput object. We need to extract the + # value from the TaskOutput object's extracted_information field. + output_parameter_value = workflow_run_context.get_value(self.loop_over.key) + if isinstance(output_parameter_value, dict) and "extracted_information" in output_parameter_value: + parameter_value = output_parameter_value["extracted_information"] + else: + parameter_value = output_parameter_value + elif isinstance(self.loop_over, ContextParameter): + parameter_value = self.loop_over.value + if not parameter_value: + source_parameter_value = workflow_run_context.get_value(self.loop_over.source.key) + if isinstance(source_parameter_value, dict): + if "extracted_information" in source_parameter_value: + parameter_value = source_parameter_value["extracted_information"].get(self.loop_over.key) else: - LOG.error("Extracted information list is empty") - raise ValueError("Extracted information list is empty") + parameter_value = source_parameter_value.get(self.loop_over.key) + else: + raise ValueError("ContextParameter source value should be a dict") + else: + raise NotImplementedError() - # At this point, extracted_info should be a dict - if not isinstance(extracted_info, dict): - LOG.error("Invalid extraction result structure - not a dict", extracted_info=extracted_info) - raise ValueError("Extraction result is not a dictionary") + else: + if self.complete_if_empty: + return [] + else: + raise NoIterableValueFound() - if "block_sequence" not in extracted_info: - LOG.error( - "Invalid extraction result structure - missing block_sequence", - extracted_info=extracted_info, - ) - raise ValueError("Extraction result does not contain expected block_sequence") - - # At this point, extracted_info is guaranteed to be a dict with block_sequence - block_sequence = extracted_info["block_sequence"] - self.loop_blocks = [] - - # Create blocks based on the sequence - for idx, block_info in enumerate(block_sequence): - block_type = block_info["block_type"].lower() - block = self._create_block_from_sequence(block_type, block_info["parameters"], idx) - if block: - self.loop_blocks.append(block) - - LOG.info( - "Generated loop blocks from extraction output", - num_blocks=len(self.loop_blocks), - block_types=[b.block_type for b in self.loop_blocks], - ) - - # Update the loop variable reference to point to the extraction results - self.loop_variable_reference = f"{self.label}_output.extracted_information.results" - - # Try parsing again with the new reference - parameter_value = self.try_parse_jinja_template(workflow_run_context) - if parameter_value is None: - raise ValueError("Failed to get values after extraction") - except Exception as e: - LOG.error("Error during extraction", error=str(e), exc_info=True) - raise - elif parameter_value is None and is_likely_parameter_path: - # If this looks like a parameter path but parsing failed, raise a more specific error - LOG.error("Failed to parse parameter path", loop_variable_reference=self.loop_variable_reference) - raise ValueError(f"Failed to parse parameter path: {self.loop_variable_reference}") - - if parameter_value is None: - raise ValueError("No parameter value found") - - if not isinstance(parameter_value, list): - raise ValueError("Parameter value must be a list") - - LOG.debug("Successfully got loop values", num_values=len(parameter_value)) - return parameter_value - - def try_parse_jinja_template(self, workflow_run_context: WorkflowRunContext) -> Any | None: - if not self.loop_variable_reference: - return None - try: - # Log available parameters in context for debugging - available_params = { - key: value - for key, value in workflow_run_context.values.items() - if isinstance(key, str) and "_output" in key - } - LOG.debug( - "Available output parameters in context", - params=list(available_params.keys()), - values=available_params, - loop_variable_reference=self.loop_variable_reference, - ) - - # Loop variable reference is critical to the extraction block, so we add multiple access patterns - # to handle different cases - # 1. Try direct access to the parameter (for cases where it's already a list) - try: - value_template = f"{{{{ {self.loop_variable_reference.split('.')[0]} | tojson }}}}" - value_json = self.format_block_parameter_template_from_workflow_run_context( - value_template, workflow_run_context - ) - value = json.loads(value_json) - if isinstance(value, list): - LOG.debug("Successfully parsed as direct list", value=value) - return value - except Exception as e: - LOG.debug("Direct list access failed", error=str(e)) - - # 2. Try accessing through extracted_information.results - try: - value_template = ( - f"{{{{ {self.loop_variable_reference.strip(' {}')}.extracted_information.results | tojson }}}}" - ) - value_json = self.format_block_parameter_template_from_workflow_run_context( - value_template, workflow_run_context - ) - value = json.loads(value_json) - if isinstance(value, list): - LOG.debug("Successfully parsed through extracted_information.results", value=value) - return value - except Exception as e: - LOG.debug("Extracted information access failed", error=str(e)) - - # 3. Try the exact path as given (fallback) - try: - value_template = f"{{{{ {self.loop_variable_reference.strip(' {}')} | tojson }}}}" - value_json = self.format_block_parameter_template_from_workflow_run_context( - value_template, workflow_run_context - ) - value = json.loads(value_json) - if isinstance(value, list): - LOG.debug("Successfully parsed using exact path", value=value) - return value - except Exception as e: - LOG.debug("Exact path access failed", error=str(e)) - - return None - except Exception as e: - LOG.debug("Template parsing failed", error=str(e)) - return None + if isinstance(parameter_value, list): + return parameter_value + else: + # TODO (kerem): Should we raise an error here? + return [parameter_value] async def execute_loop_helper( self, @@ -1349,19 +1088,12 @@ class ForLoopBlock(Block): **kwargs: dict, ) -> BlockResult: workflow_run_context = self.get_workflow_run_context(workflow_run_id) - - # Get loop values to iterate over - this now handles both Jinja and natural language try: - loop_over_values = await self.get_loop_over_parameter_values( - workflow_run_context, - workflow_run_id=workflow_run_id, - workflow_run_block_id=workflow_run_block_id, - organization_id=organization_id, - ) + loop_over_values = self.get_loop_over_parameter_values(workflow_run_context) except Exception as e: return await self.build_block_result( success=False, - failure_reason=f"Failed to get loop values: {str(e)}", + failure_reason=f"failed to get loop values: {str(e)}", status=BlockStatus.failed, workflow_run_block_id=workflow_run_block_id, organization_id=organization_id,