From 335aa8f2df7385098142795cd07fa0a56de9739c Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Fri, 21 Nov 2025 17:23:39 -0800 Subject: [PATCH] Add workflow definition version (#4066) --- skyvern/forge/sdk/workflow/models/block.py | 113 +++++++++++++++- skyvern/forge/sdk/workflow/models/workflow.py | 1 + skyvern/forge/sdk/workflow/service.py | 127 +++++++----------- skyvern/schemas/workflows.py | 16 ++- 4 files changed, 179 insertions(+), 78 deletions(-) diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index aecb07df..d358fff5 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -26,7 +26,7 @@ from email_validator import EmailNotValidError, validate_email from jinja2 import StrictUndefined from jinja2.sandbox import SandboxedEnvironment from playwright.async_api import Page -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator from pypdf import PdfReader from pypdf.errors import PdfReadError @@ -119,8 +119,15 @@ DEFAULT_MAX_STEPS_PER_ITERATION = 50 class Block(BaseModel, abc.ABC): + """Base class for workflow nodes (see branching spec [[s-4bnl]] for metadata semantics).""" + # Must be unique within workflow definition - label: str + label: str = Field(description="Author-facing identifier for a block; unique within a workflow.") + next_block_label: str | None = Field( + default=None, + description="Optional pointer to the next block label when constructing a DAG. " + "Defaults to sequential order when omitted.", + ) block_type: BlockType output_parameter: OutputParameter continue_on_failure: bool = False @@ -3821,6 +3828,107 @@ class HttpRequestBlock(Block): ) +class BranchEvaluationContext(BaseModel): + """Collection of runtime data that BranchCriteria evaluators can consume.""" + + workflow_parameters: dict[str, Any] = Field(default_factory=dict) + block_outputs: dict[str, Any] = Field(default_factory=dict) + environment: dict[str, Any] | None = None + llm_results: dict[str, Any] | None = None + + +class BranchCriteria(BaseModel, abc.ABC): + """Abstract interface describing how a branch condition should be evaluated.""" + + criteria_type: str + description: str | None = None + + @abc.abstractmethod + async def evaluate(self, context: BranchEvaluationContext) -> bool: + """Return True when the branch should execute.""" + raise NotImplementedError + + def requires_llm(self) -> bool: + """Whether the criteria relies on an LLM classification step.""" + return False + + +class BranchCondition(BaseModel): + """Represents a single conditional branch edge within a ConditionalBlock.""" + + criteria: BranchCriteria | None = None + next_block_label: str | None = None + description: str | None = None + order: int = Field(ge=0) + is_default: bool = False + + @model_validator(mode="after") + def validate_condition(cls, condition: BranchCondition) -> BranchCondition: + if condition.criteria is None and not condition.is_default: + raise ValueError("Branches without criteria must be marked as default.") + if condition.criteria is not None and condition.is_default: + raise ValueError("Default branches may not define criteria.") + return condition + + +class ConditionalBlock(Block): + """Branching block that selects the next block label based on ordered conditions.""" + + # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: + # Parameter 1 of Literal[...] cannot be of type "Any" + block_type: Literal[BlockType.CONDITIONAL] = BlockType.CONDITIONAL # type: ignore + + branches: list[BranchCondition] = Field(default_factory=list) + + @model_validator(mode="after") + def validate_branches(cls, block: ConditionalBlock) -> ConditionalBlock: + if not block.branches: + raise ValueError("Conditional blocks require at least one branch.") + + orders = [branch.order for branch in block.branches] + if len(orders) != len(set(orders)): + raise ValueError("Branch order must be unique within a conditional block.") + + default_branches = [branch for branch in block.branches if branch.is_default] + if len(default_branches) > 1: + raise ValueError("Only one default branch is permitted per conditional block.") + + block.branches = sorted(block.branches, key=lambda branch: branch.order) + return block + + def get_all_parameters( + self, + workflow_run_id: str, # noqa: ARG002 - preserved for interface compatibility + ) -> list[PARAMETER_TYPE]: + # BranchCriteria subclasses will surface their parameter dependencies once implemented. + return [] + + async def execute( # noqa: D401 + self, + workflow_run_id: str, + workflow_run_block_id: str, + organization_id: str | None = None, + browser_session_id: str | None = None, + **kwargs: dict, + ) -> BlockResult: + """ + Placeholder execute implementation. + + Conditional block execution will be implemented alongside the DAG workflow + engine refactor (see branching workflow spec). + """ + raise NotImplementedError("Conditional block execution is handled by the DAG engine.") + + @property + def ordered_branches(self) -> list[BranchCondition]: + """Convenience accessor that returns branches sorted by order.""" + return list(self.branches) + + def get_default_branch(self) -> BranchCondition | None: + """Return the default/else branch when configured.""" + return next((branch for branch in self.branches if branch.is_default), None) + + def get_all_blocks(blocks: list[BlockTypeVar]) -> list[BlockTypeVar]: """ Recursively get "all blocks" in a workflow definition. @@ -3842,6 +3950,7 @@ def get_all_blocks(blocks: list[BlockTypeVar]) -> list[BlockTypeVar]: BlockSubclasses = Union[ + ConditionalBlock, ForLoopBlock, TaskBlock, CodeBlock, diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index ee73fbf8..c022bac5 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -51,6 +51,7 @@ class RunWorkflowResponse(BaseModel): class WorkflowDefinition(BaseModel): + version: int = 1 parameters: list[PARAMETER_TYPE] blocks: List[BlockTypeVar] diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 85be1f57..e1d60d14 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -173,6 +173,8 @@ def _get_workflow_definition_core_data(workflow_definition: WorkflowDefinition) "onepassword_credential_parameter_id", "azure_vault_credential_parameter_id", "disable_cache", + "next_block_label", + "version", ] # Use BFS to recursively remove fields from all nested objects @@ -2685,8 +2687,12 @@ class WorkflowService: blocks.append(block) block_label_mapping[block.label] = block - # Set the blocks for the workflow definition - workflow_definition = WorkflowDefinition(parameters=parameters.values(), blocks=blocks) + # Set the blocks for the workflow definition and derive DAG version metadata + dag_version = workflow_definition_yaml.version + if dag_version is None: + dag_version = 2 if WorkflowService._has_dag_metadata(workflow_definition_yaml.blocks) else 1 + + workflow_definition = WorkflowDefinition(parameters=parameters.values(), blocks=blocks, version=dag_version) LOG.info( f"Created workflow from request, title: {title}", @@ -2839,12 +2845,26 @@ class WorkflowService: ) return output_parameters + @staticmethod + def _build_block_kwargs( + block_yaml: BLOCK_YAML_TYPES, + output_parameter: OutputParameter, + ) -> dict[str, Any]: + return { + "label": block_yaml.label, + "next_block_label": block_yaml.next_block_label, + "output_parameter": output_parameter, + "continue_on_failure": block_yaml.continue_on_failure, + "model": block_yaml.model, + } + @staticmethod async def block_yaml_to_block( block_yaml: BLOCK_YAML_TYPES, parameters: dict[str, PARAMETER_TYPE], ) -> BlockTypeVar: output_parameter = cast(OutputParameter, parameters[f"{block_yaml.label}_output"]) + base_kwargs = WorkflowService._build_block_kwargs(block_yaml, output_parameter) if block_yaml.block_type == BlockType.TASK: task_block_parameters = ( [parameters[parameter_key] for parameter_key in block_yaml.parameter_keys] @@ -2852,22 +2872,19 @@ class WorkflowService: else [] ) return TaskBlock( - label=block_yaml.label, + **base_kwargs, url=block_yaml.url, title=block_yaml.title, engine=block_yaml.engine, parameters=task_block_parameters, - output_parameter=output_parameter, navigation_goal=block_yaml.navigation_goal, data_extraction_goal=block_yaml.data_extraction_goal, data_schema=block_yaml.data_schema, error_code_mapping=block_yaml.error_code_mapping, max_steps_per_run=block_yaml.max_steps_per_run, max_retries=block_yaml.max_retries, - model=block_yaml.model, complete_on_download=block_yaml.complete_on_download, download_suffix=block_yaml.download_suffix, - continue_on_failure=block_yaml.continue_on_failure, totp_verification_url=block_yaml.totp_verification_url, totp_identifier=block_yaml.totp_identifier, disable_cache=block_yaml.disable_cache, @@ -2899,29 +2916,25 @@ class WorkflowService: raise Exception("Loop value parameter is required for for loop block") return ForLoopBlock( - label=block_yaml.label, + **base_kwargs, loop_over=loop_over_parameter, loop_variable_reference=block_yaml.loop_variable_reference, loop_blocks=loop_blocks, - output_parameter=output_parameter, - continue_on_failure=block_yaml.continue_on_failure, complete_if_empty=block_yaml.complete_if_empty, ) elif block_yaml.block_type == BlockType.CODE: return CodeBlock( - label=block_yaml.label, + **base_kwargs, code=block_yaml.code, parameters=( [parameters[parameter_key] for parameter_key in block_yaml.parameter_keys] if block_yaml.parameter_keys else [] ), - output_parameter=output_parameter, - continue_on_failure=block_yaml.continue_on_failure, ) elif block_yaml.block_type == BlockType.TEXT_PROMPT: return TextPromptBlock( - label=block_yaml.label, + **base_kwargs, llm_key=block_yaml.llm_key, prompt=block_yaml.prompt, parameters=( @@ -2930,28 +2943,20 @@ class WorkflowService: else [] ), json_schema=block_yaml.json_schema, - output_parameter=output_parameter, - continue_on_failure=block_yaml.continue_on_failure, - model=block_yaml.model, ) elif block_yaml.block_type == BlockType.DOWNLOAD_TO_S3: return DownloadToS3Block( - label=block_yaml.label, - output_parameter=output_parameter, + **base_kwargs, url=block_yaml.url, - continue_on_failure=block_yaml.continue_on_failure, ) elif block_yaml.block_type == BlockType.UPLOAD_TO_S3: return UploadToS3Block( - label=block_yaml.label, - output_parameter=output_parameter, + **base_kwargs, path=block_yaml.path, - continue_on_failure=block_yaml.continue_on_failure, ) elif block_yaml.block_type == BlockType.FILE_UPLOAD: return FileUploadBlock( - label=block_yaml.label, - output_parameter=output_parameter, + **base_kwargs, storage_type=block_yaml.storage_type, s3_bucket=block_yaml.s3_bucket, aws_access_key_id=block_yaml.aws_access_key_id, @@ -2961,12 +2966,10 @@ class WorkflowService: azure_storage_account_key=block_yaml.azure_storage_account_key, azure_blob_container_name=block_yaml.azure_blob_container_name, path=block_yaml.path, - continue_on_failure=block_yaml.continue_on_failure, ) elif block_yaml.block_type == BlockType.SEND_EMAIL: return SendEmailBlock( - label=block_yaml.label, - output_parameter=output_parameter, + **base_kwargs, smtp_host=parameters[block_yaml.smtp_host_secret_parameter_key], smtp_port=parameters[block_yaml.smtp_port_secret_parameter_key], smtp_username=parameters[block_yaml.smtp_username_secret_parameter_key], @@ -2976,26 +2979,19 @@ class WorkflowService: subject=block_yaml.subject, body=block_yaml.body, file_attachments=block_yaml.file_attachments or [], - continue_on_failure=block_yaml.continue_on_failure, ) elif block_yaml.block_type == BlockType.FILE_URL_PARSER: return FileParserBlock( - label=block_yaml.label, - output_parameter=output_parameter, + **base_kwargs, file_url=block_yaml.file_url, file_type=block_yaml.file_type, json_schema=block_yaml.json_schema, - continue_on_failure=block_yaml.continue_on_failure, - model=block_yaml.model, ) elif block_yaml.block_type == BlockType.PDF_PARSER: return PDFParserBlock( - label=block_yaml.label, - output_parameter=output_parameter, + **base_kwargs, file_url=block_yaml.file_url, json_schema=block_yaml.json_schema, - continue_on_failure=block_yaml.continue_on_failure, - model=block_yaml.model, ) elif block_yaml.block_type == BlockType.VALIDATION: validation_block_parameters = ( @@ -3008,18 +3004,14 @@ class WorkflowService: raise Exception("Both complete criterion and terminate criterion are empty") return ValidationBlock( - label=block_yaml.label, + **base_kwargs, task_type=TaskType.validation, parameters=validation_block_parameters, - output_parameter=output_parameter, complete_criterion=block_yaml.complete_criterion, terminate_criterion=block_yaml.terminate_criterion, error_code_mapping=block_yaml.error_code_mapping, - continue_on_failure=block_yaml.continue_on_failure, # Should only need one step for validation block, but we allow 2 in case the LLM has an unexpected failure and we need to retry. max_steps_per_run=2, - disable_cache=block_yaml.disable_cache, - model=block_yaml.model, ) elif block_yaml.block_type == BlockType.ACTION: @@ -3033,20 +3025,17 @@ class WorkflowService: raise Exception("empty action instruction") return ActionBlock( - label=block_yaml.label, + **base_kwargs, url=block_yaml.url, title=block_yaml.title, engine=block_yaml.engine, task_type=TaskType.action, parameters=action_block_parameters, - output_parameter=output_parameter, navigation_goal=block_yaml.navigation_goal, error_code_mapping=block_yaml.error_code_mapping, max_retries=block_yaml.max_retries, - model=block_yaml.model, complete_on_download=block_yaml.complete_on_download, download_suffix=block_yaml.download_suffix, - continue_on_failure=block_yaml.continue_on_failure, totp_verification_url=block_yaml.totp_verification_url, totp_identifier=block_yaml.totp_identifier, disable_cache=block_yaml.disable_cache, @@ -3062,20 +3051,17 @@ class WorkflowService: else [] ) return NavigationBlock( - label=block_yaml.label, + **base_kwargs, url=block_yaml.url, title=block_yaml.title, engine=block_yaml.engine, parameters=navigation_block_parameters, - output_parameter=output_parameter, navigation_goal=block_yaml.navigation_goal, error_code_mapping=block_yaml.error_code_mapping, max_steps_per_run=block_yaml.max_steps_per_run, max_retries=block_yaml.max_retries, - model=block_yaml.model, complete_on_download=block_yaml.complete_on_download, download_suffix=block_yaml.download_suffix, - continue_on_failure=block_yaml.continue_on_failure, totp_verification_url=block_yaml.totp_verification_url, totp_identifier=block_yaml.totp_identifier, disable_cache=block_yaml.disable_cache, @@ -3087,8 +3073,7 @@ class WorkflowService: elif block_yaml.block_type == BlockType.HUMAN_INTERACTION: return HumanInteractionBlock( - label=block_yaml.label, - output_parameter=output_parameter, + **base_kwargs, instructions=block_yaml.instructions, positive_descriptor=block_yaml.positive_descriptor, negative_descriptor=block_yaml.negative_descriptor, @@ -3107,18 +3092,15 @@ class WorkflowService: else [] ) return ExtractionBlock( - label=block_yaml.label, + **base_kwargs, url=block_yaml.url, title=block_yaml.title, engine=block_yaml.engine, parameters=extraction_block_parameters, - output_parameter=output_parameter, data_extraction_goal=block_yaml.data_extraction_goal, data_schema=block_yaml.data_schema, max_steps_per_run=block_yaml.max_steps_per_run, max_retries=block_yaml.max_retries, - model=block_yaml.model, - continue_on_failure=block_yaml.continue_on_failure, disable_cache=block_yaml.disable_cache, complete_verification=False, ) @@ -3130,18 +3112,15 @@ class WorkflowService: else [] ) return LoginBlock( - label=block_yaml.label, + **base_kwargs, url=block_yaml.url, title=block_yaml.title, engine=block_yaml.engine, parameters=login_block_parameters, - output_parameter=output_parameter, navigation_goal=block_yaml.navigation_goal, error_code_mapping=block_yaml.error_code_mapping, max_steps_per_run=block_yaml.max_steps_per_run, max_retries=block_yaml.max_retries, - model=block_yaml.model, - continue_on_failure=block_yaml.continue_on_failure, totp_verification_url=block_yaml.totp_verification_url, totp_identifier=block_yaml.totp_identifier, disable_cache=block_yaml.disable_cache, @@ -3155,10 +3134,8 @@ class WorkflowService: raise InvalidWaitBlockTime(settings.WORKFLOW_WAIT_BLOCK_MAX_SEC) return WaitBlock( - label=block_yaml.label, + **base_kwargs, wait_sec=block_yaml.wait_sec, - continue_on_failure=block_yaml.continue_on_failure, - output_parameter=output_parameter, ) elif block_yaml.block_type == BlockType.FILE_DOWNLOAD: @@ -3168,19 +3145,16 @@ class WorkflowService: else [] ) return FileDownloadBlock( - label=block_yaml.label, + **base_kwargs, url=block_yaml.url, title=block_yaml.title, engine=block_yaml.engine, parameters=file_download_block_parameters, - output_parameter=output_parameter, navigation_goal=block_yaml.navigation_goal, error_code_mapping=block_yaml.error_code_mapping, max_steps_per_run=block_yaml.max_steps_per_run, max_retries=block_yaml.max_retries, - model=block_yaml.model, download_suffix=block_yaml.download_suffix, - continue_on_failure=block_yaml.continue_on_failure, totp_verification_url=block_yaml.totp_verification_url, totp_identifier=block_yaml.totp_identifier, disable_cache=block_yaml.disable_cache, @@ -3190,16 +3164,13 @@ class WorkflowService: ) elif block_yaml.block_type == BlockType.TaskV2: return TaskV2Block( - label=block_yaml.label, + **base_kwargs, prompt=block_yaml.prompt, url=block_yaml.url, totp_verification_url=block_yaml.totp_verification_url, totp_identifier=block_yaml.totp_identifier, max_iterations=block_yaml.max_iterations, max_steps=block_yaml.max_steps, - disable_cache=block_yaml.disable_cache, - model=block_yaml.model, - output_parameter=output_parameter, ) elif block_yaml.block_type == BlockType.HTTP_REQUEST: http_request_block_parameters = ( @@ -3208,7 +3179,7 @@ class WorkflowService: else [] ) return HttpRequestBlock( - label=block_yaml.label, + **base_kwargs, method=block_yaml.method, url=block_yaml.url, headers=block_yaml.headers, @@ -3216,14 +3187,11 @@ class WorkflowService: timeout=block_yaml.timeout, follow_redirects=block_yaml.follow_redirects, parameters=http_request_block_parameters, - output_parameter=output_parameter, - continue_on_failure=block_yaml.continue_on_failure, ) elif block_yaml.block_type == BlockType.GOTO_URL: return UrlBlock( - label=block_yaml.label, + **base_kwargs, url=block_yaml.url, - output_parameter=output_parameter, complete_verification=False, ) @@ -3448,3 +3416,12 @@ class WorkflowService: if workflow.run_with == "code": return True return False + + @staticmethod + def _has_dag_metadata(block_yamls: list[BLOCK_YAML_TYPES]) -> bool: + for block_yaml in block_yamls: + if block_yaml.next_block_label: + return True + if isinstance(block_yaml, ForLoopBlockYAML) and WorkflowService._has_dag_metadata(block_yaml.loop_blocks): + return True + return False diff --git a/skyvern/schemas/workflows.py b/skyvern/schemas/workflows.py index d1c335f5..c0b0b988 100644 --- a/skyvern/schemas/workflows.py +++ b/skyvern/schemas/workflows.py @@ -22,6 +22,7 @@ class BlockType(StrEnum): TASK = "task" TaskV2 = "task_v2" FOR_LOOP = "for_loop" + CONDITIONAL = "conditional" CODE = "code" TEXT_PROMPT = "text_prompt" DOWNLOAD_TO_S3 = "download_to_s3" @@ -197,10 +198,22 @@ class OutputParameterYAML(ParameterYAML): class BlockYAML(BaseModel, abc.ABC): block_type: BlockType - label: str + label: str = Field(description="Author-facing identifier; must be unique per workflow.") + next_block_label: str | None = Field( + default=None, + description="Optional pointer to the label of the next block. " + "When omitted, it will default to sequential order. See [[s-4bnl]].", + ) continue_on_failure: bool = False model: dict[str, Any] | None = None + @field_validator("label") + @classmethod + def validate_label(cls, value: str) -> str: + if not value or not value.strip(): + raise ValueError("Block labels cannot be empty.") + return value + class TaskBlockYAML(BlockYAML): # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: @@ -530,6 +543,7 @@ BLOCK_YAML_TYPES = Annotated[BLOCK_YAML_SUBCLASSES, Field(discriminator="block_t class WorkflowDefinitionYAML(BaseModel): + version: int = 1 parameters: list[PARAMETER_YAML_TYPES] blocks: list[BLOCK_YAML_TYPES]