shu/DAG workflow execution engine part1 (#4219)

This commit is contained in:
Shuchang Zheng
2025-12-05 22:20:28 -05:00
committed by GitHub
parent 33d4d87102
commit 0dba57f423
3 changed files with 625 additions and 248 deletions

View File

@@ -48,7 +48,7 @@ if TYPE_CHECKING:
LOG = structlog.get_logger()
BlockMetadata = dict[str, str | int | float | bool | dict | list]
BlockMetadata = dict[str, str | int | float | bool | dict | list | None]
jinja_sandbox_env = SandboxedEnvironment()

View File

@@ -4044,6 +4044,40 @@ class BranchCriteria(BaseModel, abc.ABC):
return False
def _evaluate_truthy_string(value: str) -> bool:
"""
Evaluate a string as a boolean, handling common truthy/falsy representations.
Truthy: "true", "True", "TRUE", "1", "yes", "y", "on", non-zero numbers
Falsy: "", "false", "False", "FALSE", "0", "no", "n", "off", "null", "None", whitespace-only
For other strings, use Python's default bool() behavior (non-empty = truthy).
"""
if not value or not value.strip():
return False
normalized = value.strip().lower()
# Explicit falsy values
if normalized in ("false", "0", "no", "n", "off", "null", "none"):
return False
# Explicit truthy values
if normalized in ("true", "1", "yes", "y", "on"):
return True
# Try to parse as a number
try:
num = float(normalized)
return num != 0.0
except ValueError:
pass
# For any other non-empty string, consider it truthy
# This allows expressions like "{{ 'some text' }}" to be truthy
return True
class JinjaBranchCriteria(BranchCriteria):
"""Jinja2-templated branch criteria (only supported criteria type for now)."""
@@ -4073,12 +4107,13 @@ class JinjaBranchCriteria(BranchCriteria):
msg=str(exc),
) from exc
return bool(rendered)
return _evaluate_truthy_string(rendered)
class BranchCondition(BaseModel):
"""Represents a single conditional branch edge within a ConditionalBlock."""
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
criteria: BranchCriteriaTypeVar | None = None
next_block_label: str | None = None
description: str | None = None
@@ -4133,12 +4168,98 @@ class ConditionalBlock(Block):
**kwargs: dict,
) -> BlockResult:
"""
Placeholder execute implementation.
Evaluate conditional branches and determine next block to execute.
Conditional block execution will be implemented alongside the DAG workflow
engine refactor (see branching workflow spec).
Returns a BlockResult with branch metadata in the output_parameter_value.
"""
raise NotImplementedError("Conditional block execution is handled by the DAG engine.")
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
evaluation_context = BranchEvaluationContext(
workflow_run_context=workflow_run_context,
block_label=self.label,
)
matched_branch = None
failure_reason: str | None = None
for idx, branch in enumerate(self.ordered_branches):
if branch.criteria is None:
continue
try:
if await branch.criteria.evaluate(evaluation_context):
matched_branch = branch
LOG.info(
"Conditional branch matched",
block_label=self.label,
branch_index=idx,
next_block_label=branch.next_block_label,
)
break
except Exception as exc:
failure_reason = f"Failed to evaluate branch {idx} for {self.label}: {str(exc)}"
LOG.error(
"Failed to evaluate conditional branch",
block_label=self.label,
branch_index=idx,
error=str(exc),
exc_info=True,
)
break
if matched_branch is None and failure_reason is None:
matched_branch = self.get_default_branch()
matched_index = self.ordered_branches.index(matched_branch) if matched_branch in self.ordered_branches else None
next_block_label = matched_branch.next_block_label if matched_branch else None
branch_metadata: BlockMetadata = {
"branch_taken": next_block_label,
"branch_index": matched_index,
"branch_description": matched_branch.description if matched_branch else None,
"criteria_type": matched_branch.criteria.criteria_type
if matched_branch and matched_branch.criteria
else None,
"criteria_expression": matched_branch.criteria.expression
if matched_branch and matched_branch.criteria
else None,
"next_block_label": next_block_label,
}
status = BlockStatus.completed
success = True
if failure_reason:
status = BlockStatus.failed
success = False
elif matched_branch is None:
failure_reason = "No conditional branch matched and no default branch configured"
status = BlockStatus.failed
success = False
if workflow_run_context:
workflow_run_context.update_block_metadata(self.label, branch_metadata)
try:
await self.record_output_parameter_value(
workflow_run_context=workflow_run_context,
workflow_run_id=workflow_run_id,
value=branch_metadata,
)
except Exception as exc:
LOG.warning(
"Failed to record branch metadata as output parameter",
workflow_run_id=workflow_run_id,
block_label=self.label,
error=str(exc),
)
block_result = await self.build_block_result(
success=success,
failure_reason=failure_reason,
output_parameter_value=branch_metadata,
status=status,
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,
)
return block_result
@property
def ordered_branches(self) -> list[BranchCondition]:

