continue on failure for lask block (#1299)
This commit is contained in:
@@ -216,7 +216,6 @@ class WorkflowService:
|
|||||||
blocks_cnt = len(blocks)
|
blocks_cnt = len(blocks)
|
||||||
block_result = None
|
block_result = None
|
||||||
for block_idx, block in enumerate(blocks):
|
for block_idx, block in enumerate(blocks):
|
||||||
is_last_block = block_idx + 1 == blocks_cnt
|
|
||||||
try:
|
try:
|
||||||
refreshed_workflow_run = await app.DATABASE.get_workflow_run(
|
refreshed_workflow_run = await app.DATABASE.get_workflow_run(
|
||||||
workflow_run_id=workflow_run.workflow_run_id
|
workflow_run_id=workflow_run.workflow_run_id
|
||||||
@@ -278,18 +277,7 @@ class WorkflowService:
|
|||||||
block_type_var=block.block_type,
|
block_type_var=block.block_type,
|
||||||
block_label=block.label,
|
block_label=block.label,
|
||||||
)
|
)
|
||||||
if block.continue_on_failure and not is_last_block:
|
if not block.continue_on_failure:
|
||||||
LOG.warning(
|
|
||||||
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed but will continue executing the workflow run {workflow_run_id}",
|
|
||||||
block_type=block.block_type,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_result=block_result,
|
|
||||||
continue_on_failure=block.continue_on_failure,
|
|
||||||
block_type_var=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed. failure reason: {block_result.failure_reason}"
|
failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed. failure reason: {block_result.failure_reason}"
|
||||||
await self.mark_workflow_run_as_failed(
|
await self.mark_workflow_run_as_failed(
|
||||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
||||||
@@ -300,6 +288,18 @@ class WorkflowService:
|
|||||||
api_key=api_key,
|
api_key=api_key,
|
||||||
)
|
)
|
||||||
return workflow_run
|
return workflow_run
|
||||||
|
|
||||||
|
LOG.warning(
|
||||||
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed but will continue executing the workflow run {workflow_run_id}",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_result=block_result,
|
||||||
|
continue_on_failure=block.continue_on_failure,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
|
||||||
elif block_result.status == BlockStatus.terminated:
|
elif block_result.status == BlockStatus.terminated:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated",
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated",
|
||||||
@@ -310,18 +310,8 @@ class WorkflowService:
|
|||||||
block_type_var=block.block_type,
|
block_type_var=block.block_type,
|
||||||
block_label=block.label,
|
block_label=block.label,
|
||||||
)
|
)
|
||||||
if block.continue_on_failure and not is_last_block:
|
|
||||||
LOG.warning(
|
if not block.continue_on_failure:
|
||||||
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run",
|
|
||||||
block_type=block.block_type,
|
|
||||||
workflow_run_id=workflow_run.workflow_run_id,
|
|
||||||
block_idx=block_idx,
|
|
||||||
block_result=block_result,
|
|
||||||
continue_on_failure=block.continue_on_failure,
|
|
||||||
block_type_var=block.block_type,
|
|
||||||
block_label=block.label,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} terminated. Reason: {block_result.failure_reason}"
|
failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} terminated. Reason: {block_result.failure_reason}"
|
||||||
await self.mark_workflow_run_as_terminated(
|
await self.mark_workflow_run_as_terminated(
|
||||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
||||||
@@ -332,6 +322,18 @@ class WorkflowService:
|
|||||||
api_key=api_key,
|
api_key=api_key,
|
||||||
)
|
)
|
||||||
return workflow_run
|
return workflow_run
|
||||||
|
|
||||||
|
LOG.warning(
|
||||||
|
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run",
|
||||||
|
block_type=block.block_type,
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id,
|
||||||
|
block_idx=block_idx,
|
||||||
|
block_result=block_result,
|
||||||
|
continue_on_failure=block.continue_on_failure,
|
||||||
|
block_type_var=block.block_type,
|
||||||
|
block_label=block.label,
|
||||||
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception(
|
LOG.exception(
|
||||||
f"Error while executing workflow run {workflow_run.workflow_run_id}",
|
f"Error while executing workflow run {workflow_run.workflow_run_id}",
|
||||||
|
|||||||
Reference in New Issue
Block a user