diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 5c3f717f..7cec7699 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -105,17 +105,18 @@ class ForgeAgent: task_url = task_block.url if task_url is None: browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(workflow_run=workflow_run) - if not browser_state.page: + working_page = await browser_state.get_working_page() + if not working_page: LOG.error( "BrowserState has no page", workflow_run_id=workflow_run.workflow_run_id, ) raise MissingBrowserStatePage(workflow_run_id=workflow_run.workflow_run_id) - if browser_state.page.url == "about:blank": + if working_page.url == "about:blank": raise InvalidWorkflowTaskURLState(workflow_run.workflow_run_id) - task_url = browser_state.page.url + task_url = working_page.url task = await app.DATABASE.create_task( url=task_url, @@ -258,8 +259,8 @@ class ForgeAgent: detailed_output, ) = await self._initialize_execution_state(task, step, workflow_run) - if browser_state.page: - self.register_async_operations(organization, task, browser_state.page) + if page := await browser_state.get_working_page(): + self.register_async_operations(organization, task, page) step, detailed_output = await self.agent_step(task, step, browser_state, organization=organization) task = await self.update_task_errors_from_detailed_output(task, detailed_output) @@ -796,7 +797,8 @@ class ForgeAgent: return failed_step, detailed_agent_step_output.get_clean_detailed_output() async def record_artifacts_after_action(self, task: Task, step: Step, browser_state: BrowserState) -> None: - if not browser_state.page: + working_page = await browser_state.get_working_page() + if not working_page: raise BrowserStateMissingPage() try: screenshot = await browser_state.take_screenshot(full_page=True) @@ -814,7 +816,7 @@ class ForgeAgent: ) try: - skyvern_frame = await SkyvernFrame.create_instance(frame=browser_state.page) + skyvern_frame = await SkyvernFrame.create_instance(frame=working_page) html = await skyvern_frame.get_content() await app.ARTIFACT_MANAGER.create_artifact( step=step, @@ -830,12 +832,15 @@ class ForgeAgent: ) try: - video_data = await app.BROWSER_MANAGER.get_video_data(task_id=task.task_id, browser_state=browser_state) - await app.ARTIFACT_MANAGER.update_artifact_data( - artifact_id=browser_state.browser_artifacts.video_artifact_id, - organization_id=task.organization_id, - data=video_data, + video_artifacts = await app.BROWSER_MANAGER.get_video_artifacts( + task_id=task.task_id, browser_state=browser_state ) + for video_artifact in video_artifacts: + await app.ARTIFACT_MANAGER.update_artifact_data( + artifact_id=video_artifact.video_artifact_id, + organization_id=task.organization_id, + data=video_artifact.video_data, + ) except Exception: LOG.error( "Failed to record video after action", @@ -854,14 +859,20 @@ class ForgeAgent: else: browser_state = await app.BROWSER_MANAGER.get_or_create_for_task(task) # Initialize video artifact for the task here, afterwards it'll only get updated - if browser_state and not browser_state.browser_artifacts.video_artifact_id: - video_data = await app.BROWSER_MANAGER.get_video_data(task_id=task.task_id, browser_state=browser_state) - video_artifact_id = await app.ARTIFACT_MANAGER.create_artifact( - step=step, - artifact_type=ArtifactType.RECORDING, - data=video_data, + if browser_state and browser_state.browser_artifacts: + video_artifacts = await app.BROWSER_MANAGER.get_video_artifacts( + task_id=task.task_id, browser_state=browser_state ) - app.BROWSER_MANAGER.set_video_artifact_for_task(task, video_artifact_id) + for idx, video_artifact in enumerate(video_artifacts): + if video_artifact.video_artifact_id: + continue + video_artifact_id = await app.ARTIFACT_MANAGER.create_artifact( + step=step, + artifact_type=ArtifactType.RECORDING, + data=video_artifact.video_data, + ) + video_artifacts[idx].video_artifact_id = video_artifact_id + app.BROWSER_MANAGER.set_video_artifact_for_task(task, video_artifacts) detailed_output = DetailedAgentStepOutput( scraped_page=None, @@ -1005,9 +1016,8 @@ class ForgeAgent: # Generate the extract action prompt navigation_goal = task.navigation_goal starting_url = task.url - current_url = ( - await browser_state.page.evaluate("() => document.location.href") if browser_state.page else starting_url - ) + page = await browser_state.get_working_page() + current_url = await page.evaluate("() => document.location.href") if page else starting_url final_navigation_payload = self._build_navigation_payload( task, expire_verification_code=expire_verification_code ) @@ -1346,12 +1356,14 @@ class ForgeAgent: browser_state = await app.BROWSER_MANAGER.cleanup_for_task(task.task_id, close_browser_on_completion) if browser_state: # Update recording artifact after closing the browser, so we can get an accurate recording - video_data = await app.BROWSER_MANAGER.get_video_data(task_id=task.task_id, browser_state=browser_state) - if video_data: + video_artifacts = await app.BROWSER_MANAGER.get_video_artifacts( + task_id=task.task_id, browser_state=browser_state + ) + for video_artifact in video_artifacts: await app.ARTIFACT_MANAGER.update_artifact_data( - artifact_id=browser_state.browser_artifacts.video_artifact_id, + artifact_id=video_artifact.video_artifact_id, organization_id=task.organization_id, - data=video_data, + data=video_artifact.video_data, ) har_data = await app.BROWSER_MANAGER.get_har_data(task_id=task.task_id, browser_state=browser_state) diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 28ffdad7..56dc229b 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -259,7 +259,8 @@ class TaskBlock(Block): browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( workflow_run=workflow_run, url=self.url ) - if not browser_state.page: + working_page = await browser_state.get_working_page() + if not working_page: LOG.error( "BrowserState has no page", workflow_run_id=workflow_run.workflow_run_id, @@ -278,7 +279,7 @@ class TaskBlock(Block): if self.url: try: - await browser_state.page.goto(self.url, timeout=settings.BROWSER_LOADING_TIMEOUT_MS) + await working_page.goto(self.url, timeout=settings.BROWSER_LOADING_TIMEOUT_MS) except Error as playright_error: LOG.warning(f"Error while navigating to url: {str(playright_error)}") # Make sure the task is marked as failed in the database before raising the exception @@ -521,8 +522,9 @@ class CodeBlock(Block): # get all parameters into a dictionary parameter_values = {} maybe_browser_state = await app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id) - if maybe_browser_state and maybe_browser_state.page: - parameter_values["skyvern_page"] = maybe_browser_state.page + if maybe_browser_state: + if page := await maybe_browser_state.get_working_page(): + parameter_values["skyvern_page"] = await page for parameter in self.parameters: value = workflow_run_context.get_value(parameter.key) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 49a65c91..5b84f389 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -744,16 +744,16 @@ class WorkflowService: self, browser_state: BrowserState, workflow: Workflow, workflow_run: WorkflowRun ) -> None: # Create recording artifact after closing the browser, so we can get an accurate recording - video_data = await app.BROWSER_MANAGER.get_video_data( + video_artifacts = await app.BROWSER_MANAGER.get_video_artifacts( workflow_id=workflow.workflow_id, workflow_run_id=workflow_run.workflow_run_id, browser_state=browser_state, ) - if video_data: + for video_artifact in video_artifacts: await app.ARTIFACT_MANAGER.update_artifact_data( - artifact_id=browser_state.browser_artifacts.video_artifact_id, + artifact_id=video_artifact.video_artifact_id, organization_id=workflow.organization_id, - data=video_data, + data=video_artifact.video_data, ) async def persist_har_data( diff --git a/skyvern/webeye/browser_factory.py b/skyvern/webeye/browser_factory.py index 6d4c9872..802c515a 100644 --- a/skyvern/webeye/browser_factory.py +++ b/skyvern/webeye/browser_factory.py @@ -85,15 +85,13 @@ class BrowserContextFactory: @staticmethod def build_browser_artifacts( - video_path: str | None = None, + video_artifacts: list[VideoArtifact] | None = None, har_path: str | None = None, - video_artifact_id: str | None = None, traces_dir: str | None = None, ) -> BrowserArtifacts: return BrowserArtifacts( - video_path=video_path, + video_artifacts=video_artifacts or [], har_path=har_path, - video_artifact_id=video_artifact_id, traces_dir=traces_dir, ) @@ -130,9 +128,14 @@ class BrowserContextFactory: return await cls._validator(page) -class BrowserArtifacts(BaseModel): +class VideoArtifact(BaseModel): video_path: str | None = None video_artifact_id: str | None = None + video_data: bytes = bytes() + + +class BrowserArtifacts(BaseModel): + video_artifacts: list[VideoArtifact] = [] har_path: str | None = None traces_dir: str | None = None @@ -175,24 +178,26 @@ class BrowserState: browser_artifacts: BrowserArtifacts = BrowserArtifacts(), browser_cleanup: BrowserCleanupFunc = None, ): + self.__page = page self.pw = pw self.browser_context = browser_context - self.page = page self.browser_artifacts = browser_artifacts self.browser_cleanup = browser_cleanup - def __assert_page(self) -> Page: - if self.page is not None: - return self.page + async def __assert_page(self) -> Page: + page = await self.get_working_page() + if page is not None: + return page LOG.error("BrowserState has no page") raise MissingBrowserStatePage() async def _close_all_other_pages(self) -> None: - if not self.browser_context or not self.page: + cur_page = await self.get_working_page() + if not self.browser_context or not cur_page: return pages = self.browser_context.pages for page in pages: - if page != self.page: + if page != cur_page: await page.close() async def check_and_fix_state( @@ -226,21 +231,22 @@ class BrowserState: assert self.browser_context is not None - if self.page is None: + if await self.get_working_page() is None: success = False retries = 0 while not success and retries < 3: try: LOG.info("Creating a new page") - self.page = await self.browser_context.new_page() + page = await self.browser_context.new_page() + await self.set_working_page(page, 0) await self._close_all_other_pages() LOG.info("A new page is created") if url: LOG.info(f"Navigating page to {url} and waiting for 5 seconds") try: start_time = time.time() - await self.page.goto(url, timeout=settings.BROWSER_LOADING_TIMEOUT_MS) + await page.goto(url, timeout=settings.BROWSER_LOADING_TIMEOUT_MS) end_time = time.time() LOG.info( "Page loading time", @@ -270,8 +276,33 @@ class BrowserState: raise e LOG.info(f"Retrying to create a new page. Retry count: {retries}") - if self.browser_artifacts.video_path is None: - self.browser_artifacts.video_path = await self.page.video.path() if self.page and self.page.video else None + async def get_working_page(self) -> Page | None: + # HACK: currently, assuming the last page is always the working page. + # Need to refactor this logic when we want to manipulate multi pages together + if self.__page is None or self.browser_context is None or len(self.browser_context.pages) == 0: + return None + + last_page = self.browser_context.pages[-1] + if self.__page == last_page: + return self.__page + await self.set_working_page(last_page, len(self.browser_context.pages) - 1) + return last_page + + async def set_working_page(self, page: Page | None, index: int = 0) -> None: + self.__page = page + if page is None: + return + if len(self.browser_artifacts.video_artifacts) > index: + if self.browser_artifacts.video_artifacts[index].video_path is None: + self.browser_artifacts.video_artifacts[index].video_path = await page.video.path() + return + + target_lenght = index + 1 + self.browser_artifacts.video_artifacts.extend( + [VideoArtifact()] * (target_lenght - len(self.browser_artifacts.video_artifacts)) + ) + self.browser_artifacts.video_artifacts[index].video_path = await page.video.path() + return async def get_or_create_page( self, @@ -280,8 +311,9 @@ class BrowserState: task_id: str | None = None, workflow_run_id: str | None = None, ) -> Page: - if self.page is not None: - return self.page + page = await self.get_working_page() + if page is not None: + return page try: await self.check_and_fix_state( @@ -295,26 +327,26 @@ class BrowserState: await self.check_and_fix_state( url=url, proxy_location=proxy_location, task_id=task_id, workflow_run_id=workflow_run_id ) - assert self.page is not None + await self.__assert_page() - if not await BrowserContextFactory.validate_browser_context(self.page): + if not await BrowserContextFactory.validate_browser_context(await self.get_working_page()): await self.close_current_open_page() await self.check_and_fix_state( url=url, proxy_location=proxy_location, task_id=task_id, workflow_run_id=workflow_run_id ) - assert self.page is not None + await self.__assert_page() - return self.page + return page async def close_current_open_page(self) -> None: await self._close_all_other_pages() if self.browser_context is not None: await self.browser_context.close() self.browser_context = None - self.page = None + await self.set_working_page(None) async def stop_page_loading(self) -> None: - page = self.__assert_page() + page = await self.__assert_page() try: await page.evaluate("window.stop()") except Exception as e: @@ -322,7 +354,7 @@ class BrowserState: raise FailedToStopLoadingPage(url=page.url, error_message=repr(e)) async def reload_page(self) -> None: - page = self.__assert_page() + page = await self.__assert_page() LOG.info(f"Reload page {page.url} and waiting for 5 seconds") try: @@ -353,5 +385,5 @@ class BrowserState: LOG.info("Playwright is stopped") async def take_screenshot(self, full_page: bool = False, file_path: str | None = None) -> bytes: - page = self.__assert_page() + page = await self.__assert_page() return await SkyvernFrame.take_screenshot(page=page, full_page=full_page, file_path=file_path) diff --git a/skyvern/webeye/browser_manager.py b/skyvern/webeye/browser_manager.py index e007fd78..814517da 100644 --- a/skyvern/webeye/browser_manager.py +++ b/skyvern/webeye/browser_manager.py @@ -6,7 +6,7 @@ from playwright.async_api import async_playwright from skyvern.exceptions import MissingBrowserState from skyvern.forge.sdk.schemas.tasks import ProxyLocation, Task from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun -from skyvern.webeye.browser_factory import BrowserContextFactory, BrowserState +from skyvern.webeye.browser_factory import BrowserContextFactory, BrowserState, VideoArtifact LOG = structlog.get_logger() @@ -96,52 +96,42 @@ class BrowserManager: return self.pages[workflow_run_id] return None - def set_video_artifact_for_task(self, task: Task, artifact_id: str) -> None: + def set_video_artifact_for_task(self, task: Task, artifacts: list[VideoArtifact]) -> None: if task.workflow_run_id and task.workflow_run_id in self.pages: - if self.pages[task.workflow_run_id].browser_artifacts.video_artifact_id: - LOG.warning( - "Video artifact is already set for workflow run. Overwriting", - workflow_run_id=task.workflow_run_id, - old_artifact_id=self.pages[task.workflow_run_id].browser_artifacts.video_artifact_id, - new_artifact_id=artifact_id, - ) - self.pages[task.workflow_run_id].browser_artifacts.video_artifact_id = artifact_id + self.pages[task.workflow_run_id].browser_artifacts.video_artifacts = artifacts return if task.task_id in self.pages: - if self.pages[task.task_id].browser_artifacts.video_artifact_id: - LOG.warning( - "Video artifact is already set for task. Overwriting", - task_id=task.task_id, - old_artifact_id=self.pages[task.task_id].browser_artifacts.video_artifact_id, - new_artifact_id=artifact_id, - ) - self.pages[task.task_id].browser_artifacts.video_artifact_id = artifact_id + self.pages[task.task_id].browser_artifacts.video_artifacts = artifacts return raise MissingBrowserState(task_id=task.task_id) - async def get_video_data( + async def get_video_artifacts( self, browser_state: BrowserState, task_id: str = "", workflow_id: str = "", workflow_run_id: str = "", - ) -> bytes: - if browser_state: - path = browser_state.browser_artifacts.video_path + ) -> list[VideoArtifact]: + if len(browser_state.browser_artifacts.video_artifacts) == 0: + LOG.warning( + "Video data not found for task", + task_id=task_id, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + ) + return [] + + for i, video_artifact in enumerate(browser_state.browser_artifacts.video_artifacts): + path = video_artifact.video_path if path: try: with open(path, "rb") as f: - return f.read() + browser_state.browser_artifacts.video_artifacts[i].video_data = f.read() + except FileNotFoundError: pass - LOG.warning( - "Video data not found for task", - task_id=task_id, - workflow_id=workflow_id, - workflow_run_id=workflow_run_id, - ) - return b"" + return browser_state.browser_artifacts.video_artifacts async def get_har_data( self,