Issue-2756: updated alembic setup to account for asyncpg mode (#3431)
This commit is contained in:
@@ -1,6 +1,9 @@
|
||||
import asyncio
|
||||
from logging.config import fileConfig
|
||||
|
||||
from sqlalchemy import engine_from_config, pool
|
||||
from sqlalchemy import pool
|
||||
from sqlalchemy.engine import Connection
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
|
||||
from alembic import context
|
||||
|
||||
@@ -54,28 +57,49 @@ def run_migrations_offline() -> None:
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
def run_migrations_online() -> None:
|
||||
def do_run_migrations(connection: Connection):
|
||||
"""Run migrations in 'online' mode.
|
||||
|
||||
In this scenario we need to create an Engine
|
||||
and associate a connection with the context.
|
||||
|
||||
"""
|
||||
connectable = engine_from_config(
|
||||
config.get_section(config.config_ini_section, {}),
|
||||
prefix="sqlalchemy.",
|
||||
context.configure(connection=connection, target_metadata=target_metadata)
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
|
||||
|
||||
async def run_migrations_online():
|
||||
connectable = create_async_engine(
|
||||
config.get_main_option("sqlalchemy.url"),
|
||||
poolclass=pool.NullPool,
|
||||
)
|
||||
|
||||
with connectable.connect() as connection:
|
||||
context.configure(connection=connection, target_metadata=target_metadata)
|
||||
async with connectable.connect() as connection:
|
||||
await connection.run_sync(do_run_migrations)
|
||||
|
||||
with context.begin_transaction():
|
||||
context.run_migrations()
|
||||
await connectable.dispose()
|
||||
|
||||
|
||||
print("Alembic mode: ", "offline" if context.is_offline_mode() else "online")
|
||||
if context.is_offline_mode():
|
||||
run_migrations_offline()
|
||||
else:
|
||||
run_migrations_online()
|
||||
async def async_main():
|
||||
await run_migrations_online()
|
||||
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
# No running loop -> safe to start one
|
||||
print("Alembic: no running loop")
|
||||
asyncio.run(async_main())
|
||||
else:
|
||||
# Already running loop -> schedule task and await it
|
||||
print("Alembic: schedule task")
|
||||
import concurrent.futures
|
||||
|
||||
# Use a ThreadPoolExecutor to run the async function in a new thread
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
future = executor.submit(asyncio.run, async_main())
|
||||
future.result() # This blocks until completion
|
||||
|
||||
Reference in New Issue
Block a user