From 71db86cbf0be54d0d3a7c3609bf171d59693f518 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Fri, 19 Dec 2025 22:49:40 +0800 Subject: [PATCH] shu/revert transaction pooler (#4339) --- skyvern/config.py | 3 -- skyvern/forge/sdk/db/agent_db.py | 79 ++++++-------------------------- 2 files changed, 15 insertions(+), 67 deletions(-) diff --git a/skyvern/config.py b/skyvern/config.py index 9c665556..b9ec0760 100644 --- a/skyvern/config.py +++ b/skyvern/config.py @@ -54,7 +54,6 @@ class Settings(BaseSettings): LONG_RUNNING_TASK_WARNING_RATIO: float = 0.95 MAX_RETRIES_PER_STEP: int = 5 DEBUG_MODE: bool = False - # Database settings DATABASE_STRING: str = ( "postgresql+asyncpg://skyvern@localhost/skyvern" if platform.system() == "Windows" @@ -63,8 +62,6 @@ class Settings(BaseSettings): DATABASE_REPLICA_STRING: str | None = None DATABASE_STATEMENT_TIMEOUT_MS: int = 60000 DISABLE_CONNECTION_POOL: bool = False - DB_DISABLE_PREPARED_STATEMENTS: bool = False - PROMPT_ACTION_HISTORY_WINDOW: int = 1 TASK_RESPONSE_ACTION_SCREENSHOT_COUNT: int = 3 diff --git a/skyvern/forge/sdk/db/agent_db.py b/skyvern/forge/sdk/db/agent_db.py index 0f1df535..1405a1a8 100644 --- a/skyvern/forge/sdk/db/agent_db.py +++ b/skyvern/forge/sdk/db/agent_db.py @@ -3,23 +3,7 @@ from datetime import datetime, timedelta from typing import Any, List, Literal, Sequence, overload import structlog -from sqlalchemy import ( - Connection, - and_, - asc, - case, - delete, - distinct, - event, - exists, - func, - or_, - pool, - select, - tuple_, - update, -) -from sqlalchemy.engine.url import make_url +from sqlalchemy import and_, asc, case, delete, distinct, exists, func, or_, pool, select, tuple_, update from sqlalchemy.exc import ( SQLAlchemyError, ) @@ -172,58 +156,25 @@ def _serialize_proxy_location(proxy_location: ProxyLocationInput) -> str | None: return result -def _connect_args_for_driver(database_string: str) -> dict[str, Any]: - driver = make_url(database_string).drivername # "postgresql+psycopg" or "postgresql+asyncpg" - args: dict[str, Any] = {} +DB_CONNECT_ARGS: dict[str, Any] = {} - if settings.DB_DISABLE_PREPARED_STATEMENTS: - if driver == "postgresql+psycopg": - # psycopg3: completely disable prepared statements (None vs 0) - # 0 disables caching but still uses prepared statements - # None completely disables prepared statement usage - args["prepare_threshold"] = None - elif driver == "postgresql+asyncpg": - # asyncpg: disable statement cache (prepared statements) - args["statement_cache_size"] = 0 - else: - LOG.warning( - "The database driver might not be well optimized or supported by skyvern: {driver}", driver=driver - ) - - return args - - -def _install_statement_timeout(engine: AsyncEngine, timeout_ms: int) -> None: - if not timeout_ms or timeout_ms <= 0: - return - - timeout_value = int(timeout_ms) - - # Works for direct AND poolers because it's not a startup parameter. - # Applies per-transaction, which is the most reliable behavior with transaction pooling. - @event.listens_for(engine.sync_engine, "begin") - def _set_timeout(conn: Connection) -> None: - # Use a unique comment with object id to prevent psycopg3 from reusing prepared statement names, - # which can cause "prepared statement already exists" errors with connection poolers. - sql = f"SET LOCAL statement_timeout = {timeout_value} /* {id(conn)} */" - conn.exec_driver_sql(sql) - - -def make_async_engine(database_string: str) -> AsyncEngine: - engine = create_async_engine( - database_string, - json_serializer=_custom_json_serializer, - connect_args=_connect_args_for_driver(database_string), - poolclass=pool.NullPool if settings.DISABLE_CONNECTION_POOL else None, - ) - - _install_statement_timeout(engine, settings.DATABASE_STATEMENT_TIMEOUT_MS) - return engine +if "postgresql+psycopg" in settings.DATABASE_STRING: + DB_CONNECT_ARGS = {"options": f"-c statement_timeout={settings.DATABASE_STATEMENT_TIMEOUT_MS}"} +elif "postgresql+asyncpg" in settings.DATABASE_STRING: + DB_CONNECT_ARGS = {"server_settings": {"statement_timeout": str(settings.DATABASE_STATEMENT_TIMEOUT_MS)}} class AgentDB(BaseAlchemyDB): def __init__(self, database_string: str, debug_enabled: bool = False, db_engine: AsyncEngine | None = None) -> None: - super().__init__(db_engine or make_async_engine(database_string)) + super().__init__( + db_engine + or create_async_engine( + database_string, + json_serializer=_custom_json_serializer, + connect_args=DB_CONNECT_ARGS, + poolclass=pool.NullPool if settings.DISABLE_CONNECTION_POOL else None, + ) + ) self.debug_enabled = debug_enabled def is_retryable_error(self, error: SQLAlchemyError) -> bool: