diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index e1a28264..dad74f9b 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -51,7 +51,7 @@ from skyvern.forge.sdk.core.security import generate_skyvern_signature from skyvern.forge.sdk.core.validators import validate_url from skyvern.forge.sdk.db.enums import TaskType from skyvern.forge.sdk.models import Organization, Step, StepStatus -from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskStatus +from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus @@ -1547,81 +1547,7 @@ class ForgeAgent: ) return - screenshot_url = None - recording_url = None - latest_action_screenshot_urls: list[str] | None = None - downloaded_file_urls: list[str] | None = None - - # get the artifact of the screenshot and get the screenshot_url - screenshot_artifact = await app.DATABASE.get_artifact( - task_id=task.task_id, - step_id=last_step.step_id, - artifact_type=ArtifactType.SCREENSHOT_FINAL, - organization_id=task.organization_id, - ) - if screenshot_artifact: - screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact) - - recording_artifact = await app.DATABASE.get_artifact( - task_id=task.task_id, - step_id=last_step.step_id, - artifact_type=ArtifactType.RECORDING, - organization_id=task.organization_id, - ) - if recording_artifact: - recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact) - - # get the artifact of the last TASK_RESPONSE_ACTION_SCREENSHOT_COUNT screenshots and get the screenshot_url - latest_action_screenshot_artifacts = await app.DATABASE.get_latest_n_artifacts( - task_id=task.task_id, - organization_id=task.organization_id, - artifact_types=[ArtifactType.SCREENSHOT_ACTION], - n=settings.TASK_RESPONSE_ACTION_SCREENSHOT_COUNT, - ) - if latest_action_screenshot_artifacts: - latest_action_screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links( - latest_action_screenshot_artifacts - ) - else: - LOG.error("Failed to get latest action screenshots") - - if task.organization_id: - try: - async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT): - downloaded_file_urls = await app.STORAGE.get_downloaded_files( - organization_id=task.organization_id, task_id=task.task_id, workflow_run_id=task.workflow_run_id - ) - except asyncio.TimeoutError: - LOG.warning( - "Timeout to get downloaded files", - task_id=task.task_id, - workflow_run_id=task.workflow_run_id, - ) - except Exception: - LOG.warning( - "Failed to get downloaded files", - exc_info=True, - task_id=task.task_id, - workflow_run_id=task.workflow_run_id, - ) - - # get the latest task from the db to get the latest status, extracted_information, and failure_reason - task_from_db = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id) - if not task_from_db: - LOG.error("Failed to get task from db when sending task response") - raise TaskNotFound(task_id=task.task_id) - - task = task_from_db - task_response = task.to_task_response( - action_screenshot_urls=latest_action_screenshot_urls, - screenshot_url=screenshot_url, - recording_url=recording_url, - downloaded_file_urls=downloaded_file_urls, - ) - - if not task.webhook_callback_url: - LOG.info("Task has no webhook callback url. Not sending task response") - return + task_response = await self.build_task_response(task=task, last_step=last_step) # send task_response to the webhook callback url timestamp = str(int(datetime.utcnow().timestamp())) @@ -1664,6 +1590,107 @@ class ForgeAgent: except Exception as e: raise FailedToSendWebhook(task_id=task.task_id) from e + async def build_task_response( + self, + task: Task, + last_step: Step | None = None, + failure_reason: str | None = None, + need_browser_log: bool = False, + ) -> TaskResponse: + # no last step means the task didn't start, so we don't have any other artifacts + if last_step is None: + return task.to_task_response( + failure_reason=failure_reason, + ) + + screenshot_url = None + recording_url = None + browser_console_log_url: str | None = None + latest_action_screenshot_urls: list[str] | None = None + downloaded_file_urls: list[str] | None = None + + # get the artifact of the screenshot and get the screenshot_url + screenshot_artifact = await app.DATABASE.get_artifact( + task_id=task.task_id, + step_id=last_step.step_id, + artifact_type=ArtifactType.SCREENSHOT_FINAL, + organization_id=task.organization_id, + ) + if screenshot_artifact: + screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact) + + recording_artifact = await app.DATABASE.get_artifact( + task_id=task.task_id, + step_id=last_step.step_id, + artifact_type=ArtifactType.RECORDING, + organization_id=task.organization_id, + ) + if recording_artifact: + recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact) + + # get the artifact of the last TASK_RESPONSE_ACTION_SCREENSHOT_COUNT screenshots and get the screenshot_url + latest_action_screenshot_artifacts = await app.DATABASE.get_latest_n_artifacts( + task_id=task.task_id, + organization_id=task.organization_id, + artifact_types=[ArtifactType.SCREENSHOT_ACTION], + n=settings.TASK_RESPONSE_ACTION_SCREENSHOT_COUNT, + ) + if latest_action_screenshot_artifacts: + latest_action_screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links( + latest_action_screenshot_artifacts + ) + else: + LOG.error( + "Failed to get latest action screenshots", + task_id=task.task_id, + task_status=task.status, + ) + + if task.organization_id: + try: + async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT): + downloaded_file_urls = await app.STORAGE.get_downloaded_files( + organization_id=task.organization_id, task_id=task.task_id, workflow_run_id=task.workflow_run_id + ) + except asyncio.TimeoutError: + LOG.warning( + "Timeout to get downloaded files", + task_id=task.task_id, + workflow_run_id=task.workflow_run_id, + ) + except Exception: + LOG.warning( + "Failed to get downloaded files", + exc_info=True, + task_id=task.task_id, + workflow_run_id=task.workflow_run_id, + ) + + if need_browser_log: + browser_console_log = await app.DATABASE.get_latest_artifact( + task_id=task.task_id, + artifact_types=[ArtifactType.BROWSER_CONSOLE_LOG], + organization_id=task.organization_id, + ) + if browser_console_log: + browser_console_log_url = await app.ARTIFACT_MANAGER.get_share_link(browser_console_log) + + # get the latest task from the db to get the latest status, extracted_information, and failure_reason + task_from_db = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id) + if not task_from_db: + LOG.error("Failed to get task from db when sending task response") + raise TaskNotFound(task_id=task.task_id) + + task = task_from_db + return task.to_task_response( + action_screenshot_urls=latest_action_screenshot_urls, + screenshot_url=screenshot_url, + recording_url=recording_url, + downloaded_file_urls=downloaded_file_urls, + browser_console_log_url=browser_console_log_url, + failure_reason=failure_reason, + ) + async def cleanup_browser_and_create_artifacts( self, close_browser_on_completion: bool, last_step: Step, task: Task ) -> None: diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 2f87ed88..db37a2d3 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -28,7 +28,7 @@ from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.aws import aws_client from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError -from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType +from skyvern.forge.sdk.artifact.models import Artifact from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.permissions.permission_checker_factory import PermissionCheckerFactory from skyvern.forge.sdk.core.security import generate_skyvern_signature @@ -255,53 +255,7 @@ async def get_task( # get latest step latest_step = await app.DATABASE.get_latest_step(task_id, organization_id=current_org.organization_id) if not latest_step: - return task_obj.to_task_response() - - screenshot_url = None - # todo (kerem): only access artifacts through the artifact manager instead of db - screenshot_artifact = await app.DATABASE.get_latest_artifact( - task_id=task_obj.task_id, - step_id=latest_step.step_id, - artifact_types=[ArtifactType.SCREENSHOT_ACTION, ArtifactType.SCREENSHOT_FINAL], - organization_id=current_org.organization_id, - ) - if screenshot_artifact: - screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact) - - recording_artifact = await app.DATABASE.get_latest_artifact( - task_id=task_obj.task_id, - artifact_types=[ArtifactType.RECORDING], - organization_id=current_org.organization_id, - ) - recording_url = None - if recording_artifact: - recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact) - - browser_console_log = await app.DATABASE.get_latest_artifact( - task_id=task_obj.task_id, - artifact_types=[ArtifactType.BROWSER_CONSOLE_LOG], - organization_id=current_org.organization_id, - ) - browser_console_log_url = None - if browser_console_log: - browser_console_log_url = await app.ARTIFACT_MANAGER.get_share_link(browser_console_log) - - # get the artifact of the last screenshot and get the screenshot_url - latest_action_screenshot_artifacts = await app.DATABASE.get_latest_n_artifacts( - task_id=task_obj.task_id, - organization_id=task_obj.organization_id, - artifact_types=[ArtifactType.SCREENSHOT_ACTION], - n=settings.TASK_RESPONSE_ACTION_SCREENSHOT_COUNT, - ) - latest_action_screenshot_urls: list[str] | None = None - if latest_action_screenshot_artifacts: - latest_action_screenshot_urls = await app.ARTIFACT_MANAGER.get_share_links(latest_action_screenshot_artifacts) - elif task_obj.status in [TaskStatus.terminated, TaskStatus.completed]: - LOG.error( - "Failed to get latest action screenshots in task response", - task_id=task_id, - task_status=task_obj.status, - ) + return await app.agent.build_task_response(task=task_obj) failure_reason: str | None = None if task_obj.status == TaskStatus.failed and (latest_step.output or task_obj.failure_reason): @@ -319,13 +273,8 @@ async def get_task( if len(action_results_string) > 0: failure_reason += "(Exceptions: " + str(action_results_string) + ")" - - return task_obj.to_task_response( - action_screenshot_urls=latest_action_screenshot_urls, - screenshot_url=screenshot_url, - recording_url=recording_url, - browser_console_log_url=browser_console_log_url, - failure_reason=failure_reason, + return await app.agent.build_task_response( + task=task_obj, last_step=latest_step, failure_reason=failure_reason, need_browser_log=True ) @@ -381,12 +330,12 @@ async def retry_webhook( # get latest step latest_step = await app.DATABASE.get_latest_step(task_id, organization_id=current_org.organization_id) if not latest_step: - return task_obj.to_task_response() + return await app.agent.build_task_response(task=task_obj) # retry the webhook await app.agent.execute_task_webhook(task=task_obj, last_step=latest_step, api_key=x_api_key) - return task_obj.to_task_response() + return await app.agent.build_task_response(task=task_obj, last_step=latest_step) @base_router.get("/internal/tasks/{task_id}", response_model=list[Task]) @@ -454,7 +403,7 @@ async def get_agent_tasks( order_by_column=sort, application=application, ) - return ORJSONResponse([task.to_task_response().model_dump() for task in tasks]) + return ORJSONResponse([(await app.agent.build_task_response(task=task)).model_dump() for task in tasks]) @base_router.get("/internal/tasks", tags=["agent"], response_model=list[Task])