Files
Dorod-Sky/skyvern/services/cleanup_service.py
2026-02-18 15:10:00 +00:00

290 lines
8.7 KiB
Python

"""
Cleanup service for periodic cleanup of temporary data.
This service is responsible for:
1. Cleaning up temporary files in the temp directory
2. Killing stale playwright/node/browser processes
"""
import asyncio
import shutil
from datetime import datetime, timedelta
from pathlib import Path
import psutil
import structlog
from skyvern.config import settings
from skyvern.forge import app
LOG = structlog.get_logger()
# Process-name fragments to look for when cleaning up stale processes.
# NOTE: get_stale_browser_processes() lower-cases each process name and tests
# every entry with a substring check (`entry in process_name`), not an exact
# comparison. Short entries such as "node" or "chrome" therefore also match
# longer process names that merely contain them.
STALE_PROCESS_NAMES = frozenset(
{
"playwright",
"node",
"chromium",
"chrome",
"firefox",
"webkit",
"chromium-browser",
"google-chrome",
"google-chrome-stable",
"msedge",
"microsoft-edge",
"microsoft-edge-stable",
}
)
async def check_running_tasks_or_workflows() -> tuple[bool, int, int]:
    """
    Report whether any task or workflow run is still actively running.

    The configured ``settings.CLEANUP_STALE_TASK_THRESHOLD_HOURS`` is passed to
    both database lookups, each of which yields an ``(active, stale)`` pair of
    counts. A warning is logged for every non-zero stale count; stale items do
    not make the result "active".

    Returns:
        Tuple of (has_active_tasks_or_workflows, stale_task_count, stale_workflow_count).
        When the check itself raises, ``(True, 0, 0)`` is returned so callers
        err on the side of treating the system as busy.
    """
    try:
        threshold_hours = settings.CLEANUP_STALE_TASK_THRESHOLD_HOURS

        # Tasks first, then workflow runs; both use the same threshold.
        running_tasks, stuck_tasks = await app.DATABASE.get_running_tasks_info_globally(
            stale_threshold_hours=threshold_hours
        )
        running_workflows, stuck_workflows = await app.DATABASE.get_running_workflow_runs_info_globally(
            stale_threshold_hours=threshold_hours
        )

        # Surface stuck items so operators can investigate them.
        if stuck_tasks > 0:
            LOG.warning(
                "Found stale tasks that haven't been updated",
                stale_task_count=stuck_tasks,
                threshold_hours=threshold_hours,
            )
        if stuck_workflows > 0:
            LOG.warning(
                "Found stale workflow runs that haven't been updated",
                stale_workflow_count=stuck_workflows,
                threshold_hours=threshold_hours,
            )

        anything_active = running_tasks > 0 or running_workflows > 0
        return anything_active, stuck_tasks, stuck_workflows
    except Exception:
        LOG.exception("Error checking for running tasks/workflows")
        # Unknown state: pretend something is running so nothing gets cleaned up.
        return True, 0, 0
def cleanup_temp_directory() -> int:
    """
    Clean up temporary files in the temp directory.

    Every direct child of ``settings.TEMP_PATH`` is removed: regular files and
    symbolic links are unlinked, real directories are deleted recursively.
    Symbolic links are always unlinked (never followed), so the link target is
    left untouched and dangling links are cleaned up as well. Other special
    entries (sockets, FIFOs, ...) are left in place.

    Failures on individual entries are logged and skipped; a failure to list
    the directory is logged and ends the run. No exception is propagated.

    Returns:
        Number of files/directories removed.
    """
    temp_path = Path(settings.TEMP_PATH)
    if not temp_path.exists():
        LOG.debug("Temp directory does not exist", temp_path=str(temp_path))
        return 0

    removed_count = 0
    try:
        for item in temp_path.iterdir():
            try:
                # Check for symlinks first: is_file()/is_dir() follow links, and
                # shutil.rmtree() refuses to operate on a symlink to a directory.
                if item.is_symlink() or item.is_file():
                    item.unlink()
                    removed_count += 1
                elif item.is_dir():
                    shutil.rmtree(item)
                    removed_count += 1
            except Exception:
                # Best effort: one undeletable entry must not stop the sweep.
                LOG.warning("Failed to remove temp item", item=str(item), exc_info=True)
    except Exception:
        LOG.exception("Error cleaning temp directory", temp_path=str(temp_path))
    return removed_count
def get_stale_browser_processes(max_age_minutes: int = 60) -> list[psutil.Process]:
    """
    Get browser-related processes that have been running for too long.

    A process is considered browser-related when its lower-cased name contains
    any entry of ``STALE_PROCESS_NAMES`` as a substring. Processes whose name or
    creation time cannot be read are skipped.

    Args:
        max_age_minutes: Maximum age in minutes before a process is considered stale.

    Returns:
        List of stale processes.
    """
    # Work in epoch seconds (the unit psutil reports create_time in). Doing the
    # subtraction on an aware datetime keeps the cutoff correct across DST
    # changes, where naive local-time arithmetic would be off by an hour.
    cutoff_timestamp = (datetime.now().astimezone() - timedelta(minutes=max_age_minutes)).timestamp()

    stale_processes = []
    for proc in psutil.process_iter(["pid", "name", "create_time"]):
        try:
            # process_iter() stores None for attributes it was not allowed to
            # read, so the key can exist with a None value; `.get(key, "")`
            # alone would not protect the `.lower()` call.
            proc_name = (proc.info.get("name") or "").lower()  # type: ignore[union-attr]
            create_time = proc.info.get("create_time")  # type: ignore[union-attr]
            if create_time is None:
                continue

            # Check if process name matches any of the stale process names
            is_browser_related = any(name in proc_name for name in STALE_PROCESS_NAMES)
            if is_browser_related and create_time < cutoff_timestamp:
                stale_processes.append(proc)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # The process vanished or is off-limits; nothing to clean up here.
            continue
    return stale_processes
