diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index 780e2783..8abbe566 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -197,6 +197,14 @@ class MissingValueForParameter(SkyvernHTTPException): ) +class WorkflowRunParameterPersistenceError(SkyvernException): + def __init__(self, parameter_key: str, workflow_id: str, workflow_run_id: str, reason: str) -> None: + super().__init__( + f"Failed to persist workflow parameter '{parameter_key}' for workflow run {workflow_run_id} " + f"of workflow {workflow_id}. Reason: {reason}" + ) + + class InvalidCredentialId(SkyvernHTTPException): def __init__(self, credential_id: str) -> None: super().__init__( diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index f3e85823..77ad44a5 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -12,6 +12,8 @@ from typing import Any, Literal, cast import httpx import structlog +from asyncpg.exceptions import NotNullViolationError +from sqlalchemy.exc import IntegrityError, SQLAlchemyError import skyvern from skyvern import analytics @@ -32,6 +34,7 @@ from skyvern.exceptions import ( WorkflowNotFound, WorkflowNotFoundForWorkflowRun, WorkflowRunNotFound, + WorkflowRunParameterPersistenceError, ) from skyvern.forge import app from skyvern.forge.prompts import prompt_engine @@ -484,19 +487,35 @@ class WorkflowService: request_body_value = workflow_request.data[workflow_parameter.key] if workflow_parameter.workflow_parameter_type == WorkflowParameterType.CREDENTIAL_ID: await self._validate_credential_id(str(request_body_value), organization) - await self.create_workflow_run_parameter( - workflow_run_id=workflow_run.workflow_run_id, - workflow_parameter=workflow_parameter, - value=request_body_value, - ) + try: + await self.create_workflow_run_parameter( + workflow_run_id=workflow_run.workflow_run_id, + workflow_parameter=workflow_parameter, + value=request_body_value, + ) + except SQLAlchemyError as parameter_error: + raise WorkflowRunParameterPersistenceError( + parameter_key=workflow_parameter.key, + workflow_id=workflow.workflow_permanent_id, + workflow_run_id=workflow_run.workflow_run_id, + reason=self._format_parameter_persistence_error(parameter_error), + ) from parameter_error elif workflow_parameter.default_value is not None: if workflow_parameter.workflow_parameter_type == WorkflowParameterType.CREDENTIAL_ID: await self._validate_credential_id(str(workflow_parameter.default_value), organization) - await self.create_workflow_run_parameter( - workflow_run_id=workflow_run.workflow_run_id, - workflow_parameter=workflow_parameter, - value=workflow_parameter.default_value, - ) + try: + await self.create_workflow_run_parameter( + workflow_run_id=workflow_run.workflow_run_id, + workflow_parameter=workflow_parameter, + value=workflow_parameter.default_value, + ) + except SQLAlchemyError as parameter_error: + raise WorkflowRunParameterPersistenceError( + parameter_key=workflow_parameter.key, + workflow_id=workflow.workflow_permanent_id, + workflow_run_id=workflow_run.workflow_run_id, + reason=self._format_parameter_persistence_error(parameter_error), + ) from parameter_error else: raise MissingValueForParameter( parameter_key=workflow_parameter.key, @@ -528,6 +547,14 @@ class WorkflowService: return workflow_run + @staticmethod + def _format_parameter_persistence_error(error: SQLAlchemyError) -> str: + if isinstance(error, IntegrityError): + orig_error = getattr(error, "orig", None) + if isinstance(orig_error, NotNullViolationError): + return "value cannot be null" + return "database error while saving parameter value" + async def auto_create_browser_session_if_needed( self, organization_id: str,