clean up workflow run when it's done by cached script (#3333)
This commit is contained in:
@@ -283,7 +283,7 @@ class WorkflowService:
|
|||||||
skyvern_context.current()
|
skyvern_context.current()
|
||||||
|
|
||||||
# Set workflow run status to running, create workflow run parameters
|
# Set workflow run status to running, create workflow run parameters
|
||||||
workflow_run = await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id)
|
workflow_run = await self.mark_workflow_run_as_running(workflow_run_id=workflow_run_id)
|
||||||
|
|
||||||
# Get all context parameters from the workflow definition
|
# Get all context parameters from the workflow definition
|
||||||
context_parameters = [
|
context_parameters = [
|
||||||
@@ -309,7 +309,7 @@ class WorkflowService:
|
|||||||
]
|
]
|
||||||
|
|
||||||
# Get all <workflow parameter, workflow run parameter> tuples
|
# Get all <workflow parameter, workflow run parameter> tuples
|
||||||
wp_wps_tuples = await self.get_workflow_run_parameter_tuples(workflow_run_id=workflow_run.workflow_run_id)
|
wp_wps_tuples = await self.get_workflow_run_parameter_tuples(workflow_run_id=workflow_run_id)
|
||||||
workflow_output_parameters = await self.get_workflow_output_parameters(workflow_id=workflow.workflow_id)
|
workflow_output_parameters = await self.get_workflow_output_parameters(workflow_id=workflow.workflow_id)
|
||||||
try:
|
try:
|
||||||
await app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context(
|
await app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context(
|
||||||
@@ -323,8 +323,8 @@ class WorkflowService:
|
|||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception(
|
LOG.exception(
|
||||||
f"Error while initializing workflow run context for workflow run {workflow_run.workflow_run_id}",
|
f"Error while initializing workflow run context for workflow run {workflow_run_id}",
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
exception_message = f"Unexpected error: {str(e)}"
|
exception_message = f"Unexpected error: {str(e)}"
|
||||||
@@ -333,7 +333,7 @@ class WorkflowService:
|
|||||||
|
|
||||||
failure_reason = f"Failed to initialize workflow run context. failure reason: {exception_message}"
|
failure_reason = f"Failed to initialize workflow run context. failure reason: {exception_message}"
|
||||||
workflow_run = await self.mark_workflow_run_as_failed(
|
workflow_run = await self.mark_workflow_run_as_failed(
|
||||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
workflow_run_id=workflow_run_id, failure_reason=failure_reason
|
||||||
)
|
)
|
||||||
await self.clean_up_workflow(
|
await self.clean_up_workflow(
|
||||||
workflow=workflow,
|
workflow=workflow,
|
||||||
@@ -354,264 +354,27 @@ class WorkflowService:
|
|||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
workflow_script_id=workflow_script.script_id,
|
workflow_script_id=workflow_script.script_id,
|
||||||
)
|
)
|
||||||
return await self._execute_workflow_script(
|
workflow_run = await self._execute_workflow_script(
|
||||||
script_id=workflow_script.script_id,
|
script_id=workflow_script.script_id,
|
||||||
workflow=workflow,
|
workflow=workflow,
|
||||||
workflow_run=workflow_run,
|
workflow_run=workflow_run,
|
||||||
api_key=api_key,
|
api_key=api_key,
|
||||||
organization=organization,
|
organization=organization,
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
top_level_blocks = workflow.workflow_definition.blocks
|
workflow_run = await self._execute_workflow_blocks(
|
||||||
all_blocks = get_all_blocks(top_level_blocks)
|
workflow=workflow,
|
||||||
|
workflow_run=workflow_run,
|
||||||
if block_labels and len(block_labels):
|
api_key=api_key,
|
||||||
blocks: list[BlockTypeVar] = []
|
organization=organization,
|
||||||
all_labels = {block.label: block for block in all_blocks}
|
close_browser_on_completion=close_browser_on_completion,
|
||||||
for label in block_labels:
|
browser_session_id=browser_session_id,
|
||||||
if label not in all_labels:
|
|
||||||
raise BlockNotFound(block_label=label)
|
|
||||||
|
|
||||||
blocks.append(all_labels[label])
|
|
||||||
|
|
||||||
LOG.info(
|
|
||||||
"Executing workflow blocks via whitelist",
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_cnt=len(blocks),
|
|
||||||
block_labels=block_labels,
|
block_labels=block_labels,
|
||||||
block_outputs=block_outputs,
|
block_outputs=block_outputs,
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
|
||||||
blocks = top_level_blocks
|
|
||||||
|
|
||||||
if not blocks:
|
|
||||||
raise SkyvernException(f"No blocks found for the given block labels: {block_labels}")
|
|
||||||
|
|
||||||
# Execute workflow blocks
|
|
||||||
blocks_cnt = len(blocks)
|
|
||||||
block_result = None
|
|
||||||
for block_idx, block in enumerate(blocks):
|
|
||||||
try:
|
|
||||||
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
organization_id=organization_id,
|
|
||||||
):
|
|
||||||
workflow_run = refreshed_workflow_run
|
|
||||||
if workflow_run.status == WorkflowRunStatus.canceled:
|
|
||||||
LOG.info(
|
|
||||||
"Workflow run is canceled, stopping execution inside workflow execution loop",
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_type=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
await self.clean_up_workflow(
|
|
||||||
workflow=workflow,
|
|
||||||
workflow_run=workflow_run,
|
|
||||||
api_key=api_key,
|
|
||||||
need_call_webhook=True,
|
|
||||||
close_browser_on_completion=close_browser_on_completion,
|
|
||||||
browser_session_id=browser_session_id,
|
|
||||||
)
|
|
||||||
return workflow_run
|
|
||||||
|
|
||||||
if workflow_run.status == WorkflowRunStatus.timed_out:
|
|
||||||
LOG.info(
|
|
||||||
"Workflow run is timed out, stopping execution inside workflow execution loop",
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_type=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
await self.clean_up_workflow(
|
|
||||||
workflow=workflow,
|
|
||||||
workflow_run=workflow_run,
|
|
||||||
api_key=api_key,
|
|
||||||
need_call_webhook=True,
|
|
||||||
close_browser_on_completion=close_browser_on_completion,
|
|
||||||
browser_session_id=browser_session_id,
|
|
||||||
)
|
|
||||||
return workflow_run
|
|
||||||
|
|
||||||
parameters = block.get_all_parameters(workflow_run_id)
|
|
||||||
await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run(
|
|
||||||
workflow_run_id, parameters, organization
|
|
||||||
)
|
|
||||||
LOG.info(
|
|
||||||
f"Executing root block {block.block_type} at index {block_idx}/{blocks_cnt - 1} for workflow run {workflow_run_id}",
|
|
||||||
block_type=block.block_type,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_type_var=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
model=block.model,
|
|
||||||
)
|
|
||||||
block_result = await block.execute_safe(
|
|
||||||
workflow_run_id=workflow_run_id,
|
|
||||||
organization_id=organization_id,
|
|
||||||
browser_session_id=browser_session_id,
|
|
||||||
)
|
|
||||||
if block_result.status == BlockStatus.canceled:
|
|
||||||
LOG.info(
|
|
||||||
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was canceled for workflow run {workflow_run_id}, cancelling workflow run",
|
|
||||||
block_type=block.block_type,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_result=block_result,
|
|
||||||
block_type_var=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
workflow_run = await self.mark_workflow_run_as_canceled(
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id
|
|
||||||
)
|
|
||||||
# We're not sending a webhook here because the workflow run is manually marked as canceled.
|
|
||||||
await self.clean_up_workflow(
|
|
||||||
workflow=workflow,
|
|
||||||
workflow_run=workflow_run,
|
|
||||||
api_key=api_key,
|
|
||||||
need_call_webhook=False,
|
|
||||||
close_browser_on_completion=close_browser_on_completion,
|
|
||||||
browser_session_id=browser_session_id,
|
|
||||||
)
|
|
||||||
return workflow_run
|
|
||||||
elif block_result.status == BlockStatus.failed:
|
|
||||||
LOG.error(
|
|
||||||
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed for workflow run {workflow_run_id}",
|
|
||||||
block_type=block.block_type,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_result=block_result,
|
|
||||||
block_type_var=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
if not block.continue_on_failure:
|
|
||||||
failure_reason = (
|
|
||||||
f"{block.block_type} block failed. failure reason: {block_result.failure_reason}"
|
|
||||||
)
|
|
||||||
workflow_run = await self.mark_workflow_run_as_failed(
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
|
||||||
)
|
|
||||||
await self.clean_up_workflow(
|
|
||||||
workflow=workflow,
|
|
||||||
workflow_run=workflow_run,
|
|
||||||
api_key=api_key,
|
|
||||||
close_browser_on_completion=close_browser_on_completion,
|
|
||||||
browser_session_id=browser_session_id,
|
|
||||||
)
|
|
||||||
return workflow_run
|
|
||||||
|
|
||||||
LOG.warning(
|
|
||||||
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed but will continue executing the workflow run {workflow_run_id}",
|
|
||||||
block_type=block.block_type,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_result=block_result,
|
|
||||||
continue_on_failure=block.continue_on_failure,
|
|
||||||
block_type_var=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
|
|
||||||
elif block_result.status == BlockStatus.terminated:
|
|
||||||
LOG.info(
|
|
||||||
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated",
|
|
||||||
block_type=block.block_type,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_result=block_result,
|
|
||||||
block_type_var=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not block.continue_on_failure:
|
|
||||||
failure_reason = f"{block.block_type} block terminated. Reason: {block_result.failure_reason}"
|
|
||||||
workflow_run = await self.mark_workflow_run_as_terminated(
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
|
||||||
)
|
|
||||||
await self.clean_up_workflow(
|
|
||||||
workflow=workflow,
|
|
||||||
workflow_run=workflow_run,
|
|
||||||
api_key=api_key,
|
|
||||||
close_browser_on_completion=close_browser_on_completion,
|
|
||||||
browser_session_id=browser_session_id,
|
|
||||||
)
|
|
||||||
return workflow_run
|
|
||||||
|
|
||||||
LOG.warning(
|
|
||||||
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run",
|
|
||||||
block_type=block.block_type,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_result=block_result,
|
|
||||||
continue_on_failure=block.continue_on_failure,
|
|
||||||
block_type_var=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
|
|
||||||
elif block_result.status == BlockStatus.timed_out:
|
|
||||||
LOG.info(
|
|
||||||
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, marking workflow run as failed",
|
|
||||||
block_type=block.block_type,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_result=block_result,
|
|
||||||
block_type_var=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not block.continue_on_failure:
|
|
||||||
failure_reason = f"{block.block_type} block timed out. Reason: {block_result.failure_reason}"
|
|
||||||
workflow_run = await self.mark_workflow_run_as_failed(
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
|
||||||
)
|
|
||||||
await self.clean_up_workflow(
|
|
||||||
workflow=workflow,
|
|
||||||
workflow_run=workflow_run,
|
|
||||||
api_key=api_key,
|
|
||||||
close_browser_on_completion=close_browser_on_completion,
|
|
||||||
browser_session_id=browser_session_id,
|
|
||||||
)
|
|
||||||
return workflow_run
|
|
||||||
|
|
||||||
LOG.warning(
|
|
||||||
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, but will continue executing the workflow run",
|
|
||||||
block_type=block.block_type,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_result=block_result,
|
|
||||||
continue_on_failure=block.continue_on_failure,
|
|
||||||
block_type_var=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
LOG.exception(
|
|
||||||
f"Error while executing workflow run {workflow_run.workflow_run_id}",
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_type=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
|
|
||||||
exception_message = f"Unexpected error: {str(e)}"
|
|
||||||
if isinstance(e, SkyvernException):
|
|
||||||
exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}"
|
|
||||||
|
|
||||||
failure_reason = f"{block.block_type} block failed. failure reason: {exception_message}"
|
|
||||||
workflow_run = await self.mark_workflow_run_as_failed(
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
|
||||||
)
|
|
||||||
await self.clean_up_workflow(
|
|
||||||
workflow=workflow,
|
|
||||||
workflow_run=workflow_run,
|
|
||||||
api_key=api_key,
|
|
||||||
browser_session_id=browser_session_id,
|
|
||||||
close_browser_on_completion=close_browser_on_completion,
|
|
||||||
)
|
|
||||||
return workflow_run
|
|
||||||
|
|
||||||
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
|
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
):
|
):
|
||||||
workflow_run = refreshed_workflow_run
|
workflow_run = refreshed_workflow_run
|
||||||
@@ -621,7 +384,7 @@ class WorkflowService:
|
|||||||
WorkflowRunStatus.terminated,
|
WorkflowRunStatus.terminated,
|
||||||
WorkflowRunStatus.timed_out,
|
WorkflowRunStatus.timed_out,
|
||||||
):
|
):
|
||||||
workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
|
workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id)
|
||||||
await self.generate_script_if_needed(
|
await self.generate_script_if_needed(
|
||||||
workflow=workflow,
|
workflow=workflow,
|
||||||
workflow_run=workflow_run,
|
workflow_run=workflow_run,
|
||||||
@@ -630,7 +393,7 @@ class WorkflowService:
|
|||||||
else:
|
else:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed",
|
"Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed",
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
workflow_run_status=workflow_run.status if workflow_run else None,
|
workflow_run_status=workflow_run.status if workflow_run else None,
|
||||||
)
|
)
|
||||||
await self.clean_up_workflow(
|
await self.clean_up_workflow(
|
||||||
@@ -648,12 +411,220 @@ class WorkflowService:
|
|||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
workflow_id=workflow_run.workflow_id,
|
workflow_id=workflow_run.workflow_id,
|
||||||
duration_seconds=duration_seconds,
|
duration_seconds=duration_seconds,
|
||||||
workflow_run_status=WorkflowRunStatus.completed,
|
workflow_run_status=workflow_run.status,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
|
is_script_run=workflow_script is not None,
|
||||||
)
|
)
|
||||||
|
|
||||||
return workflow_run
|
return workflow_run
|
||||||
|
|
||||||
|
async def _execute_workflow_blocks(
|
||||||
|
self,
|
||||||
|
workflow: Workflow,
|
||||||
|
workflow_run: WorkflowRun,
|
||||||
|
api_key: str,
|
||||||
|
organization: Organization,
|
||||||
|
close_browser_on_completion: bool,
|
||||||
|
browser_session_id: str | None = None,
|
||||||
|
block_labels: list[str] | None = None,
|
||||||
|
block_outputs: dict[str, Any] | None = None,
|
||||||
|
) -> WorkflowRun:
|
||||||
|
organization_id = organization.organization_id
|
||||||
|
workflow_run_id = workflow_run.workflow_run_id
|
||||||
|
top_level_blocks = workflow.workflow_definition.blocks
|
||||||
|
all_blocks = get_all_blocks(top_level_blocks)
|
||||||
|
|
||||||
|
if block_labels and len(block_labels):
|
||||||
|
blocks: list[BlockTypeVar] = []
|
||||||
|
all_labels = {block.label: block for block in all_blocks}
|
||||||
|
for label in block_labels:
|
||||||
|
if label not in all_labels:
|
||||||
|
raise BlockNotFound(block_label=label)
|
||||||
|
|
||||||
|
blocks.append(all_labels[label])
|
||||||
|
|
||||||
|
LOG.info(
|
||||||
|
"Executing workflow blocks via whitelist",
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_cnt=len(blocks),
|
||||||
|
block_labels=block_labels,
|
||||||
|
block_outputs=block_outputs,
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
blocks = top_level_blocks
|
||||||
|
|
||||||
|
if not blocks:
|
||||||
|
raise SkyvernException(f"No blocks found for the given block labels: {block_labels}")
|
||||||
|
|
||||||
|
# Execute workflow blocks
|
||||||
|
blocks_cnt = len(blocks)
|
||||||
|
block_result = None
|
||||||
|
for block_idx, block in enumerate(blocks):
|
||||||
|
try:
|
||||||
|
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
organization_id=organization_id,
|
||||||
|
):
|
||||||
|
workflow_run = refreshed_workflow_run
|
||||||
|
if workflow_run.status == WorkflowRunStatus.canceled:
|
||||||
|
LOG.info(
|
||||||
|
"Workflow run is canceled, stopping execution inside workflow execution loop",
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_type=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
if workflow_run.status == WorkflowRunStatus.timed_out:
|
||||||
|
LOG.info(
|
||||||
|
"Workflow run is timed out, stopping execution inside workflow execution loop",
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_type=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
parameters = block.get_all_parameters(workflow_run_id)
|
||||||
|
await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run(
|
||||||
|
workflow_run_id, parameters, organization
|
||||||
|
)
|
||||||
|
LOG.info(
|
||||||
|
f"Executing root block {block.block_type} at index {block_idx}/{blocks_cnt - 1} for workflow run {workflow_run_id}",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
model=block.model,
|
||||||
|
)
|
||||||
|
block_result = await block.execute_safe(
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
organization_id=organization_id,
|
||||||
|
browser_session_id=browser_session_id,
|
||||||
|
)
|
||||||
|
if block_result.status == BlockStatus.canceled:
|
||||||
|
LOG.info(
|
||||||
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was canceled for workflow run {workflow_run_id}, cancelling workflow run",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_result=block_result,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
workflow_run = await self.mark_workflow_run_as_canceled(workflow_run_id=workflow_run_id)
|
||||||
|
break
|
||||||
|
elif block_result.status == BlockStatus.failed:
|
||||||
|
LOG.error(
|
||||||
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed for workflow run {workflow_run_id}",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_result=block_result,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
if not block.continue_on_failure:
|
||||||
|
failure_reason = (
|
||||||
|
f"{block.block_type} block failed. failure reason: {block_result.failure_reason}"
|
||||||
|
)
|
||||||
|
workflow_run = await self.mark_workflow_run_as_failed(
|
||||||
|
workflow_run_id=workflow_run_id, failure_reason=failure_reason
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
LOG.warning(
|
||||||
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} failed but will continue executing the workflow run {workflow_run_id}",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_result=block_result,
|
||||||
|
continue_on_failure=block.continue_on_failure,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
|
||||||
|
elif block_result.status == BlockStatus.terminated:
|
||||||
|
LOG.info(
|
||||||
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_result=block_result,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not block.continue_on_failure:
|
||||||
|
failure_reason = f"{block.block_type} block terminated. Reason: {block_result.failure_reason}"
|
||||||
|
workflow_run = await self.mark_workflow_run_as_terminated(
|
||||||
|
workflow_run_id=workflow_run_id, failure_reason=failure_reason
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
LOG.warning(
|
||||||
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_result=block_result,
|
||||||
|
continue_on_failure=block.continue_on_failure,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
|
||||||
|
elif block_result.status == BlockStatus.timed_out:
|
||||||
|
LOG.info(
|
||||||
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, marking workflow run as failed",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_result=block_result,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not block.continue_on_failure:
|
||||||
|
failure_reason = f"{block.block_type} block timed out. Reason: {block_result.failure_reason}"
|
||||||
|
workflow_run = await self.mark_workflow_run_as_failed(
|
||||||
|
workflow_run_id=workflow_run_id, failure_reason=failure_reason
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
LOG.warning(
|
||||||
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} timed out for workflow run {workflow_run_id}, but will continue executing the workflow run",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_result=block_result,
|
||||||
|
continue_on_failure=block.continue_on_failure,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception(
|
||||||
|
f"Error while executing workflow run {workflow_run_id}",
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_type=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
|
||||||
|
exception_message = f"Unexpected error: {str(e)}"
|
||||||
|
if isinstance(e, SkyvernException):
|
||||||
|
exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}"
|
||||||
|
|
||||||
|
failure_reason = f"{block.block_type} block failed. failure reason: {exception_message}"
|
||||||
|
workflow_run = await self.mark_workflow_run_as_failed(
|
||||||
|
workflow_run_id=workflow_run_id, failure_reason=failure_reason
|
||||||
|
)
|
||||||
|
break
|
||||||
|
return workflow_run
|
||||||
|
|
||||||
async def create_workflow(
|
async def create_workflow(
|
||||||
self,
|
self,
|
||||||
organization_id: str,
|
organization_id: str,
|
||||||
|
|||||||
Reference in New Issue
Block a user