812 lines
33 KiB
Python
812 lines
33 KiB
Python
|
|
"""
|
||
|
|
Tests for conditional block script caching support.
|
||
|
|
|
||
|
|
This test file verifies that:
|
||
|
|
1. Workflows with conditional blocks can have scripts generated for cacheable blocks
|
||
|
|
2. The regeneration logic doesn't trigger unnecessary regeneration for unexecuted branches
|
||
|
|
3. Progressive caching works correctly across multiple runs
|
||
|
|
4. Cached blocks from unexecuted branches are preserved during script regeneration (SKY-7815)
|
||
|
|
|
||
|
|
Key bugs this tests against:
|
||
|
|
- Previously, the regeneration check compared cached blocks against ALL blocks in the workflow
|
||
|
|
definition, causing "missing" blocks from unexecuted branches to trigger regeneration
|
||
|
|
on EVERY run, flooding the database with redundant script operations.
|
||
|
|
- (SKY-7815) When regeneration was triggered for a legitimate reason, cached blocks from
|
||
|
|
unexecuted conditional branches were DROPPED because generate_workflow_script_python_code()
|
||
|
|
only iterated blocks from the transform output (executed blocks). This caused a regeneration
|
||
|
|
loop where blocks kept getting dropped and re-added.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from unittest.mock import AsyncMock, patch
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from skyvern.core.script_generations.generate_script import ScriptBlockSource, generate_workflow_script_python_code
|
||
|
|
from skyvern.forge.sdk.workflow.service import BLOCK_TYPES_THAT_SHOULD_BE_CACHED
|
||
|
|
from skyvern.schemas.workflows import BlockType
|
||
|
|
from skyvern.services.workflow_script_service import workflow_has_conditionals
|
||
|
|
|
||
|
|
|
||
|
|
class TestConditionalBlockDetection:
|
||
|
|
"""Tests for workflow_has_conditionals() function."""
|
||
|
|
|
||
|
|
def test_workflow_without_conditionals(self) -> None:
|
||
|
|
"""Workflows without conditional blocks should return False."""
|
||
|
|
|
||
|
|
class MockBlock:
|
||
|
|
def __init__(self, block_type: BlockType):
|
||
|
|
self.block_type = block_type
|
||
|
|
self.label = f"block_{block_type.value}"
|
||
|
|
|
||
|
|
class MockWorkflowDefinition:
|
||
|
|
def __init__(self, blocks: list):
|
||
|
|
self.blocks = blocks
|
||
|
|
|
||
|
|
class MockWorkflow:
|
||
|
|
def __init__(self, blocks: list):
|
||
|
|
self.workflow_definition = MockWorkflowDefinition(blocks)
|
||
|
|
self.workflow_id = "test_workflow"
|
||
|
|
|
||
|
|
# Workflow with only navigation and extraction blocks
|
||
|
|
blocks = [
|
||
|
|
MockBlock(BlockType.NAVIGATION),
|
||
|
|
MockBlock(BlockType.EXTRACTION),
|
||
|
|
]
|
||
|
|
workflow = MockWorkflow(blocks)
|
||
|
|
|
||
|
|
assert workflow_has_conditionals(workflow) is False
|
||
|
|
|
||
|
|
def test_workflow_with_conditionals(self) -> None:
|
||
|
|
"""Workflows with conditional blocks should return True."""
|
||
|
|
|
||
|
|
class MockBlock:
|
||
|
|
def __init__(self, block_type: BlockType):
|
||
|
|
self.block_type = block_type
|
||
|
|
self.label = f"block_{block_type.value}"
|
||
|
|
|
||
|
|
class MockWorkflowDefinition:
|
||
|
|
def __init__(self, blocks: list):
|
||
|
|
self.blocks = blocks
|
||
|
|
|
||
|
|
class MockWorkflow:
|
||
|
|
def __init__(self, blocks: list):
|
||
|
|
self.workflow_definition = MockWorkflowDefinition(blocks)
|
||
|
|
self.workflow_id = "test_workflow"
|
||
|
|
|
||
|
|
# Workflow with a conditional block
|
||
|
|
blocks = [
|
||
|
|
MockBlock(BlockType.NAVIGATION),
|
||
|
|
MockBlock(BlockType.CONDITIONAL),
|
||
|
|
MockBlock(BlockType.EXTRACTION),
|
||
|
|
]
|
||
|
|
workflow = MockWorkflow(blocks)
|
||
|
|
|
||
|
|
assert workflow_has_conditionals(workflow) is True
|
||
|
|
|
||
|
|
|
||
|
|
class TestConditionalBlockNotCached:
|
||
|
|
"""Tests verifying conditional blocks are not in BLOCK_TYPES_THAT_SHOULD_BE_CACHED."""
|
||
|
|
|
||
|
|
def test_conditional_not_in_cached_types(self) -> None:
|
||
|
|
"""Conditional blocks should NOT be in the set of cacheable block types."""
|
||
|
|
assert BlockType.CONDITIONAL not in BLOCK_TYPES_THAT_SHOULD_BE_CACHED
|
||
|
|
|
||
|
|
def test_cacheable_types_exist(self) -> None:
|
||
|
|
"""Verify that cacheable block types exist and include expected types."""
|
||
|
|
assert BlockType.NAVIGATION in BLOCK_TYPES_THAT_SHOULD_BE_CACHED
|
||
|
|
assert BlockType.EXTRACTION in BLOCK_TYPES_THAT_SHOULD_BE_CACHED
|
||
|
|
assert BlockType.TASK in BLOCK_TYPES_THAT_SHOULD_BE_CACHED
|
||
|
|
|
||
|
|
|
||
|
|
class TestRegenerationLogicForConditionals:
|
||
|
|
"""
|
||
|
|
Tests for the regeneration decision logic when conditionals are present.
|
||
|
|
|
||
|
|
The key fix: For workflows WITH conditionals, missing labels from unexecuted
|
||
|
|
branches should NOT trigger regeneration. This prevents the database flooding
|
||
|
|
bug where every run caused unnecessary script regeneration.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def test_missing_labels_computation(self) -> None:
|
||
|
|
"""
|
||
|
|
Test that the missing labels computation works correctly.
|
||
|
|
|
||
|
|
For a workflow with branches A and B:
|
||
|
|
- should_cache_block_labels = {A, B, START}
|
||
|
|
- cached_block_labels = {A, START} (only A executed)
|
||
|
|
- missing_labels = {B}
|
||
|
|
|
||
|
|
Without the fix: missing_labels triggers regeneration every time
|
||
|
|
With the fix: missing_labels is ignored for workflows with conditionals
|
||
|
|
"""
|
||
|
|
# Simulate the computation
|
||
|
|
should_cache_block_labels = {"branch_a_extract", "branch_b_extract", "WORKFLOW_START_BLOCK"}
|
||
|
|
cached_block_labels = {"branch_a_extract", "WORKFLOW_START_BLOCK"}
|
||
|
|
|
||
|
|
missing_labels = should_cache_block_labels - cached_block_labels
|
||
|
|
assert missing_labels == {"branch_b_extract"}
|
||
|
|
|
||
|
|
# With conditionals, this should NOT trigger regeneration
|
||
|
|
has_conditionals = True
|
||
|
|
blocks_to_update: set[str] = set()
|
||
|
|
|
||
|
|
if missing_labels and not has_conditionals:
|
||
|
|
blocks_to_update.update(missing_labels)
|
||
|
|
|
||
|
|
# blocks_to_update should be empty because we have conditionals
|
||
|
|
assert len(blocks_to_update) == 0
|
||
|
|
|
||
|
|
def test_regeneration_triggered_without_conditionals(self) -> None:
|
||
|
|
"""
|
||
|
|
Without conditionals, missing labels SHOULD trigger regeneration.
|
||
|
|
|
||
|
|
This is the expected behavior for regular workflows where all blocks
|
||
|
|
should eventually be cached.
|
||
|
|
"""
|
||
|
|
should_cache_block_labels = {"block_1", "block_2", "WORKFLOW_START_BLOCK"}
|
||
|
|
cached_block_labels = {"block_1", "WORKFLOW_START_BLOCK"}
|
||
|
|
|
||
|
|
missing_labels = should_cache_block_labels - cached_block_labels
|
||
|
|
assert missing_labels == {"block_2"}
|
||
|
|
|
||
|
|
# Without conditionals, this SHOULD trigger regeneration
|
||
|
|
has_conditionals = False
|
||
|
|
blocks_to_update: set[str] = set()
|
||
|
|
|
||
|
|
if missing_labels and not has_conditionals:
|
||
|
|
blocks_to_update.update(missing_labels)
|
||
|
|
|
||
|
|
# blocks_to_update should contain missing labels
|
||
|
|
assert "block_2" in blocks_to_update
|
||
|
|
|
||
|
|
def test_explicit_updates_still_work_with_conditionals(self) -> None:
|
||
|
|
"""
|
||
|
|
Even with conditionals, explicit blocks_to_update from the caller
|
||
|
|
should still trigger regeneration.
|
||
|
|
|
||
|
|
This ensures that actual changes to executed blocks are still processed.
|
||
|
|
"""
|
||
|
|
blocks_to_update: set[str] = {"explicitly_updated_block"} # From caller
|
||
|
|
|
||
|
|
# Even with conditionals, explicit updates should trigger regeneration
|
||
|
|
should_regenerate = bool(blocks_to_update)
|
||
|
|
assert should_regenerate is True
|
||
|
|
|
||
|
|
|
||
|
|
class TestProgressiveCachingConcept:
|
||
|
|
"""
|
||
|
|
Tests documenting the progressive caching concept for conditional workflows.
|
||
|
|
|
||
|
|
Progressive caching means:
|
||
|
|
1. Run 1 takes branch A → caches blocks from A
|
||
|
|
2. Run 2 takes branch B → caches blocks from B (preserves A's cache)
|
||
|
|
3. Eventually all branches have cached blocks
|
||
|
|
|
||
|
|
The key insight is that we DON'T regenerate just because some branches
|
||
|
|
haven't executed yet.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def test_progressive_caching_scenario(self) -> None:
|
||
|
|
"""
|
||
|
|
Simulate multiple runs with different branches.
|
||
|
|
|
||
|
|
Run 1: Branch A executes
|
||
|
|
Run 2: Branch A executes (should NOT regenerate - same blocks)
|
||
|
|
Run 3: Branch B executes (should cache B, preserve A)
|
||
|
|
"""
|
||
|
|
# Initial state
|
||
|
|
cached_blocks: set[str] = set()
|
||
|
|
|
||
|
|
# Run 1: Branch A executes
|
||
|
|
executed_blocks_run1 = {"nav_block", "branch_a_extract"}
|
||
|
|
cached_blocks.update(executed_blocks_run1)
|
||
|
|
assert cached_blocks == {"nav_block", "branch_a_extract"}
|
||
|
|
|
||
|
|
# Run 2: Branch A executes again
|
||
|
|
executed_blocks_run2 = {"nav_block", "branch_a_extract"}
|
||
|
|
# No new blocks to cache - should NOT trigger regeneration
|
||
|
|
new_blocks_run2 = executed_blocks_run2 - cached_blocks
|
||
|
|
assert len(new_blocks_run2) == 0 # Nothing new to cache
|
||
|
|
|
||
|
|
# Run 3: Branch B executes
|
||
|
|
executed_blocks_run3 = {"nav_block", "branch_b_extract"}
|
||
|
|
new_blocks_run3 = executed_blocks_run3 - cached_blocks
|
||
|
|
assert new_blocks_run3 == {"branch_b_extract"} # New block to cache
|
||
|
|
|
||
|
|
# Cache should now have both branches
|
||
|
|
cached_blocks.update(executed_blocks_run3)
|
||
|
|
assert cached_blocks == {"nav_block", "branch_a_extract", "branch_b_extract"}
|
||
|
|
|
||
|
|
|
||
|
|
class TestConditionalBlockCodeGeneration:
|
||
|
|
"""Tests for conditional block handling in code generation."""
|
||
|
|
|
||
|
|
def test_conditional_block_type_string(self) -> None:
|
||
|
|
"""Verify the conditional block type string matches expected value."""
|
||
|
|
assert BlockType.CONDITIONAL.value == "conditional"
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# SKY-7815: Tests for cached block preservation during regeneration
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
class TestCachedBlockPreservationDuringRegeneration:
|
||
|
|
"""
|
||
|
|
Tests verifying that cached blocks from unexecuted conditional branches
|
||
|
|
are preserved when generate_workflow_script_python_code() regenerates a script.
|
||
|
|
|
||
|
|
Bug (SKY-7815):
|
||
|
|
When a workflow has conditional branches A and B:
|
||
|
|
- Run 1 executes branch A → script has blocks from A
|
||
|
|
- Run 2 executes branch B → regeneration triggered → transform only returns B's blocks
|
||
|
|
- generate_workflow_script_python_code() only iterates transform output (B's blocks)
|
||
|
|
- Cached blocks from A are loaded into cached_blocks dict but NEVER iterated
|
||
|
|
- Result: A's blocks are DROPPED from the new script → regeneration loop
|
||
|
|
|
||
|
|
Fix: After processing all blocks from the transform output, iterate remaining
|
||
|
|
cached_blocks entries and preserve them in both the DB and script output.
|
||
|
|
"""
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_cached_blocks_from_unexecuted_branch_are_preserved(self) -> None:
|
||
|
|
"""
|
||
|
|
Core test: when only branch B's blocks are in the transform output,
|
||
|
|
branch A's cached blocks should still appear in the generated script.
|
||
|
|
"""
|
||
|
|
# Branch A's cached block (from a previous run)
|
||
|
|
branch_a_code = (
|
||
|
|
"async def branch_a_extract(page: SkyvernPage, context: RunContext) -> None:\n"
|
||
|
|
" await skyvern.extract(page, \"//div[@id='result']\")\n"
|
||
|
|
)
|
||
|
|
cached_blocks = {
|
||
|
|
"branch_a_extract": ScriptBlockSource(
|
||
|
|
label="branch_a_extract",
|
||
|
|
code=branch_a_code,
|
||
|
|
run_signature="await branch_a_extract(page, context)",
|
||
|
|
workflow_run_id="wr_run1",
|
||
|
|
workflow_run_block_id="wfrb_a",
|
||
|
|
input_fields=None,
|
||
|
|
),
|
||
|
|
}
|
||
|
|
|
||
|
|
# Transform output only has branch B's block (branch B executed this run)
|
||
|
|
blocks = [
|
||
|
|
{
|
||
|
|
"block_type": "navigation",
|
||
|
|
"label": "branch_b_navigate",
|
||
|
|
"task_id": "task_b",
|
||
|
|
"navigation_goal": "Go to page B",
|
||
|
|
"url": "https://example.com/b",
|
||
|
|
"workflow_run_id": "wr_run2",
|
||
|
|
"workflow_run_block_id": "wfrb_b",
|
||
|
|
},
|
||
|
|
]
|
||
|
|
|
||
|
|
actions_by_task = {
|
||
|
|
"task_b": [
|
||
|
|
{
|
||
|
|
"action_type": "click",
|
||
|
|
"action_id": "action_b1",
|
||
|
|
"xpath": "//button[@id='submit']",
|
||
|
|
"element_id": "submit",
|
||
|
|
"reasoning": "Click submit",
|
||
|
|
"intention": "Submit the form",
|
||
|
|
"confidence_float": 0.95,
|
||
|
|
"has_mini_agent": False,
|
||
|
|
},
|
||
|
|
],
|
||
|
|
}
|
||
|
|
|
||
|
|
workflow = {
|
||
|
|
"workflow_id": "wf_test",
|
||
|
|
"workflow_permanent_id": "wpid_test",
|
||
|
|
"title": "Test Conditional Workflow",
|
||
|
|
"workflow_definition": {
|
||
|
|
"parameters": [
|
||
|
|
{"parameter_type": "workflow", "key": "url", "default_value": "https://example.com"},
|
||
|
|
],
|
||
|
|
},
|
||
|
|
}
|
||
|
|
|
||
|
|
workflow_run_request = {
|
||
|
|
"workflow_id": "wpid_test",
|
||
|
|
"parameters": {"url": "https://example.com"},
|
||
|
|
}
|
||
|
|
|
||
|
|
with (
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
return_value=("", {}),
|
||
|
|
),
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.create_or_update_script_block",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
) as mock_create_block,
|
||
|
|
):
|
||
|
|
result = await generate_workflow_script_python_code(
|
||
|
|
file_name="test.py",
|
||
|
|
workflow_run_request=workflow_run_request,
|
||
|
|
workflow=workflow,
|
||
|
|
blocks=blocks,
|
||
|
|
actions_by_task=actions_by_task,
|
||
|
|
cached_blocks=cached_blocks,
|
||
|
|
updated_block_labels={"branch_b_navigate", "__start_block__"},
|
||
|
|
script_id="script_123",
|
||
|
|
script_revision_id="rev_123",
|
||
|
|
organization_id="org_123",
|
||
|
|
)
|
||
|
|
|
||
|
|
# The output should contain branch A's cached code
|
||
|
|
assert "branch_a_extract" in result, (
|
||
|
|
"Cached block from unexecuted branch A should be preserved in the script output"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Verify create_or_update_script_block was called for the preserved block
|
||
|
|
preserved_calls = [
|
||
|
|
call
|
||
|
|
for call in mock_create_block.call_args_list
|
||
|
|
if call.kwargs.get("block_label") == "branch_a_extract"
|
||
|
|
]
|
||
|
|
assert len(preserved_calls) == 1, (
|
||
|
|
"create_or_update_script_block should be called for the preserved cached block"
|
||
|
|
)
|
||
|
|
preserved_call = preserved_calls[0]
|
||
|
|
assert preserved_call.kwargs["run_signature"] == "await branch_a_extract(page, context)"
|
||
|
|
assert preserved_call.kwargs["workflow_run_id"] == "wr_run1"
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_cached_blocks_without_run_signature_are_not_preserved(self) -> None:
|
||
|
|
"""Cached blocks without a run_signature should NOT be preserved."""
|
||
|
|
cached_blocks = {
|
||
|
|
"incomplete_block": ScriptBlockSource(
|
||
|
|
label="incomplete_block",
|
||
|
|
code="async def incomplete_block(): pass\n",
|
||
|
|
run_signature=None, # No run_signature
|
||
|
|
workflow_run_id="wr_old",
|
||
|
|
workflow_run_block_id="wfrb_old",
|
||
|
|
input_fields=None,
|
||
|
|
),
|
||
|
|
}
|
||
|
|
|
||
|
|
blocks: list = []
|
||
|
|
actions_by_task: dict = {}
|
||
|
|
workflow = {
|
||
|
|
"workflow_id": "wf_test",
|
||
|
|
"title": "Test",
|
||
|
|
"workflow_definition": {"parameters": []},
|
||
|
|
}
|
||
|
|
|
||
|
|
with (
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
return_value=("", {}),
|
||
|
|
),
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.create_or_update_script_block",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
) as mock_create_block,
|
||
|
|
):
|
||
|
|
result = await generate_workflow_script_python_code(
|
||
|
|
file_name="test.py",
|
||
|
|
workflow_run_request={"workflow_id": "wpid_test"},
|
||
|
|
workflow=workflow,
|
||
|
|
blocks=blocks,
|
||
|
|
actions_by_task=actions_by_task,
|
||
|
|
cached_blocks=cached_blocks,
|
||
|
|
updated_block_labels={"__start_block__"},
|
||
|
|
script_id="script_123",
|
||
|
|
script_revision_id="rev_123",
|
||
|
|
organization_id="org_123",
|
||
|
|
)
|
||
|
|
|
||
|
|
# Incomplete block should NOT appear in the output
|
||
|
|
assert "incomplete_block" not in result
|
||
|
|
|
||
|
|
# create_or_update_script_block should NOT be called for incomplete block
|
||
|
|
incomplete_calls = [
|
||
|
|
call
|
||
|
|
for call in mock_create_block.call_args_list
|
||
|
|
if call.kwargs.get("block_label") == "incomplete_block"
|
||
|
|
]
|
||
|
|
assert len(incomplete_calls) == 0
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_cached_blocks_without_code_are_not_preserved(self) -> None:
|
||
|
|
"""Cached blocks without code should NOT be preserved."""
|
||
|
|
cached_blocks = {
|
||
|
|
"empty_block": ScriptBlockSource(
|
||
|
|
label="empty_block",
|
||
|
|
code="", # Empty code
|
||
|
|
run_signature="await empty_block(page, context)",
|
||
|
|
workflow_run_id="wr_old",
|
||
|
|
workflow_run_block_id="wfrb_old",
|
||
|
|
input_fields=None,
|
||
|
|
),
|
||
|
|
}
|
||
|
|
|
||
|
|
with (
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
return_value=("", {}),
|
||
|
|
),
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.create_or_update_script_block",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
) as mock_create_block,
|
||
|
|
):
|
||
|
|
await generate_workflow_script_python_code(
|
||
|
|
file_name="test.py",
|
||
|
|
workflow_run_request={"workflow_id": "wpid_test"},
|
||
|
|
workflow={
|
||
|
|
"workflow_id": "wf_test",
|
||
|
|
"title": "Test",
|
||
|
|
"workflow_definition": {"parameters": []},
|
||
|
|
},
|
||
|
|
blocks=[],
|
||
|
|
actions_by_task={},
|
||
|
|
cached_blocks=cached_blocks,
|
||
|
|
updated_block_labels={"__start_block__"},
|
||
|
|
script_id="script_123",
|
||
|
|
script_revision_id="rev_123",
|
||
|
|
organization_id="org_123",
|
||
|
|
)
|
||
|
|
|
||
|
|
# Empty block should NOT appear
|
||
|
|
empty_calls = [
|
||
|
|
call for call in mock_create_block.call_args_list if call.kwargs.get("block_label") == "empty_block"
|
||
|
|
]
|
||
|
|
assert len(empty_calls) == 0
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_already_processed_blocks_are_not_duplicated(self) -> None:
|
||
|
|
"""
|
||
|
|
Blocks that appear in both the transform output AND cached_blocks
|
||
|
|
should NOT be duplicated. The transform output processing handles them.
|
||
|
|
"""
|
||
|
|
block_code = (
|
||
|
|
"async def shared_block(page: SkyvernPage, context: RunContext) -> None:\n"
|
||
|
|
' await skyvern.click(page, "//button")\n'
|
||
|
|
)
|
||
|
|
cached_blocks = {
|
||
|
|
"shared_block": ScriptBlockSource(
|
||
|
|
label="shared_block",
|
||
|
|
code=block_code,
|
||
|
|
run_signature="await shared_block(page, context)",
|
||
|
|
workflow_run_id="wr_run1",
|
||
|
|
workflow_run_block_id="wfrb_shared",
|
||
|
|
input_fields=None,
|
||
|
|
),
|
||
|
|
}
|
||
|
|
|
||
|
|
# Same block also appears in the transform output (it executed this run too)
|
||
|
|
blocks = [
|
||
|
|
{
|
||
|
|
"block_type": "navigation",
|
||
|
|
"label": "shared_block",
|
||
|
|
"task_id": "task_shared",
|
||
|
|
"navigation_goal": "Navigate somewhere",
|
||
|
|
"url": "https://example.com",
|
||
|
|
"workflow_run_id": "wr_run2",
|
||
|
|
"workflow_run_block_id": "wfrb_shared_run2",
|
||
|
|
},
|
||
|
|
]
|
||
|
|
|
||
|
|
actions_by_task = {
|
||
|
|
"task_shared": [
|
||
|
|
{
|
||
|
|
"action_type": "click",
|
||
|
|
"action_id": "action_1",
|
||
|
|
"xpath": "//button",
|
||
|
|
"element_id": "btn",
|
||
|
|
"reasoning": "Click",
|
||
|
|
"intention": "Click",
|
||
|
|
"confidence_float": 0.9,
|
||
|
|
"has_mini_agent": False,
|
||
|
|
},
|
||
|
|
],
|
||
|
|
}
|
||
|
|
|
||
|
|
with (
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
return_value=("", {}),
|
||
|
|
),
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.create_or_update_script_block",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
) as mock_create_block,
|
||
|
|
):
|
||
|
|
await generate_workflow_script_python_code(
|
||
|
|
file_name="test.py",
|
||
|
|
workflow_run_request={"workflow_id": "wpid_test"},
|
||
|
|
workflow={
|
||
|
|
"workflow_id": "wf_test",
|
||
|
|
"title": "Test",
|
||
|
|
"workflow_definition": {"parameters": []},
|
||
|
|
},
|
||
|
|
blocks=blocks,
|
||
|
|
actions_by_task=actions_by_task,
|
||
|
|
cached_blocks=cached_blocks,
|
||
|
|
updated_block_labels={"shared_block", "__start_block__"},
|
||
|
|
script_id="script_123",
|
||
|
|
script_revision_id="rev_123",
|
||
|
|
organization_id="org_123",
|
||
|
|
)
|
||
|
|
|
||
|
|
# The block should appear exactly once (from the transform output processing,
|
||
|
|
# NOT duplicated by the preservation loop)
|
||
|
|
shared_calls = [
|
||
|
|
call for call in mock_create_block.call_args_list if call.kwargs.get("block_label") == "shared_block"
|
||
|
|
]
|
||
|
|
# Should be called once from the normal task_v1 processing, NOT again from preservation
|
||
|
|
assert len(shared_calls) == 1
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_multiple_unexecuted_branches_all_preserved(self) -> None:
|
||
|
|
"""
|
||
|
|
When a workflow has 3 conditional branches and only 1 executes,
|
||
|
|
cached blocks from the other 2 branches should ALL be preserved.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def _make_cached_block(label: str) -> ScriptBlockSource:
|
||
|
|
return ScriptBlockSource(
|
||
|
|
label=label,
|
||
|
|
code=f"async def {label}(page: SkyvernPage, context: RunContext) -> None:\n pass\n",
|
||
|
|
run_signature=f"await {label}(page, context)",
|
||
|
|
workflow_run_id="wr_old",
|
||
|
|
workflow_run_block_id=f"wfrb_{label}",
|
||
|
|
input_fields=None,
|
||
|
|
)
|
||
|
|
|
||
|
|
cached_blocks = {
|
||
|
|
"branch_a_extract": _make_cached_block("branch_a_extract"),
|
||
|
|
"branch_b_navigate": _make_cached_block("branch_b_navigate"),
|
||
|
|
# branch_c executed this run, so it's also in blocks below
|
||
|
|
}
|
||
|
|
|
||
|
|
# Only branch C's block is in the transform output
|
||
|
|
blocks = [
|
||
|
|
{
|
||
|
|
"block_type": "extraction",
|
||
|
|
"label": "branch_c_extract",
|
||
|
|
"task_id": "task_c",
|
||
|
|
"data_extraction_goal": "Extract C data",
|
||
|
|
"workflow_run_id": "wr_run3",
|
||
|
|
"workflow_run_block_id": "wfrb_c",
|
||
|
|
},
|
||
|
|
]
|
||
|
|
|
||
|
|
actions_by_task = {
|
||
|
|
"task_c": [
|
||
|
|
{
|
||
|
|
"action_type": "extract",
|
||
|
|
"action_id": "action_c1",
|
||
|
|
"xpath": "//div[@class='data']",
|
||
|
|
"element_id": "data",
|
||
|
|
"reasoning": "Extract",
|
||
|
|
"intention": "Extract data",
|
||
|
|
"confidence_float": 0.9,
|
||
|
|
"has_mini_agent": False,
|
||
|
|
"data_extraction_goal": "Extract C data",
|
||
|
|
},
|
||
|
|
],
|
||
|
|
}
|
||
|
|
|
||
|
|
with (
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
return_value=("", {}),
|
||
|
|
),
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.create_or_update_script_block",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
) as mock_create_block,
|
||
|
|
):
|
||
|
|
result = await generate_workflow_script_python_code(
|
||
|
|
file_name="test.py",
|
||
|
|
workflow_run_request={"workflow_id": "wpid_test"},
|
||
|
|
workflow={
|
||
|
|
"workflow_id": "wf_test",
|
||
|
|
"title": "Test",
|
||
|
|
"workflow_definition": {"parameters": []},
|
||
|
|
},
|
||
|
|
blocks=blocks,
|
||
|
|
actions_by_task=actions_by_task,
|
||
|
|
cached_blocks=cached_blocks,
|
||
|
|
updated_block_labels={"branch_c_extract", "__start_block__"},
|
||
|
|
script_id="script_123",
|
||
|
|
script_revision_id="rev_123",
|
||
|
|
organization_id="org_123",
|
||
|
|
)
|
||
|
|
|
||
|
|
# Both branch A and B should be preserved
|
||
|
|
assert "branch_a_extract" in result, "Branch A cached block should be preserved"
|
||
|
|
assert "branch_b_navigate" in result, "Branch B cached block should be preserved"
|
||
|
|
assert "branch_c_extract" in result, "Branch C (executed) block should be present"
|
||
|
|
|
||
|
|
# Verify DB entries were created for all 3 blocks + __start_block__
|
||
|
|
all_labels = {call.kwargs.get("block_label") for call in mock_create_block.call_args_list}
|
||
|
|
assert "branch_a_extract" in all_labels
|
||
|
|
assert "branch_b_navigate" in all_labels
|
||
|
|
assert "branch_c_extract" in all_labels
|
||
|
|
assert "__start_block__" in all_labels
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_preservation_without_script_context(self) -> None:
|
||
|
|
"""
|
||
|
|
When script_id/script_revision_id/organization_id are not provided,
|
||
|
|
cached blocks should still be added to the script output (just no DB calls).
|
||
|
|
"""
|
||
|
|
branch_a_code = "async def branch_a(page: SkyvernPage, context: RunContext) -> None:\n pass\n"
|
||
|
|
cached_blocks = {
|
||
|
|
"branch_a": ScriptBlockSource(
|
||
|
|
label="branch_a",
|
||
|
|
code=branch_a_code,
|
||
|
|
run_signature="await branch_a(page, context)",
|
||
|
|
workflow_run_id="wr_old",
|
||
|
|
workflow_run_block_id="wfrb_a",
|
||
|
|
input_fields=None,
|
||
|
|
),
|
||
|
|
}
|
||
|
|
|
||
|
|
with (
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
return_value=("", {}),
|
||
|
|
),
|
||
|
|
patch(
|
||
|
|
"skyvern.core.script_generations.generate_script.create_or_update_script_block",
|
||
|
|
new_callable=AsyncMock,
|
||
|
|
) as mock_create_block,
|
||
|
|
):
|
||
|
|
result = await generate_workflow_script_python_code(
|
||
|
|
file_name="test.py",
|
||
|
|
workflow_run_request={"workflow_id": "wpid_test"},
|
||
|
|
workflow={
|
||
|
|
"workflow_id": "wf_test",
|
||
|
|
"title": "Test",
|
||
|
|
"workflow_definition": {"parameters": []},
|
||
|
|
},
|
||
|
|
blocks=[],
|
||
|
|
actions_by_task={},
|
||
|
|
cached_blocks=cached_blocks,
|
||
|
|
updated_block_labels={"__start_block__"},
|
||
|
|
# No script context
|
||
|
|
script_id=None,
|
||
|
|
script_revision_id=None,
|
||
|
|
organization_id=None,
|
||
|
|
)
|
||
|
|
|
||
|
|
# Code should still be in the output
|
||
|
|
assert "branch_a" in result
|
||
|
|
|
||
|
|
# But no DB calls should be made for preserved blocks
|
||
|
|
preserved_calls = [
|
||
|
|
call for call in mock_create_block.call_args_list if call.kwargs.get("block_label") == "branch_a"
|
||
|
|
]
|
||
|
|
assert len(preserved_calls) == 0
|
||
|
|
|
||
|
|
|
||
|
|
class TestRegenerationLoopPrevention:
|
||
|
|
"""
|
||
|
|
End-to-end tests for the regeneration loop prevention (SKY-7815).
|
||
|
|
|
||
|
|
The regeneration loop happens when:
|
||
|
|
1. Workflow has conditional branches A and B
|
||
|
|
2. Run 1 caches branch A → script has A's blocks
|
||
|
|
3. Run 2 executes branch B → triggers regeneration for B
|
||
|
|
4. During regeneration, transform only returns B's blocks
|
||
|
|
5. A's cached blocks are dropped from the new script
|
||
|
|
6. Run 3 executes branch A → A is "missing" → triggers regeneration
|
||
|
|
7. During regeneration, B's cached blocks are dropped → loop continues
|
||
|
|
|
||
|
|
The fix has two parts:
|
||
|
|
1. generate_script_if_needed: Don't add missing labels for conditional workflows
|
||
|
|
2. generate_workflow_script_python_code: Preserve cached blocks from unexecuted branches
|
||
|
|
"""
|
||
|
|
|
||
|
|
def test_regeneration_loop_scenario_is_prevented(self) -> None:
|
||
|
|
"""
|
||
|
|
Simulate the full regeneration loop scenario and verify it's prevented.
|
||
|
|
|
||
|
|
This test verifies both parts of the fix working together:
|
||
|
|
- Missing labels don't trigger regeneration for conditional workflows
|
||
|
|
- Even if regeneration IS triggered (for other reasons), cached blocks are preserved
|
||
|
|
"""
|
||
|
|
# --- Part 1: generate_script_if_needed logic ---
|
||
|
|
# Workflow definition has blocks: nav, branch_a_extract, branch_b_extract
|
||
|
|
should_cache_block_labels = {"nav_block", "branch_a_extract", "branch_b_extract", "__start_block__"}
|
||
|
|
|
||
|
|
# After Run 1: only nav and branch_a are cached
|
||
|
|
cached_block_labels = {"nav_block", "branch_a_extract", "__start_block__"}
|
||
|
|
|
||
|
|
missing_labels = should_cache_block_labels - cached_block_labels
|
||
|
|
assert missing_labels == {"branch_b_extract"}
|
||
|
|
|
||
|
|
has_conditionals = True
|
||
|
|
blocks_to_update: set[str] = set()
|
||
|
|
|
||
|
|
# With conditionals, missing labels should NOT be added
|
||
|
|
if missing_labels and not has_conditionals:
|
||
|
|
blocks_to_update.update(missing_labels)
|
||
|
|
elif missing_labels and has_conditionals:
|
||
|
|
pass # Skip - expected for conditional workflows
|
||
|
|
|
||
|
|
# No regeneration needed just because of missing labels
|
||
|
|
assert len(blocks_to_update) == 0
|
||
|
|
|
||
|
|
# --- Part 2: Even if regeneration IS triggered ---
|
||
|
|
# e.g., branch B executed this run and needs caching
|
||
|
|
blocks_to_update.add("branch_b_extract")
|
||
|
|
|
||
|
|
# The transform output only has branch B's block
|
||
|
|
transform_output_labels = {"nav_block", "branch_b_extract"}
|
||
|
|
|
||
|
|
# cached_blocks from old script has branch A's data
|
||
|
|
old_cached_block_labels = {"nav_block", "branch_a_extract"}
|
||
|
|
|
||
|
|
# After the fix, the preservation loop handles blocks NOT in transform output
|
||
|
|
processed_by_transform = transform_output_labels
|
||
|
|
preserved_from_cache = old_cached_block_labels - processed_by_transform
|
||
|
|
|
||
|
|
assert preserved_from_cache == {"branch_a_extract"}, (
|
||
|
|
"Branch A's block should be preserved even though it wasn't in the transform output"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Final result should have ALL blocks
|
||
|
|
final_blocks = transform_output_labels | preserved_from_cache
|
||
|
|
assert final_blocks == {"nav_block", "branch_a_extract", "branch_b_extract"}
|
||
|
|
|
||
|
|
def test_no_regeneration_loop_across_three_runs(self) -> None:
|
||
|
|
"""
|
||
|
|
Simulate 3 runs and verify no regeneration loop occurs.
|
||
|
|
|
||
|
|
Run 1: Branch A → cache A
|
||
|
|
Run 2: Branch B → regenerate (B is new) → A is preserved
|
||
|
|
Run 3: Branch A → no regeneration needed (A is still cached)
|
||
|
|
"""
|
||
|
|
# --- Run 1: Branch A executes ---
|
||
|
|
cached_blocks_after_run1 = {"nav_block", "branch_a_extract", "__start_block__"}
|
||
|
|
|
||
|
|
# --- Run 2: Branch B executes ---
|
||
|
|
has_conditionals = True
|
||
|
|
should_cache = {"nav_block", "branch_a_extract", "branch_b_extract", "__start_block__"}
|
||
|
|
|
||
|
|
missing_run2 = should_cache - cached_blocks_after_run1
|
||
|
|
assert missing_run2 == {"branch_b_extract"}
|
||
|
|
|
||
|
|
blocks_to_update_run2: set[str] = set()
|
||
|
|
# Missing labels NOT added for conditional workflows
|
||
|
|
if missing_run2 and not has_conditionals:
|
||
|
|
blocks_to_update_run2.update(missing_run2)
|
||
|
|
|
||
|
|
# branch_b_extract is added because it actually executed
|
||
|
|
blocks_to_update_run2.add("branch_b_extract")
|
||
|
|
|
||
|
|
# Regeneration happens, but branch A is PRESERVED
|
||
|
|
transform_output_run2 = {"nav_block", "branch_b_extract"}
|
||
|
|
preserved_run2 = {"branch_a_extract"} # From cache, not in transform
|
||
|
|
|
||
|
|
cached_blocks_after_run2 = transform_output_run2 | preserved_run2 | {"__start_block__"}
|
||
|
|
assert cached_blocks_after_run2 == {"nav_block", "branch_a_extract", "branch_b_extract", "__start_block__"}
|
||
|
|
|
||
|
|
# --- Run 3: Branch A executes again ---
|
||
|
|
missing_run3 = should_cache - cached_blocks_after_run2
|
||
|
|
assert len(missing_run3) == 0, "No missing blocks after Run 2 because branch A was preserved"
|
||
|
|
|
||
|
|
blocks_to_update_run3: set[str] = set()
|
||
|
|
if missing_run3 and not has_conditionals:
|
||
|
|
blocks_to_update_run3.update(missing_run3)
|
||
|
|
|
||
|
|
# branch_a_extract already has cached code, so it's NOT added to blocks_to_update
|
||
|
|
# (execution tracking only adds blocks that DON'T have cached code)
|
||
|
|
should_regenerate_run3 = bool(blocks_to_update_run3)
|
||
|
|
assert should_regenerate_run3 is False, "No regeneration needed on Run 3 - the loop is broken"
|