Wait time optimization xp (#3802)

This commit is contained in:
pedrohsdb
2025-10-23 16:09:42 -07:00
committed by GitHub
parent 2930c06661
commit 5b80614aac
6 changed files with 382 additions and 11 deletions

View File

@@ -0,0 +1,5 @@
"""Experimentation modules for Skyvern."""
from skyvern.experimentation.wait_config import WAIT_VARIANTS, WaitConfig, get_wait_config_from_experiment
__all__ = ["WaitConfig", "get_wait_config_from_experiment", "WAIT_VARIANTS"]

View File

@@ -0,0 +1,208 @@
"""
Wait time optimization experiment configuration.
This module provides configurable wait times that can be controlled via PostHog feature flags.
Allows for A/B testing of different wait time strategies to optimize speed while maintaining success rates.
"""
import json
from typing import Any
import structlog
from skyvern.forge import app
LOG = structlog.get_logger()
# Preset variant configurations (all times in seconds)
WAIT_VARIANTS = {
"baseline": {
"post_click_delay": 0.3,
"post_input_dropdown_delay": 3.0,
"dropdown_close_wait": 2.0,
"checkbox_retry_delay": 2.0,
"scroll_into_view_wait": 2.0,
"empty_page_retry_wait": 3.0,
"mouse_movement_delay_min": 0.2,
"mouse_movement_delay_max": 0.3,
},
"moderate": {
"post_click_delay": 0.15,
"post_input_dropdown_delay": 1.5,
"dropdown_close_wait": 1.0,
"checkbox_retry_delay": 1.0,
"scroll_into_view_wait": 1.0,
"empty_page_retry_wait": 2.0,
"mouse_movement_delay_min": 0.1,
"mouse_movement_delay_max": 0.15,
},
"aggressive": {
"post_click_delay": 0.05,
"post_input_dropdown_delay": 0.5,
"dropdown_close_wait": 0.5,
"checkbox_retry_delay": 0.5,
"scroll_into_view_wait": 0.5,
"empty_page_retry_wait": 1.0,
"mouse_movement_delay_min": 0.05,
"mouse_movement_delay_max": 0.1,
},
}
class WaitConfig:
"""Manages wait time configuration with PostHog experiment support."""
def __init__(self, payload: dict[str, Any] | None = None):
"""
Initialize wait config from PostHog payload.
Expected payload format:
{
"variant": "moderate", # optional preset variant
"overrides": { # optional specific overrides
"post_click_delay": 0.15,
...
},
"global_multiplier": 1.0, # optional multiplier for all waits
"adaptive_mode": false, # optional adaptive retry behavior
"adaptive_backoff": 1.5 # optional backoff multiplier
}
"""
self.variant = "baseline"
self.overrides: dict[str, float] = {}
self.global_multiplier = 1.0
self.adaptive_mode = False
self.adaptive_backoff = 1.5
if payload:
self._load_from_payload(payload)
def _load_from_payload(self, payload: dict[str, Any]) -> None:
"""Load configuration from PostHog payload."""
self.variant = payload.get("variant", "baseline")
self.overrides = payload.get("overrides", {})
self.global_multiplier = payload.get("global_multiplier", 1.0)
self.adaptive_mode = payload.get("adaptive_mode", False)
self.adaptive_backoff = payload.get("adaptive_backoff", 1.5)
# Validate variant
if self.variant not in WAIT_VARIANTS:
LOG.warning(
"Invalid wait variant, falling back to baseline",
variant=self.variant,
valid_variants=list(WAIT_VARIANTS.keys()),
)
self.variant = "baseline"
def get_wait_time(self, wait_type: str, retry_count: int = 0) -> float:
"""
Get wait time for a specific wait type.
Args:
wait_type: Type of wait (e.g., "post_click_delay")
retry_count: Current retry count (for adaptive mode)
Returns:
Wait time in seconds (clamped to [0.0, 30.0])
"""
# Check for override first
if wait_type in self.overrides:
base_wait = self.overrides[wait_type]
else:
# Get from variant preset
variant_config = WAIT_VARIANTS.get(self.variant, WAIT_VARIANTS["baseline"])
base_wait = variant_config.get(wait_type, 0.0)
# Sanitize inputs to prevent negative/extreme waits from payload misconfig
base_wait = max(0.0, float(base_wait))
multiplier = max(0.0, float(self.global_multiplier))
backoff = max(0.0, float(self.adaptive_backoff))
# Apply global multiplier
wait_time = base_wait * multiplier
# Apply adaptive backoff if enabled
if self.adaptive_mode and retry_count > 0:
wait_time *= backoff**retry_count
# Cap at 30 seconds to prevent extreme waits from exponential backoff
return min(30.0, max(0.0, wait_time))
def get_mouse_movement_delay_range(self) -> tuple[float, float]:
"""Get min and max for mouse movement delays."""
min_delay = self.get_wait_time("mouse_movement_delay_min")
max_delay = self.get_wait_time("mouse_movement_delay_max")
return (min_delay, max_delay)
def to_dict(self) -> dict[str, Any]:
"""Export current configuration as dict for logging."""
return {
"variant": self.variant,
"overrides": self.overrides,
"global_multiplier": self.global_multiplier,
"adaptive_mode": self.adaptive_mode,
"adaptive_backoff": self.adaptive_backoff,
}
async def get_wait_config_from_experiment(
distinct_id: str,
organization_id: str,
) -> WaitConfig | None:
"""
Get wait configuration from PostHog experiment.
Args:
distinct_id: Unique identifier for experiment assignment
organization_id: Organization ID for experiment properties
Returns:
WaitConfig instance if experiment is active, None otherwise
"""
if not app.EXPERIMENTATION_PROVIDER:
return None
# Check if user is in the experiment
wait_optimization_experiment = await app.EXPERIMENTATION_PROVIDER.get_value_cached(
"WAIT_TIME_OPTIMIZATION", distinct_id, properties={"organization_id": organization_id}
)
# Skip if user is in control group (False or "False") or experiment is disabled (None/empty)
if wait_optimization_experiment in (False, "False") or not wait_optimization_experiment:
return None
# If we have an active variant, get the payload
payload = await app.EXPERIMENTATION_PROVIDER.get_payload_cached(
"WAIT_TIME_OPTIMIZATION", distinct_id, properties={"organization_id": organization_id}
)
if payload:
try:
config_dict = json.loads(payload) if isinstance(payload, str) else payload
wait_config = WaitConfig(config_dict)
LOG.info(
"Wait time optimization experiment enabled",
distinct_id=distinct_id,
organization_id=organization_id,
variant=wait_optimization_experiment,
config=wait_config.to_dict(),
)
return wait_config
except (json.JSONDecodeError, KeyError, TypeError) as e:
LOG.warning(
"Failed to parse wait optimization experiment payload",
distinct_id=distinct_id,
variant=wait_optimization_experiment,
payload=payload,
error=str(e),
)
else:
LOG.warning(
"No payload found for wait optimization experiment",
distinct_id=distinct_id,
variant=wait_optimization_experiment,
)
return None

View File

@@ -0,0 +1,131 @@
"""
Wait utility functions for configurable waits.
These functions wrap asyncio.sleep with experiment-aware wait times and metrics tracking.
"""
import asyncio
import structlog
from cachetools import TTLCache
from skyvern.experimentation.wait_config import WaitConfig, get_wait_config_from_experiment
LOG = structlog.get_logger()
# Global cache for wait config per task/workflow
# TTL of 1 hour ensures cache doesn't grow unbounded in long-running processes
# Most tasks/workflows complete within minutes, so 1 hour TTL is safe
_wait_config_cache: TTLCache[str, WaitConfig | None] = TTLCache(maxsize=10000, ttl=3600)
async def get_or_create_wait_config(
task_id: str | None = None,
workflow_run_id: str | None = None,
organization_id: str | None = None,
) -> WaitConfig | None:
"""
Get or create wait config for a task/workflow.
Uses caching to avoid repeated PostHog calls within the same task/workflow.
Args:
task_id: Task ID for experiment assignment and caching
workflow_run_id: Workflow run ID for experiment assignment and caching
organization_id: Organization ID for experiment properties
Returns None if:
- ENABLE_WAIT_TIME_OPTIMIZATION_EXPERIMENT is False (killswitch activated)
- organization_id is None
- No experiment is active for this task/workflow
"""
# KILLSWITCH: Check if experiment is disabled globally (lazy import to avoid circular dependency)
try:
from cloud.config import settings as cloud_settings # noqa: PLC0415
if not cloud_settings.ENABLE_WAIT_TIME_OPTIMIZATION_EXPERIMENT:
return None
except ImportError:
# If cloud.config isn't available (OSS), experiment is disabled by default
return None
if not organization_id:
return None
# Use task_id or workflow_run_id as cache key
cache_key = task_id or workflow_run_id
if not cache_key:
return None
# Check cache first
if cache_key in _wait_config_cache:
return _wait_config_cache[cache_key]
# Get from experiment
wait_config = await get_wait_config_from_experiment(cache_key, organization_id)
# Cache it
_wait_config_cache[cache_key] = wait_config
return wait_config
# Simple helper for getting wait time from config
def get_wait_time(
wait_config: WaitConfig | None,
wait_type: str,
default: float,
retry_count: int = 0,
) -> float:
"""
Get wait time from config or use default.
Args:
wait_config: WaitConfig instance or None
wait_type: Type of wait (e.g., "post_click_delay")
default: Default wait time in seconds (HARDCODED PRODUCTION VALUE)
retry_count: Current retry count (for adaptive mode)
Returns:
Wait time in seconds
"""
if wait_config:
return wait_config.get_wait_time(wait_type, retry_count)
return default
# Convenience functions for common wait patterns
async def scroll_into_view_wait(
task_id: str | None = None,
workflow_run_id: str | None = None,
organization_id: str | None = None,
) -> None:
"""
Wait after scrolling element into view.
Note: This is called from low-level DOM utilities (SkyvernElement.scroll_into_view)
which don't have task context available. Threading context through would require
invasive changes to many call sites. Defaults are reasonable for this utility function.
"""
wait_config = await get_or_create_wait_config(task_id, workflow_run_id, organization_id)
wait_seconds = get_wait_time(wait_config, "scroll_into_view_wait", default=2.0)
await asyncio.sleep(wait_seconds)
async def empty_page_retry_wait(
task_id: str | None = None,
workflow_run_id: str | None = None,
organization_id: str | None = None,
) -> None:
"""
Wait before retrying scrape when no elements found.
Note: This is called from the scraper (scrape_web_unsafe) which doesn't have task context.
Threading context through would require updating scrape_website, scrape_web_unsafe, and all
their callers. Defaults are reasonable for this low-level scraping utility.
"""
wait_config = await get_or_create_wait_config(task_id, workflow_run_id, organization_id)
wait_seconds = get_wait_time(wait_config, "empty_page_retry_wait", default=3.0)
await asyncio.sleep(wait_seconds)

View File

@@ -55,6 +55,7 @@ from skyvern.exceptions import (
OptionIndexOutOfBound,
WrongElementToUploadFile,
)
from skyvern.experimentation.wait_utils import get_or_create_wait_config, get_wait_time
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.files import (
@@ -524,6 +525,9 @@ async def handle_click_action(
task: Task,
step: Step,
) -> list[ActionResult]:
# Get wait config once for this handler
wait_config = await get_or_create_wait_config(task.task_id, task.workflow_run_id, task.organization_id)
dom = DomUtil(scraped_page=scraped_page, page=page)
original_url = page.url
if action.x is not None and action.y is not None:
@@ -569,7 +573,9 @@ async def handle_click_action(
return [ActionSuccess()]
skyvern_element = await dom.get_skyvern_element_by_id(action.element_id)
await asyncio.sleep(0.3)
# Wait after getting element to allow any dynamic changes
await asyncio.sleep(get_wait_time(wait_config, "post_click_delay", default=0.3))
# dynamically validate the attr, since it could change into enabled after the previous actions
if await skyvern_element.is_disabled(dynamic=True):
@@ -1578,6 +1584,9 @@ async def handle_download_file_action(
task: Task,
step: Step,
) -> list[ActionResult]:
# Get wait config once for this handler
wait_config = await get_or_create_wait_config(task.task_id, task.workflow_run_id, task.organization_id)
dom = DomUtil(scraped_page=scraped_page, page=page)
skyvern_element = await dom.get_skyvern_element_by_id(action.element_id)
@@ -1586,7 +1595,7 @@ async def handle_download_file_action(
try:
# Start waiting for the download
async with page.expect_download() as download_info:
await asyncio.sleep(0.3)
await asyncio.sleep(get_wait_time(wait_config, "post_click_delay", default=0.3))
locator = skyvern_element.locator
await locator.click(

View File

@@ -20,6 +20,7 @@ from skyvern.exceptions import (
ScrapingFailedBlankPage,
UnknownElementTreeFormat,
)
from skyvern.experimentation.wait_utils import empty_page_retry_wait
from skyvern.forge.sdk.api.crypto import calculate_sha256
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.settings_manager import SettingsManager
@@ -564,8 +565,8 @@ async def scrape_web_unsafe(
elements, element_tree = await get_interactable_element_tree(page, scrape_exclude)
if not elements and not support_empty_page:
LOG.warning("No elements found on the page, wait for 3 seconds and retry")
await asyncio.sleep(3)
LOG.warning("No elements found on the page, wait and retry")
await empty_page_retry_wait()
elements, element_tree = await get_interactable_element_tree(page, scrape_exclude)
element_tree = await cleanup_element_tree(page, url, copy.deepcopy(element_tree))

View File

@@ -25,6 +25,7 @@ from skyvern.exceptions import (
NoneFrameError,
SkyvernException,
)
from skyvern.experimentation.wait_utils import get_or_create_wait_config, get_wait_time, scroll_into_view_wait
from skyvern.webeye.actions import handler_utils
from skyvern.webeye.scraper.scraper import IncrementalScrapePage, ScrapedPage, json_to_html, trim_element
from skyvern.webeye.utils.page import SkyvernFrame
@@ -613,35 +614,49 @@ class SkyvernElement:
async def input_clear(self, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
await self.get_locator().clear(timeout=timeout)
async def check(self, delay: int = 2, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
async def check(
self,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
task_id: str | None = None,
workflow_run_id: str | None = None,
organization_id: str | None = None,
) -> None:
# HACK: sometimes playwright will raise exception when checking the element.
# we need to trigger the hack to check again in several seconds
try:
await self.get_locator().check(timeout=timeout)
except Exception:
LOG.info(
f"Failed to check the element at the first time, trigger the hack to check again in {delay} seconds",
"Failed to check the element at the first time, trigger the hack to check again",
exc_info=True,
element_id=self.get_id(),
)
await asyncio.sleep(delay)
wait_config = await get_or_create_wait_config(task_id, workflow_run_id, organization_id)
await asyncio.sleep(get_wait_time(wait_config, "checkbox_retry_delay", default=2.0))
if await self.get_locator().count() == 0:
LOG.info("Element is not on the page, the checking should work", element_id=self.get_id())
return
await self.get_locator().check(timeout=timeout)
async def uncheck(self, delay: int = 2, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
async def uncheck(
self,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
task_id: str | None = None,
workflow_run_id: str | None = None,
organization_id: str | None = None,
) -> None:
# HACK: sometimes playwright will raise exception when unchecking the element.
# we need to trigger the hack to uncheck again in several seconds
try:
await self.get_locator().uncheck(timeout=timeout)
except Exception:
LOG.info(
f"Failed to uncheck the element at the first time, trigger the hack to uncheck again in {delay} seconds",
"Failed to uncheck the element at the first time, trigger the hack to uncheck again",
exc_info=True,
element_id=self.get_id(),
)
await asyncio.sleep(delay)
wait_config = await get_or_create_wait_config(task_id, workflow_run_id, organization_id)
await asyncio.sleep(get_wait_time(wait_config, "checkbox_retry_delay", default=2.0))
if await self.get_locator().count() == 0:
LOG.info("Element is not on the page, the unchecking should work", element_id=self.get_id())
return
@@ -772,7 +787,9 @@ class SkyvernElement:
)
await self.blur()
await self.focus(timeout=timeout)
await asyncio.sleep(2) # wait for scrolling into the target
# Wait for scrolling to complete
await scroll_into_view_wait()
async def calculate_min_y_distance_to(
self,