Files
Dorod-Sky/skyvern/experimentation/wait_utils.py
2025-10-23 16:09:42 -07:00

132 lines
4.3 KiB
Python

"""
Wait utility functions for configurable waits.
These functions wrap asyncio.sleep with experiment-aware wait times and metrics tracking.
"""
import asyncio
import structlog
from cachetools import TTLCache
from skyvern.experimentation.wait_config import WaitConfig, get_wait_config_from_experiment
LOG = structlog.get_logger()
# Global cache mapping a task_id/workflow_run_id to its resolved WaitConfig
# (or None when no experiment applies, so negative lookups are cached too).
# TTL of 1 hour ensures cache doesn't grow unbounded in long-running processes.
# Most tasks/workflows complete within minutes, so 1 hour TTL is safe.
_wait_config_cache: TTLCache[str, WaitConfig | None] = TTLCache(maxsize=10000, ttl=3600)
async def get_or_create_wait_config(
    task_id: str | None = None,
    workflow_run_id: str | None = None,
    organization_id: str | None = None,
) -> WaitConfig | None:
    """
    Resolve the wait config for a task/workflow, caching the result.

    Caching avoids repeated PostHog calls within the same task/workflow.

    Args:
        task_id: Task ID for experiment assignment and caching
        workflow_run_id: Workflow run ID for experiment assignment and caching
        organization_id: Organization ID for experiment properties

    Returns None if:
        - ENABLE_WAIT_TIME_OPTIMIZATION_EXPERIMENT is False (killswitch activated)
        - organization_id is None
        - No experiment is active for this task/workflow
    """
    # KILLSWITCH: the experiment can be disabled globally. The import is lazy
    # to avoid a circular dependency; in OSS builds cloud.config isn't
    # available, which disables the experiment by default.
    try:
        from cloud.config import settings as cloud_settings  # noqa: PLC0415
    except ImportError:
        return None
    if not cloud_settings.ENABLE_WAIT_TIME_OPTIMIZATION_EXPERIMENT:
        return None

    if not organization_id:
        return None

    # Key the cache on whichever identifier is available.
    cache_key = task_id or workflow_run_id
    if not cache_key:
        return None

    # EAFP: a single cache lookup instead of membership test + index.
    try:
        return _wait_config_cache[cache_key]
    except KeyError:
        pass

    # Cache miss: ask the experiment service and remember the answer
    # (including None) for subsequent calls.
    resolved = await get_wait_config_from_experiment(cache_key, organization_id)
    _wait_config_cache[cache_key] = resolved
    return resolved
# Small helper for resolving a wait duration from an optional config.
def get_wait_time(
    wait_config: WaitConfig | None,
    wait_type: str,
    default: float,
    retry_count: int = 0,
) -> float:
    """
    Look up the wait time for *wait_type* on the config, falling back to *default*.

    Args:
        wait_config: WaitConfig instance or None
        wait_type: Type of wait (e.g., "post_click_delay")
        default: Default wait time in seconds (HARDCODED PRODUCTION VALUE)
        retry_count: Current retry count (for adaptive mode)

    Returns:
        Wait time in seconds
    """
    # Truthiness (not an `is None` check) mirrors how callers opt out of the
    # experiment: any falsy config means "use the production default".
    return wait_config.get_wait_time(wait_type, retry_count) if wait_config else default
# Convenience functions for common wait patterns
async def scroll_into_view_wait(
    task_id: str | None = None,
    workflow_run_id: str | None = None,
    organization_id: str | None = None,
) -> None:
    """
    Sleep after scrolling an element into view.

    Note: This is called from low-level DOM utilities (SkyvernElement.scroll_into_view)
    which don't have task context available. Threading context through would require
    invasive changes to many call sites. Defaults are reasonable for this utility function.
    """
    config = await get_or_create_wait_config(task_id, workflow_run_id, organization_id)
    await asyncio.sleep(get_wait_time(config, "scroll_into_view_wait", default=2.0))
async def empty_page_retry_wait(
    task_id: str | None = None,
    workflow_run_id: str | None = None,
    organization_id: str | None = None,
) -> None:
    """
    Sleep before retrying a scrape when no elements were found.

    Note: This is called from the scraper (scrape_web_unsafe) which doesn't have task context.
    Threading context through would require updating scrape_website, scrape_web_unsafe, and all
    their callers. Defaults are reasonable for this low-level scraping utility.
    """
    config = await get_or_create_wait_config(task_id, workflow_run_id, organization_id)
    delay = get_wait_time(config, "empty_page_retry_wait", default=3.0)
    await asyncio.sleep(delay)