diff --git a/skyvern/config.py b/skyvern/config.py index ecd7668c..d2ca9cf4 100644 --- a/skyvern/config.py +++ b/skyvern/config.py @@ -427,6 +427,14 @@ class Settings(BaseSettings): ENCRYPTOR_AES_SALT: str | None = None ENCRYPTOR_AES_IV: str | None = None + # Cleanup Cron Settings + ENABLE_CLEANUP_CRON: bool = False + """Enable periodic cleanup of temporary data (temp files and stale processes).""" + CLEANUP_CRON_INTERVAL_MINUTES: int = 10 + """Interval in minutes for the cleanup cron job.""" + CLEANUP_STALE_TASK_THRESHOLD_HOURS: int = 24 + """Tasks/workflows not updated for this many hours are considered stale (stuck).""" + # OpenTelemetry Settings OTEL_ENABLED: bool = False OTEL_SERVICE_NAME: str = "skyvern" diff --git a/skyvern/forge/api_app.py b/skyvern/forge/api_app.py index b55a0adf..339bca5e 100644 --- a/skyvern/forge/api_app.py +++ b/skyvern/forge/api_app.py @@ -23,6 +23,7 @@ from skyvern.forge.sdk.core.skyvern_context import SkyvernContext from skyvern.forge.sdk.db.exceptions import NotFoundError from skyvern.forge.sdk.routes import internal_auth from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router, legacy_v2_router +from skyvern.services.cleanup_service import start_cleanup_scheduler, stop_cleanup_scheduler try: from cloud.observability.otel_setup import OTELSetup @@ -68,7 +69,17 @@ async def lifespan(fastapi_app: FastAPI) -> AsyncGenerator[None, Any]: await forge_app.api_app_startup_event(fastapi_app) except Exception: LOG.exception("Failed to execute api app startup event") + + # Start cleanup scheduler if enabled + cleanup_task = start_cleanup_scheduler() + if cleanup_task: + LOG.info("Cleanup scheduler started") + yield + + # Stop cleanup scheduler + await stop_cleanup_scheduler() + if forge_app.api_app_shutdown_event: LOG.info("Calling api app shutdown event") try: diff --git a/skyvern/forge/sdk/db/agent_db.py b/skyvern/forge/sdk/db/agent_db.py index c2fe2f15..30e343de 100644 --- a/skyvern/forge/sdk/db/agent_db.py +++ 
b/skyvern/forge/sdk/db/agent_db.py @@ -968,6 +968,99 @@ class AgentDB(BaseAlchemyDB): LOG.error("UnexpectedError", exc_info=True) raise + async def get_running_tasks_info_globally( + self, + stale_threshold_hours: int = 24, + ) -> tuple[int, int]: + """ + Get information about running tasks across all organizations. + Used by cleanup service to determine if cleanup should be skipped. + + Args: + stale_threshold_hours: Tasks not updated for this many hours are considered stale. + + Returns: + Tuple of (active_task_count, stale_task_count). + Active tasks are those updated within the threshold. + Stale tasks are those not updated within the threshold but still in running status. + """ + try: + async with self.Session() as session: + running_statuses = [TaskStatus.created, TaskStatus.queued, TaskStatus.running] + stale_cutoff = datetime.utcnow() - timedelta(hours=stale_threshold_hours) + + # Count active tasks (recently updated) + active_query = ( + select(func.count()) + .select_from(TaskModel) + .filter(TaskModel.status.in_(running_statuses)) + .filter(TaskModel.modified_at >= stale_cutoff) + ) + active_count = (await session.execute(active_query)).scalar_one() + + # Count stale tasks (not updated for a long time) + stale_query = ( + select(func.count()) + .select_from(TaskModel) + .filter(TaskModel.status.in_(running_statuses)) + .filter(TaskModel.modified_at < stale_cutoff) + ) + stale_count = (await session.execute(stale_query)).scalar_one() + + return (active_count, stale_count) + except SQLAlchemyError: + LOG.error("SQLAlchemyError in get_running_tasks_info_globally", exc_info=True) + raise + + async def get_running_workflow_runs_info_globally( + self, + stale_threshold_hours: int = 24, + ) -> tuple[int, int]: + """ + Get information about running workflow runs across all organizations. + Used by cleanup service to determine if cleanup should be skipped. + + Args: + stale_threshold_hours: Workflow runs not updated for this many hours are considered stale. 
+ + Returns: + Tuple of (active_workflow_count, stale_workflow_count). + Active workflows are those updated within the threshold. + Stale workflows are those not updated within the threshold but still in running status. + """ + try: + async with self.Session() as session: + running_statuses = [ + WorkflowRunStatus.created, + WorkflowRunStatus.queued, + WorkflowRunStatus.running, + WorkflowRunStatus.paused, + ] + stale_cutoff = datetime.utcnow() - timedelta(hours=stale_threshold_hours) + + # Count active workflow runs (recently updated) + active_query = ( + select(func.count()) + .select_from(WorkflowRunModel) + .filter(WorkflowRunModel.status.in_(running_statuses)) + .filter(WorkflowRunModel.modified_at >= stale_cutoff) + ) + active_count = (await session.execute(active_query)).scalar_one() + + # Count stale workflow runs (not updated for a long time) + stale_query = ( + select(func.count()) + .select_from(WorkflowRunModel) + .filter(WorkflowRunModel.status.in_(running_statuses)) + .filter(WorkflowRunModel.modified_at < stale_cutoff) + ) + stale_count = (await session.execute(stale_query)).scalar_one() + + return (active_count, stale_count) + except SQLAlchemyError: + LOG.error("SQLAlchemyError in get_running_workflow_runs_info_globally", exc_info=True) + raise + async def get_all_organizations(self) -> list[Organization]: try: async with self.Session() as session: diff --git a/skyvern/services/cleanup_service.py b/skyvern/services/cleanup_service.py new file mode 100644 index 00000000..45c5d250 --- /dev/null +++ b/skyvern/services/cleanup_service.py @@ -0,0 +1,289 @@ +""" +Cleanup service for periodic cleanup of temporary data. + +This service is responsible for: +1. Cleaning up temporary files in the temp directory +2. 
"""
Cleanup service for periodic cleanup of temporary data.

This service is responsible for:
1. Cleaning up temporary files in the temp directory
2. Killing stale playwright/node/browser processes
"""

