290 lines
8.7 KiB
Python
290 lines
8.7 KiB
Python
"""
|
|
Cleanup service for periodic cleanup of temporary data.
|
|
|
|
This service is responsible for:
|
|
1. Cleaning up temporary files in the temp directory
|
|
2. Killing stale playwright/node/browser processes
|
|
"""
|
|
|
|
import asyncio
|
|
import shutil
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import psutil
|
|
import structlog
|
|
|
|
from skyvern.config import settings
|
|
from skyvern.forge import app
|
|
|
|
# Module-level structlog logger used by every helper in this file.
LOG = structlog.get_logger()
|
|
|
|
# Lower-case substrings that identify playwright/node/browser processes when
# scanning for stale processes to clean up. A process matches if its lower-cased
# name contains any of these entries.
# NOTE(review): because matching is by substring, broad entries such as "node"
# and "chrome" also match unrelated processes whose names merely contain them —
# confirm this breadth is intended.
STALE_PROCESS_NAMES = frozenset(
    [
        # Automation driver / runtime
        "playwright",
        "node",
        # Chromium family
        "chromium",
        "chromium-browser",
        "chrome",
        "google-chrome",
        "google-chrome-stable",
        # Microsoft Edge
        "msedge",
        "microsoft-edge",
        "microsoft-edge-stable",
        # Other browser engines
        "firefox",
        "webkit",
    ]
)
|
|
|
|
|
|
async def check_running_tasks_or_workflows() -> tuple[bool, int, int]:
    """
    Report whether any task or workflow run is actively running.

    Both queries use ``settings.CLEANUP_STALE_TASK_THRESHOLD_HOURS``. Items that
    have not been updated within that threshold are treated as stuck rather than
    active (as reported by the database helpers). A warning is logged whenever
    the stale task or stale workflow-run count is non-zero.

    Returns:
        ``(has_active_tasks_or_workflows, stale_task_count, stale_workflow_count)``.
        If anything goes wrong the result is ``(True, 0, 0)``, so callers err on
        the side of treating the system as busy.
    """
    try:
        threshold_hours = settings.CLEANUP_STALE_TASK_THRESHOLD_HOURS

        running_task_count, stale_task_count = await app.DATABASE.get_running_tasks_info_globally(
            stale_threshold_hours=threshold_hours
        )
        running_run_count, stale_run_count = await app.DATABASE.get_running_workflow_runs_info_globally(
            stale_threshold_hours=threshold_hours
        )

        if stale_task_count > 0:
            LOG.warning(
                "Found stale tasks that haven't been updated",
                stale_task_count=stale_task_count,
                threshold_hours=threshold_hours,
            )
        if stale_run_count > 0:
            LOG.warning(
                "Found stale workflow runs that haven't been updated",
                stale_workflow_count=stale_run_count,
                threshold_hours=threshold_hours,
            )

        anything_active = running_task_count > 0 or running_run_count > 0
        return anything_active, stale_task_count, stale_run_count
    except Exception:
        LOG.exception("Error checking for running tasks/workflows")
        # Unable to verify activity: report "busy" so cleanup is skipped.
        return True, 0, 0
