From 42d015cdba53c3eba0056b72f31cef3b25e2fc2a Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Sat, 24 May 2025 13:01:53 -0700 Subject: [PATCH] webhook support for task v1 and workflow run; fix task v2 webhook (#2446) --- skyvern/forge/agent.py | 36 +++++++++++++++++++-------- skyvern/forge/sdk/workflow/service.py | 35 +++++++++++++++++++++++--- skyvern/services/task_v2_service.py | 30 +++++++++++----------- 3 files changed, 73 insertions(+), 28 deletions(-) diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 16377c24..9660807a 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -72,6 +72,7 @@ from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus from skyvern.schemas.runs import CUA_ENGINES, CUA_RUN_TYPES, RunEngine +from skyvern.services import run_service from skyvern.utils.image_resizer import Resolution from skyvern.utils.prompt_engine import load_prompt_with_elements from skyvern.webeye.actions.actions import ( @@ -2109,18 +2110,31 @@ class ForgeAgent: return task_response = await self.build_task_response(task=task, last_step=last_step) - - # send task_response to the webhook callback url - payload = task_response.model_dump_json(exclude={"request"}) - headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key) - LOG.info( - "Sending task response to webhook callback url", - task_id=task.task_id, - webhook_callback_url=task.webhook_callback_url, - payload=payload, - headers=headers, - ) + # try to build the new TaskRunResponse for backward compatibility + task_run_response_json: str | None = None try: + run_response = await run_service.get_run_response( + run_id=task.task_id, + organization_id=task.organization_id, + ) + if run_response is not None: + task_run_response_json = run_response.model_dump_json(exclude={"run_request"}) + + # send task_response to the webhook callback url + payload_json = task_response.model_dump_json(exclude={"request"}) + payload_dict = json.loads(payload_json) + if task_run_response_json: + payload_dict.update(json.loads(task_run_response_json)) + payload = json.dumps(payload_dict) + headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key) + LOG.info( + "Sending task response to webhook callback url", + task_id=task.task_id, + webhook_callback_url=task.webhook_callback_url, + payload=payload, + headers=headers, + ) + async with httpx.AsyncClient() as client: resp = await client.post( task.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index ad699a26..7aaefcdd 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -90,7 +90,7 @@ from skyvern.forge.sdk.workflow.models.yaml import ( WorkflowCreateYAMLRequest, WorkflowDefinitionYAML, ) -from skyvern.schemas.runs import ProxyLocation +from skyvern.schemas.runs import ProxyLocation, RunStatus, RunType, WorkflowRunRequest, WorkflowRunResponse from skyvern.webeye.browser_factory import BrowserState LOG = structlog.get_logger() @@ -1214,8 +1214,37 @@ class WorkflowService: ) return - # send webhook to the webhook callback url - payload = workflow_run_status_response.model_dump_json() + # build new schema for backward compatible webhook payload + app_url = ( + f"{settings.SKYVERN_APP_URL.rstrip('/')}/workflows/" + f"{workflow_run.workflow_permanent_id}/{workflow_run.workflow_run_id}" + ) + workflow_run_response = WorkflowRunResponse( + run_id=workflow_run.workflow_run_id, + run_type=RunType.workflow_run, + status=RunStatus(workflow_run.status), + output=workflow_run_status_response.outputs, + downloaded_files=workflow_run_status_response.downloaded_files, + recording_url=workflow_run_status_response.recording_url, + screenshot_urls=workflow_run_status_response.screenshot_urls, + failure_reason=workflow_run_status_response.failure_reason, + app_url=app_url, + created_at=workflow_run.created_at, + modified_at=workflow_run.modified_at, + run_request=WorkflowRunRequest( + workflow_id=workflow_run.workflow_permanent_id, + title=workflow_run_status_response.workflow_title, + parameters=workflow_run_status_response.parameters, + proxy_location=workflow_run.proxy_location, + webhook_url=workflow_run.webhook_callback_url or None, + totp_url=workflow_run.totp_verification_url or None, + totp_identifier=workflow_run.totp_identifier, + ), + ) + payload_dict = json.loads(workflow_run_status_response.model_dump_json()) + workflow_run_response_dict = json.loads(workflow_run_response.model_dump_json()) + payload_dict.update(workflow_run_response_dict) + payload = json.dumps(payload_dict, default=str) headers = generate_skyvern_webhook_headers( payload=payload, api_key=api_key, diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index a76a7022..f7835e32 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -356,6 +356,7 @@ async def run_task_v2( workflow_run=workflow_run, browser_session_id=browser_session_id, close_browser_on_completion=browser_session_id is None, + need_call_webhook=False, ) else: LOG.warning("Workflow or workflow run not found") @@ -1645,21 +1646,22 @@ async def send_task_v2_webhook(task_v2: TaskV2) -> None: task_v2_id=task_v2.observer_cruise_id, ) return - # build the task v2 response with backward compatible data - task_run_response = await build_task_v2_run_response(task_v2) - task_run_response_dict = task_run_response.model_dump() - payload_dict = task_v2.model_dump(by_alias=True) - payload_dict.update(task_run_response_dict) - payload = json.dumps(payload_dict) - headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token) - LOG.info( - "Sending task v2 response to webhook callback url", - task_v2_id=task_v2.observer_cruise_id, - webhook_callback_url=task_v2.webhook_callback_url, - payload=payload, - headers=headers, - ) try: + # build the task v2 response with backward compatible data + task_run_response = await build_task_v2_run_response(task_v2) + task_run_response_json = task_run_response.model_dump_json(exclude={"run_request"}) + payload_json = task_v2.model_dump_json(by_alias=True) + payload_dict = json.loads(payload_json) + payload_dict.update(json.loads(task_run_response_json)) + payload = json.dumps(payload_dict) + headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token) + LOG.info( + "Sending task v2 response to webhook callback url", + task_v2_id=task_v2.observer_cruise_id, + webhook_callback_url=task_v2.webhook_callback_url, + payload=payload, + headers=headers, + ) resp = await httpx.AsyncClient().post( task_v2.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) )