This commit is contained in:
@@ -2028,30 +2028,9 @@ def _build_block_statement(
|
|||||||
stmt = _build_http_request_statement(block)
|
stmt = _build_http_request_statement(block)
|
||||||
elif block_type == "pdf_parser":
|
elif block_type == "pdf_parser":
|
||||||
stmt = _build_pdf_parser_statement(block)
|
stmt = _build_pdf_parser_statement(block)
|
||||||
elif block_type == "conditional":
|
|
||||||
# Conditional blocks are evaluated at runtime by the workflow engine.
|
|
||||||
# Generate a descriptive comment showing this is a runtime branch point.
|
|
||||||
# The blocks inside conditional branches are processed separately when executed.
|
|
||||||
branches = block.get("branches") or block.get("ordered_branches") or []
|
|
||||||
branch_info_lines = []
|
|
||||||
for i, branch in enumerate(branches):
|
|
||||||
next_label = branch.get("next_block_label", "?")
|
|
||||||
condition = branch.get("condition", "")
|
|
||||||
# Truncate long conditions for readability
|
|
||||||
if len(condition) > 50:
|
|
||||||
condition = condition[:47] + "..."
|
|
||||||
branch_info_lines.append(f"# Branch {i + 1}: {condition!r} → {next_label}")
|
|
||||||
|
|
||||||
if branch_info_lines:
|
|
||||||
branch_info = "\n".join(branch_info_lines)
|
|
||||||
comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine. One branch executes:\n{branch_info}"
|
|
||||||
else:
|
|
||||||
comment_text = f"# === CONDITIONAL: {block_title} ===\n# Evaluated at runtime by workflow engine."
|
|
||||||
|
|
||||||
stmt = cst.SimpleStatementLine([cst.Expr(cst.parse_expression(f"({repr(comment_text)})"))])
|
|
||||||
else:
|
else:
|
||||||
# Default case for unknown block types - use quoted string literal to avoid libcst validation error
|
# Default case for unknown block types
|
||||||
stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"'# Unknown block type: {block_type}'"))])
|
stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"# Unknown block type: {block_type}"))])
|
||||||
|
|
||||||
return stmt
|
return stmt
|
||||||
|
|
||||||
@@ -2132,23 +2111,15 @@ async def generate_workflow_script_python_code(
|
|||||||
pending: bool = False,
|
pending: bool = False,
|
||||||
cached_blocks: dict[str, ScriptBlockSource] | None = None,
|
cached_blocks: dict[str, ScriptBlockSource] | None = None,
|
||||||
updated_block_labels: set[str] | None = None,
|
updated_block_labels: set[str] | None = None,
|
||||||
workflow_definition_labels: set[str] | None = None,
|
|
||||||
downstream_of_updated: set[str] | None = None,
|
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Build a LibCST Module and emit .code (PEP-8-formatted source).
|
Build a LibCST Module and emit .code (PEP-8-formatted source).
|
||||||
|
|
||||||
Cached script blocks can be reused by providing them via `cached_blocks`. Any labels present in
|
Cached script blocks can be reused by providing them via `cached_blocks`. Any labels present in
|
||||||
`updated_block_labels` will be regenerated from the latest workflow run execution data.
|
`updated_block_labels` will be regenerated from the latest workflow run execution data.
|
||||||
|
|
||||||
Args:
|
|
||||||
downstream_of_updated: Set of block labels that are downstream of any updated block
|
|
||||||
in the DAG. These blocks should not be preserved from cache because they may
|
|
||||||
depend on data from the updated upstream blocks.
|
|
||||||
"""
|
"""
|
||||||
cached_blocks = cached_blocks or {}
|
cached_blocks = cached_blocks or {}
|
||||||
updated_block_labels = set(updated_block_labels or [])
|
updated_block_labels = set(updated_block_labels or [])
|
||||||
downstream_of_updated = downstream_of_updated or set()
|
|
||||||
|
|
||||||
# Drop cached entries that do not have usable source
|
# Drop cached entries that do not have usable source
|
||||||
cached_blocks = {label: source for label, source in cached_blocks.items() if source.code}
|
cached_blocks = {label: source for label, source in cached_blocks.items() if source.code}
|
||||||
@@ -2334,86 +2305,6 @@ async def generate_workflow_script_python_code(
|
|||||||
|
|
||||||
append_block_code(block_code)
|
append_block_code(block_code)
|
||||||
|
|
||||||
# --- preserve cached blocks not executed in current run (progressive caching) ---
|
|
||||||
# Track which blocks were processed from the current run
|
|
||||||
processed_labels = {
|
|
||||||
task.get("label") or task.get("title") or task.get("task_id") or f"task_{idx}"
|
|
||||||
for idx, task in enumerate(task_v1_blocks)
|
|
||||||
if not task.get("parent_task_v2_label")
|
|
||||||
}
|
|
||||||
processed_labels.update(
|
|
||||||
task_v2.get("label") or f"task_v2_{task_v2.get('workflow_run_block_id')}" for task_v2 in task_v2_blocks
|
|
||||||
)
|
|
||||||
|
|
||||||
# Add any cached blocks that weren't in the current run (from other conditional branches)
|
|
||||||
preserved_block_count = 0
|
|
||||||
failed_block_count = 0
|
|
||||||
for cached_label, cached_source in cached_blocks.items():
|
|
||||||
if cached_label in processed_labels:
|
|
||||||
continue # Already processed from current run
|
|
||||||
if cached_label == settings.WORKFLOW_START_BLOCK_LABEL:
|
|
||||||
continue # Skip the start block, it's always regenerated
|
|
||||||
# Only preserve blocks that still exist in the current workflow definition.
|
|
||||||
# This prevents preserving stale blocks after a user deletes or renames blocks.
|
|
||||||
if workflow_definition_labels is not None and cached_label not in workflow_definition_labels:
|
|
||||||
LOG.debug(
|
|
||||||
"Skipping cached block that no longer exists in workflow definition",
|
|
||||||
block_label=cached_label,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
# Don't preserve blocks that are downstream of any updated block.
|
|
||||||
# These may depend on data from the updated upstream block, so their cached
|
|
||||||
# code could be stale. For example, if Block A extracts data used by Block B,
|
|
||||||
# and A is modified, then B's cached code may produce incorrect results.
|
|
||||||
if cached_label in downstream_of_updated:
|
|
||||||
LOG.debug(
|
|
||||||
"Skipping cached block that is downstream of an updated block",
|
|
||||||
block_label=cached_label,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
# Note: cached_blocks is already filtered to remove entries without code (line ~2129),
|
|
||||||
# but we keep this check as defensive programming in case the upstream filter changes
|
|
||||||
if not cached_source.code:
|
|
||||||
continue
|
|
||||||
|
|
||||||
LOG.info(
|
|
||||||
"Preserving cached block from previous run (progressive caching)",
|
|
||||||
block_label=cached_label,
|
|
||||||
workflow_run_id=cached_source.workflow_run_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Persist the cached block to the new script revision
|
|
||||||
if script_id and script_revision_id and organization_id:
|
|
||||||
try:
|
|
||||||
await create_or_update_script_block(
|
|
||||||
block_code=cached_source.code,
|
|
||||||
script_revision_id=script_revision_id,
|
|
||||||
script_id=script_id,
|
|
||||||
organization_id=organization_id,
|
|
||||||
block_label=cached_label,
|
|
||||||
update=pending,
|
|
||||||
run_signature=cached_source.run_signature,
|
|
||||||
workflow_run_id=cached_source.workflow_run_id,
|
|
||||||
workflow_run_block_id=cached_source.workflow_run_block_id,
|
|
||||||
input_fields=cached_source.input_fields,
|
|
||||||
)
|
|
||||||
preserved_block_count += 1
|
|
||||||
except Exception as e:
|
|
||||||
failed_block_count += 1
|
|
||||||
LOG.error("Failed to preserve cached block", block_label=cached_label, error=str(e), exc_info=True)
|
|
||||||
# Continue on individual block failures - these are cached blocks from previous runs
|
|
||||||
# that aren't critical to the current run's success. The script will still work,
|
|
||||||
# just without the previously cached code for this particular block.
|
|
||||||
|
|
||||||
append_block_code(cached_source.code)
|
|
||||||
|
|
||||||
if preserved_block_count > 0 or failed_block_count > 0:
|
|
||||||
LOG.info(
|
|
||||||
"Progressive caching summary",
|
|
||||||
preserved_blocks=preserved_block_count,
|
|
||||||
failed_blocks=failed_block_count,
|
|
||||||
)
|
|
||||||
|
|
||||||
# --- runner ---------------------------------------------------------
|
# --- runner ---------------------------------------------------------
|
||||||
run_fn = _build_run_fn(blocks, workflow_run_request)
|
run_fn = _build_run_fn(blocks, workflow_run_request)
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import hashlib
|
|||||||
import importlib.util
|
import importlib.util
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import re
|
|
||||||
import uuid
|
import uuid
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
@@ -2027,17 +2026,7 @@ async def run_script(
|
|||||||
context.workflow_run_id = workflow_run_id
|
context.workflow_run_id = workflow_run_id
|
||||||
context.organization_id = organization_id
|
context.organization_id = organization_id
|
||||||
|
|
||||||
# Check if script contains conditional blocks (not supported in CLI execution)
|
# run the script as subprocess; pass the parameters and run_id to the script
|
||||||
# The marker "# === CONDITIONAL:" is generated by _build_block_statement() for conditional blocks.
|
|
||||||
with open(path, encoding="utf-8") as f:
|
|
||||||
script_content = f.read()
|
|
||||||
if re.search(r"# === CONDITIONAL:", script_content):
|
|
||||||
raise Exception(
|
|
||||||
"This script contains conditional blocks which are not supported in CLI execution. "
|
|
||||||
"Conditional blocks require the workflow engine's DAG traversal to properly evaluate "
|
|
||||||
"branch conditions at runtime. Please run this workflow through the API instead."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Dynamically import the script at the given path
|
# Dynamically import the script at the given path
|
||||||
spec = importlib.util.spec_from_file_location("user_script", path)
|
spec = importlib.util.spec_from_file_location("user_script", path)
|
||||||
if not spec or not spec.loader:
|
if not spec or not spec.loader:
|
||||||
|
|||||||
@@ -1,6 +1,4 @@
|
|||||||
import base64
|
import base64
|
||||||
from collections import deque
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
import structlog
|
import structlog
|
||||||
from cachetools import TTLCache
|
from cachetools import TTLCache
|
||||||
@@ -14,11 +12,9 @@ from skyvern.forge.sdk.core import skyvern_context
|
|||||||
from skyvern.forge.sdk.workflow.models.block import get_all_blocks
|
from skyvern.forge.sdk.workflow.models.block import get_all_blocks
|
||||||
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun
|
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun
|
||||||
from skyvern.schemas.scripts import FileEncoding, Script, ScriptFileCreate, ScriptStatus
|
from skyvern.schemas.scripts import FileEncoding, Script, ScriptFileCreate, ScriptStatus
|
||||||
|
from skyvern.schemas.workflows import BlockType
|
||||||
from skyvern.services import script_service
|
from skyvern.services import script_service
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from skyvern.forge.sdk.workflow.models.block import BlockTypeVar
|
|
||||||
|
|
||||||
LOG = structlog.get_logger()
|
LOG = structlog.get_logger()
|
||||||
jinja_sandbox_env = SandboxedEnvironment()
|
jinja_sandbox_env = SandboxedEnvironment()
|
||||||
|
|
||||||
@@ -26,55 +22,6 @@ jinja_sandbox_env = SandboxedEnvironment()
|
|||||||
_workflow_script_cache: TTLCache[tuple, "Script"] = TTLCache(maxsize=128, ttl=60 * 60)
|
_workflow_script_cache: TTLCache[tuple, "Script"] = TTLCache(maxsize=128, ttl=60 * 60)
|
||||||
|
|
||||||
|
|
||||||
def get_downstream_blocks(
|
|
||||||
updated_labels: set[str],
|
|
||||||
blocks: list["BlockTypeVar"],
|
|
||||||
) -> set[str]:
|
|
||||||
"""
|
|
||||||
Get all blocks that are downstream of any updated block in the DAG.
|
|
||||||
|
|
||||||
Uses BFS to find all blocks reachable from the updated blocks by following
|
|
||||||
next_block_label edges. This is used to invalidate cached blocks that may
|
|
||||||
depend on data from updated upstream blocks.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
updated_labels: Set of block labels that have been updated
|
|
||||||
blocks: List of all blocks in the workflow definition
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Set of block labels that are downstream of any updated block
|
|
||||||
"""
|
|
||||||
# Build adjacency graph: block_label -> set of next block labels
|
|
||||||
adjacency: dict[str, set[str]] = {}
|
|
||||||
for block in blocks:
|
|
||||||
if not block.label:
|
|
||||||
continue
|
|
||||||
adjacency[block.label] = set()
|
|
||||||
# Check for conditional blocks (using duck typing for testability)
|
|
||||||
# ConditionalBlock has ordered_branches with multiple next_block_label targets
|
|
||||||
if hasattr(block, "ordered_branches"):
|
|
||||||
for branch in block.ordered_branches:
|
|
||||||
if branch.next_block_label:
|
|
||||||
adjacency[block.label].add(branch.next_block_label)
|
|
||||||
elif block.next_block_label:
|
|
||||||
adjacency[block.label].add(block.next_block_label)
|
|
||||||
|
|
||||||
# BFS from all updated blocks to find downstream blocks
|
|
||||||
downstream: set[str] = set()
|
|
||||||
queue: deque[str] = deque(updated_labels)
|
|
||||||
visited: set[str] = set(updated_labels)
|
|
||||||
|
|
||||||
while queue:
|
|
||||||
current = queue.popleft()
|
|
||||||
for next_label in adjacency.get(current, set()):
|
|
||||||
if next_label not in visited:
|
|
||||||
visited.add(next_label)
|
|
||||||
downstream.add(next_label)
|
|
||||||
queue.append(next_label)
|
|
||||||
|
|
||||||
return downstream
|
|
||||||
|
|
||||||
|
|
||||||
def _make_workflow_script_cache_key(
|
def _make_workflow_script_cache_key(
|
||||||
organization_id: str,
|
organization_id: str,
|
||||||
workflow_permanent_id: str,
|
workflow_permanent_id: str,
|
||||||
@@ -285,6 +232,27 @@ async def generate_workflow_script(
|
|||||||
cached_script: Script | None = None,
|
cached_script: Script | None = None,
|
||||||
updated_block_labels: set[str] | None = None,
|
updated_block_labels: set[str] | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
# Disable script generation for workflows containing conditional blocks to avoid caching divergent paths.
|
||||||
|
try:
|
||||||
|
all_blocks = get_all_blocks(workflow.workflow_definition.blocks)
|
||||||
|
has_conditional = any(block.block_type == BlockType.CONDITIONAL for block in all_blocks)
|
||||||
|
except Exception:
|
||||||
|
has_conditional = False
|
||||||
|
LOG.warning(
|
||||||
|
"Failed to inspect workflow blocks for conditional types; continuing with script generation",
|
||||||
|
workflow_id=workflow.workflow_id,
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
exc_info=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if has_conditional:
|
||||||
|
LOG.info(
|
||||||
|
"Skipping script generation for workflow containing conditional blocks",
|
||||||
|
workflow_id=workflow.workflow_id,
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Generating script for workflow",
|
"Generating script for workflow",
|
||||||
@@ -313,18 +281,6 @@ async def generate_workflow_script(
|
|||||||
updated_block_labels.update(missing_labels)
|
updated_block_labels.update(missing_labels)
|
||||||
updated_block_labels.add(settings.WORKFLOW_START_BLOCK_LABEL)
|
updated_block_labels.add(settings.WORKFLOW_START_BLOCK_LABEL)
|
||||||
|
|
||||||
# Build set of all block labels from the current workflow definition.
|
|
||||||
# This is used to filter out stale cached blocks that no longer exist
|
|
||||||
# (e.g., after a user deletes or renames blocks in the workflow).
|
|
||||||
all_definition_blocks = get_all_blocks(workflow.workflow_definition.blocks)
|
|
||||||
workflow_definition_labels = {block.label for block in all_definition_blocks if block.label}
|
|
||||||
|
|
||||||
# Compute blocks downstream of any updated block. These should not be preserved
|
|
||||||
# from cache because they may depend on data from the updated upstream blocks.
|
|
||||||
# For example, if Block A extracts data used by Block B, and A is modified,
|
|
||||||
# then B's cached code may be stale even if B itself wasn't modified.
|
|
||||||
downstream_of_updated = get_downstream_blocks(updated_block_labels, all_definition_blocks)
|
|
||||||
|
|
||||||
python_src = await generate_workflow_script_python_code(
|
python_src = await generate_workflow_script_python_code(
|
||||||
file_name=codegen_input.file_name,
|
file_name=codegen_input.file_name,
|
||||||
workflow_run_request=codegen_input.workflow_run,
|
workflow_run_request=codegen_input.workflow_run,
|
||||||
@@ -338,8 +294,6 @@ async def generate_workflow_script(
|
|||||||
pending=pending,
|
pending=pending,
|
||||||
cached_blocks=cached_block_sources,
|
cached_blocks=cached_block_sources,
|
||||||
updated_block_labels=updated_block_labels,
|
updated_block_labels=updated_block_labels,
|
||||||
workflow_definition_labels=workflow_definition_labels,
|
|
||||||
downstream_of_updated=downstream_of_updated,
|
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.error("Failed to generate workflow script source", exc_info=True)
|
LOG.error("Failed to generate workflow script source", exc_info=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user