From 8ebe0f2bfbf79bcc6dc5af22d53620c459d1851d Mon Sep 17 00:00:00 2001 From: Marc Kelechava Date: Thu, 18 Dec 2025 10:38:51 -0800 Subject: [PATCH] Add support for custom URLs in the retry webhook API (#4329) --- skyvern/client/client.py | 18 ++++++++++---- skyvern/client/raw_client.py | 28 ++++++++++++++++++++-- skyvern/forge/sdk/routes/agent_protocol.py | 9 ++++++- skyvern/schemas/webhooks.py | 7 ++++++ skyvern/services/run_service.py | 22 ++++++++++++++--- skyvern/services/webhook_service.py | 15 +++++++++--- 6 files changed, 86 insertions(+), 13 deletions(-) diff --git a/skyvern/client/client.py b/skyvern/client/client.py index 36244807..79c895a9 100644 --- a/skyvern/client/client.py +++ b/skyvern/client/client.py @@ -790,7 +790,11 @@ class Skyvern: return _response.data def retry_run_webhook( - self, run_id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + run_id: str, + *, + webhook_url: typing.Optional[str] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> typing.Optional[typing.Any]: """ Retry sending the webhook for a run @@ -819,7 +823,7 @@ class Skyvern: run_id="tsk_123", ) """ - _response = self._raw_client.retry_run_webhook(run_id, request_options=request_options) + _response = self._raw_client.retry_run_webhook(run_id, webhook_url=webhook_url, request_options=request_options) return _response.data def get_run_timeline( @@ -2701,7 +2705,11 @@ class AsyncSkyvern: return _response.data async def retry_run_webhook( - self, run_id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + run_id: str, + *, + webhook_url: typing.Optional[str] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> typing.Optional[typing.Any]: """ Retry sending the webhook for a run @@ -2738,7 +2746,9 @@ class AsyncSkyvern: asyncio.run(main()) """ - _response = await self._raw_client.retry_run_webhook(run_id, request_options=request_options) + _response = await self._raw_client.retry_run_webhook( + run_id, webhook_url=webhook_url, request_options=request_options + ) return _response.data async def get_run_timeline( diff --git a/skyvern/client/raw_client.py b/skyvern/client/raw_client.py index 5317e9cd..79827801 100644 --- a/skyvern/client/raw_client.py +++ b/skyvern/client/raw_client.py @@ -957,7 +957,11 @@ class RawSkyvern: raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json) def retry_run_webhook( - self, run_id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + run_id: str, + *, + webhook_url: typing.Optional[str] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> HttpResponse[typing.Optional[typing.Any]]: """ Retry sending the webhook for a run @@ -975,10 +979,18 @@ class RawSkyvern: HttpResponse[typing.Optional[typing.Any]] Successful Response """ + request_kwargs: dict[str, typing.Any] = {} + if webhook_url is not None: + request_kwargs = { + "json": {"webhook_url": webhook_url}, + "headers": {"content-type": "application/json"}, + "omit": OMIT, + } _response = self._client_wrapper.httpx_client.request( f"v1/runs/{jsonable_encoder(run_id)}/retry_webhook", method="POST", request_options=request_options, + **request_kwargs, ) try: if _response is None or not _response.text.strip(): @@ -3477,7 +3489,11 @@ class AsyncRawSkyvern: raise ApiError(status_code=_response.status_code, headers=dict(_response.headers), body=_response_json) async def retry_run_webhook( - self, run_id: str, *, request_options: typing.Optional[RequestOptions] = None + self, + run_id: str, + *, + webhook_url: typing.Optional[str] = None, + request_options: typing.Optional[RequestOptions] = None, ) -> AsyncHttpResponse[typing.Optional[typing.Any]]: """ Retry sending the webhook for a run @@ -3495,10 +3511,18 @@ class AsyncRawSkyvern: AsyncHttpResponse[typing.Optional[typing.Any]] Successful Response """ + request_kwargs: dict[str, typing.Any] = {} + if webhook_url is not None: + request_kwargs = { + "json": {"webhook_url": webhook_url}, + "headers": {"content-type": "application/json"}, + "omit": OMIT, + } _response = await self._client_wrapper.httpx_client.request( f"v1/runs/{jsonable_encoder(run_id)}/retry_webhook", method="POST", request_options=request_options, + **request_kwargs, ) try: if _response is None or not _response.text.strip(): diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 9c3aa402..67136354 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -116,6 +116,7 @@ from skyvern.schemas.runs import ( WorkflowRunRequest, WorkflowRunResponse, ) +from skyvern.schemas.webhooks import RetryRunWebhookRequest from skyvern.schemas.workflows import BlockType, WorkflowCreateYAMLRequest, WorkflowRequest, WorkflowStatus from skyvern.services import block_service, run_service, task_v1_service, task_v2_service, workflow_service from skyvern.services.pdf_import_service import pdf_import_service @@ -1372,11 +1373,17 @@ async def get_run_artifacts( @base_router.post("/runs/{run_id}/retry_webhook/", include_in_schema=False) async def retry_run_webhook( run_id: str = Path(..., description="The id of the task run or the workflow run.", examples=["tsk_123", "wr_123"]), + request: RetryRunWebhookRequest | None = None, current_org: Organization = Depends(org_auth_service.get_current_org), x_api_key: Annotated[str | None, Header()] = None, ) -> None: analytics.capture("skyvern-oss-agent-run-retry-webhook") - await run_service.retry_run_webhook(run_id, organization_id=current_org.organization_id, api_key=x_api_key) + await run_service.retry_run_webhook( + run_id, + organization_id=current_org.organization_id, + api_key=x_api_key, + webhook_url=request.webhook_url if request else None, + ) @base_router.get( diff --git a/skyvern/schemas/webhooks.py b/skyvern/schemas/webhooks.py index c18a2487..c9f816b9 100644 --- a/skyvern/schemas/webhooks.py +++ b/skyvern/schemas/webhooks.py @@ -32,6 +32,13 @@ class RunWebhookReplayRequest(BaseModel): ) +class RetryRunWebhookRequest(BaseModel): + webhook_url: str | None = Field( + None, + description="Optional webhook URL to send the payload to instead of the stored configuration", + ) + + class RunWebhookReplayResponse(BaseModel): run_id: str = Field(..., description="Identifier of the run that was replayed") run_type: str = Field(..., description="Run type associated with the payload") diff --git a/skyvern/services/run_service.py b/skyvern/services/run_service.py index ef143b43..0e83d8af 100644 --- a/skyvern/services/run_service.py +++ b/skyvern/services/run_service.py @@ -1,12 +1,12 @@ from fastapi import HTTPException, status from skyvern.config import settings -from skyvern.exceptions import TaskNotFound, WorkflowRunNotFound +from skyvern.exceptions import OrganizationNotFound, TaskNotFound, WorkflowRunNotFound from skyvern.forge import app from skyvern.forge.sdk.schemas.tasks import TaskStatus from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus from skyvern.schemas.runs import RunEngine, RunResponse, RunType, TaskRunRequest, TaskRunResponse -from skyvern.services import task_v1_service, task_v2_service, workflow_service +from skyvern.services import task_v1_service, task_v2_service, webhook_service, workflow_service async def get_run_response(run_id: str, organization_id: str | None = None) -> RunResponse | None: @@ -147,7 +147,12 @@ async def cancel_run(run_id: str, organization_id: str | None = None, api_key: s ) -async def retry_run_webhook(run_id: str, organization_id: str | None = None, api_key: str | None = None) -> None: +async def retry_run_webhook( + run_id: str, + organization_id: str | None = None, + api_key: str | None = None, + webhook_url: str | None = None, +) -> None: """Retry sending the webhook for a run.""" run = await app.DATABASE.get_run(run_id, organization_id=organization_id) @@ -157,6 +162,17 @@ async def retry_run_webhook(run_id: str, organization_id: str | None = None, api detail=f"Run not found {run_id}", ) + if webhook_url: + if not organization_id: + raise OrganizationNotFound(organization_id="") + await webhook_service.replay_run_webhook( + organization_id=organization_id, + run_id=run_id, + target_url=webhook_url, + api_key=api_key, + ) + return + if run.task_run_type in [RunType.task_v1, RunType.openai_cua, RunType.anthropic_cua, RunType.ui_tars]: task = await app.DATABASE.get_task(run_id, organization_id=organization_id) if not task: diff --git a/skyvern/services/webhook_service.py b/skyvern/services/webhook_service.py index 775f1c79..0a58b28b 100644 --- a/skyvern/services/webhook_service.py +++ b/skyvern/services/webhook_service.py @@ -222,13 +222,22 @@ async def build_run_preview(organization_id: str, run_id: str) -> RunWebhookPrev ) -async def replay_run_webhook(organization_id: str, run_id: str, target_url: str | None) -> RunWebhookReplayResponse: +async def replay_run_webhook( + organization_id: str, + run_id: str, + target_url: str | None, + api_key: str | None = None, +) -> RunWebhookReplayResponse: """ Send the webhook payload for a run to either the stored URL or a caller-provided override. + + If `api_key` is provided, it will be used to sign the webhook payload instead of looking up the organization's + API key from the database. This is useful for endpoints that authenticate with an API key and want the replay + signature to match the caller-provided key. """ payload = await _build_webhook_payload(organization_id=organization_id, run_id=run_id) - api_key = await _get_api_key(organization_id=organization_id) - signed_data = generate_skyvern_webhook_signature(payload=payload.payload, api_key=api_key) + signing_key = api_key if api_key else await _get_api_key(organization_id=organization_id) + signed_data = generate_skyvern_webhook_signature(payload=payload.payload, api_key=signing_key) url_to_use: str | None = target_url if target_url else payload.default_webhook_url