Add conditional block support for script caching (v2 - with bug fix) (#4642)

This commit is contained in:
pedrohsdb
2026-02-05 11:42:29 -08:00
committed by GitHub
parent 7e978bba36
commit 5fd4263847
7 changed files with 307 additions and 115 deletions

View File

@@ -1,10 +1,30 @@
# Script Generation Context
# Script Generation & Caching
## Overview
Script generation converts workflow runs into executable Python code that can be cached and reused. This enables "run with code" mode where workflows execute via cached scripts instead of the AI agent.
## Key Files
| File | Purpose |
|------|---------|
| `generate_script.py` | Generates Python code from workflow run data |
| `transform_workflow_run.py` | Transforms DB workflow run into code gen input |
| `skyvern/services/workflow_script_service.py` | Caching logic, script storage |
| `skyvern/forge/sdk/workflow/service.py` | Regeneration decision logic (`generate_script_if_needed`) |
## Key Constants
- `SCRIPT_TASK_BLOCKS` - Block types that have task_id and actions (task, navigation, extraction, etc.)
- `BLOCK_TYPES_THAT_SHOULD_BE_CACHED` in `workflow/service.py` - Block types eligible for caching (includes for_loop)
## How Caching Works
1. **Block execution tracking** (service.py:1309-1316): When a block executes via agent and completes, it's added to `blocks_to_update`
2. **Regeneration decision** (`generate_script_if_needed`): Decides whether to regenerate based on `blocks_to_update` and `missing_labels`
3. **Script generation** (`generate_workflow_script`): Generates code only for blocks that executed this run
4. **Progressive caching**: Only executed blocks are cached; unexecuted blocks remain uncached until they run
## Script Block Requirements for `run_with: code`
For a workflow to execute with cached scripts (`run_with: code`), ALL top-level blocks must have:
@@ -13,6 +33,53 @@ For a workflow to execute with cached scripts (`run_with: code`), ALL top-level
Without these, the system falls back to `run_with: agent`.
## Critical: Two Mechanisms for Detecting New Blocks
| Mechanism | Location | What it catches |
|-----------|----------|-----------------|
| Execution tracking | service.py:1316 | Blocks that EXECUTED and aren't cached |
| `missing_labels` check | service.py:3436-3441 | Blocks in DEFINITION that aren't cached |
For workflows WITHOUT conditionals, these are equivalent.
For workflows WITH conditionals, they differ - see "Conditional Blocks" below.
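The difference between the two mechanisms can be sketched as plain set arithmetic. A minimal illustration, using hypothetical labels rather than real Skyvern identifiers:

```python
# Illustrative sketch of the two detection mechanisms; all names here are
# hypothetical, not the actual Skyvern identifiers.

definition_labels = {"login", "branch_a", "branch_b"}  # blocks in the DEFINITION
executed_labels = {"login", "branch_a"}                # blocks that ran this RUN
cached_labels = {"login"}                              # blocks already cached

# Mechanism 1 (execution tracking): executed but not yet cached
blocks_to_update = executed_labels - cached_labels

# Mechanism 2 (missing_labels): in the definition but not yet cached
missing_labels = definition_labels - cached_labels

# Without conditionals every definition block executes, so the two sets agree.
# Here a conditional skipped branch_b, so the sets differ:
assert blocks_to_update == {"branch_a"}
assert missing_labels == {"branch_a", "branch_b"}
```

The bug fixed in this commit is exactly this gap: `branch_b` is always "missing" for Mechanism 2, so without the conditional check every run would regenerate.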
## Conditional Blocks
Conditional blocks (`BlockType.CONDITIONAL`) are **NOT cached** - they always run via agent to evaluate conditions at runtime. However, cacheable blocks inside conditional branches ARE cached when they execute.
### Key Insight: Progressive Branch Caching
With conditionals, not all branches execute in a single run. The caching system handles this via "progressive caching":
- Run 1 takes branch A → caches blocks from A
- Run 2 takes branch B → caches blocks from B (preserves A's cache)
- Eventually all executed branches have cached blocks
This means the workflow DEFINITION has all blocks, but the workflow RUN only executes some blocks.
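The merge behavior above can be sketched in a few lines. This is a simplified model with hypothetical names, not the actual storage code:

```python
# Sketch of progressive branch caching (hypothetical names): each run merges
# the blocks it executed into the existing cache without dropping entries
# cached by earlier runs that took a different branch.

def merge_cache(cached: dict[str, str], executed: dict[str, str]) -> dict[str, str]:
    """Executed blocks overwrite their own entries; untouched entries survive."""
    return {**cached, **executed}

cache: dict[str, str] = {}
cache = merge_cache(cache, {"branch_a_step": "def run_a(): ..."})  # Run 1 takes branch A
cache = merge_cache(cache, {"branch_b_step": "def run_b(): ..."})  # Run 2 takes branch B
# Both branches are now cached, even though no single run executed both.
```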
## Performance Optimizations
### Batch Task and Action Queries
**Location**: `transform_workflow_run.py`
Previously, the code made one `get_task()` and one `get_task_actions_hydrated()` query per task block (the classic N+1 pattern). For workflows with 20 blocks, this meant 40 DB queries.
Now we batch all queries upfront:
1. Collect all task_ids from workflow_run_blocks
2. Single `get_tasks_by_ids()` call for all tasks
3. Single `get_tasks_actions()` call for all actions
4. Process blocks using pre-fetched data from dictionaries
**Impact**: Reduces query count from 2N to 2, regardless of block count.
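The four steps above can be sketched as follows. This is a simplified model: `get_tasks_by_ids` and `get_tasks_actions` mirror the real DB methods, but the surrounding objects are stand-ins:

```python
# Simplified sketch of the batching pattern: collect ids, run two queries,
# index the results for O(1) lookup while processing blocks.
from collections import defaultdict


async def batch_fetch(db, run_blocks, organization_id):
    # Step 1: collect all task_ids up front (skip blocks without one)
    task_ids = [b.task_id for b in run_blocks if b.task_id]
    # Steps 2-3: two queries total, instead of two per block
    tasks = await db.get_tasks_by_ids(task_ids=task_ids, organization_id=organization_id)
    actions = await db.get_tasks_actions(task_ids=task_ids, organization_id=organization_id)
    # Step 4: build dictionaries so block processing never touches the DB
    tasks_by_id = {t.task_id: t for t in tasks}
    actions_by_task_id = defaultdict(list)
    for a in actions:
        actions_by_task_id[a.task_id].append(a)
    return tasks_by_id, actions_by_task_id
```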
### Block-Level Script Generation
**Location**: `service.py:_generate_pending_script_for_block()`
Previously, `generate_or_update_pending_workflow_script()` was called after each action (CLICK, INPUT_TEXT, etc.), generating "pending" script drafts ~10-50x per workflow run.
Now script generation happens at block completion via `_generate_pending_script_for_block()`, called from both `_execute_workflow_blocks()` and `_execute_workflow_blocks_dag()`.
**Impact**: Reduces script generation frequency by 10-50x while maintaining progressive updates.
## Adding New Cacheable Block Types
When adding a new block type that should support cached execution:
@@ -30,3 +97,37 @@ When adding a new block type that should support cached execution:
2. `task_v2_blocks` - task_v2 blocks with child blocks
3. `for_loop_blocks` - ForLoop container blocks
4. `__start_block__` - Workflow entry point
## Things to Watch Out For
1. **Definition vs Execution**: The workflow DEFINITION has all blocks; the workflow RUN only executes some blocks (especially with conditionals)
2. **`blocks_to_update` sources**: This set is populated from multiple places - block execution (line 1316), finalize logic, explicit requests. Understand all sources before modifying.
3. **Database operations per regeneration**: Each regeneration does DELETE + CREATE + UPLOAD + INSERT. Unnecessary regenerations can flood the database.
4. **`BLOCK_TYPES_THAT_SHOULD_BE_CACHED`**: Not all block types are cached; conditional, wait, and code blocks, among others, are excluded.
5. **Batch query data mapping**: When using `tasks_by_id` and `actions_by_task_id` dicts, ensure task_ids are consistent between run_blocks and the queried data.
## Testing Caching Changes
When modifying regeneration or caching logic, test these scenarios:
1. **Same blocks run twice** - Should NOT regenerate on 2nd run
2. **New block added** - Should regenerate to include new block
3. **Workflow with conditionals** - Different branches should cache progressively
4. **Block type not in `BLOCK_TYPES_THAT_SHOULD_BE_CACHED`** - Should NOT trigger caching
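Scenarios 1 and 3 reduce to the regeneration decision itself, which can be unit-tested in isolation. A hypothetical sketch (the helper simplifies the real `generate_script_if_needed`, which also considers pending `code_gen` state):

```python
# Hypothetical pytest-style sketch of scenarios 1 and 3 above; the helper
# is a simplified stand-in for the real decision logic.

def decide_regeneration(blocks_to_update: set[str],
                        has_conditionals: bool,
                        missing_labels: set[str]) -> bool:
    # Missing labels only count for workflows WITHOUT conditionals;
    # with conditionals they are expected (unexecuted branches).
    if missing_labels and not has_conditionals:
        blocks_to_update = blocks_to_update | missing_labels
    return bool(blocks_to_update)


def test_same_blocks_twice_no_regeneration():
    # Scenario 1: 2nd run, everything cached, nothing newly executed
    assert decide_regeneration(set(), has_conditionals=False, missing_labels=set()) is False


def test_conditional_missing_branch_no_regeneration():
    # Scenario 3: an unexecuted branch is "missing" but must not regenerate
    assert decide_regeneration(set(), has_conditionals=True, missing_labels={"branch_b"}) is False
```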
## Test Commands
```bash
# Run script-related tests
python -m pytest tests/unit/ -k "script" --ignore=tests/unit/test_security.py -v
# Run conditional caching tests specifically
python -m pytest tests/unit/test_conditional_script_caching.py -v
# Run forloop script tests
python -m pytest tests/unit/test_forloop_script_generation.py -v
```

View File

@@ -2028,9 +2028,30 @@ def _build_block_statement(
stmt = _build_http_request_statement(block)
elif block_type == "pdf_parser":
stmt = _build_pdf_parser_statement(block)
elif block_type == "conditional":
# Conditional blocks are evaluated at runtime by the workflow engine.
# Generate a descriptive comment showing this is a runtime branch point.
# The blocks inside conditional branches are processed separately when executed.
branches = block.get("branches") or block.get("ordered_branches") or []
branch_info_lines = []
for i, branch in enumerate(branches):
next_label = branch.get("next_block_label", "?")
condition = branch.get("condition", "")
# Truncate long conditions for readability
if len(condition) > 50:
condition = condition[:47] + "..."
branch_info_lines.append(f"# Branch {i + 1}: {condition!r} → {next_label}")
if branch_info_lines:
branch_info = "\n".join(branch_info_lines)
comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine. One branch executes:\n{branch_info}"
else:
comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine."
stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(repr(comment_text)))])
else:
# Default case for unknown block types
stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"# Unknown block type: {block_type}"))])
# Default case for unknown block types - use quoted string literal to avoid libcst validation error
stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"'# Unknown block type: {block_type}'"))])
return stmt

View File

@@ -1,3 +1,4 @@
from collections import defaultdict
from dataclasses import dataclass
from typing import Any
@@ -8,6 +9,7 @@ from skyvern.forge import app
from skyvern.schemas.workflows import BlockType
from skyvern.services import workflow_service
from skyvern.webeye.actions.action_types import ActionType
from skyvern.webeye.actions.actions import Action
LOG = structlog.get_logger(__name__)
@@ -22,6 +24,29 @@ class CodeGenInput:
task_v2_child_blocks: dict[str, list[dict[str, Any]]] # task_v2_label -> list of child blocks
def _process_action_for_block(
action: Action,
block_dump: dict[str, Any],
) -> dict[str, Any]:
"""Process a single action and add block-specific context like data extraction goal."""
action_dump = action.model_dump()
action_dump["xpath"] = action.get_xpath()
action_dump["has_mini_agent"] = action.has_mini_agent
if (
"data_extraction_goal" in block_dump
and block_dump["data_extraction_goal"]
and action.action_type == ActionType.EXTRACT
):
action_dump["data_extraction_goal"] = block_dump["data_extraction_goal"]
if (
"extracted_information_schema" in block_dump
and block_dump["extracted_information_schema"]
and action.action_type == ActionType.EXTRACT
):
action_dump["data_extraction_schema"] = block_dump["extracted_information_schema"]
return action_dump
async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organization_id: str) -> CodeGenInput:
# get the workflow run request
workflow_run_resp = await workflow_service.get_workflow_run_response(
@@ -54,8 +79,41 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
# Create mapping from definition blocks by label for quick lookup
workflow_run_blocks_by_label = {block.label: block for block in workflow_run_blocks if block.label}
# Batch fetch all tasks and actions upfront to avoid N+1 queries
# First pass: collect all task_ids from workflow run blocks
all_task_ids: set[str] = set()
for rb in workflow_run_blocks:
if rb.block_type in SCRIPT_TASK_BLOCKS and rb.task_id:
all_task_ids.add(rb.task_id)
# Batch fetch all tasks and actions in 2 queries instead of N+1
tasks_by_id: dict[str, Any] = {}
actions_by_task_id: dict[str, list[Action]] = defaultdict(list)
if all_task_ids:
task_ids_list = list(all_task_ids)
# Single query for all tasks
tasks = await app.DATABASE.get_tasks_by_ids(task_ids=task_ids_list, organization_id=organization_id)
tasks_by_id = {task.task_id: task for task in tasks}
LOG.debug(
"Batch fetched tasks for code gen",
workflow_run_id=workflow_run_id,
task_count=len(tasks),
)
# Single query for all actions
all_actions = await app.DATABASE.get_tasks_actions(task_ids=task_ids_list, organization_id=organization_id)
for action in all_actions:
if action.task_id:
actions_by_task_id[action.task_id].append(action)
LOG.debug(
"Batch fetched actions for code gen",
workflow_run_id=workflow_run_id,
action_count=len(all_actions),
)
workflow_block_dump = []
actions_by_task = {}
actions_by_task: dict[str, list[dict[str, Any]]] = {}
task_v2_child_blocks = {}
# Loop through workflow run blocks and match to original definition blocks by label
@@ -75,7 +133,8 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
# For task blocks, add execution data while preserving templated information
if run_block.block_type in SCRIPT_TASK_BLOCKS and run_block.task_id:
task = await app.DATABASE.get_task(task_id=run_block.task_id, organization_id=organization_id)
# Use pre-fetched task data (batch fetched)
task = tasks_by_id.get(run_block.task_id)
if task:
# Add task execution data but preserve original templated fields
task_dump = task.model_dump()
@@ -94,29 +153,9 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
}
)
# Get task actions
actions = await app.DATABASE.get_task_actions_hydrated(
task_id=run_block.task_id, organization_id=organization_id
)
action_dumps = []
for action in actions:
action_dump = action.model_dump()
action_dump["xpath"] = action.get_xpath()
action_dump["has_mini_agent"] = action.has_mini_agent
if (
"data_extraction_goal" in final_dump
and final_dump["data_extraction_goal"]
and action.action_type == ActionType.EXTRACT
):
# use the right data extraction goal for the extract action
action_dump["data_extraction_goal"] = final_dump["data_extraction_goal"]
if (
"extracted_information_schema" in final_dump
and final_dump["extracted_information_schema"]
and action.action_type == ActionType.EXTRACT
):
action_dump["data_extraction_schema"] = final_dump["extracted_information_schema"]
action_dumps.append(action_dump)
# Use pre-fetched actions (batch fetched)
actions = actions_by_task_id.get(run_block.task_id, [])
action_dumps = [_process_action_for_block(action, final_dump) for action in actions]
actions_by_task[run_block.task_id] = action_dumps
else:
LOG.warning("Task not found", task_id=run_block.task_id)
@@ -197,8 +236,8 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
child_run_block = child_run_blocks_by_label.get(loop_block_label) if loop_block_label else None
if child_run_block and child_run_block.block_type in SCRIPT_TASK_BLOCKS and child_run_block.task_id:
# Get task data for this child block
task = await app.DATABASE.get_task(task_id=child_run_block.task_id, organization_id=organization_id)
# Use pre-fetched task data (batch fetched)
task = tasks_by_id.get(child_run_block.task_id)
if task:
task_dump = task.model_dump()
loop_block_dump.update({k: v for k, v in task_dump.items() if k not in loop_block_dump})
@@ -210,28 +249,9 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
}
)
# Get task actions for the child block
actions = await app.DATABASE.get_task_actions_hydrated(
task_id=child_run_block.task_id, organization_id=organization_id
)
action_dumps = []
for action in actions:
action_dump = action.model_dump()
action_dump["xpath"] = action.get_xpath()
action_dump["has_mini_agent"] = action.has_mini_agent
if (
"data_extraction_goal" in loop_block_dump
and loop_block_dump["data_extraction_goal"]
and action.action_type == ActionType.EXTRACT
):
action_dump["data_extraction_goal"] = loop_block_dump["data_extraction_goal"]
if (
"extracted_information_schema" in loop_block_dump
and loop_block_dump["extracted_information_schema"]
and action.action_type == ActionType.EXTRACT
):
action_dump["data_extraction_schema"] = loop_block_dump["extracted_information_schema"]
action_dumps.append(action_dump)
# Use pre-fetched actions (batch fetched)
actions = actions_by_task_id.get(child_run_block.task_id, [])
action_dumps = [_process_action_for_block(action, loop_block_dump) for action in actions]
actions_by_task[child_run_block.task_id] = action_dumps
else:
LOG.warning(

View File

@@ -20,8 +20,6 @@ from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus
from skyvern.forge.sdk.trace import traced
from skyvern.forge.sdk.workflow.models.block import BlockTypeVar
from skyvern.services import workflow_script_service
from skyvern.webeye.actions.action_types import POST_ACTION_EXECUTION_ACTION_TYPES
from skyvern.webeye.actions.actions import Action
from skyvern.webeye.browser_state import BrowserState
from skyvern.webeye.scraper.scraped_page import ELEMENT_NODE_ATTRIBUTES, CleanupElementTreeFunc, json_to_html
@@ -601,42 +599,12 @@ class AgentFunction:
if not settings.ENABLE_CODE_BLOCK:
raise DisabledBlockExecutionError("CodeBlock is disabled")
# TODO: Remove these methods if nothing calls them after verifying in production
async def _post_action_execution(self, action: Action) -> None:
"""
If this is a workflow running environment, generate the
"""
if action.action_type not in POST_ACTION_EXECUTION_ACTION_TYPES:
return
context = skyvern_context.current()
if (
not context
or not context.root_workflow_run_id
or not context.organization_id
or not context.generate_script
):
return
root_workflow_run_id = context.root_workflow_run_id
organization_id = context.organization_id
workflow_run = await app.DATABASE.get_workflow_run(
workflow_run_id=root_workflow_run_id, organization_id=organization_id
)
if not workflow_run:
return
workflow = await app.DATABASE.get_workflow(
workflow_id=workflow_run.workflow_id, organization_id=organization_id
)
if not workflow:
return
LOG.info(
"Post action execution",
root_workflow_run_id=context.root_workflow_run_id,
organization_id=context.organization_id,
)
await workflow_script_service.generate_or_update_pending_workflow_script(
workflow_run=workflow_run,
workflow=workflow,
)
"""Post-action hook - now a no-op.
Script generation moved to block-level via _generate_pending_script_for_block() in service.py.
"""
async def post_action_execution(self, action: Action) -> None:
asyncio.create_task(self._post_action_execution(action))
pass

View File

@@ -577,10 +577,11 @@ class AgentDB(BaseAlchemyDB):
select(ActionModel)
.filter(ActionModel.organization_id == organization_id)
.filter(ActionModel.task_id.in_(task_ids))
.order_by(ActionModel.created_at.desc())
.order_by(ActionModel.created_at)
)
actions = (await session.scalars(query)).all()
return [hydrate_action(action, empty_element_id=True) for action in actions]
# Must match get_task_actions_hydrated: no empty_element_id so None element_ids stay None
return [hydrate_action(action) for action in actions]
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)

View File

@@ -695,6 +695,7 @@ class WorkflowService:
)
workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id, organization_id=organization_id)
workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id=workflow_run.workflow_permanent_id)
has_conditionals = workflow_script_service.workflow_has_conditionals(workflow)
browser_profile_id = workflow_run.browser_profile_id
close_browser_on_completion = browser_session_id is None and not workflow_run.browser_address
@@ -853,6 +854,7 @@ class WorkflowService:
block_labels=block_labels,
blocks_to_update=blocks_to_update,
finalize=True, # Force regeneration to ensure field mappings have complete action data
has_conditionals=has_conditionals,
)
# Execute finally block if configured. Skip for: canceled (user explicitly stopped)
@@ -1035,6 +1037,7 @@ class WorkflowService:
should_stop,
_,
) = await self._execute_single_block(
workflow=workflow,
block=block,
block_idx=block_idx,
blocks_cnt=blocks_cnt,
@@ -1052,6 +1055,56 @@ class WorkflowService:
break
return workflow_run, blocks_to_update
async def _generate_pending_script_for_block(
self,
workflow: Workflow,
workflow_run: WorkflowRun,
block_result: BlockResult | None,
) -> None:
"""Generate pending script after a block completes successfully.
This is called after each block execution instead of after each action,
reducing script generation frequency while maintaining progressive updates.
Uses asyncio.create_task() to avoid adding latency between blocks.
"""
if not block_result or block_result.status != BlockStatus.completed:
return
context = skyvern_context.current()
if not context or not context.generate_script:
return
disable_script_generation = await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
"DISABLE_GENERATE_SCRIPT_AFTER_BLOCK",
workflow_run.workflow_run_id,
properties={"organization_id": workflow_run.organization_id},
)
if disable_script_generation:
return
asyncio.create_task(
self._do_generate_pending_script(workflow, workflow_run),
name=f"script_gen_{workflow_run.workflow_run_id}",
)
async def _do_generate_pending_script(
self,
workflow: Workflow,
workflow_run: WorkflowRun,
) -> None:
"""Fire-and-forget wrapper for pending script generation with error handling."""
try:
await workflow_script_service.generate_or_update_pending_workflow_script(
workflow_run=workflow_run,
workflow=workflow,
)
except Exception:
LOG.warning(
"Failed to generate pending script after block completion",
workflow_run_id=workflow_run.workflow_run_id,
exc_info=True,
)
async def _execute_workflow_blocks_dag(
self,
*,
@@ -1102,6 +1155,7 @@ class WorkflowService:
should_stop,
branch_metadata,
) = await self._execute_single_block(
workflow=workflow,
block=block,
block_idx=block_idx,
blocks_cnt=total_blocks,
@@ -1155,6 +1209,7 @@ class WorkflowService:
async def _execute_single_block(
self,
*,
workflow: Workflow,
block: BlockTypeVar,
block_idx: int,
blocks_cnt: int,
@@ -1325,6 +1380,10 @@ class WorkflowService:
workflow_run=workflow_run,
workflow_run_id=workflow_run_id,
)
# Generate pending script after block completes successfully
await self._generate_pending_script_for_block(workflow, workflow_run, workflow_run_block_result)
return workflow_run, blocks_to_update, workflow_run_block_result, should_stop, branch_metadata
except Exception as e:
@@ -3357,6 +3416,7 @@ class WorkflowService:
block_labels: list[str] | None = None,
blocks_to_update: set[str] | None = None,
finalize: bool = False,
has_conditionals: bool | None = None,
) -> None:
"""
Generate or regenerate workflow script if needed.
@@ -3369,6 +3429,7 @@ class WorkflowService:
finalize: If True, check if any actions were skipped during script generation
due to missing data (race condition). Only regenerate if needed.
This fixes SKY-7653 while avoiding unnecessary regeneration costs.
has_conditionals: Whether the workflow has conditional blocks. If None, will be computed.
"""
code_gen = workflow_run.code_gen
blocks_to_update = set(blocks_to_update or [])
@@ -3435,12 +3496,28 @@ class WorkflowService:
should_cache_block_labels.add(settings.WORKFLOW_START_BLOCK_LABEL)
cached_block_labels.add(settings.WORKFLOW_START_BLOCK_LABEL)
# For workflows with conditional blocks, "missing" labels from unexecuted branches
# should NOT trigger regeneration. They will be cached when those branches execute.
# This prevents the bug where every run triggers unnecessary regeneration because
# blocks from unexecuted branches are always "missing".
if has_conditionals is None:
has_conditionals = workflow_script_service.workflow_has_conditionals(workflow)
if cached_block_labels != should_cache_block_labels:
missing_labels = should_cache_block_labels - cached_block_labels
if missing_labels:
if missing_labels and not has_conditionals:
# Only add missing labels for workflows WITHOUT conditionals.
# For workflows WITH conditionals, missing labels are expected (unexecuted branches).
blocks_to_update.update(missing_labels)
# Always rebuild the orchestrator if the definition changed
blocks_to_update.add(settings.WORKFLOW_START_BLOCK_LABEL)
# Always rebuild the orchestrator if the definition changed
blocks_to_update.add(settings.WORKFLOW_START_BLOCK_LABEL)
elif missing_labels and has_conditionals:
LOG.debug(
"Skipping regeneration for missing labels in workflow with conditionals",
workflow_id=workflow.workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
missing_labels=list(missing_labels),
)
should_regenerate = bool(blocks_to_update) or bool(code_gen)

View File

@@ -18,6 +18,27 @@ from skyvern.services import script_service
LOG = structlog.get_logger()
jinja_sandbox_env = SandboxedEnvironment()
def workflow_has_conditionals(workflow: Workflow) -> bool:
"""
Check if a workflow contains any conditional blocks.
This is used to determine whether "missing" blocks in the cache should trigger
regeneration. For workflows with conditionals, blocks in unexecuted branches
are legitimately missing and should NOT trigger regeneration.
"""
try:
all_blocks = get_all_blocks(workflow.workflow_definition.blocks)
return any(block.block_type == BlockType.CONDITIONAL for block in all_blocks)
except Exception:
LOG.warning(
"Failed to check workflow for conditional blocks",
workflow_id=workflow.workflow_id,
exc_info=True,
)
return False
# Cache for workflow scripts - only stores non-None results
_workflow_script_cache: TTLCache[tuple, "Script"] = TTLCache(maxsize=128, ttl=60 * 60)
@@ -232,27 +253,10 @@ async def generate_workflow_script(
cached_script: Script | None = None,
updated_block_labels: set[str] | None = None,
) -> None:
# Disable script generation for workflows containing conditional blocks to avoid caching divergent paths.
try:
all_blocks = get_all_blocks(workflow.workflow_definition.blocks)
has_conditional = any(block.block_type == BlockType.CONDITIONAL for block in all_blocks)
except Exception:
has_conditional = False
LOG.warning(
"Failed to inspect workflow blocks for conditional types; continuing with script generation",
workflow_id=workflow.workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
exc_info=True,
)
if has_conditional:
LOG.info(
"Skipping script generation for workflow containing conditional blocks",
workflow_id=workflow.workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
)
return
# Note: Workflows with conditional blocks ARE supported. The conditional block itself
# is not cached (it's evaluated at runtime), but cacheable blocks in branches are
# cached progressively as they execute. See workflow_has_conditionals() for the
# regeneration logic that prevents unnecessary regeneration for unexecuted branches.
try:
LOG.info(
"Generating script for workflow",