Webhook Replay Test URL (#3769)

Co-authored-by: Shuchang Zheng <wintonzheng0325@gmail.com>
This commit is contained in:
Marc Kelechava
2025-10-22 14:26:14 -07:00
committed by GitHub
parent e8472df6d1
commit aeefc301ed
8 changed files with 964 additions and 12 deletions

View File

@@ -49,6 +49,26 @@ class ProxyLocationNotSupportedError(SkyvernException):
super().__init__(f"Unknown proxy location: {proxy_location}")
class WebhookReplayError(SkyvernHTTPException):
def __init__(
self,
message: str | None = None,
*,
status_code: int = status.HTTP_400_BAD_REQUEST,
):
super().__init__(message=message or "Webhook replay failed.", status_code=status_code)
class MissingWebhookTarget(WebhookReplayError):
def __init__(self, message: str | None = None):
super().__init__(message or "No webhook URL configured for the run.")
class MissingApiKey(WebhookReplayError):
def __init__(self, message: str | None = None):
super().__init__(message or "Organization does not have a valid API key configured.")
class TaskNotFound(SkyvernHTTPException):
def __init__(self, task_id: str | None = None):
super().__init__(f"Task {task_id} not found", status_code=status.HTTP_404_NOT_FOUND)

View File

@@ -2,17 +2,36 @@ from time import perf_counter
import httpx
import structlog
from fastapi import Depends
from fastapi import Depends, HTTPException, status
from skyvern.exceptions import BlockedHost, SkyvernHTTPException
from skyvern.exceptions import (
BlockedHost,
MissingApiKey,
MissingWebhookTarget,
SkyvernHTTPException,
TaskNotFound,
WebhookReplayError,
WorkflowRunNotFound,
)
from skyvern.forge import app
from skyvern.forge.sdk.core.security import generate_skyvern_webhook_headers
from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType
from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router
from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.services import org_auth_service
from skyvern.schemas.webhooks import TestWebhookRequest, TestWebhookResponse
from skyvern.services.webhook_service import build_sample_task_payload, build_sample_workflow_run_payload
from skyvern.schemas.webhooks import (
RunWebhookPreviewResponse,
RunWebhookReplayRequest,
RunWebhookReplayResponse,
TestWebhookRequest,
TestWebhookResponse,
)
from skyvern.services.webhook_service import (
build_run_preview,
build_sample_task_payload,
build_sample_workflow_run_payload,
replay_run_webhook,
)
from skyvern.utils.url_validators import validate_url
LOG = structlog.get_logger()
@@ -174,3 +193,114 @@ async def test_webhook(
headers_sent=headers,
error=error,
)
@legacy_base_router.get(
"/internal/runs/{run_id}/test-webhook",
tags=["Internal"],
response_model=RunWebhookPreviewResponse,
summary="Preview webhook replay payload",
include_in_schema=False,
)
@base_router.get(
"/internal/runs/{run_id}/test-webhook",
tags=["Internal"],
response_model=RunWebhookPreviewResponse,
summary="Preview webhook replay payload",
include_in_schema=False,
)
async def preview_webhook_replay(
run_id: str,
current_org: Organization = Depends(org_auth_service.get_current_org), # noqa: B008
) -> RunWebhookPreviewResponse:
"""Return the replay payload preview for a completed run.
Args:
run_id (str): Identifier of the run to preview.
current_org (Organization): Organization context for permission and signing.
Returns:
RunWebhookPreviewResponse: Payload and headers that would be used to replay the webhook.
Raises:
HTTPException: 400 if the organization lacks a valid API key or other replay preconditions fail.
HTTPException: 404 if the specified run cannot be found.
HTTPException: 500 if an unexpected error occurs while building the preview.
"""
try:
return await build_run_preview(
organization_id=current_org.organization_id,
run_id=run_id,
)
except MissingApiKey as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
except (TaskNotFound, WorkflowRunNotFound) as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except WebhookReplayError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
except SkyvernHTTPException as exc:
raise HTTPException(status_code=exc.status_code, detail=str(exc)) from exc
except Exception as exc: # pragma: no cover - defensive guard
LOG.error("Failed to build webhook replay preview", run_id=run_id, error=str(exc), exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to build webhook replay preview.",
) from exc
@legacy_base_router.post(
"/internal/runs/{run_id}/test-webhook",
tags=["Internal"],
response_model=RunWebhookReplayResponse,
summary="Replay webhook for a completed run",
include_in_schema=False,
)
@base_router.post(
"/internal/runs/{run_id}/test-webhook",
tags=["Internal"],
response_model=RunWebhookReplayResponse,
summary="Replay webhook for a completed run",
include_in_schema=False,
)
async def trigger_webhook_replay(
run_id: str,
request: RunWebhookReplayRequest,
current_org: Organization = Depends(org_auth_service.get_current_org), # noqa: B008
) -> RunWebhookReplayResponse:
"""Replay a completed run's webhook to the stored or override URL.
Args:
run_id (str): Identifier of the run whose webhook should be replayed.
request (RunWebhookReplayRequest): Optional override URL details for the replay.
current_org (Organization): Organization context for permission and signing.
Returns:
RunWebhookReplayResponse: Delivery status information for the replay attempt.
Raises:
HTTPException: 400 if no target URL is available, the organization lacks an API key, or replay validation fails.
HTTPException: 404 if the specified run cannot be found.
HTTPException: 500 if an unexpected error occurs while replaying the webhook.
"""
try:
return await replay_run_webhook(
organization_id=current_org.organization_id,
run_id=run_id,
target_url=request.override_webhook_url,
)
except MissingWebhookTarget as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
except MissingApiKey as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
except (TaskNotFound, WorkflowRunNotFound) as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except WebhookReplayError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
except SkyvernHTTPException as exc:
raise HTTPException(status_code=exc.status_code, detail=str(exc)) from exc
except Exception as exc: # pragma: no cover - defensive guard
LOG.error("Failed to replay webhook", run_id=run_id, error=str(exc), exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to replay webhook.",
) from exc

View File

@@ -15,3 +15,31 @@ class TestWebhookResponse(BaseModel):
response_body: str = Field(..., description="First 2KB of the response body")
headers_sent: dict[str, str] = Field(..., description="Headers sent with the webhook request")
error: str | None = Field(None, description="Error message if the request failed")
class RunWebhookPreviewResponse(BaseModel):
run_id: str = Field(..., description="Identifier of the run whose payload is being replayed")
run_type: str = Field(..., description="Run type associated with the payload")
default_webhook_url: str | None = Field(None, description="Webhook URL stored on the original run configuration")
payload: str = Field(..., description="JSON payload that was delivered when the run completed")
headers: dict[str, str] = Field(..., description="Signed headers that would accompany the replayed webhook")
class RunWebhookReplayRequest(BaseModel):
override_webhook_url: str | None = Field(
None,
description="Optional webhook URL to send the payload to instead of the stored configuration",
)
class RunWebhookReplayResponse(BaseModel):
run_id: str = Field(..., description="Identifier of the run that was replayed")
run_type: str = Field(..., description="Run type associated with the payload")
default_webhook_url: str | None = Field(None, description="Webhook URL stored on the original run configuration")
target_webhook_url: str | None = Field(None, description="Webhook URL that the replay attempted to reach")
payload: str = Field(..., description="JSON payload that was delivered during the replay attempt")
headers: dict[str, str] = Field(..., description="Signed headers that were generated for the replay attempt")
status_code: int | None = Field(None, description="HTTP status code returned by the webhook receiver, if available")
latency_ms: int | None = Field(None, description="Round-trip latency in milliseconds for the replay attempt")
response_body: str | None = Field(None, description="Body returned by the webhook receiver (truncated to 2KB)")
error: str | None = Field(None, description="Error message if the replay attempt failed")

View File

@@ -1,8 +1,34 @@
import json
from datetime import datetime, timezone
from __future__ import annotations
from skyvern.forge.sdk.schemas.tasks import TaskRequest, TaskResponse, TaskStatus
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunResponseBase, WorkflowRunStatus
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from time import perf_counter
import httpx
import structlog
from fastapi import status
from skyvern.config import settings
from skyvern.exceptions import (
BlockedHost,
MissingApiKey,
MissingWebhookTarget,
SkyvernHTTPException,
TaskNotFound,
WebhookReplayError,
WorkflowRunNotFound,
)
from skyvern.forge import app
from skyvern.forge.sdk.core.security import generate_skyvern_webhook_headers
from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType
from skyvern.forge.sdk.schemas.task_v2 import TaskV2
from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus
from skyvern.forge.sdk.workflow.models.workflow import (
WorkflowRun,
WorkflowRunResponseBase,
WorkflowRunStatus,
)
from skyvern.schemas.runs import (
ProxyLocation,
RunStatus,
@@ -12,6 +38,13 @@ from skyvern.schemas.runs import (
WorkflowRunRequest,
WorkflowRunResponse,
)
from skyvern.schemas.webhooks import RunWebhookPreviewResponse, RunWebhookReplayResponse
from skyvern.services import run_service, task_v2_service
from skyvern.utils.url_validators import validate_url
LOG = structlog.get_logger()
RESPONSE_BODY_TRUNCATION_LIMIT = 2048
def _now() -> datetime:
@@ -163,3 +196,326 @@ def build_sample_workflow_run_payload(run_id: str | None = None) -> str:
payload_dict.update(json.loads(workflow_run_response.model_dump_json(exclude_unset=True)))
return json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False)
@dataclass
class _WebhookPayload:
run_id: str
run_type: str
payload: str
default_webhook_url: str | None
async def build_run_preview(organization_id: str, run_id: str) -> RunWebhookPreviewResponse:
"""Return the payload and headers that would be used for a replay."""
payload = await _build_webhook_payload(organization_id=organization_id, run_id=run_id)
api_key = await _get_api_key(organization_id=organization_id)
headers = generate_skyvern_webhook_headers(payload=payload.payload, api_key=api_key)
return RunWebhookPreviewResponse(
run_id=payload.run_id,
run_type=payload.run_type,
default_webhook_url=payload.default_webhook_url,
payload=payload.payload,
headers=headers,
)
async def replay_run_webhook(organization_id: str, run_id: str, target_url: str | None) -> RunWebhookReplayResponse:
"""
Send the webhook payload for a run to either the stored URL or a caller-provided override.
"""
payload = await _build_webhook_payload(organization_id=organization_id, run_id=run_id)
api_key = await _get_api_key(organization_id=organization_id)
headers = generate_skyvern_webhook_headers(payload=payload.payload, api_key=api_key)
url_to_use: str | None = target_url if target_url else payload.default_webhook_url
if not url_to_use:
raise MissingWebhookTarget()
validated_url = _validate_target_url(url_to_use)
status_code, latency_ms, response_body, error = await _deliver_webhook(
url=validated_url,
payload=payload.payload,
headers=headers,
)
return RunWebhookReplayResponse(
run_id=payload.run_id,
run_type=payload.run_type,
default_webhook_url=payload.default_webhook_url,
target_webhook_url=validated_url,
payload=payload.payload,
headers=headers,
status_code=status_code,
latency_ms=latency_ms,
response_body=response_body,
error=error,
)
async def _build_webhook_payload(organization_id: str, run_id: str) -> _WebhookPayload:
run = await app.DATABASE.get_run(run_id, organization_id=organization_id)
if not run:
# Attempt to resolve task v2 runs that may not yet be in the runs table.
task_v2 = await app.DATABASE.get_task_v2(run_id, organization_id=organization_id)
if task_v2:
return await _build_task_v2_payload(task_v2)
workflow_run = await app.DATABASE.get_workflow_run(
workflow_run_id=run_id,
organization_id=organization_id,
)
if workflow_run:
return await _build_workflow_payload(
organization_id=organization_id,
workflow_run_id=run_id,
)
raise SkyvernHTTPException(
f"Run {run_id} not found",
status_code=status.HTTP_404_NOT_FOUND,
)
run_type = _as_run_type_str(run.task_run_type)
if run.task_run_type in {
RunType.task_v1,
RunType.openai_cua,
RunType.anthropic_cua,
RunType.ui_tars,
}:
return await _build_task_payload(
organization_id=organization_id,
run_id=run.run_id,
run_type_str=run_type,
)
if run.task_run_type == RunType.task_v2:
task_v2 = await app.DATABASE.get_task_v2(run.run_id, organization_id=organization_id)
if not task_v2:
raise SkyvernHTTPException(
f"Task v2 run {run_id} missing task record",
status_code=status.HTTP_404_NOT_FOUND,
)
return await _build_task_v2_payload(task_v2)
if run.task_run_type == RunType.workflow_run:
return await _build_workflow_payload(organization_id=organization_id, workflow_run_id=run.run_id)
raise WebhookReplayError(f"Run type {run_type} is not supported for webhook replay.")
async def _build_task_payload(organization_id: str, run_id: str, run_type_str: str) -> _WebhookPayload:
task: Task | None = await app.DATABASE.get_task(run_id, organization_id=organization_id)
if not task:
raise TaskNotFound(task_id=run_id)
if not task.status.is_final():
LOG.warning(
"Webhook replay requested for non-terminal task run",
run_id=run_id,
status=task.status,
)
raise WebhookReplayError(f"Run {run_id} has not reached a terminal state (status={task.status}).")
latest_step = await app.DATABASE.get_latest_step(run_id, organization_id=organization_id)
task_response = await app.agent.build_task_response(task=task, last_step=latest_step)
payload_dict = json.loads(task_response.model_dump_json(exclude={"request"}))
run_response = await run_service.get_run_response(run_id=run_id, organization_id=organization_id)
if isinstance(run_response, TaskRunResponse):
if not run_response.status.is_final():
LOG.warning(
"Webhook replay requested for non-terminal task run response",
run_id=run_id,
status=run_response.status,
)
raise WebhookReplayError(f"Run {run_id} has not reached a terminal state (status={run_response.status}).")
run_response_json = run_response.model_dump_json(exclude={"run_request"})
payload_dict.update(json.loads(run_response_json))
payload = json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False)
return _WebhookPayload(
run_id=run_id,
run_type=run_type_str,
payload=payload,
default_webhook_url=task.webhook_callback_url,
)
async def _build_task_v2_payload(task_v2: TaskV2) -> _WebhookPayload:
if not task_v2.status.is_final():
LOG.warning(
"Webhook replay requested for non-terminal task v2 run",
run_id=task_v2.observer_cruise_id,
status=task_v2.status,
)
raise WebhookReplayError(
f"Run {task_v2.observer_cruise_id} has not reached a terminal state (status={task_v2.status})."
)
task_run_response = await task_v2_service.build_task_v2_run_response(task_v2)
if not task_run_response.status.is_final():
LOG.warning(
"Webhook replay requested for non-terminal task v2 run response",
run_id=task_v2.observer_cruise_id,
status=task_run_response.status,
)
raise WebhookReplayError(
f"Run {task_v2.observer_cruise_id} has not reached a terminal state (status={task_run_response.status})."
)
task_run_response_json = task_run_response.model_dump_json(exclude={"run_request"})
payload = json.dumps(json.loads(task_run_response_json), separators=(",", ":"), ensure_ascii=False)
return _WebhookPayload(
run_id=task_v2.observer_cruise_id,
run_type=RunType.task_v2.value,
payload=payload,
default_webhook_url=task_v2.webhook_callback_url,
)
async def _build_workflow_payload(
organization_id: str,
workflow_run_id: str,
) -> _WebhookPayload:
workflow_run: WorkflowRun | None = await app.DATABASE.get_workflow_run(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
if not workflow_run:
raise WorkflowRunNotFound(workflow_run_id=workflow_run_id)
if not workflow_run.status.is_final():
LOG.warning(
"Webhook replay requested for non-terminal workflow run",
workflow_run_id=workflow_run_id,
status=workflow_run.status,
)
raise WebhookReplayError(
f"Run {workflow_run_id} has not reached a terminal state (status={workflow_run.status})."
)
status_response = await app.WORKFLOW_SERVICE.build_workflow_run_status_response(
workflow_permanent_id=workflow_run.workflow_permanent_id,
workflow_run_id=workflow_run.workflow_run_id,
organization_id=workflow_run.organization_id,
)
if not status_response.status.is_final():
LOG.warning(
"Webhook replay requested for non-terminal workflow run response",
workflow_run_id=workflow_run_id,
status=status_response.status,
)
raise WebhookReplayError(
f"Run {workflow_run_id} has not reached a terminal state (status={status_response.status})."
)
app_url = (
f"{settings.SKYVERN_APP_URL.rstrip('/')}/workflows/"
f"{workflow_run.workflow_permanent_id}/{workflow_run.workflow_run_id}"
)
run_response = WorkflowRunResponse(
run_id=workflow_run.workflow_run_id,
run_type=RunType.workflow_run,
status=RunStatus(status_response.status),
output=status_response.outputs,
downloaded_files=status_response.downloaded_files,
recording_url=status_response.recording_url,
screenshot_urls=status_response.screenshot_urls,
failure_reason=status_response.failure_reason,
app_url=app_url,
script_run=status_response.script_run,
created_at=status_response.created_at,
modified_at=status_response.modified_at,
errors=status_response.errors,
)
payload_dict = json.loads(
status_response.model_dump_json(
exclude={
"webhook_callback_url",
"totp_verification_url",
"totp_identifier",
"extra_http_headers",
}
)
)
payload_dict.update(json.loads(run_response.model_dump_json(exclude={"run_request"})))
payload = json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False)
return _WebhookPayload(
run_id=workflow_run.workflow_run_id,
run_type=RunType.workflow_run.value,
payload=payload,
default_webhook_url=workflow_run.webhook_callback_url,
)
async def _get_api_key(organization_id: str) -> str:
api_key_obj = await app.DATABASE.get_valid_org_auth_token(
organization_id,
OrganizationAuthTokenType.api.value,
)
if not api_key_obj or not api_key_obj.token:
raise MissingApiKey()
return api_key_obj.token
async def _deliver_webhook(
url: str, payload: str, headers: dict[str, str]
) -> tuple[int | None, int, str | None, str | None]:
start = perf_counter()
status_code: int | None = None
response_body: str | None = None
error: str | None = None
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, content=payload, headers=headers, timeout=httpx.Timeout(10.0))
status_code = response.status_code
body_text = response.text or ""
if len(body_text) > RESPONSE_BODY_TRUNCATION_LIMIT:
response_body = f"{body_text[:RESPONSE_BODY_TRUNCATION_LIMIT]}\n... (truncated)"
else:
response_body = body_text or None
except httpx.TimeoutException:
error = "Request timed out after 10 seconds."
LOG.warning("Webhook replay timed out", url=url)
except httpx.NetworkError as exc:
error = f"Could not reach URL: {exc}"
LOG.warning("Webhook replay network error", url=url, error=str(exc))
except Exception as exc: # pragma: no cover - defensive guard
error = f"Unexpected error: {exc}"
LOG.error("Webhook replay unexpected error", url=url, error=str(exc), exc_info=True)
latency_ms = int((perf_counter() - start) * 1000)
return status_code, latency_ms, response_body, error
def _as_run_type_str(run_type: RunType | str | None) -> str:
if isinstance(run_type, RunType):
return run_type.value
if isinstance(run_type, str):
return run_type
return "unknown"
def _validate_target_url(url: str) -> str:
try:
validated_url = validate_url(url)
if not validated_url:
raise SkyvernHTTPException("Invalid webhook URL.", status_code=status.HTTP_400_BAD_REQUEST)
return validated_url
except BlockedHost as exc:
raise SkyvernHTTPException(
message=(
f"This URL is blocked by SSRF protection. {str(exc)} "
"Add the host to ALLOWED_HOSTS to test internal endpoints or use an external receiver "
"such as webhook.site or requestbin.com."
),
status_code=status.HTTP_400_BAD_REQUEST,
) from exc
except SkyvernHTTPException:
raise
except Exception as exc: # pragma: no cover - defensive guard
LOG.error("Unexpected error validating webhook URL", url=url, error=str(exc))
raise SkyvernHTTPException(
"Unexpected error while validating the webhook URL.",
status_code=status.HTTP_400_BAD_REQUEST,
) from exc