diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index ee361fc5..cb78978f 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -2028,9 +2028,30 @@ def _build_block_statement( stmt = _build_http_request_statement(block) elif block_type == "pdf_parser": stmt = _build_pdf_parser_statement(block) + elif block_type == "conditional": + # Conditional blocks are evaluated at runtime by the workflow engine. + # Generate a descriptive comment showing this is a runtime branch point. + # The blocks inside conditional branches are processed separately when executed. + branches = block.get("branches") or block.get("ordered_branches") or [] + branch_info_lines = [] + for i, branch in enumerate(branches): + next_label = branch.get("next_block_label", "?") + condition = branch.get("condition", "") + # Truncate long conditions for readability + if len(condition) > 50: + condition = condition[:47] + "..." + branch_info_lines.append(f"# Branch {i + 1}: {condition!r} → {next_label}") + + if branch_info_lines: + branch_info = "\n".join(branch_info_lines) + comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine. One branch executes:\n{branch_info}" + else: + comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine." 
+ + stmt = cst.SimpleStatementLine([cst.Expr(cst.parse_expression(f"({repr(comment_text)})"))]) else: - # Default case for unknown block types - stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"# Unknown block type: {block_type}"))]) + # Default case for unknown block types - use quoted string literal to avoid libcst validation error + stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"'# Unknown block type: {block_type}'"))]) return stmt @@ -2111,15 +2132,23 @@ async def generate_workflow_script_python_code( pending: bool = False, cached_blocks: dict[str, ScriptBlockSource] | None = None, updated_block_labels: set[str] | None = None, + workflow_definition_labels: set[str] | None = None, + downstream_of_updated: set[str] | None = None, ) -> str: """ Build a LibCST Module and emit .code (PEP-8-formatted source). Cached script blocks can be reused by providing them via `cached_blocks`. Any labels present in `updated_block_labels` will be regenerated from the latest workflow run execution data. + + Args: + downstream_of_updated: Set of block labels that are downstream of any updated block + in the DAG. These blocks should not be preserved from cache because they may + depend on data from the updated upstream blocks. 
""" cached_blocks = cached_blocks or {} updated_block_labels = set(updated_block_labels or []) + downstream_of_updated = downstream_of_updated or set() # Drop cached entries that do not have usable source cached_blocks = {label: source for label, source in cached_blocks.items() if source.code} @@ -2305,6 +2334,86 @@ async def generate_workflow_script_python_code( append_block_code(block_code) + # --- preserve cached blocks not executed in current run (progressive caching) --- + # Track which blocks were processed from the current run + processed_labels = { + task.get("label") or task.get("title") or task.get("task_id") or f"task_{idx}" + for idx, task in enumerate(task_v1_blocks) + if not task.get("parent_task_v2_label") + } + processed_labels.update( + task_v2.get("label") or f"task_v2_{task_v2.get('workflow_run_block_id')}" for task_v2 in task_v2_blocks + ) + + # Add any cached blocks that weren't in the current run (from other conditional branches) + preserved_block_count = 0 + failed_block_count = 0 + for cached_label, cached_source in cached_blocks.items(): + if cached_label in processed_labels: + continue # Already processed from current run + if cached_label == settings.WORKFLOW_START_BLOCK_LABEL: + continue # Skip the start block, it's always regenerated + # Only preserve blocks that still exist in the current workflow definition. + # This prevents preserving stale blocks after a user deletes or renames blocks. + if workflow_definition_labels is not None and cached_label not in workflow_definition_labels: + LOG.debug( + "Skipping cached block that no longer exists in workflow definition", + block_label=cached_label, + ) + continue + # Don't preserve blocks that are downstream of any updated block. + # These may depend on data from the updated upstream block, so their cached + # code could be stale. For example, if Block A extracts data used by Block B, + # and A is modified, then B's cached code may produce incorrect results. 
+ if cached_label in downstream_of_updated: + LOG.debug( + "Skipping cached block that is downstream of an updated block", + block_label=cached_label, + ) + continue + # Note: cached_blocks is already filtered to remove entries without code (line ~2129), + # but we keep this check as defensive programming in case the upstream filter changes + if not cached_source.code: + continue + + LOG.info( + "Preserving cached block from previous run (progressive caching)", + block_label=cached_label, + workflow_run_id=cached_source.workflow_run_id, + ) + + # Persist the cached block to the new script revision + if script_id and script_revision_id and organization_id: + try: + await create_or_update_script_block( + block_code=cached_source.code, + script_revision_id=script_revision_id, + script_id=script_id, + organization_id=organization_id, + block_label=cached_label, + update=pending, + run_signature=cached_source.run_signature, + workflow_run_id=cached_source.workflow_run_id, + workflow_run_block_id=cached_source.workflow_run_block_id, + input_fields=cached_source.input_fields, + ) + preserved_block_count += 1 + except Exception as e: + failed_block_count += 1 + LOG.error("Failed to preserve cached block", block_label=cached_label, error=str(e), exc_info=True) + # Continue on individual block failures - these are cached blocks from previous runs + # that aren't critical to the current run's success. The script will still work, + # just without the previously cached code for this particular block. 
+ + append_block_code(cached_source.code) + + if preserved_block_count > 0 or failed_block_count > 0: + LOG.info( + "Progressive caching summary", + preserved_blocks=preserved_block_count, + failed_blocks=failed_block_count, + ) + # --- runner --------------------------------------------------------- run_fn = _build_run_fn(blocks, workflow_run_request) diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index 98e3019d..b7e0d623 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -4,6 +4,7 @@ import hashlib import importlib.util import json import os +import re import uuid from dataclasses import dataclass from datetime import datetime @@ -2026,7 +2027,17 @@ async def run_script( context.workflow_run_id = workflow_run_id context.organization_id = organization_id - # run the script as subprocess; pass the parameters and run_id to the script + # Check if script contains conditional blocks (not supported in CLI execution) + # The marker "# === CONDITIONAL:" is generated by _build_block_statement() for conditional blocks. + with open(path, encoding="utf-8") as f: + script_content = f.read() + if re.search(r"# === CONDITIONAL:", script_content): + raise Exception( + "This script contains conditional blocks which are not supported in CLI execution. " + "Conditional blocks require the workflow engine's DAG traversal to properly evaluate " + "branch conditions at runtime. Please run this workflow through the API instead." 
def get_downstream_blocks(
    updated_labels: set[str],
    blocks: list["BlockTypeVar"],
) -> set[str]:
    """
    Get all blocks that are downstream of any updated block in the DAG.

    Uses BFS to find all blocks reachable from the updated blocks by following
    next_block_label edges. This is used to invalidate cached blocks that may
    depend on data from updated upstream blocks.

    Args:
        updated_labels: Set of block labels that have been updated
        blocks: List of all blocks in the workflow definition

    Returns:
        Set of block labels that are downstream of any updated block
        (the updated blocks themselves are NOT included unless reachable
        from another updated block).
    """
    # Build adjacency graph: block_label -> set of next block labels
    adjacency: dict[str, set[str]] = {}
    for block in blocks:
        if not block.label:
            # Unlabeled blocks cannot participate in label-based edges.
            continue
        adjacency[block.label] = set()
        # Check for conditional blocks (using duck typing for testability).
        # ConditionalBlock has ordered_branches with multiple next_block_label targets.
        # Use getattr with a None default so a missing attribute doesn't raise,
        # and guard against the attribute existing but being set to None.
        branches = getattr(block, "ordered_branches", None)
        if branches is not None:
            for branch in branches:
                if branch.next_block_label:
                    adjacency[block.label].add(branch.next_block_label)
        elif getattr(block, "next_block_label", None):
            adjacency[block.label].add(block.next_block_label)

    # BFS from all updated blocks to find downstream blocks.
    # `visited` seeds with the updated labels so they are never re-enqueued,
    # which also makes the traversal safe in the presence of cycles.
    downstream: set[str] = set()
    queue: deque[str] = deque(updated_labels)
    visited: set[str] = set(updated_labels)

    while queue:
        current = queue.popleft()
        for next_label in adjacency.get(current, set()):
            if next_label not in visited:
                visited.add(next_label)
                downstream.add(next_label)
                queue.append(next_label)

    return downstream
- try: - all_blocks = get_all_blocks(workflow.workflow_definition.blocks) - has_conditional = any(block.block_type == BlockType.CONDITIONAL for block in all_blocks) - except Exception: - has_conditional = False - LOG.warning( - "Failed to inspect workflow blocks for conditional types; continuing with script generation", - workflow_id=workflow.workflow_id, - workflow_run_id=workflow_run.workflow_run_id, - exc_info=True, - ) - - if has_conditional: - LOG.info( - "Skipping script generation for workflow containing conditional blocks", - workflow_id=workflow.workflow_id, - workflow_run_id=workflow_run.workflow_run_id, - ) - return - try: LOG.info( "Generating script for workflow", @@ -281,6 +313,18 @@ async def generate_workflow_script( updated_block_labels.update(missing_labels) updated_block_labels.add(settings.WORKFLOW_START_BLOCK_LABEL) + # Build set of all block labels from the current workflow definition. + # This is used to filter out stale cached blocks that no longer exist + # (e.g., after a user deletes or renames blocks in the workflow). + all_definition_blocks = get_all_blocks(workflow.workflow_definition.blocks) + workflow_definition_labels = {block.label for block in all_definition_blocks if block.label} + + # Compute blocks downstream of any updated block. These should not be preserved + # from cache because they may depend on data from the updated upstream blocks. + # For example, if Block A extracts data used by Block B, and A is modified, + # then B's cached code may be stale even if B itself wasn't modified. 
+ downstream_of_updated = get_downstream_blocks(updated_block_labels, all_definition_blocks) + python_src = await generate_workflow_script_python_code( file_name=codegen_input.file_name, workflow_run_request=codegen_input.workflow_run, @@ -294,6 +338,8 @@ async def generate_workflow_script( pending=pending, cached_blocks=cached_block_sources, updated_block_labels=updated_block_labels, + workflow_definition_labels=workflow_definition_labels, + downstream_of_updated=downstream_of_updated, ) except Exception: LOG.error("Failed to generate workflow script source", exc_info=True)