diff --git a/alembic/versions/2025_07_28_1606-1d0a10ae2a13_add_webhook_failure_reason.py b/alembic/versions/2025_07_28_1606-1d0a10ae2a13_add_webhook_failure_reason.py new file mode 100644 index 00000000..c6e38d5a --- /dev/null +++ b/alembic/versions/2025_07_28_1606-1d0a10ae2a13_add_webhook_failure_reason.py @@ -0,0 +1,35 @@ +"""add_webhook_failure_reason + +Revision ID: 1d0a10ae2a13 +Revises: 044b4a3c3dbc +Create Date: 2025-07-28 16:06:57.319749+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "1d0a10ae2a13" +down_revision: Union[str, None] = "044b4a3c3dbc" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("observer_cruises", sa.Column("webhook_failure_reason", sa.String(), nullable=True)) + op.add_column("tasks", sa.Column("webhook_failure_reason", sa.String(), nullable=True)) + op.add_column("workflow_runs", sa.Column("webhook_failure_reason", sa.String(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("workflow_runs", "webhook_failure_reason") + op.drop_column("tasks", "webhook_failure_reason") + op.drop_column("observer_cruises", "webhook_failure_reason") + # ### end Alembic commands ### diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index f57c66f9..282e2a75 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -2315,6 +2315,11 @@ class ForgeAgent: resp_code=resp.status_code, resp_text=resp.text, ) + await app.DATABASE.update_task( + task_id=task.task_id, + organization_id=task.organization_id, + webhook_failure_reason="", + ) else: LOG.info( "Webhook failed", @@ -2323,6 +2328,11 @@ class ForgeAgent: resp_code=resp.status_code, resp_text=resp.text, ) + await app.DATABASE.update_task( + task_id=task.task_id, + organization_id=task.organization_id, + webhook_failure_reason=f"Webhook failed with status code {resp.status_code}, error message: {resp.text}", + ) except Exception as e: raise FailedToSendWebhook(task_id=task.task_id) from e @@ -2542,6 +2552,7 @@ class ForgeAgent: status: TaskStatus, extracted_information: dict[str, Any] | list | str | None = None, failure_reason: str | None = None, + webhook_failure_reason: str | None = None, ) -> Task: # refresh task from db to get the latest status task_from_db = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id) diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 924bf47f..a40865ad 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -614,6 +614,7 @@ class AgentDB: task_id: str, status: TaskStatus | None = None, extracted_information: dict[str, Any] | list | str | None = None, + webhook_failure_reason: str | None = None, failure_reason: str | None = None, errors: list[dict[str, Any]] | None = None, max_steps_per_run: int | None = None, @@ -625,6 +626,7 @@ class AgentDB: and failure_reason is None and errors is None and max_steps_per_run is None + and webhook_failure_reason is None ): raise ValueError( "At least one of status, extracted_information, or failure_reason must be provided to update the task" @@ -652,6 +654,8 @@ class AgentDB: task.errors = errors if max_steps_per_run is not None: task.max_steps_per_run = max_steps_per_run + if webhook_failure_reason is not None: + task.webhook_failure_reason = webhook_failure_reason await session.commit() updated_task = await self.get_task(task_id, organization_id=organization_id) if not updated_task: @@ -1590,21 +1594,29 @@ class AgentDB: raise async def update_workflow_run( - self, workflow_run_id: str, status: WorkflowRunStatus, failure_reason: str | None = None + self, + workflow_run_id: str, + status: WorkflowRunStatus | None = None, + failure_reason: str | None = None, + webhook_failure_reason: str | None = None, ) -> WorkflowRun: async with self.Session() as session: workflow_run = ( await session.scalars(select(WorkflowRunModel).filter_by(workflow_run_id=workflow_run_id)) ).first() if workflow_run: - workflow_run.status = status - workflow_run.failure_reason = failure_reason - if status == WorkflowRunStatus.queued and workflow_run.queued_at is None: + if status: + workflow_run.status = status + if status and status == WorkflowRunStatus.queued and workflow_run.queued_at is None: workflow_run.queued_at = datetime.utcnow() - if status == WorkflowRunStatus.running and workflow_run.started_at is None: + if status and status == WorkflowRunStatus.running and workflow_run.started_at is None: workflow_run.started_at = datetime.utcnow() - if status.is_final() and workflow_run.finished_at is None: + if status and status.is_final() and workflow_run.finished_at is None: workflow_run.finished_at = datetime.utcnow() + if failure_reason: + workflow_run.failure_reason = failure_reason + if webhook_failure_reason is not None: + workflow_run.webhook_failure_reason = webhook_failure_reason await session.commit() await session.refresh(workflow_run) await save_workflow_run_logs(workflow_run_id) @@ -2667,6 +2679,7 @@ class AgentDB: summary: str | None = None, output: dict[str, Any] | None = None, organization_id: str | None = None, + webhook_failure_reason: str | None = None, ) -> TaskV2: async with self.Session() as session: task_v2 = ( @@ -2699,6 +2712,8 @@ class AgentDB: task_v2.summary = summary if output: task_v2.output = output + if webhook_failure_reason is not None: + task_v2.webhook_failure_reason = webhook_failure_reason await session.commit() await session.refresh(task_v2) return TaskV2.model_validate(task_v2) diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 46f75451..5f8577c6 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -65,6 +65,7 @@ class TaskModel(Base): browser_session_id = Column(String, nullable=True, index=True) status = Column(String, index=True) webhook_callback_url = Column(String) + webhook_failure_reason = Column(String, nullable=True) totp_verification_url = Column(String) totp_identifier = Column(String) title = Column(String) @@ -263,6 +264,7 @@ class WorkflowRunModel(Base): failure_reason = Column(String) proxy_location = Column(String) webhook_callback_url = Column(String) + webhook_failure_reason = Column(String, nullable=True) totp_verification_url = Column(String) totp_identifier = Column(String) max_screenshot_scrolling_times = Column(Integer, nullable=True) @@ -639,6 +641,7 @@ class TaskV2Model(Base): summary = Column(String, nullable=True) output = Column(JSON, nullable=True) webhook_callback_url = Column(String, nullable=True) + webhook_failure_reason = Column(String, nullable=True) totp_verification_url = Column(String, nullable=True) totp_identifier = Column(String, nullable=True) proxy_location = Column(String, nullable=True) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index 9ee309b4..1829de2a 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -120,6 +120,7 @@ def convert_to_task(task_obj: TaskModel, debug_enabled: bool = False, workflow_p terminate_criterion=task_obj.terminate_criterion, include_action_history_in_verification=task_obj.include_action_history_in_verification, webhook_callback_url=task_obj.webhook_callback_url, + webhook_failure_reason=task_obj.webhook_failure_reason, totp_verification_url=task_obj.totp_verification_url, totp_identifier=task_obj.totp_identifier, navigation_goal=task_obj.navigation_goal, @@ -276,6 +277,7 @@ def convert_to_workflow_run( ProxyLocation(workflow_run_model.proxy_location) if workflow_run_model.proxy_location else None ), webhook_callback_url=workflow_run_model.webhook_callback_url, + webhook_failure_reason=workflow_run_model.webhook_failure_reason, totp_verification_url=workflow_run_model.totp_verification_url, totp_identifier=workflow_run_model.totp_identifier, queued_at=workflow_run_model.queued_at, diff --git a/skyvern/forge/sdk/schemas/task_v2.py b/skyvern/forge/sdk/schemas/task_v2.py index 5169504f..c83a7351 100644 --- a/skyvern/forge/sdk/schemas/task_v2.py +++ b/skyvern/forge/sdk/schemas/task_v2.py @@ -42,6 +42,7 @@ class TaskV2(BaseModel): totp_identifier: str | None = None proxy_location: ProxyLocation | None = None webhook_callback_url: str | None = None + webhook_failure_reason: str | None = None extracted_information_schema: dict | list | str | None = None error_code_mapping: dict | None = None model: dict[str, Any] | None = None diff --git a/skyvern/forge/sdk/schemas/tasks.py b/skyvern/forge/sdk/schemas/tasks.py index c69ed153..5b0f6557 100644 --- a/skyvern/forge/sdk/schemas/tasks.py +++ b/skyvern/forge/sdk/schemas/tasks.py @@ -38,6 +38,10 @@ class TaskBase(BaseModel): description="The URL to call when the task is completed.", examples=["https://my-webhook.com"], ) + webhook_failure_reason: str | None = Field( + default=None, + description="The reason for the webhook failure.", + ) totp_verification_url: str | None = None totp_identifier: str | None = None navigation_goal: str | None = Field( @@ -314,6 +318,7 @@ class Task(TaskBase): finished_at=self.finished_at, extracted_information=self.extracted_information, failure_reason=failure_reason or self.failure_reason, + webhook_failure_reason=self.webhook_failure_reason, action_screenshot_urls=action_screenshot_urls, screenshot_url=screenshot_url, recording_url=recording_url, @@ -341,6 +346,7 @@ class TaskResponse(BaseModel): downloaded_files: list[FileInfo] | None = None downloaded_file_urls: list[str] | None = None failure_reason: str | None = None + webhook_failure_reason: str | None = None errors: list[dict[str, Any]] = [] max_steps_per_run: int | None = None workflow_run_id: str | None = None diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index e959247b..2beefe8b 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -116,6 +116,7 @@ class WorkflowRun(BaseModel): extra_http_headers: dict[str, str] | None = None proxy_location: ProxyLocation | None = None webhook_callback_url: str | None = None + webhook_failure_reason: str | None = None totp_verification_url: str | None = None totp_identifier: str | None = None failure_reason: str | None = None @@ -151,6 +152,7 @@ class WorkflowRunResponseBase(BaseModel): failure_reason: str | None = None proxy_location: ProxyLocation | None = None webhook_callback_url: str | None = None + webhook_failure_reason: str | None = None totp_verification_url: str | None = None totp_identifier: str | None = None extra_http_headers: dict[str, str] | None = None diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 5090c73f..63613dd6 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -1226,6 +1226,7 @@ class WorkflowService: failure_reason=workflow_run.failure_reason, proxy_location=workflow_run.proxy_location, webhook_callback_url=workflow_run.webhook_callback_url, + webhook_failure_reason=workflow_run.webhook_failure_reason, totp_verification_url=workflow_run.totp_verification_url, totp_identifier=workflow_run.totp_identifier, extra_http_headers=workflow_run.extra_http_headers, @@ -1390,6 +1391,10 @@ class WorkflowService: resp_code=resp.status_code, resp_text=resp.text, ) + await app.DATABASE.update_workflow_run( + workflow_run_id=workflow_run.workflow_run_id, + webhook_failure_reason="", + ) else: LOG.info( "Webhook failed", @@ -1400,6 +1405,10 @@ class WorkflowService: resp_code=resp.status_code, resp_text=resp.text, ) + await app.DATABASE.update_workflow_run( + workflow_run_id=workflow_run.workflow_run_id, + webhook_failure_reason=f"Webhook failed with status code {resp.status_code}, error message: {resp.text}", + ) except Exception as e: raise FailedToSendWebhook( workflow_id=workflow_id, diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index 27328c13..b725403a 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -1726,6 +1726,11 @@ async def send_task_v2_webhook(task_v2: TaskV2) -> None: resp_code=resp.status_code, resp_text=resp.text, ) + await app.DATABASE.update_task_v2( + task_v2_id=task_v2.observer_cruise_id, + organization_id=task_v2.organization_id, + webhook_failure_reason="", + ) else: LOG.info( "Task v2 webhook failed", @@ -1734,5 +1739,10 @@ async def send_task_v2_webhook(task_v2: TaskV2) -> None: resp_code=resp.status_code, resp_text=resp.text, ) + await app.DATABASE.update_task_v2( + task_v2_id=task_v2.observer_cruise_id, + organization_id=task_v2.organization_id, + webhook_failure_reason=f"Webhook failed with status code {resp.status_code}, error message: {resp.text}", + ) except Exception as e: raise FailedToSendWebhook(task_v2_id=task_v2.observer_cruise_id) from e