diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index cb78978f..ee361fc5 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -2028,30 +2028,9 @@ def _build_block_statement( stmt = _build_http_request_statement(block) elif block_type == "pdf_parser": stmt = _build_pdf_parser_statement(block) - elif block_type == "conditional": - # Conditional blocks are evaluated at runtime by the workflow engine. - # Generate a descriptive comment showing this is a runtime branch point. - # The blocks inside conditional branches are processed separately when executed. - branches = block.get("branches") or block.get("ordered_branches") or [] - branch_info_lines = [] - for i, branch in enumerate(branches): - next_label = branch.get("next_block_label", "?") - condition = branch.get("condition", "") - # Truncate long conditions for readability - if len(condition) > 50: - condition = condition[:47] + "..." - branch_info_lines.append(f"# Branch {i + 1}: {condition!r} → {next_label}") - - if branch_info_lines: - branch_info = "\n".join(branch_info_lines) - comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine. One branch executes:\n{branch_info}" - else: - comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine." 
- - stmt = cst.SimpleStatementLine([cst.Expr(cst.parse_expression(f"({repr(comment_text)})"))]) else: - # Default case for unknown block types - use quoted string literal to avoid libcst validation error - stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"'# Unknown block type: {block_type}'"))]) + # Default case for unknown block types - keep the quoted string literal so libcst accepts it as a valid SimpleString token + stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"'# Unknown block type: {block_type}'"))]) return stmt @@ -2132,23 +2111,15 @@ async def generate_workflow_script_python_code( pending: bool = False, cached_blocks: dict[str, ScriptBlockSource] | None = None, updated_block_labels: set[str] | None = None, - workflow_definition_labels: set[str] | None = None, - downstream_of_updated: set[str] | None = None, ) -> str: """ Build a LibCST Module and emit .code (PEP-8-formatted source). Cached script blocks can be reused by providing them via `cached_blocks`. Any labels present in `updated_block_labels` will be regenerated from the latest workflow run execution data. - - Args: - downstream_of_updated: Set of block labels that are downstream of any updated block - in the DAG. These blocks should not be preserved from cache because they may - depend on data from the updated upstream blocks.
""" cached_blocks = cached_blocks or {} updated_block_labels = set(updated_block_labels or []) - downstream_of_updated = downstream_of_updated or set() # Drop cached entries that do not have usable source cached_blocks = {label: source for label, source in cached_blocks.items() if source.code} @@ -2334,86 +2305,6 @@ async def generate_workflow_script_python_code( append_block_code(block_code) - # --- preserve cached blocks not executed in current run (progressive caching) --- - # Track which blocks were processed from the current run - processed_labels = { - task.get("label") or task.get("title") or task.get("task_id") or f"task_{idx}" - for idx, task in enumerate(task_v1_blocks) - if not task.get("parent_task_v2_label") - } - processed_labels.update( - task_v2.get("label") or f"task_v2_{task_v2.get('workflow_run_block_id')}" for task_v2 in task_v2_blocks - ) - - # Add any cached blocks that weren't in the current run (from other conditional branches) - preserved_block_count = 0 - failed_block_count = 0 - for cached_label, cached_source in cached_blocks.items(): - if cached_label in processed_labels: - continue # Already processed from current run - if cached_label == settings.WORKFLOW_START_BLOCK_LABEL: - continue # Skip the start block, it's always regenerated - # Only preserve blocks that still exist in the current workflow definition. - # This prevents preserving stale blocks after a user deletes or renames blocks. - if workflow_definition_labels is not None and cached_label not in workflow_definition_labels: - LOG.debug( - "Skipping cached block that no longer exists in workflow definition", - block_label=cached_label, - ) - continue - # Don't preserve blocks that are downstream of any updated block. - # These may depend on data from the updated upstream block, so their cached - # code could be stale. For example, if Block A extracts data used by Block B, - # and A is modified, then B's cached code may produce incorrect results. 
- if cached_label in downstream_of_updated: - LOG.debug( - "Skipping cached block that is downstream of an updated block", - block_label=cached_label, - ) - continue - # Note: cached_blocks is already filtered to remove entries without code (line ~2129), - # but we keep this check as defensive programming in case the upstream filter changes - if not cached_source.code: - continue - - LOG.info( - "Preserving cached block from previous run (progressive caching)", - block_label=cached_label, - workflow_run_id=cached_source.workflow_run_id, - ) - - # Persist the cached block to the new script revision - if script_id and script_revision_id and organization_id: - try: - await create_or_update_script_block( - block_code=cached_source.code, - script_revision_id=script_revision_id, - script_id=script_id, - organization_id=organization_id, - block_label=cached_label, - update=pending, - run_signature=cached_source.run_signature, - workflow_run_id=cached_source.workflow_run_id, - workflow_run_block_id=cached_source.workflow_run_block_id, - input_fields=cached_source.input_fields, - ) - preserved_block_count += 1 - except Exception as e: - failed_block_count += 1 - LOG.error("Failed to preserve cached block", block_label=cached_label, error=str(e), exc_info=True) - # Continue on individual block failures - these are cached blocks from previous runs - # that aren't critical to the current run's success. The script will still work, - # just without the previously cached code for this particular block. 
- - append_block_code(cached_source.code) - - if preserved_block_count > 0 or failed_block_count > 0: - LOG.info( - "Progressive caching summary", - preserved_blocks=preserved_block_count, - failed_blocks=failed_block_count, - ) - # --- runner --------------------------------------------------------- run_fn = _build_run_fn(blocks, workflow_run_request) diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index b7e0d623..98e3019d 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -4,7 +4,6 @@ import hashlib import importlib.util import json import os -import re import uuid from dataclasses import dataclass from datetime import datetime @@ -2027,17 +2026,7 @@ async def run_script( context.workflow_run_id = workflow_run_id context.organization_id = organization_id - # Check if script contains conditional blocks (not supported in CLI execution) - # The marker "# === CONDITIONAL:" is generated by _build_block_statement() for conditional blocks. - with open(path, encoding="utf-8") as f: - script_content = f.read() - if re.search(r"# === CONDITIONAL:", script_content): - raise Exception( - "This script contains conditional blocks which are not supported in CLI execution. " - "Conditional blocks require the workflow engine's DAG traversal to properly evaluate " - "branch conditions at runtime. Please run this workflow through the API instead." 
- ) - + # Run the script in-process via dynamic import; workflow_run_id/organization_id are provided through the skyvern context set above # Dynamically import the script at the given path spec = importlib.util.spec_from_file_location("user_script", path) if not spec or not spec.loader: diff --git a/skyvern/services/workflow_script_service.py b/skyvern/services/workflow_script_service.py index 8d2e656b..9ef45644 100644 --- a/skyvern/services/workflow_script_service.py +++ b/skyvern/services/workflow_script_service.py @@ -1,6 +1,4 @@ import base64 -from collections import deque -from typing import TYPE_CHECKING import structlog from cachetools import TTLCache @@ -14,11 +12,9 @@ from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.workflow.models.block import get_all_blocks from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun from skyvern.schemas.scripts import FileEncoding, Script, ScriptFileCreate, ScriptStatus +from skyvern.schemas.workflows import BlockType from skyvern.services import script_service -if TYPE_CHECKING: - from skyvern.forge.sdk.workflow.models.block import BlockTypeVar - LOG = structlog.get_logger() jinja_sandbox_env = SandboxedEnvironment() @@ -26,55 +22,6 @@ jinja_sandbox_env = SandboxedEnvironment() _workflow_script_cache: TTLCache[tuple, "Script"] = TTLCache(maxsize=128, ttl=60 * 60) -def get_downstream_blocks( - updated_labels: set[str], - blocks: list["BlockTypeVar"], -) -> set[str]: - """ - Get all blocks that are downstream of any updated block in the DAG. - - Uses BFS to find all blocks reachable from the updated blocks by following - next_block_label edges. This is used to invalidate cached blocks that may - depend on data from updated upstream blocks.
- - Args: - updated_labels: Set of block labels that have been updated - blocks: List of all blocks in the workflow definition - - Returns: - Set of block labels that are downstream of any updated block - """ - # Build adjacency graph: block_label -> set of next block labels - adjacency: dict[str, set[str]] = {} - for block in blocks: - if not block.label: - continue - adjacency[block.label] = set() - # Check for conditional blocks (using duck typing for testability) - # ConditionalBlock has ordered_branches with multiple next_block_label targets - if hasattr(block, "ordered_branches"): - for branch in block.ordered_branches: - if branch.next_block_label: - adjacency[block.label].add(branch.next_block_label) - elif block.next_block_label: - adjacency[block.label].add(block.next_block_label) - - # BFS from all updated blocks to find downstream blocks - downstream: set[str] = set() - queue: deque[str] = deque(updated_labels) - visited: set[str] = set(updated_labels) - - while queue: - current = queue.popleft() - for next_label in adjacency.get(current, set()): - if next_label not in visited: - visited.add(next_label) - downstream.add(next_label) - queue.append(next_label) - - return downstream - - def _make_workflow_script_cache_key( organization_id: str, workflow_permanent_id: str, @@ -285,6 +232,27 @@ async def generate_workflow_script( cached_script: Script | None = None, updated_block_labels: set[str] | None = None, ) -> None: + # Disable script generation for workflows containing conditional blocks to avoid caching divergent paths. 
+ try: + all_blocks = get_all_blocks(workflow.workflow_definition.blocks) + has_conditional = any(block.block_type == BlockType.CONDITIONAL for block in all_blocks) + except Exception: + has_conditional = False + LOG.warning( + "Failed to inspect workflow blocks for conditional types; continuing with script generation", + workflow_id=workflow.workflow_id, + workflow_run_id=workflow_run.workflow_run_id, + exc_info=True, + ) + + if has_conditional: + LOG.info( + "Skipping script generation for workflow containing conditional blocks", + workflow_id=workflow.workflow_id, + workflow_run_id=workflow_run.workflow_run_id, + ) + return + try: LOG.info( "Generating script for workflow", @@ -313,18 +281,6 @@ async def generate_workflow_script( updated_block_labels.update(missing_labels) updated_block_labels.add(settings.WORKFLOW_START_BLOCK_LABEL) - # Build set of all block labels from the current workflow definition. - # This is used to filter out stale cached blocks that no longer exist - # (e.g., after a user deletes or renames blocks in the workflow). - all_definition_blocks = get_all_blocks(workflow.workflow_definition.blocks) - workflow_definition_labels = {block.label for block in all_definition_blocks if block.label} - - # Compute blocks downstream of any updated block. These should not be preserved - # from cache because they may depend on data from the updated upstream blocks. - # For example, if Block A extracts data used by Block B, and A is modified, - # then B's cached code may be stale even if B itself wasn't modified. 
- downstream_of_updated = get_downstream_blocks(updated_block_labels, all_definition_blocks) - python_src = await generate_workflow_script_python_code( file_name=codegen_input.file_name, workflow_run_request=codegen_input.workflow_run, @@ -338,8 +294,6 @@ async def generate_workflow_script( pending=pending, cached_blocks=cached_block_sources, updated_block_labels=updated_block_labels, - workflow_definition_labels=workflow_definition_labels, - downstream_of_updated=downstream_of_updated, ) except Exception: LOG.error("Failed to generate workflow script source", exc_info=True)