Add "Execute on Any Outcome" (Finally) option to blocks - Pair Team request (#4443)
This commit is contained in:
@@ -20,6 +20,24 @@ class WorkflowDefinitionHasDuplicateBlockLabels(BaseWorkflowHTTPException):
|
||||
)
|
||||
|
||||
|
||||
class InvalidFinallyBlockLabel(BaseWorkflowHTTPException):
|
||||
def __init__(self, finally_block_label: str, available_labels: list[str]) -> None:
|
||||
super().__init__(
|
||||
f"finally_block_label '{finally_block_label}' does not reference a valid block in the workflow. "
|
||||
f"Available block labels: {', '.join(available_labels) if available_labels else '(none)'}",
|
||||
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
||||
)
|
||||
|
||||
|
||||
class NonTerminalFinallyBlock(BaseWorkflowHTTPException):
|
||||
def __init__(self, finally_block_label: str) -> None:
|
||||
super().__init__(
|
||||
f"finally_block_label '{finally_block_label}' must be a terminal block (next_block_label must be null). "
|
||||
"Only blocks without a next_block_label can be used as finally blocks.",
|
||||
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
||||
)
|
||||
|
||||
|
||||
class FailedToCreateWorkflow(BaseWorkflowHTTPException):
|
||||
def __init__(self, error_message: str) -> None:
|
||||
super().__init__(
|
||||
|
||||
@@ -7,7 +7,11 @@ from typing_extensions import deprecated
|
||||
|
||||
from skyvern.forge.sdk.schemas.files import FileInfo
|
||||
from skyvern.forge.sdk.schemas.task_v2 import TaskV2
|
||||
from skyvern.forge.sdk.workflow.exceptions import WorkflowDefinitionHasDuplicateBlockLabels
|
||||
from skyvern.forge.sdk.workflow.exceptions import (
|
||||
InvalidFinallyBlockLabel,
|
||||
NonTerminalFinallyBlock,
|
||||
WorkflowDefinitionHasDuplicateBlockLabels,
|
||||
)
|
||||
from skyvern.forge.sdk.workflow.models.block import BlockTypeVar
|
||||
from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, OutputParameter
|
||||
from skyvern.schemas.runs import ProxyLocationInput, ScriptRunResponse
|
||||
@@ -54,6 +58,7 @@ class WorkflowDefinition(BaseModel):
|
||||
version: int = 1
|
||||
parameters: list[PARAMETER_TYPE]
|
||||
blocks: List[BlockTypeVar]
|
||||
finally_block_label: str | None = None
|
||||
|
||||
def validate(self) -> None:
|
||||
labels: set[str] = set()
|
||||
@@ -67,6 +72,13 @@ class WorkflowDefinition(BaseModel):
|
||||
if duplicate_labels:
|
||||
raise WorkflowDefinitionHasDuplicateBlockLabels(duplicate_labels)
|
||||
|
||||
if self.finally_block_label:
|
||||
if self.finally_block_label not in labels:
|
||||
raise InvalidFinallyBlockLabel(self.finally_block_label, list(labels))
|
||||
for block in self.blocks:
|
||||
if block.label == self.finally_block_label and block.next_block_label is not None:
|
||||
raise NonTerminalFinallyBlock(self.finally_block_label)
|
||||
|
||||
|
||||
class Workflow(BaseModel):
|
||||
workflow_id: str
|
||||
|
||||
@@ -737,33 +737,59 @@ class WorkflowService:
|
||||
script=workflow_script,
|
||||
)
|
||||
|
||||
# Check if there's a finally block configured
|
||||
finally_block_label = workflow.workflow_definition.finally_block_label
|
||||
|
||||
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
|
||||
workflow_run_id=workflow_run_id,
|
||||
organization_id=organization_id,
|
||||
):
|
||||
workflow_run = refreshed_workflow_run
|
||||
if workflow_run.status not in (
|
||||
WorkflowRunStatus.canceled,
|
||||
|
||||
pre_finally_status = workflow_run.status
|
||||
pre_finally_failure_reason = workflow_run.failure_reason
|
||||
|
||||
if pre_finally_status not in (
|
||||
WorkflowRunStatus.canceled,
|
||||
WorkflowRunStatus.failed,
|
||||
WorkflowRunStatus.terminated,
|
||||
WorkflowRunStatus.timed_out,
|
||||
):
|
||||
await self.generate_script_if_needed(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
block_labels=block_labels,
|
||||
blocks_to_update=blocks_to_update,
|
||||
)
|
||||
|
||||
# Execute finally block if configured. Skip for: canceled (user explicitly stopped)
|
||||
should_run_finally = finally_block_label and pre_finally_status != WorkflowRunStatus.canceled
|
||||
if should_run_finally:
|
||||
# Temporarily set to running for terminal workflows (for frontend UX)
|
||||
if pre_finally_status in (
|
||||
WorkflowRunStatus.failed,
|
||||
WorkflowRunStatus.terminated,
|
||||
WorkflowRunStatus.timed_out,
|
||||
):
|
||||
workflow_run = await self.mark_workflow_run_as_completed(
|
||||
workflow_run = await self._update_workflow_run_status(
|
||||
workflow_run_id=workflow_run_id,
|
||||
status=WorkflowRunStatus.running,
|
||||
failure_reason=None,
|
||||
)
|
||||
await self.generate_script_if_needed(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
block_labels=block_labels,
|
||||
blocks_to_update=blocks_to_update,
|
||||
)
|
||||
else:
|
||||
LOG.info(
|
||||
"Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed",
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_run_status=workflow_run.status,
|
||||
run_with=workflow_run.run_with,
|
||||
)
|
||||
await self._execute_finally_block_if_configured(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
organization=organization,
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
|
||||
workflow_run = await self._finalize_workflow_run_status(
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_run=workflow_run,
|
||||
pre_finally_status=pre_finally_status,
|
||||
pre_finally_failure_reason=pre_finally_failure_reason,
|
||||
)
|
||||
|
||||
await self.clean_up_workflow(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
@@ -1340,6 +1366,46 @@ class WorkflowService:
|
||||
|
||||
return workflow_run, False
|
||||
|
||||
async def _execute_finally_block_if_configured(
|
||||
self,
|
||||
workflow: Workflow,
|
||||
workflow_run: WorkflowRun,
|
||||
organization: Organization,
|
||||
browser_session_id: str | None,
|
||||
) -> None:
|
||||
finally_block_label = workflow.workflow_definition.finally_block_label
|
||||
if not finally_block_label:
|
||||
return
|
||||
|
||||
label_to_block: dict[str, BlockTypeVar] = {block.label: block for block in workflow.workflow_definition.blocks}
|
||||
|
||||
block = label_to_block.get(finally_block_label)
|
||||
if not block:
|
||||
LOG.warning(
|
||||
"Finally block label not found",
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
finally_block_label=finally_block_label,
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
parameters = block.get_all_parameters(workflow_run.workflow_run_id)
|
||||
await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run(
|
||||
workflow_run.workflow_run_id, parameters, organization
|
||||
)
|
||||
await block.execute_safe(
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
organization_id=organization.organization_id,
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
"Finally block execution failed",
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
block_label=block.label,
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
def _build_workflow_graph(
|
||||
self,
|
||||
blocks: list[BlockTypeVar],
|
||||
@@ -2131,6 +2197,35 @@ class WorkflowService:
|
||||
run_with=run_with,
|
||||
)
|
||||
|
||||
async def _finalize_workflow_run_status(
|
||||
self,
|
||||
workflow_run_id: str,
|
||||
workflow_run: WorkflowRun,
|
||||
pre_finally_status: WorkflowRunStatus,
|
||||
pre_finally_failure_reason: str | None,
|
||||
) -> WorkflowRun:
|
||||
"""
|
||||
Set final workflow run status based on pre-finally state.
|
||||
Called unconditionally to ensure unified flow.
|
||||
"""
|
||||
if pre_finally_status not in (
|
||||
WorkflowRunStatus.canceled,
|
||||
WorkflowRunStatus.failed,
|
||||
WorkflowRunStatus.terminated,
|
||||
WorkflowRunStatus.timed_out,
|
||||
):
|
||||
return await self.mark_workflow_run_as_completed(workflow_run_id)
|
||||
|
||||
if workflow_run.status == WorkflowRunStatus.running:
|
||||
# We temporarily set to running for finally block, restore terminal status
|
||||
return await self._update_workflow_run_status(
|
||||
workflow_run_id=workflow_run_id,
|
||||
status=pre_finally_status,
|
||||
failure_reason=pre_finally_failure_reason,
|
||||
)
|
||||
|
||||
return workflow_run
|
||||
|
||||
async def mark_workflow_run_as_failed(
|
||||
self,
|
||||
workflow_run_id: str,
|
||||
|
||||
@@ -302,7 +302,12 @@ def convert_workflow_definition(
|
||||
if dag_version is None:
|
||||
dag_version = 2 if _has_dag_metadata(workflow_definition_yaml.blocks) else 1
|
||||
|
||||
workflow_definition = WorkflowDefinition(parameters=parameters.values(), blocks=blocks, version=dag_version)
|
||||
workflow_definition = WorkflowDefinition(
|
||||
parameters=parameters.values(),
|
||||
blocks=blocks,
|
||||
version=dag_version,
|
||||
finally_block_label=workflow_definition_yaml.finally_block_label,
|
||||
)
|
||||
|
||||
LOG.info(
|
||||
f"Created workflow from request, title: {title}",
|
||||
|
||||
Reference in New Issue
Block a user