diff --git a/skyvern/experimentation/__init__.py b/skyvern/experimentation/__init__.py new file mode 100644 index 00000000..60ae9981 --- /dev/null +++ b/skyvern/experimentation/__init__.py @@ -0,0 +1,5 @@ +"""Experimentation modules for Skyvern.""" + +from skyvern.experimentation.wait_config import WAIT_VARIANTS, WaitConfig, get_wait_config_from_experiment + +__all__ = ["WaitConfig", "get_wait_config_from_experiment", "WAIT_VARIANTS"] diff --git a/skyvern/experimentation/wait_config.py b/skyvern/experimentation/wait_config.py new file mode 100644 index 00000000..a709ee45 --- /dev/null +++ b/skyvern/experimentation/wait_config.py @@ -0,0 +1,208 @@ +""" +Wait time optimization experiment configuration. + +This module provides configurable wait times that can be controlled via PostHog feature flags. +Allows for A/B testing of different wait time strategies to optimize speed while maintaining success rates. +""" + +import json +from typing import Any + +import structlog + +from skyvern.forge import app + +LOG = structlog.get_logger() + + +# Preset variant configurations (all times in seconds) +WAIT_VARIANTS = { + "baseline": { + "post_click_delay": 0.3, + "post_input_dropdown_delay": 3.0, + "dropdown_close_wait": 2.0, + "checkbox_retry_delay": 2.0, + "scroll_into_view_wait": 2.0, + "empty_page_retry_wait": 3.0, + "mouse_movement_delay_min": 0.2, + "mouse_movement_delay_max": 0.3, + }, + "moderate": { + "post_click_delay": 0.15, + "post_input_dropdown_delay": 1.5, + "dropdown_close_wait": 1.0, + "checkbox_retry_delay": 1.0, + "scroll_into_view_wait": 1.0, + "empty_page_retry_wait": 2.0, + "mouse_movement_delay_min": 0.1, + "mouse_movement_delay_max": 0.15, + }, + "aggressive": { + "post_click_delay": 0.05, + "post_input_dropdown_delay": 0.5, + "dropdown_close_wait": 0.5, + "checkbox_retry_delay": 0.5, + "scroll_into_view_wait": 0.5, + "empty_page_retry_wait": 1.0, + "mouse_movement_delay_min": 0.05, + "mouse_movement_delay_max": 0.1, + }, +} + + +class WaitConfig: + """Manages wait time configuration with PostHog experiment support.""" + + def __init__(self, payload: dict[str, Any] | None = None): + """ + Initialize wait config from PostHog payload. + + Expected payload format: + { + "variant": "moderate", # optional preset variant + "overrides": { # optional specific overrides + "post_click_delay": 0.15, + ... + }, + "global_multiplier": 1.0, # optional multiplier for all waits + "adaptive_mode": false, # optional adaptive retry behavior + "adaptive_backoff": 1.5 # optional backoff multiplier + } + """ + self.variant = "baseline" + self.overrides: dict[str, float] = {} + self.global_multiplier = 1.0 + self.adaptive_mode = False + self.adaptive_backoff = 1.5 + + if payload: + self._load_from_payload(payload) + + def _load_from_payload(self, payload: dict[str, Any]) -> None: + """Load configuration from PostHog payload.""" + self.variant = payload.get("variant", "baseline") + self.overrides = payload.get("overrides", {}) + self.global_multiplier = payload.get("global_multiplier", 1.0) + self.adaptive_mode = payload.get("adaptive_mode", False) + self.adaptive_backoff = payload.get("adaptive_backoff", 1.5) + + # Validate variant + if self.variant not in WAIT_VARIANTS: + LOG.warning( + "Invalid wait variant, falling back to baseline", + variant=self.variant, + valid_variants=list(WAIT_VARIANTS.keys()), + ) + self.variant = "baseline" + + def get_wait_time(self, wait_type: str, retry_count: int = 0) -> float: + """ + Get wait time for a specific wait type. + + Args: + wait_type: Type of wait (e.g., "post_click_delay") + retry_count: Current retry count (for adaptive mode) + + Returns: + Wait time in seconds (clamped to [0.0, 30.0]) + """ + # Check for override first + if wait_type in self.overrides: + base_wait = self.overrides[wait_type] + else: + # Get from variant preset + variant_config = WAIT_VARIANTS.get(self.variant, WAIT_VARIANTS["baseline"]) + base_wait = variant_config.get(wait_type, 0.0) + + # Sanitize inputs to prevent negative/extreme waits from payload misconfig + base_wait = max(0.0, float(base_wait)) + multiplier = max(0.0, float(self.global_multiplier)) + backoff = max(0.0, float(self.adaptive_backoff)) + + # Apply global multiplier + wait_time = base_wait * multiplier + + # Apply adaptive backoff if enabled + if self.adaptive_mode and retry_count > 0: + wait_time *= backoff**retry_count + + # Cap at 30 seconds to prevent extreme waits from exponential backoff + return min(30.0, max(0.0, wait_time)) + + def get_mouse_movement_delay_range(self) -> tuple[float, float]: + """Get min and max for mouse movement delays.""" + min_delay = self.get_wait_time("mouse_movement_delay_min") + max_delay = self.get_wait_time("mouse_movement_delay_max") + return (min_delay, max_delay) + + def to_dict(self) -> dict[str, Any]: + """Export current configuration as dict for logging.""" + return { + "variant": self.variant, + "overrides": self.overrides, + "global_multiplier": self.global_multiplier, + "adaptive_mode": self.adaptive_mode, + "adaptive_backoff": self.adaptive_backoff, + } + + +async def get_wait_config_from_experiment( + distinct_id: str, + organization_id: str, +) -> WaitConfig | None: + """ + Get wait configuration from PostHog experiment. + + Args: + distinct_id: Unique identifier for experiment assignment + organization_id: Organization ID for experiment properties + + Returns: + WaitConfig instance if experiment is active, None otherwise + """ + + if not app.EXPERIMENTATION_PROVIDER: + return None + + # Check if user is in the experiment + wait_optimization_experiment = await app.EXPERIMENTATION_PROVIDER.get_value_cached( + "WAIT_TIME_OPTIMIZATION", distinct_id, properties={"organization_id": organization_id} + ) + + # Skip if user is in control group (False or "False") or experiment is disabled (None/empty) + if wait_optimization_experiment in (False, "False") or not wait_optimization_experiment: + return None + + # If we have an active variant, get the payload + payload = await app.EXPERIMENTATION_PROVIDER.get_payload_cached( + "WAIT_TIME_OPTIMIZATION", distinct_id, properties={"organization_id": organization_id} + ) + + if payload: + try: + config_dict = json.loads(payload) if isinstance(payload, str) else payload + wait_config = WaitConfig(config_dict) + LOG.info( + "Wait time optimization experiment enabled", + distinct_id=distinct_id, + organization_id=organization_id, + variant=wait_optimization_experiment, + config=wait_config.to_dict(), + ) + return wait_config + except (json.JSONDecodeError, KeyError, TypeError) as e: + LOG.warning( + "Failed to parse wait optimization experiment payload", + distinct_id=distinct_id, + variant=wait_optimization_experiment, + payload=payload, + error=str(e), + ) + else: + LOG.warning( + "No payload found for wait optimization experiment", + distinct_id=distinct_id, + variant=wait_optimization_experiment, + ) + + return None diff --git a/skyvern/experimentation/wait_utils.py b/skyvern/experimentation/wait_utils.py new file mode 100644 index 00000000..a9f1d683 --- /dev/null +++ b/skyvern/experimentation/wait_utils.py @@ -0,0 +1,131 @@ +""" +Wait utility functions for configurable waits. + +These functions wrap asyncio.sleep with experiment-aware wait times and metrics tracking. +""" + +import asyncio + +import structlog +from cachetools import TTLCache + +from skyvern.experimentation.wait_config import WaitConfig, get_wait_config_from_experiment + +LOG = structlog.get_logger() + +# Global cache for wait config per task/workflow +# TTL of 1 hour ensures cache doesn't grow unbounded in long-running processes +# Most tasks/workflows complete within minutes, so 1 hour TTL is safe +_wait_config_cache: TTLCache[str, WaitConfig | None] = TTLCache(maxsize=10000, ttl=3600) + + +async def get_or_create_wait_config( + task_id: str | None = None, + workflow_run_id: str | None = None, + organization_id: str | None = None, +) -> WaitConfig | None: + """ + Get or create wait config for a task/workflow. + + Uses caching to avoid repeated PostHog calls within the same task/workflow. + + Args: + task_id: Task ID for experiment assignment and caching + workflow_run_id: Workflow run ID for experiment assignment and caching + organization_id: Organization ID for experiment properties + + Returns None if: + - ENABLE_WAIT_TIME_OPTIMIZATION_EXPERIMENT is False (killswitch activated) + - organization_id is None + - No experiment is active for this task/workflow + """ + # KILLSWITCH: Check if experiment is disabled globally (lazy import to avoid circular dependency) + try: + from cloud.config import settings as cloud_settings # noqa: PLC0415 + + if not cloud_settings.ENABLE_WAIT_TIME_OPTIMIZATION_EXPERIMENT: + return None + except ImportError: + # If cloud.config isn't available (OSS), experiment is disabled by default + return None + + if not organization_id: + return None + + # Use task_id or workflow_run_id as cache key + cache_key = task_id or workflow_run_id + if not cache_key: + return None + + # Check cache first + if cache_key in _wait_config_cache: + return _wait_config_cache[cache_key] + + # Get from experiment + wait_config = await get_wait_config_from_experiment(cache_key, organization_id) + + # Cache it + _wait_config_cache[cache_key] = wait_config + + return wait_config + + +# Simple helper for getting wait time from config +def get_wait_time( + wait_config: WaitConfig | None, + wait_type: str, + default: float, + retry_count: int = 0, +) -> float: + """ + Get wait time from config or use default. + + Args: + wait_config: WaitConfig instance or None + wait_type: Type of wait (e.g., "post_click_delay") + default: Default wait time in seconds (HARDCODED PRODUCTION VALUE) + retry_count: Current retry count (for adaptive mode) + + Returns: + Wait time in seconds + """ + if wait_config: + return wait_config.get_wait_time(wait_type, retry_count) + return default + + +# Convenience functions for common wait patterns + + +async def scroll_into_view_wait( + task_id: str | None = None, + workflow_run_id: str | None = None, + organization_id: str | None = None, +) -> None: + """ + Wait after scrolling element into view. + + Note: This is called from low-level DOM utilities (SkyvernElement.scroll_into_view) + which don't have task context available. Threading context through would require + invasive changes to many call sites. Defaults are reasonable for this utility function. + """ + wait_config = await get_or_create_wait_config(task_id, workflow_run_id, organization_id) + wait_seconds = get_wait_time(wait_config, "scroll_into_view_wait", default=2.0) + await asyncio.sleep(wait_seconds) + + +async def empty_page_retry_wait( + task_id: str | None = None, + workflow_run_id: str | None = None, + organization_id: str | None = None, +) -> None: + """ + Wait before retrying scrape when no elements found. + + Note: This is called from the scraper (scrape_web_unsafe) which doesn't have task context. + Threading context through would require updating scrape_website, scrape_web_unsafe, and all + their callers. Defaults are reasonable for this low-level scraping utility. + """ + wait_config = await get_or_create_wait_config(task_id, workflow_run_id, organization_id) + wait_seconds = get_wait_time(wait_config, "empty_page_retry_wait", default=3.0) + await asyncio.sleep(wait_seconds) diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py index f8ae2fd6..d8b8e3d6 100644 --- a/skyvern/webeye/actions/handler.py +++ b/skyvern/webeye/actions/handler.py @@ -55,6 +55,7 @@ from skyvern.exceptions import ( OptionIndexOutOfBound, WrongElementToUploadFile, ) +from skyvern.experimentation.wait_utils import get_or_create_wait_config, get_wait_time from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.files import ( @@ -524,6 +525,9 @@ async def handle_click_action( task: Task, step: Step, ) -> list[ActionResult]: + # Get wait config once for this handler + wait_config = await get_or_create_wait_config(task.task_id, task.workflow_run_id, task.organization_id) + dom = DomUtil(scraped_page=scraped_page, page=page) original_url = page.url if action.x is not None and action.y is not None: @@ -569,7 +573,9 @@ async def handle_click_action( return [ActionSuccess()] skyvern_element = await dom.get_skyvern_element_by_id(action.element_id) - await asyncio.sleep(0.3) + + # Wait after getting element to allow any dynamic changes + await asyncio.sleep(get_wait_time(wait_config, "post_click_delay", default=0.3)) # dynamically validate the attr, since it could change into enabled after the previous actions if await skyvern_element.is_disabled(dynamic=True): @@ -1578,6 +1584,9 @@ async def handle_download_file_action( task: Task, step: Step, ) -> list[ActionResult]: + # Get wait config once for this handler + wait_config = await get_or_create_wait_config(task.task_id, task.workflow_run_id, task.organization_id) + dom = DomUtil(scraped_page=scraped_page, page=page) skyvern_element = await dom.get_skyvern_element_by_id(action.element_id) @@ -1586,7 +1595,7 @@ async def handle_download_file_action( try: # Start waiting for the download async with page.expect_download() as download_info: - await asyncio.sleep(0.3) + await asyncio.sleep(get_wait_time(wait_config, "post_click_delay", default=0.3)) locator = skyvern_element.locator await locator.click( diff --git a/skyvern/webeye/scraper/scraper.py b/skyvern/webeye/scraper/scraper.py index c17e89f0..688612c3 100644 --- a/skyvern/webeye/scraper/scraper.py +++ b/skyvern/webeye/scraper/scraper.py @@ -20,6 +20,7 @@ from skyvern.exceptions import ( ScrapingFailedBlankPage, UnknownElementTreeFormat, ) +from skyvern.experimentation.wait_utils import empty_page_retry_wait from skyvern.forge.sdk.api.crypto import calculate_sha256 from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.settings_manager import SettingsManager @@ -564,8 +565,8 @@ async def scrape_web_unsafe( elements, element_tree = await get_interactable_element_tree(page, scrape_exclude) if not elements and not support_empty_page: - LOG.warning("No elements found on the page, wait for 3 seconds and retry") - await asyncio.sleep(3) + LOG.warning("No elements found on the page, wait and retry") + await empty_page_retry_wait() elements, element_tree = await get_interactable_element_tree(page, scrape_exclude) element_tree = await cleanup_element_tree(page, url, copy.deepcopy(element_tree)) diff --git a/skyvern/webeye/utils/dom.py b/skyvern/webeye/utils/dom.py index 7aed264d..c10b6de4 100644 --- a/skyvern/webeye/utils/dom.py +++ b/skyvern/webeye/utils/dom.py @@ -25,6 +25,7 @@ from skyvern.exceptions import ( NoneFrameError, SkyvernException, ) +from skyvern.experimentation.wait_utils import get_or_create_wait_config, get_wait_time, scroll_into_view_wait from skyvern.webeye.actions import handler_utils from skyvern.webeye.scraper.scraper import IncrementalScrapePage, ScrapedPage, json_to_html, trim_element from skyvern.webeye.utils.page import SkyvernFrame @@ -613,35 +614,49 @@ class SkyvernElement: async def input_clear(self, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None: await self.get_locator().clear(timeout=timeout) - async def check(self, delay: int = 2, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None: + async def check( + self, + timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, + task_id: str | None = None, + workflow_run_id: str | None = None, + organization_id: str | None = None, + ) -> None: # HACK: sometimes playwright will raise exception when checking the element. # we need to trigger the hack to check again in several seconds try: await self.get_locator().check(timeout=timeout) except Exception: LOG.info( - f"Failed to check the element at the first time, trigger the hack to check again in {delay} seconds", + "Failed to check the element at the first time, trigger the hack to check again", exc_info=True, element_id=self.get_id(), ) - await asyncio.sleep(delay) + wait_config = await get_or_create_wait_config(task_id, workflow_run_id, organization_id) + await asyncio.sleep(get_wait_time(wait_config, "checkbox_retry_delay", default=2.0)) if await self.get_locator().count() == 0: LOG.info("Element is not on the page, the checking should work", element_id=self.get_id()) return await self.get_locator().check(timeout=timeout) - async def uncheck(self, delay: int = 2, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None: + async def uncheck( + self, + timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, + task_id: str | None = None, + workflow_run_id: str | None = None, + organization_id: str | None = None, + ) -> None: # HACK: sometimes playwright will raise exception when unchecking the element. # we need to trigger the hack to uncheck again in several seconds try: await self.get_locator().uncheck(timeout=timeout) except Exception: LOG.info( - f"Failed to uncheck the element at the first time, trigger the hack to uncheck again in {delay} seconds", + "Failed to uncheck the element at the first time, trigger the hack to uncheck again", exc_info=True, element_id=self.get_id(), ) - await asyncio.sleep(delay) + wait_config = await get_or_create_wait_config(task_id, workflow_run_id, organization_id) + await asyncio.sleep(get_wait_time(wait_config, "checkbox_retry_delay", default=2.0)) if await self.get_locator().count() == 0: LOG.info("Element is not on the page, the unchecking should work", element_id=self.get_id()) return @@ -772,7 +787,9 @@ class SkyvernElement: ) await self.blur() await self.focus(timeout=timeout) - await asyncio.sleep(2) # wait for scrolling into the target + + # Wait for scrolling to complete + await scroll_into_view_wait() async def calculate_min_y_distance_to( self,