From de18a60ac2b0e93bdb244826a321cce5cc3ff4f0 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Tue, 27 May 2025 03:00:14 -0700 Subject: [PATCH] Revert "revert webhook schema updates" (#2482) --- skyvern/forge/agent.py | 53 +++++++++------------- skyvern/forge/sdk/workflow/service.py | 64 +++++++++++++-------------- skyvern/services/task_v2_service.py | 15 +++---- 3 files changed, 60 insertions(+), 72 deletions(-) diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 046c248e..0472d4e4 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -72,6 +72,7 @@ from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus from skyvern.schemas.runs import CUA_ENGINES, CUA_RUN_TYPES, RunEngine +from skyvern.services import run_service from skyvern.utils.image_resizer import Resolution from skyvern.utils.prompt_engine import load_prompt_with_elements from skyvern.webeye.actions.actions import ( @@ -2109,40 +2110,30 @@ class ForgeAgent: return task_response = await self.build_task_response(task=task, last_step=last_step) - # send task_response to the webhook callback url - payload = task_response.model_dump_json(exclude={"request"}) - headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key) - LOG.info( - "Sending task response to webhook callback url", - task_id=task.task_id, - webhook_callback_url=task.webhook_callback_url, - payload=payload, - headers=headers, - ) # try to build the new TaskRunResponse for backward compatibility - # task_run_response_json: str | None = None + task_run_response_json: str | None = None try: - # run_response = await run_service.get_run_response( - # run_id=task.task_id, - # organization_id=task.organization_id, - # ) - # if run_response is not None: - # task_run_response_json = run_response.model_dump_json(exclude={"run_request"}) + run_response = await run_service.get_run_response( + run_id=task.task_id, + organization_id=task.organization_id, + ) + if run_response is not None: + task_run_response_json = run_response.model_dump_json(exclude={"run_request"}) - # # send task_response to the webhook callback url - # payload_json = task_response.model_dump_json(exclude={"request"}) - # payload_dict = json.loads(payload_json) - # if task_run_response_json: - # payload_dict.update(json.loads(task_run_response_json)) - # payload = json.dumps(payload_dict) - # headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key) - # LOG.info( - # "Sending task response to webhook callback url", - # task_id=task.task_id, - # webhook_callback_url=task.webhook_callback_url, - # payload=payload, - # headers=headers, - # ) + # send task_response to the webhook callback url + payload_json = task_response.model_dump_json(exclude={"request"}) + payload_dict = json.loads(payload_json) + if task_run_response_json: + payload_dict.update(json.loads(task_run_response_json)) + payload = json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False) + headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key) + LOG.info( + "Sending task response to webhook callback url", + task_id=task.task_id, + webhook_callback_url=task.webhook_callback_url, + payload=payload, + headers=headers, + ) async with httpx.AsyncClient() as client: resp = await client.post( diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index e19be5a4..a08ce706 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -90,7 +90,7 @@ from skyvern.forge.sdk.workflow.models.yaml import ( WorkflowCreateYAMLRequest, WorkflowDefinitionYAML, ) -from skyvern.schemas.runs import ProxyLocation +from skyvern.schemas.runs import ProxyLocation, RunStatus, RunType, WorkflowRunRequest, WorkflowRunResponse from skyvern.webeye.browser_factory import BrowserState LOG = structlog.get_logger() @@ -1220,39 +1220,37 @@ class WorkflowService: ) return - # send webhook to the webhook callback url - payload = workflow_run_status_response.model_dump_json() # build new schema for backward compatible webhook payload - # app_url = ( - # f"{settings.SKYVERN_APP_URL.rstrip('/')}/workflows/" - # f"{workflow_run.workflow_permanent_id}/{workflow_run.workflow_run_id}" - # ) - # workflow_run_response = WorkflowRunResponse( - # run_id=workflow_run.workflow_run_id, - # run_type=RunType.workflow_run, - # status=RunStatus(workflow_run_status_response.status), - # output=workflow_run_status_response.outputs, - # downloaded_files=workflow_run_status_response.downloaded_files, - # recording_url=workflow_run_status_response.recording_url, - # screenshot_urls=workflow_run_status_response.screenshot_urls, - # failure_reason=workflow_run_status_response.failure_reason, - # app_url=app_url, - # created_at=workflow_run_status_response.created_at, - # modified_at=workflow_run_status_response.modified_at, - # run_request=WorkflowRunRequest( - # workflow_id=workflow_run.workflow_permanent_id, - # title=workflow_run_status_response.workflow_title, - # parameters=workflow_run_status_response.parameters, - # proxy_location=workflow_run.proxy_location, - # webhook_url=workflow_run.webhook_callback_url or None, - # totp_url=workflow_run.totp_verification_url or None, - # totp_identifier=workflow_run.totp_identifier, - # ), - # ) - # payload_dict = json.loads(workflow_run_status_response.model_dump_json()) - # workflow_run_response_dict = json.loads(workflow_run_response.model_dump_json()) - # payload_dict.update(workflow_run_response_dict) - # payload = json.dumps(payload_dict, default=str) + app_url = ( + f"{settings.SKYVERN_APP_URL.rstrip('/')}/workflows/" + f"{workflow_run.workflow_permanent_id}/{workflow_run.workflow_run_id}" + ) + workflow_run_response = WorkflowRunResponse( + run_id=workflow_run.workflow_run_id, + run_type=RunType.workflow_run, + status=RunStatus(workflow_run_status_response.status), + output=workflow_run_status_response.outputs, + downloaded_files=workflow_run_status_response.downloaded_files, + recording_url=workflow_run_status_response.recording_url, + screenshot_urls=workflow_run_status_response.screenshot_urls, + failure_reason=workflow_run_status_response.failure_reason, + app_url=app_url, + created_at=workflow_run_status_response.created_at, + modified_at=workflow_run_status_response.modified_at, + run_request=WorkflowRunRequest( + workflow_id=workflow_run.workflow_permanent_id, + title=workflow_run_status_response.workflow_title, + parameters=workflow_run_status_response.parameters, + proxy_location=workflow_run.proxy_location, + webhook_url=workflow_run.webhook_callback_url or None, + totp_url=workflow_run.totp_verification_url or None, + totp_identifier=workflow_run.totp_identifier, + ), + ) + payload_dict = json.loads(workflow_run_status_response.model_dump_json()) + workflow_run_response_dict = json.loads(workflow_run_response.model_dump_json()) + payload_dict.update(workflow_run_response_dict) + payload = json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False) headers = generate_skyvern_webhook_headers( payload=payload, api_key=api_key, diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index d1d5acda..85960b8c 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -1,3 +1,4 @@ +import json import os import random import string @@ -1659,15 +1660,13 @@ async def send_task_v2_webhook(task_v2: TaskV2) -> None: ) return try: - # build the task v2 response - payload = task_v2.model_dump_json(by_alias=True) # build the task v2 response with backward compatible data - # task_run_response = await build_task_v2_run_response(task_v2) - # task_run_response_json = task_run_response.model_dump_json(exclude={"run_request"}) - # payload_json = task_v2.model_dump_json(by_alias=True) - # payload_dict = json.loads(payload_json) - # payload_dict.update(json.loads(task_run_response_json)) - # payload = json.dumps(payload_dict) + task_run_response = await build_task_v2_run_response(task_v2) + task_run_response_json = task_run_response.model_dump_json(exclude={"run_request"}) + payload_json = task_v2.model_dump_json(by_alias=True) + payload_dict = json.loads(payload_json) + payload_dict.update(json.loads(task_run_response_json)) + payload = json.dumps(payload_dict, separators=(",", ":"), ensure_ascii=False) headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token) LOG.info( "Sending task v2 response to webhook callback url",