def kill_stale_processes() -> int:
    """
    Terminate every process reported by ``get_stale_browser_processes()``.

    Each process is first asked to terminate; if it has not exited within
    3 seconds it is force-killed. Processes that disappear, are zombies, or
    cannot be accessed are skipped silently; any other failure is logged as a
    warning and the loop moves on.

    Returns:
        Number of processes killed.
    """
    terminated = 0
    for process in get_stale_browser_processes():
        try:
            # Capture identity up front; it may be unreadable once the process exits.
            name, pid = process.name(), process.pid

            # Polite request first ...
            process.terminate()
            try:
                process.wait(timeout=3)
            except psutil.TimeoutExpired:
                # ... then the hard way if it ignored us.
                process.kill()

            LOG.info("Killed stale process", name=name, pid=pid)
            terminated += 1
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
        except Exception:
            LOG.warning("Failed to kill stale process", pid=process.pid, exc_info=True)
    return terminated
async def run_cleanup() -> None:
    """
    Run the cleanup process.

    This function checks if there are actively running tasks/workflows before cleaning up.
    If there are active (recently updated) tasks/workflows, it skips the cleanup.
    Tasks/workflows that haven't been updated for longer than the stale threshold
    are considered stuck and will not block cleanup (but a warning will be logged).

    The temp directory is cleaned synchronously; killing stale processes is
    offloaded to a worker thread because it can block for several seconds per
    process while waiting for graceful termination.
    """
    LOG.debug("Starting cleanup process")

    # Check if there are running tasks or workflows
    has_active, stale_tasks, stale_workflows = await check_running_tasks_or_workflows()
    if has_active:
        LOG.info("Skipping cleanup: tasks or workflows are currently running")
        return

    # Log summary if there are stale tasks/workflows (they won't block cleanup)
    if stale_tasks > 0 or stale_workflows > 0:
        LOG.info(
            "Proceeding with cleanup despite stale tasks/workflows",
            stale_tasks=stale_tasks,
            stale_workflows=stale_workflows,
        )

    # Clean up temp directory. Kept synchronous on purpose: with no await
    # between the activity check above and the deletion, no other coroutine on
    # this event loop can start work in the middle of the sweep.
    temp_files_removed = cleanup_temp_directory()
    if temp_files_removed > 0:
        LOG.info("Cleaned up temp directory", files_removed=temp_files_removed)

    # Kill stale processes. Each one may block in proc.wait(timeout=3), so run
    # the whole pass in a worker thread instead of stalling the event loop.
    processes_killed = await asyncio.to_thread(kill_stale_processes)
    if processes_killed > 0:
        LOG.info("Killed stale browser processes", processes_killed=processes_killed)

    LOG.debug("Cleanup process completed")
async def cleanup_scheduler() -> None:
    """
    Periodically invoke ``run_cleanup()`` until cancelled.

    The loop sleeps for ``settings.CLEANUP_CRON_INTERVAL_MINUTES`` minutes
    before each run (so the first cleanup happens one full interval after
    start-up). Cancellation ends the loop; any other exception is logged and
    the loop keeps going.
    """
    interval_minutes = settings.CLEANUP_CRON_INTERVAL_MINUTES
    sleep_seconds = interval_minutes * 60
    LOG.info("Cleanup scheduler started", interval_minutes=interval_minutes)

    while True:
        try:
            await asyncio.sleep(sleep_seconds)
            await run_cleanup()
        except asyncio.CancelledError:
            LOG.info("Cleanup scheduler cancelled")
            return
        except Exception:
            # A failed run must not take the scheduler down with it.
            LOG.exception("Error in cleanup scheduler")
# Handle to the background scheduler task. It is None until
# start_cleanup_scheduler() creates the task, and is reset to None by
# stop_cleanup_scheduler().
_cleanup_task: asyncio.Task | None = None
def start_cleanup_scheduler() -> asyncio.Task | None:
    """
    Launch ``cleanup_scheduler()`` as a background asyncio task.

    If a scheduler task already exists and has not finished, it is reused and a
    warning is logged instead of starting a second one.

    Returns:
        The asyncio Task running the scheduler, or None if cleanup is disabled.
    """
    global _cleanup_task

    if not settings.ENABLE_CLEANUP_CRON:
        LOG.debug("Cleanup cron is disabled")
        return None

    if _cleanup_task is None or _cleanup_task.done():
        # Nothing live to reuse: spin up a fresh scheduler.
        _cleanup_task = asyncio.create_task(cleanup_scheduler())
    else:
        LOG.warning("Cleanup scheduler is already running")
    return _cleanup_task
async def stop_cleanup_scheduler() -> None:
    """
    Cancel the background scheduler task, if one is still running.

    The task is cancelled and awaited so that it has finished by the time this
    coroutine returns. The module-level task handle is cleared in every case.
    """
    global _cleanup_task

    running = _cleanup_task
    if running is None or running.done():
        # Nothing to cancel; just drop the handle.
        _cleanup_task = None
        return

    running.cancel()
    try:
        await running
    except asyncio.CancelledError:
        pass
    LOG.info("Cleanup scheduler stopped")
    _cleanup_task = None