Allow testing webhook response in setup flow (#3768)

This commit is contained in:
Marc Kelechava
2025-10-20 17:35:52 -07:00
committed by GitHub
parent a11189361b
commit 94aa66c241
10 changed files with 820 additions and 32 deletions

View File

@@ -8,3 +8,4 @@ from skyvern.forge.sdk.routes import scripts # noqa: F401
from skyvern.forge.sdk.routes import streaming # noqa: F401
from skyvern.forge.sdk.routes import streaming_commands # noqa: F401
from skyvern.forge.sdk.routes import streaming_vnc # noqa: F401
from skyvern.forge.sdk.routes import webhooks # noqa: F401

View File

@@ -0,0 +1,176 @@
from time import perf_counter
import httpx
import structlog
from fastapi import Depends
from skyvern.exceptions import BlockedHost, SkyvernHTTPException
from skyvern.forge import app
from skyvern.forge.sdk.core.security import generate_skyvern_webhook_headers
from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType
from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router
from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.services import org_auth_service
from skyvern.schemas.webhooks import TestWebhookRequest, TestWebhookResponse
from skyvern.services.webhook_service import build_sample_task_payload, build_sample_workflow_run_payload
from skyvern.utils.url_validators import validate_url
# Module-level structlog logger shared by the webhook test route in this module.
LOG = structlog.get_logger()
# Timeout handed to httpx for the test request; also quoted in the timeout error message.
_TEST_WEBHOOK_TIMEOUT_SECONDS = 10.0
# Maximum number of response-body characters echoed back to the caller.
_TEST_WEBHOOK_MAX_BODY_CHARS = 2048


def _webhook_test_failure(error: str) -> TestWebhookResponse:
    """Build the response returned when the test aborts before any request is sent."""
    return TestWebhookResponse(
        status_code=None,
        latency_ms=0,
        response_body="",
        headers_sent={},
        error=error,
    )


@legacy_base_router.post(
    "/internal/test-webhook",
    tags=["Internal"],
    description="Test a webhook endpoint by sending a sample payload",
    summary="Test webhook endpoint",
    include_in_schema=False,
)
@base_router.post(
    "/internal/test-webhook",
    tags=["Internal"],
    description="Test a webhook endpoint by sending a sample payload",
    summary="Test webhook endpoint",
    include_in_schema=False,
)
async def test_webhook(
    request: TestWebhookRequest,
    current_org: Organization = Depends(org_auth_service.get_current_org),
) -> TestWebhookResponse:
    """
    Test a webhook endpoint by sending a sample signed payload.

    This endpoint allows users to:
    - Validate their webhook receiver can be reached
    - Test HMAC signature verification
    - See the exact headers and payload format Skyvern sends

    The endpoint respects SSRF protection (BLOCKED_HOSTS, private IPs) and will return
    a helpful error message if the URL is blocked.

    Args:
        request: Target URL, the run type to simulate ("task" or "workflow_run") and an
            optional run ID to embed in the sample payload.
        current_org: Organization resolved from the caller's credentials; its API key
            (when one exists) is used to sign the payload.

    Returns:
        A TestWebhookResponse. Failures are reported through its ``error`` field rather
        than raised. When the test aborts before a request is sent (invalid or blocked
        URL, payload build failure) ``latency_ms`` is 0 and ``headers_sent`` is empty.
        Otherwise ``latency_ms`` covers only the HTTP exchange with the receiver.
    """
    # Validate the URL (raises BlockedHost or SkyvernHTTPException for invalid URLs)
    try:
        validated_url = validate_url(request.webhook_url)
        if not validated_url:
            return _webhook_test_failure("Invalid webhook URL")
    except BlockedHost as exc:
        blocked_host: str | None = getattr(exc, "host", None)
        return _webhook_test_failure(
            f"This URL is blocked by SSRF protection (host: {blocked_host or 'unknown'}). "
            "Add the host to ALLOWED_HOSTS to test internal endpoints or use an external receiver "
            "such as webhook.site or requestbin.com."
        )
    except SkyvernHTTPException as exc:
        return _webhook_test_failure(
            getattr(exc, "message", None) or "Invalid webhook URL. Use http(s) and a valid host."
        )
    except Exception as exc:  # pragma: no cover - defensive guard
        LOG.exception("Error validating webhook URL", error=str(exc), webhook_url=request.webhook_url)
        return _webhook_test_failure("Unexpected error while validating the webhook URL.")

    # Build the sample payload based on run type
    try:
        if request.run_type == "task":
            payload = build_sample_task_payload(run_id=request.run_id)
        else:  # workflow_run
            payload = build_sample_workflow_run_payload(run_id=request.run_id)
    except Exception as e:
        LOG.exception("Error building sample payload", error=str(e), run_type=request.run_type)
        return _webhook_test_failure(f"Failed to build sample payload: {e}")

    # Get the organization's API key to sign the webhook
    # For testing, we use a placeholder if no API key is available
    api_key_obj = await app.DATABASE.get_valid_org_auth_token(
        current_org.organization_id,
        OrganizationAuthTokenType.api.value,
    )
    api_key = api_key_obj.token if api_key_obj else "test_api_key_placeholder"
    headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key)

    # Send the webhook request
    status_code = None
    response_body = ""
    error = None
    # Start timing here so latency_ms reflects the round trip to the receiver only,
    # not URL validation, payload construction or the API-key lookup above.
    request_started = perf_counter()
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                validated_url,
                content=payload,
                headers=headers,
                timeout=httpx.Timeout(_TEST_WEBHOOK_TIMEOUT_SECONDS),
            )
        status_code = response.status_code
        # Echo back only the leading part of the body so the response stays small.
        response_text = response.text
        if len(response_text) > _TEST_WEBHOOK_MAX_BODY_CHARS:
            response_body = response_text[:_TEST_WEBHOOK_MAX_BODY_CHARS] + "\n... (truncated)"
        else:
            response_body = response_text
    except httpx.TimeoutException:
        error = f"Request timed out after {_TEST_WEBHOOK_TIMEOUT_SECONDS:g} seconds."
        LOG.warning(
            "Test webhook timeout",
            organization_id=current_org.organization_id,
            webhook_url=validated_url,
        )
    except httpx.NetworkError as exc:
        error = f"Could not reach URL: {exc}"
        LOG.warning(
            "Test webhook network error",
            organization_id=current_org.organization_id,
            webhook_url=validated_url,
            error=str(exc),
        )
    except Exception as exc:  # pragma: no cover - defensive guard
        error = f"Unexpected error: {exc}"
        LOG.exception(
            "Test webhook unexpected error",
            organization_id=current_org.organization_id,
            webhook_url=validated_url,
            error=str(exc),
        )

    latency_ms = int((perf_counter() - request_started) * 1000)
    return TestWebhookResponse(
        status_code=status_code,
        latency_ms=latency_ms,
        response_body=response_body,
        headers_sent=headers,
        error=error,
    )

View File

@@ -0,0 +1,17 @@
from typing import Literal
from pydantic import BaseModel, Field
class TestWebhookRequest(BaseModel):
    # Request body for the webhook test endpoint.
    # Documented with comments instead of a class docstring so that the
    # schema generated from this model is left unchanged.

    # Required: destination the sample webhook is delivered to.
    webhook_url: str = Field(default=..., description="The webhook URL to test")
    # Required: selects which sample payload shape is generated.
    run_type: Literal["task", "workflow_run"] = Field(default=..., description="Type of run to simulate")
    # Optional: when omitted the field is None.
    run_id: str | None = Field(default=None, description="Optional run ID to include in the sample payload")
class TestWebhookResponse(BaseModel):
    # Result of a webhook test.
    # Documented with comments instead of a class docstring so that the
    # schema generated from this model is left unchanged.

    # None when no HTTP response was obtained.
    status_code: int | None = Field(default=None, description="HTTP status code from the webhook receiver")
    # Required integer number of milliseconds.
    latency_ms: int = Field(default=..., description="Round-trip time in milliseconds")
    # Required; may be an empty string.
    response_body: str = Field(default=..., description="First 2KB of the response body")
    # Required; header name -> header value.
    headers_sent: dict[str, str] = Field(default=..., description="Headers sent with the webhook request")
    # None when nothing went wrong.
    error: str | None = Field(default=None, description="Error message if the request failed")

View File

@@ -0,0 +1,165 @@
import json
from datetime import datetime, timezone
from skyvern.forge.sdk.schemas.tasks import TaskRequest, TaskResponse, TaskStatus
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunResponseBase, WorkflowRunStatus
from skyvern.schemas.runs import (
ProxyLocation,
RunStatus,
RunType,
TaskRunRequest,
TaskRunResponse,
WorkflowRunRequest,
WorkflowRunResponse,
)
def _now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)
def build_sample_task_payload(run_id: str | None = None) -> str:
    """
    Produce a sample task webhook body from the real TaskResponse and TaskRunResponse models.

    Because the payload is serialized from the actual models, schema changes show up
    in the sample automatically.

    Args:
        run_id: Identifier to use as the task/run ID; a fixed sample ID is used when
            it is missing or empty.

    Returns:
        A compact JSON string (no whitespace between tokens, non-ASCII characters left
        unescaped). It holds the TaskResponse fields (minus ``request``) overlaid with
        the explicitly set TaskRunResponse fields; on a key clash the run-response
        value wins.
    """
    sample_task_id = run_id if run_id else "tsk_sample_123456789"
    timestamp = _now()
    # Every lifecycle timestamp in the sample is the same instant.
    lifecycle = {
        "created_at": timestamp,
        "modified_at": timestamp,
        "queued_at": timestamp,
        "started_at": timestamp,
        "finished_at": timestamp,
    }

    sample_request = TaskRequest(
        url="https://example.com/start",
        webhook_callback_url="https://webhook.example.com/receive",
        navigation_goal="Visit the sample site and capture details",
        data_extraction_goal="Collect sample output data",
        navigation_payload={"sample_field": "sample_value"},
        proxy_location=ProxyLocation.RESIDENTIAL,
        extra_http_headers={"x-sample-header": "value"},
    )

    legacy_response = TaskResponse(
        request=sample_request,
        task_id=sample_task_id,
        status=TaskStatus.completed,
        extracted_information={
            "sample_field": "sample_value",
            "example_data": "This is sample extracted data from the task",
        },
        action_screenshot_urls=[
            "https://example.com/screenshots/task-action-1.png",
            "https://example.com/screenshots/task-action-2.png",
        ],
        screenshot_url="https://example.com/screenshots/task-final.png",
        recording_url="https://example.com/recordings/task.mp4",
        downloaded_files=[],
        downloaded_file_urls=[],
        errors=[],
        max_steps_per_run=10,
        **lifecycle,
    )
    # Round-trip through JSON so every value is already in its serialized form.
    payload = json.loads(legacy_response.model_dump_json(exclude={"request"}))

    sample_run_request = TaskRunRequest(
        prompt="Visit the sample site and collect information",
        url=sample_request.url,
        webhook_url=sample_request.webhook_callback_url,
        data_extraction_schema=sample_request.extracted_information_schema,
        error_code_mapping=sample_request.error_code_mapping,
        proxy_location=sample_request.proxy_location,
        extra_http_headers=sample_request.extra_http_headers,
        browser_session_id=None,
    )
    run_response = TaskRunResponse(
        run_id=sample_task_id,
        run_type=RunType.task_v1,
        status=RunStatus.completed,
        output=payload.get("extracted_information"),
        downloaded_files=None,
        recording_url=payload.get("recording_url"),
        screenshot_urls=payload.get("action_screenshot_urls"),
        failure_reason=payload.get("failure_reason"),
        app_url=f"https://app.skyvern.com/tasks/{sample_task_id}",
        browser_session_id="pbs_sample_123456",
        max_screenshot_scrolls=payload.get("max_screenshot_scrolls"),
        script_run=None,
        errors=payload.get("errors"),
        run_request=sample_run_request,
        **lifecycle,
    )

    # Only fields passed explicitly above are merged in (exclude_unset).
    run_fields = json.loads(run_response.model_dump_json(exclude_unset=True))
    payload.update(run_fields)
    return json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
def build_sample_workflow_run_payload(run_id: str | None = None) -> str:
    """
    Produce a sample workflow webhook body from the real WorkflowRunResponseBase and
    WorkflowRunResponse models.

    Because the payload is serialized from the actual models, schema changes show up
    in the sample automatically.

    Args:
        run_id: Identifier to use as the workflow run ID; a fixed sample ID is used
            when it is missing or empty.

    Returns:
        A compact JSON string (no whitespace between tokens, non-ASCII characters left
        unescaped). It holds the WorkflowRunResponseBase fields overlaid with the
        explicitly set WorkflowRunResponse fields; on a key clash the run-response
        value wins.
    """
    sample_run_id = run_id if run_id else "wr_sample_123456789"
    sample_workflow_id = "wpid_sample_123"
    timestamp = _now()

    legacy_response = WorkflowRunResponseBase(
        workflow_id=sample_workflow_id,
        workflow_run_id=sample_run_id,
        status=WorkflowRunStatus.completed,
        proxy_location=ProxyLocation.RESIDENTIAL,
        webhook_callback_url="https://webhook.example.com/receive",
        queued_at=timestamp,
        started_at=timestamp,
        finished_at=timestamp,
        created_at=timestamp,
        modified_at=timestamp,
        parameters={"sample_param": "sample_value"},
        screenshot_urls=["https://example.com/screenshots/workflow-step.png"],
        recording_url="https://example.com/recordings/workflow.mp4",
        downloaded_files=[],
        downloaded_file_urls=[],
        outputs={"result": "success", "data": "Sample workflow output"},
        total_steps=5,
        total_cost=0.05,
        workflow_title="Sample Workflow",
        browser_session_id="pbs_sample_123456",
        errors=[],
    )
    # Round-trip through JSON so every value is already in its serialized form.
    payload = json.loads(legacy_response.model_dump_json())

    sample_run_request = WorkflowRunRequest(
        workflow_id=sample_workflow_id,
        title=payload.get("workflow_title"),
        parameters=payload.get("parameters"),
        proxy_location=ProxyLocation.RESIDENTIAL,
        webhook_url=payload.get("webhook_callback_url"),
    )
    run_response = WorkflowRunResponse(
        run_id=sample_run_id,
        run_type=RunType.workflow_run,
        status=RunStatus.completed,
        output=payload.get("outputs"),
        downloaded_files=None,
        recording_url=payload.get("recording_url"),
        screenshot_urls=payload.get("screenshot_urls"),
        failure_reason=payload.get("failure_reason"),
        created_at=timestamp,
        modified_at=timestamp,
        # These three are taken from the already-serialized base payload.
        queued_at=payload.get("queued_at"),
        started_at=payload.get("started_at"),
        finished_at=payload.get("finished_at"),
        app_url=f"https://app.skyvern.com/workflows/{sample_workflow_id}/{sample_run_id}",
        browser_session_id=payload.get("browser_session_id"),
        max_screenshot_scrolls=payload.get("max_screenshot_scrolls"),
        script_run=None,
        errors=payload.get("errors"),
        run_request=sample_run_request,
    )

    # Only fields passed explicitly above are merged in (exclude_unset).
    run_fields = json.loads(run_response.model_dump_json(exclude_unset=True))
    payload.update(run_fields)
    return json.dumps(payload, separators=(",", ":"), ensure_ascii=False)