|
|
|
|
|
|
def cleanup_temp_directory() -> int:
|
|
"""
|
|
Clean up temporary files in the temp directory.
|
|
|
|
Returns:
|
|
Number of files/directories removed.
|
|
"""
|
|
temp_path = Path(settings.TEMP_PATH)
|
|
if not temp_path.exists():
|
|
LOG.debug("Temp directory does not exist", temp_path=str(temp_path))
|
|
return 0
|
|
|
|
removed_count = 0
|
|
try:
|
|
for item in temp_path.iterdir():
|
|
try:
|
|
if item.is_file():
|
|
item.unlink()
|
|
removed_count += 1
|
|
elif item.is_dir():
|
|
shutil.rmtree(item)
|
|
removed_count += 1
|
|
except Exception:
|
|
LOG.warning("Failed to remove temp item", item=str(item), exc_info=True)
|
|
except Exception:
|
|
LOG.exception("Error cleaning temp directory", temp_path=str(temp_path))
|
|
|
|
return removed_count
|
|
|
|
|
|
def get_stale_browser_processes(max_age_minutes: int = 60) -> list[psutil.Process]:
    """
    Get browser-related processes that have been running for too long.

    A process is browser-related when its lower-cased name contains any entry
    of ``STALE_PROCESS_NAMES`` as a substring, and stale when it was created
    more than ``max_age_minutes`` ago. Processes that vanish or deny access
    while being inspected are skipped.

    Args:
        max_age_minutes: Maximum age in minutes before a process is considered stale.

    Returns:
        List of stale processes.
    """
    import time

    # psutil's create_time is an epoch timestamp, so compare epoch seconds
    # directly; converting both sides to naive local datetimes can be off by an
    # hour around DST transitions.
    cutoff_ts = time.time() - max_age_minutes * 60

    stale_processes: list[psutil.Process] = []
    for proc in psutil.process_iter(["pid", "name", "create_time"]):
        try:
            # process_iter stores None (its default ad_value) for attributes it
            # could not read, so guard against a None name, not just a missing key.
            proc_name = (proc.info.get("name") or "").lower()  # type: ignore[union-attr]
            create_time = proc.info.get("create_time")  # type: ignore[union-attr]
            if create_time is None:
                continue

            is_browser_related = any(name in proc_name for name in STALE_PROCESS_NAMES)
            if is_browser_related and create_time < cutoff_ts:
                stale_processes.append(proc)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue

    return stale_processes
|
|
|
|
|
|
def kill_stale_processes() -> int:
    """
    Kill stale playwright/node/browser processes.

    Every stale process is first asked to terminate. All of them then share a
    single 3-second grace period, so the total wait no longer grows by 3 seconds
    for each process that is slow to exit. Survivors are force-killed and then
    reaped. Per-process failures are logged and do not stop the sweep.

    Returns:
        Number of processes killed.
    """
    # Phase 1: request graceful termination of every stale process.
    terminated: list[tuple[psutil.Process, str]] = []
    for proc in get_stale_browser_processes():
        try:
            proc_name = proc.name()
            proc.terminate()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # Already gone, or we are not permitted to signal it.
            continue
        except Exception:
            LOG.warning("Failed to kill stale process", pid=proc.pid, exc_info=True)
            continue
        terminated.append((proc, proc_name))

    if not terminated:
        return 0

    # Phase 2: one shared grace period. wait_procs also reaps processes that are
    # children of this one, so they do not linger as zombies.
    _, still_alive = psutil.wait_procs([proc for proc, _ in terminated], timeout=3)

    # Phase 3: force-kill anything that ignored the termination request.
    failed_pids: set[int] = set()
    for proc in still_alive:
        try:
            proc.kill()
        except (psutil.NoSuchProcess, psutil.ZombieProcess):
            # It exited on its own just after the grace period ended.
            continue
        except Exception:
            LOG.warning("Failed to kill stale process", pid=proc.pid, exc_info=True)
            failed_pids.add(proc.pid)
    if still_alive:
        # Reap force-killed children so they do not linger as zombies.
        psutil.wait_procs(still_alive, timeout=3)

    killed_count = 0
    for proc, proc_name in terminated:
        if proc.pid in failed_pids:
            continue
        LOG.info("Killed stale process", name=proc_name, pid=proc.pid)
        killed_count += 1

    return killed_count