View File

@@ -60,7 +60,9 @@ from skyvern.forge.sdk.workflow.exceptions import (
from skyvern.forge.sdk.workflow.models.block import (
ActionBlock,
BlockTypeVar,
BranchCondition,
CodeBlock,
ConditionalBlock,
DownloadToS3Block,
ExtractionBlock,
FileDownloadBlock,
@@ -69,6 +71,7 @@ from skyvern.forge.sdk.workflow.models.block import (
ForLoopBlock,
HttpRequestBlock,
HumanInteractionBlock,
JinjaBranchCriteria,
LoginBlock,
NavigationBlock,
PDFParserBlock,
@@ -869,287 +872,514 @@ class WorkflowService:
if not blocks:
raise SkyvernException(f"No blocks found for the given block labels: {block_labels}")
workflow_version = workflow.workflow_definition.version or 1
if workflow_version >= 2 and not block_labels:
return await self._execute_workflow_blocks_dag(
workflow=workflow,
workflow_run=workflow_run,
organization=organization,
browser_session_id=browser_session_id,
script_blocks_by_label=script_blocks_by_label,
loaded_script_module=loaded_script_module,
is_script_run=is_script_run,
blocks_to_update=blocks_to_update,
)
#
# Execute workflow blocks
blocks_cnt = len(blocks)
block_result = None
for block_idx, block in enumerate(blocks):
try:
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
):
workflow_run = refreshed_workflow_run
if workflow_run.status == WorkflowRunStatus.canceled:
LOG.info(
"Workflow run is canceled, stopping execution inside workflow execution loop",
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_type=block.block_type,
block_label=block.label,
)
break
(
workflow_run,
blocks_to_update,
block_result,
should_stop,
_,
) = await self._execute_single_block(
block=block,
block_idx=block_idx,
blocks_cnt=blocks_cnt,
workflow_run=workflow_run,
organization=organization,
workflow_run_id=workflow_run_id,
browser_session_id=browser_session_id,
script_blocks_by_label=script_blocks_by_label,
loaded_script_module=loaded_script_module,
is_script_run=is_script_run,
blocks_to_update=blocks_to_update,
)
if workflow_run.status == WorkflowRunStatus.timed_out:
LOG.info(
"Workflow run is timed out, stopping execution inside workflow execution loop",
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_type=block.block_type,
block_label=block.label,
)
break
if should_stop:
break
return workflow_run, blocks_to_update
parameters = block.get_all_parameters(workflow_run_id)
await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run(
workflow_run_id, parameters, organization
async def _execute_workflow_blocks_dag(
self,
*,
workflow: Workflow,
workflow_run: WorkflowRun,
organization: Organization,
browser_session_id: str | None,
script_blocks_by_label: dict[str, Any],
loaded_script_module: Any,
is_script_run: bool,
blocks_to_update: set[str],
) -> tuple[WorkflowRun, set[str]]:
try:
start_label, label_to_block, default_next_map = self._build_workflow_graph(
workflow.workflow_definition.blocks
)
except InvalidWorkflowDefinition as exc:
LOG.error("Workflow graph validation failed", error=str(exc), workflow_id=workflow.workflow_id)
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id,
failure_reason=str(exc),
)
return workflow_run, blocks_to_update
visited_labels: set[str] = set()
current_label = start_label
block_idx = 0
total_blocks = len(label_to_block)
while current_label:
block = label_to_block.get(current_label)
if not block:
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id,
failure_reason=f"Unable to find block with label {current_label}",
)
break
(
workflow_run,
blocks_to_update,
block_result,
should_stop,
branch_metadata,
) = await self._execute_single_block(
block=block,
block_idx=block_idx,
blocks_cnt=total_blocks,
workflow_run=workflow_run,
organization=organization,
workflow_run_id=workflow_run.workflow_run_id,
browser_session_id=browser_session_id,
script_blocks_by_label=script_blocks_by_label,
loaded_script_module=loaded_script_module,
is_script_run=is_script_run,
blocks_to_update=blocks_to_update,
)
visited_labels.add(current_label)
if should_stop:
break
next_label = None
if isinstance(block, ConditionalBlock):
next_label = (branch_metadata or {}).get("next_block_label")
else:
next_label = default_next_map.get(block.label)
if not next_label:
LOG.info(
f"Executing root block {block.block_type} at index {block_idx}/{blocks_cnt - 1} for workflow run {workflow_run_id}",
block_type=block.block_type,
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_type_var=block.block_type,
"DAG traversal reached terminal node",
workflow_run_id=workflow_run.workflow_run_id,
block_label=block.label,
model=block.model,
)
break
# Try executing with script code if available
block_executed_with_code = False
valid_to_run_code = (
is_script_run and block.label and block.label in script_blocks_by_label and not block.disable_cache
if next_label not in label_to_block:
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id,
failure_reason=f"Next block label {next_label} not found in workflow definition",
)
if valid_to_run_code:
script_block = script_blocks_by_label[block.label]
break
if next_label in visited_labels:
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id,
failure_reason=f"Cycle detected while traversing workflow definition at block {next_label}",
)
break
block_idx += 1
current_label = next_label
return workflow_run, blocks_to_update
async def _execute_single_block(
self,
*,
block: BlockTypeVar,
block_idx: int,
blocks_cnt: int,
workflow_run: WorkflowRun,
organization: Organization,
workflow_run_id: str,
browser_session_id: str | None,
script_blocks_by_label: dict[str, Any],
loaded_script_module: Any,
is_script_run: bool,
blocks_to_update: set[str],
) -> tuple[WorkflowRun, set[str], BlockResult | None, bool, dict[str, Any] | None]:
organization_id = organization.organization_id
workflow_run_block_result: BlockResult | None = None
branch_metadata: dict[str, Any] | None = None
block_executed_with_code = False
try:
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
):
workflow_run = refreshed_workflow_run
if workflow_run.status == WorkflowRunStatus.canceled:
LOG.info(
"Attempting to execute block with script code",
"Workflow run is canceled, stopping execution inside workflow execution loop",
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_type=block.block_type,
block_label=block.label,
run_signature=script_block.run_signature,
)
return workflow_run, blocks_to_update, workflow_run_block_result, True, branch_metadata
if workflow_run.status == WorkflowRunStatus.timed_out:
LOG.info(
"Workflow run is timed out, stopping execution inside workflow execution loop",
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_type=block.block_type,
block_label=block.label,
)
return workflow_run, blocks_to_update, workflow_run_block_result, True, branch_metadata
parameters = block.get_all_parameters(workflow_run_id)
await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run(
workflow_run_id, parameters, organization
)
LOG.info(
f"Executing root block {block.block_type} at index {block_idx}/{blocks_cnt - 1} for workflow run {workflow_run_id}",
block_type=block.block_type,
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_type_var=block.block_type,
block_label=block.label,
model=block.model,
)
valid_to_run_code = (
is_script_run and block.label and block.label in script_blocks_by_label and not block.disable_cache
)
if valid_to_run_code:
script_block = script_blocks_by_label[block.label]
LOG.info(
"Attempting to execute block with script code",
block_label=block.label,
run_signature=script_block.run_signature,
)
try:
vars_dict = vars(loaded_script_module) if loaded_script_module else {}
exec_globals = {
**vars_dict,
"skyvern": skyvern,
"__builtins__": __builtins__,
}
assert script_block.run_signature is not None
normalized_signature = textwrap.dedent(script_block.run_signature).strip()
indented_signature = textwrap.indent(normalized_signature, " ")
wrapper_code = f"async def __run_signature_wrapper():\n return (\n{indented_signature}\n )\n"
LOG.debug("Executing run_signature wrapper", wrapper_code=wrapper_code)
try:
# Execute the run signature and capture the return value
vars_dict = vars(loaded_script_module) if loaded_script_module else {}
exec_globals = {
**vars_dict,
"skyvern": skyvern,
"__builtins__": __builtins__,
}
# Use exec to handle multi-line run_signature statements
# Create an async function and execute it
# Dedent first to normalize indentation, then re-indent for function body
assert script_block.run_signature is not None
normalized_signature = textwrap.dedent(script_block.run_signature).strip()
# Add 8 spaces (2 levels: function + return statement)
indented_signature = textwrap.indent(normalized_signature, " ")
# Build the wrapper function
wrapper_code = (
f"async def __run_signature_wrapper():\n return (\n{indented_signature}\n )\n"
)
LOG.debug("Executing run_signature wrapper", wrapper_code=wrapper_code)
try:
exec_code = compile(wrapper_code, "<run_signature>", "exec")
exec(exec_code, exec_globals)
output_value = await exec_globals["__run_signature_wrapper"]()
except ScriptTerminationException as e:
LOG.warning(
"Script termination",
block_label=block.label,
error=str(e),
exc_info=True,
)
# Execution succeeded - get the block result from the workflow run blocks
# The script execution should have created the workflow run block
workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
# Find the most recent block with matching label
matching_blocks = [b for b in workflow_run_blocks if b.label == block.label]
if matching_blocks:
latest_block = max(matching_blocks, key=lambda b: b.created_at)
# Construct BlockResult from the workflow_run_block
block_result = BlockResult(
success=latest_block.status == BlockStatus.completed,
failure_reason=latest_block.failure_reason,
output_parameter=block.output_parameter,
output_parameter_value=latest_block.output,
status=BlockStatus(latest_block.status) if latest_block.status else BlockStatus.failed,
workflow_run_block_id=latest_block.workflow_run_block_id,
)
block_executed_with_code = True
LOG.info(
"Successfully executed block with script code",
block_label=block.label,
block_status=block_result.status,
has_output=output_value is not None,
)
else:
LOG.warning(
"Block executed with code but no workflow run block found",
block_label=block.label,
)
# Fallback to AI execution
block_executed_with_code = False
except Exception as e:
exec_code = compile(wrapper_code, "<run_signature>", "exec")
exec(exec_code, exec_globals)
output_value = await exec_globals["__run_signature_wrapper"]()
except ScriptTerminationException as e:
LOG.warning(
"Failed to execute block with script code, falling back to AI",
"Script termination",
block_label=block.label,
error=str(e),
exc_info=True,
)
block_executed_with_code = False
# Execute with AI if code execution was not attempted or failed
if not block_executed_with_code:
LOG.info(
"Executing block with AI",
block_label=block.label,
block_type=block.block_type,
)
block_result = await block.execute_safe(
workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
browser_session_id=browser_session_id,
)
if not block_result:
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run_id, failure_reason="Block result is None"
)
break
if (
not block_executed_with_code
and block.label
and block.label not in script_blocks_by_label
and block_result.status == BlockStatus.completed
and block.block_type in BLOCK_TYPES_THAT_SHOULD_BE_CACHED
):
blocks_to_update.add(block.label)
if block_result.status == BlockStatus.canceled:
LOG.info(
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was canceled for workflow run {workflow_run_id}, cancelling workflow run",
block_type=block.block_type,
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_result=block_result,
block_type_var=block.block_type,
block_label=block.label,
)
workflow_run = await self.mark_workflow_run_as_canceled(workflow_run_id=workflow_run_id)
break
elif block_result.status == BlockStatus.failed:
LOG.error(
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed for workflow run {workflow_run_id}",
block_type=block.block_type,
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_result=block_result,
block_type_var=block.block_type,
block_label=block.label,
)
if not block.continue_on_failure:
failure_reason = (
f"{block.block_type} block failed. failure reason: {block_result.failure_reason}"
matching_blocks = [b for b in workflow_run_blocks if b.label == block.label]
if matching_blocks:
latest_block = max(matching_blocks, key=lambda b: b.created_at)
workflow_run_block_result = BlockResult(
success=latest_block.status == BlockStatus.completed,
failure_reason=latest_block.failure_reason,
output_parameter=block.output_parameter,
output_parameter_value=latest_block.output,
status=BlockStatus(latest_block.status) if latest_block.status else BlockStatus.failed,
workflow_run_block_id=latest_block.workflow_run_block_id,
)
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run_id, failure_reason=failure_reason
block_executed_with_code = True
LOG.info(
"Successfully executed block with script code",
block_label=block.label,
block_status=workflow_run_block_result.status,
has_output=output_value is not None,
)
break
else:
LOG.warning(
"Block executed with code but no workflow run block found",
block_label=block.label,
)
block_executed_with_code = False
except Exception as e:
LOG.warning(
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed but will continue executing the workflow run {workflow_run_id}",
block_type=block.block_type,
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_result=block_result,
continue_on_failure=block.continue_on_failure,
block_type_var=block.block_type,
"Failed to execute block with script code, falling back to AI",
block_label=block.label,
error=str(e),
exc_info=True,
)
block_executed_with_code = False
elif block_result.status == BlockStatus.terminated:
LOG.info(
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated",
block_type=block.block_type,
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_result=block_result,
block_type_var=block.block_type,
block_label=block.label,
)
if not block.continue_on_failure:
failure_reason = f"{block.block_type} block terminated. Reason: {block_result.failure_reason}"
workflow_run = await self.mark_workflow_run_as_terminated(
workflow_run_id=workflow_run_id, failure_reason=failure_reason
)
break
LOG.warning(
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run",
block_type=block.block_type,
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_result=block_result,
continue_on_failure=block.continue_on_failure,
block_type_var=block.block_type,
block_label=block.label,
)
elif block_result.status == BlockStatus.timed_out:
LOG.info(
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, marking workflow run as failed",
block_type=block.block_type,
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_result=block_result,
block_type_var=block.block_type,
block_label=block.label,
)
if not block.continue_on_failure:
failure_reason = f"{block.block_type} block timed out. Reason: {block_result.failure_reason}"
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run_id, failure_reason=failure_reason
)
break
LOG.warning(
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, but will continue executing the workflow run",
block_type=block.block_type,
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_result=block_result,
continue_on_failure=block.continue_on_failure,
block_type_var=block.block_type,
block_label=block.label,
)
except Exception as e:
LOG.exception(
f"Error while executing workflow run {workflow_run_id}",
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_type=block.block_type,
if not block_executed_with_code:
LOG.info(
"Executing block",
block_label=block.label,
block_type=block.block_type,
)
workflow_run_block_result = await block.execute_safe(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
browser_session_id=browser_session_id,
)
exception_message = f"Unexpected error: {str(e)}"
if isinstance(e, SkyvernException):
exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}"
# Extract branch metadata for conditional blocks
if isinstance(block, ConditionalBlock) and workflow_run_block_result:
branch_metadata = cast(dict[str, Any] | None, workflow_run_block_result.output_parameter_value)
failure_reason = f"{block.block_type} block failed. failure reason: {exception_message}"
if not workflow_run_block_result:
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run_id, failure_reason="Block result is None"
)
return workflow_run, blocks_to_update, workflow_run_block_result, True, branch_metadata
if (
not block_executed_with_code
and block.label
and block.label not in script_blocks_by_label
and workflow_run_block_result.status == BlockStatus.completed
and block.block_type in BLOCK_TYPES_THAT_SHOULD_BE_CACHED
):
blocks_to_update.add(block.label)
workflow_run, should_stop = await self._handle_block_result_status(
block=block,
block_idx=block_idx,
blocks_cnt=blocks_cnt,
block_result=workflow_run_block_result,
workflow_run=workflow_run,
workflow_run_id=workflow_run_id,
)
return workflow_run, blocks_to_update, workflow_run_block_result, should_stop, branch_metadata
except Exception as e:
LOG.exception(
f"Error while executing workflow run {workflow_run_id}",
workflow_run_id=workflow_run_id,
block_idx=block_idx,
block_type=block.block_type,
block_label=block.label,
)
exception_message = f"Unexpected error: {str(e)}"
if isinstance(e, SkyvernException):
exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}"
failure_reason = f"{block.block_type} block failed. failure reason: {exception_message}"
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run_id, failure_reason=failure_reason
)
return workflow_run, blocks_to_update, workflow_run_block_result, True, branch_metadata
async def _handle_block_result_status(
    self,
    *,
    block: BlockTypeVar,
    block_idx: int,
    blocks_cnt: int,
    block_result: BlockResult,
    workflow_run: WorkflowRun,
    workflow_run_id: str,
) -> tuple[WorkflowRun, bool]:
    """Map a finished block's terminal status onto workflow-run state.

    Inspects ``block_result.status`` and, for canceled/failed/terminated/
    timed-out outcomes, transitions the workflow run accordingly (honoring
    ``block.continue_on_failure`` for the latter three).

    Returns:
        A ``(workflow_run, should_stop)`` tuple; ``should_stop`` is True when
        the caller's traversal loop must halt.
    """
    if block_result.status == BlockStatus.canceled:
        LOG.info(
            f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was canceled for workflow run {workflow_run_id}, cancelling workflow run",
            block_type=block.block_type,
            workflow_run_id=workflow_run_id,
            block_idx=block_idx,
            block_result=block_result,
            block_type_var=block.block_type,
            block_label=block.label,
        )
        # A canceled block always cancels the whole run; continue_on_failure
        # does not apply to cancellation.
        workflow_run = await self.mark_workflow_run_as_canceled(workflow_run_id=workflow_run_id)
        return workflow_run, True
    if block_result.status == BlockStatus.failed:
        LOG.error(
            f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed for workflow run {workflow_run_id}",
            block_type=block.block_type,
            workflow_run_id=workflow_run_id,
            block_idx=block_idx,
            block_result=block_result,
            block_type_var=block.block_type,
            block_label=block.label,
        )
        if not block.continue_on_failure:
            failure_reason = f"{block.block_type} block failed. failure reason: {block_result.failure_reason}"
            workflow_run = await self.mark_workflow_run_as_failed(
                workflow_run_id=workflow_run_id, failure_reason=failure_reason
            )
            return workflow_run, True
        LOG.warning(
            f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed but will continue executing the workflow run {workflow_run_id}",
            block_type=block.block_type,
            workflow_run_id=workflow_run_id,
            block_idx=block_idx,
            block_result=block_result,
            continue_on_failure=block.continue_on_failure,
            block_type_var=block.block_type,
            block_label=block.label,
        )
        return workflow_run, False
    if block_result.status == BlockStatus.terminated:
        LOG.info(
            f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated",
            block_type=block.block_type,
            workflow_run_id=workflow_run_id,
            block_idx=block_idx,
            block_result=block_result,
            block_type_var=block.block_type,
            block_label=block.label,
        )
        if not block.continue_on_failure:
            failure_reason = f"{block.block_type} block terminated. Reason: {block_result.failure_reason}"
            workflow_run = await self.mark_workflow_run_as_terminated(
                workflow_run_id=workflow_run_id, failure_reason=failure_reason
            )
            return workflow_run, True
        LOG.warning(
            f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run",
            block_type=block.block_type,
            workflow_run_id=workflow_run_id,
            block_idx=block_idx,
            block_result=block_result,
            continue_on_failure=block.continue_on_failure,
            block_type_var=block.block_type,
            block_label=block.label,
        )
        return workflow_run, False
    if block_result.status == BlockStatus.timed_out:
        LOG.info(
            f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, marking workflow run as failed",
            block_type=block.block_type,
            workflow_run_id=workflow_run_id,
            block_idx=block_idx,
            block_result=block_result,
            block_type_var=block.block_type,
            block_label=block.label,
        )
        if not block.continue_on_failure:
            failure_reason = f"{block.block_type} block timed out. Reason: {block_result.failure_reason}"
            workflow_run = await self.mark_workflow_run_as_failed(
                workflow_run_id=workflow_run_id, failure_reason=failure_reason
            )
            return workflow_run, True
        LOG.warning(
            f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, but will continue executing the workflow run",
            block_type=block.block_type,
            workflow_run_id=workflow_run_id,
            block_idx=block_idx,
            block_result=block_result,
            continue_on_failure=block.continue_on_failure,
            block_type_var=block.block_type,
            block_label=block.label,
        )
        return workflow_run, False
    # Completed (or any other non-terminal-failure status): keep going.
    return workflow_run, False
