Fix DB issue 2s

This commit is contained in:
Urtzi Alfaro
2025-09-30 21:58:10 +02:00
parent 147893015e
commit 7cc4b957a5
77 changed files with 4385 additions and 1211 deletions

View File

@@ -1,9 +1,10 @@
"""
Database Initialization Manager
Handles automatic table creation and Alembic integration for both:
1. Production deployments (first-time table creation)
2. Development workflows (reset to clean slate)
Handles Alembic-based migrations with autogenerate support:
1. First-time deployment: Generate initial migration from models
2. Subsequent deployments: Run pending migrations
3. Development reset: Drop tables and regenerate migrations
"""
import os
@@ -25,10 +26,8 @@ logger = structlog.get_logger()
class DatabaseInitManager:
"""
Manages database initialization with support for:
- Automatic table creation from SQLAlchemy models
- Alembic integration and version management
- Development reset capabilities
Manages database initialization using Alembic migrations exclusively.
Uses autogenerate to create initial migrations if none exist.
"""
def __init__(
@@ -48,28 +47,35 @@ class DatabaseInitManager:
async def initialize_database(self) -> Dict[str, Any]:
"""
Main initialization method that handles all scenarios:
1. Check if database exists and has tables
2. Create tables if needed (first-time deployment)
3. Handle Alembic version management
4. Support development reset scenarios
Main initialization method:
1. Check if migrations exist in the codebase
2. Run alembic upgrade head to apply all pending migrations
NOTE: Migration files must be pre-generated and included in Docker images.
Do NOT generate migrations at runtime.
"""
self.logger.info("Starting database initialization")
self.logger.info("Starting database initialization with Alembic")
try:
if not self.alembic_ini_path or not os.path.exists(self.alembic_ini_path):
raise Exception(f"Alembic configuration not found at {self.alembic_ini_path}")
# Check current database state
db_state = await self._check_database_state()
self.logger.info("Database state checked", state=db_state)
# Handle different initialization scenarios
# Handle different scenarios based on migration state
if self.force_recreate:
result = await self._handle_force_recreate()
elif db_state["is_empty"]:
result = await self._handle_first_time_deployment()
elif db_state["has_alembic_version"]:
result = await self._handle_existing_database_with_alembic()
elif not db_state["has_migrations"]:
# No migration files found - use create_all() as fallback
self.logger.warning(
"No migration files found - using create_all() as fallback. "
"Consider generating proper migrations for production use."
)
result = await self._handle_no_migrations()
else:
result = await self._handle_existing_database_without_alembic()
result = await self._handle_run_migrations()
self.logger.info("Database initialization completed", result=result)
return result
@@ -79,162 +85,142 @@ class DatabaseInitManager:
raise
async def _check_database_state(self) -> Dict[str, Any]:
"""Check the current state of the database"""
"""Check the current state of migrations"""
state = {
"has_migrations": False,
"migration_count": 0,
"is_empty": False,
"has_alembic_version": False,
"existing_tables": [],
"alembic_version": None
"has_alembic_version": False,
"current_revision": None
}
try:
# Check if migration files exist
migrations_dir = self._get_migrations_versions_dir()
if migrations_dir.exists():
migration_files = list(migrations_dir.glob("*.py"))
migration_files = [f for f in migration_files if f.name != "__pycache__" and not f.name.startswith("_")]
state["migration_count"] = len(migration_files)
state["has_migrations"] = len(migration_files) > 0
self.logger.info("Found migration files", count=len(migration_files))
# Check database tables
async with self.database_manager.get_session() as session:
# Check if database has any tables
inspector = await self._get_inspector(session)
existing_tables = await self._get_existing_tables(inspector)
existing_tables = await self._get_existing_tables(session)
state["existing_tables"] = existing_tables
state["is_empty"] = len(existing_tables) == 0
# Check if alembic_version table exists and has version
# Check alembic_version table
if "alembic_version" in existing_tables:
state["has_alembic_version"] = True
result = await session.execute(text("SELECT version_num FROM alembic_version"))
version = result.scalar()
state["alembic_version"] = version
state["current_revision"] = version
except Exception as e:
self.logger.warning("Error checking database state", error=str(e))
state["is_empty"] = True
return state
async def _handle_first_time_deployment(self) -> Dict[str, Any]:
"""Handle first-time deployment: create tables and set up Alembic"""
self.logger.info("Handling first-time deployment")
async def _handle_no_migrations(self) -> Dict[str, Any]:
"""Handle case where no migration files exist - use create_all()"""
self.logger.info("No migrations found, using create_all() to initialize tables")
# Import models to ensure they're registered with Base
if self.models_module:
await self._import_models()
# Create all tables from SQLAlchemy models
await self._create_tables_from_models()
# Initialize Alembic and stamp with the latest version
if self.alembic_ini_path and os.path.exists(self.alembic_ini_path):
await self._initialize_alembic()
return {
"action": "first_time_deployment",
"tables_created": True,
"alembic_initialized": True,
"message": "Database initialized for first-time deployment"
}
async def _handle_force_recreate(self) -> Dict[str, Any]:
"""Handle development scenario: drop everything and recreate"""
self.logger.info("Handling force recreate (development mode)")
# Drop all tables
await self._drop_all_tables()
# Create tables from models
if self.models_module:
await self._import_models()
await self._create_tables_from_models()
# Re-initialize Alembic
if self.alembic_ini_path and os.path.exists(self.alembic_ini_path):
await self._initialize_alembic()
return {
"action": "force_recreate",
"tables_dropped": True,
"tables_created": True,
"alembic_reinitialized": True,
"message": "Database recreated from scratch (development mode)"
}
async def _handle_existing_database_with_alembic(self) -> Dict[str, Any]:
"""Handle existing database with Alembic version management"""
self.logger.info("Handling existing database with Alembic")
# Run pending migrations
if self.alembic_ini_path and os.path.exists(self.alembic_ini_path):
await self._run_migrations()
return {
"action": "existing_with_alembic",
"migrations_run": True,
"message": "Existing database updated with pending migrations"
}
async def _handle_existing_database_without_alembic(self) -> Dict[str, Any]:
"""Handle existing database without Alembic (legacy scenario)"""
self.logger.info("Handling existing database without Alembic")
# Initialize Alembic on existing database
if self.alembic_ini_path and os.path.exists(self.alembic_ini_path):
await self._initialize_alembic_on_existing()
return {
"action": "existing_without_alembic",
"alembic_initialized": True,
"message": "Alembic initialized on existing database"
}
async def _import_models(self):
"""Import models module to ensure models are registered with Base"""
try:
import importlib
import sys
from pathlib import Path
# Create tables directly using SQLAlchemy metadata
await self._create_tables_from_models()
# Add project root to Python path if not already there
project_root = Path(__file__).parent.parent.parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
# Try to import the models module
try:
importlib.import_module(self.models_module)
self.logger.info("Models imported successfully", module=self.models_module)
except ImportError as import_error:
# Try alternative import path (from app directly)
alt_module = f"app.models"
self.logger.warning("Primary import failed, trying alternative",
primary=self.models_module,
alternative=alt_module,
error=str(import_error))
# Change working directory to service directory
import os
original_cwd = os.getcwd()
service_dir = project_root / "services" / self.service_name
if service_dir.exists():
os.chdir(service_dir)
try:
importlib.import_module(alt_module)
self.logger.info("Models imported with alternative path", module=alt_module)
finally:
os.chdir(original_cwd)
else:
raise import_error
return {
"action": "tables_created_via_create_all",
"tables_created": True,
"message": "Tables created using SQLAlchemy create_all()"
}
except Exception as e:
self.logger.error("Failed to import models", module=self.models_module, error=str(e))
# Don't raise for now, continue without models import
self.logger.warning("Continuing without models import - tables may not be created")
self.logger.error("Failed to create tables", error=str(e))
raise
async def _handle_run_migrations(self) -> Dict[str, Any]:
"""Handle normal migration scenario - run pending migrations"""
self.logger.info("Running pending migrations")
try:
await self._run_migrations()
return {
"action": "migrations_applied",
"message": "Pending migrations applied successfully"
}
except Exception as e:
self.logger.error("Failed to run migrations", error=str(e))
raise
async def _handle_force_recreate(self) -> Dict[str, Any]:
"""Handle development reset scenario - drop and recreate tables using existing migrations"""
self.logger.info("Force recreate: dropping tables and rerunning migrations")
try:
# Drop all tables
await self._drop_all_tables()
# Apply migrations from scratch
await self._run_migrations()
return {
"action": "force_recreate",
"tables_dropped": True,
"migrations_applied": True,
"message": "Database recreated from existing migrations"
}
except Exception as e:
self.logger.error("Failed to force recreate", error=str(e))
raise
async def _run_migrations(self):
"""Run pending Alembic migrations (upgrade head)"""
try:
def run_alembic_upgrade():
import os
from pathlib import Path
# Ensure we're in the correct working directory
alembic_dir = Path(self.alembic_ini_path).parent
original_cwd = os.getcwd()
try:
os.chdir(alembic_dir)
alembic_cfg = Config(self.alembic_ini_path)
# Set the SQLAlchemy URL from the database manager
alembic_cfg.set_main_option("sqlalchemy.url", str(self.database_manager.database_url))
# Run upgrade
command.upgrade(alembic_cfg, "head")
finally:
os.chdir(original_cwd)
# Run in executor to avoid blocking
await asyncio.get_event_loop().run_in_executor(None, run_alembic_upgrade)
self.logger.info("Migrations applied successfully")
except Exception as e:
self.logger.error("Failed to run migrations", error=str(e))
raise
async def _create_tables_from_models(self):
"""Create all tables from registered SQLAlchemy models"""
"""Create tables using SQLAlchemy metadata (create_all)"""
try:
async with self.database_manager.async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
self.logger.info("Tables created from SQLAlchemy models")
self.logger.info("Tables created via create_all()")
except Exception as e:
self.logger.error("Failed to create tables from models", error=str(e))
self.logger.error("Failed to create tables", error=str(e))
raise
async def _drop_all_tables(self):
@@ -247,89 +233,20 @@ class DatabaseInitManager:
self.logger.error("Failed to drop tables", error=str(e))
raise
async def _initialize_alembic(self):
"""Initialize Alembic and stamp with latest version"""
try:
def run_alembic_init():
# Create Alembic config
alembic_cfg = Config(self.alembic_ini_path)
# Get the latest revision
script_dir = ScriptDirectory.from_config(alembic_cfg)
latest_revision = script_dir.get_current_head()
def _get_migrations_versions_dir(self) -> Path:
"""Get the migrations/versions directory path"""
alembic_path = Path(self.alembic_ini_path).parent
return alembic_path / "migrations" / "versions"
if latest_revision:
# Stamp the database with the latest revision
command.stamp(alembic_cfg, latest_revision)
return latest_revision
return None
# Run Alembic operations in async executor
latest_revision = await asyncio.get_event_loop().run_in_executor(None, run_alembic_init)
if latest_revision:
self.logger.info("Alembic initialized and stamped", revision=latest_revision)
else:
self.logger.warning("No Alembic revisions found")
except Exception as e:
self.logger.error("Failed to initialize Alembic", error=str(e))
raise
async def _initialize_alembic_on_existing(self):
"""Initialize Alembic on an existing database"""
try:
def run_alembic_stamp():
alembic_cfg = Config(self.alembic_ini_path)
script_dir = ScriptDirectory.from_config(alembic_cfg)
latest_revision = script_dir.get_current_head()
if latest_revision:
command.stamp(alembic_cfg, latest_revision)
return latest_revision
return None
# Run Alembic operations in async executor
latest_revision = await asyncio.get_event_loop().run_in_executor(None, run_alembic_stamp)
if latest_revision:
self.logger.info("Alembic initialized on existing database", revision=latest_revision)
except Exception as e:
self.logger.error("Failed to initialize Alembic on existing database", error=str(e))
raise
async def _run_migrations(self):
"""Run pending Alembic migrations"""
try:
def run_alembic_upgrade():
alembic_cfg = Config(self.alembic_ini_path)
command.upgrade(alembic_cfg, "head")
# Run Alembic operations in async executor
await asyncio.get_event_loop().run_in_executor(None, run_alembic_upgrade)
self.logger.info("Alembic migrations completed")
except Exception as e:
self.logger.error("Failed to run migrations", error=str(e))
raise
async def _get_inspector(self, session: AsyncSession):
"""Get SQLAlchemy inspector for the current connection"""
def get_inspector_sync(connection):
return inspect(connection)
connection = await session.connection()
return await connection.run_sync(get_inspector_sync)
async def _get_existing_tables(self, inspector) -> List[str]:
async def _get_existing_tables(self, session: AsyncSession) -> List[str]:
"""Get list of existing tables in the database"""
def get_tables_sync(connection):
insp = inspect(connection)
return insp.get_table_names()
async with self.database_manager.get_session() as session:
connection = await session.connection()
return await connection.run_sync(get_tables_sync)
connection = await session.connection()
return await connection.run_sync(get_tables_sync)
def create_init_manager(