webhook support for task v1 and workflow run; fix task v2 webhook (#2446)

This commit is contained in:
Shuchang Zheng
2025-05-24 13:01:53 -07:00
committed by GitHub
parent b8b3821bc9
commit 42d015cdba
3 changed files with 73 additions and 28 deletions

View File

@@ -72,6 +72,7 @@ from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus
from skyvern.schemas.runs import CUA_ENGINES, CUA_RUN_TYPES, RunEngine from skyvern.schemas.runs import CUA_ENGINES, CUA_RUN_TYPES, RunEngine
from skyvern.services import run_service
from skyvern.utils.image_resizer import Resolution from skyvern.utils.image_resizer import Resolution
from skyvern.utils.prompt_engine import load_prompt_with_elements from skyvern.utils.prompt_engine import load_prompt_with_elements
from skyvern.webeye.actions.actions import ( from skyvern.webeye.actions.actions import (
@@ -2109,18 +2110,31 @@ class ForgeAgent:
return return
task_response = await self.build_task_response(task=task, last_step=last_step) task_response = await self.build_task_response(task=task, last_step=last_step)
# try to build the new TaskRunResponse for backward compatibility
# send task_response to the webhook callback url task_run_response_json: str | None = None
payload = task_response.model_dump_json(exclude={"request"})
headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key)
LOG.info(
"Sending task response to webhook callback url",
task_id=task.task_id,
webhook_callback_url=task.webhook_callback_url,
payload=payload,
headers=headers,
)
try: try:
run_response = await run_service.get_run_response(
run_id=task.task_id,
organization_id=task.organization_id,
)
if run_response is not None:
task_run_response_json = run_response.model_dump_json(exclude={"run_request"})
# send task_response to the webhook callback url
payload_json = task_response.model_dump_json(exclude={"request"})
payload_dict = json.loads(payload_json)
if task_run_response_json:
payload_dict.update(json.loads(task_run_response_json))
payload = json.dumps(payload_dict)
headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key)
LOG.info(
"Sending task response to webhook callback url",
task_id=task.task_id,
webhook_callback_url=task.webhook_callback_url,
payload=payload,
headers=headers,
)
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
resp = await client.post( resp = await client.post(
task.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) task.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0)

View File

@@ -90,7 +90,7 @@ from skyvern.forge.sdk.workflow.models.yaml import (
WorkflowCreateYAMLRequest, WorkflowCreateYAMLRequest,
WorkflowDefinitionYAML, WorkflowDefinitionYAML,
) )
from skyvern.schemas.runs import ProxyLocation from skyvern.schemas.runs import ProxyLocation, RunStatus, RunType, WorkflowRunRequest, WorkflowRunResponse
from skyvern.webeye.browser_factory import BrowserState from skyvern.webeye.browser_factory import BrowserState
LOG = structlog.get_logger() LOG = structlog.get_logger()
@@ -1214,8 +1214,37 @@ class WorkflowService:
) )
return return
# send webhook to the webhook callback url # build new schema for backward compatible webhook payload
payload = workflow_run_status_response.model_dump_json() app_url = (
f"{settings.SKYVERN_APP_URL.rstrip('/')}/workflows/"
f"{workflow_run.workflow_permanent_id}/{workflow_run.workflow_run_id}"
)
workflow_run_response = WorkflowRunResponse(
run_id=workflow_run.workflow_run_id,
run_type=RunType.workflow_run,
status=RunStatus(workflow_run.status),
output=workflow_run_status_response.outputs,
downloaded_files=workflow_run_status_response.downloaded_files,
recording_url=workflow_run_status_response.recording_url,
screenshot_urls=workflow_run_status_response.screenshot_urls,
failure_reason=workflow_run_status_response.failure_reason,
app_url=app_url,
created_at=workflow_run.created_at,
modified_at=workflow_run.modified_at,
run_request=WorkflowRunRequest(
workflow_id=workflow_run.workflow_permanent_id,
title=workflow_run_status_response.workflow_title,
parameters=workflow_run_status_response.parameters,
proxy_location=workflow_run.proxy_location,
webhook_url=workflow_run.webhook_callback_url or None,
totp_url=workflow_run.totp_verification_url or None,
totp_identifier=workflow_run.totp_identifier,
),
)
payload_dict = json.loads(workflow_run_status_response.model_dump_json())
workflow_run_response_dict = json.loads(workflow_run_response.model_dump_json())
payload_dict.update(workflow_run_response_dict)
payload = json.dumps(payload_dict, default=str)
headers = generate_skyvern_webhook_headers( headers = generate_skyvern_webhook_headers(
payload=payload, payload=payload,
api_key=api_key, api_key=api_key,

View File

@@ -356,6 +356,7 @@ async def run_task_v2(
workflow_run=workflow_run, workflow_run=workflow_run,
browser_session_id=browser_session_id, browser_session_id=browser_session_id,
close_browser_on_completion=browser_session_id is None, close_browser_on_completion=browser_session_id is None,
need_call_webhook=False,
) )
else: else:
LOG.warning("Workflow or workflow run not found") LOG.warning("Workflow or workflow run not found")
@@ -1645,21 +1646,22 @@ async def send_task_v2_webhook(task_v2: TaskV2) -> None:
task_v2_id=task_v2.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
) )
return return
# build the task v2 response with backward compatible data
task_run_response = await build_task_v2_run_response(task_v2)
task_run_response_dict = task_run_response.model_dump()
payload_dict = task_v2.model_dump(by_alias=True)
payload_dict.update(task_run_response_dict)
payload = json.dumps(payload_dict)
headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token)
LOG.info(
"Sending task v2 response to webhook callback url",
task_v2_id=task_v2.observer_cruise_id,
webhook_callback_url=task_v2.webhook_callback_url,
payload=payload,
headers=headers,
)
try: try:
# build the task v2 response with backward compatible data
task_run_response = await build_task_v2_run_response(task_v2)
task_run_response_json = task_run_response.model_dump_json(exclude={"run_request"})
payload_json = task_v2.model_dump_json(by_alias=True)
payload_dict = json.loads(payload_json)
payload_dict.update(json.loads(task_run_response_json))
payload = json.dumps(payload_dict)
headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token)
LOG.info(
"Sending task v2 response to webhook callback url",
task_v2_id=task_v2.observer_cruise_id,
webhook_callback_url=task_v2.webhook_callback_url,
payload=payload,
headers=headers,
)
resp = await httpx.AsyncClient().post( resp = await httpx.AsyncClient().post(
task_v2.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) task_v2.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0)
) )