Add support for conditional blocks when running with script caching (#4603)
This commit is contained in:
@@ -2028,9 +2028,30 @@ def _build_block_statement(
|
||||
stmt = _build_http_request_statement(block)
|
||||
elif block_type == "pdf_parser":
|
||||
stmt = _build_pdf_parser_statement(block)
|
||||
elif block_type == "conditional":
|
||||
# Conditional blocks are evaluated at runtime by the workflow engine.
|
||||
# Generate a descriptive comment showing this is a runtime branch point.
|
||||
# The blocks inside conditional branches are processed separately when executed.
|
||||
branches = block.get("branches") or block.get("ordered_branches") or []
|
||||
branch_info_lines = []
|
||||
for i, branch in enumerate(branches):
|
||||
next_label = branch.get("next_block_label", "?")
|
||||
condition = branch.get("condition", "")
|
||||
# Truncate long conditions for readability
|
||||
if len(condition) > 50:
|
||||
condition = condition[:47] + "..."
|
||||
branch_info_lines.append(f"# Branch {i + 1}: {condition!r} → {next_label}")
|
||||
|
||||
if branch_info_lines:
|
||||
branch_info = "\n".join(branch_info_lines)
|
||||
comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine. One branch executes:\n{branch_info}"
|
||||
else:
|
||||
comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine."
|
||||
|
||||
stmt = cst.SimpleStatementLine([cst.Expr(cst.parse_expression(f"({repr(comment_text)})"))])
|
||||
else:
|
||||
# Default case for unknown block types
|
||||
stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"# Unknown block type: {block_type}"))])
|
||||
# Default case for unknown block types - use quoted string literal to avoid libcst validation error
|
||||
stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"'# Unknown block type: {block_type}'"))])
|
||||
|
||||
return stmt
|
||||
|
||||
@@ -2111,15 +2132,23 @@ async def generate_workflow_script_python_code(
|
||||
pending: bool = False,
|
||||
cached_blocks: dict[str, ScriptBlockSource] | None = None,
|
||||
updated_block_labels: set[str] | None = None,
|
||||
workflow_definition_labels: set[str] | None = None,
|
||||
downstream_of_updated: set[str] | None = None,
|
||||
) -> str:
|
||||
"""
|
||||
Build a LibCST Module and emit .code (PEP-8-formatted source).
|
||||
|
||||
Cached script blocks can be reused by providing them via `cached_blocks`. Any labels present in
|
||||
`updated_block_labels` will be regenerated from the latest workflow run execution data.
|
||||
|
||||
Args:
|
||||
downstream_of_updated: Set of block labels that are downstream of any updated block
|
||||
in the DAG. These blocks should not be preserved from cache because they may
|
||||
depend on data from the updated upstream blocks.
|
||||
"""
|
||||
cached_blocks = cached_blocks or {}
|
||||
updated_block_labels = set(updated_block_labels or [])
|
||||
downstream_of_updated = downstream_of_updated or set()
|
||||
|
||||
# Drop cached entries that do not have usable source
|
||||
cached_blocks = {label: source for label, source in cached_blocks.items() if source.code}
|
||||
@@ -2305,6 +2334,86 @@ async def generate_workflow_script_python_code(
|
||||
|
||||
append_block_code(block_code)
|
||||
|
||||
# --- preserve cached blocks not executed in current run (progressive caching) ---
|
||||
# Track which blocks were processed from the current run
|
||||
processed_labels = {
|
||||
task.get("label") or task.get("title") or task.get("task_id") or f"task_{idx}"
|
||||
for idx, task in enumerate(task_v1_blocks)
|
||||
if not task.get("parent_task_v2_label")
|
||||
}
|
||||
processed_labels.update(
|
||||
task_v2.get("label") or f"task_v2_{task_v2.get('workflow_run_block_id')}" for task_v2 in task_v2_blocks
|
||||
)
|
||||
|
||||
# Add any cached blocks that weren't in the current run (from other conditional branches)
|
||||
preserved_block_count = 0
|
||||
failed_block_count = 0
|
||||
for cached_label, cached_source in cached_blocks.items():
|
||||
if cached_label in processed_labels:
|
||||
continue # Already processed from current run
|
||||
if cached_label == settings.WORKFLOW_START_BLOCK_LABEL:
|
||||
continue # Skip the start block, it's always regenerated
|
||||
# Only preserve blocks that still exist in the current workflow definition.
|
||||
# This prevents preserving stale blocks after a user deletes or renames blocks.
|
||||
if workflow_definition_labels is not None and cached_label not in workflow_definition_labels:
|
||||
LOG.debug(
|
||||
"Skipping cached block that no longer exists in workflow definition",
|
||||
block_label=cached_label,
|
||||
)
|
||||
continue
|
||||
# Don't preserve blocks that are downstream of any updated block.
|
||||
# These may depend on data from the updated upstream block, so their cached
|
||||
# code could be stale. For example, if Block A extracts data used by Block B,
|
||||
# and A is modified, then B's cached code may produce incorrect results.
|
||||
if cached_label in downstream_of_updated:
|
||||
LOG.debug(
|
||||
"Skipping cached block that is downstream of an updated block",
|
||||
block_label=cached_label,
|
||||
)
|
||||
continue
|
||||
# Note: cached_blocks is already filtered to remove entries without code (line ~2129),
|
||||
# but we keep this check as defensive programming in case the upstream filter changes
|
||||
if not cached_source.code:
|
||||
continue
|
||||
|
||||
LOG.info(
|
||||
"Preserving cached block from previous run (progressive caching)",
|
||||
block_label=cached_label,
|
||||
workflow_run_id=cached_source.workflow_run_id,
|
||||
)
|
||||
|
||||
# Persist the cached block to the new script revision
|
||||
if script_id and script_revision_id and organization_id:
|
||||
try:
|
||||
await create_or_update_script_block(
|
||||
block_code=cached_source.code,
|
||||
script_revision_id=script_revision_id,
|
||||
script_id=script_id,
|
||||
organization_id=organization_id,
|
||||
block_label=cached_label,
|
||||
update=pending,
|
||||
run_signature=cached_source.run_signature,
|
||||
workflow_run_id=cached_source.workflow_run_id,
|
||||
workflow_run_block_id=cached_source.workflow_run_block_id,
|
||||
input_fields=cached_source.input_fields,
|
||||
)
|
||||
preserved_block_count += 1
|
||||
except Exception as e:
|
||||
failed_block_count += 1
|
||||
LOG.error("Failed to preserve cached block", block_label=cached_label, error=str(e), exc_info=True)
|
||||
# Continue on individual block failures - these are cached blocks from previous runs
|
||||
# that aren't critical to the current run's success. The script will still work,
|
||||
# just without the previously cached code for this particular block.
|
||||
|
||||
append_block_code(cached_source.code)
|
||||
|
||||
if preserved_block_count > 0 or failed_block_count > 0:
|
||||
LOG.info(
|
||||
"Progressive caching summary",
|
||||
preserved_blocks=preserved_block_count,
|
||||
failed_blocks=failed_block_count,
|
||||
)
|
||||
|
||||
# --- runner ---------------------------------------------------------
|
||||
run_fn = _build_run_fn(blocks, workflow_run_request)
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import hashlib
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
@@ -2026,7 +2027,17 @@ async def run_script(
|
||||
context.workflow_run_id = workflow_run_id
|
||||
context.organization_id = organization_id
|
||||
|
||||
# run the script as subprocess; pass the parameters and run_id to the script
|
||||
# Check if script contains conditional blocks (not supported in CLI execution)
|
||||
# The marker "# === CONDITIONAL:" is generated by _build_block_statement() for conditional blocks.
|
||||
with open(path, encoding="utf-8") as f:
|
||||
script_content = f.read()
|
||||
if re.search(r"# === CONDITIONAL:", script_content):
|
||||
raise Exception(
|
||||
"This script contains conditional blocks which are not supported in CLI execution. "
|
||||
"Conditional blocks require the workflow engine's DAG traversal to properly evaluate "
|
||||
"branch conditions at runtime. Please run this workflow through the API instead."
|
||||
)
|
||||
|
||||
# Dynamically import the script at the given path
|
||||
spec = importlib.util.spec_from_file_location("user_script", path)
|
||||
if not spec or not spec.loader:
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import base64
|
||||
from collections import deque
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
from cachetools import TTLCache
|
||||
@@ -12,9 +14,11 @@ from skyvern.forge.sdk.core import skyvern_context
|
||||
from skyvern.forge.sdk.workflow.models.block import get_all_blocks
|
||||
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun
|
||||
from skyvern.schemas.scripts import FileEncoding, Script, ScriptFileCreate, ScriptStatus
|
||||
from skyvern.schemas.workflows import BlockType
|
||||
from skyvern.services import script_service
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from skyvern.forge.sdk.workflow.models.block import BlockTypeVar
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
jinja_sandbox_env = SandboxedEnvironment()
|
||||
|
||||
@@ -22,6 +26,55 @@ jinja_sandbox_env = SandboxedEnvironment()
|
||||
_workflow_script_cache: TTLCache[tuple, "Script"] = TTLCache(maxsize=128, ttl=60 * 60)
|
||||
|
||||
|
||||
def get_downstream_blocks(
|
||||
updated_labels: set[str],
|
||||
blocks: list["BlockTypeVar"],
|
||||
) -> set[str]:
|
||||
"""
|
||||
Get all blocks that are downstream of any updated block in the DAG.
|
||||
|
||||
Uses BFS to find all blocks reachable from the updated blocks by following
|
||||
next_block_label edges. This is used to invalidate cached blocks that may
|
||||
depend on data from updated upstream blocks.
|
||||
|
||||
Args:
|
||||
updated_labels: Set of block labels that have been updated
|
||||
blocks: List of all blocks in the workflow definition
|
||||
|
||||
Returns:
|
||||
Set of block labels that are downstream of any updated block
|
||||
"""
|
||||
# Build adjacency graph: block_label -> set of next block labels
|
||||
adjacency: dict[str, set[str]] = {}
|
||||
for block in blocks:
|
||||
if not block.label:
|
||||
continue
|
||||
adjacency[block.label] = set()
|
||||
# Check for conditional blocks (using duck typing for testability)
|
||||
# ConditionalBlock has ordered_branches with multiple next_block_label targets
|
||||
if hasattr(block, "ordered_branches"):
|
||||
for branch in block.ordered_branches:
|
||||
if branch.next_block_label:
|
||||
adjacency[block.label].add(branch.next_block_label)
|
||||
elif block.next_block_label:
|
||||
adjacency[block.label].add(block.next_block_label)
|
||||
|
||||
# BFS from all updated blocks to find downstream blocks
|
||||
downstream: set[str] = set()
|
||||
queue: deque[str] = deque(updated_labels)
|
||||
visited: set[str] = set(updated_labels)
|
||||
|
||||
while queue:
|
||||
current = queue.popleft()
|
||||
for next_label in adjacency.get(current, set()):
|
||||
if next_label not in visited:
|
||||
visited.add(next_label)
|
||||
downstream.add(next_label)
|
||||
queue.append(next_label)
|
||||
|
||||
return downstream
|
||||
|
||||
|
||||
def _make_workflow_script_cache_key(
|
||||
organization_id: str,
|
||||
workflow_permanent_id: str,
|
||||
@@ -232,27 +285,6 @@ async def generate_workflow_script(
|
||||
cached_script: Script | None = None,
|
||||
updated_block_labels: set[str] | None = None,
|
||||
) -> None:
|
||||
# Disable script generation for workflows containing conditional blocks to avoid caching divergent paths.
|
||||
try:
|
||||
all_blocks = get_all_blocks(workflow.workflow_definition.blocks)
|
||||
has_conditional = any(block.block_type == BlockType.CONDITIONAL for block in all_blocks)
|
||||
except Exception:
|
||||
has_conditional = False
|
||||
LOG.warning(
|
||||
"Failed to inspect workflow blocks for conditional types; continuing with script generation",
|
||||
workflow_id=workflow.workflow_id,
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
if has_conditional:
|
||||
LOG.info(
|
||||
"Skipping script generation for workflow containing conditional blocks",
|
||||
workflow_id=workflow.workflow_id,
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
LOG.info(
|
||||
"Generating script for workflow",
|
||||
@@ -281,6 +313,18 @@ async def generate_workflow_script(
|
||||
updated_block_labels.update(missing_labels)
|
||||
updated_block_labels.add(settings.WORKFLOW_START_BLOCK_LABEL)
|
||||
|
||||
# Build set of all block labels from the current workflow definition.
|
||||
# This is used to filter out stale cached blocks that no longer exist
|
||||
# (e.g., after a user deletes or renames blocks in the workflow).
|
||||
all_definition_blocks = get_all_blocks(workflow.workflow_definition.blocks)
|
||||
workflow_definition_labels = {block.label for block in all_definition_blocks if block.label}
|
||||
|
||||
# Compute blocks downstream of any updated block. These should not be preserved
|
||||
# from cache because they may depend on data from the updated upstream blocks.
|
||||
# For example, if Block A extracts data used by Block B, and A is modified,
|
||||
# then B's cached code may be stale even if B itself wasn't modified.
|
||||
downstream_of_updated = get_downstream_blocks(updated_block_labels, all_definition_blocks)
|
||||
|
||||
python_src = await generate_workflow_script_python_code(
|
||||
file_name=codegen_input.file_name,
|
||||
workflow_run_request=codegen_input.workflow_run,
|
||||
@@ -294,6 +338,8 @@ async def generate_workflow_script(
|
||||
pending=pending,
|
||||
cached_blocks=cached_block_sources,
|
||||
updated_block_labels=updated_block_labels,
|
||||
workflow_definition_labels=workflow_definition_labels,
|
||||
downstream_of_updated=downstream_of_updated,
|
||||
)
|
||||
except Exception:
|
||||
LOG.error("Failed to generate workflow script source", exc_info=True)
|
||||
|
||||
Reference in New Issue
Block a user