Add workflow definition version (#4066)

This commit is contained in:
Shuchang Zheng
2025-11-21 17:23:39 -08:00
committed by GitHub
parent 8c1f00dea2
commit 335aa8f2df
4 changed files with 179 additions and 78 deletions

View File

@@ -26,7 +26,7 @@ from email_validator import EmailNotValidError, validate_email
from jinja2 import StrictUndefined from jinja2 import StrictUndefined
from jinja2.sandbox import SandboxedEnvironment from jinja2.sandbox import SandboxedEnvironment
from playwright.async_api import Page from playwright.async_api import Page
from pydantic import BaseModel, Field from pydantic import BaseModel, Field, model_validator
from pypdf import PdfReader from pypdf import PdfReader
from pypdf.errors import PdfReadError from pypdf.errors import PdfReadError
@@ -119,8 +119,15 @@ DEFAULT_MAX_STEPS_PER_ITERATION = 50
class Block(BaseModel, abc.ABC): class Block(BaseModel, abc.ABC):
"""Base class for workflow nodes (see branching spec [[s-4bnl]] for metadata semantics)."""
# Must be unique within workflow definition # Must be unique within workflow definition
label: str label: str = Field(description="Author-facing identifier for a block; unique within a workflow.")
next_block_label: str | None = Field(
default=None,
description="Optional pointer to the next block label when constructing a DAG. "
"Defaults to sequential order when omitted.",
)
block_type: BlockType block_type: BlockType
output_parameter: OutputParameter output_parameter: OutputParameter
continue_on_failure: bool = False continue_on_failure: bool = False
@@ -3821,6 +3828,107 @@ class HttpRequestBlock(Block):
) )
class BranchEvaluationContext(BaseModel):
"""Collection of runtime data that BranchCriteria evaluators can consume."""
workflow_parameters: dict[str, Any] = Field(default_factory=dict)
block_outputs: dict[str, Any] = Field(default_factory=dict)
environment: dict[str, Any] | None = None
llm_results: dict[str, Any] | None = None
class BranchCriteria(BaseModel, abc.ABC):
"""Abstract interface describing how a branch condition should be evaluated."""
criteria_type: str
description: str | None = None
@abc.abstractmethod
async def evaluate(self, context: BranchEvaluationContext) -> bool:
"""Return True when the branch should execute."""
raise NotImplementedError
def requires_llm(self) -> bool:
"""Whether the criteria relies on an LLM classification step."""
return False
class BranchCondition(BaseModel):
"""Represents a single conditional branch edge within a ConditionalBlock."""
criteria: BranchCriteria | None = None
next_block_label: str | None = None
description: str | None = None
order: int = Field(ge=0)
is_default: bool = False
@model_validator(mode="after")
def validate_condition(cls, condition: BranchCondition) -> BranchCondition:
if condition.criteria is None and not condition.is_default:
raise ValueError("Branches without criteria must be marked as default.")
if condition.criteria is not None and condition.is_default:
raise ValueError("Default branches may not define criteria.")
return condition
class ConditionalBlock(Block):
"""Branching block that selects the next block label based on ordered conditions."""
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
# Parameter 1 of Literal[...] cannot be of type "Any"
block_type: Literal[BlockType.CONDITIONAL] = BlockType.CONDITIONAL # type: ignore
branches: list[BranchCondition] = Field(default_factory=list)
@model_validator(mode="after")
def validate_branches(cls, block: ConditionalBlock) -> ConditionalBlock:
if not block.branches:
raise ValueError("Conditional blocks require at least one branch.")
orders = [branch.order for branch in block.branches]
if len(orders) != len(set(orders)):
raise ValueError("Branch order must be unique within a conditional block.")
default_branches = [branch for branch in block.branches if branch.is_default]
if len(default_branches) > 1:
raise ValueError("Only one default branch is permitted per conditional block.")
block.branches = sorted(block.branches, key=lambda branch: branch.order)
return block
def get_all_parameters(
self,
workflow_run_id: str, # noqa: ARG002 - preserved for interface compatibility
) -> list[PARAMETER_TYPE]:
# BranchCriteria subclasses will surface their parameter dependencies once implemented.
return []
async def execute( # noqa: D401
self,
workflow_run_id: str,
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
**kwargs: dict,
) -> BlockResult:
"""
Placeholder execute implementation.
Conditional block execution will be implemented alongside the DAG workflow
engine refactor (see branching workflow spec).
"""
raise NotImplementedError("Conditional block execution is handled by the DAG engine.")
@property
def ordered_branches(self) -> list[BranchCondition]:
"""Convenience accessor that returns branches sorted by order."""
return list(self.branches)
def get_default_branch(self) -> BranchCondition | None:
"""Return the default/else branch when configured."""
return next((branch for branch in self.branches if branch.is_default), None)
def get_all_blocks(blocks: list[BlockTypeVar]) -> list[BlockTypeVar]: def get_all_blocks(blocks: list[BlockTypeVar]) -> list[BlockTypeVar]:
""" """
Recursively get "all blocks" in a workflow definition. Recursively get "all blocks" in a workflow definition.
@@ -3842,6 +3950,7 @@ def get_all_blocks(blocks: list[BlockTypeVar]) -> list[BlockTypeVar]:
BlockSubclasses = Union[ BlockSubclasses = Union[
ConditionalBlock,
ForLoopBlock, ForLoopBlock,
TaskBlock, TaskBlock,
CodeBlock, CodeBlock,

View File

@@ -51,6 +51,7 @@ class RunWorkflowResponse(BaseModel):
class WorkflowDefinition(BaseModel): class WorkflowDefinition(BaseModel):
version: int = 1
parameters: list[PARAMETER_TYPE] parameters: list[PARAMETER_TYPE]
blocks: List[BlockTypeVar] blocks: List[BlockTypeVar]

View File

@@ -173,6 +173,8 @@ def _get_workflow_definition_core_data(workflow_definition: WorkflowDefinition)
"onepassword_credential_parameter_id", "onepassword_credential_parameter_id",
"azure_vault_credential_parameter_id", "azure_vault_credential_parameter_id",
"disable_cache", "disable_cache",
"next_block_label",
"version",
] ]
# Use BFS to recursively remove fields from all nested objects # Use BFS to recursively remove fields from all nested objects
@@ -2685,8 +2687,12 @@ class WorkflowService:
blocks.append(block) blocks.append(block)
block_label_mapping[block.label] = block block_label_mapping[block.label] = block
# Set the blocks for the workflow definition # Set the blocks for the workflow definition and derive DAG version metadata
workflow_definition = WorkflowDefinition(parameters=parameters.values(), blocks=blocks) dag_version = workflow_definition_yaml.version
if dag_version is None:
dag_version = 2 if WorkflowService._has_dag_metadata(workflow_definition_yaml.blocks) else 1
workflow_definition = WorkflowDefinition(parameters=parameters.values(), blocks=blocks, version=dag_version)
LOG.info( LOG.info(
f"Created workflow from request, title: {title}", f"Created workflow from request, title: {title}",
@@ -2839,12 +2845,26 @@ class WorkflowService:
) )
return output_parameters return output_parameters
@staticmethod
def _build_block_kwargs(
block_yaml: BLOCK_YAML_TYPES,
output_parameter: OutputParameter,
) -> dict[str, Any]:
return {
"label": block_yaml.label,
"next_block_label": block_yaml.next_block_label,
"output_parameter": output_parameter,
"continue_on_failure": block_yaml.continue_on_failure,
"model": block_yaml.model,
}
@staticmethod @staticmethod
async def block_yaml_to_block( async def block_yaml_to_block(
block_yaml: BLOCK_YAML_TYPES, block_yaml: BLOCK_YAML_TYPES,
parameters: dict[str, PARAMETER_TYPE], parameters: dict[str, PARAMETER_TYPE],
) -> BlockTypeVar: ) -> BlockTypeVar:
output_parameter = cast(OutputParameter, parameters[f"{block_yaml.label}_output"]) output_parameter = cast(OutputParameter, parameters[f"{block_yaml.label}_output"])
base_kwargs = WorkflowService._build_block_kwargs(block_yaml, output_parameter)
if block_yaml.block_type == BlockType.TASK: if block_yaml.block_type == BlockType.TASK:
task_block_parameters = ( task_block_parameters = (
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys] [parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
@@ -2852,22 +2872,19 @@ class WorkflowService:
else [] else []
) )
return TaskBlock( return TaskBlock(
label=block_yaml.label, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
title=block_yaml.title, title=block_yaml.title,
engine=block_yaml.engine, engine=block_yaml.engine,
parameters=task_block_parameters, parameters=task_block_parameters,
output_parameter=output_parameter,
navigation_goal=block_yaml.navigation_goal, navigation_goal=block_yaml.navigation_goal,
data_extraction_goal=block_yaml.data_extraction_goal, data_extraction_goal=block_yaml.data_extraction_goal,
data_schema=block_yaml.data_schema, data_schema=block_yaml.data_schema,
error_code_mapping=block_yaml.error_code_mapping, error_code_mapping=block_yaml.error_code_mapping,
max_steps_per_run=block_yaml.max_steps_per_run, max_steps_per_run=block_yaml.max_steps_per_run,
max_retries=block_yaml.max_retries, max_retries=block_yaml.max_retries,
model=block_yaml.model,
complete_on_download=block_yaml.complete_on_download, complete_on_download=block_yaml.complete_on_download,
download_suffix=block_yaml.download_suffix, download_suffix=block_yaml.download_suffix,
continue_on_failure=block_yaml.continue_on_failure,
totp_verification_url=block_yaml.totp_verification_url, totp_verification_url=block_yaml.totp_verification_url,
totp_identifier=block_yaml.totp_identifier, totp_identifier=block_yaml.totp_identifier,
disable_cache=block_yaml.disable_cache, disable_cache=block_yaml.disable_cache,
@@ -2899,29 +2916,25 @@ class WorkflowService:
raise Exception("Loop value parameter is required for for loop block") raise Exception("Loop value parameter is required for for loop block")
return ForLoopBlock( return ForLoopBlock(
label=block_yaml.label, **base_kwargs,
loop_over=loop_over_parameter, loop_over=loop_over_parameter,
loop_variable_reference=block_yaml.loop_variable_reference, loop_variable_reference=block_yaml.loop_variable_reference,
loop_blocks=loop_blocks, loop_blocks=loop_blocks,
output_parameter=output_parameter,
continue_on_failure=block_yaml.continue_on_failure,
complete_if_empty=block_yaml.complete_if_empty, complete_if_empty=block_yaml.complete_if_empty,
) )
elif block_yaml.block_type == BlockType.CODE: elif block_yaml.block_type == BlockType.CODE:
return CodeBlock( return CodeBlock(
label=block_yaml.label, **base_kwargs,
code=block_yaml.code, code=block_yaml.code,
parameters=( parameters=(
[parameters[parameter_key] for parameter_key in block_yaml.parameter_keys] [parameters[parameter_key] for parameter_key in block_yaml.parameter_keys]
if block_yaml.parameter_keys if block_yaml.parameter_keys
else [] else []
), ),
output_parameter=output_parameter,
continue_on_failure=block_yaml.continue_on_failure,
) )
elif block_yaml.block_type == BlockType.TEXT_PROMPT: elif block_yaml.block_type == BlockType.TEXT_PROMPT:
return TextPromptBlock( return TextPromptBlock(
label=block_yaml.label, **base_kwargs,
llm_key=block_yaml.llm_key, llm_key=block_yaml.llm_key,
prompt=block_yaml.prompt, prompt=block_yaml.prompt,
parameters=( parameters=(
@@ -2930,28 +2943,20 @@ class WorkflowService:
else [] else []
), ),
json_schema=block_yaml.json_schema, json_schema=block_yaml.json_schema,
output_parameter=output_parameter,
continue_on_failure=block_yaml.continue_on_failure,
model=block_yaml.model,
) )
elif block_yaml.block_type == BlockType.DOWNLOAD_TO_S3: elif block_yaml.block_type == BlockType.DOWNLOAD_TO_S3:
return DownloadToS3Block( return DownloadToS3Block(
label=block_yaml.label, **base_kwargs,
output_parameter=output_parameter,
url=block_yaml.url, url=block_yaml.url,
continue_on_failure=block_yaml.continue_on_failure,
) )
elif block_yaml.block_type == BlockType.UPLOAD_TO_S3: elif block_yaml.block_type == BlockType.UPLOAD_TO_S3:
return UploadToS3Block( return UploadToS3Block(
label=block_yaml.label, **base_kwargs,
output_parameter=output_parameter,
path=block_yaml.path, path=block_yaml.path,
continue_on_failure=block_yaml.continue_on_failure,
) )
elif block_yaml.block_type == BlockType.FILE_UPLOAD: elif block_yaml.block_type == BlockType.FILE_UPLOAD:
return FileUploadBlock( return FileUploadBlock(
label=block_yaml.label, **base_kwargs,
output_parameter=output_parameter,
storage_type=block_yaml.storage_type, storage_type=block_yaml.storage_type,
s3_bucket=block_yaml.s3_bucket, s3_bucket=block_yaml.s3_bucket,
aws_access_key_id=block_yaml.aws_access_key_id, aws_access_key_id=block_yaml.aws_access_key_id,
@@ -2961,12 +2966,10 @@ class WorkflowService:
azure_storage_account_key=block_yaml.azure_storage_account_key, azure_storage_account_key=block_yaml.azure_storage_account_key,
azure_blob_container_name=block_yaml.azure_blob_container_name, azure_blob_container_name=block_yaml.azure_blob_container_name,
path=block_yaml.path, path=block_yaml.path,
continue_on_failure=block_yaml.continue_on_failure,
) )
elif block_yaml.block_type == BlockType.SEND_EMAIL: elif block_yaml.block_type == BlockType.SEND_EMAIL:
return SendEmailBlock( return SendEmailBlock(
label=block_yaml.label, **base_kwargs,
output_parameter=output_parameter,
smtp_host=parameters[block_yaml.smtp_host_secret_parameter_key], smtp_host=parameters[block_yaml.smtp_host_secret_parameter_key],
smtp_port=parameters[block_yaml.smtp_port_secret_parameter_key], smtp_port=parameters[block_yaml.smtp_port_secret_parameter_key],
smtp_username=parameters[block_yaml.smtp_username_secret_parameter_key], smtp_username=parameters[block_yaml.smtp_username_secret_parameter_key],
@@ -2976,26 +2979,19 @@ class WorkflowService:
subject=block_yaml.subject, subject=block_yaml.subject,
body=block_yaml.body, body=block_yaml.body,
file_attachments=block_yaml.file_attachments or [], file_attachments=block_yaml.file_attachments or [],
continue_on_failure=block_yaml.continue_on_failure,
) )
elif block_yaml.block_type == BlockType.FILE_URL_PARSER: elif block_yaml.block_type == BlockType.FILE_URL_PARSER:
return FileParserBlock( return FileParserBlock(
label=block_yaml.label, **base_kwargs,
output_parameter=output_parameter,
file_url=block_yaml.file_url, file_url=block_yaml.file_url,
file_type=block_yaml.file_type, file_type=block_yaml.file_type,
json_schema=block_yaml.json_schema, json_schema=block_yaml.json_schema,
continue_on_failure=block_yaml.continue_on_failure,
model=block_yaml.model,
) )
elif block_yaml.block_type == BlockType.PDF_PARSER: elif block_yaml.block_type == BlockType.PDF_PARSER:
return PDFParserBlock( return PDFParserBlock(
label=block_yaml.label, **base_kwargs,
output_parameter=output_parameter,
file_url=block_yaml.file_url, file_url=block_yaml.file_url,
json_schema=block_yaml.json_schema, json_schema=block_yaml.json_schema,
continue_on_failure=block_yaml.continue_on_failure,
model=block_yaml.model,
) )
elif block_yaml.block_type == BlockType.VALIDATION: elif block_yaml.block_type == BlockType.VALIDATION:
validation_block_parameters = ( validation_block_parameters = (
@@ -3008,18 +3004,14 @@ class WorkflowService:
raise Exception("Both complete criterion and terminate criterion are empty") raise Exception("Both complete criterion and terminate criterion are empty")
return ValidationBlock( return ValidationBlock(
label=block_yaml.label, **base_kwargs,
task_type=TaskType.validation, task_type=TaskType.validation,
parameters=validation_block_parameters, parameters=validation_block_parameters,
output_parameter=output_parameter,
complete_criterion=block_yaml.complete_criterion, complete_criterion=block_yaml.complete_criterion,
terminate_criterion=block_yaml.terminate_criterion, terminate_criterion=block_yaml.terminate_criterion,
error_code_mapping=block_yaml.error_code_mapping, error_code_mapping=block_yaml.error_code_mapping,
continue_on_failure=block_yaml.continue_on_failure,
# Should only need one step for validation block, but we allow 2 in case the LLM has an unexpected failure and we need to retry. # Should only need one step for validation block, but we allow 2 in case the LLM has an unexpected failure and we need to retry.
max_steps_per_run=2, max_steps_per_run=2,
disable_cache=block_yaml.disable_cache,
model=block_yaml.model,
) )
elif block_yaml.block_type == BlockType.ACTION: elif block_yaml.block_type == BlockType.ACTION:
@@ -3033,20 +3025,17 @@ class WorkflowService:
raise Exception("empty action instruction") raise Exception("empty action instruction")
return ActionBlock( return ActionBlock(
label=block_yaml.label, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
title=block_yaml.title, title=block_yaml.title,
engine=block_yaml.engine, engine=block_yaml.engine,
task_type=TaskType.action, task_type=TaskType.action,
parameters=action_block_parameters, parameters=action_block_parameters,
output_parameter=output_parameter,
navigation_goal=block_yaml.navigation_goal, navigation_goal=block_yaml.navigation_goal,
error_code_mapping=block_yaml.error_code_mapping, error_code_mapping=block_yaml.error_code_mapping,
max_retries=block_yaml.max_retries, max_retries=block_yaml.max_retries,
model=block_yaml.model,
complete_on_download=block_yaml.complete_on_download, complete_on_download=block_yaml.complete_on_download,
download_suffix=block_yaml.download_suffix, download_suffix=block_yaml.download_suffix,
continue_on_failure=block_yaml.continue_on_failure,
totp_verification_url=block_yaml.totp_verification_url, totp_verification_url=block_yaml.totp_verification_url,
totp_identifier=block_yaml.totp_identifier, totp_identifier=block_yaml.totp_identifier,
disable_cache=block_yaml.disable_cache, disable_cache=block_yaml.disable_cache,
@@ -3062,20 +3051,17 @@ class WorkflowService:
else [] else []
) )
return NavigationBlock( return NavigationBlock(
label=block_yaml.label, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
title=block_yaml.title, title=block_yaml.title,
engine=block_yaml.engine, engine=block_yaml.engine,
parameters=navigation_block_parameters, parameters=navigation_block_parameters,
output_parameter=output_parameter,
navigation_goal=block_yaml.navigation_goal, navigation_goal=block_yaml.navigation_goal,
error_code_mapping=block_yaml.error_code_mapping, error_code_mapping=block_yaml.error_code_mapping,
max_steps_per_run=block_yaml.max_steps_per_run, max_steps_per_run=block_yaml.max_steps_per_run,
max_retries=block_yaml.max_retries, max_retries=block_yaml.max_retries,
model=block_yaml.model,
complete_on_download=block_yaml.complete_on_download, complete_on_download=block_yaml.complete_on_download,
download_suffix=block_yaml.download_suffix, download_suffix=block_yaml.download_suffix,
continue_on_failure=block_yaml.continue_on_failure,
totp_verification_url=block_yaml.totp_verification_url, totp_verification_url=block_yaml.totp_verification_url,
totp_identifier=block_yaml.totp_identifier, totp_identifier=block_yaml.totp_identifier,
disable_cache=block_yaml.disable_cache, disable_cache=block_yaml.disable_cache,
@@ -3087,8 +3073,7 @@ class WorkflowService:
elif block_yaml.block_type == BlockType.HUMAN_INTERACTION: elif block_yaml.block_type == BlockType.HUMAN_INTERACTION:
return HumanInteractionBlock( return HumanInteractionBlock(
label=block_yaml.label, **base_kwargs,
output_parameter=output_parameter,
instructions=block_yaml.instructions, instructions=block_yaml.instructions,
positive_descriptor=block_yaml.positive_descriptor, positive_descriptor=block_yaml.positive_descriptor,
negative_descriptor=block_yaml.negative_descriptor, negative_descriptor=block_yaml.negative_descriptor,
@@ -3107,18 +3092,15 @@ class WorkflowService:
else [] else []
) )
return ExtractionBlock( return ExtractionBlock(
label=block_yaml.label, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
title=block_yaml.title, title=block_yaml.title,
engine=block_yaml.engine, engine=block_yaml.engine,
parameters=extraction_block_parameters, parameters=extraction_block_parameters,
output_parameter=output_parameter,
data_extraction_goal=block_yaml.data_extraction_goal, data_extraction_goal=block_yaml.data_extraction_goal,
data_schema=block_yaml.data_schema, data_schema=block_yaml.data_schema,
max_steps_per_run=block_yaml.max_steps_per_run, max_steps_per_run=block_yaml.max_steps_per_run,
max_retries=block_yaml.max_retries, max_retries=block_yaml.max_retries,
model=block_yaml.model,
continue_on_failure=block_yaml.continue_on_failure,
disable_cache=block_yaml.disable_cache, disable_cache=block_yaml.disable_cache,
complete_verification=False, complete_verification=False,
) )
@@ -3130,18 +3112,15 @@ class WorkflowService:
else [] else []
) )
return LoginBlock( return LoginBlock(
label=block_yaml.label, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
title=block_yaml.title, title=block_yaml.title,
engine=block_yaml.engine, engine=block_yaml.engine,
parameters=login_block_parameters, parameters=login_block_parameters,
output_parameter=output_parameter,
navigation_goal=block_yaml.navigation_goal, navigation_goal=block_yaml.navigation_goal,
error_code_mapping=block_yaml.error_code_mapping, error_code_mapping=block_yaml.error_code_mapping,
max_steps_per_run=block_yaml.max_steps_per_run, max_steps_per_run=block_yaml.max_steps_per_run,
max_retries=block_yaml.max_retries, max_retries=block_yaml.max_retries,
model=block_yaml.model,
continue_on_failure=block_yaml.continue_on_failure,
totp_verification_url=block_yaml.totp_verification_url, totp_verification_url=block_yaml.totp_verification_url,
totp_identifier=block_yaml.totp_identifier, totp_identifier=block_yaml.totp_identifier,
disable_cache=block_yaml.disable_cache, disable_cache=block_yaml.disable_cache,
@@ -3155,10 +3134,8 @@ class WorkflowService:
raise InvalidWaitBlockTime(settings.WORKFLOW_WAIT_BLOCK_MAX_SEC) raise InvalidWaitBlockTime(settings.WORKFLOW_WAIT_BLOCK_MAX_SEC)
return WaitBlock( return WaitBlock(
label=block_yaml.label, **base_kwargs,
wait_sec=block_yaml.wait_sec, wait_sec=block_yaml.wait_sec,
continue_on_failure=block_yaml.continue_on_failure,
output_parameter=output_parameter,
) )
elif block_yaml.block_type == BlockType.FILE_DOWNLOAD: elif block_yaml.block_type == BlockType.FILE_DOWNLOAD:
@@ -3168,19 +3145,16 @@ class WorkflowService:
else [] else []
) )
return FileDownloadBlock( return FileDownloadBlock(
label=block_yaml.label, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
title=block_yaml.title, title=block_yaml.title,
engine=block_yaml.engine, engine=block_yaml.engine,
parameters=file_download_block_parameters, parameters=file_download_block_parameters,
output_parameter=output_parameter,
navigation_goal=block_yaml.navigation_goal, navigation_goal=block_yaml.navigation_goal,
error_code_mapping=block_yaml.error_code_mapping, error_code_mapping=block_yaml.error_code_mapping,
max_steps_per_run=block_yaml.max_steps_per_run, max_steps_per_run=block_yaml.max_steps_per_run,
max_retries=block_yaml.max_retries, max_retries=block_yaml.max_retries,
model=block_yaml.model,
download_suffix=block_yaml.download_suffix, download_suffix=block_yaml.download_suffix,
continue_on_failure=block_yaml.continue_on_failure,
totp_verification_url=block_yaml.totp_verification_url, totp_verification_url=block_yaml.totp_verification_url,
totp_identifier=block_yaml.totp_identifier, totp_identifier=block_yaml.totp_identifier,
disable_cache=block_yaml.disable_cache, disable_cache=block_yaml.disable_cache,
@@ -3190,16 +3164,13 @@ class WorkflowService:
) )
elif block_yaml.block_type == BlockType.TaskV2: elif block_yaml.block_type == BlockType.TaskV2:
return TaskV2Block( return TaskV2Block(
label=block_yaml.label, **base_kwargs,
prompt=block_yaml.prompt, prompt=block_yaml.prompt,
url=block_yaml.url, url=block_yaml.url,
totp_verification_url=block_yaml.totp_verification_url, totp_verification_url=block_yaml.totp_verification_url,
totp_identifier=block_yaml.totp_identifier, totp_identifier=block_yaml.totp_identifier,
max_iterations=block_yaml.max_iterations, max_iterations=block_yaml.max_iterations,
max_steps=block_yaml.max_steps, max_steps=block_yaml.max_steps,
disable_cache=block_yaml.disable_cache,
model=block_yaml.model,
output_parameter=output_parameter,
) )
elif block_yaml.block_type == BlockType.HTTP_REQUEST: elif block_yaml.block_type == BlockType.HTTP_REQUEST:
http_request_block_parameters = ( http_request_block_parameters = (
@@ -3208,7 +3179,7 @@ class WorkflowService:
else [] else []
) )
return HttpRequestBlock( return HttpRequestBlock(
label=block_yaml.label, **base_kwargs,
method=block_yaml.method, method=block_yaml.method,
url=block_yaml.url, url=block_yaml.url,
headers=block_yaml.headers, headers=block_yaml.headers,
@@ -3216,14 +3187,11 @@ class WorkflowService:
timeout=block_yaml.timeout, timeout=block_yaml.timeout,
follow_redirects=block_yaml.follow_redirects, follow_redirects=block_yaml.follow_redirects,
parameters=http_request_block_parameters, parameters=http_request_block_parameters,
output_parameter=output_parameter,
continue_on_failure=block_yaml.continue_on_failure,
) )
elif block_yaml.block_type == BlockType.GOTO_URL: elif block_yaml.block_type == BlockType.GOTO_URL:
return UrlBlock( return UrlBlock(
label=block_yaml.label, **base_kwargs,
url=block_yaml.url, url=block_yaml.url,
output_parameter=output_parameter,
complete_verification=False, complete_verification=False,
) )
@@ -3448,3 +3416,12 @@ class WorkflowService:
if workflow.run_with == "code": if workflow.run_with == "code":
return True return True
return False return False
@staticmethod
def _has_dag_metadata(block_yamls: list[BLOCK_YAML_TYPES]) -> bool:
for block_yaml in block_yamls:
if block_yaml.next_block_label:
return True
if isinstance(block_yaml, ForLoopBlockYAML) and WorkflowService._has_dag_metadata(block_yaml.loop_blocks):
return True
return False

View File

@@ -22,6 +22,7 @@ class BlockType(StrEnum):
TASK = "task" TASK = "task"
TaskV2 = "task_v2" TaskV2 = "task_v2"
FOR_LOOP = "for_loop" FOR_LOOP = "for_loop"
CONDITIONAL = "conditional"
CODE = "code" CODE = "code"
TEXT_PROMPT = "text_prompt" TEXT_PROMPT = "text_prompt"
DOWNLOAD_TO_S3 = "download_to_s3" DOWNLOAD_TO_S3 = "download_to_s3"
@@ -197,10 +198,22 @@ class OutputParameterYAML(ParameterYAML):
class BlockYAML(BaseModel, abc.ABC): class BlockYAML(BaseModel, abc.ABC):
block_type: BlockType block_type: BlockType
label: str label: str = Field(description="Author-facing identifier; must be unique per workflow.")
next_block_label: str | None = Field(
default=None,
description="Optional pointer to the label of the next block. "
"When omitted, it will default to sequential order. See [[s-4bnl]].",
)
continue_on_failure: bool = False continue_on_failure: bool = False
model: dict[str, Any] | None = None model: dict[str, Any] | None = None
@field_validator("label")
@classmethod
def validate_label(cls, value: str) -> str:
if not value or not value.strip():
raise ValueError("Block labels cannot be empty.")
return value
class TaskBlockYAML(BlockYAML): class TaskBlockYAML(BlockYAML):
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error: # There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
@@ -530,6 +543,7 @@ BLOCK_YAML_TYPES = Annotated[BLOCK_YAML_SUBCLASSES, Field(discriminator="block_t
class WorkflowDefinitionYAML(BaseModel): class WorkflowDefinitionYAML(BaseModel):
version: int = 1
parameters: list[PARAMETER_YAML_TYPES] parameters: list[PARAMETER_YAML_TYPES]
blocks: list[BLOCK_YAML_TYPES] blocks: list[BLOCK_YAML_TYPES]