Return 409 on workflow updates race conditions (#4510)

This commit is contained in:
Stanislav Novosad
2026-01-21 15:45:52 -07:00
committed by GitHub
parent f879f4983c
commit f781a6f0ef
2 changed files with 38 additions and 24 deletions

View File

@@ -54,6 +54,14 @@ class FailedToUpdateWorkflow(BaseWorkflowHTTPException):
)
class WorkflowVersionConflict(BaseWorkflowHTTPException):
def __init__(self, workflow_permanent_id: str) -> None:
super().__init__(
f"Concurrent update detected for workflow {workflow_permanent_id}. Please retry.",
status_code=status.HTTP_409_CONFLICT,
)
class OutputParameterKeyCollisionError(BaseWorkflowHTTPException):
def __init__(self, key: str, retry_count: int | None = None) -> None:
message = f"Output parameter key {key} already exists in the context manager."

View File

@@ -60,6 +60,7 @@ from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRu
from skyvern.forge.sdk.trace import TraceManager
from skyvern.forge.sdk.workflow.exceptions import (
InvalidWorkflowDefinition,
WorkflowVersionConflict,
)
from skyvern.forge.sdk.workflow.models.block import (
BlockTypeVar,
@@ -1586,30 +1587,35 @@ class WorkflowService:
sequential_key: str | None = None,
folder_id: str | None = None,
) -> Workflow:
return await app.DATABASE.create_workflow(
title=title,
workflow_definition=workflow_definition.model_dump(),
organization_id=organization_id,
description=description,
proxy_location=proxy_location,
webhook_callback_url=webhook_callback_url,
max_screenshot_scrolling_times=max_screenshot_scrolling_times,
totp_verification_url=totp_verification_url,
totp_identifier=totp_identifier,
persist_browser_session=persist_browser_session,
model=model,
workflow_permanent_id=workflow_permanent_id,
version=version,
is_saved_task=is_saved_task,
status=status,
extra_http_headers=extra_http_headers,
run_with=run_with,
cache_key=cache_key,
ai_fallback=False if ai_fallback is None else ai_fallback,
run_sequentially=run_sequentially,
sequential_key=sequential_key,
folder_id=folder_id,
)
try:
return await app.DATABASE.create_workflow(
title=title,
workflow_definition=workflow_definition.model_dump(),
organization_id=organization_id,
description=description,
proxy_location=proxy_location,
webhook_callback_url=webhook_callback_url,
max_screenshot_scrolling_times=max_screenshot_scrolling_times,
totp_verification_url=totp_verification_url,
totp_identifier=totp_identifier,
persist_browser_session=persist_browser_session,
model=model,
workflow_permanent_id=workflow_permanent_id,
version=version,
is_saved_task=is_saved_task,
status=status,
extra_http_headers=extra_http_headers,
run_with=run_with,
cache_key=cache_key,
ai_fallback=False if ai_fallback is None else ai_fallback,
run_sequentially=run_sequentially,
sequential_key=sequential_key,
folder_id=folder_id,
)
except IntegrityError as e:
if "uc_org_permanent_id_version" in str(e) and workflow_permanent_id:
raise WorkflowVersionConflict(workflow_permanent_id) from e
raise
async def create_workflow_from_prompt(
self,