From 3c4df39fee21f9007133617083687493143ad5a4 Mon Sep 17 00:00:00 2001 From: Stanislav Novosad Date: Thu, 6 Nov 2025 11:12:55 -0700 Subject: [PATCH] Extract ScriptSkyvernPage from SkyvernPage (#3920) --- .../script_generations/run_initializer.py | 5 +- .../script_generations/script_skyvern_page.py | 500 ++++++++++++++++++ .../core/script_generations/skyvern_page.py | 474 +---------------- .../script_generations/workflow_wrappers.py | 3 +- skyvern/forge/sdk/routes/sdk.py | 4 +- skyvern/services/script_service.py | 2 +- 6 files changed, 530 insertions(+), 458 deletions(-) create mode 100644 skyvern/core/script_generations/script_skyvern_page.py diff --git a/skyvern/core/script_generations/run_initializer.py b/skyvern/core/script_generations/run_initializer.py index 14ec5c07..0d025003 100644 --- a/skyvern/core/script_generations/run_initializer.py +++ b/skyvern/core/script_generations/run_initializer.py @@ -2,7 +2,8 @@ from typing import Any from pydantic import BaseModel -from skyvern.core.script_generations.skyvern_page import RunContext, SkyvernPage, script_run_context_manager +from skyvern.core.script_generations.script_skyvern_page import ScriptSkyvernPage, script_run_context_manager +from skyvern.core.script_generations.skyvern_page import RunContext, SkyvernPage from skyvern.forge import app from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.workflow.models.parameter import WorkflowParameterType @@ -27,7 +28,7 @@ async def setup( if parameter.workflow_parameter_type == WorkflowParameterType.CREDENTIAL_ID: parameters[key] = workflow_run_context.values[key] context.script_run_parameters.update(parameters) - skyvern_page = await SkyvernPage.create(browser_session_id=browser_session_id) + skyvern_page = await ScriptSkyvernPage.create(browser_session_id=browser_session_id) run_context = RunContext( parameters=parameters, page=skyvern_page, diff --git a/skyvern/core/script_generations/script_skyvern_page.py b/skyvern/core/script_generations/script_skyvern_page.py new file mode 100644 index 00000000..1657acd9 --- /dev/null +++ b/skyvern/core/script_generations/script_skyvern_page.py @@ -0,0 +1,500 @@ +from __future__ import annotations + +import asyncio +from typing import Any, Callable + +import structlog +from playwright.async_api import Page + +from skyvern.config import settings +from skyvern.core.script_generations.real_skyvern_page_ai import RealSkyvernPageAi, render_template +from skyvern.core.script_generations.skyvern_page import ActionCall, ActionMetadata, RunContext, SkyvernPage +from skyvern.core.script_generations.skyvern_page_ai import SkyvernPageAi +from skyvern.exceptions import ScriptTerminationException, WorkflowRunNotFound +from skyvern.forge import app +from skyvern.forge.prompts import prompt_engine +from skyvern.forge.sdk.artifact.models import ArtifactType +from skyvern.forge.sdk.core import skyvern_context +from skyvern.utils.url_validators import prepend_scheme_and_validate_url +from skyvern.webeye.actions.action_types import ActionType +from skyvern.webeye.actions.actions import ( + Action, + ActionStatus, + CompleteAction, + ExtractAction, + SelectOption, + SolveCaptchaAction, +) +from skyvern.webeye.actions.handler import ActionHandler, handle_complete_action +from skyvern.webeye.browser_factory import BrowserState +from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website + +LOG = structlog.get_logger() + +action_wrap = SkyvernPage.action_wrap + + +class ScriptSkyvernPage(SkyvernPage): + """ + A minimal adapter around the chosen driver that: + 1. Executes real browser commands + 2. Records ActionCallobjects into RunContext.trace + 3. Adds retry / fallback hooks + """ + + def __init__( + self, + scraped_page: ScrapedPage, + page: Page, + ai: SkyvernPageAi, + *, + recorder: Callable[[ActionCall], None] | None = None, + ) -> None: + super().__init__(page=page, ai=ai) + self.scraped_page = scraped_page + self._record = recorder or (lambda ac: None) + + @classmethod + async def _get_or_create_browser_state(cls, browser_session_id: str | None = None) -> BrowserState: + context = skyvern_context.current() + if context and context.workflow_run_id and context.organization_id: + workflow_run = await app.DATABASE.get_workflow_run( + workflow_run_id=context.workflow_run_id, organization_id=context.organization_id + ) + if workflow_run: + browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( + workflow_run=workflow_run, + browser_session_id=browser_session_id, + browser_profile_id=workflow_run.browser_profile_id, + ) + else: + raise WorkflowRunNotFound(workflow_run_id=context.workflow_run_id) + else: + browser_state = await app.BROWSER_MANAGER.get_or_create_for_script(browser_session_id=browser_session_id) + return browser_state + + @classmethod + async def _get_browser_state(cls) -> BrowserState | None: + context = skyvern_context.current() + if context and context.workflow_run_id and context.organization_id: + workflow_run = await app.DATABASE.get_workflow_run( + workflow_run_id=context.workflow_run_id, organization_id=context.organization_id + ) + if workflow_run: + browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id=context.workflow_run_id) + else: + raise WorkflowRunNotFound(workflow_run_id=context.workflow_run_id) + else: + browser_state = app.BROWSER_MANAGER.get_for_script() + return browser_state + + @classmethod + async def create( + cls, + browser_session_id: str | None = None, + ) -> ScriptSkyvernPage: + scraped_page = await cls.create_scraped_page(browser_session_id=browser_session_id) + page = await scraped_page._browser_state.must_get_working_page() + ai = RealSkyvernPageAi(scraped_page, page) + return cls(scraped_page=scraped_page, page=page, ai=ai) + + @classmethod + async def create_scraped_page( + cls, + browser_session_id: str | None = None, + ) -> ScrapedPage: + # initialize browser state + # TODO: add workflow_run_id or eventually script_id/script_run_id + browser_state = await cls._get_or_create_browser_state(browser_session_id=browser_session_id) + return await scrape_website( + browser_state=browser_state, + url="", + cleanup_element_tree=app.AGENT_FUNCTION.cleanup_element_tree_factory(), + scrape_exclude=app.scrape_exclude, + max_screenshot_number=settings.MAX_NUM_SCREENSHOTS, + draw_boxes=True, + scroll=True, + support_empty_page=True, + ) + + async def _decorate_call( + self, + fn: Callable, + action: ActionType, + *args: Any, + prompt: str = "", + data: str | dict[str, Any] = "", + intention: str = "", + **kwargs: Any, + ) -> Any: + """ + Decorator to record the action call. + + Auto-creates action records in DB before action execution + and screenshot artifacts after action execution. + """ + + # Emoji mapping for different action types + ACTION_EMOJIS = { + ActionType.CLICK: "👆", + ActionType.INPUT_TEXT: "⌨️", + ActionType.UPLOAD_FILE: "📤", + ActionType.DOWNLOAD_FILE: "📥", + ActionType.SELECT_OPTION: "🎯", + ActionType.WAIT: "⏳", + ActionType.SOLVE_CAPTCHA: "🔓", + ActionType.VERIFICATION_CODE: "🔐", + ActionType.SCROLL: "📜", + ActionType.COMPLETE: "✅", + ActionType.TERMINATE: "🛑", + } + + # Backward compatibility: use intention if provided and prompt is empty + if intention and not prompt: + prompt = intention + + meta = ActionMetadata(prompt, data) + call = ActionCall(action, args, kwargs, meta) + + action_status = ActionStatus.completed + + # Print action in script mode + context = skyvern_context.current() + if context and context.script_mode: + emoji = ACTION_EMOJIS.get(action, "🔧") + action_name = action.value if hasattr(action, "value") else str(action) + print(f"{emoji} {action_name.replace('_', ' ').title()}", end="") + if prompt: + print(f": {prompt}") + else: + print() + + try: + call.result = await fn( + self, *args, prompt=prompt, data=data, intention=intention, **kwargs + ) # real driver call + + # Note: Action status would be updated to completed here if update method existed + + # Print success in script mode + if context and context.script_mode: + print(" ✓ Completed") + + return call.result + except Exception as e: + call.error = e + action_status = ActionStatus.failed + # Note: Action status would be updated to failed here if update method existed + + # Print failure in script mode + if context and context.script_mode: + print(f" ✗ Failed: {str(e)}") + + # LLM fallback hook could go here ... + raise + finally: + self._record(call) + # Auto-create action after execution + await self._create_action_after_execution( + action_type=action, + intention=prompt, + status=action_status, + data=data, + kwargs=kwargs, + call_result=call.result, + ) + + # Auto-create screenshot artifact after execution + await self._create_screenshot_after_execution() + + async def _update_action_reasoning( + self, + action_id: str, + organization_id: str, + action_type: ActionType, + intention: str = "", + text: str | None = None, + select_option: SelectOption | None = None, + file_url: str | None = None, + data_extraction_goal: str | None = None, + data_extraction_schema: dict[str, Any] | list | str | None = None, + ) -> str: + """Generate user-facing reasoning for an action using the secondary LLM.""" + + reasoning = f"Auto-generated action for {action_type.value}" + try: + context = skyvern_context.current() + if not context or not context.organization_id: + return f"Auto-generated action for {action_type.value}" + + # Build the prompt with available context + reasoning_prompt = prompt_engine.load_prompt( + template="generate-action-reasoning", + action_type=action_type.value, + intention=intention, + text=text, + select_option=select_option.value if select_option else None, + file_url=file_url, + data_extraction_goal=data_extraction_goal, + data_extraction_schema=data_extraction_schema, + ) + + # Call secondary LLM to generate reasoning + json_response = await app.SECONDARY_LLM_API_HANDLER( + prompt=reasoning_prompt, + prompt_name="generate-action-reasoning", + organization_id=context.organization_id, + ) + + reasoning = json_response.get("reasoning", f"Auto-generated action for {action_type.value}") + + except Exception: + LOG.warning("Failed to generate action reasoning, using fallback", action_type=action_type) + await app.DATABASE.update_action_reasoning( + organization_id=organization_id, + action_id=action_id, + reasoning=reasoning, + ) + return reasoning + + async def _create_action_after_execution( + self, + action_type: ActionType, + intention: str = "", + status: ActionStatus = ActionStatus.pending, + data: str | dict[str, Any] = "", + kwargs: dict[str, Any] | None = None, + call_result: Any | None = None, + ) -> Action | None: + """Create an action record in the database before execution if task_id and step_id are available.""" + + try: + context = skyvern_context.current() + if not context or not context.task_id or not context.step_id: + return None + + # Create action record. TODO: store more action fields + kwargs = kwargs or {} + # we're using "value" instead of "text" for input text actions interface + xpath = None + if action_type == ActionType.CLICK: + if isinstance(call_result, str) and "xpath=" in call_result: + xpath_split_list = call_result.split("xpath=") + if len(xpath_split_list) > 1: + xpath = xpath_split_list[1] + text = None + select_option = None + response: str | None = kwargs.get("response") + file_url = kwargs.get("file_url") + if not response: + if action_type == ActionType.INPUT_TEXT: + text = str(call_result) + response = text + elif action_type == ActionType.SELECT_OPTION: + option_value = str(call_result) or "" + select_option = SelectOption(value=option_value) + response = option_value + elif action_type == ActionType.UPLOAD_FILE: + file_url = str(call_result) + + action = Action( + element_id="", + action_type=action_type, + status=status, + organization_id=context.organization_id, + workflow_run_id=context.workflow_run_id, + task_id=context.task_id, + step_id=context.step_id, + step_order=0, # Will be updated by the system if needed + action_order=context.action_order, # Will be updated by the system if needed + intention=intention, + text=text, + option=select_option, + file_url=file_url, + response=response, + xpath=xpath, + created_by="script", + ) + data_extraction_goal = None + data_extraction_schema = None + if action_type == ActionType.EXTRACT: + data_extraction_goal = kwargs.get("prompt") + data_extraction_schema = kwargs.get("schema") + action = ExtractAction( + element_id="", + action_type=action_type, + status=status, + organization_id=context.organization_id, + workflow_run_id=context.workflow_run_id, + task_id=context.task_id, + step_id=context.step_id, + step_order=0, + action_order=context.action_order, + intention=intention, + data_extraction_goal=data_extraction_goal, + data_extraction_schema=data_extraction_schema, + option=select_option, + response=response, + created_by="script", + ) + + created_action = await app.DATABASE.create_action(action) + # Generate user-facing reasoning using secondary LLM + asyncio.create_task( + self._update_action_reasoning( + action_id=str(created_action.action_id), + organization_id=str(context.organization_id), + action_type=action_type, + intention=intention, + text=text, + select_option=select_option, + file_url=file_url, + data_extraction_goal=data_extraction_goal, + data_extraction_schema=data_extraction_schema, + ) + ) + + context.action_order += 1 + + return created_action + + except Exception: + # If action creation fails, don't block the actual action execution + return None + + @classmethod + async def _create_screenshot_after_execution(cls) -> None: + """Create a screenshot artifact after action execution if task_id and step_id are available.""" + try: + context = skyvern_context.ensure_context() + if not context or not context.task_id or not context.step_id: + return + + # Get browser state and take screenshot + browser_state = await cls._get_browser_state() + if not browser_state: + return + + screenshot = await browser_state.take_post_action_screenshot(scrolling_number=0) + + if screenshot: + # Create a minimal Step object for artifact creation + step = await app.DATABASE.get_step( + context.step_id, + organization_id=context.organization_id, + ) + if not step: + return + + await app.ARTIFACT_MANAGER.create_artifact( + step=step, + artifact_type=ArtifactType.SCREENSHOT_ACTION, + data=screenshot, + ) + + except Exception: + # If screenshot creation fails, don't block execution + pass + + async def goto(self, url: str, timeout: float = settings.BROWSER_LOADING_TIMEOUT_MS) -> None: + url = render_template(url) + url = prepend_scheme_and_validate_url(url) + + # Print navigation in script mode + context = skyvern_context.current() + if context and context.script_mode: + print(f"🌐 Navigating to: {url}") + + await self.page.goto( + url, + timeout=timeout, + ) + + if context and context.script_mode: + print(" ✓ Page loaded") + + @action_wrap(ActionType.SOLVE_CAPTCHA) + async def solve_captcha( + self, prompt: str | None = None, data: str | dict[str, Any] | None = None, intention: str | None = None + ) -> None: + context = skyvern_context.current() + if not context or not context.organization_id or not context.task_id or not context.step_id: + await asyncio.sleep(30) + return None + + task = await app.DATABASE.get_task(context.task_id, context.organization_id) + step = await app.DATABASE.get_step(context.step_id, context.organization_id) + if task and step: + solve_captcha_handler = ActionHandler._handled_action_types[ActionType.SOLVE_CAPTCHA] + action = SolveCaptchaAction( + organization_id=context.organization_id, + task_id=context.task_id, + step_id=context.step_id, + ) + await solve_captcha_handler(action, self.page, self.scraped_page, task, step) + else: + await asyncio.sleep(30) + + @action_wrap(ActionType.COMPLETE) + async def complete( + self, prompt: str | None = None, data: str | dict[str, Any] | None = None, intention: str | None = None + ) -> None: + # TODO: add validation here. if it doesn't pass the validation criteria: + # 1. terminate the workflow run if fallback to ai is false + # 2. fallback to ai if fallback to ai is true + context = skyvern_context.current() + if ( + not context + or not context.organization_id + or not context.workflow_run_id + or not context.task_id + or not context.step_id + ): + return + task = await app.DATABASE.get_task(context.task_id, context.organization_id) + step = await app.DATABASE.get_step(context.step_id, context.organization_id) + if task and step: + action = CompleteAction( + organization_id=context.organization_id, + task_id=context.task_id, + step_id=context.step_id, + step_order=step.order, + action_order=context.action_order, + ) + # result = await ActionHandler.handle_action(self.scraped_page, task, step, self.page, action) + result = await handle_complete_action(action, self.page, self.scraped_page, task, step) + if result and result[-1].success is False: + raise ScriptTerminationException(result[-1].exception_message) + + +class ScriptRunContextManager: + """ + Manages the run context for code runs. + """ + + def __init__(self) -> None: + # self.run_contexts: dict[str, RunContext] = {} + self.run_context: RunContext | None = None + self.cached_fns: dict[str, Callable] = {} + + def get_run_context(self) -> RunContext | None: + return self.run_context + + def set_run_context(self, run_context: RunContext) -> None: + self.run_context = run_context + + def ensure_run_context(self) -> RunContext: + if not self.run_context: + raise Exception("Run context not found") + return self.run_context + + def set_cached_fn(self, cache_key: str, fn: Callable) -> None: + self.cached_fns[cache_key] = fn + + def get_cached_fn(self, cache_key: str | None = None) -> Callable | None: + if cache_key: + return self.cached_fns.get(cache_key) + return None + + +script_run_context_manager = ScriptRunContextManager() diff --git a/skyvern/core/script_generations/skyvern_page.py b/skyvern/core/script_generations/skyvern_page.py index ad3725ba..485a84e0 100644 --- a/skyvern/core/script_generations/skyvern_page.py +++ b/skyvern/core/script_generations/skyvern_page.py @@ -10,31 +10,11 @@ import structlog from playwright.async_api import Page from skyvern.config import settings -from skyvern.core.script_generations.real_skyvern_page_ai import RealSkyvernPageAi, render_template from skyvern.core.script_generations.skyvern_page_ai import SkyvernPageAi -from skyvern.exceptions import ScriptTerminationException, WorkflowRunNotFound -from skyvern.forge import app -from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.files import download_file -from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context -from skyvern.utils.url_validators import prepend_scheme_and_validate_url from skyvern.webeye.actions import handler_utils from skyvern.webeye.actions.action_types import ActionType -from skyvern.webeye.actions.actions import ( - Action, - ActionStatus, - CompleteAction, - ExtractAction, - SelectOption, - SolveCaptchaAction, -) -from skyvern.webeye.actions.handler import ( - ActionHandler, - handle_complete_action, -) -from skyvern.webeye.browser_factory import BrowserState -from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website LOG = structlog.get_logger() @@ -63,116 +43,37 @@ class ActionCall: class SkyvernPage: """ - A minimal adapter around the chosen driver that: - 1. Executes real browser commands - 2. Records ActionCallobjects into RunContext.trace - 3. Adds retry / fallback hooks + A lightweight adapter for the selected driver that: + 1. Executes actual browser commands + 2. Enables AI-driven actions + 3. Provides an AI-based fallback for standard actions """ def __init__( self, - scraped_page: ScrapedPage, page: Page, ai: SkyvernPageAi, - *, - recorder: Callable[[ActionCall], None] | None = None, - # generate_response: bool = False, - ): - self.scraped_page = scraped_page + ) -> None: self.page = page - self._record = recorder or (lambda ac: None) self.current_label: str | None = None self._ai = ai - @classmethod - async def _get_or_create_browser_state(cls, browser_session_id: str | None = None) -> BrowserState: - context = skyvern_context.current() - if context and context.workflow_run_id and context.organization_id: - workflow_run = await app.DATABASE.get_workflow_run( - workflow_run_id=context.workflow_run_id, organization_id=context.organization_id - ) - if workflow_run: - browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( - workflow_run=workflow_run, - browser_session_id=browser_session_id, - browser_profile_id=workflow_run.browser_profile_id, - ) - else: - raise WorkflowRunNotFound(workflow_run_id=context.workflow_run_id) - else: - browser_state = await app.BROWSER_MANAGER.get_or_create_for_script(browser_session_id=browser_session_id) - return browser_state - - @classmethod - async def _get_browser_state(cls) -> BrowserState | None: - context = skyvern_context.current() - if context and context.workflow_run_id and context.organization_id: - workflow_run = await app.DATABASE.get_workflow_run( - workflow_run_id=context.workflow_run_id, organization_id=context.organization_id - ) - if workflow_run: - browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id=context.workflow_run_id) - else: - raise WorkflowRunNotFound(workflow_run_id=context.workflow_run_id) - else: - browser_state = app.BROWSER_MANAGER.get_for_script() - return browser_state - - @classmethod - async def create( - cls, - browser_session_id: str | None = None, - ) -> SkyvernPage: - scraped_page = await cls.create_scraped_page(browser_session_id=browser_session_id) - page = await scraped_page._browser_state.must_get_working_page() - ai = RealSkyvernPageAi(scraped_page, page) - return cls(scraped_page=scraped_page, page=page, ai=ai) - - @classmethod - async def create_scraped_page( - cls, - browser_session_id: str | None = None, - ) -> ScrapedPage: - # initialize browser state - # TODO: add workflow_run_id or eventually script_id/script_run_id - browser_state = await cls._get_or_create_browser_state(browser_session_id=browser_session_id) - return await scrape_website( - browser_state=browser_state, - url="", - cleanup_element_tree=app.AGENT_FUNCTION.cleanup_element_tree_factory(), - scrape_exclude=app.scrape_exclude, - max_screenshot_number=settings.MAX_NUM_SCREENSHOTS, - draw_boxes=True, - scroll=True, - support_empty_page=True, - ) + async def _decorate_call( + self, + fn: Callable, + action: ActionType, + *args: Any, + prompt: str = "", + data: str | dict[str, Any] = "", + intention: str = "", # backward compatibility + **kwargs: Any, + ) -> Any: + return await fn(self, *args, prompt=prompt, data=data, intention=intention, **kwargs) @staticmethod def action_wrap( action: ActionType, ) -> Callable: - """ - Decorator to record the action call. - - Auto-creates action records in DB before action execution - and screenshot artifacts after action execution. - """ - - # Emoji mapping for different action types - ACTION_EMOJIS = { - ActionType.CLICK: "👆", - ActionType.INPUT_TEXT: "⌨️", - ActionType.UPLOAD_FILE: "📤", - ActionType.DOWNLOAD_FILE: "📥", - ActionType.SELECT_OPTION: "🎯", - ActionType.WAIT: "⏳", - ActionType.SOLVE_CAPTCHA: "🔓", - ActionType.VERIFICATION_CODE: "🔐", - ActionType.SCROLL: "📜", - ActionType.COMPLETE: "✅", - ActionType.TERMINATE: "🛑", - } - def decorator(fn: Callable) -> Callable: async def wrapper( skyvern_page: SkyvernPage, @@ -182,273 +83,16 @@ class SkyvernPage: intention: str = "", # backward compatibility **kwargs: Any, ) -> Any: - # Backward compatibility: use intention if provided and prompt is empty - if intention and not prompt: - prompt = intention - - meta = ActionMetadata(prompt, data) - call = ActionCall(action, args, kwargs, meta) - - action_status = ActionStatus.completed - - # Print action in script mode - context = skyvern_context.current() - if context and context.script_mode: - emoji = ACTION_EMOJIS.get(action, "🔧") - action_name = action.value if hasattr(action, "value") else str(action) - print(f"{emoji} {action_name.replace('_', ' ').title()}", end="") - if prompt: - print(f": {prompt}") - else: - print() - - try: - call.result = await fn( - skyvern_page, *args, prompt=prompt, data=data, intention=intention, **kwargs - ) # real driver call - - # Note: Action status would be updated to completed here if update method existed - - # Print success in script mode - if context and context.script_mode: - print(" ✓ Completed") - - return call.result - except Exception as e: - call.error = e - action_status = ActionStatus.failed - # Note: Action status would be updated to failed here if update method existed - - # Print failure in script mode - if context and context.script_mode: - print(f" ✗ Failed: {str(e)}") - - # LLM fallback hook could go here ... - raise - finally: - skyvern_page._record(call) - # Auto-create action after execution - await skyvern_page._create_action_after_execution( - action_type=action, - intention=prompt, - status=action_status, - data=data, - kwargs=kwargs, - call_result=call.result, - ) - - # Auto-create screenshot artifact after execution - await skyvern_page._create_screenshot_after_execution() + return await skyvern_page._decorate_call( + fn, action, *args, prompt=prompt, data=data, intention=intention, **kwargs + ) return wrapper return decorator async def goto(self, url: str, timeout: float = settings.BROWSER_LOADING_TIMEOUT_MS) -> None: - url = render_template(url) - url = prepend_scheme_and_validate_url(url) - - # Print navigation in script mode - context = skyvern_context.current() - if context and context.script_mode: - print(f"🌐 Navigating to: {url}") - - await self.page.goto( - url, - timeout=timeout, - ) - - if context and context.script_mode: - print(" ✓ Page loaded") - - async def _update_action_reasoning( - self, - action_id: str, - organization_id: str, - action_type: ActionType, - intention: str = "", - text: str | None = None, - select_option: SelectOption | None = None, - file_url: str | None = None, - data_extraction_goal: str | None = None, - data_extraction_schema: dict[str, Any] | list | str | None = None, - ) -> str: - """Generate user-facing reasoning for an action using the secondary LLM.""" - - reasoning = f"Auto-generated action for {action_type.value}" - try: - context = skyvern_context.current() - if not context or not context.organization_id: - return f"Auto-generated action for {action_type.value}" - - # Build the prompt with available context - reasoning_prompt = prompt_engine.load_prompt( - template="generate-action-reasoning", - action_type=action_type.value, - intention=intention, - text=text, - select_option=select_option.value if select_option else None, - file_url=file_url, - data_extraction_goal=data_extraction_goal, - data_extraction_schema=data_extraction_schema, - ) - - # Call secondary LLM to generate reasoning - json_response = await app.SECONDARY_LLM_API_HANDLER( - prompt=reasoning_prompt, - prompt_name="generate-action-reasoning", - organization_id=context.organization_id, - ) - - reasoning = json_response.get("reasoning", f"Auto-generated action for {action_type.value}") - - except Exception: - LOG.warning("Failed to generate action reasoning, using fallback", action_type=action_type) - await app.DATABASE.update_action_reasoning( - organization_id=organization_id, - action_id=action_id, - reasoning=reasoning, - ) - return reasoning - - async def _create_action_after_execution( - self, - action_type: ActionType, - intention: str = "", - status: ActionStatus = ActionStatus.pending, - data: str | dict[str, Any] = "", - kwargs: dict[str, Any] | None = None, - call_result: Any | None = None, - ) -> Action | None: - """Create an action record in the database before execution if task_id and step_id are available.""" - - try: - context = skyvern_context.current() - if not context or not context.task_id or not context.step_id: - return None - - # Create action record. TODO: store more action fields - kwargs = kwargs or {} - # we're using "value" instead of "text" for input text actions interface - xpath = None - if action_type == ActionType.CLICK: - if isinstance(call_result, str) and "xpath=" in call_result: - xpath_split_list = call_result.split("xpath=") - if len(xpath_split_list) > 1: - xpath = xpath_split_list[1] - text = None - select_option = None - response: str | None = kwargs.get("response") - file_url = kwargs.get("file_url") - if not response: - if action_type == ActionType.INPUT_TEXT: - text = str(call_result) - response = text - elif action_type == ActionType.SELECT_OPTION: - option_value = str(call_result) or "" - select_option = SelectOption(value=option_value) - response = option_value - elif action_type == ActionType.UPLOAD_FILE: - file_url = str(call_result) - - action = Action( - element_id="", - action_type=action_type, - status=status, - organization_id=context.organization_id, - workflow_run_id=context.workflow_run_id, - task_id=context.task_id, - step_id=context.step_id, - step_order=0, # Will be updated by the system if needed - action_order=context.action_order, # Will be updated by the system if needed - intention=intention, - text=text, - option=select_option, - file_url=file_url, - response=response, - xpath=xpath, - created_by="script", - ) - data_extraction_goal = None - data_extraction_schema = None - if action_type == ActionType.EXTRACT: - data_extraction_goal = kwargs.get("prompt") - data_extraction_schema = kwargs.get("schema") - action = ExtractAction( - element_id="", - action_type=action_type, - status=status, - organization_id=context.organization_id, - workflow_run_id=context.workflow_run_id, - task_id=context.task_id, - step_id=context.step_id, - step_order=0, - action_order=context.action_order, - intention=intention, - data_extraction_goal=data_extraction_goal, - data_extraction_schema=data_extraction_schema, - option=select_option, - response=response, - created_by="script", - ) - - created_action = await app.DATABASE.create_action(action) - # Generate user-facing reasoning using secondary LLM - asyncio.create_task( - self._update_action_reasoning( - action_id=str(created_action.action_id), - organization_id=str(context.organization_id), - action_type=action_type, - intention=intention, - text=text, - select_option=select_option, - file_url=file_url, - data_extraction_goal=data_extraction_goal, - data_extraction_schema=data_extraction_schema, - ) - ) - - context.action_order += 1 - - return created_action - - except Exception: - # If action creation fails, don't block the actual action execution - return None - - @classmethod - async def _create_screenshot_after_execution(cls) -> None: - """Create a screenshot artifact after action execution if task_id and step_id are available.""" - try: - context = skyvern_context.ensure_context() - if not context or not context.task_id or not context.step_id: - return - - # Get browser state and take screenshot - browser_state = await cls._get_browser_state() - if not browser_state: - return - - screenshot = await browser_state.take_post_action_screenshot(scrolling_number=0) - - if screenshot: - # Create a minimal Step object for artifact creation - step = await app.DATABASE.get_step( - context.step_id, - organization_id=context.organization_id, - ) - if not step: - return - - await app.ARTIFACT_MANAGER.create_artifact( - step=step, - artifact_type=ArtifactType.SCREENSHOT_ACTION, - data=screenshot, - ) - - except Exception: - # If screenshot creation fails, don't block execution - pass + await self.page.goto(url, timeout=timeout) ######### Public Interfaces ######### @action_wrap(ActionType.CLICK) @@ -766,23 +410,7 @@ class SkyvernPage: async def solve_captcha( self, prompt: str | None = None, data: str | dict[str, Any] | None = None, intention: str | None = None ) -> None: - context = skyvern_context.current() - if not context or not context.organization_id or not context.task_id or not context.step_id: - await asyncio.sleep(30) - return None - - task = await app.DATABASE.get_task(context.task_id, context.organization_id) - step = await app.DATABASE.get_step(context.step_id, context.organization_id) - if task and step: - solve_captcha_handler = ActionHandler._handled_action_types[ActionType.SOLVE_CAPTCHA] - action = SolveCaptchaAction( - organization_id=context.organization_id, - task_id=context.task_id, - step_id=context.step_id, - ) - await solve_captcha_handler(action, self.page, self.scraped_page, task, step) - else: - await asyncio.sleep(30) + raise NotImplementedError("Solve captcha is not supported outside server context") @action_wrap(ActionType.TERMINATE) async def terminate( @@ -799,32 +427,7 @@ class SkyvernPage: async def complete( self, prompt: str | None = None, data: str | dict[str, Any] | None = None, intention: str | None = None ) -> None: - # TODO: add validation here. if it doesn't pass the validation criteria: - # 1. terminate the workflow run if fallback to ai is false - # 2. fallback to ai if fallback to ai is true - context = skyvern_context.current() - if ( - not context - or not context.organization_id - or not context.workflow_run_id - or not context.task_id - or not context.step_id - ): - return - task = await app.DATABASE.get_task(context.task_id, context.organization_id) - step = await app.DATABASE.get_step(context.step_id, context.organization_id) - if task and step: - action = CompleteAction( - organization_id=context.organization_id, - task_id=context.task_id, - step_id=context.step_id, - step_order=step.order, - action_order=context.action_order, - ) - # result = await ActionHandler.handle_action(self.scraped_page, task, step, self.page, action) - result = await handle_complete_action(action, self.page, self.scraped_page, task, step) - if result and result[-1].success is False: - raise ScriptTerminationException(result[-1].exception_message) + """Stub for complete. Override in subclasses for specific behavior.""" @action_wrap(ActionType.RELOAD_PAGE) async def reload_page( @@ -923,36 +526,3 @@ class RunContext: self.parameters[key] = value self.page = page self.trace: list[ActionCall] = [] - - -class ScriptRunContextManager: - """ - Manages the run context for code runs. - """ - - def __init__(self) -> None: - # self.run_contexts: dict[str, RunContext] = {} - self.run_context: RunContext | None = None - self.cached_fns: dict[str, Callable] = {} - - def get_run_context(self) -> RunContext | None: - return self.run_context - - def set_run_context(self, run_context: RunContext) -> None: - self.run_context = run_context - - def ensure_run_context(self) -> RunContext: - if not self.run_context: - raise Exception("Run context not found") - return self.run_context - - def set_cached_fn(self, cache_key: str, fn: Callable) -> None: - self.cached_fns[cache_key] = fn - - def get_cached_fn(self, cache_key: str | None = None) -> Callable | None: - if cache_key: - return self.cached_fns.get(cache_key) - return None - - -script_run_context_manager = ScriptRunContextManager() diff --git a/skyvern/core/script_generations/workflow_wrappers.py b/skyvern/core/script_generations/workflow_wrappers.py index a42dbc0a..81c25888 100644 --- a/skyvern/core/script_generations/workflow_wrappers.py +++ b/skyvern/core/script_generations/workflow_wrappers.py @@ -1,6 +1,7 @@ from typing import Any, Callable -from skyvern.core.script_generations.skyvern_page import RunContext, SkyvernPage, script_run_context_manager +from skyvern.core.script_generations.script_skyvern_page import script_run_context_manager +from skyvern.core.script_generations.skyvern_page import RunContext, SkyvernPage # Build a dummy workflow decorator diff --git a/skyvern/forge/sdk/routes/sdk.py b/skyvern/forge/sdk/routes/sdk.py index 22a6b64a..27b0f03a 100644 --- a/skyvern/forge/sdk/routes/sdk.py +++ b/skyvern/forge/sdk/routes/sdk.py @@ -3,8 +3,8 @@ from typing import Any import structlog from fastapi import Depends, HTTPException, status -from skyvern import SkyvernPage from skyvern.core.script_generations.real_skyvern_page_ai import RealSkyvernPageAi +from skyvern.core.script_generations.script_skyvern_page import ScriptSkyvernPage from skyvern.forge import app from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.skyvern_context import SkyvernContext @@ -145,7 +145,7 @@ async def run_sdk_action( ) result: Any | None = None try: - scraped_page = await SkyvernPage.create_scraped_page(browser_session_id=browser_session_id) + scraped_page = await ScriptSkyvernPage.create_scraped_page(browser_session_id=browser_session_id) page = await scraped_page._browser_state.must_get_working_page() page_ai = RealSkyvernPageAi(scraped_page, page) diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index bec6ccda..334a4b35 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -17,7 +17,7 @@ from skyvern.config import settings from skyvern.constants import GET_DOWNLOADED_FILES_TIMEOUT from skyvern.core.script_generations.constants import SCRIPT_TASK_BLOCKS from skyvern.core.script_generations.generate_script import _build_block_fn, create_or_update_script_block -from skyvern.core.script_generations.skyvern_page import script_run_context_manager +from skyvern.core.script_generations.script_skyvern_page import script_run_context_manager from skyvern.exceptions import ScriptNotFound, ScriptTerminationException, WorkflowRunNotFound from skyvern.forge import app from skyvern.forge.sdk.artifact.models import ArtifactType