@@ -72,7 +72,7 @@ export const helpTooltips = {
|
|||||||
loop: {
|
loop: {
|
||||||
...baseHelpTooltipContent,
|
...baseHelpTooltipContent,
|
||||||
loopValue:
|
loopValue:
|
||||||
"Define what to iterate over using either a parameter key (e.g., my_extraction_output.extracted_information.results) or a clear, specific natural language prompt (e.g., 'Extract URLs of top 3 articles and visit each one').",
|
"Define this parameterized field with a parameter key to let Skyvern know the core value you're iterating over.",
|
||||||
},
|
},
|
||||||
sendEmail: {
|
sendEmail: {
|
||||||
...baseHelpTooltipContent,
|
...baseHelpTooltipContent,
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ import textwrap
|
|||||||
import uuid
|
import uuid
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime
|
|
||||||
from email.message import EmailMessage
|
from email.message import EmailMessage
|
||||||
from enum import StrEnum
|
from enum import StrEnum
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -59,9 +58,11 @@ from skyvern.forge.sdk.trace import TraceManager
|
|||||||
from skyvern.forge.sdk.workflow.context_manager import BlockMetadata, WorkflowRunContext
|
from skyvern.forge.sdk.workflow.context_manager import BlockMetadata, WorkflowRunContext
|
||||||
from skyvern.forge.sdk.workflow.exceptions import (
|
from skyvern.forge.sdk.workflow.exceptions import (
|
||||||
CustomizedCodeException,
|
CustomizedCodeException,
|
||||||
|
FailedToFormatJinjaStyleParameter,
|
||||||
InsecureCodeDetected,
|
InsecureCodeDetected,
|
||||||
InvalidEmailClientConfiguration,
|
InvalidEmailClientConfiguration,
|
||||||
InvalidFileType,
|
InvalidFileType,
|
||||||
|
NoIterableValueFound,
|
||||||
NoValidEmailRecipient,
|
NoValidEmailRecipient,
|
||||||
)
|
)
|
||||||
from skyvern.forge.sdk.workflow.models.constants import FileStorageType
|
from skyvern.forge.sdk.workflow.models.constants import FileStorageType
|
||||||
@@ -70,7 +71,7 @@ from skyvern.forge.sdk.workflow.models.parameter import (
|
|||||||
AWSSecretParameter,
|
AWSSecretParameter,
|
||||||
ContextParameter,
|
ContextParameter,
|
||||||
OutputParameter,
|
OutputParameter,
|
||||||
ParameterType,
|
WorkflowParameter,
|
||||||
)
|
)
|
||||||
from skyvern.schemas.runs import RunEngine
|
from skyvern.schemas.runs import RunEngine
|
||||||
from skyvern.utils.url_validators import prepend_scheme_and_validate_url
|
from skyvern.utils.url_validators import prepend_scheme_and_validate_url
|
||||||
@@ -899,13 +900,6 @@ class ForLoopBlock(Block):
|
|||||||
if isinstance(parameter, ContextParameter):
|
if isinstance(parameter, ContextParameter):
|
||||||
context_parameters.append(parameter)
|
context_parameters.append(parameter)
|
||||||
|
|
||||||
# Always add current_value to context parameters
|
|
||||||
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
|
|
||||||
current_value_param = ContextParameter(
|
|
||||||
key="current_value", value=loop_data, source=workflow_run_context.get_parameter(self.output_parameter.key)
|
|
||||||
)
|
|
||||||
context_parameters.append(current_value_param)
|
|
||||||
|
|
||||||
if self.loop_over is None:
|
if self.loop_over is None:
|
||||||
return context_parameters
|
return context_parameters
|
||||||
|
|
||||||
@@ -928,309 +922,54 @@ class ForLoopBlock(Block):
|
|||||||
|
|
||||||
return context_parameters
|
return context_parameters
|
||||||
|
|
||||||
def _create_block_from_sequence(
|
def get_loop_over_parameter_values(self, workflow_run_context: WorkflowRunContext) -> list[Any]:
|
||||||
self,
|
# parse the value from self.loop_variable_reference and then from self.loop_over
|
||||||
block_type: str,
|
|
||||||
parameters: dict[str, Any],
|
|
||||||
idx: int,
|
|
||||||
) -> BlockTypeVar | None:
|
|
||||||
"""Create a block instance based on type and parameters.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
block_type: The type of block to create (goto_url, extraction, etc.)
|
|
||||||
parameters: Block-specific parameters
|
|
||||||
idx: Index in the sequence (used for labeling)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
BlockTypeVar | None: The created block or None if type not supported
|
|
||||||
"""
|
|
||||||
# Create output parameter for this block
|
|
||||||
output_param = OutputParameter(
|
|
||||||
output_parameter_id=str(uuid.uuid4()),
|
|
||||||
key=f"{self.label}_block_{idx}_output",
|
|
||||||
workflow_id=self.output_parameter.workflow_id,
|
|
||||||
created_at=datetime.now(),
|
|
||||||
modified_at=datetime.now(),
|
|
||||||
parameter_type=ParameterType.OUTPUT,
|
|
||||||
description=f"Output parameter for {block_type} block {idx}",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create the block with its parameters
|
|
||||||
if block_type == "goto_url":
|
|
||||||
return UrlBlock(label=f"{self.label}_block_{idx}", url=parameters["url"], output_parameter=output_param)
|
|
||||||
elif block_type == "extraction":
|
|
||||||
return ExtractionBlock(
|
|
||||||
label=f"{self.label}_block_{idx}",
|
|
||||||
data_extraction_goal=parameters["data_extraction_goal"],
|
|
||||||
data_schema=parameters.get("data_schema"),
|
|
||||||
engine=RunEngine.skyvern_v1,
|
|
||||||
output_parameter=output_param,
|
|
||||||
)
|
|
||||||
elif block_type == "file_download":
|
|
||||||
return FileDownloadBlock(
|
|
||||||
label=f"{self.label}_block_{idx}", url=parameters["url"], output_parameter=output_param
|
|
||||||
)
|
|
||||||
elif block_type == "task_v2":
|
|
||||||
return TaskV2Block(
|
|
||||||
label=f"{self.label}_block_{idx}", prompt=parameters["prompt"], output_parameter=output_param
|
|
||||||
)
|
|
||||||
elif block_type == "action":
|
|
||||||
return ActionBlock(
|
|
||||||
label=f"{self.label}_block_{idx}", action=parameters["action"], output_parameter=output_param
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
LOG.warning(f"Unsupported block type: {block_type}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _create_initial_extraction_block(self, natural_language_prompt: str) -> ExtractionBlock:
|
|
||||||
"""Create the initial extraction block that determines what to iterate over.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
natural_language_prompt: The user's natural language description
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
ExtractionBlock: Configured to extract items and determine block sequence
|
|
||||||
"""
|
|
||||||
return ExtractionBlock(
|
|
||||||
label=f"{self.label}_extraction",
|
|
||||||
data_extraction_goal=f"Extract the items to iterate over and determine the sequence of blocks needed. Natural language prompt: {natural_language_prompt}",
|
|
||||||
data_schema={
|
|
||||||
"type": "object",
|
|
||||||
"properties": {
|
|
||||||
"results": {
|
|
||||||
"type": "array",
|
|
||||||
"items": {
|
|
||||||
"type": "object",
|
|
||||||
"properties": {
|
|
||||||
"data": {"type": "object", "description": "The data to iterate over (URLs, IDs, etc.)"}
|
|
||||||
},
|
|
||||||
"required": ["data"],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"block_sequence": {
|
|
||||||
"type": "array",
|
|
||||||
"items": {
|
|
||||||
"type": "object",
|
|
||||||
"properties": {
|
|
||||||
"block_type": {
|
|
||||||
"type": "string",
|
|
||||||
"description": "Type of block to execute (goto_url, extraction, etc.)",
|
|
||||||
},
|
|
||||||
"parameters": {
|
|
||||||
"type": "object",
|
|
||||||
"description": "Parameters for this block (url, data_extraction_goal, etc.)",
|
|
||||||
"properties": {
|
|
||||||
"url": {
|
|
||||||
"type": "string",
|
|
||||||
"description": "For URL-based blocks, use current_value.data.X (wrapped by 2 curly brackets) to reference extracted data ",
|
|
||||||
},
|
|
||||||
"data_extraction_goal": {
|
|
||||||
"type": "string",
|
|
||||||
"description": "For extraction blocks, what to extract from the page",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"required": ["block_type", "parameters"],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"required": ["results", "block_sequence"],
|
|
||||||
},
|
|
||||||
engine=RunEngine.skyvern_v1,
|
|
||||||
output_parameter=self.output_parameter,
|
|
||||||
)
|
|
||||||
|
|
||||||
async def get_loop_over_parameter_values(
|
|
||||||
self,
|
|
||||||
workflow_run_context: WorkflowRunContext,
|
|
||||||
workflow_run_id: str,
|
|
||||||
workflow_run_block_id: str,
|
|
||||||
organization_id: str | None = None,
|
|
||||||
) -> list[Any]:
|
|
||||||
parameter_value = None
|
|
||||||
if self.loop_variable_reference:
|
if self.loop_variable_reference:
|
||||||
LOG.debug("Processing loop variable reference", loop_variable_reference=self.loop_variable_reference)
|
value_template = f"{{{{ {self.loop_variable_reference.strip(' {}')} | tojson }}}}"
|
||||||
|
try:
|
||||||
|
value_json = self.format_block_parameter_template_from_workflow_run_context(
|
||||||
|
value_template, workflow_run_context
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
raise FailedToFormatJinjaStyleParameter(value_template, str(e))
|
||||||
|
parameter_value = json.loads(value_json)
|
||||||
|
|
||||||
# Check if this looks like a parameter path (contains dots and/or _output)
|
elif self.loop_over is not None:
|
||||||
is_likely_parameter_path = "." in self.loop_variable_reference or "_output" in self.loop_variable_reference
|
if isinstance(self.loop_over, WorkflowParameter):
|
||||||
|
parameter_value = workflow_run_context.get_value(self.loop_over.key)
|
||||||
# Try parsing as Jinja template
|
elif isinstance(self.loop_over, OutputParameter):
|
||||||
parameter_value = self.try_parse_jinja_template(workflow_run_context)
|
# If the output parameter is for a TaskBlock, it will be a TaskOutput object. We need to extract the
|
||||||
|
# value from the TaskOutput object's extracted_information field.
|
||||||
if parameter_value is None and not is_likely_parameter_path:
|
output_parameter_value = workflow_run_context.get_value(self.loop_over.key)
|
||||||
try:
|
if isinstance(output_parameter_value, dict) and "extracted_information" in output_parameter_value:
|
||||||
# Create and execute extraction block
|
parameter_value = output_parameter_value["extracted_information"]
|
||||||
extraction_block = self._create_initial_extraction_block(self.loop_variable_reference)
|
else:
|
||||||
|
parameter_value = output_parameter_value
|
||||||
LOG.info(
|
elif isinstance(self.loop_over, ContextParameter):
|
||||||
"Processing natural language loop input",
|
parameter_value = self.loop_over.value
|
||||||
prompt=self.loop_variable_reference,
|
if not parameter_value:
|
||||||
extraction_goal=extraction_block.data_extraction_goal,
|
source_parameter_value = workflow_run_context.get_value(self.loop_over.source.key)
|
||||||
)
|
if isinstance(source_parameter_value, dict):
|
||||||
|
if "extracted_information" in source_parameter_value:
|
||||||
extraction_result = await extraction_block.execute(
|
parameter_value = source_parameter_value["extracted_information"].get(self.loop_over.key)
|
||||||
workflow_run_id=workflow_run_id,
|
|
||||||
workflow_run_block_id=workflow_run_block_id,
|
|
||||||
organization_id=organization_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not extraction_result.success:
|
|
||||||
LOG.error("Extraction block failed", failure_reason=extraction_result.failure_reason)
|
|
||||||
raise ValueError(f"Extraction block failed: {extraction_result.failure_reason}")
|
|
||||||
|
|
||||||
LOG.debug("Extraction block succeeded", output=extraction_result.output_parameter_value)
|
|
||||||
|
|
||||||
# Store the extraction result in the workflow context
|
|
||||||
await extraction_block.record_output_parameter_value(
|
|
||||||
workflow_run_context=workflow_run_context,
|
|
||||||
workflow_run_id=workflow_run_id,
|
|
||||||
value=extraction_result.output_parameter_value,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Get the extracted information and create blocks
|
|
||||||
if not isinstance(extraction_result.output_parameter_value, dict):
|
|
||||||
LOG.error(
|
|
||||||
"Extraction result output_parameter_value is not a dict",
|
|
||||||
output_parameter_value=extraction_result.output_parameter_value,
|
|
||||||
)
|
|
||||||
raise ValueError("Extraction result output_parameter_value is not a dictionary")
|
|
||||||
|
|
||||||
if "extracted_information" not in extraction_result.output_parameter_value:
|
|
||||||
LOG.error(
|
|
||||||
"Extraction result missing extracted_information key",
|
|
||||||
output_parameter_value=extraction_result.output_parameter_value,
|
|
||||||
)
|
|
||||||
raise ValueError("Extraction result missing extracted_information key")
|
|
||||||
|
|
||||||
extracted_info = extraction_result.output_parameter_value["extracted_information"]
|
|
||||||
|
|
||||||
# Handle different possible structures of extracted_info
|
|
||||||
if isinstance(extracted_info, list):
|
|
||||||
# If it's a list, take the first element
|
|
||||||
if len(extracted_info) > 0:
|
|
||||||
extracted_info = extracted_info[0]
|
|
||||||
else:
|
else:
|
||||||
LOG.error("Extracted information list is empty")
|
parameter_value = source_parameter_value.get(self.loop_over.key)
|
||||||
raise ValueError("Extracted information list is empty")
|
else:
|
||||||
|
raise ValueError("ContextParameter source value should be a dict")
|
||||||
|
else:
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
# At this point, extracted_info should be a dict
|
else:
|
||||||
if not isinstance(extracted_info, dict):
|
if self.complete_if_empty:
|
||||||
LOG.error("Invalid extraction result structure - not a dict", extracted_info=extracted_info)
|
return []
|
||||||
raise ValueError("Extraction result is not a dictionary")
|
else:
|
||||||
|
raise NoIterableValueFound()
|
||||||
|
|
||||||
if "block_sequence" not in extracted_info:
|
if isinstance(parameter_value, list):
|
||||||
LOG.error(
|
return parameter_value
|
||||||
"Invalid extraction result structure - missing block_sequence",
|
else:
|
||||||
extracted_info=extracted_info,
|
# TODO (kerem): Should we raise an error here?
|
||||||
)
|
return [parameter_value]
|
||||||
raise ValueError("Extraction result does not contain expected block_sequence")
|
|
||||||
|
|
||||||
# At this point, extracted_info is guaranteed to be a dict with block_sequence
|
|
||||||
block_sequence = extracted_info["block_sequence"]
|
|
||||||
self.loop_blocks = []
|
|
||||||
|
|
||||||
# Create blocks based on the sequence
|
|
||||||
for idx, block_info in enumerate(block_sequence):
|
|
||||||
block_type = block_info["block_type"].lower()
|
|
||||||
block = self._create_block_from_sequence(block_type, block_info["parameters"], idx)
|
|
||||||
if block:
|
|
||||||
self.loop_blocks.append(block)
|
|
||||||
|
|
||||||
LOG.info(
|
|
||||||
"Generated loop blocks from extraction output",
|
|
||||||
num_blocks=len(self.loop_blocks),
|
|
||||||
block_types=[b.block_type for b in self.loop_blocks],
|
|
||||||
)
|
|
||||||
|
|
||||||
# Update the loop variable reference to point to the extraction results
|
|
||||||
self.loop_variable_reference = f"{self.label}_output.extracted_information.results"
|
|
||||||
|
|
||||||
# Try parsing again with the new reference
|
|
||||||
parameter_value = self.try_parse_jinja_template(workflow_run_context)
|
|
||||||
if parameter_value is None:
|
|
||||||
raise ValueError("Failed to get values after extraction")
|
|
||||||
except Exception as e:
|
|
||||||
LOG.error("Error during extraction", error=str(e), exc_info=True)
|
|
||||||
raise
|
|
||||||
elif parameter_value is None and is_likely_parameter_path:
|
|
||||||
# If this looks like a parameter path but parsing failed, raise a more specific error
|
|
||||||
LOG.error("Failed to parse parameter path", loop_variable_reference=self.loop_variable_reference)
|
|
||||||
raise ValueError(f"Failed to parse parameter path: {self.loop_variable_reference}")
|
|
||||||
|
|
||||||
if parameter_value is None:
|
|
||||||
raise ValueError("No parameter value found")
|
|
||||||
|
|
||||||
if not isinstance(parameter_value, list):
|
|
||||||
raise ValueError("Parameter value must be a list")
|
|
||||||
|
|
||||||
LOG.debug("Successfully got loop values", num_values=len(parameter_value))
|
|
||||||
return parameter_value
|
|
||||||
|
|
||||||
def try_parse_jinja_template(self, workflow_run_context: WorkflowRunContext) -> Any | None:
|
|
||||||
if not self.loop_variable_reference:
|
|
||||||
return None
|
|
||||||
try:
|
|
||||||
# Log available parameters in context for debugging
|
|
||||||
available_params = {
|
|
||||||
key: value
|
|
||||||
for key, value in workflow_run_context.values.items()
|
|
||||||
if isinstance(key, str) and "_output" in key
|
|
||||||
}
|
|
||||||
LOG.debug(
|
|
||||||
"Available output parameters in context",
|
|
||||||
params=list(available_params.keys()),
|
|
||||||
values=available_params,
|
|
||||||
loop_variable_reference=self.loop_variable_reference,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Loop variable reference is critical to the extraction block, so we add multiple access patterns
|
|
||||||
# to handle different cases
|
|
||||||
# 1. Try direct access to the parameter (for cases where it's already a list)
|
|
||||||
try:
|
|
||||||
value_template = f"{{{{ {self.loop_variable_reference.split('.')[0]} | tojson }}}}"
|
|
||||||
value_json = self.format_block_parameter_template_from_workflow_run_context(
|
|
||||||
value_template, workflow_run_context
|
|
||||||
)
|
|
||||||
value = json.loads(value_json)
|
|
||||||
if isinstance(value, list):
|
|
||||||
LOG.debug("Successfully parsed as direct list", value=value)
|
|
||||||
return value
|
|
||||||
except Exception as e:
|
|
||||||
LOG.debug("Direct list access failed", error=str(e))
|
|
||||||
|
|
||||||
# 2. Try accessing through extracted_information.results
|
|
||||||
try:
|
|
||||||
value_template = (
|
|
||||||
f"{{{{ {self.loop_variable_reference.strip(' {}')}.extracted_information.results | tojson }}}}"
|
|
||||||
)
|
|
||||||
value_json = self.format_block_parameter_template_from_workflow_run_context(
|
|
||||||
value_template, workflow_run_context
|
|
||||||
)
|
|
||||||
value = json.loads(value_json)
|
|
||||||
if isinstance(value, list):
|
|
||||||
LOG.debug("Successfully parsed through extracted_information.results", value=value)
|
|
||||||
return value
|
|
||||||
except Exception as e:
|
|
||||||
LOG.debug("Extracted information access failed", error=str(e))
|
|
||||||
|
|
||||||
# 3. Try the exact path as given (fallback)
|
|
||||||
try:
|
|
||||||
value_template = f"{{{{ {self.loop_variable_reference.strip(' {}')} | tojson }}}}"
|
|
||||||
value_json = self.format_block_parameter_template_from_workflow_run_context(
|
|
||||||
value_template, workflow_run_context
|
|
||||||
)
|
|
||||||
value = json.loads(value_json)
|
|
||||||
if isinstance(value, list):
|
|
||||||
LOG.debug("Successfully parsed using exact path", value=value)
|
|
||||||
return value
|
|
||||||
except Exception as e:
|
|
||||||
LOG.debug("Exact path access failed", error=str(e))
|
|
||||||
|
|
||||||
return None
|
|
||||||
except Exception as e:
|
|
||||||
LOG.debug("Template parsing failed", error=str(e))
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def execute_loop_helper(
|
async def execute_loop_helper(
|
||||||
self,
|
self,
|
||||||
@@ -1349,19 +1088,12 @@ class ForLoopBlock(Block):
|
|||||||
**kwargs: dict,
|
**kwargs: dict,
|
||||||
) -> BlockResult:
|
) -> BlockResult:
|
||||||
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
|
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
|
||||||
|
|
||||||
# Get loop values to iterate over - this now handles both Jinja and natural language
|
|
||||||
try:
|
try:
|
||||||
loop_over_values = await self.get_loop_over_parameter_values(
|
loop_over_values = self.get_loop_over_parameter_values(workflow_run_context)
|
||||||
workflow_run_context,
|
|
||||||
workflow_run_id=workflow_run_id,
|
|
||||||
workflow_run_block_id=workflow_run_block_id,
|
|
||||||
organization_id=organization_id,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return await self.build_block_result(
|
return await self.build_block_result(
|
||||||
success=False,
|
success=False,
|
||||||
failure_reason=f"Failed to get loop values: {str(e)}",
|
failure_reason=f"failed to get loop values: {str(e)}",
|
||||||
status=BlockStatus.failed,
|
status=BlockStatus.failed,
|
||||||
workflow_run_block_id=workflow_run_block_id,
|
workflow_run_block_id=workflow_run_block_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
|
|||||||
Reference in New Issue
Block a user