Nest conditional branch blocks in workflow run timeline (#SKY-7367) (#4727)
This commit is contained in:
@@ -1120,6 +1120,74 @@ class LoopBlockExecutedResult(BaseModel):
|
|||||||
return self.block_outputs[-1].failure_reason if len(self.block_outputs) > 0 else "No block has been executed"
|
return self.block_outputs[-1].failure_reason if len(self.block_outputs) > 0 else "No block has been executed"
|
||||||
|
|
||||||
|
|
||||||
|
def compute_conditional_scopes(
|
||||||
|
label_to_block: dict[str, Any],
|
||||||
|
default_next_map: dict[str, str | None],
|
||||||
|
) -> dict[str, str]:
|
||||||
|
"""Map each block label to the conditional block label whose scope it belongs to.
|
||||||
|
|
||||||
|
For each conditional block, trace each branch's chain of blocks via
|
||||||
|
``default_next_map``. Labels that appear in **all** branch chains are
|
||||||
|
considered merge-point blocks (i.e. they come *after* the conditional
|
||||||
|
reconverges) and are **not** scoped. Labels that appear in fewer chains
|
||||||
|
than the total number of branches **are** inside the conditional.
|
||||||
|
|
||||||
|
Inner conditionals are themselves scoped to an outer conditional, but
|
||||||
|
their *own* branch targets are handled by a recursive application of
|
||||||
|
the same logic (inner wins via the ``if lbl not in scopes`` guard).
|
||||||
|
"""
|
||||||
|
scopes: dict[str, str] = {}
|
||||||
|
|
||||||
|
conditional_labels = [lbl for lbl, blk in label_to_block.items() if blk.block_type == BlockType.CONDITIONAL]
|
||||||
|
|
||||||
|
for cond_label in conditional_labels:
|
||||||
|
cond_block = label_to_block[cond_label]
|
||||||
|
branch_targets: list[str | None] = [branch.next_block_label for branch in cond_block.ordered_branches]
|
||||||
|
# Deduplicate while preserving order – two branches may point to the same target
|
||||||
|
seen_targets: set[str | None] = set()
|
||||||
|
unique_targets: list[str | None] = []
|
||||||
|
for t in branch_targets:
|
||||||
|
if t not in seen_targets:
|
||||||
|
seen_targets.add(t)
|
||||||
|
unique_targets.append(t)
|
||||||
|
|
||||||
|
num_branches = len(unique_targets)
|
||||||
|
if num_branches == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# For each unique branch target, trace the chain via default_next_map.
|
||||||
|
# Stop at other conditional blocks (they handle their own branches).
|
||||||
|
chain_sets: list[list[str]] = []
|
||||||
|
for target in unique_targets:
|
||||||
|
chain: list[str] = []
|
||||||
|
cur = target
|
||||||
|
while cur and cur in label_to_block:
|
||||||
|
chain.append(cur)
|
||||||
|
# Stop tracing when we hit another conditional – it owns its own sub-tree
|
||||||
|
if label_to_block[cur].block_type == BlockType.CONDITIONAL:
|
||||||
|
break
|
||||||
|
cur = default_next_map.get(cur)
|
||||||
|
chain_sets.append(chain)
|
||||||
|
|
||||||
|
# Count how many branch chains each label appears in
|
||||||
|
label_count: dict[str, int] = {}
|
||||||
|
for chain in chain_sets:
|
||||||
|
for lbl in chain:
|
||||||
|
label_count[lbl] = label_count.get(lbl, 0) + 1
|
||||||
|
|
||||||
|
# Labels appearing in ALL branches are merge points (after the conditional).
|
||||||
|
# Labels appearing in fewer branches are inside the conditional.
|
||||||
|
for chain in chain_sets:
|
||||||
|
for lbl in chain:
|
||||||
|
if label_count[lbl] >= num_branches:
|
||||||
|
# This is a merge point – stop scoping further along this chain
|
||||||
|
break
|
||||||
|
if lbl not in scopes:
|
||||||
|
scopes[lbl] = cond_label
|
||||||
|
|
||||||
|
return scopes
|
||||||
|
|
||||||
|
|
||||||
class ForLoopBlock(Block):
|
class ForLoopBlock(Block):
|
||||||
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
|
# There is a mypy bug with Literal. Without the type: ignore, mypy will raise an error:
|
||||||
# Parameter 1 of Literal[...] cannot be of type "Any"
|
# Parameter 1 of Literal[...] cannot be of type "Any"
|
||||||
@@ -1507,6 +1575,7 @@ class ForLoopBlock(Block):
|
|||||||
current_block: BlockTypeVar | None = None
|
current_block: BlockTypeVar | None = None
|
||||||
|
|
||||||
start_label, label_to_block, default_next_map = self._build_loop_graph(self.loop_blocks)
|
start_label, label_to_block, default_next_map = self._build_loop_graph(self.loop_blocks)
|
||||||
|
conditional_scopes = compute_conditional_scopes(label_to_block, default_next_map)
|
||||||
|
|
||||||
for loop_idx, loop_over_value in enumerate(loop_over_values):
|
for loop_idx, loop_over_value in enumerate(loop_over_values):
|
||||||
# Check max_iterations limit
|
# Check max_iterations limit
|
||||||
@@ -1548,6 +1617,7 @@ class ForLoopBlock(Block):
|
|||||||
|
|
||||||
block_idx = 0
|
block_idx = 0
|
||||||
current_label: str | None = start_label
|
current_label: str | None = start_label
|
||||||
|
conditional_wrb_ids: dict[str, str] = {}
|
||||||
while current_label:
|
while current_label:
|
||||||
loop_block = label_to_block.get(current_label)
|
loop_block = label_to_block.get(current_label)
|
||||||
if not loop_block:
|
if not loop_block:
|
||||||
@@ -1584,13 +1654,27 @@ class ForLoopBlock(Block):
|
|||||||
loop_block = loop_block.model_copy(deep=True)
|
loop_block = loop_block.model_copy(deep=True)
|
||||||
current_block = loop_block
|
current_block = loop_block
|
||||||
|
|
||||||
|
# Determine the parent for timeline nesting: if this block is
|
||||||
|
# inside a conditional's scope, parent it to that conditional's
|
||||||
|
# workflow_run_block rather than the loop's.
|
||||||
|
parent_wrb_id = workflow_run_block_id
|
||||||
|
if current_label in conditional_scopes:
|
||||||
|
cond_label = conditional_scopes[current_label]
|
||||||
|
if cond_label in conditional_wrb_ids:
|
||||||
|
parent_wrb_id = conditional_wrb_ids[cond_label]
|
||||||
|
|
||||||
block_output = await loop_block.execute_safe(
|
block_output = await loop_block.execute_safe(
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
parent_workflow_run_block_id=workflow_run_block_id,
|
parent_workflow_run_block_id=parent_wrb_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
browser_session_id=browser_session_id,
|
browser_session_id=browser_session_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Track conditional workflow_run_block_ids so branch targets
|
||||||
|
# can be parented to them.
|
||||||
|
if loop_block.block_type == BlockType.CONDITIONAL and block_output.workflow_run_block_id:
|
||||||
|
conditional_wrb_ids[current_label] = block_output.workflow_run_block_id
|
||||||
|
|
||||||
output_value = (
|
output_value = (
|
||||||
workflow_run_context.get_value(block_output.output_parameter.key)
|
workflow_run_context.get_value(block_output.output_parameter.key)
|
||||||
if workflow_run_context.has_value(block_output.output_parameter.key)
|
if workflow_run_context.has_value(block_output.output_parameter.key)
|
||||||
|
|||||||
@@ -70,6 +70,7 @@ from skyvern.forge.sdk.workflow.models.block import (
|
|||||||
ExtractionBlock,
|
ExtractionBlock,
|
||||||
NavigationBlock,
|
NavigationBlock,
|
||||||
TaskV2Block,
|
TaskV2Block,
|
||||||
|
compute_conditional_scopes,
|
||||||
get_all_blocks,
|
get_all_blocks,
|
||||||
)
|
)
|
||||||
from skyvern.forge.sdk.workflow.models.parameter import (
|
from skyvern.forge.sdk.workflow.models.parameter import (
|
||||||
@@ -1206,6 +1207,9 @@ class WorkflowService:
|
|||||||
)
|
)
|
||||||
return workflow_run, blocks_to_update
|
return workflow_run, blocks_to_update
|
||||||
|
|
||||||
|
conditional_scopes = compute_conditional_scopes(label_to_block, default_next_map)
|
||||||
|
conditional_wrb_ids: dict[str, str] = {}
|
||||||
|
|
||||||
visited_labels: set[str] = set()
|
visited_labels: set[str] = set()
|
||||||
current_label = start_label
|
current_label = start_label
|
||||||
block_idx = 0
|
block_idx = 0
|
||||||
@@ -1225,6 +1229,15 @@ class WorkflowService:
|
|||||||
)
|
)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# Determine the parent for timeline nesting: if this block is
|
||||||
|
# inside a conditional's scope, parent it to that conditional's
|
||||||
|
# workflow_run_block rather than the root.
|
||||||
|
parent_wrb_id: str | None = None
|
||||||
|
if current_label in conditional_scopes:
|
||||||
|
cond_label = conditional_scopes[current_label]
|
||||||
|
if cond_label in conditional_wrb_ids:
|
||||||
|
parent_wrb_id = conditional_wrb_ids[cond_label]
|
||||||
|
|
||||||
(
|
(
|
||||||
workflow_run,
|
workflow_run,
|
||||||
blocks_to_update,
|
blocks_to_update,
|
||||||
@@ -1244,8 +1257,14 @@ class WorkflowService:
|
|||||||
loaded_script_module=loaded_script_module,
|
loaded_script_module=loaded_script_module,
|
||||||
is_script_run=is_script_run,
|
is_script_run=is_script_run,
|
||||||
blocks_to_update=blocks_to_update,
|
blocks_to_update=blocks_to_update,
|
||||||
|
parent_workflow_run_block_id=parent_wrb_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Track conditional workflow_run_block_ids so branch targets
|
||||||
|
# can be parented to them.
|
||||||
|
if block.block_type == BlockType.CONDITIONAL and block_result and block_result.workflow_run_block_id:
|
||||||
|
conditional_wrb_ids[block.label] = block_result.workflow_run_block_id
|
||||||
|
|
||||||
visited_labels.add(current_label)
|
visited_labels.add(current_label)
|
||||||
if should_stop:
|
if should_stop:
|
||||||
break
|
break
|
||||||
@@ -1298,6 +1317,7 @@ class WorkflowService:
|
|||||||
loaded_script_module: Any,
|
loaded_script_module: Any,
|
||||||
is_script_run: bool,
|
is_script_run: bool,
|
||||||
blocks_to_update: set[str],
|
blocks_to_update: set[str],
|
||||||
|
parent_workflow_run_block_id: str | None = None,
|
||||||
) -> tuple[WorkflowRun, set[str], BlockResult | None, bool, dict[str, Any] | None]:
|
) -> tuple[WorkflowRun, set[str], BlockResult | None, bool, dict[str, Any] | None]:
|
||||||
organization_id = organization.organization_id
|
organization_id = organization.organization_id
|
||||||
workflow_run_block_result: BlockResult | None = None
|
workflow_run_block_result: BlockResult | None = None
|
||||||
@@ -1426,6 +1446,7 @@ class WorkflowService:
|
|||||||
)
|
)
|
||||||
workflow_run_block_result = await block.execute_safe(
|
workflow_run_block_result = await block.execute_safe(
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
|
parent_workflow_run_block_id=parent_workflow_run_block_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
browser_session_id=browser_session_id,
|
browser_session_id=browser_session_id,
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user