From 2fe2b53690b7ae73b4a7ef1f6ed80feeb4601211 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Fri, 23 May 2025 16:24:41 -0700 Subject: [PATCH] Add TaskRunResponse data to task v2 webhook (#2435) --- skyvern/services/run_service.py | 30 +-------------- skyvern/services/task_v2_service.py | 60 +++++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 32 deletions(-) diff --git a/skyvern/services/run_service.py b/skyvern/services/run_service.py index c4e26ddc..49c2804b 100644 --- a/skyvern/services/run_service.py +++ b/skyvern/services/run_service.py @@ -67,35 +67,7 @@ async def get_run_response(run_id: str, organization_id: str | None = None) -> R task_v2 = await app.DATABASE.get_task_v2(run.run_id, organization_id=organization_id) if not task_v2: return None - workflow_run = None - if task_v2.workflow_run_id: - workflow_run = await workflow_service.get_workflow_run_response( - task_v2.workflow_run_id, organization_id=organization_id - ) - return TaskRunResponse( - run_id=run.run_id, - run_type=run.task_run_type, - status=task_v2.status, - output=task_v2.output, - failure_reason=workflow_run.failure_reason if workflow_run else None, - created_at=task_v2.created_at, - modified_at=task_v2.modified_at, - recording_url=workflow_run.recording_url if workflow_run else None, - screenshot_urls=workflow_run.screenshot_urls if workflow_run else None, - downloaded_files=workflow_run.downloaded_files if workflow_run else None, - app_url=f"{settings.SKYVERN_APP_URL.rstrip('/')}/workflows/{task_v2.workflow_permanent_id}/{task_v2.workflow_run_id}", - run_request=TaskRunRequest( - engine=RunEngine.skyvern_v2, - prompt=task_v2.prompt, - url=task_v2.url, - webhook_url=task_v2.webhook_callback_url, - totp_identifier=task_v2.totp_identifier, - totp_url=task_v2.totp_verification_url, - proxy_location=task_v2.proxy_location, - data_extraction_schema=task_v2.extracted_information_schema, - error_code_mapping=task_v2.error_code_mapping, - ), - ) + return await task_v2_service.build_task_v2_run_response(task_v2) elif run.task_run_type == RunType.workflow_run: return await workflow_service.get_workflow_run_response(run.run_id, organization_id=organization_id) raise ValueError(f"Invalid task run type: {run.task_run_type}") diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index 953ddd8c..bc77722b 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -1,3 +1,4 @@ +import json import os import random import string @@ -58,7 +59,8 @@ from skyvern.forge.sdk.workflow.models.yaml import ( WorkflowCreateYAMLRequest, WorkflowDefinitionYAML, ) -from skyvern.schemas.runs import ProxyLocation, RunType +from skyvern.schemas.runs import ProxyLocation, RunEngine, RunType, TaskRunRequest, TaskRunResponse +from skyvern.services import workflow_service from skyvern.utils.prompt_engine import load_prompt_with_elements from skyvern.webeye.browser_factory import BrowserState from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website @@ -1578,6 +1580,54 @@ async def _summarize_task_v2( ) +async def build_task_v2_run_response(task_v2: TaskV2) -> TaskRunResponse: + """Build TaskRunResponse object for webhook backward compatibility.""" + workflow_run_resp = None + if task_v2.workflow_run_id: + try: + workflow_run_resp = await workflow_service.get_workflow_run_response( + task_v2.workflow_run_id, organization_id=task_v2.organization_id + ) + except Exception: + LOG.warning( + "Failed to get workflow run response for task v2 webhook", + exc_info=True, + task_v2_id=task_v2.observer_cruise_id, + ) + + app_url = None + if task_v2.workflow_run_id and task_v2.workflow_permanent_id: + app_url = ( + f"{settings.SKYVERN_APP_URL.rstrip('/')}/workflows/" + f"{task_v2.workflow_permanent_id}/{task_v2.workflow_run_id}" + ) + + return TaskRunResponse( + run_id=task_v2.observer_cruise_id, + run_type=RunType.task_v2, + status=task_v2.status, + output=task_v2.output, + failure_reason=workflow_run_resp.failure_reason if workflow_run_resp else None, + created_at=task_v2.created_at, + modified_at=task_v2.modified_at, + recording_url=workflow_run_resp.recording_url if workflow_run_resp else None, + screenshot_urls=workflow_run_resp.screenshot_urls if workflow_run_resp else None, + downloaded_files=workflow_run_resp.downloaded_files if workflow_run_resp else None, + app_url=app_url, + run_request=TaskRunRequest( + engine=RunEngine.skyvern_v2, + prompt=task_v2.prompt or "", + url=task_v2.url, + webhook_url=task_v2.webhook_callback_url, + totp_identifier=task_v2.totp_identifier, + totp_url=task_v2.totp_verification_url, + proxy_location=task_v2.proxy_location, + data_extraction_schema=task_v2.extracted_information_schema, + error_code_mapping=task_v2.error_code_mapping, + ), + ) + + async def send_task_v2_webhook(task_v2: TaskV2) -> None: if not task_v2.webhook_callback_url: return @@ -1594,8 +1644,12 @@ async def send_task_v2_webhook(task_v2: TaskV2) -> None: task_v2_id=task_v2.observer_cruise_id, ) return - # build the task v2 response - payload = task_v2.model_dump_json(by_alias=True) + # build the task v2 response with backward compatible data + task_run_response = await build_task_v2_run_response(task_v2) + task_run_response_dict = task_run_response.model_dump() + payload_dict = task_v2.model_dump(by_alias=True) + payload_dict.update(task_run_response_dict) + payload = json.dumps(payload_dict) headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token) LOG.info( "Sending task v2 response to webhook callback url",