From f781a6f0efb1e297963bda8b319c5fb6e0366f58 Mon Sep 17 00:00:00 2001 From: Stanislav Novosad Date: Wed, 21 Jan 2026 15:45:52 -0700 Subject: [PATCH] Return 409 on workflow updates race conditions (#4510) --- skyvern/forge/sdk/workflow/exceptions.py | 8 ++++ skyvern/forge/sdk/workflow/service.py | 54 +++++++++++++----------- 2 files changed, 38 insertions(+), 24 deletions(-) diff --git a/skyvern/forge/sdk/workflow/exceptions.py b/skyvern/forge/sdk/workflow/exceptions.py index f3d2728b..42bed8d4 100644 --- a/skyvern/forge/sdk/workflow/exceptions.py +++ b/skyvern/forge/sdk/workflow/exceptions.py @@ -54,6 +54,14 @@ class FailedToUpdateWorkflow(BaseWorkflowHTTPException): ) +class WorkflowVersionConflict(BaseWorkflowHTTPException): + def __init__(self, workflow_permanent_id: str) -> None: + super().__init__( + f"Concurrent update detected for workflow {workflow_permanent_id}. Please retry.", + status_code=status.HTTP_409_CONFLICT, + ) + + class OutputParameterKeyCollisionError(BaseWorkflowHTTPException): def __init__(self, key: str, retry_count: int | None = None) -> None: message = f"Output parameter key {key} already exists in the context manager." diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 7f315f53..2d78201a 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -60,6 +60,7 @@ from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRu from skyvern.forge.sdk.trace import TraceManager from skyvern.forge.sdk.workflow.exceptions import ( InvalidWorkflowDefinition, + WorkflowVersionConflict, ) from skyvern.forge.sdk.workflow.models.block import ( BlockTypeVar, @@ -1586,30 +1587,35 @@ class WorkflowService: sequential_key: str | None = None, folder_id: str | None = None, ) -> Workflow: - return await app.DATABASE.create_workflow( - title=title, - workflow_definition=workflow_definition.model_dump(), - organization_id=organization_id, - description=description, - proxy_location=proxy_location, - webhook_callback_url=webhook_callback_url, - max_screenshot_scrolling_times=max_screenshot_scrolling_times, - totp_verification_url=totp_verification_url, - totp_identifier=totp_identifier, - persist_browser_session=persist_browser_session, - model=model, - workflow_permanent_id=workflow_permanent_id, - version=version, - is_saved_task=is_saved_task, - status=status, - extra_http_headers=extra_http_headers, - run_with=run_with, - cache_key=cache_key, - ai_fallback=False if ai_fallback is None else ai_fallback, - run_sequentially=run_sequentially, - sequential_key=sequential_key, - folder_id=folder_id, - ) + try: + return await app.DATABASE.create_workflow( + title=title, + workflow_definition=workflow_definition.model_dump(), + organization_id=organization_id, + description=description, + proxy_location=proxy_location, + webhook_callback_url=webhook_callback_url, + max_screenshot_scrolling_times=max_screenshot_scrolling_times, + totp_verification_url=totp_verification_url, + totp_identifier=totp_identifier, + persist_browser_session=persist_browser_session, + model=model, + workflow_permanent_id=workflow_permanent_id, + version=version, + is_saved_task=is_saved_task, + status=status, + extra_http_headers=extra_http_headers, + run_with=run_with, + cache_key=cache_key, + ai_fallback=False if ai_fallback is None else ai_fallback, + run_sequentially=run_sequentially, + sequential_key=sequential_key, + folder_id=folder_id, + ) + except IntegrityError as e: + if "uc_org_permanent_id_version" in str(e) and workflow_permanent_id: + raise WorkflowVersionConflict(workflow_permanent_id) from e + raise async def create_workflow_from_prompt( self,