|
|
|
|
|
|
async def run_cleanup() -> None:
    """
    Run the cleanup process.

    This function checks if there are actively running tasks/workflows before
    cleaning up. If there are active (recently updated) tasks/workflows, it skips
    the cleanup. Tasks/workflows that haven't been updated for longer than the
    stale threshold are considered stuck and do not block cleanup (but a warning
    is logged).

    The temp-directory sweep and the process kills are synchronous and can block
    for seconds (recursive file removal, waiting for processes to exit), so they
    run in worker threads via ``asyncio.to_thread`` to keep the event loop
    responsive for the rest of the application.
    """
    LOG.debug("Starting cleanup process")

    has_active, stale_tasks, stale_workflows = await check_running_tasks_or_workflows()
    if has_active:
        LOG.info("Skipping cleanup: tasks or workflows are currently running")
        return

    # Stale tasks/workflows do not block cleanup; just record that we proceeded.
    if stale_tasks > 0 or stale_workflows > 0:
        LOG.info(
            "Proceeding with cleanup despite stale tasks/workflows",
            stale_tasks=stale_tasks,
            stale_workflows=stale_workflows,
        )

    # Clean up temp directory (blocking filesystem work -> worker thread).
    temp_files_removed = await asyncio.to_thread(cleanup_temp_directory)
    if temp_files_removed > 0:
        LOG.info("Cleaned up temp directory", files_removed=temp_files_removed)

    # Kill stale processes (blocking waits on process exit -> worker thread).
    processes_killed = await asyncio.to_thread(kill_stale_processes)
    if processes_killed > 0:
        LOG.info("Killed stale browser processes", processes_killed=processes_killed)

    LOG.debug("Cleanup process completed")
|
|
|
|
|
|
async def cleanup_scheduler() -> None:
    """
    Scheduler that runs the cleanup process periodically.

    Loops until cancelled: sleeps for ``settings.CLEANUP_CRON_INTERVAL_MINUTES``
    and then calls ``run_cleanup``. An error in one run is logged and the loop
    keeps going. Cancellation is logged and then re-raised, so the task ends in
    the cancelled state as asyncio expects.
    """
    interval_minutes = settings.CLEANUP_CRON_INTERVAL_MINUTES
    interval_seconds = interval_minutes * 60

    LOG.info(
        "Cleanup scheduler started",
        interval_minutes=interval_minutes,
    )

    while True:
        try:
            await asyncio.sleep(interval_seconds)
            await run_cleanup()
        except asyncio.CancelledError:
            LOG.info("Cleanup scheduler cancelled")
            # Propagate: asyncio discourages suppressing cancellation, and
            # swallowing it would make the task look like it finished normally.
            raise
        except Exception:
            LOG.exception("Error in cleanup scheduler")
            # Continue running despite errors
|
|
|
|
|
|
# Handle to the background task created by start_cleanup_scheduler(); reset to
# None by stop_cleanup_scheduler().
_cleanup_task: asyncio.Task | None = None
|
|
|
|
|
|
def start_cleanup_scheduler() -> asyncio.Task | None:
    """
    Launch ``cleanup_scheduler`` as a background asyncio task.

    ``asyncio.create_task`` needs a running event loop, so call this from async
    context. If a scheduler task started earlier is still running, no new task
    is created and the existing one is returned (with a warning).

    Returns:
        The task running the scheduler, or ``None`` when
        ``settings.ENABLE_CLEANUP_CRON`` is disabled.
    """
    global _cleanup_task

    if not settings.ENABLE_CLEANUP_CRON:
        LOG.debug("Cleanup cron is disabled")
        return None

    already_running = _cleanup_task is not None and not _cleanup_task.done()
    if already_running:
        LOG.warning("Cleanup scheduler is already running")
    else:
        _cleanup_task = asyncio.create_task(cleanup_scheduler())
    return _cleanup_task
|
|
|
|
|
|
async def stop_cleanup_scheduler() -> None:
    """
    Cancel the background cleanup scheduler, if it is running, and wait for it to end.

    The module-level task handle is cleared afterwards in every case.
    """
    global _cleanup_task

    pending = _cleanup_task
    if pending is None or pending.done():
        _cleanup_task = None
        return

    pending.cancel()
    try:
        await pending
    except asyncio.CancelledError:
        # Expected: we just cancelled the task.
        pass
    LOG.info("Cleanup scheduler stopped")

    _cleanup_task = None
|