Validate all block parameters are defined in workflow (#4464)

This commit is contained in:
Stanislav Novosad
2026-01-15 14:08:30 -07:00
committed by GitHub
parent 5bfa0b0961
commit 5e23c580e7
3 changed files with 79 additions and 56 deletions

View File

@@ -90,7 +90,7 @@ from skyvern.forge.sdk.workflow.exceptions import (
FailedToCreateWorkflow, FailedToCreateWorkflow,
FailedToUpdateWorkflow, FailedToUpdateWorkflow,
InvalidTemplateWorkflowPermanentId, InvalidTemplateWorkflowPermanentId,
WorkflowParameterMissingRequiredValue, WorkflowDefinitionValidationException,
) )
from skyvern.forge.sdk.workflow.models.workflow import ( from skyvern.forge.sdk.workflow.models.workflow import (
RunWorkflowResponse, RunWorkflowResponse,
@@ -522,7 +522,7 @@ async def create_workflow_legacy(
return await app.WORKFLOW_SERVICE.create_workflow_from_request( return await app.WORKFLOW_SERVICE.create_workflow_from_request(
organization=current_org, request=workflow_create_request organization=current_org, request=workflow_create_request
) )
except WorkflowParameterMissingRequiredValue as e: except WorkflowDefinitionValidationException as e:
raise e raise e
except Exception as e: except Exception as e:
LOG.error("Failed to create workflow", exc_info=True, organization_id=current_org.organization_id) LOG.error("Failed to create workflow", exc_info=True, organization_id=current_org.organization_id)
@@ -583,7 +583,7 @@ async def create_workflow(
) )
except yaml.YAMLError: except yaml.YAMLError:
raise HTTPException(status_code=422, detail="Invalid YAML") raise HTTPException(status_code=422, detail="Invalid YAML")
except WorkflowParameterMissingRequiredValue as e: except WorkflowDefinitionValidationException as e:
raise e raise e
except Exception as e: except Exception as e:
LOG.error("Failed to create workflow", exc_info=True, organization_id=current_org.organization_id) LOG.error("Failed to create workflow", exc_info=True, organization_id=current_org.organization_id)
@@ -842,7 +842,7 @@ async def update_workflow_legacy(
status_code=422, status_code=422,
detail=str(e), detail=str(e),
) from e ) from e
except WorkflowParameterMissingRequiredValue as e: except WorkflowDefinitionValidationException as e:
raise e raise e
except (SkyvernHTTPException, ValidationError) as e: except (SkyvernHTTPException, ValidationError) as e:
# Bubble up well-formed client errors so they are not converted to 500s # Bubble up well-formed client errors so they are not converted to 500s
@@ -916,7 +916,7 @@ async def update_workflow(
) )
except yaml.YAMLError: except yaml.YAMLError:
raise HTTPException(status_code=422, detail="Invalid YAML") raise HTTPException(status_code=422, detail="Invalid YAML")
except WorkflowParameterMissingRequiredValue as e: except WorkflowDefinitionValidationException as e:
raise e raise e
except (SkyvernHTTPException, ValidationError) as e: except (SkyvernHTTPException, ValidationError) as e:
# Bubble up well-formed client errors so they are not converted to 500s # Bubble up well-formed client errors so they are not converted to 500s

View File

@@ -119,7 +119,11 @@ class InvalidFileType(BaseWorkflowHTTPException):
) )
class WorkflowParameterMissingRequiredValue(BaseWorkflowHTTPException): class WorkflowDefinitionValidationException(BaseWorkflowHTTPException):
"""Base exception for workflow definition validation errors."""
class WorkflowParameterMissingRequiredValue(WorkflowDefinitionValidationException):
def __init__(self, workflow_parameter_type: str, workflow_parameter_key: str, required_value: str) -> None: def __init__(self, workflow_parameter_type: str, workflow_parameter_key: str, required_value: str) -> None:
super().__init__( super().__init__(
f"Missing required value for workflow parameter. Workflow parameter type: {workflow_parameter_type}. workflow_parameter_key: {workflow_parameter_key}. Required value: {required_value}", f"Missing required value for workflow parameter. Workflow parameter type: {workflow_parameter_type}. workflow_parameter_key: {workflow_parameter_key}. Required value: {required_value}",
@@ -127,6 +131,25 @@ class WorkflowParameterMissingRequiredValue(BaseWorkflowHTTPException):
) )
class WorkflowDefinitionHasUndefinedParameters(WorkflowDefinitionValidationException):
def __init__(self, undefined_parameters: dict[str, list[str]]) -> None:
# Format: {"block_label": ["param1", "param2"]}
error_details = []
for block_label, params in undefined_parameters.items():
params_str = ", ".join(f"'{p}'" for p in params)
error_details.append(f" - Block '{block_label}' references undefined parameter(s): {params_str}")
error_message = (
f"Workflow definition has blocks that reference undefined parameters:\n"
f"{chr(10).join(error_details)}\n\n"
f"Make sure to define all parameters in the workflow parameters list before using them in blocks."
)
super().__init__(
error_message,
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
class InvalidWaitBlockTime(SkyvernException): class InvalidWaitBlockTime(SkyvernException):
def __init__(self, max_sec: int) -> None: def __init__(self, max_sec: int) -> None:
super().__init__(f"Invalid wait time for wait block, it should be a number between 0 and {max_sec}.") super().__init__(f"Invalid wait time for wait block, it should be a number between 0 and {max_sec}.")

View File

@@ -23,6 +23,7 @@ from skyvern.forge.sdk.workflow.exceptions import (
InvalidWorkflowDefinition, InvalidWorkflowDefinition,
WorkflowDefinitionHasDuplicateParameterKeys, WorkflowDefinitionHasDuplicateParameterKeys,
WorkflowDefinitionHasReservedParameterKeys, WorkflowDefinitionHasReservedParameterKeys,
WorkflowDefinitionHasUndefinedParameters,
WorkflowParameterMissingRequiredValue, WorkflowParameterMissingRequiredValue,
) )
from skyvern.forge.sdk.workflow.models.block import ( from skyvern.forge.sdk.workflow.models.block import (
@@ -288,6 +289,11 @@ def convert_workflow_definition(
if duplicate_parameter_keys: if duplicate_parameter_keys:
raise WorkflowDefinitionHasDuplicateParameterKeys(duplicate_keys=duplicate_parameter_keys) raise WorkflowDefinitionHasDuplicateParameterKeys(duplicate_keys=duplicate_parameter_keys)
# Validate that all blocks reference defined parameters
undefined_parameters = _collect_undefined_parameters(workflow_definition_yaml.blocks, parameters)
if undefined_parameters:
raise WorkflowDefinitionHasUndefinedParameters(undefined_parameters=undefined_parameters)
# Create blocks from the request # Create blocks from the request
block_label_mapping = {} block_label_mapping = {}
blocks: list[BlockTypeVar] = [] blocks: list[BlockTypeVar] = []
@@ -362,11 +368,7 @@ def block_yaml_to_block(
output_parameter = cast(OutputParameter, parameters[f"{block_yaml.label}_output"]) output_parameter = cast(OutputParameter, parameters[f"{block_yaml.label}_output"])
base_kwargs = _build_block_kwargs(block_yaml, output_parameter) base_kwargs = _build_block_kwargs(block_yaml, output_parameter)
if block_yaml.block_type == BlockType.TASK: if block_yaml.block_type == BlockType.TASK:
task_block_parameters = ( task_block_parameters = _resolve_block_parameters(block_yaml, parameters)
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else []
)
return TaskBlock( return TaskBlock(
**base_kwargs, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
@@ -452,22 +454,14 @@ def block_yaml_to_block(
return CodeBlock( return CodeBlock(
**base_kwargs, **base_kwargs,
code=block_yaml.code, code=block_yaml.code,
parameters=( parameters=_resolve_block_parameters(block_yaml, parameters),
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else []
),
) )
elif block_yaml.block_type == BlockType.TEXT_PROMPT: elif block_yaml.block_type == BlockType.TEXT_PROMPT:
return TextPromptBlock( return TextPromptBlock(
**base_kwargs, **base_kwargs,
llm_key=block_yaml.llm_key, llm_key=block_yaml.llm_key,
prompt=block_yaml.prompt, prompt=block_yaml.prompt,
parameters=( parameters=_resolve_block_parameters(block_yaml, parameters),
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else []
),
json_schema=block_yaml.json_schema, json_schema=block_yaml.json_schema,
) )
elif block_yaml.block_type == BlockType.DOWNLOAD_TO_S3: elif block_yaml.block_type == BlockType.DOWNLOAD_TO_S3:
@@ -520,11 +514,7 @@ def block_yaml_to_block(
json_schema=block_yaml.json_schema, json_schema=block_yaml.json_schema,
) )
elif block_yaml.block_type == BlockType.VALIDATION: elif block_yaml.block_type == BlockType.VALIDATION:
validation_block_parameters = ( validation_block_parameters = _resolve_block_parameters(block_yaml, parameters)
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else []
)
if not block_yaml.complete_criterion and not block_yaml.terminate_criterion: if not block_yaml.complete_criterion and not block_yaml.terminate_criterion:
raise InvalidWorkflowDefinition( raise InvalidWorkflowDefinition(
@@ -543,11 +533,7 @@ def block_yaml_to_block(
) )
elif block_yaml.block_type == BlockType.ACTION: elif block_yaml.block_type == BlockType.ACTION:
action_block_parameters = ( action_block_parameters = _resolve_block_parameters(block_yaml, parameters)
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else []
)
if not block_yaml.navigation_goal: if not block_yaml.navigation_goal:
raise InvalidWorkflowDefinition(f"Action block '{block_yaml.label}' requires navigation_goal") raise InvalidWorkflowDefinition(f"Action block '{block_yaml.label}' requires navigation_goal")
@@ -573,11 +559,7 @@ def block_yaml_to_block(
) )
elif block_yaml.block_type == BlockType.NAVIGATION: elif block_yaml.block_type == BlockType.NAVIGATION:
navigation_block_parameters = ( navigation_block_parameters = _resolve_block_parameters(block_yaml, parameters)
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else []
)
return NavigationBlock( return NavigationBlock(
**base_kwargs, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
@@ -614,11 +596,7 @@ def block_yaml_to_block(
) )
elif block_yaml.block_type == BlockType.EXTRACTION: elif block_yaml.block_type == BlockType.EXTRACTION:
extraction_block_parameters = ( extraction_block_parameters = _resolve_block_parameters(block_yaml, parameters)
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else []
)
return ExtractionBlock( return ExtractionBlock(
**base_kwargs, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
@@ -634,11 +612,7 @@ def block_yaml_to_block(
) )
elif block_yaml.block_type == BlockType.LOGIN: elif block_yaml.block_type == BlockType.LOGIN:
login_block_parameters = ( login_block_parameters = _resolve_block_parameters(block_yaml, parameters)
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else []
)
return LoginBlock( return LoginBlock(
**base_kwargs, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
@@ -667,11 +641,7 @@ def block_yaml_to_block(
) )
elif block_yaml.block_type == BlockType.FILE_DOWNLOAD: elif block_yaml.block_type == BlockType.FILE_DOWNLOAD:
file_download_block_parameters = ( file_download_block_parameters = _resolve_block_parameters(block_yaml, parameters)
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else []
)
return FileDownloadBlock( return FileDownloadBlock(
**base_kwargs, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
@@ -702,11 +672,7 @@ def block_yaml_to_block(
max_steps=block_yaml.max_steps, max_steps=block_yaml.max_steps,
) )
elif block_yaml.block_type == BlockType.HTTP_REQUEST: elif block_yaml.block_type == BlockType.HTTP_REQUEST:
http_request_block_parameters = ( http_request_block_parameters = _resolve_block_parameters(block_yaml, parameters)
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys
else []
)
return HttpRequestBlock( return HttpRequestBlock(
**base_kwargs, **base_kwargs,
method=block_yaml.method, method=block_yaml.method,
@@ -739,6 +705,40 @@ def block_yaml_to_block(
raise ValueError(f"Invalid block type {block_yaml.block_type}") raise ValueError(f"Invalid block type {block_yaml.block_type}")
def _collect_undefined_parameters(
block_yamls: list[BLOCK_YAML_TYPES],
parameters: dict[str, PARAMETER_TYPE],
) -> dict[str, list[str]]:
"""
Collect all undefined parameters referenced by blocks (including nested blocks in for_loop).
Returns a dict mapping block labels to lists of undefined parameter keys.
"""
undefined_params: dict[str, list[str]] = {}
for block_yaml in block_yamls:
# Check parameters for this block
if block_yaml.parameter_keys:
undefined_for_block = [param_key for param_key in block_yaml.parameter_keys if param_key not in parameters]
if undefined_for_block:
undefined_params[block_yaml.label] = undefined_for_block
# Recursively check nested blocks in for_loop
if isinstance(block_yaml, ForLoopBlockYAML) and block_yaml.loop_blocks:
nested_undefined = _collect_undefined_parameters(block_yaml.loop_blocks, parameters)
undefined_params.update(nested_undefined)
return undefined_params
def _resolve_block_parameters(
block_yaml: BLOCK_YAML_TYPES,
parameters: dict[str, PARAMETER_TYPE],
) -> list[PARAMETER_TYPE]:
return (
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys] if block_yaml.parameter_keys else []
)
def _has_dag_metadata(block_yamls: list[BLOCK_YAML_TYPES]) -> bool: def _has_dag_metadata(block_yamls: list[BLOCK_YAML_TYPES]) -> bool:
for block_yaml in block_yamls: for block_yaml in block_yamls:
if block_yaml.next_block_label: if block_yaml.next_block_label: