From 7ff809e50b3cfb99191037f8f474c224b01b6218 Mon Sep 17 00:00:00 2001 From: LawyZheng Date: Tue, 4 Nov 2025 11:29:14 +0800 Subject: [PATCH] refactor webhook signature (#3889) --- skyvern/core/totp.py | 17 ++--------- skyvern/forge/agent.py | 16 ++++++---- skyvern/forge/sdk/core/security.py | 44 ++++++++++++++++++++++----- skyvern/forge/sdk/routes/webhooks.py | 10 +++--- skyvern/forge/sdk/workflow/service.py | 20 ++++++------ skyvern/services/otp_service.py | 16 +++------- skyvern/services/task_v2_service.py | 14 +++++---- skyvern/services/webhook_service.py | 38 +++++++++++------------ 8 files changed, 95 insertions(+), 80 deletions(-) diff --git a/skyvern/core/totp.py b/skyvern/core/totp.py index 36366a46..273bc1c3 100644 --- a/skyvern/core/totp.py +++ b/skyvern/core/totp.py @@ -1,5 +1,4 @@ import asyncio -import json from datetime import datetime, timedelta import structlog @@ -8,7 +7,7 @@ from skyvern.config import settings from skyvern.exceptions import FailedToGetTOTPVerificationCode, NoTOTPVerificationCodeFound from skyvern.forge import app from skyvern.forge.sdk.core.aiohttp_helper import aiohttp_post -from skyvern.forge.sdk.core.security import generate_skyvern_signature +from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType from skyvern.forge.sdk.schemas.totp_codes import OTPType @@ -86,19 +85,9 @@ async def _get_verification_code_from_url( request_data["workflow_run_id"] = workflow_run_id if workflow_permanent_id: request_data["workflow_permanent_id"] = workflow_permanent_id - payload = json.dumps(request_data) - signature = generate_skyvern_signature( - payload=payload, - api_key=api_key, - ) - timestamp = str(int(datetime.utcnow().timestamp())) - headers = { - "x-skyvern-timestamp": timestamp, - "x-skyvern-signature": signature, - "Content-Type": "application/json", - } + signed_data = generate_skyvern_webhook_signature(payload=request_data, api_key=api_key) try: - json_resp = await aiohttp_post(url=url, data=request_data, headers=headers, raise_exception=False) + json_resp = await aiohttp_post(url=url, data=request_data, headers=signed_data.headers, raise_exception=False) except Exception as e: LOG.error("Failed to get verification code from url", exc_info=True) raise FailedToGetTOTPVerificationCode( diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index f9371392..77614ae9 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -75,7 +75,7 @@ from skyvern.forge.sdk.api.llm.exceptions import LLM_PROVIDER_ERROR_RETRYABLE_TA from skyvern.forge.sdk.api.llm.ui_tars_llm_caller import UITarsLLMCaller from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context -from skyvern.forge.sdk.core.security import generate_skyvern_webhook_headers +from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature from skyvern.forge.sdk.db.enums import TaskType from skyvern.forge.sdk.log_artifacts import save_step_logs, save_task_logs from skyvern.forge.sdk.models import Step, StepStatus @@ -2592,19 +2592,23 @@ class ForgeAgent: payload_dict = json.loads(payload_json) if task_run_response_json: payload_dict.update(json.loads(task_run_response_json)) - payload = json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False) - headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key) + + signed_data = generate_skyvern_webhook_signature(payload=payload_dict, api_key=api_key) + LOG.info( "Sending task response to webhook callback url", task_id=task.task_id, webhook_callback_url=task.webhook_callback_url, - payload=payload, - headers=headers, + payload=signed_data.signed_payload, + headers=signed_data.headers, ) async with httpx.AsyncClient() as client: resp = await client.post( - task.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) + task.webhook_callback_url, + data=signed_data.signed_payload, + headers=signed_data.headers, + timeout=httpx.Timeout(30.0), ) if resp.status_code >= 200 and resp.status_code < 300: LOG.info( diff --git a/skyvern/forge/sdk/core/security.py b/skyvern/forge/sdk/core/security.py index 48a17450..5b63cfb5 100644 --- a/skyvern/forge/sdk/core/security.py +++ b/skyvern/forge/sdk/core/security.py @@ -1,5 +1,7 @@ import hashlib import hmac +import json +from dataclasses import dataclass from datetime import datetime, timedelta from typing import Any, Union @@ -8,6 +10,20 @@ from jose import jwt from skyvern.config import settings +def _normalize_numbers(x: Any) -> Any: + if isinstance(x, float): + return int(x) if x.is_integer() else x + if isinstance(x, dict): + return {k: _normalize_numbers(v) for k, v in x.items()} + if isinstance(x, list): + return [_normalize_numbers(v) for v in x] + return x + + +def _normalize_json_dumps(payload: dict) -> str: + return json.dumps(_normalize_numbers(payload), separators=(",", ":"), ensure_ascii=False) + + def create_access_token( subject: Union[str, Any], expires_delta: timedelta | None = None, @@ -43,11 +59,25 @@ def generate_skyvern_signature( return hash_obj.hexdigest() -def generate_skyvern_webhook_headers(payload: str, api_key: str) -> dict[str, str]: - signature = generate_skyvern_signature(payload=payload, api_key=api_key) +@dataclass +class WebhookSignature: + timestamp: str + signature: str + signed_payload: str + headers: dict[str, str] + + +def generate_skyvern_webhook_signature(payload: dict, api_key: str) -> WebhookSignature: + payload_str = _normalize_json_dumps(payload) + signature = generate_skyvern_signature(payload=payload_str, api_key=api_key) timestamp = str(int(datetime.utcnow().timestamp())) - return { - "x-skyvern-timestamp": timestamp, - "x-skyvern-signature": signature, - "Content-Type": "application/json", - } + return WebhookSignature( + timestamp=timestamp, + signature=signature, + signed_payload=payload_str, + headers={ + "x-skyvern-timestamp": timestamp, + "x-skyvern-signature": signature, + "Content-Type": "application/json", + }, + ) diff --git a/skyvern/forge/sdk/routes/webhooks.py b/skyvern/forge/sdk/routes/webhooks.py index 62dfe0a8..9e5b913a 100644 --- a/skyvern/forge/sdk/routes/webhooks.py +++ b/skyvern/forge/sdk/routes/webhooks.py @@ -14,7 +14,7 @@ from skyvern.exceptions import ( WorkflowRunNotFound, ) from skyvern.forge import app -from skyvern.forge.sdk.core.security import generate_skyvern_webhook_headers +from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router from skyvern.forge.sdk.schemas.organizations import Organization @@ -135,7 +135,7 @@ async def test_webhook( ) api_key = api_key_obj.token if api_key_obj else "test_api_key_placeholder" - headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key) + signed_data = generate_skyvern_webhook_signature(payload=payload, api_key=api_key) # Send the webhook request status_code = None @@ -146,8 +146,8 @@ async def test_webhook( async with httpx.AsyncClient() as client: response = await client.post( validated_url, - content=payload, - headers=headers, + content=signed_data.signed_payload, + headers=signed_data.headers, timeout=httpx.Timeout(10.0), ) status_code = response.status_code @@ -190,7 +190,7 @@ async def test_webhook( status_code=status_code, latency_ms=latency_ms, response_body=response_body, - headers_sent=headers, + headers_sent=signed_data.headers, error=error, ) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 68415777..bece40dd 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -32,7 +32,7 @@ from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context -from skyvern.forge.sdk.core.security import generate_skyvern_webhook_headers +from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature from skyvern.forge.sdk.core.skyvern_context import SkyvernContext from skyvern.forge.sdk.db.enums import TaskType from skyvern.forge.sdk.models import Step, StepStatus @@ -2076,12 +2076,11 @@ class WorkflowService: ), errors=workflow_run_status_response.errors, ) - payload_dict = json.loads(workflow_run_status_response.model_dump_json()) + payload_dict: dict = json.loads(workflow_run_status_response.model_dump_json()) workflow_run_response_dict = json.loads(workflow_run_response.model_dump_json()) payload_dict.update(workflow_run_response_dict) - payload = json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False) - headers = generate_skyvern_webhook_headers( - payload=payload, + signed_data = generate_skyvern_webhook_signature( + payload=payload_dict, api_key=api_key, ) LOG.info( @@ -2089,13 +2088,16 @@ class WorkflowService: workflow_id=workflow_id, workflow_run_id=workflow_run.workflow_run_id, webhook_callback_url=workflow_run.webhook_callback_url, - payload=payload, - headers=headers, + payload=signed_data.signed_payload, + headers=signed_data.headers, ) try: async with httpx.AsyncClient() as client: resp = await client.post( - url=workflow_run.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) + url=workflow_run.webhook_callback_url, + data=signed_data.signed_payload, + headers=signed_data.headers, + timeout=httpx.Timeout(30.0), ) if resp.status_code >= 200 and resp.status_code < 300: LOG.info( @@ -2114,7 +2116,7 @@ class WorkflowService: "Webhook failed", workflow_id=workflow_id, workflow_run_id=workflow_run.workflow_run_id, - webhook_data=payload, + webhook_data=signed_data.signed_payload, resp=resp, resp_code=resp.status_code, resp_text=resp.text, diff --git a/skyvern/services/otp_service.py b/skyvern/services/otp_service.py index 4f238add..b2bff38f 100644 --- a/skyvern/services/otp_service.py +++ b/skyvern/services/otp_service.py @@ -1,5 +1,4 @@ import asyncio -import json from datetime import datetime, timedelta import structlog @@ -10,7 +9,7 @@ from skyvern.exceptions import FailedToGetTOTPVerificationCode, NoTOTPVerificati from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.core.aiohttp_helper import aiohttp_post -from skyvern.forge.sdk.core.security import generate_skyvern_signature +from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType from skyvern.forge.sdk.schemas.totp_codes import OTPType @@ -122,19 +121,12 @@ async def _get_otp_value_from_url( request_data["workflow_run_id"] = workflow_run_id if workflow_permanent_id: request_data["workflow_permanent_id"] = workflow_permanent_id - payload = json.dumps(request_data) - signature = generate_skyvern_signature( - payload=payload, + signed_data = generate_skyvern_webhook_signature( + payload=request_data, api_key=api_key, ) - timestamp = str(int(datetime.utcnow().timestamp())) - headers = { - "x-skyvern-timestamp": timestamp, - "x-skyvern-signature": signature, - "Content-Type": "application/json", - } try: - json_resp = await aiohttp_post(url=url, data=request_data, headers=headers, raise_exception=False) + json_resp = await aiohttp_post(url=url, data=request_data, headers=signed_data.headers, raise_exception=False) except Exception as e: LOG.error("Failed to get otp value from url", exc_info=True) raise FailedToGetTOTPVerificationCode( diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index 39c2a4e1..2a7084e9 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -20,7 +20,7 @@ from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.hashing import generate_url_hash -from skyvern.forge.sdk.core.security import generate_skyvern_webhook_headers +from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature from skyvern.forge.sdk.core.skyvern_context import SkyvernContext from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType from skyvern.forge.sdk.schemas.organizations import Organization @@ -1817,17 +1817,19 @@ async def send_task_v2_webhook(task_v2: TaskV2) -> None: payload_json = task_v2.model_dump_json(by_alias=True) payload_dict = json.loads(payload_json) payload_dict.update(json.loads(task_run_response_json)) - payload = json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False) - headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token) + signed_data = generate_skyvern_webhook_signature(payload=payload_dict, api_key=api_key.token) LOG.info( "Sending task v2 response to webhook callback url", task_v2_id=task_v2.observer_cruise_id, webhook_callback_url=task_v2.webhook_callback_url, - payload=payload, - headers=headers, + payload=signed_data.signed_payload, + headers=signed_data.headers, ) resp = await httpx.AsyncClient().post( - task_v2.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) + task_v2.webhook_callback_url, + data=signed_data.signed_payload, + headers=signed_data.headers, + timeout=httpx.Timeout(30.0), ) if resp.status_code >= 200 and resp.status_code < 300: LOG.info( diff --git a/skyvern/services/webhook_service.py b/skyvern/services/webhook_service.py index d3a4f599..8854ca10 100644 --- a/skyvern/services/webhook_service.py +++ b/skyvern/services/webhook_service.py @@ -20,7 +20,7 @@ from skyvern.exceptions import ( WorkflowRunNotFound, ) from skyvern.forge import app -from skyvern.forge.sdk.core.security import generate_skyvern_webhook_headers +from skyvern.forge.sdk.core.security import generate_skyvern_webhook_signature from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType from skyvern.forge.sdk.schemas.task_v2 import TaskV2 from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus @@ -51,7 +51,7 @@ def _now() -> datetime: return datetime.now(timezone.utc) -def build_sample_task_payload(run_id: str | None = None) -> str: +def build_sample_task_payload(run_id: str | None = None) -> dict: """ Build a sample task webhook payload using the real TaskResponse + TaskRunResponse models so schema changes are reflected automatically. @@ -128,10 +128,10 @@ def build_sample_task_payload(run_id: str | None = None) -> str: ) payload_dict.update(json.loads(task_run_response.model_dump_json(exclude_unset=True))) - return json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False) + return payload_dict -def build_sample_workflow_run_payload(run_id: str | None = None) -> str: +def build_sample_workflow_run_payload(run_id: str | None = None) -> dict: """ Build a sample workflow webhook payload using the real WorkflowRunResponseBase + WorkflowRunResponse models so schema changes are reflected automatically. @@ -195,14 +195,14 @@ def build_sample_workflow_run_payload(run_id: str | None = None) -> str: ) payload_dict.update(json.loads(workflow_run_response.model_dump_json(exclude_unset=True))) - return json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False) + return payload_dict @dataclass class _WebhookPayload: run_id: str run_type: str - payload: str + payload: dict default_webhook_url: str | None @@ -210,13 +210,13 @@ async def build_run_preview(organization_id: str, run_id: str) -> RunWebhookPrev """Return the payload and headers that would be used for a replay.""" payload = await _build_webhook_payload(organization_id=organization_id, run_id=run_id) api_key = await _get_api_key(organization_id=organization_id) - headers = generate_skyvern_webhook_headers(payload=payload.payload, api_key=api_key) + signed_data = generate_skyvern_webhook_signature(payload=payload.payload, api_key=api_key) return RunWebhookPreviewResponse( run_id=payload.run_id, run_type=payload.run_type, default_webhook_url=payload.default_webhook_url, - payload=payload.payload, - headers=headers, + payload=signed_data.signed_payload, + headers=signed_data.headers, ) @@ -226,7 +226,7 @@ async def replay_run_webhook(organization_id: str, run_id: str, target_url: str """ payload = await _build_webhook_payload(organization_id=organization_id, run_id=run_id) api_key = await _get_api_key(organization_id=organization_id) - headers = generate_skyvern_webhook_headers(payload=payload.payload, api_key=api_key) + signed_data = generate_skyvern_webhook_signature(payload=payload.payload, api_key=api_key) url_to_use: str | None = target_url if target_url else payload.default_webhook_url @@ -237,8 +237,8 @@ async def replay_run_webhook(organization_id: str, run_id: str, target_url: str status_code, latency_ms, response_body, error = await _deliver_webhook( url=validated_url, - payload=payload.payload, - headers=headers, + payload=signed_data.signed_payload, + headers=signed_data.headers, ) return RunWebhookReplayResponse( @@ -246,8 +246,8 @@ async def replay_run_webhook(organization_id: str, run_id: str, target_url: str run_type=payload.run_type, default_webhook_url=payload.default_webhook_url, target_webhook_url=validated_url, - payload=payload.payload, - headers=headers, + payload=signed_data.signed_payload, + headers=signed_data.headers, status_code=status_code, latency_ms=latency_ms, response_body=response_body, @@ -330,11 +330,10 @@ async def _build_task_payload(organization_id: str, run_id: str, run_type_str: s run_response_json = run_response.model_dump_json(exclude={"run_request"}) payload_dict.update(json.loads(run_response_json)) - payload = json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False) return _WebhookPayload( run_id=run_id, run_type=run_type_str, - payload=payload, + payload=payload_dict, default_webhook_url=task.webhook_callback_url, ) @@ -360,12 +359,10 @@ async def _build_task_v2_payload(task_v2: TaskV2) -> _WebhookPayload: f"Run {task_v2.observer_cruise_id} has not reached a terminal state (status={task_run_response.status})." ) task_run_response_json = task_run_response.model_dump_json(exclude={"run_request"}) - - payload = json.dumps(json.loads(task_run_response_json), separators=(",", ":"), ensure_ascii=False) return _WebhookPayload( run_id=task_v2.observer_cruise_id, run_type=RunType.task_v2.value, - payload=payload, + payload=json.loads(task_run_response_json), default_webhook_url=task_v2.webhook_callback_url, ) @@ -437,12 +434,11 @@ async def _build_workflow_payload( ) ) payload_dict.update(json.loads(run_response.model_dump_json(exclude={"run_request"}))) - payload = json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False) return _WebhookPayload( run_id=workflow_run.workflow_run_id, run_type=RunType.workflow_run.value, - payload=payload, + payload=payload_dict, default_webhook_url=workflow_run.webhook_callback_url, )