Natural language support for Loop Block (#3027)

Co-authored-by: Pedro Henrique Sales de Bruin <pedrodebruin@Pedros-MacBook-Air.local>
This commit is contained in:
PHSB
2025-07-25 11:58:02 -07:00
committed by GitHub
parent bff7544b83
commit 3cdb10a7f2
2 changed files with 313 additions and 45 deletions

View File

@@ -72,7 +72,7 @@ export const helpTooltips = {
loop: {
...baseHelpTooltipContent,
loopValue:
"Define this parameterized field with a parameter key to let Skyvern know the core value you're iterating over.",
"Define what to iterate over using either a parameter key (e.g., my_extraction_output.extracted_information.results) or a clear, specific natural language prompt (e.g., 'Extract URLs of top 3 articles and visit each one').",
},
sendEmail: {
...baseHelpTooltipContent,

View File

@@ -11,6 +11,7 @@ import textwrap
import uuid
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from email.message import EmailMessage
from enum import StrEnum
from pathlib import Path
@@ -58,11 +59,9 @@ from skyvern.forge.sdk.trace import TraceManager
from skyvern.forge.sdk.workflow.context_manager import BlockMetadata, WorkflowRunContext
from skyvern.forge.sdk.workflow.exceptions import (
CustomizedCodeException,
FailedToFormatJinjaStyleParameter,
InsecureCodeDetected,
InvalidEmailClientConfiguration,
InvalidFileType,
NoIterableValueFound,
NoValidEmailRecipient,
)
from skyvern.forge.sdk.workflow.models.constants import FileStorageType
@@ -71,7 +70,7 @@ from skyvern.forge.sdk.workflow.models.parameter import (
AWSSecretParameter,
ContextParameter,
OutputParameter,
WorkflowParameter,
ParameterType,
)
from skyvern.schemas.runs import RunEngine
from skyvern.utils.url_validators import prepend_scheme_and_validate_url
@@ -900,6 +899,13 @@ class ForLoopBlock(Block):
if isinstance(parameter, ContextParameter):
context_parameters.append(parameter)
# Always add current_value to context parameters
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
current_value_param = ContextParameter(
key="current_value", value=loop_data, source=workflow_run_context.get_parameter(self.output_parameter.key)
)
context_parameters.append(current_value_param)
if self.loop_over is None:
return context_parameters
@@ -922,54 +928,309 @@ class ForLoopBlock(Block):
return context_parameters
def get_loop_over_parameter_values(self, workflow_run_context: WorkflowRunContext) -> list[Any]:
# parse the value from self.loop_variable_reference and then from self.loop_over
def _create_block_from_sequence(
self,
block_type: str,
parameters: dict[str, Any],
idx: int,
) -> BlockTypeVar | None:
"""Create a block instance based on type and parameters.
Args:
block_type: The type of block to create (goto_url, extraction, etc.)
parameters: Block-specific parameters
idx: Index in the sequence (used for labeling)
Returns:
BlockTypeVar | None: The created block or None if type not supported
"""
# Create output parameter for this block
output_param = OutputParameter(
output_parameter_id=str(uuid.uuid4()),
key=f"{self.label}_block_{idx}_output",
workflow_id=self.output_parameter.workflow_id,
created_at=datetime.now(),
modified_at=datetime.now(),
parameter_type=ParameterType.OUTPUT,
description=f"Output parameter for {block_type} block {idx}",
)
# Create the block with its parameters
if block_type == "goto_url":
return UrlBlock(label=f"{self.label}_block_{idx}", url=parameters["url"], output_parameter=output_param)
elif block_type == "extraction":
return ExtractionBlock(
label=f"{self.label}_block_{idx}",
data_extraction_goal=parameters["data_extraction_goal"],
data_schema=parameters.get("data_schema"),
engine=RunEngine.skyvern_v1,
output_parameter=output_param,
)
elif block_type == "file_download":
return FileDownloadBlock(
label=f"{self.label}_block_{idx}", url=parameters["url"], output_parameter=output_param
)
elif block_type == "task_v2":
return TaskV2Block(
label=f"{self.label}_block_{idx}", prompt=parameters["prompt"], output_parameter=output_param
)
elif block_type == "action":
return ActionBlock(
label=f"{self.label}_block_{idx}", action=parameters["action"], output_parameter=output_param
)
else:
LOG.warning(f"Unsupported block type: {block_type}")
return None
def _create_initial_extraction_block(self, natural_language_prompt: str) -> ExtractionBlock:
"""Create the initial extraction block that determines what to iterate over.
Args:
natural_language_prompt: The user's natural language description
Returns:
ExtractionBlock: Configured to extract items and determine block sequence
"""
return ExtractionBlock(
label=f"{self.label}_extraction",
data_extraction_goal=f"Extract the items to iterate over and determine the sequence of blocks needed. Natural language prompt: {natural_language_prompt}",
data_schema={
"type": "object",
"properties": {
"results": {
"type": "array",
"items": {
"type": "object",
"properties": {
"data": {"type": "object", "description": "The data to iterate over (URLs, IDs, etc.)"}
},
"required": ["data"],
},
},
"block_sequence": {
"type": "array",
"items": {
"type": "object",
"properties": {
"block_type": {
"type": "string",
"description": "Type of block to execute (goto_url, extraction, etc.)",
},
"parameters": {
"type": "object",
"description": "Parameters for this block (url, data_extraction_goal, etc.)",
"properties": {
"url": {
"type": "string",
"description": "For URL-based blocks, use current_value.data.X (wrapped by 2 curly brackets) to reference extracted data ",
},
"data_extraction_goal": {
"type": "string",
"description": "For extraction blocks, what to extract from the page",
},
},
},
},
"required": ["block_type", "parameters"],
},
},
},
"required": ["results", "block_sequence"],
},
engine=RunEngine.skyvern_v1,
output_parameter=self.output_parameter,
)
async def get_loop_over_parameter_values(
self,
workflow_run_context: WorkflowRunContext,
workflow_run_id: str,
workflow_run_block_id: str,
organization_id: str | None = None,
) -> list[Any]:
parameter_value = None
if self.loop_variable_reference:
value_template = f"{{{{ {self.loop_variable_reference.strip(' {}')} | tojson }}}}"
LOG.debug("Processing loop variable reference", loop_variable_reference=self.loop_variable_reference)
# Check if this looks like a parameter path (contains dots and/or _output)
is_likely_parameter_path = "." in self.loop_variable_reference or "_output" in self.loop_variable_reference
# Try parsing as Jinja template
parameter_value = self.try_parse_jinja_template(workflow_run_context)
if parameter_value is None and not is_likely_parameter_path:
try:
# Create and execute extraction block
extraction_block = self._create_initial_extraction_block(self.loop_variable_reference)
LOG.info(
"Processing natural language loop input",
prompt=self.loop_variable_reference,
extraction_goal=extraction_block.data_extraction_goal,
)
extraction_result = await extraction_block.execute(
workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,
)
if not extraction_result.success:
LOG.error("Extraction block failed", failure_reason=extraction_result.failure_reason)
raise ValueError(f"Extraction block failed: {extraction_result.failure_reason}")
LOG.debug("Extraction block succeeded", output=extraction_result.output_parameter_value)
# Store the extraction result in the workflow context
await extraction_block.record_output_parameter_value(
workflow_run_context=workflow_run_context,
workflow_run_id=workflow_run_id,
value=extraction_result.output_parameter_value,
)
# Get the extracted information and create blocks
if not isinstance(extraction_result.output_parameter_value, dict):
LOG.error(
"Extraction result output_parameter_value is not a dict",
output_parameter_value=extraction_result.output_parameter_value,
)
raise ValueError("Extraction result output_parameter_value is not a dictionary")
if "extracted_information" not in extraction_result.output_parameter_value:
LOG.error(
"Extraction result missing extracted_information key",
output_parameter_value=extraction_result.output_parameter_value,
)
raise ValueError("Extraction result missing extracted_information key")
extracted_info = extraction_result.output_parameter_value["extracted_information"]
# Handle different possible structures of extracted_info
if isinstance(extracted_info, list):
# If it's a list, take the first element
if len(extracted_info) > 0:
extracted_info = extracted_info[0]
else:
LOG.error("Extracted information list is empty")
raise ValueError("Extracted information list is empty")
# At this point, extracted_info should be a dict
if not isinstance(extracted_info, dict):
LOG.error("Invalid extraction result structure - not a dict", extracted_info=extracted_info)
raise ValueError("Extraction result is not a dictionary")
if "block_sequence" not in extracted_info:
LOG.error(
"Invalid extraction result structure - missing block_sequence",
extracted_info=extracted_info,
)
raise ValueError("Extraction result does not contain expected block_sequence")
# At this point, extracted_info is guaranteed to be a dict with block_sequence
block_sequence = extracted_info["block_sequence"]
self.loop_blocks = []
# Create blocks based on the sequence
for idx, block_info in enumerate(block_sequence):
block_type = block_info["block_type"].lower()
block = self._create_block_from_sequence(block_type, block_info["parameters"], idx)
if block:
self.loop_blocks.append(block)
LOG.info(
"Generated loop blocks from extraction output",
num_blocks=len(self.loop_blocks),
block_types=[b.block_type for b in self.loop_blocks],
)
# Update the loop variable reference to point to the extraction results
self.loop_variable_reference = f"{self.label}_output.extracted_information.results"
# Try parsing again with the new reference
parameter_value = self.try_parse_jinja_template(workflow_run_context)
if parameter_value is None:
raise ValueError("Failed to get values after extraction")
except Exception as e:
LOG.error("Error during extraction", error=str(e), exc_info=True)
raise
elif parameter_value is None and is_likely_parameter_path:
# If this looks like a parameter path but parsing failed, raise a more specific error
LOG.error("Failed to parse parameter path", loop_variable_reference=self.loop_variable_reference)
raise ValueError(f"Failed to parse parameter path: {self.loop_variable_reference}")
if parameter_value is None:
raise ValueError("No parameter value found")
if not isinstance(parameter_value, list):
raise ValueError("Parameter value must be a list")
LOG.debug("Successfully got loop values", num_values=len(parameter_value))
return parameter_value
def try_parse_jinja_template(self, workflow_run_context: WorkflowRunContext) -> Any | None:
if not self.loop_variable_reference:
return None
try:
# Log available parameters in context for debugging
available_params = {
key: value
for key, value in workflow_run_context.values.items()
if isinstance(key, str) and "_output" in key
}
LOG.debug(
"Available output parameters in context",
params=list(available_params.keys()),
values=available_params,
loop_variable_reference=self.loop_variable_reference,
)
# Loop variable reference is critical to the extraction block, so we add multiple access patterns
# to handle different cases
# 1. Try direct access to the parameter (for cases where it's already a list)
try:
value_template = f"{{{{ {self.loop_variable_reference.split('.')[0]} | tojson }}}}"
value_json = self.format_block_parameter_template_from_workflow_run_context(
value_template, workflow_run_context
)
value = json.loads(value_json)
if isinstance(value, list):
LOG.debug("Successfully parsed as direct list", value=value)
return value
except Exception as e:
raise FailedToFormatJinjaStyleParameter(value_template, str(e))
parameter_value = json.loads(value_json)
LOG.debug("Direct list access failed", error=str(e))
elif self.loop_over is not None:
if isinstance(self.loop_over, WorkflowParameter):
parameter_value = workflow_run_context.get_value(self.loop_over.key)
elif isinstance(self.loop_over, OutputParameter):
# If the output parameter is for a TaskBlock, it will be a TaskOutput object. We need to extract the
# value from the TaskOutput object's extracted_information field.
output_parameter_value = workflow_run_context.get_value(self.loop_over.key)
if isinstance(output_parameter_value, dict) and "extracted_information" in output_parameter_value:
parameter_value = output_parameter_value["extracted_information"]
else:
parameter_value = output_parameter_value
elif isinstance(self.loop_over, ContextParameter):
parameter_value = self.loop_over.value
if not parameter_value:
source_parameter_value = workflow_run_context.get_value(self.loop_over.source.key)
if isinstance(source_parameter_value, dict):
if "extracted_information" in source_parameter_value:
parameter_value = source_parameter_value["extracted_information"].get(self.loop_over.key)
else:
parameter_value = source_parameter_value.get(self.loop_over.key)
else:
raise ValueError("ContextParameter source value should be a dict")
else:
raise NotImplementedError()
# 2. Try accessing through extracted_information.results
try:
value_template = (
f"{{{{ {self.loop_variable_reference.strip(' {}')}.extracted_information.results | tojson }}}}"
)
value_json = self.format_block_parameter_template_from_workflow_run_context(
value_template, workflow_run_context
)
value = json.loads(value_json)
if isinstance(value, list):
LOG.debug("Successfully parsed through extracted_information.results", value=value)
return value
except Exception as e:
LOG.debug("Extracted information access failed", error=str(e))
else:
if self.complete_if_empty:
return []
else:
raise NoIterableValueFound()
# 3. Try the exact path as given (fallback)
try:
value_template = f"{{{{ {self.loop_variable_reference.strip(' {}')} | tojson }}}}"
value_json = self.format_block_parameter_template_from_workflow_run_context(
value_template, workflow_run_context
)
value = json.loads(value_json)
if isinstance(value, list):
LOG.debug("Successfully parsed using exact path", value=value)
return value
except Exception as e:
LOG.debug("Exact path access failed", error=str(e))
if isinstance(parameter_value, list):
return parameter_value
else:
# TODO (kerem): Should we raise an error here?
return [parameter_value]
return None
except Exception as e:
LOG.debug("Template parsing failed", error=str(e))
return None
async def execute_loop_helper(
self,
@@ -1088,12 +1349,19 @@ class ForLoopBlock(Block):
**kwargs: dict,
) -> BlockResult:
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
# Get loop values to iterate over - this now handles both Jinja and natural language
try:
loop_over_values = self.get_loop_over_parameter_values(workflow_run_context)
loop_over_values = await self.get_loop_over_parameter_values(
workflow_run_context,
workflow_run_id=workflow_run_id,
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,
)
except Exception as e:
return await self.build_block_result(
success=False,
failure_reason=f"failed to get loop values: {str(e)}",
failure_reason=f"Failed to get loop values: {str(e)}",
status=BlockStatus.failed,
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,