def _build_workflow_graph(
    self,
    blocks: list[BlockTypeVar],
) -> tuple[str, dict[str, BlockTypeVar], dict[str, str | None]]:
    """Validate the workflow definition as a single-entry DAG.

    Returns:
        A tuple of (entry block label, label -> block map, label -> default
        next-label map). Conditional blocks contribute one edge per branch;
        all other blocks contribute their ``next_block_label`` edge.

    Raises:
        InvalidWorkflowDefinition: on duplicate labels, edges to unknown
            labels, zero or multiple entry blocks, or a cycle.
    """
    all_blocks = get_all_blocks(blocks)
    label_to_block: dict[str, BlockTypeVar] = {}
    default_next_map: dict[str, str | None] = {}
    for block in all_blocks:
        if block.label in label_to_block:
            raise InvalidWorkflowDefinition(f"Duplicate block label detected: {block.label}")
        label_to_block[block.label] = block
        default_next_map[block.label] = block.next_block_label
    # Only apply sequential defaulting if there are no conditional blocks:
    # conditional blocks break sequential ordering since they have multiple
    # branches.
    has_conditional_blocks = any(isinstance(block, ConditionalBlock) for block in all_blocks)
    if not has_conditional_blocks:
        for idx, block in enumerate(blocks[:-1]):
            if default_next_map.get(block.label) is None:
                default_next_map[block.label] = blocks[idx + 1].label
    adjacency: dict[str, set[str]] = {label: set() for label in label_to_block}
    incoming: dict[str, int] = {label: 0 for label in label_to_block}

    def _add_edge(source: str, target: str | None) -> None:
        if not target:
            return
        if target not in label_to_block:
            raise InvalidWorkflowDefinition(f"Block {source} references unknown next_block_label {target}")
        # Count the in-degree only for edges that are actually new: adjacency
        # is a set, so a duplicate edge (e.g. two branches of one conditional
        # pointing at the same target) must not inflate the in-degree, or
        # Kahn's algorithm below would never drain the target and the
        # workflow would be falsely rejected as cyclic.
        if target not in adjacency[source]:
            adjacency[source].add(target)
            incoming[target] += 1

    for label, block in label_to_block.items():
        if isinstance(block, ConditionalBlock):
            for branch in block.ordered_branches:
                _add_edge(label, branch.next_block_label)
        else:
            _add_edge(label, default_next_map.get(label))
    roots = [label for label, count in incoming.items() if count == 0]
    if not roots:
        raise InvalidWorkflowDefinition("No entry block found for workflow definition")
    if len(roots) > 1:
        raise InvalidWorkflowDefinition(
            f"Multiple entry blocks detected ({', '.join(sorted(roots))}); only one entry block is supported."
        )
    # Kahn's algorithm for cycle detection: if a topological traversal from
    # the single root cannot visit every node, the remainder forms a cycle.
    queue: deque[str] = deque([roots[0]])
    visited_count = 0
    in_degree = dict(incoming)
    while queue:
        node = queue.popleft()
        visited_count += 1
        for neighbor in adjacency[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    if visited_count != len(label_to_block):
        raise InvalidWorkflowDefinition("Workflow definition contains a cycle; DAG traversal is required.")
    return roots[0], label_to_block, default_next_map
async def create_workflow(
self,
@@ -3112,6 +3342,32 @@ class WorkflowService:
loop_blocks=loop_blocks,
complete_if_empty=block_yaml.complete_if_empty,
)
elif block_yaml.block_type == BlockType.CONDITIONAL:
branch_conditions = []
for branch in block_yaml.branch_conditions:
branch_criteria = (
JinjaBranchCriteria(
criteria_type=branch.criteria.criteria_type,
expression=branch.criteria.expression,
description=branch.criteria.description,
)
if branch.criteria
else None
)
branch_conditions.append(
BranchCondition(
criteria=branch_criteria,
next_block_label=branch.next_block_label,
description=branch.description,
is_default=branch.is_default,
)
)
return ConditionalBlock(
**base_kwargs,
branch_conditions=branch_conditions,
)
elif block_yaml.block_type == BlockType.CODE:
return CodeBlock(
**base_kwargs,