From 393b3fba878bf1bb29086ca0a6410cf830becde2 Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Fri, 19 Apr 2024 00:12:02 -0700 Subject: [PATCH] Fail task if FailedToNavigateToUrl (#209) --- skyvern/exceptions.py | 2 + skyvern/forge/agent.py | 141 ++++++++++++++++++++++++++--------------- 2 files changed, 91 insertions(+), 52 deletions(-) diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index 2be03252..d60cb993 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -126,6 +126,8 @@ class WorkflowParameterNotFound(SkyvernException): class FailedToNavigateToUrl(SkyvernException): def __init__(self, url: str, error_message: str) -> None: + self.url = url + self.error_message = error_message super().__init__(f"Failed to navigate to url {url}. Error message: {error_message}") diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index fa7ac745..c571f66f 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -12,6 +12,7 @@ from playwright.async_api import Page from skyvern import analytics from skyvern.exceptions import ( BrowserStateMissingPage, + FailedToNavigateToUrl, FailedToSendWebhook, InvalidWorkflowTaskURLState, MissingBrowserStatePage, @@ -309,6 +310,28 @@ class ForgeAgent(Agent): step=step, ) return step, detailed_output, next_step + except FailedToNavigateToUrl as e: + # Fail the task if we can't navigate to the URL and send the response + LOG.error( + "Failed to navigate to URL, marking task as failed, and sending webhook response", + task_id=task.task_id, + step_id=step.step_id, + url=e.url, + error_message=e.error_message, + ) + task = await self.update_task( + task, + status=TaskStatus.failed, + failure_reason=f"Failed to navigate to URL. URL:{e.url}, Error:{e.error_message}", + ) + await self.send_task_response( + task=task, + last_step=step, + api_key=api_key, + close_browser_on_completion=close_browser_on_completion, + skip_artifacts=True, + ) + return step, detailed_output, next_step async def agent_step( self, @@ -711,6 +734,7 @@ class ForgeAgent(Agent): last_step: Step, api_key: str | None = None, close_browser_on_completion: bool = True, + skip_artifacts: bool = False, ) -> None: """ send the task response to the webhook callback url @@ -727,6 +751,14 @@ class ForgeAgent(Agent): task = refreshed_task # log the task status as an event analytics.capture("skyvern-oss-agent-task-status", {"status": task.status}) + # We skip the artifacts and send the webhook response directly only when there is an issue with the browser + # initialization. In this case, we don't have any artifacts to send and we can't take final screenshots etc. + # since the browser is not initialized properly or the proxy is not working. + if skip_artifacts: + await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_task(task.task_id) + await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key) + return + # Take one last screenshot and create an artifact before closing the browser to see the final state browser_state: BrowserState = await app.BROWSER_MANAGER.get_or_create_for_task(task) await browser_state.get_or_create_page() @@ -760,7 +792,9 @@ class ForgeAgent(Agent): await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key) - async def execute_task_webhook(self, task: Task, last_step: Step, api_key: str | None) -> None: + async def execute_task_webhook( + self, task: Task, last_step: Step, api_key: str | None, skip_artifacts: bool = False + ) -> None: if not api_key: LOG.warning( "Request has no api key. Not sending task response", @@ -775,65 +809,68 @@ class ForgeAgent(Agent): ) return - # get the artifact of the screenshot and get the screenshot_url - screenshot_artifact = await app.DATABASE.get_artifact( - task_id=task.task_id, - step_id=last_step.step_id, - artifact_type=ArtifactType.SCREENSHOT_FINAL, - organization_id=task.organization_id, - ) - screenshot_url = None - if screenshot_artifact: - screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact) + if not skip_artifacts: + # get the artifact of the screenshot and get the screenshot_url + screenshot_artifact = await app.DATABASE.get_artifact( + task_id=task.task_id, + step_id=last_step.step_id, + artifact_type=ArtifactType.SCREENSHOT_FINAL, + organization_id=task.organization_id, + ) + screenshot_url = None + if screenshot_artifact: + screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact) - recording_artifact = await app.DATABASE.get_artifact( - task_id=task.task_id, - step_id=last_step.step_id, - artifact_type=ArtifactType.RECORDING, - organization_id=task.organization_id, - ) - recording_url = None - if recording_artifact: - recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact) + recording_artifact = await app.DATABASE.get_artifact( + task_id=task.task_id, + step_id=last_step.step_id, + artifact_type=ArtifactType.RECORDING, + organization_id=task.organization_id, + ) + recording_url = None + if recording_artifact: + recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact) - # get the artifact of the last TASK_RESPONSE_ACTION_SCREENSHOT_COUNT screenshots and get the screenshot_url - latest_action_screenshot_artifacts = await app.DATABASE.get_latest_n_artifacts( - task_id=task.task_id, - organization_id=task.organization_id, - artifact_types=[ArtifactType.SCREENSHOT_ACTION], - n=SettingsManager.get_settings().TASK_RESPONSE_ACTION_SCREENSHOT_COUNT, - ) - latest_action_screenshot_urls = [] - if latest_action_screenshot_artifacts: - for artifact in latest_action_screenshot_artifacts: - screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(artifact) - if screenshot_url: - latest_action_screenshot_urls.append(screenshot_url) - else: - LOG.error( - "Failed to get share link for action screenshot", - artifact_id=artifact.artifact_id, - ) + # get the artifact of the last TASK_RESPONSE_ACTION_SCREENSHOT_COUNT screenshots and get the screenshot_url + latest_action_screenshot_artifacts = await app.DATABASE.get_latest_n_artifacts( + task_id=task.task_id, + organization_id=task.organization_id, + artifact_types=[ArtifactType.SCREENSHOT_ACTION], + n=SettingsManager.get_settings().TASK_RESPONSE_ACTION_SCREENSHOT_COUNT, + ) + latest_action_screenshot_urls = [] + if latest_action_screenshot_artifacts: + for artifact in latest_action_screenshot_artifacts: + screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(artifact) + if screenshot_url: + latest_action_screenshot_urls.append(screenshot_url) + else: + LOG.error( + "Failed to get share link for action screenshot", + artifact_id=artifact.artifact_id, + ) + else: + LOG.error("Failed to get latest action screenshots") + + # get the latest task from the db to get the latest status, extracted_information, and failure_reason + task_from_db = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id) + if not task_from_db: + LOG.error("Failed to get task from db when sending task response") + raise TaskNotFound(task_id=task.task_id) + + task = task_from_db + task_response = task.to_task_response( + action_screenshot_urls=latest_action_screenshot_urls, + screenshot_url=screenshot_url, + recording_url=recording_url, + ) else: - LOG.error("Failed to get latest action screenshots") + task_response = task.to_task_response() - # get the latest task from the db to get the latest status, extracted_information, and failure_reason - task_from_db = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id) - if not task_from_db: - LOG.error("Failed to get task from db when sending task response") - raise TaskNotFound(task_id=task.task_id) - - task = task_from_db if not task.webhook_callback_url: LOG.info("Task has no webhook callback url. Not sending task response") return - task_response = task.to_task_response( - action_screenshot_urls=latest_action_screenshot_urls, - screenshot_url=screenshot_url, - recording_url=recording_url, - ) - # send task_response to the webhook callback url # TODO: use async requests (httpx) timestamp = str(int(datetime.utcnow().timestamp()))