From 0dba57f423cbec80a9f07438c8adbdad4fba35d5 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Fri, 5 Dec 2025 22:20:28 -0500 Subject: [PATCH] shu/DAG workflow execution engine part1 (#4219) --- skyvern/forge/sdk/workflow/context_manager.py | 2 +- skyvern/forge/sdk/workflow/models/block.py | 131 +++- skyvern/forge/sdk/workflow/service.py | 740 ++++++++++++------ 3 files changed, 625 insertions(+), 248 deletions(-) diff --git a/skyvern/forge/sdk/workflow/context_manager.py b/skyvern/forge/sdk/workflow/context_manager.py index cc3d59ea..b12d85a5 100644 --- a/skyvern/forge/sdk/workflow/context_manager.py +++ b/skyvern/forge/sdk/workflow/context_manager.py @@ -48,7 +48,7 @@ if TYPE_CHECKING: LOG = structlog.get_logger() -BlockMetadata = dict[str, str | int | float | bool | dict | list] +BlockMetadata = dict[str, str | int | float | bool | dict | list | None] jinja_sandbox_env = SandboxedEnvironment() diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 77c3c9f8..0ba5057c 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -4044,6 +4044,40 @@ class BranchCriteria(BaseModel, abc.ABC): return False +def _evaluate_truthy_string(value: str) -> bool: + """ + Evaluate a string as a boolean, handling common truthy/falsy representations. + + Truthy: "true", "True", "TRUE", "1", "yes", "y", "on", non-zero numbers + Falsy: "", "false", "False", "FALSE", "0", "no", "n", "off", "null", "None", whitespace-only + + For other strings, use Python's default bool() behavior (non-empty = truthy). + """ + if not value or not value.strip(): + return False + + normalized = value.strip().lower() + + # Explicit falsy values + if normalized in ("false", "0", "no", "n", "off", "null", "none"): + return False + + # Explicit truthy values + if normalized in ("true", "1", "yes", "y", "on"): + return True + + # Try to parse as a number + try: + num = float(normalized) + return num != 0.0 + except ValueError: + pass + + # For any other non-empty string, consider it truthy + # This allows expressions like "{{ 'some text' }}" to be truthy + return True + + class JinjaBranchCriteria(BranchCriteria): """Jinja2-templated branch criteria (only supported criteria type for now).""" @@ -4073,12 +4107,13 @@ class JinjaBranchCriteria(BranchCriteria): msg=str(exc), ) from exc - return bool(rendered) + return _evaluate_truthy_string(rendered) class BranchCondition(BaseModel): """Represents a single conditional branch edge within a ConditionalBlock.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) criteria: BranchCriteriaTypeVar | None = None next_block_label: str | None = None description: str | None = None @@ -4133,12 +4168,98 @@ class ConditionalBlock(Block): **kwargs: dict, ) -> BlockResult: """ - Placeholder execute implementation. + Evaluate conditional branches and determine next block to execute. - Conditional block execution will be implemented alongside the DAG workflow - engine refactor (see branching workflow spec). + Returns a BlockResult with branch metadata in the output_parameter_value. """ - raise NotImplementedError("Conditional block execution is handled by the DAG engine.") + workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id) + evaluation_context = BranchEvaluationContext( + workflow_run_context=workflow_run_context, + block_label=self.label, + ) + + matched_branch = None + failure_reason: str | None = None + + for idx, branch in enumerate(self.ordered_branches): + if branch.criteria is None: + continue + try: + if await branch.criteria.evaluate(evaluation_context): + matched_branch = branch + LOG.info( + "Conditional branch matched", + block_label=self.label, + branch_index=idx, + next_block_label=branch.next_block_label, + ) + break + except Exception as exc: + failure_reason = f"Failed to evaluate branch {idx} for {self.label}: {str(exc)}" + LOG.error( + "Failed to evaluate conditional branch", + block_label=self.label, + branch_index=idx, + error=str(exc), + exc_info=True, + ) + break + + if matched_branch is None and failure_reason is None: + matched_branch = self.get_default_branch() + + matched_index = self.ordered_branches.index(matched_branch) if matched_branch in self.ordered_branches else None + next_block_label = matched_branch.next_block_label if matched_branch else None + + branch_metadata: BlockMetadata = { + "branch_taken": next_block_label, + "branch_index": matched_index, + "branch_description": matched_branch.description if matched_branch else None, + "criteria_type": matched_branch.criteria.criteria_type + if matched_branch and matched_branch.criteria + else None, + "criteria_expression": matched_branch.criteria.expression + if matched_branch and matched_branch.criteria + else None, + "next_block_label": next_block_label, + } + + status = BlockStatus.completed + success = True + + if failure_reason: + status = BlockStatus.failed + success = False + elif matched_branch is None: + failure_reason = "No conditional branch matched and no default branch configured" + status = BlockStatus.failed + success = False + + if workflow_run_context: + workflow_run_context.update_block_metadata(self.label, branch_metadata) + try: + await self.record_output_parameter_value( + workflow_run_context=workflow_run_context, + workflow_run_id=workflow_run_id, + value=branch_metadata, + ) + except Exception as exc: + LOG.warning( + "Failed to record branch metadata as output parameter", + workflow_run_id=workflow_run_id, + block_label=self.label, + error=str(exc), + ) + + block_result = await self.build_block_result( + success=success, + failure_reason=failure_reason, + output_parameter_value=branch_metadata, + status=status, + workflow_run_block_id=workflow_run_block_id, + organization_id=organization_id, + ) + return block_result @property def ordered_branches(self) -> list[BranchCondition]: diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 1ce712ef..a5b614cd 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -60,7 +60,9 @@ from skyvern.forge.sdk.workflow.exceptions import ( from skyvern.forge.sdk.workflow.models.block import ( ActionBlock, BlockTypeVar, + BranchCondition, CodeBlock, + ConditionalBlock, DownloadToS3Block, ExtractionBlock, FileDownloadBlock, @@ -69,6 +71,7 @@ from skyvern.forge.sdk.workflow.models.block import ( ForLoopBlock, HttpRequestBlock, HumanInteractionBlock, + JinjaBranchCriteria, LoginBlock, NavigationBlock, PDFParserBlock, @@ -869,287 +872,514 @@ class WorkflowService: if not blocks: raise SkyvernException(f"No blocks found for the given block labels: {block_labels}") + workflow_version = workflow.workflow_definition.version or 1 + if workflow_version >= 2 and not block_labels: + return await self._execute_workflow_blocks_dag( + workflow=workflow, + workflow_run=workflow_run, + organization=organization, + browser_session_id=browser_session_id, + script_blocks_by_label=script_blocks_by_label, + loaded_script_module=loaded_script_module, + is_script_run=is_script_run, + blocks_to_update=blocks_to_update, + ) + # # Execute workflow blocks blocks_cnt = len(blocks) block_result = None for block_idx, block in enumerate(blocks): - try: - if refreshed_workflow_run := await app.DATABASE.get_workflow_run( - workflow_run_id=workflow_run_id, - organization_id=organization_id, - ): - workflow_run = refreshed_workflow_run - if workflow_run.status == WorkflowRunStatus.canceled: - LOG.info( - "Workflow run is canceled, stopping execution inside workflow execution loop", - workflow_run_id=workflow_run_id, - block_idx=block_idx, - block_type=block.block_type, - block_label=block.label, - ) - break + ( + workflow_run, + blocks_to_update, + block_result, + should_stop, + _, + ) = await self._execute_single_block( + block=block, + block_idx=block_idx, + blocks_cnt=blocks_cnt, + workflow_run=workflow_run, + organization=organization, + workflow_run_id=workflow_run_id, + browser_session_id=browser_session_id, + script_blocks_by_label=script_blocks_by_label, + loaded_script_module=loaded_script_module, + is_script_run=is_script_run, + blocks_to_update=blocks_to_update, + ) - if workflow_run.status == WorkflowRunStatus.timed_out: - LOG.info( - "Workflow run is timed out, stopping execution inside workflow execution loop", - workflow_run_id=workflow_run_id, - block_idx=block_idx, - block_type=block.block_type, - block_label=block.label, - ) - break + if should_stop: + break + return workflow_run, blocks_to_update - parameters = block.get_all_parameters(workflow_run_id) - await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run( - workflow_run_id, parameters, organization + async def _execute_workflow_blocks_dag( + self, + *, + workflow: Workflow, + workflow_run: WorkflowRun, + organization: Organization, + browser_session_id: str | None, + script_blocks_by_label: dict[str, Any], + loaded_script_module: Any, + is_script_run: bool, + blocks_to_update: set[str], + ) -> tuple[WorkflowRun, set[str]]: + try: + start_label, label_to_block, default_next_map = self._build_workflow_graph( + workflow.workflow_definition.blocks + ) + except InvalidWorkflowDefinition as exc: + LOG.error("Workflow graph validation failed", error=str(exc), workflow_id=workflow.workflow_id) + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run.workflow_run_id, + failure_reason=str(exc), + ) + return workflow_run, blocks_to_update + + visited_labels: set[str] = set() + current_label = start_label + block_idx = 0 + total_blocks = len(label_to_block) + + while current_label: + block = label_to_block.get(current_label) + if not block: + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run.workflow_run_id, + failure_reason=f"Unable to find block with label {current_label}", ) + break + + ( + workflow_run, + blocks_to_update, + block_result, + should_stop, + branch_metadata, + ) = await self._execute_single_block( + block=block, + block_idx=block_idx, + blocks_cnt=total_blocks, + workflow_run=workflow_run, + organization=organization, + workflow_run_id=workflow_run.workflow_run_id, + browser_session_id=browser_session_id, + script_blocks_by_label=script_blocks_by_label, + loaded_script_module=loaded_script_module, + is_script_run=is_script_run, + blocks_to_update=blocks_to_update, + ) + + visited_labels.add(current_label) + if should_stop: + break + + next_label = None + if isinstance(block, ConditionalBlock): + next_label = (branch_metadata or {}).get("next_block_label") + else: + next_label = default_next_map.get(block.label) + + if not next_label: LOG.info( - f"Executing root block {block.block_type} at index {block_idx}/{blocks_cnt - 1} for workflow run {workflow_run_id}", - block_type=block.block_type, - workflow_run_id=workflow_run_id, - block_idx=block_idx, - block_type_var=block.block_type, + "DAG traversal reached terminal node", + workflow_run_id=workflow_run.workflow_run_id, block_label=block.label, - model=block.model, ) + break - # Try executing with script code if available - block_executed_with_code = False - valid_to_run_code = ( - is_script_run and block.label and block.label in script_blocks_by_label and not block.disable_cache + if next_label not in label_to_block: + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run.workflow_run_id, + failure_reason=f"Next block label {next_label} not found in workflow definition", ) - if valid_to_run_code: - script_block = script_blocks_by_label[block.label] + break + + if next_label in visited_labels: + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run.workflow_run_id, + failure_reason=f"Cycle detected while traversing workflow definition at block {next_label}", + ) + break + + block_idx += 1 + current_label = next_label + + return workflow_run, blocks_to_update + + async def _execute_single_block( + self, + *, + block: BlockTypeVar, + block_idx: int, + blocks_cnt: int, + workflow_run: WorkflowRun, + organization: Organization, + workflow_run_id: str, + browser_session_id: str | None, + script_blocks_by_label: dict[str, Any], + loaded_script_module: Any, + is_script_run: bool, + blocks_to_update: set[str], + ) -> tuple[WorkflowRun, set[str], BlockResult | None, bool, dict[str, Any] | None]: + organization_id = organization.organization_id + workflow_run_block_result: BlockResult | None = None + branch_metadata: dict[str, Any] | None = None + block_executed_with_code = False + + try: + if refreshed_workflow_run := await app.DATABASE.get_workflow_run( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ): + workflow_run = refreshed_workflow_run + if workflow_run.status == WorkflowRunStatus.canceled: LOG.info( - "Attempting to execute block with script code", + "Workflow run is canceled, stopping execution inside workflow execution loop", + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_type=block.block_type, block_label=block.label, - run_signature=script_block.run_signature, ) + return workflow_run, blocks_to_update, workflow_run_block_result, True, branch_metadata + + if workflow_run.status == WorkflowRunStatus.timed_out: + LOG.info( + "Workflow run is timed out, stopping execution inside workflow execution loop", + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_type=block.block_type, + block_label=block.label, + ) + return workflow_run, blocks_to_update, workflow_run_block_result, True, branch_metadata + + parameters = block.get_all_parameters(workflow_run_id) + await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run( + workflow_run_id, parameters, organization + ) + LOG.info( + f"Executing root block {block.block_type} at index {block_idx}/{blocks_cnt - 1} for workflow run {workflow_run_id}", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_type_var=block.block_type, + block_label=block.label, + model=block.model, + ) + + valid_to_run_code = ( + is_script_run and block.label and block.label in script_blocks_by_label and not block.disable_cache + ) + if valid_to_run_code: + script_block = script_blocks_by_label[block.label] + LOG.info( + "Attempting to execute block with script code", + block_label=block.label, + run_signature=script_block.run_signature, + ) + try: + vars_dict = vars(loaded_script_module) if loaded_script_module else {} + exec_globals = { + **vars_dict, + "skyvern": skyvern, + "__builtins__": __builtins__, + } + + assert script_block.run_signature is not None + normalized_signature = textwrap.dedent(script_block.run_signature).strip() + indented_signature = textwrap.indent(normalized_signature, " ") + wrapper_code = f"async def __run_signature_wrapper():\n return (\n{indented_signature}\n )\n" + + LOG.debug("Executing run_signature wrapper", wrapper_code=wrapper_code) + try: - # Execute the run signature and capture the return value - vars_dict = vars(loaded_script_module) if loaded_script_module else {} - exec_globals = { - **vars_dict, - "skyvern": skyvern, - "__builtins__": __builtins__, - } - - # Use exec to handle multi-line run_signature statements - # Create an async function and execute it - - # Dedent first to normalize indentation, then re-indent for function body - assert script_block.run_signature is not None - normalized_signature = textwrap.dedent(script_block.run_signature).strip() - # Add 8 spaces (2 levels: function + return statement) - indented_signature = textwrap.indent(normalized_signature, " ") - - # Build the wrapper function - wrapper_code = ( - f"async def __run_signature_wrapper():\n return (\n{indented_signature}\n )\n" - ) - - LOG.debug("Executing run_signature wrapper", wrapper_code=wrapper_code) - - try: - exec_code = compile(wrapper_code, "", "exec") - exec(exec_code, exec_globals) - output_value = await exec_globals["__run_signature_wrapper"]() - except ScriptTerminationException as e: - LOG.warning( - "Script termination", - block_label=block.label, - error=str(e), - exc_info=True, - ) - - # Execution succeeded - get the block result from the workflow run blocks - # The script execution should have created the workflow run block - workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks( - workflow_run_id=workflow_run_id, - organization_id=organization_id, - ) - # Find the most recent block with matching label - matching_blocks = [b for b in workflow_run_blocks if b.label == block.label] - if matching_blocks: - latest_block = max(matching_blocks, key=lambda b: b.created_at) - - # Construct BlockResult from the workflow_run_block - block_result = BlockResult( - success=latest_block.status == BlockStatus.completed, - failure_reason=latest_block.failure_reason, - output_parameter=block.output_parameter, - output_parameter_value=latest_block.output, - status=BlockStatus(latest_block.status) if latest_block.status else BlockStatus.failed, - workflow_run_block_id=latest_block.workflow_run_block_id, - ) - block_executed_with_code = True - LOG.info( - "Successfully executed block with script code", - block_label=block.label, - block_status=block_result.status, - has_output=output_value is not None, - ) - else: - LOG.warning( - "Block executed with code but no workflow run block found", - block_label=block.label, - ) - # Fallback to AI execution - block_executed_with_code = False - except Exception as e: + exec_code = compile(wrapper_code, "", "exec") + exec(exec_code, exec_globals) + output_value = await exec_globals["__run_signature_wrapper"]() + except ScriptTerminationException as e: LOG.warning( - "Failed to execute block with script code, falling back to AI", + "Script termination", block_label=block.label, error=str(e), exc_info=True, ) - block_executed_with_code = False - # Execute with AI if code execution was not attempted or failed - if not block_executed_with_code: - LOG.info( - "Executing block with AI", - block_label=block.label, - block_type=block.block_type, - ) - block_result = await block.execute_safe( + workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks( workflow_run_id=workflow_run_id, organization_id=organization_id, - browser_session_id=browser_session_id, ) - if not block_result: - workflow_run = await self.mark_workflow_run_as_failed( - workflow_run_id=workflow_run_id, failure_reason="Block result is None" - ) - break - if ( - not block_executed_with_code - and block.label - and block.label not in script_blocks_by_label - and block_result.status == BlockStatus.completed - and block.block_type in BLOCK_TYPES_THAT_SHOULD_BE_CACHED - ): - blocks_to_update.add(block.label) - if block_result.status == BlockStatus.canceled: - LOG.info( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was canceled for workflow run {workflow_run_id}, cancelling workflow run", - block_type=block.block_type, - workflow_run_id=workflow_run_id, - block_idx=block_idx, - block_result=block_result, - block_type_var=block.block_type, - block_label=block.label, - ) - workflow_run = await self.mark_workflow_run_as_canceled(workflow_run_id=workflow_run_id) - break - elif block_result.status == BlockStatus.failed: - LOG.error( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed for workflow run {workflow_run_id}", - block_type=block.block_type, - workflow_run_id=workflow_run_id, - block_idx=block_idx, - block_result=block_result, - block_type_var=block.block_type, - block_label=block.label, - ) - if not block.continue_on_failure: - failure_reason = ( - f"{block.block_type} block failed. failure reason: {block_result.failure_reason}" + matching_blocks = [b for b in workflow_run_blocks if b.label == block.label] + if matching_blocks: + latest_block = max(matching_blocks, key=lambda b: b.created_at) + workflow_run_block_result = BlockResult( + success=latest_block.status == BlockStatus.completed, + failure_reason=latest_block.failure_reason, + output_parameter=block.output_parameter, + output_parameter_value=latest_block.output, + status=BlockStatus(latest_block.status) if latest_block.status else BlockStatus.failed, + workflow_run_block_id=latest_block.workflow_run_block_id, ) - workflow_run = await self.mark_workflow_run_as_failed( - workflow_run_id=workflow_run_id, failure_reason=failure_reason + block_executed_with_code = True + LOG.info( + "Successfully executed block with script code", + block_label=block.label, + block_status=workflow_run_block_result.status, + has_output=output_value is not None, ) - break - + else: + LOG.warning( + "Block executed with code but no workflow run block found", + block_label=block.label, + ) + block_executed_with_code = False + except Exception as e: LOG.warning( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed but will continue executing the workflow run {workflow_run_id}", - block_type=block.block_type, - workflow_run_id=workflow_run_id, - block_idx=block_idx, - block_result=block_result, - continue_on_failure=block.continue_on_failure, - block_type_var=block.block_type, + "Failed to execute block with script code, falling back to AI", block_label=block.label, + error=str(e), + exc_info=True, ) + block_executed_with_code = False - elif block_result.status == BlockStatus.terminated: - LOG.info( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated", - block_type=block.block_type, - workflow_run_id=workflow_run_id, - block_idx=block_idx, - block_result=block_result, - block_type_var=block.block_type, - block_label=block.label, - ) - - if not block.continue_on_failure: - failure_reason = f"{block.block_type} block terminated. Reason: {block_result.failure_reason}" - workflow_run = await self.mark_workflow_run_as_terminated( - workflow_run_id=workflow_run_id, failure_reason=failure_reason - ) - break - - LOG.warning( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run", - block_type=block.block_type, - workflow_run_id=workflow_run_id, - block_idx=block_idx, - block_result=block_result, - continue_on_failure=block.continue_on_failure, - block_type_var=block.block_type, - block_label=block.label, - ) - - elif block_result.status == BlockStatus.timed_out: - LOG.info( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, marking workflow run as failed", - block_type=block.block_type, - workflow_run_id=workflow_run_id, - block_idx=block_idx, - block_result=block_result, - block_type_var=block.block_type, - block_label=block.label, - ) - - if not block.continue_on_failure: - failure_reason = f"{block.block_type} block timed out. Reason: {block_result.failure_reason}" - workflow_run = await self.mark_workflow_run_as_failed( - workflow_run_id=workflow_run_id, failure_reason=failure_reason - ) - break - - LOG.warning( - f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, but will continue executing the workflow run", - block_type=block.block_type, - workflow_run_id=workflow_run_id, - block_idx=block_idx, - block_result=block_result, - continue_on_failure=block.continue_on_failure, - block_type_var=block.block_type, - block_label=block.label, - ) - - except Exception as e: - LOG.exception( - f"Error while executing workflow run {workflow_run_id}", - workflow_run_id=workflow_run_id, - block_idx=block_idx, - block_type=block.block_type, + if not block_executed_with_code: + LOG.info( + "Executing block", block_label=block.label, + block_type=block.block_type, + ) + workflow_run_block_result = await block.execute_safe( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + browser_session_id=browser_session_id, ) - exception_message = f"Unexpected error: {str(e)}" - if isinstance(e, SkyvernException): - exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}" + # Extract branch metadata for conditional blocks + if isinstance(block, ConditionalBlock) and workflow_run_block_result: + branch_metadata = cast(dict[str, Any] | None, workflow_run_block_result.output_parameter_value) - failure_reason = f"{block.block_type} block failed. failure reason: {exception_message}" + if not workflow_run_block_result: + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, failure_reason="Block result is None" + ) + return workflow_run, blocks_to_update, workflow_run_block_result, True, branch_metadata + + if ( + not block_executed_with_code + and block.label + and block.label not in script_blocks_by_label + and workflow_run_block_result.status == BlockStatus.completed + and block.block_type in BLOCK_TYPES_THAT_SHOULD_BE_CACHED + ): + blocks_to_update.add(block.label) + + workflow_run, should_stop = await self._handle_block_result_status( + block=block, + block_idx=block_idx, + blocks_cnt=blocks_cnt, + block_result=workflow_run_block_result, + workflow_run=workflow_run, + workflow_run_id=workflow_run_id, + ) + return workflow_run, blocks_to_update, workflow_run_block_result, should_stop, branch_metadata + + except Exception as e: + LOG.exception( + f"Error while executing workflow run {workflow_run_id}", + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_type=block.block_type, + block_label=block.label, + ) + + exception_message = f"Unexpected error: {str(e)}" + if isinstance(e, SkyvernException): + exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}" + + failure_reason = f"{block.block_type} block failed. failure reason: {exception_message}" + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, failure_reason=failure_reason + ) + return workflow_run, blocks_to_update, workflow_run_block_result, True, branch_metadata + + async def _handle_block_result_status( + self, + *, + block: BlockTypeVar, + block_idx: int, + blocks_cnt: int, + block_result: BlockResult, + workflow_run: WorkflowRun, + workflow_run_id: str, + ) -> tuple[WorkflowRun, bool]: + if block_result.status == BlockStatus.canceled: + LOG.info( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was canceled for workflow run {workflow_run_id}, cancelling workflow run", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + workflow_run = await self.mark_workflow_run_as_canceled(workflow_run_id=workflow_run_id) + return workflow_run, True + if block_result.status == BlockStatus.failed: + LOG.error( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed for workflow run {workflow_run_id}", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + if not block.continue_on_failure: + failure_reason = f"{block.block_type} block failed. failure reason: {block_result.failure_reason}" workflow_run = await self.mark_workflow_run_as_failed( workflow_run_id=workflow_run_id, failure_reason=failure_reason ) - break - return workflow_run, blocks_to_update + return workflow_run, True + + LOG.warning( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed but will continue executing the workflow run {workflow_run_id}", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + continue_on_failure=block.continue_on_failure, + block_type_var=block.block_type, + block_label=block.label, + ) + return workflow_run, False + + if block_result.status == BlockStatus.terminated: + LOG.info( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + + if not block.continue_on_failure: + failure_reason = f"{block.block_type} block terminated. Reason: {block_result.failure_reason}" + workflow_run = await self.mark_workflow_run_as_terminated( + workflow_run_id=workflow_run_id, failure_reason=failure_reason + ) + return workflow_run, True + + LOG.warning( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + continue_on_failure=block.continue_on_failure, + block_type_var=block.block_type, + block_label=block.label, + ) + return workflow_run, False + + if block_result.status == BlockStatus.timed_out: + LOG.info( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, marking workflow run as failed", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + + if not block.continue_on_failure: + failure_reason = f"{block.block_type} block timed out. Reason: {block_result.failure_reason}" + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, failure_reason=failure_reason + ) + return workflow_run, True + + LOG.warning( + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, but will continue executing the workflow run", + block_type=block.block_type, + workflow_run_id=workflow_run_id, + block_idx=block_idx, + block_result=block_result, + continue_on_failure=block.continue_on_failure, + block_type_var=block.block_type, + block_label=block.label, + ) + return workflow_run, False + + return workflow_run, False + + def _build_workflow_graph( + self, + blocks: list[BlockTypeVar], + ) -> tuple[str, dict[str, BlockTypeVar], dict[str, str | None]]: + all_blocks = get_all_blocks(blocks) + label_to_block: dict[str, BlockTypeVar] = {} + default_next_map: dict[str, str | None] = {} + + for block in all_blocks: + if block.label in label_to_block: + raise InvalidWorkflowDefinition(f"Duplicate block label detected: {block.label}") + label_to_block[block.label] = block + default_next_map[block.label] = block.next_block_label + + # Only apply sequential defaulting if there are no conditional blocks + # Conditional blocks break sequential ordering since they have multiple branches + has_conditional_blocks = any(isinstance(block, ConditionalBlock) for block in all_blocks) + if not has_conditional_blocks: + for idx, block in enumerate(blocks[:-1]): + if default_next_map.get(block.label) is None: + default_next_map[block.label] = blocks[idx + 1].label + + adjacency: dict[str, set[str]] = {label: set() for label in label_to_block} + incoming: dict[str, int] = {label: 0 for label in label_to_block} + + def _add_edge(source: str, target: str | None) -> None: + if not target: + return + if target not in label_to_block: + raise InvalidWorkflowDefinition(f"Block {source} references unknown next_block_label {target}") + adjacency[source].add(target) + incoming[target] += 1 + + for label, block in label_to_block.items(): + if isinstance(block, ConditionalBlock): + for branch in block.ordered_branches: + _add_edge(label, branch.next_block_label) + else: + _add_edge(label, default_next_map.get(label)) + + roots = [label for label, count in incoming.items() if count == 0] + if not roots: + raise InvalidWorkflowDefinition("No entry block found for workflow definition") + if len(roots) > 1: + raise InvalidWorkflowDefinition( + f"Multiple entry blocks detected ({', '.join(sorted(roots))}); only one entry block is supported." + ) + + # Kahn's algorithm for cycle detection + queue: deque[str] = deque([roots[0]]) + visited_count = 0 + in_degree = dict(incoming) + while queue: + node = queue.popleft() + visited_count += 1 + for neighbor in adjacency[node]: + in_degree[neighbor] -= 1 + if in_degree[neighbor] == 0: + queue.append(neighbor) + + if visited_count != len(label_to_block): + raise InvalidWorkflowDefinition("Workflow definition contains a cycle; DAG traversal is required.") + + return roots[0], label_to_block, default_next_map async def create_workflow( self, @@ -3112,6 +3342,32 @@ class WorkflowService: loop_blocks=loop_blocks, complete_if_empty=block_yaml.complete_if_empty, ) + elif block_yaml.block_type == BlockType.CONDITIONAL: + branch_conditions = [] + for branch in block_yaml.branch_conditions: + branch_criteria = ( + JinjaBranchCriteria( + criteria_type=branch.criteria.criteria_type, + expression=branch.criteria.expression, + description=branch.criteria.description, + ) + if branch.criteria + else None + ) + + branch_conditions.append( + BranchCondition( + criteria=branch_criteria, + next_block_label=branch.next_block_label, + description=branch.description, + is_default=branch.is_default, + ) + ) + + return ConditionalBlock( + **base_kwargs, + branch_conditions=branch_conditions, + ) elif block_yaml.block_type == BlockType.CODE: return CodeBlock( **base_kwargs,