shu/revert transaction pooler (#4339)
This commit is contained in:
@@ -54,7 +54,6 @@ class Settings(BaseSettings):
|
|||||||
LONG_RUNNING_TASK_WARNING_RATIO: float = 0.95
|
LONG_RUNNING_TASK_WARNING_RATIO: float = 0.95
|
||||||
MAX_RETRIES_PER_STEP: int = 5
|
MAX_RETRIES_PER_STEP: int = 5
|
||||||
DEBUG_MODE: bool = False
|
DEBUG_MODE: bool = False
|
||||||
# Database settings
|
|
||||||
DATABASE_STRING: str = (
|
DATABASE_STRING: str = (
|
||||||
"postgresql+asyncpg://skyvern@localhost/skyvern"
|
"postgresql+asyncpg://skyvern@localhost/skyvern"
|
||||||
if platform.system() == "Windows"
|
if platform.system() == "Windows"
|
||||||
@@ -63,8 +62,6 @@ class Settings(BaseSettings):
|
|||||||
DATABASE_REPLICA_STRING: str | None = None
|
DATABASE_REPLICA_STRING: str | None = None
|
||||||
DATABASE_STATEMENT_TIMEOUT_MS: int = 60000
|
DATABASE_STATEMENT_TIMEOUT_MS: int = 60000
|
||||||
DISABLE_CONNECTION_POOL: bool = False
|
DISABLE_CONNECTION_POOL: bool = False
|
||||||
DB_DISABLE_PREPARED_STATEMENTS: bool = False
|
|
||||||
|
|
||||||
PROMPT_ACTION_HISTORY_WINDOW: int = 1
|
PROMPT_ACTION_HISTORY_WINDOW: int = 1
|
||||||
TASK_RESPONSE_ACTION_SCREENSHOT_COUNT: int = 3
|
TASK_RESPONSE_ACTION_SCREENSHOT_COUNT: int = 3
|
||||||
|
|
||||||
|
|||||||
@@ -3,23 +3,7 @@ from datetime import datetime, timedelta
|
|||||||
from typing import Any, List, Literal, Sequence, overload
|
from typing import Any, List, Literal, Sequence, overload
|
||||||
|
|
||||||
import structlog
|
import structlog
|
||||||
from sqlalchemy import (
|
from sqlalchemy import and_, asc, case, delete, distinct, exists, func, or_, pool, select, tuple_, update
|
||||||
Connection,
|
|
||||||
and_,
|
|
||||||
asc,
|
|
||||||
case,
|
|
||||||
delete,
|
|
||||||
distinct,
|
|
||||||
event,
|
|
||||||
exists,
|
|
||||||
func,
|
|
||||||
or_,
|
|
||||||
pool,
|
|
||||||
select,
|
|
||||||
tuple_,
|
|
||||||
update,
|
|
||||||
)
|
|
||||||
from sqlalchemy.engine.url import make_url
|
|
||||||
from sqlalchemy.exc import (
|
from sqlalchemy.exc import (
|
||||||
SQLAlchemyError,
|
SQLAlchemyError,
|
||||||
)
|
)
|
||||||
@@ -172,58 +156,25 @@ def _serialize_proxy_location(proxy_location: ProxyLocationInput) -> str | None:
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def _connect_args_for_driver(database_string: str) -> dict[str, Any]:
|
DB_CONNECT_ARGS: dict[str, Any] = {}
|
||||||
driver = make_url(database_string).drivername # "postgresql+psycopg" or "postgresql+asyncpg"
|
|
||||||
args: dict[str, Any] = {}
|
|
||||||
|
|
||||||
if settings.DB_DISABLE_PREPARED_STATEMENTS:
|
if "postgresql+psycopg" in settings.DATABASE_STRING:
|
||||||
if driver == "postgresql+psycopg":
|
DB_CONNECT_ARGS = {"options": f"-c statement_timeout={settings.DATABASE_STATEMENT_TIMEOUT_MS}"}
|
||||||
# psycopg3: completely disable prepared statements (None vs 0)
|
elif "postgresql+asyncpg" in settings.DATABASE_STRING:
|
||||||
# 0 disables caching but still uses prepared statements
|
DB_CONNECT_ARGS = {"server_settings": {"statement_timeout": str(settings.DATABASE_STATEMENT_TIMEOUT_MS)}}
|
||||||
# None completely disables prepared statement usage
|
|
||||||
args["prepare_threshold"] = None
|
|
||||||
elif driver == "postgresql+asyncpg":
|
|
||||||
# asyncpg: disable statement cache (prepared statements)
|
|
||||||
args["statement_cache_size"] = 0
|
|
||||||
else:
|
|
||||||
LOG.warning(
|
|
||||||
"The database driver might not be well optimized or supported by skyvern: {driver}", driver=driver
|
|
||||||
)
|
|
||||||
|
|
||||||
return args
|
|
||||||
|
|
||||||
|
|
||||||
def _install_statement_timeout(engine: AsyncEngine, timeout_ms: int) -> None:
|
|
||||||
if not timeout_ms or timeout_ms <= 0:
|
|
||||||
return
|
|
||||||
|
|
||||||
timeout_value = int(timeout_ms)
|
|
||||||
|
|
||||||
# Works for direct AND poolers because it's not a startup parameter.
|
|
||||||
# Applies per-transaction, which is the most reliable behavior with transaction pooling.
|
|
||||||
@event.listens_for(engine.sync_engine, "begin")
|
|
||||||
def _set_timeout(conn: Connection) -> None:
|
|
||||||
# Use a unique comment with object id to prevent psycopg3 from reusing prepared statement names,
|
|
||||||
# which can cause "prepared statement already exists" errors with connection poolers.
|
|
||||||
sql = f"SET LOCAL statement_timeout = {timeout_value} /* {id(conn)} */"
|
|
||||||
conn.exec_driver_sql(sql)
|
|
||||||
|
|
||||||
|
|
||||||
def make_async_engine(database_string: str) -> AsyncEngine:
|
|
||||||
engine = create_async_engine(
|
|
||||||
database_string,
|
|
||||||
json_serializer=_custom_json_serializer,
|
|
||||||
connect_args=_connect_args_for_driver(database_string),
|
|
||||||
poolclass=pool.NullPool if settings.DISABLE_CONNECTION_POOL else None,
|
|
||||||
)
|
|
||||||
|
|
||||||
_install_statement_timeout(engine, settings.DATABASE_STATEMENT_TIMEOUT_MS)
|
|
||||||
return engine
|
|
||||||
|
|
||||||
|
|
||||||
class AgentDB(BaseAlchemyDB):
|
class AgentDB(BaseAlchemyDB):
|
||||||
def __init__(self, database_string: str, debug_enabled: bool = False, db_engine: AsyncEngine | None = None) -> None:
|
def __init__(self, database_string: str, debug_enabled: bool = False, db_engine: AsyncEngine | None = None) -> None:
|
||||||
super().__init__(db_engine or make_async_engine(database_string))
|
super().__init__(
|
||||||
|
db_engine
|
||||||
|
or create_async_engine(
|
||||||
|
database_string,
|
||||||
|
json_serializer=_custom_json_serializer,
|
||||||
|
connect_args=DB_CONNECT_ARGS,
|
||||||
|
poolclass=pool.NullPool if settings.DISABLE_CONNECTION_POOL else None,
|
||||||
|
)
|
||||||
|
)
|
||||||
self.debug_enabled = debug_enabled
|
self.debug_enabled = debug_enabled
|
||||||
|
|
||||||
def is_retryable_error(self, error: SQLAlchemyError) -> bool:
|
def is_retryable_error(self, error: SQLAlchemyError) -> bool:
|
||||||
|
|||||||
Reference in New Issue
Block a user