From 09ed1c1dff14c3966a4cdcc6478ad3bf7617cb66 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Fri, 16 May 2025 00:03:10 -0700 Subject: [PATCH] add recording_url and downloaded_files to the RunRepsonse for task v1 and task v2 (#2360) --- skyvern/forge/sdk/routes/agent_protocol.py | 32 +-------------- skyvern/services/run_service.py | 48 +++++++++++++--------- skyvern/services/task_v1_service.py | 34 ++++++++++++++- 3 files changed, 63 insertions(+), 51 deletions(-) diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 5255c654..7268b87c 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -212,37 +212,7 @@ async def get_task_v1( current_org: Organization = Depends(org_auth_service.get_current_org), ) -> TaskResponse: analytics.capture("skyvern-oss-agent-task-get") - task_obj = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id) - if not task_obj: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Task not found {task_id}", - ) - - # get latest step - latest_step = await app.DATABASE.get_latest_step(task_id, organization_id=current_org.organization_id) - if not latest_step: - return await app.agent.build_task_response(task=task_obj) - - failure_reason: str | None = None - if task_obj.status == TaskStatus.failed and (latest_step.output or task_obj.failure_reason): - failure_reason = "" - if task_obj.failure_reason: - failure_reason += task_obj.failure_reason or "" - if latest_step.output is not None and latest_step.output.actions_and_results is not None: - action_results_string: list[str] = [] - for action, results in latest_step.output.actions_and_results: - if len(results) == 0: - continue - if results[-1].success: - continue - action_results_string.append(f"{action.action_type} action failed.") - - if len(action_results_string) > 0: - failure_reason += "(Exceptions: " + str(action_results_string) + ")" - return await app.agent.build_task_response( - task=task_obj, last_step=latest_step, failure_reason=failure_reason, need_browser_log=True - ) + return await task_v1_service.get_task_v1_response(task_id=task_id, organization_id=current_org.organization_id) @legacy_base_router.post( diff --git a/skyvern/services/run_service.py b/skyvern/services/run_service.py index e88a7768..86449119 100644 --- a/skyvern/services/run_service.py +++ b/skyvern/services/run_service.py @@ -6,7 +6,7 @@ from skyvern.forge import app from skyvern.forge.sdk.schemas.tasks import TaskStatus from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus from skyvern.schemas.runs import RunEngine, RunResponse, RunType, TaskRunRequest, TaskRunResponse -from skyvern.services import task_v2_service, workflow_service +from skyvern.services import task_v1_service, task_v2_service, workflow_service async def get_run_response(run_id: str, organization_id: str | None = None) -> RunResponse | None: @@ -20,34 +20,40 @@ async def get_run_response(run_id: str, organization_id: str | None = None) -> R or run.task_run_type == RunType.anthropic_cua ): # fetch task v1 from db and transform to task run response - task_v1 = await app.DATABASE.get_task(run.run_id, organization_id=organization_id) - if not task_v1: + try: + task_v1_response = await task_v1_service.get_task_v1_response( + task_id=run.run_id, organization_id=organization_id + ) + except TaskNotFound: return None run_engine = RunEngine.skyvern_v1 if run.task_run_type == RunType.openai_cua: run_engine = RunEngine.openai_cua elif run.task_run_type == RunType.anthropic_cua: run_engine = RunEngine.anthropic_cua + return TaskRunResponse( run_id=run.run_id, run_type=run.task_run_type, - status=str(task_v1.status), - output=task_v1.extracted_information, - failure_reason=task_v1.failure_reason, - created_at=task_v1.created_at, - modified_at=task_v1.modified_at, - app_url=f"{settings.SKYVERN_APP_URL.rstrip('/')}/tasks/{task_v1.task_id}", + status=str(task_v1_response.status), + output=task_v1_response.extracted_information, + failure_reason=task_v1_response.failure_reason, + created_at=task_v1_response.created_at, + modified_at=task_v1_response.modified_at, + app_url=f"{settings.SKYVERN_APP_URL.rstrip('/')}/tasks/{task_v1_response.task_id}", + recording_url=task_v1_response.recording_url, + downloaded_files=task_v1_response.downloaded_files, run_request=TaskRunRequest( engine=run_engine, - prompt=task_v1.navigation_goal, - url=task_v1.url, - webhook_url=task_v1.webhook_callback_url, - totp_identifier=task_v1.totp_identifier, - totp_url=task_v1.totp_verification_url, - proxy_location=task_v1.proxy_location, - max_steps=task_v1.max_steps_per_run, - data_extraction_schema=task_v1.extracted_information_schema, - error_code_mapping=task_v1.error_code_mapping, + prompt=task_v1_response.request.navigation_goal, + url=task_v1_response.request.url, + webhook_url=task_v1_response.request.webhook_callback_url, + totp_identifier=task_v1_response.request.totp_identifier, + totp_url=task_v1_response.request.totp_verification_url, + proxy_location=task_v1_response.request.proxy_location, + max_steps=task_v1_response.max_steps_per_run, + data_extraction_schema=task_v1_response.request.extracted_information_schema, + error_code_mapping=task_v1_response.request.error_code_mapping, ), ) elif run.task_run_type == RunType.task_v2: @@ -56,7 +62,9 @@ async def get_run_response(run_id: str, organization_id: str | None = None) -> R return None workflow_run = None if task_v2.workflow_run_id: - workflow_run = await app.DATABASE.get_workflow_run(task_v2.workflow_run_id, organization_id=organization_id) + workflow_run = await workflow_service.get_workflow_run_response( + task_v2.workflow_run_id, organization_id=organization_id + ) return TaskRunResponse( run_id=run.run_id, run_type=run.task_run_type, @@ -65,6 +73,8 @@ async def get_run_response(run_id: str, organization_id: str | None = None) -> R failure_reason=workflow_run.failure_reason if workflow_run else None, created_at=task_v2.created_at, modified_at=task_v2.modified_at, + recording_url=workflow_run.recording_url if workflow_run else None, + downloaded_files=workflow_run.downloaded_files if workflow_run else None, app_url=f"{settings.SKYVERN_APP_URL.rstrip('/')}/{task_v2.workflow_permanent_id}/{task_v2.workflow_run_id}", run_request=TaskRunRequest( engine=RunEngine.skyvern_v2, diff --git a/skyvern/services/task_v1_service.py b/skyvern/services/task_v1_service.py index 199377a4..2ab59c41 100644 --- a/skyvern/services/task_v1_service.py +++ b/skyvern/services/task_v1_service.py @@ -5,6 +5,7 @@ from fastapi import BackgroundTasks, HTTPException, Request from sqlalchemy.exc import OperationalError from skyvern.config import settings +from skyvern.exceptions import TaskNotFound from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError @@ -12,7 +13,7 @@ from skyvern.forge.sdk.core.hashing import generate_url_hash from skyvern.forge.sdk.executor.factory import AsyncExecutorFactory from skyvern.forge.sdk.schemas.organizations import Organization from skyvern.forge.sdk.schemas.task_generations import TaskGeneration, TaskGenerationBase -from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest +from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus from skyvern.schemas.runs import RunEngine, RunType LOG = structlog.get_logger() @@ -114,3 +115,34 @@ async def run_task( api_key=x_api_key, ) return created_task + + +async def get_task_v1_response(task_id: str, organization_id: str | None = None) -> TaskResponse: + task_obj = await app.DATABASE.get_task(task_id, organization_id=organization_id) + if not task_obj: + raise TaskNotFound(task_id=task_id) + + # get latest step + latest_step = await app.DATABASE.get_latest_step(task_id, organization_id=organization_id) + if not latest_step: + return await app.agent.build_task_response(task=task_obj) + + failure_reason: str | None = None + if task_obj.status == TaskStatus.failed and (latest_step.output or task_obj.failure_reason): + failure_reason = "" + if task_obj.failure_reason: + failure_reason += task_obj.failure_reason or "" + if latest_step.output is not None and latest_step.output.actions_and_results is not None: + action_results_string: list[str] = [] + for action, results in latest_step.output.actions_and_results: + if len(results) == 0: + continue + if results[-1].success: + continue + action_results_string.append(f"{action.action_type} action failed.") + + if len(action_results_string) > 0: + failure_reason += "(Exceptions: " + str(action_results_string) + ")" + return await app.agent.build_task_response( + task=task_obj, last_step=latest_step, failure_reason=failure_reason, need_browser_log=True + )