From ad0bd0b4f52f71f566c1ffdf8647b782b95d9932 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Fri, 1 Nov 2024 15:13:41 -0700 Subject: [PATCH] fix task workflow cancel issue (#1111) --- skyvern/exceptions.py | 6 +- skyvern/forge/agent.py | 161 +++++++++++++-------- skyvern/forge/sdk/workflow/models/block.py | 17 ++- skyvern/forge/sdk/workflow/service.py | 47 +++--- skyvern/webeye/browser_factory.py | 39 +++-- skyvern/webeye/browser_manager.py | 45 ++++-- 6 files changed, 204 insertions(+), 111 deletions(-) diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index 917ae630..61b52720 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -77,8 +77,10 @@ class ImaginaryFileUrl(SkyvernException): class MissingBrowserState(SkyvernException): - def __init__(self, task_id: str) -> None: - super().__init__(f"Browser state for task {task_id} is missing.") + def __init__(self, task_id: str | None = None, workflow_run_id: str | None = None) -> None: + task_str = f"task_id={task_id}" if task_id else "" + workflow_run_str = f"workflow_run_id={workflow_run_id}" if workflow_run_id else "" + super().__init__(f"Browser state for {task_str} {workflow_run_str} is missing.") class MissingBrowserStatePage(SkyvernException): diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index f19dcb0e..1eb9e8b2 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -7,7 +7,7 @@ from asyncio.exceptions import CancelledError from datetime import datetime from typing import Any, Tuple -import requests +import httpx import structlog from playwright._impl._errors import TargetClosedError from playwright.async_api import Page @@ -22,6 +22,7 @@ from skyvern.exceptions import ( FailedToTakeScreenshot, InvalidTaskStatusTransition, InvalidWorkflowTaskURLState, + MissingBrowserState, MissingBrowserStatePage, SkyvernException, StepTerminationError, @@ -113,7 +114,10 @@ class ForgeAgent: task_url = task_block.url if task_url is None: - browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(workflow_run=workflow_run) + browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id=workflow_run.workflow_run_id) + if browser_state is None: + raise MissingBrowserState(workflow_run_id=workflow_run.workflow_run_id) + working_page = await browser_state.get_working_page() if not working_page: LOG.error( @@ -239,6 +243,12 @@ class ForgeAgent: ) # We don't send task response for now as the task is canceled # TODO: shall we send task response here? + await self.clean_up_task( + task=task, + last_step=step, + api_key=api_key, + need_call_webhook=False, + ) return step, None, None context = skyvern_context.current() @@ -305,7 +315,7 @@ class ForgeAgent: task, status=TaskStatus.completed, ) - await self.send_task_response( + await self.clean_up_task( task=completed_task, last_step=last_step, api_key=api_key, @@ -321,7 +331,7 @@ class ForgeAgent: next_step = maybe_next_step retry = True else: - await self.send_task_response( + await self.clean_up_task( task=task, last_step=step, api_key=api_key, @@ -330,7 +340,7 @@ class ForgeAgent: await self.async_operation_pool.remove_task(task.task_id) return step, detailed_output, None elif step.status == StepStatus.completed: - # TODO (kerem): keep the task object uptodate at all times so that send_task_response can just use it + # TODO (kerem): keep the task object uptodate at all times so that clean_up_task can just use it ( is_task_completed, maybe_last_step, @@ -338,7 +348,7 @@ class ForgeAgent: ) = await self.handle_completed_step(organization, task, step, await browser_state.get_working_page()) if is_task_completed is not None and maybe_last_step: last_step = maybe_last_step - await self.send_task_response( + await self.clean_up_task( task=task, last_step=last_step, api_key=api_key, @@ -411,12 +421,18 @@ class ForgeAgent: ) is_task_marked_as_failed = await self.fail_task(task, step, e.message) if is_task_marked_as_failed: - await self.send_task_response( + await self.clean_up_task( task=task, last_step=step, api_key=api_key, close_browser_on_completion=close_browser_on_completion, ) + else: + LOG.warning( + "Task isn't marked as failed, after step termination. NOT clean up the task", + task_id=task.task_id, + step_id=step.step_id, + ) return step, detailed_output, None except FailedToSendWebhook: LOG.exception( @@ -439,12 +455,18 @@ class ForgeAgent: failure_reason = f"Failed to navigate to URL. URL:{e.url}, Error:{e.error_message}" is_task_marked_as_failed = await self.fail_task(task, step, failure_reason) if is_task_marked_as_failed: - await self.send_task_response( + await self.clean_up_task( task=task, last_step=step, api_key=api_key, close_browser_on_completion=close_browser_on_completion, - skip_artifacts=True, + need_final_screenshot=False, + ) + else: + LOG.warning( + "Task isn't marked as failed, after navigation failure. NOT clean up the task", + task_id=task.task_id, + step_id=step.step_id, ) return step, detailed_output, next_step except TaskAlreadyCanceled: @@ -452,7 +474,12 @@ class ForgeAgent: "Task is already canceled, stopping execution", task_id=task.task_id, ) - # We don't send task response for now as the task is canceled + await self.clean_up_task( + task=task, + last_step=step, + api_key=api_key, + need_call_webhook=False, + ) return step, detailed_output, None except InvalidTaskStatusTransition: LOG.warning( @@ -461,6 +488,12 @@ class ForgeAgent: step_id=step.step_id, ) # TODO: shall we send task response here? + await self.clean_up_task( + task=task, + last_step=step, + api_key=api_key, + need_call_webhook=False, + ) return step, detailed_output, None except Exception as e: LOG.exception( @@ -475,12 +508,18 @@ class ForgeAgent: is_task_marked_as_failed = await self.fail_task(task, step, failure_reason) if is_task_marked_as_failed: - await self.send_task_response( + await self.clean_up_task( task=task, last_step=step, api_key=api_key, close_browser_on_completion=close_browser_on_completion, ) + else: + LOG.warning( + "Task isn't marked as failed, after unexpected exception. NOT clean up the task", + task_id=task.task_id, + step_id=step.step_id, + ) return step, detailed_output, None async def fail_task(self, task: Task, step: Step | None, reason: str | None) -> bool: @@ -1297,14 +1336,14 @@ class ForgeAgent: ) return None - async def send_task_response( + async def clean_up_task( self, task: Task, last_step: Step, api_key: str | None = None, + need_call_webhook: bool = True, close_browser_on_completion: bool = True, - skip_artifacts: bool = False, - skip_cleanup: bool = False, + need_final_screenshot: bool = True, ) -> None: """ send the task response to the webhook callback url @@ -1313,53 +1352,48 @@ class ForgeAgent: try: refreshed_task = await app.DATABASE.get_task(task_id=task.task_id, organization_id=task.organization_id) if not refreshed_task: - LOG.error("Failed to get task from db when sending task response") + LOG.error("Failed to get task from db when clean up task", task_id=task.task_id) raise TaskNotFound(task_id=task.task_id) except Exception as e: - LOG.error( - "Failed to get task from db when sending task response", + LOG.exception( + "Failed to get task from db when clean up task", task_id=task.task_id, - error=e, ) raise TaskNotFound(task_id=task.task_id) from e task = refreshed_task - if skip_cleanup: - await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key) - return # log the task status as an event analytics.capture("skyvern-oss-agent-task-status", {"status": task.status}) - # We skip the artifacts and send the webhook response directly only when there is an issue with the browser - # initialization. In this case, we don't have any artifacts to send and we can't take final screenshots etc. - # since the browser is not initialized properly or the proxy is not working. - if skip_artifacts: - await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_task(task.task_id) - await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key) - return + if need_final_screenshot: + # Take one last screenshot and create an artifact before closing the browser to see the final state + # We don't need the artifacts and send the webhook response directly only when there is an issue with the browser + # initialization. In this case, we don't have any artifacts to send and we can't take final screenshots etc. + # since the browser is not initialized properly or the proxy is not working. - # Take one last screenshot and create an artifact before closing the browser to see the final state - browser_state: BrowserState = await app.BROWSER_MANAGER.get_or_create_for_task(task) - await browser_state.get_or_create_page() - try: - screenshot = await browser_state.take_screenshot(full_page=True) - await app.ARTIFACT_MANAGER.create_artifact( - step=last_step, - artifact_type=ArtifactType.SCREENSHOT_FINAL, - data=screenshot, - ) - except TargetClosedError: - LOG.warning( - "Failed to take screenshot before sending task response, page is closed", - task_id=task.task_id, - step_id=last_step.step_id, - ) - except Exception: - LOG.exception( - "Failed to take screenshot before sending task response", - task_id=task.task_id, - step_id=last_step.step_id, - ) + browser_state = app.BROWSER_MANAGER.get_for_task(task.task_id) + if browser_state is not None and await browser_state.get_working_page() is not None: + try: + screenshot = await browser_state.take_screenshot(full_page=True) + await app.ARTIFACT_MANAGER.create_artifact( + step=last_step, + artifact_type=ArtifactType.SCREENSHOT_FINAL, + data=screenshot, + ) + except TargetClosedError: + LOG.warning( + "Failed to take screenshot before sending task response, page is closed", + task_id=task.task_id, + step_id=last_step.step_id, + ) + except Exception: + LOG.exception( + "Failed to take screenshot before sending task response", + task_id=task.task_id, + step_id=last_step.step_id, + ) + # if it's a task block from workflow run, + # we don't need to close the browser, save browser artifacts, or call webhook if task.workflow_run_id: LOG.info( "Task is part of a workflow run, not sending a webhook response", @@ -1373,14 +1407,14 @@ class ForgeAgent: # Wait for all tasks to complete before generating the links for the artifacts await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_task(task.task_id) - await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key) + if need_call_webhook: + await self.execute_task_webhook(task=task, last_step=last_step, api_key=api_key) async def execute_task_webhook( self, task: Task, last_step: Step, api_key: str | None, - skip_artifacts: bool = False, ) -> None: if not api_key: LOG.warning( @@ -1396,14 +1430,14 @@ class ForgeAgent: ) return - if not skip_artifacts: - # get the artifact of the screenshot and get the screenshot_url - screenshot_artifact = await app.DATABASE.get_artifact( - task_id=task.task_id, - step_id=last_step.step_id, - artifact_type=ArtifactType.SCREENSHOT_FINAL, - organization_id=task.organization_id, - ) + # get the artifact of the screenshot and get the screenshot_url + screenshot_artifact = await app.DATABASE.get_artifact( + task_id=task.task_id, + step_id=last_step.step_id, + artifact_type=ArtifactType.SCREENSHOT_FINAL, + organization_id=task.organization_id, + ) + if screenshot_artifact is None: screenshot_url = None if screenshot_artifact: screenshot_url = await app.ARTIFACT_MANAGER.get_share_link(screenshot_artifact) @@ -1453,7 +1487,6 @@ class ForgeAgent: return # send task_response to the webhook callback url - # TODO: use async requests (httpx) timestamp = str(int(datetime.utcnow().timestamp())) payload = task_response.model_dump_json(exclude={"request"}) signature = generate_skyvern_signature( @@ -1473,8 +1506,10 @@ class ForgeAgent: headers=headers, ) try: - resp = requests.post(task.webhook_callback_url, data=payload, headers=headers) - if resp.ok: + resp = await httpx.AsyncClient().post( + task.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) + ) + if resp.status_code == 200: LOG.info( "Webhook sent successfully", task_id=task.task_id, diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 5a09243d..e73f8dbd 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -24,6 +24,7 @@ from skyvern.exceptions import ( ContextParameterValueNotFound, DisabledBlockExecutionError, FailedToNavigateToUrl, + MissingBrowserState, MissingBrowserStatePage, TaskNotFound, UnexpectedTaskStatus, @@ -53,6 +54,7 @@ from skyvern.forge.sdk.workflow.models.parameter import ( OutputParameter, WorkflowParameter, ) +from skyvern.webeye.browser_factory import BrowserState LOG = structlog.get_logger() @@ -272,6 +274,7 @@ class TaskBlock(Block): # non-retryable terminations while will_retry: task_order, task_retry = await self.get_task_order(workflow_run_id, current_retry) + is_first_task = task_order == 0 task, step = await app.agent.create_task_and_step_from_block( task_block=self, workflow=workflow, @@ -283,9 +286,17 @@ class TaskBlock(Block): organization = await app.DATABASE.get_organization(organization_id=workflow.organization_id) if not organization: raise Exception(f"Organization is missing organization_id={workflow.organization_id}") - browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( - workflow_run=workflow_run, url=self.url - ) + + browser_state: BrowserState | None = None + if is_first_task: + browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( + workflow_run=workflow_run, url=self.url + ) + else: + browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id=workflow_run_id) + if browser_state is None: + raise MissingBrowserState(task_id=task.task_id, workflow_run_id=workflow_run_id) + working_page = await browser_state.get_working_page() if not working_page: LOG.error( diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 27f4cb4f..07ef3f79 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -2,7 +2,7 @@ import json from datetime import datetime from typing import Any -import requests +import httpx import structlog from skyvern import analytics @@ -183,15 +183,17 @@ class WorkflowService: ) # Execute workflow blocks blocks = workflow.workflow_definition.blocks + blocks_cnt = len(blocks) block_result = None for block_idx, block in enumerate(blocks): + is_last_block = block_idx + 1 == blocks_cnt try: parameters = block.get_all_parameters(workflow_run_id) await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run( workflow_run_id, parameters, organization ) LOG.info( - f"Executing root block {block.block_type} at index {block_idx} for workflow run {workflow_run_id}", + f"Executing root block {block.block_type} at index {block_idx}/{blocks_cnt -1} for workflow run {workflow_run_id}", block_type=block.block_type, workflow_run_id=workflow_run.workflow_run_id, block_idx=block_idx, @@ -199,7 +201,7 @@ class WorkflowService: block_result = await block.execute_safe(workflow_run_id=workflow_run_id) if block_result.status == BlockStatus.canceled: LOG.info( - f"Block with type {block.block_type} at index {block_idx} was canceled for workflow run {workflow_run_id}, cancelling workflow run", + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} was canceled for workflow run {workflow_run_id}, cancelling workflow run", block_type=block.block_type, workflow_run_id=workflow_run.workflow_run_id, block_idx=block_idx, @@ -207,18 +209,24 @@ class WorkflowService: ) await self.mark_workflow_run_as_canceled(workflow_run_id=workflow_run.workflow_run_id) # We're not sending a webhook here because the workflow run is manually marked as canceled. + await self.clean_up_workflow( + workflow=workflow, + workflow_run=workflow_run, + api_key=api_key, + need_call_webhook=False, + ) return workflow_run elif block_result.status == BlockStatus.failed: LOG.error( - f"Block with type {block.block_type} at index {block_idx} failed for workflow run {workflow_run_id}", + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed for workflow run {workflow_run_id}", block_type=block.block_type, workflow_run_id=workflow_run.workflow_run_id, block_idx=block_idx, block_result=block_result, ) - if block.continue_on_failure: + if block.continue_on_failure and not is_last_block: LOG.warning( - f"Block with type {block.block_type} at index {block_idx} failed but will continue executing the workflow run {workflow_run_id}", + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed but will continue executing the workflow run {workflow_run_id}", block_type=block.block_type, workflow_run_id=workflow_run.workflow_run_id, block_idx=block_idx, @@ -227,7 +235,7 @@ class WorkflowService: ) else: await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) - await self.send_workflow_response( + await self.clean_up_workflow( workflow=workflow, workflow_run=workflow_run, api_key=api_key, @@ -235,15 +243,15 @@ class WorkflowService: return workflow_run elif block_result.status == BlockStatus.terminated: LOG.info( - f"Block with type {block.block_type} at index {block_idx} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated", + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated", block_type=block.block_type, workflow_run_id=workflow_run.workflow_run_id, block_idx=block_idx, block_result=block_result, ) - if block.continue_on_failure: + if block.continue_on_failure and not is_last_block: LOG.warning( - f"Block with type {block.block_type} at index {block_idx} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run", + f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run", block_type=block.block_type, workflow_run_id=workflow_run.workflow_run_id, block_idx=block_idx, @@ -252,7 +260,7 @@ class WorkflowService: ) else: await self.mark_workflow_run_as_terminated(workflow_run_id=workflow_run.workflow_run_id) - await self.send_workflow_response( + await self.clean_up_workflow( workflow=workflow, workflow_run=workflow_run, api_key=api_key, @@ -264,11 +272,11 @@ class WorkflowService: workflow_run_id=workflow_run.workflow_run_id, ) await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) - await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key) + await self.clean_up_workflow(workflow=workflow, workflow_run=workflow_run, api_key=api_key) return workflow_run await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id) - await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key) + await self.clean_up_workflow(workflow=workflow, workflow_run=workflow_run, api_key=api_key) return workflow_run async def create_workflow( @@ -707,12 +715,13 @@ class WorkflowService: outputs=outputs, ) - async def send_workflow_response( + async def clean_up_workflow( self, workflow: Workflow, workflow_run: WorkflowRun, api_key: str | None = None, close_browser_on_completion: bool = True, + need_call_webhook: bool = True, ) -> None: analytics.capture("skyvern-oss-agent-workflow-status", {"status": workflow_run.status}) tasks = await self.get_tasks_by_workflow_run_id(workflow_run.workflow_run_id) @@ -735,6 +744,9 @@ class WorkflowService: await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_tasks(all_workflow_task_ids) + if not need_call_webhook: + return + workflow_run_status_response = await self.build_workflow_run_status_response( workflow_permanent_id=workflow.workflow_permanent_id, workflow_run_id=workflow_run.workflow_run_id, @@ -762,7 +774,6 @@ class WorkflowService: return # send webhook to the webhook callback url - # TODO: use async requests (httpx) timestamp = str(int(datetime.utcnow().timestamp())) payload = workflow_run_status_response.model_dump_json() signature = generate_skyvern_signature( @@ -783,8 +794,10 @@ class WorkflowService: headers=headers, ) try: - resp = requests.post(workflow_run.webhook_callback_url, data=payload, headers=headers) - if resp.ok: + resp = await httpx.AsyncClient().post( + url=workflow_run.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) + ) + if resp.status_code == 200: LOG.info( "Webhook sent successfully", workflow_id=workflow.workflow_id, diff --git a/skyvern/webeye/browser_factory.py b/skyvern/webeye/browser_factory.py index 3eca0499..04024168 100644 --- a/skyvern/webeye/browser_factory.py +++ b/skyvern/webeye/browser_factory.py @@ -10,7 +10,7 @@ from typing import Any, Awaitable, Callable, Protocol import aiofiles import structlog -from playwright.async_api import BrowserContext, ConsoleMessage, Error, Page, Playwright, async_playwright +from playwright.async_api import BrowserContext, ConsoleMessage, Error, Page, Playwright from pydantic import BaseModel, PrivateAttr from skyvern.config import settings @@ -41,10 +41,21 @@ def get_download_dir(workflow_run_id: str | None, task_id: str | None) -> str: return download_dir -def set_browser_console_log(browser_context: BrowserContext, browser_artifacts: BrowserArtifacts) -> str: +def set_browser_console_log(browser_context: BrowserContext, browser_artifacts: BrowserArtifacts) -> None: if browser_artifacts.browser_console_log_path is None: - log_path = f"{settings.LOG_PATH}/{datetime.utcnow().strftime('%Y-%m-%d')}/{uuid.uuid4()}.log" - os.makedirs(os.path.dirname(log_path), exist_ok=True) + try: + log_path = f"{settings.LOG_PATH}/{datetime.utcnow().strftime('%Y-%m-%d')}/{uuid.uuid4()}.log" + os.makedirs(os.path.dirname(log_path), exist_ok=True) + # create the empty log file + with open(log_path, "w") as _: + pass + except Exception: + LOG.warning( + "Failed to create browser log file", + log_path=log_path, + exc_info=True, + ) + return browser_artifacts.browser_console_log_path = log_path async def browser_console_log(msg: ConsoleMessage) -> None: @@ -55,7 +66,6 @@ def set_browser_console_log(browser_context: BrowserContext, browser_artifacts: LOG.info("browser console log is saved", log_path=browser_artifacts.browser_console_log_path) browser_context.on("console", browser_console_log) - return browser_artifacts.browser_console_log_path class BrowserContextCreator(Protocol): @@ -128,6 +138,7 @@ class BrowserContextFactory: cls, playwright: Playwright, **kwargs: Any ) -> tuple[BrowserContext, BrowserArtifacts, BrowserCleanupFunc]: browser_type = SettingsManager.get_settings().BROWSER_TYPE + browser_context: BrowserContext | None = None try: creator = cls._creators.get(browser_type) if not creator: @@ -135,9 +146,15 @@ class BrowserContextFactory: browser_context, browser_artifacts, cleanup_func = await creator(playwright, **kwargs) set_browser_console_log(browser_context=browser_context, browser_artifacts=browser_artifacts) return browser_context, browser_artifacts, cleanup_func - except UnknownBrowserType as e: - raise e except Exception as e: + if browser_context is not None: + # FIXME: sometimes it can't close the browser context? + LOG.error("unexpected error happens after created browser context, going to close the context") + await browser_context.close() + + if isinstance(e, UnknownBrowserType): + raise e + raise UnknownErrorWhileCreatingBrowserContext(browser_type, e) from e @classmethod @@ -217,7 +234,7 @@ class BrowserState: def __init__( self, - pw: Playwright | None = None, + pw: Playwright, browser_context: BrowserContext | None = None, page: Page | None = None, browser_artifacts: BrowserArtifacts = BrowserArtifacts(), @@ -253,10 +270,6 @@ class BrowserState: workflow_run_id: str | None = None, organization_id: str | None = None, ) -> None: - if self.pw is None: - LOG.info("Starting playwright") - self.pw = await async_playwright().start() - LOG.info("playwright is started") if self.browser_context is None: LOG.info("creating browser context") ( @@ -276,8 +289,6 @@ class BrowserState: self.browser_cleanup = browser_cleanup LOG.info("browser context is created") - assert self.browser_context is not None - if await self.get_working_page() is None: success = False retries = 0 diff --git a/skyvern/webeye/browser_manager.py b/skyvern/webeye/browser_manager.py index 5f54a43d..c6c6b1c0 100644 --- a/skyvern/webeye/browser_manager.py +++ b/skyvern/webeye/browser_manager.py @@ -53,17 +53,25 @@ class BrowserManager: browser_cleanup=browser_cleanup, ) - async def get_or_create_for_task(self, task: Task) -> BrowserState: - if task.task_id in self.pages: - return self.pages[task.task_id] - elif task.workflow_run_id in self.pages: + def get_for_task(self, task_id: str, workflow_run_id: str | None = None) -> BrowserState | None: + if task_id in self.pages: + return self.pages[task_id] + + if workflow_run_id and workflow_run_id in self.pages: LOG.info( "Browser state for task not found. Using browser state for workflow run", - task_id=task.task_id, - workflow_run_id=task.workflow_run_id, + task_id=task_id, + workflow_run_id=workflow_run_id, ) - self.pages[task.task_id] = self.pages[task.workflow_run_id] - return self.pages[task.task_id] + self.pages[task_id] = self.pages[workflow_run_id] + return self.pages[task_id] + + return None + + async def get_or_create_for_task(self, task: Task) -> BrowserState: + browser_state = self.get_for_task(task_id=task.task_id, workflow_run_id=task.workflow_run_id) + if browser_state is not None: + return browser_state LOG.info("Creating browser state for task", task_id=task.task_id) browser_state = await self._create_browser_state( @@ -85,8 +93,10 @@ class BrowserManager: return browser_state async def get_or_create_for_workflow_run(self, workflow_run: WorkflowRun, url: str | None = None) -> BrowserState: - if workflow_run.workflow_run_id in self.pages: - return self.pages[workflow_run.workflow_run_id] + browser_state = self.get_for_workflow_run(workflow_run_id=workflow_run.workflow_run_id) + if browser_state is not None: + return browser_state + LOG.info( "Creating browser state for workflow run", workflow_run_id=workflow_run.workflow_run_id, @@ -110,7 +120,7 @@ class BrowserManager: self.pages[workflow_run.workflow_run_id] = browser_state return browser_state - async def get_for_workflow_run(self, workflow_run_id: str) -> BrowserState | None: + def get_for_workflow_run(self, workflow_run_id: str) -> BrowserState | None: if workflow_run_id in self.pages: return self.pages[workflow_run_id] return None @@ -230,7 +240,18 @@ class BrowserManager: await browser_state_to_close.close(close_browser_on_completion=close_browser_on_completion) for task_id in task_ids: - self.pages.pop(task_id, None) + task_browser_state = self.pages.pop(task_id, None) + if task_browser_state is None: + continue + try: + await task_browser_state.close() + except Exception: + LOG.info( + "Failed to close the browser state from the task block, might because it's already closed.", + exc_info=True, + task_id=task_id, + workflow_run_id=workflow_run_id, + ) LOG.info("Workflow run is cleaned up") return browser_state_to_close