import asyncio
import shutil
from datetime import datetime, timedelta
from pathlib import Path

import psutil
import structlog

from skyvern.config import settings
from skyvern.forge import app

LOG = structlog.get_logger()

# Substrings matched (case-insensitively) against process names when looking for
# stale browser-related processes.
# NOTE(review): substring matching means "node" also matches unrelated processes
# such as "nodemon" — confirm this is acceptable for the deployment environment.
STALE_PROCESS_NAMES = frozenset(
    {
        "playwright",
        "node",
        "chromium",
        "chrome",
        "firefox",
        "webkit",
        "chromium-browser",
        "google-chrome",
        "google-chrome-stable",
        "msedge",
        "microsoft-edge",
        "microsoft-edge-stable",
    }
)


async def check_running_tasks_or_workflows() -> tuple[bool, int, int]:
    """
    Check if there are any actively running tasks or workflow runs.

    Tasks/workflows that haven't been updated for longer than the configured
    stale threshold are considered stuck (not actively running); a warning is
    logged for them but they do not count as "active".

    Returns:
        Tuple of (has_active_tasks_or_workflows, stale_task_count, stale_workflow_count).
        On any error the function fails safe and returns (True, 0, 0) so that
        cleanup is skipped rather than risk disrupting live work.
    """
    try:
        stale_threshold = settings.CLEANUP_STALE_TASK_THRESHOLD_HOURS

        active_tasks, stale_tasks = await app.DATABASE.get_running_tasks_info_globally(
            stale_threshold_hours=stale_threshold
        )
        active_workflows, stale_workflows = await app.DATABASE.get_running_workflow_runs_info_globally(
            stale_threshold_hours=stale_threshold
        )

        # Surface stuck work so operators can investigate; stale items are
        # deliberately excluded from the "active" decision below.
        if stale_tasks > 0:
            LOG.warning(
                "Found stale tasks that haven't been updated",
                stale_task_count=stale_tasks,
                threshold_hours=stale_threshold,
            )
        if stale_workflows > 0:
            LOG.warning(
                "Found stale workflow runs that haven't been updated",
                stale_workflow_count=stale_workflows,
                threshold_hours=stale_threshold,
            )

        has_active = active_tasks > 0 or active_workflows > 0
        return (has_active, stale_tasks, stale_workflows)
    except Exception:
        LOG.exception("Error checking for running tasks/workflows")
        # If we can't check, assume there are running tasks to be safe.
        return (True, 0, 0)


def cleanup_temp_directory() -> int:
    """
    Clean up temporary files in the temp directory (settings.TEMP_PATH).

    Removes every immediate child of the temp directory (files and whole
    subdirectories). Failures on individual items are logged and skipped so one
    locked file does not abort the sweep.

    Returns:
        Number of top-level files/directories removed.
    """
    temp_path = Path(settings.TEMP_PATH)
    if not temp_path.exists():
        LOG.debug("Temp directory does not exist", temp_path=str(temp_path))
        return 0

    removed_count = 0
    try:
        for item in temp_path.iterdir():
            try:
                if item.is_file():
                    item.unlink()
                    removed_count += 1
                elif item.is_dir():
                    shutil.rmtree(item)
                    removed_count += 1
            except Exception:
                LOG.warning("Failed to remove temp item", item=str(item), exc_info=True)
    except Exception:
        LOG.exception("Error cleaning temp directory", temp_path=str(temp_path))

    return removed_count


def get_stale_browser_processes(max_age_minutes: int = 60) -> list[psutil.Process]:
    """
    Get browser-related processes that have been running for too long.

    Args:
        max_age_minutes: Maximum age in minutes before a process is considered stale.

    Returns:
        List of stale processes (name matches STALE_PROCESS_NAMES and the
        process was created before the cutoff).
    """
    stale_processes = []
    # NOTE(review): both sides of the age comparison are naive local time
    # (datetime.now vs datetime.fromtimestamp), so they are mutually consistent.
    cutoff_time = datetime.now() - timedelta(minutes=max_age_minutes)

    for proc in psutil.process_iter(["pid", "name", "create_time"]):
        try:
            # psutil sets an attr to None in proc.info when it cannot be read
            # (e.g. AccessDenied); the `or ""` guard prevents None.lower() from
            # raising AttributeError and aborting the whole scan.
            proc_name = (proc.info.get("name") or "").lower()  # type: ignore[union-attr]
            create_time = proc.info.get("create_time")  # type: ignore[union-attr]

            if create_time is None:
                continue

            if any(name in proc_name for name in STALE_PROCESS_NAMES):
                proc_start_time = datetime.fromtimestamp(create_time)
                if proc_start_time < cutoff_time:
                    stale_processes.append(proc)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue

    return stale_processes


def kill_stale_processes() -> int:
    """
    Kill stale playwright/node/browser processes.

    Attempts graceful termination first (SIGTERM) and escalates to a hard kill
    if the process does not exit within 3 seconds.

    Returns:
        Number of processes killed.
    """
    stale_processes = get_stale_browser_processes()
    killed_count = 0

    for proc in stale_processes:
        try:
            proc_name = proc.name()
            proc_pid = proc.pid

            # Try graceful termination first.
            proc.terminate()
            try:
                proc.wait(timeout=3)
            except psutil.TimeoutExpired:
                # Force kill if graceful termination didn't work.
                proc.kill()

            LOG.info("Killed stale process", name=proc_name, pid=proc_pid)
            killed_count += 1
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue
        except Exception:
            LOG.warning("Failed to kill stale process", pid=proc.pid, exc_info=True)

    return killed_count


async def run_cleanup() -> None:
    """
    Run one cleanup pass.

    Skips entirely if any task/workflow has been updated within the stale
    threshold (i.e. is actively running). Stale (stuck) tasks/workflows do not
    block cleanup; they are only logged.
    """
    LOG.debug("Starting cleanup process")

    has_active, stale_tasks, stale_workflows = await check_running_tasks_or_workflows()

    if has_active:
        LOG.info("Skipping cleanup: tasks or workflows are currently running")
        return

    if stale_tasks > 0 or stale_workflows > 0:
        LOG.info(
            "Proceeding with cleanup despite stale tasks/workflows",
            stale_tasks=stale_tasks,
            stale_workflows=stale_workflows,
        )

    temp_files_removed = cleanup_temp_directory()
    if temp_files_removed > 0:
        LOG.info("Cleaned up temp directory", files_removed=temp_files_removed)

    processes_killed = kill_stale_processes()
    if processes_killed > 0:
        LOG.info("Killed stale browser processes", processes_killed=processes_killed)

    LOG.debug("Cleanup process completed")


async def cleanup_scheduler() -> None:
    """
    Run the cleanup process periodically until cancelled.

    Sleeps for the configured interval *before* each run, so the first cleanup
    happens one interval after startup. Errors in a run are logged and the loop
    keeps going; cancellation exits the loop cleanly.
    """
    interval_seconds = settings.CLEANUP_CRON_INTERVAL_MINUTES * 60

    LOG.info(
        "Cleanup scheduler started",
        interval_minutes=settings.CLEANUP_CRON_INTERVAL_MINUTES,
    )

    while True:
        try:
            await asyncio.sleep(interval_seconds)
            await run_cleanup()
        except asyncio.CancelledError:
            LOG.info("Cleanup scheduler cancelled")
            break
        except Exception:
            LOG.exception("Error in cleanup scheduler")
            # Continue running despite errors.


# Module-level handle to the single scheduler task managed by
# start_cleanup_scheduler / stop_cleanup_scheduler.
_cleanup_task: asyncio.Task | None = None


def start_cleanup_scheduler() -> asyncio.Task | None:
    """
    Start the cleanup scheduler as a background task.

    Idempotent: if a scheduler task is already running, it is returned instead
    of starting a second one.

    Returns:
        The asyncio Task running the scheduler, or None if cleanup is disabled
        via settings.ENABLE_CLEANUP_CRON.
    """
    global _cleanup_task

    if not settings.ENABLE_CLEANUP_CRON:
        LOG.debug("Cleanup cron is disabled")
        return None

    if _cleanup_task is not None and not _cleanup_task.done():
        LOG.warning("Cleanup scheduler is already running")
        return _cleanup_task

    _cleanup_task = asyncio.create_task(cleanup_scheduler())
    return _cleanup_task


async def stop_cleanup_scheduler() -> None:
    """
    Stop the cleanup scheduler if it's running.

    Cancels the background task, waits for it to finish, and clears the
    module-level handle so the scheduler can be started again later.
    """
    global _cleanup_task

    if _cleanup_task is not None and not _cleanup_task.done():
        _cleanup_task.cancel()
        try:
            await _cleanup_task
        except asyncio.CancelledError:
            pass
        LOG.info("Cleanup scheduler stopped")

    _cleanup_task = None