From 5fd4263847c73ebfa69972f378eac47199a610c4 Mon Sep 17 00:00:00 2001 From: pedrohsdb Date: Thu, 5 Feb 2026 11:42:29 -0800 Subject: [PATCH] Add conditional block support for script caching (v2 - with bug fix) (#4642) --- skyvern/core/script_generations/CLAUDE.md | 103 ++++++++++++++- .../script_generations/generate_script.py | 25 +++- .../transform_workflow_run.py | 118 ++++++++++-------- skyvern/forge/agent_functions.py | 42 +------ skyvern/forge/sdk/db/agent_db.py | 5 +- skyvern/forge/sdk/workflow/service.py | 83 +++++++++++- skyvern/services/workflow_script_service.py | 46 +++---- 7 files changed, 307 insertions(+), 115 deletions(-) diff --git a/skyvern/core/script_generations/CLAUDE.md b/skyvern/core/script_generations/CLAUDE.md index af676145..a8eaffda 100644 --- a/skyvern/core/script_generations/CLAUDE.md +++ b/skyvern/core/script_generations/CLAUDE.md @@ -1,10 +1,30 @@ -# Script Generation Context +# Script Generation & Caching + +## Overview + +Script generation converts workflow runs into executable Python code that can be cached and reused. This enables "run with code" mode where workflows execute via cached scripts instead of the AI agent. + +## Key Files + +| File | Purpose | +|------|---------| +| `generate_script.py` | Generates Python code from workflow run data | +| `transform_workflow_run.py` | Transforms DB workflow run into code gen input | +| `skyvern/services/workflow_script_service.py` | Caching logic, script storage | +| `skyvern/forge/sdk/workflow/service.py` | Regeneration decision logic (`generate_script_if_needed`) | ## Key Constants - `SCRIPT_TASK_BLOCKS` - Block types that have task_id and actions (task, navigation, extraction, etc.) - `BLOCK_TYPES_THAT_SHOULD_BE_CACHED` in `workflow/service.py` - Block types eligible for caching (includes for_loop) +## How Caching Works + +1. **Block execution tracking** (service.py:1309-1316): When a block executes via agent and completes, it's added to `blocks_to_update` +2. **Regeneration decision** (`generate_script_if_needed`): Decides whether to regenerate based on `blocks_to_update` and `missing_labels` +3. **Script generation** (`generate_workflow_script`): Generates code only for blocks that executed this run +4. **Progressive caching**: Only executed blocks are cached; unexecuted blocks remain uncached until they run + ## Script Block Requirements for `run_with: code` For a workflow to execute with cached scripts (`run_with: code`), ALL top-level blocks must have: @@ -13,6 +33,53 @@ For a workflow to execute with cached scripts (`run_with: code`), ALL top-level Without these, the system falls back to `run_with: agent`. +## Critical: Two Mechanisms for Detecting New Blocks + +| Mechanism | Location | What it catches | +|-----------|----------|-----------------| +| Execution tracking | service.py:1316 | Blocks that EXECUTED and aren't cached | +| `missing_labels` check | service.py:3436-3441 | Blocks in DEFINITION that aren't cached | + +For workflows WITHOUT conditionals, these are equivalent. +For workflows WITH conditionals, they differ - see "Conditional Blocks" below. + +## Conditional Blocks + +Conditional blocks (`BlockType.CONDITIONAL`) are **NOT cached** - they always run via agent to evaluate conditions at runtime. However, cacheable blocks inside conditional branches ARE cached when they execute. + +### Key Insight: Progressive Branch Caching + +With conditionals, not all branches execute in a single run. The caching system handles this via "progressive caching": +- Run 1 takes branch A → caches blocks from A +- Run 2 takes branch B → caches blocks from B (preserves A's cache) +- Eventually all executed branches have cached blocks + +This means the workflow DEFINITION has all blocks, but the workflow RUN only executes some blocks. + +## Performance Optimizations + +### Batch Task and Action Queries +**Location**: `transform_workflow_run.py` + +Previously, the code made N+1 queries: one `get_task()` and one `get_task_actions_hydrated()` per task block. For workflows with 20 blocks, this meant 40 DB queries. + +Now we batch all queries upfront: +1. Collect all task_ids from workflow_run_blocks +2. Single `get_tasks_by_ids()` call for all tasks +3. Single `get_tasks_actions()` call for all actions +4. Process blocks using pre-fetched data from dictionaries + +**Impact**: Reduces from 2N queries to 2 queries. + +### Block-Level Script Generation +**Location**: `service.py:_generate_pending_script_for_block()` + +Previously, `generate_or_update_pending_workflow_script()` was called after each action (CLICK, INPUT_TEXT, etc.), generating "pending" script drafts ~10-50x per workflow run. + +Now script generation happens at block completion via `_generate_pending_script_for_block()`, called from both `_execute_workflow_blocks()` and `_execute_workflow_blocks_dag()`. + +**Impact**: Reduces script generation frequency by 10-50x while maintaining progressive updates. + ## Adding New Cacheable Block Types When adding a new block type that should support cached execution: @@ -30,3 +97,37 @@ When adding a new block type that should support cached execution: 2. `task_v2_blocks` - task_v2 blocks with child blocks 3. `for_loop_blocks` - ForLoop container blocks 4. `__start_block__` - Workflow entry point + +## Things to Watch Out For + +1. **Definition vs Execution**: The workflow DEFINITION has all blocks; the workflow RUN only executes some blocks (especially with conditionals) + +2. **`blocks_to_update` sources**: This set is populated from multiple places - block execution (line 1316), finalize logic, explicit requests. Understand all sources before modifying. + +3. **Database operations per regeneration**: Each regeneration does DELETE + CREATE + UPLOAD + INSERT. Unnecessary regenerations can flood the database. + +4. **`BLOCK_TYPES_THAT_SHOULD_BE_CACHED`**: Not all block types are cached. Conditional, wait, code blocks etc. are excluded. + +5. **Batch query data mapping**: When using `tasks_by_id` and `actions_by_task_id` dicts, ensure task_ids are consistent between run_blocks and the queried data. + +## Testing Caching Changes + +When modifying regeneration or caching logic, test these scenarios: + +1. **Same blocks run twice** - Should NOT regenerate on 2nd run +2. **New block added** - Should regenerate to include new block +3. **Workflow with conditionals** - Different branches should cache progressively +4. **Block type not in `BLOCK_TYPES_THAT_SHOULD_BE_CACHED`** - Should NOT trigger caching + +## Test Commands + +```bash +# Run script-related tests +python -m pytest tests/unit/ -k "script" --ignore=tests/unit/test_security.py -v + +# Run conditional caching tests specifically +python -m pytest tests/unit/test_conditional_script_caching.py -v + +# Run forloop script tests +python -m pytest tests/unit/test_forloop_script_generation.py -v +``` diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index d4c3ce25..dfdae3b9 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -2028,9 +2028,30 @@ def _build_block_statement( stmt = _build_http_request_statement(block) elif block_type == "pdf_parser": stmt = _build_pdf_parser_statement(block) + elif block_type == "conditional": + # Conditional blocks are evaluated at runtime by the workflow engine. + # Generate a descriptive comment showing this is a runtime branch point. + # The blocks inside conditional branches are processed separately when executed. + branches = block.get("branches") or block.get("ordered_branches") or [] + branch_info_lines = [] + for i, branch in enumerate(branches): + next_label = branch.get("next_block_label", "?") + condition = branch.get("condition", "") + # Truncate long conditions for readability + if len(condition) > 50: + condition = condition[:47] + "..." + branch_info_lines.append(f"# Branch {i + 1}: {condition!r} → {next_label}") + + if branch_info_lines: + branch_info = "\n".join(branch_info_lines) + comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine. One branch executes:\n{branch_info}" + else: + comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine." + + stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(repr(comment_text)))]) else: - # Default case for unknown block types - stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"# Unknown block type: {block_type}"))]) + # Default case for unknown block types - use quoted string literal to avoid libcst validation error + stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"'# Unknown block type: {block_type}'"))]) return stmt diff --git a/skyvern/core/script_generations/transform_workflow_run.py b/skyvern/core/script_generations/transform_workflow_run.py index 6269ff2f..ac53448b 100644 --- a/skyvern/core/script_generations/transform_workflow_run.py +++ b/skyvern/core/script_generations/transform_workflow_run.py @@ -1,3 +1,4 @@ +from collections import defaultdict from dataclasses import dataclass from typing import Any @@ -8,6 +9,7 @@ from skyvern.forge import app from skyvern.schemas.workflows import BlockType from skyvern.services import workflow_service from skyvern.webeye.actions.action_types import ActionType +from skyvern.webeye.actions.actions import Action LOG = structlog.get_logger(__name__) @@ -22,6 +24,29 @@ class CodeGenInput: task_v2_child_blocks: dict[str, list[dict[str, Any]]] # task_v2_label -> list of child blocks +def _process_action_for_block( + action: Action, + block_dump: dict[str, Any], +) -> dict[str, Any]: + """Process a single action and add block-specific context like data extraction goal.""" + action_dump = action.model_dump() + action_dump["xpath"] = action.get_xpath() + action_dump["has_mini_agent"] = action.has_mini_agent + if ( + "data_extraction_goal" in block_dump + and block_dump["data_extraction_goal"] + and action.action_type == ActionType.EXTRACT + ): + action_dump["data_extraction_goal"] = block_dump["data_extraction_goal"] + if ( + "extracted_information_schema" in block_dump + and block_dump["extracted_information_schema"] + and action.action_type == ActionType.EXTRACT + ): + action_dump["data_extraction_schema"] = block_dump["extracted_information_schema"] + return action_dump + + async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organization_id: str) -> CodeGenInput: # get the workflow run request workflow_run_resp = await workflow_service.get_workflow_run_response( @@ -54,8 +79,41 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz # Create mapping from definition blocks by label for quick lookup workflow_run_blocks_by_label = {block.label: block for block in workflow_run_blocks if block.label} + # Batch fetch all tasks and actions upfront to avoid N+1 queries + # First pass: collect all task_ids from workflow run blocks + all_task_ids: set[str] = set() + for rb in workflow_run_blocks: + if rb.block_type in SCRIPT_TASK_BLOCKS and rb.task_id: + all_task_ids.add(rb.task_id) + + # Batch fetch all tasks and actions in 2 queries instead of N+1 + tasks_by_id: dict[str, Any] = {} + actions_by_task_id: dict[str, list[Action]] = defaultdict(list) + + if all_task_ids: + task_ids_list = list(all_task_ids) + # Single query for all tasks + tasks = await app.DATABASE.get_tasks_by_ids(task_ids=task_ids_list, organization_id=organization_id) + tasks_by_id = {task.task_id: task for task in tasks} + LOG.debug( + "Batch fetched tasks for code gen", + workflow_run_id=workflow_run_id, + task_count=len(tasks), + ) + + # Single query for all actions + all_actions = await app.DATABASE.get_tasks_actions(task_ids=task_ids_list, organization_id=organization_id) + for action in all_actions: + if action.task_id: + actions_by_task_id[action.task_id].append(action) + LOG.debug( + "Batch fetched actions for code gen", + workflow_run_id=workflow_run_id, + action_count=len(all_actions), + ) + workflow_block_dump = [] - actions_by_task = {} + actions_by_task: dict[str, list[dict[str, Any]]] = {} task_v2_child_blocks = {} # Loop through workflow run blocks and match to original definition blocks by label @@ -75,7 +133,8 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz # For task blocks, add execution data while preserving templated information if run_block.block_type in SCRIPT_TASK_BLOCKS and run_block.task_id: - task = await app.DATABASE.get_task(task_id=run_block.task_id, organization_id=organization_id) + # Use pre-fetched task data (batch fetched) + task = tasks_by_id.get(run_block.task_id) if task: # Add task execution data but preserve original templated fields task_dump = task.model_dump() @@ -94,29 +153,9 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz } ) - # Get task actions - actions = await app.DATABASE.get_task_actions_hydrated( - task_id=run_block.task_id, organization_id=organization_id - ) - action_dumps = [] - for action in actions: - action_dump = action.model_dump() - action_dump["xpath"] = action.get_xpath() - action_dump["has_mini_agent"] = action.has_mini_agent - if ( - "data_extraction_goal" in final_dump - and final_dump["data_extraction_goal"] - and action.action_type == ActionType.EXTRACT - ): - # use the right data extraction goal for the extract action - action_dump["data_extraction_goal"] = final_dump["data_extraction_goal"] - if ( - "extracted_information_schema" in final_dump - and final_dump["extracted_information_schema"] - and action.action_type == ActionType.EXTRACT - ): - action_dump["data_extraction_schema"] = final_dump["extracted_information_schema"] - action_dumps.append(action_dump) + # Use pre-fetched actions (batch fetched) + actions = actions_by_task_id.get(run_block.task_id, []) + action_dumps = [_process_action_for_block(action, final_dump) for action in actions] actions_by_task[run_block.task_id] = action_dumps else: LOG.warning("Task not found", task_id=run_block.task_id) @@ -197,8 +236,8 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz child_run_block = child_run_blocks_by_label.get(loop_block_label) if loop_block_label else None if child_run_block and child_run_block.block_type in SCRIPT_TASK_BLOCKS and child_run_block.task_id: - # Get task data for this child block - task = await app.DATABASE.get_task(task_id=child_run_block.task_id, organization_id=organization_id) + # Use pre-fetched task data (batch fetched) + task = tasks_by_id.get(child_run_block.task_id) if task: task_dump = task.model_dump() loop_block_dump.update({k: v for k, v in task_dump.items() if k not in loop_block_dump}) @@ -210,28 +249,9 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz } ) - # Get task actions for the child block - actions = await app.DATABASE.get_task_actions_hydrated( - task_id=child_run_block.task_id, organization_id=organization_id - ) - action_dumps = [] - for action in actions: - action_dump = action.model_dump() - action_dump["xpath"] = action.get_xpath() - action_dump["has_mini_agent"] = action.has_mini_agent - if ( - "data_extraction_goal" in loop_block_dump - and loop_block_dump["data_extraction_goal"] - and action.action_type == ActionType.EXTRACT - ): - action_dump["data_extraction_goal"] = loop_block_dump["data_extraction_goal"] - if ( - "extracted_information_schema" in loop_block_dump - and loop_block_dump["extracted_information_schema"] - and action.action_type == ActionType.EXTRACT - ): - action_dump["data_extraction_schema"] = loop_block_dump["extracted_information_schema"] - action_dumps.append(action_dump) + # Use pre-fetched actions (batch fetched) + actions = actions_by_task_id.get(child_run_block.task_id, []) + action_dumps = [_process_action_for_block(action, loop_block_dump) for action in actions] actions_by_task[child_run_block.task_id] = action_dumps else: LOG.warning( diff --git a/skyvern/forge/agent_functions.py b/skyvern/forge/agent_functions.py index e227dae0..22fce7ea 100644 --- a/skyvern/forge/agent_functions.py +++ b/skyvern/forge/agent_functions.py @@ -20,8 +20,6 @@ from skyvern.forge.sdk.schemas.organizations import Organization from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus from skyvern.forge.sdk.trace import traced from skyvern.forge.sdk.workflow.models.block import BlockTypeVar -from skyvern.services import workflow_script_service -from skyvern.webeye.actions.action_types import POST_ACTION_EXECUTION_ACTION_TYPES from skyvern.webeye.actions.actions import Action from skyvern.webeye.browser_state import BrowserState from skyvern.webeye.scraper.scraped_page import ELEMENT_NODE_ATTRIBUTES, CleanupElementTreeFunc, json_to_html @@ -601,42 +599,12 @@ class AgentFunction: if not settings.ENABLE_CODE_BLOCK: raise DisabledBlockExecutionError("CodeBlock is disabled") + # TODO: Remove these methods if nothing calls them after verifying in production async def _post_action_execution(self, action: Action) -> None: - """ - If this is a workflow running environment, generate the - """ - if action.action_type not in POST_ACTION_EXECUTION_ACTION_TYPES: - return - context = skyvern_context.current() - if ( - not context - or not context.root_workflow_run_id - or not context.organization_id - or not context.generate_script - ): - return - root_workflow_run_id = context.root_workflow_run_id - organization_id = context.organization_id - workflow_run = await app.DATABASE.get_workflow_run( - workflow_run_id=root_workflow_run_id, organization_id=organization_id - ) - if not workflow_run: - return - workflow = await app.DATABASE.get_workflow( - workflow_id=workflow_run.workflow_id, organization_id=organization_id - ) - if not workflow: - return - LOG.info( - "Post action execution", - root_workflow_run_id=context.root_workflow_run_id, - organization_id=context.organization_id, - ) + """Post-action hook - now a no-op. - await workflow_script_service.generate_or_update_pending_workflow_script( - workflow_run=workflow_run, - workflow=workflow, - ) + Script generation moved to block-level via _generate_pending_script_for_block() in service.py. + """ async def post_action_execution(self, action: Action) -> None: - asyncio.create_task(self._post_action_execution(action)) + pass diff --git a/skyvern/forge/sdk/db/agent_db.py b/skyvern/forge/sdk/db/agent_db.py index 84351d83..8a70114c 100644 --- a/skyvern/forge/sdk/db/agent_db.py +++ b/skyvern/forge/sdk/db/agent_db.py @@ -577,10 +577,11 @@ class AgentDB(BaseAlchemyDB): select(ActionModel) .filter(ActionModel.organization_id == organization_id) .filter(ActionModel.task_id.in_(task_ids)) - .order_by(ActionModel.created_at.desc()) + .order_by(ActionModel.created_at) ) actions = (await session.scalars(query)).all() - return [hydrate_action(action, empty_element_id=True) for action in actions] + # Must match get_task_actions_hydrated: no empty_element_id so None element_ids stay None + return [hydrate_action(action) for action in actions] except SQLAlchemyError: LOG.error("SQLAlchemyError", exc_info=True) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 52054f10..e205f95d 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -695,6 +695,7 @@ class WorkflowService: ) workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id, organization_id=organization_id) workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id=workflow_run.workflow_permanent_id) + has_conditionals = workflow_script_service.workflow_has_conditionals(workflow) browser_profile_id = workflow_run.browser_profile_id close_browser_on_completion = browser_session_id is None and not workflow_run.browser_address @@ -853,6 +854,7 @@ class WorkflowService: block_labels=block_labels, blocks_to_update=blocks_to_update, finalize=True, # Force regeneration to ensure field mappings have complete action data + has_conditionals=has_conditionals, ) # Execute finally block if configured. Skip for: canceled (user explicitly stopped) @@ -1035,6 +1037,7 @@ class WorkflowService: should_stop, _, ) = await self._execute_single_block( + workflow=workflow, block=block, block_idx=block_idx, blocks_cnt=blocks_cnt, @@ -1052,6 +1055,56 @@ class WorkflowService: break return workflow_run, blocks_to_update + async def _generate_pending_script_for_block( + self, + workflow: Workflow, + workflow_run: WorkflowRun, + block_result: BlockResult | None, + ) -> None: + """Generate pending script after a block completes successfully. + + This is called after each block execution instead of after each action, + reducing script generation frequency while maintaining progressive updates. + Uses asyncio.create_task() to avoid adding latency between blocks. + """ + if not block_result or block_result.status != BlockStatus.completed: + return + + context = skyvern_context.current() + if not context or not context.generate_script: + return + + disable_script_generation = await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached( + "DISABLE_GENERATE_SCRIPT_AFTER_BLOCK", + workflow_run.workflow_run_id, + properties={"organization_id": workflow_run.organization_id}, + ) + if disable_script_generation: + return + + asyncio.create_task( + self._do_generate_pending_script(workflow, workflow_run), + name=f"script_gen_{workflow_run.workflow_run_id}", + ) + + async def _do_generate_pending_script( + self, + workflow: Workflow, + workflow_run: WorkflowRun, + ) -> None: + """Fire-and-forget wrapper for pending script generation with error handling.""" + try: + await workflow_script_service.generate_or_update_pending_workflow_script( + workflow_run=workflow_run, + workflow=workflow, + ) + except Exception: + LOG.warning( + "Failed to generate pending script after block completion", + workflow_run_id=workflow_run.workflow_run_id, + exc_info=True, + ) + async def _execute_workflow_blocks_dag( self, *, @@ -1102,6 +1155,7 @@ class WorkflowService: should_stop, branch_metadata, ) = await self._execute_single_block( + workflow=workflow, block=block, block_idx=block_idx, blocks_cnt=total_blocks, @@ -1155,6 +1209,7 @@ class WorkflowService: async def _execute_single_block( self, *, + workflow: Workflow, block: BlockTypeVar, block_idx: int, blocks_cnt: int, @@ -1325,6 +1380,10 @@ class WorkflowService: workflow_run=workflow_run, workflow_run_id=workflow_run_id, ) + + # Generate pending script after block completes successfully + await self._generate_pending_script_for_block(workflow, workflow_run, workflow_run_block_result) + return workflow_run, blocks_to_update, workflow_run_block_result, should_stop, branch_metadata except Exception as e: @@ -3357,6 +3416,7 @@ class WorkflowService: block_labels: list[str] | None = None, blocks_to_update: set[str] | None = None, finalize: bool = False, + has_conditionals: bool | None = None, ) -> None: """ Generate or regenerate workflow script if needed. @@ -3369,6 +3429,7 @@ class WorkflowService: finalize: If True, check if any actions were skipped during script generation due to missing data (race condition). Only regenerate if needed. This fixes SKY-7653 while avoiding unnecessary regeneration costs. + has_conditionals: Whether the workflow has conditional blocks. If None, will be computed. """ code_gen = workflow_run.code_gen blocks_to_update = set(blocks_to_update or []) @@ -3435,12 +3496,28 @@ class WorkflowService: should_cache_block_labels.add(settings.WORKFLOW_START_BLOCK_LABEL) cached_block_labels.add(settings.WORKFLOW_START_BLOCK_LABEL) + # For workflows with conditional blocks, "missing" labels from unexecuted branches + # should NOT trigger regeneration. They will be cached when those branches execute. + # This prevents the bug where every run triggers unnecessary regeneration because + # blocks from unexecuted branches are always "missing". + if has_conditionals is None: + has_conditionals = workflow_script_service.workflow_has_conditionals(workflow) + if cached_block_labels != should_cache_block_labels: missing_labels = should_cache_block_labels - cached_block_labels - if missing_labels: + if missing_labels and not has_conditionals: + # Only add missing labels for workflows WITHOUT conditionals. + # For workflows WITH conditionals, missing labels are expected (unexecuted branches). blocks_to_update.update(missing_labels) - # Always rebuild the orchestrator if the definition changed - blocks_to_update.add(settings.WORKFLOW_START_BLOCK_LABEL) + # Always rebuild the orchestrator if the definition changed + blocks_to_update.add(settings.WORKFLOW_START_BLOCK_LABEL) + elif missing_labels and has_conditionals: + LOG.debug( + "Skipping regeneration for missing labels in workflow with conditionals", + workflow_id=workflow.workflow_id, + workflow_run_id=workflow_run.workflow_run_id, + missing_labels=list(missing_labels), + ) should_regenerate = bool(blocks_to_update) or bool(code_gen) diff --git a/skyvern/services/workflow_script_service.py b/skyvern/services/workflow_script_service.py index 9ef45644..ff0cb4bb 100644 --- a/skyvern/services/workflow_script_service.py +++ b/skyvern/services/workflow_script_service.py @@ -18,6 +18,27 @@ from skyvern.services import script_service LOG = structlog.get_logger() jinja_sandbox_env = SandboxedEnvironment() + +def workflow_has_conditionals(workflow: Workflow) -> bool: + """ + Check if a workflow contains any conditional blocks. + + This is used to determine whether "missing" blocks in the cache should trigger + regeneration. For workflows with conditionals, blocks in unexecuted branches + are legitimately missing and should NOT trigger regeneration. + """ + try: + all_blocks = get_all_blocks(workflow.workflow_definition.blocks) + return any(block.block_type == BlockType.CONDITIONAL for block in all_blocks) + except Exception: + LOG.warning( + "Failed to check workflow for conditional blocks", + workflow_id=workflow.workflow_id, + exc_info=True, + ) + return False + + # Cache for workflow scripts - only stores non-None results _workflow_script_cache: TTLCache[tuple, "Script"] = TTLCache(maxsize=128, ttl=60 * 60) @@ -232,27 +253,10 @@ async def generate_workflow_script( cached_script: Script | None = None, updated_block_labels: set[str] | None = None, ) -> None: - # Disable script generation for workflows containing conditional blocks to avoid caching divergent paths. - try: - all_blocks = get_all_blocks(workflow.workflow_definition.blocks) - has_conditional = any(block.block_type == BlockType.CONDITIONAL for block in all_blocks) - except Exception: - has_conditional = False - LOG.warning( - "Failed to inspect workflow blocks for conditional types; continuing with script generation", - workflow_id=workflow.workflow_id, - workflow_run_id=workflow_run.workflow_run_id, - exc_info=True, - ) - - if has_conditional: - LOG.info( - "Skipping script generation for workflow containing conditional blocks", - workflow_id=workflow.workflow_id, - workflow_run_id=workflow_run.workflow_run_id, - ) - return - + # Note: Workflows with conditional blocks ARE supported. The conditional block itself + # is not cached (it's evaluated at runtime), but cacheable blocks in branches are + # cached progressively as they execute. See workflow_has_conditionals() for the + # regeneration logic that prevents unnecessary regeneration for unexecuted branches. try: LOG.info( "Generating script for workflow",