Fix finally block executing twice on successful workflow runs (#4659)
This commit is contained in:
@@ -1078,6 +1078,10 @@ class WorkflowService:
|
|||||||
|
|
||||||
else:
|
else:
|
||||||
blocks = top_level_blocks
|
blocks = top_level_blocks
|
||||||
|
# Exclude the finally block from normal traversal — it runs separately via _execute_finally_block_if_configured
|
||||||
|
finally_block_label = workflow.workflow_definition.finally_block_label
|
||||||
|
if finally_block_label:
|
||||||
|
blocks = self._strip_finally_block_references(blocks, finally_block_label)
|
||||||
|
|
||||||
if not blocks:
|
if not blocks:
|
||||||
raise SkyvernException(f"No blocks found for the given block labels: {block_labels}")
|
raise SkyvernException(f"No blocks found for the given block labels: {block_labels}")
|
||||||
@@ -1187,10 +1191,13 @@ class WorkflowService:
|
|||||||
is_script_run: bool,
|
is_script_run: bool,
|
||||||
blocks_to_update: set[str],
|
blocks_to_update: set[str],
|
||||||
) -> tuple[WorkflowRun, set[str]]:
|
) -> tuple[WorkflowRun, set[str]]:
|
||||||
|
finally_block_label = workflow.workflow_definition.finally_block_label
|
||||||
|
dag_blocks = workflow.workflow_definition.blocks
|
||||||
|
if finally_block_label:
|
||||||
|
dag_blocks = self._strip_finally_block_references(dag_blocks, finally_block_label)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
start_label, label_to_block, default_next_map = self._build_workflow_graph(
|
start_label, label_to_block, default_next_map = self._build_workflow_graph(dag_blocks)
|
||||||
workflow.workflow_definition.blocks
|
|
||||||
)
|
|
||||||
except InvalidWorkflowDefinition as exc:
|
except InvalidWorkflowDefinition as exc:
|
||||||
LOG.error("Workflow graph validation failed", error=str(exc), workflow_id=workflow.workflow_id)
|
LOG.error("Workflow graph validation failed", error=str(exc), workflow_id=workflow.workflow_id)
|
||||||
workflow_run = await self.mark_workflow_run_as_failed(
|
workflow_run = await self.mark_workflow_run_as_failed(
|
||||||
@@ -1628,6 +1635,34 @@ class WorkflowService:
|
|||||||
error=str(e),
|
error=str(e),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _strip_finally_block_references(
|
||||||
|
blocks: list[BlockTypeVar],
|
||||||
|
finally_block_label: str,
|
||||||
|
) -> list[BlockTypeVar]:
|
||||||
|
"""Remove the finally block and nullify any edges that point to it.
|
||||||
|
|
||||||
|
This prevents _build_workflow_graph from raising InvalidWorkflowDefinition
|
||||||
|
when a block's next_block_label references the (now-excluded) finally block.
|
||||||
|
"""
|
||||||
|
result: list[BlockTypeVar] = []
|
||||||
|
for block in blocks:
|
||||||
|
if block.label == finally_block_label:
|
||||||
|
continue
|
||||||
|
if isinstance(block, ConditionalBlock):
|
||||||
|
patched_branches = [
|
||||||
|
branch.model_copy(update={"next_block_label": None})
|
||||||
|
if branch.next_block_label == finally_block_label
|
||||||
|
else branch
|
||||||
|
for branch in block.branch_conditions
|
||||||
|
]
|
||||||
|
if patched_branches != block.branch_conditions:
|
||||||
|
block = block.model_copy(update={"branch_conditions": patched_branches})
|
||||||
|
elif block.next_block_label == finally_block_label:
|
||||||
|
block = block.model_copy(update={"next_block_label": None})
|
||||||
|
result.append(block)
|
||||||
|
return result
|
||||||
|
|
||||||
def _build_workflow_graph(
|
def _build_workflow_graph(
|
||||||
self,
|
self,
|
||||||
blocks: list[BlockTypeVar],
|
blocks: list[BlockTypeVar],
|
||||||
|
|||||||
Reference in New Issue
Block a user