Add periodic cleanup cron job for temp data and stale processes (#4781)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -427,6 +427,14 @@ class Settings(BaseSettings):
|
||||
ENCRYPTOR_AES_SALT: str | None = None
|
||||
ENCRYPTOR_AES_IV: str | None = None
|
||||
|
||||
# Cleanup Cron Settings
|
||||
ENABLE_CLEANUP_CRON: bool = False
|
||||
"""Enable periodic cleanup of temporary data (temp files and stale processes)."""
|
||||
CLEANUP_CRON_INTERVAL_MINUTES: int = 10
|
||||
"""Interval in minutes for the cleanup cron job."""
|
||||
CLEANUP_STALE_TASK_THRESHOLD_HOURS: int = 24
|
||||
"""Tasks/workflows not updated for this many hours are considered stale (stuck)."""
|
||||
|
||||
# OpenTelemetry Settings
|
||||
OTEL_ENABLED: bool = False
|
||||
OTEL_SERVICE_NAME: str = "skyvern"
|
||||
|
||||
@@ -23,6 +23,7 @@ from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
|
||||
from skyvern.forge.sdk.db.exceptions import NotFoundError
|
||||
from skyvern.forge.sdk.routes import internal_auth
|
||||
from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router, legacy_v2_router
|
||||
from skyvern.services.cleanup_service import start_cleanup_scheduler, stop_cleanup_scheduler
|
||||
|
||||
try:
|
||||
from cloud.observability.otel_setup import OTELSetup
|
||||
@@ -68,7 +69,17 @@ async def lifespan(fastapi_app: FastAPI) -> AsyncGenerator[None, Any]:
|
||||
await forge_app.api_app_startup_event(fastapi_app)
|
||||
except Exception:
|
||||
LOG.exception("Failed to execute api app startup event")
|
||||
|
||||
# Start cleanup scheduler if enabled
|
||||
cleanup_task = start_cleanup_scheduler()
|
||||
if cleanup_task:
|
||||
LOG.info("Cleanup scheduler started")
|
||||
|
||||
yield
|
||||
|
||||
# Stop cleanup scheduler
|
||||
await stop_cleanup_scheduler()
|
||||
|
||||
if forge_app.api_app_shutdown_event:
|
||||
LOG.info("Calling api app shutdown event")
|
||||
try:
|
||||
|
||||
@@ -968,6 +968,99 @@ class AgentDB(BaseAlchemyDB):
|
||||
LOG.error("UnexpectedError", exc_info=True)
|
||||
raise
|
||||
|
||||
async def get_running_tasks_info_globally(
    self,
    stale_threshold_hours: int = 24,
) -> tuple[int, int]:
    """
    Count in-progress tasks across all organizations, split into active vs stale.

    Used by the cleanup service to decide whether cleanup should be skipped.

    Args:
        stale_threshold_hours: Tasks not updated for this many hours are considered stale.

    Returns:
        Tuple of (active_task_count, stale_task_count): active tasks were modified
        within the threshold; stale tasks are still in a running status but have
        not been modified within the threshold.
    """
    in_progress_statuses = [TaskStatus.created, TaskStatus.queued, TaskStatus.running]
    # Naive UTC to compare against modified_at; NOTE(review): utcnow() is
    # deprecated in 3.12 — confirm column tz-awareness before switching.
    cutoff = datetime.utcnow() - timedelta(hours=stale_threshold_hours)
    try:
        async with self.Session() as session:
            # Shared base query; only the modified_at predicate differs.
            base_query = (
                select(func.count())
                .select_from(TaskModel)
                .filter(TaskModel.status.in_(in_progress_statuses))
            )
            active_result = await session.execute(
                base_query.filter(TaskModel.modified_at >= cutoff)
            )
            stale_result = await session.execute(
                base_query.filter(TaskModel.modified_at < cutoff)
            )
            return (active_result.scalar_one(), stale_result.scalar_one())
    except SQLAlchemyError:
        LOG.error("SQLAlchemyError in get_running_tasks_info_globally", exc_info=True)
        raise
|
||||
|
||||
async def get_running_workflow_runs_info_globally(
    self,
    stale_threshold_hours: int = 24,
) -> tuple[int, int]:
    """
    Count in-progress workflow runs across all organizations, split into active vs stale.

    Used by the cleanup service to decide whether cleanup should be skipped.

    Args:
        stale_threshold_hours: Workflow runs not updated for this many hours are
            considered stale.

    Returns:
        Tuple of (active_workflow_count, stale_workflow_count): active runs were
        modified within the threshold; stale runs are still in a running status
        but have not been modified within the threshold.
    """
    in_progress_statuses = [
        WorkflowRunStatus.created,
        WorkflowRunStatus.queued,
        WorkflowRunStatus.running,
        WorkflowRunStatus.paused,
    ]
    # Naive UTC to compare against modified_at; NOTE(review): utcnow() is
    # deprecated in 3.12 — confirm column tz-awareness before switching.
    cutoff = datetime.utcnow() - timedelta(hours=stale_threshold_hours)
    try:
        async with self.Session() as session:
            # Shared base query; only the modified_at predicate differs.
            base_query = (
                select(func.count())
                .select_from(WorkflowRunModel)
                .filter(WorkflowRunModel.status.in_(in_progress_statuses))
            )
            active_result = await session.execute(
                base_query.filter(WorkflowRunModel.modified_at >= cutoff)
            )
            stale_result = await session.execute(
                base_query.filter(WorkflowRunModel.modified_at < cutoff)
            )
            return (active_result.scalar_one(), stale_result.scalar_one())
    except SQLAlchemyError:
        LOG.error("SQLAlchemyError in get_running_workflow_runs_info_globally", exc_info=True)
        raise
|
||||
|
||||
async def get_all_organizations(self) -> list[Organization]:
|
||||
try:
|
||||
async with self.Session() as session:
|
||||
|
||||
289
skyvern/services/cleanup_service.py
Normal file
289
skyvern/services/cleanup_service.py
Normal file
@@ -0,0 +1,289 @@
|
||||
"""
|
||||
Cleanup service for periodic cleanup of temporary data.
|
||||
|
||||
This service is responsible for:
|
||||
1. Cleaning up temporary files in the temp directory
|
||||
2. Killing stale playwright/node/browser processes
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import shutil
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
import psutil
|
||||
import structlog
|
||||
|
||||
from skyvern.config import settings
|
||||
from skyvern.forge import app
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
|
||||
# Lowercase name fragments identifying browser-automation processes that the
# cleanup job may terminate. Matching is substring-based against the lowercased
# process name.
STALE_PROCESS_NAMES = frozenset(
    (
        "playwright",
        "node",
        "chromium",
        "chrome",
        "firefox",
        "webkit",
        "chromium-browser",
        "google-chrome",
        "google-chrome-stable",
        "msedge",
        "microsoft-edge",
        "microsoft-edge-stable",
    )
)
|
||||
|
||||
|
||||
async def check_running_tasks_or_workflows() -> tuple[bool, int, int]:
    """
    Check whether any tasks or workflow runs are actively running.

    Entries that have not been updated within the configured stale threshold are
    treated as stuck rather than active; a warning is logged for them, but they
    do not count as "active".

    Returns:
        Tuple of (has_active_tasks_or_workflows, stale_task_count, stale_workflow_count).
        On any error the function assumes activity — returns (True, 0, 0) — so
        cleanup is skipped rather than risking a destructive run.
    """
    try:
        threshold_hours = settings.CLEANUP_STALE_TASK_THRESHOLD_HOURS

        active_tasks, stale_tasks = await app.DATABASE.get_running_tasks_info_globally(
            stale_threshold_hours=threshold_hours
        )
        active_workflows, stale_workflows = await app.DATABASE.get_running_workflow_runs_info_globally(
            stale_threshold_hours=threshold_hours
        )

        # Surface stuck entries without letting them block cleanup.
        if stale_tasks:
            LOG.warning(
                "Found stale tasks that haven't been updated",
                stale_task_count=stale_tasks,
                threshold_hours=threshold_hours,
            )
        if stale_workflows:
            LOG.warning(
                "Found stale workflow runs that haven't been updated",
                stale_workflow_count=stale_workflows,
                threshold_hours=threshold_hours,
            )

        any_active = bool(active_tasks) or bool(active_workflows)
        return (any_active, stale_tasks, stale_workflows)
    except Exception:
        LOG.exception("Error checking for running tasks/workflows")
        # Fail closed: if we cannot check, behave as if work is in flight.
        return (True, 0, 0)
|
||||
|
||||
|
||||
def cleanup_temp_directory() -> int:
|
||||
"""
|
||||
Clean up temporary files in the temp directory.
|
||||
|
||||
Returns:
|
||||
Number of files/directories removed.
|
||||
"""
|
||||
temp_path = Path(settings.TEMP_PATH)
|
||||
if not temp_path.exists():
|
||||
LOG.debug("Temp directory does not exist", temp_path=str(temp_path))
|
||||
return 0
|
||||
|
||||
removed_count = 0
|
||||
try:
|
||||
for item in temp_path.iterdir():
|
||||
try:
|
||||
if item.is_file():
|
||||
item.unlink()
|
||||
removed_count += 1
|
||||
elif item.is_dir():
|
||||
shutil.rmtree(item)
|
||||
removed_count += 1
|
||||
except Exception:
|
||||
LOG.warning("Failed to remove temp item", item=str(item), exc_info=True)
|
||||
except Exception:
|
||||
LOG.exception("Error cleaning temp directory", temp_path=str(temp_path))
|
||||
|
||||
return removed_count
|
||||
|
||||
|
||||
def get_stale_browser_processes(max_age_minutes: int = 60) -> list[psutil.Process]:
    """
    Get browser-related processes that have been running for too long.

    A process is browser-related when its lowercased name contains any entry
    of STALE_PROCESS_NAMES; it is stale when it was created more than
    ``max_age_minutes`` ago (local time, matching psutil's create_time epoch).

    Args:
        max_age_minutes: Maximum age in minutes before a process is considered stale.

    Returns:
        List of stale processes.
    """
    stale_processes: list[psutil.Process] = []
    cutoff_time = datetime.now() - timedelta(minutes=max_age_minutes)

    for proc in psutil.process_iter(["pid", "name", "create_time"]):
        try:
            # psutil can report the "name" key as None (e.g. restricted or
            # zombie processes); .get("name", "") does NOT guard against that
            # because the key exists. Without the `or ""` fallback, .lower()
            # would raise AttributeError, which the psutil-only except below
            # would not catch.
            proc_name = (proc.info.get("name") or "").lower()  # type: ignore[union-attr]
            create_time = proc.info.get("create_time")  # type: ignore[union-attr]

            if create_time is None:
                continue

            # Substring match against the known browser/automation names.
            is_browser_related = any(name in proc_name for name in STALE_PROCESS_NAMES)

            if is_browser_related:
                proc_start_time = datetime.fromtimestamp(create_time)
                if proc_start_time < cutoff_time:
                    stale_processes.append(proc)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue

    return stale_processes
|
||||
|
||||
|
||||
def kill_stale_processes() -> int:
    """
    Terminate stale playwright/node/browser processes.

    Each stale process is sent SIGTERM first; if it has not exited within a
    short grace period it is force-killed. Processes that vanish or deny
    access are silently skipped.

    NOTE(review): matching includes generic names like "node", so unrelated
    long-lived node processes on the host could be targeted — confirm this is
    acceptable for the deployment environment.

    Returns:
        Number of processes killed.
    """
    killed_count = 0

    for proc in get_stale_browser_processes():
        try:
            # Capture identity up front; the process may be gone by log time.
            name = proc.name()
            pid = proc.pid

            proc.terminate()  # graceful shutdown attempt
            try:
                proc.wait(timeout=3)
            except psutil.TimeoutExpired:
                proc.kill()  # escalate to SIGKILL

            LOG.info("Killed stale process", name=name, pid=pid)
            killed_count += 1
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue
        except Exception:
            LOG.warning("Failed to kill stale process", pid=proc.pid, exc_info=True)

    return killed_count
|
||||
|
||||
|
||||
async def run_cleanup() -> None:
    """
    Run one cleanup pass.

    Checks for actively running tasks/workflows first and skips entirely if
    any are found. Stale (stuck) tasks/workflows do not block cleanup; they
    are only reported. Otherwise removes temp-directory contents and kills
    stale browser processes.
    """
    LOG.debug("Starting cleanup process")

    has_active, stale_tasks, stale_workflows = await check_running_tasks_or_workflows()

    # Guard clause: never clean up underneath live work.
    if has_active:
        LOG.info("Skipping cleanup: tasks or workflows are currently running")
        return

    if stale_tasks or stale_workflows:
        LOG.info(
            "Proceeding with cleanup despite stale tasks/workflows",
            stale_tasks=stale_tasks,
            stale_workflows=stale_workflows,
        )

    removed = cleanup_temp_directory()
    if removed:
        LOG.info("Cleaned up temp directory", files_removed=removed)

    killed = kill_stale_processes()
    if killed:
        LOG.info("Killed stale browser processes", processes_killed=killed)

    LOG.debug("Cleanup process completed")
|
||||
|
||||
|
||||
async def cleanup_scheduler() -> None:
    """
    Run the cleanup process periodically until cancelled.

    Sleeps for the configured interval before each run, so the first cleanup
    happens one interval after startup. Errors from a run are logged and the
    loop continues; cancellation exits the loop.
    """
    sleep_seconds = settings.CLEANUP_CRON_INTERVAL_MINUTES * 60

    LOG.info(
        "Cleanup scheduler started",
        interval_minutes=settings.CLEANUP_CRON_INTERVAL_MINUTES,
    )

    while True:
        try:
            await asyncio.sleep(sleep_seconds)
            await run_cleanup()
        except asyncio.CancelledError:
            LOG.info("Cleanup scheduler cancelled")
            return
        except Exception:
            # Keep the scheduler alive across transient failures.
            LOG.exception("Error in cleanup scheduler")
|
||||
|
||||
|
||||
# Module-level handle to the running scheduler task; managed exclusively by
# start_cleanup_scheduler() / stop_cleanup_scheduler().
_cleanup_task: asyncio.Task | None = None
|
||||
|
||||
|
||||
def start_cleanup_scheduler() -> asyncio.Task | None:
    """
    Launch the cleanup scheduler as a background asyncio task.

    Idempotent: if a scheduler task is already running, the existing task is
    returned instead of starting a second one.

    Returns:
        The asyncio Task running the scheduler, or None if cleanup is disabled
        via ENABLE_CLEANUP_CRON.
    """
    global _cleanup_task

    if not settings.ENABLE_CLEANUP_CRON:
        LOG.debug("Cleanup cron is disabled")
        return None

    already_running = _cleanup_task is not None and not _cleanup_task.done()
    if already_running:
        LOG.warning("Cleanup scheduler is already running")
    else:
        _cleanup_task = asyncio.create_task(cleanup_scheduler())
    return _cleanup_task
|
||||
|
||||
|
||||
async def stop_cleanup_scheduler() -> None:
    """
    Stop the cleanup scheduler if it is running.

    Cancels the background task, waits for it to unwind, and clears the
    module-level handle so a new scheduler can be started later.
    """
    global _cleanup_task

    task = _cleanup_task
    if task is not None and not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected on cancellation; the task is done either way.
            pass
        LOG.info("Cleanup scheduler stopped")

    _cleanup_task = None
|
||||
Reference in New Issue
Block a user