""" Database Initialization Manager Handles Alembic-based migrations with autogenerate support: 1. First-time deployment: Generate initial migration from models 2. Subsequent deployments: Run pending migrations 3. Development reset: Drop tables and regenerate migrations """ import os import asyncio import structlog from typing import Optional, List, Dict, Any from pathlib import Path from sqlalchemy import text, inspect from sqlalchemy.ext.asyncio import AsyncSession from alembic.config import Config from alembic import command from alembic.runtime.migration import MigrationContext from alembic.script import ScriptDirectory from .base import DatabaseManager, Base logger = structlog.get_logger() class DatabaseInitManager: """ Manages database initialization using Alembic migrations exclusively. Uses autogenerate to create initial migrations if none exist. """ def __init__( self, database_manager: DatabaseManager, service_name: str, alembic_ini_path: Optional[str] = None, models_module: Optional[str] = None, force_recreate: bool = False, allow_create_all_fallback: bool = True, environment: Optional[str] = None ): self.database_manager = database_manager self.service_name = service_name self.alembic_ini_path = alembic_ini_path self.models_module = models_module self.force_recreate = force_recreate self.allow_create_all_fallback = allow_create_all_fallback self.environment = environment or os.getenv('ENVIRONMENT', 'development') self.logger = logger.bind(service=service_name) async def initialize_database(self) -> Dict[str, Any]: """ Main initialization method: 1. Check if migrations exist in the codebase 2. Run alembic upgrade head to apply all pending migrations NOTE: Migration files must be pre-generated and included in Docker images. Do NOT generate migrations at runtime. """ self.logger.info("Starting database initialization with Alembic") try: if not self.alembic_ini_path or not os.path.exists(self.alembic_ini_path): raise Exception(f"Alembic configuration not found at {self.alembic_ini_path}") # Check current database state db_state = await self._check_database_state() self.logger.info("Database state checked", state=db_state) # Handle different scenarios based on migration state if self.force_recreate: result = await self._handle_force_recreate() elif not db_state["has_migrations"]: # No migration files found - check if fallback is allowed if self.allow_create_all_fallback: self.logger.warning( "No migration files found - using create_all() as fallback. " "Consider generating proper migrations for production use.", environment=self.environment ) result = await self._handle_no_migrations() else: # In production or when fallback is disabled, fail instead of using create_all error_msg = ( f"No migration files found for {self.service_name} and " f"create_all() fallback is disabled (environment: {self.environment}). " f"Migration files must be generated before deployment. " f"Run migration generation script to create initial migrations." ) self.logger.error(error_msg) raise Exception(error_msg) else: result = await self._handle_run_migrations() self.logger.info("Database initialization completed", result=result) return result except Exception as e: self.logger.error("Database initialization failed", error=str(e)) raise async def _check_database_state(self) -> Dict[str, Any]: """Check the current state of migrations""" state = { "has_migrations": False, "migration_count": 0, "is_empty": False, "existing_tables": [], "has_alembic_version": False, "current_revision": None } try: # Check if migration files exist migrations_dir = self._get_migrations_versions_dir() if migrations_dir.exists(): migration_files = list(migrations_dir.glob("*.py")) migration_files = [f for f in migration_files if f.name != "__pycache__" and not f.name.startswith("_")] state["migration_count"] = len(migration_files) state["has_migrations"] = len(migration_files) > 0 self.logger.info("Found migration files", count=len(migration_files)) # Check database tables async with self.database_manager.get_session() as session: existing_tables = await self._get_existing_tables(session) state["existing_tables"] = existing_tables state["is_empty"] = len(existing_tables) == 0 # Check alembic_version table if "alembic_version" in existing_tables: state["has_alembic_version"] = True result = await session.execute(text("SELECT version_num FROM alembic_version")) version = result.scalar() state["current_revision"] = version except Exception as e: self.logger.warning("Error checking database state", error=str(e)) return state async def _handle_no_migrations(self) -> Dict[str, Any]: """Handle case where no migration files exist - use create_all()""" self.logger.info("No migrations found, using create_all() to initialize tables") try: # Create tables directly using SQLAlchemy metadata await self._create_tables_from_models() return { "action": "tables_created_via_create_all", "tables_created": True, "message": "Tables created using SQLAlchemy create_all()" } except Exception as e: self.logger.error("Failed to create tables", error=str(e)) raise async def _handle_run_migrations(self) -> Dict[str, Any]: """Handle normal migration scenario - run pending migrations""" self.logger.info("Running pending migrations") try: await self._run_migrations() return { "action": "migrations_applied", "message": "Pending migrations applied successfully" } except Exception as e: self.logger.error("Failed to run migrations", error=str(e)) raise async def _handle_force_recreate(self) -> Dict[str, Any]: """Handle development reset scenario - drop and recreate tables using existing migrations""" self.logger.info("Force recreate: dropping tables and rerunning migrations") try: # Drop all tables await self._drop_all_tables() # Apply migrations from scratch await self._run_migrations() return { "action": "force_recreate", "tables_dropped": True, "migrations_applied": True, "message": "Database recreated from existing migrations" } except Exception as e: self.logger.error("Failed to force recreate", error=str(e)) raise async def _run_migrations(self): """Run pending Alembic migrations (upgrade head)""" try: def run_alembic_upgrade(): import os from pathlib import Path # Ensure we're in the correct working directory alembic_dir = Path(self.alembic_ini_path).parent original_cwd = os.getcwd() try: os.chdir(alembic_dir) alembic_cfg = Config(self.alembic_ini_path) # Set the SQLAlchemy URL from the database manager alembic_cfg.set_main_option("sqlalchemy.url", str(self.database_manager.database_url)) # Run upgrade command.upgrade(alembic_cfg, "head") finally: os.chdir(original_cwd) # Run in executor to avoid blocking await asyncio.get_event_loop().run_in_executor(None, run_alembic_upgrade) self.logger.info("Migrations applied successfully") except Exception as e: self.logger.error("Failed to run migrations", error=str(e)) raise async def _create_tables_from_models(self): """Create tables using SQLAlchemy metadata (create_all)""" try: async with self.database_manager.async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) self.logger.info("Tables created via create_all()") except Exception as e: self.logger.error("Failed to create tables", error=str(e)) raise async def _drop_all_tables(self): """Drop all tables (for development reset)""" try: async with self.database_manager.async_engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) self.logger.info("All tables dropped") except Exception as e: self.logger.error("Failed to drop tables", error=str(e)) raise def _get_migrations_versions_dir(self) -> Path: """Get the migrations/versions directory path""" alembic_path = Path(self.alembic_ini_path).parent return alembic_path / "migrations" / "versions" async def _get_existing_tables(self, session: AsyncSession) -> List[str]: """Get list of existing tables in the database""" def get_tables_sync(connection): insp = inspect(connection) return insp.get_table_names() connection = await session.connection() return await connection.run_sync(get_tables_sync) def create_init_manager( database_manager: DatabaseManager, service_name: str, service_path: Optional[str] = None, force_recreate: bool = False, allow_create_all_fallback: Optional[bool] = None, environment: Optional[str] = None ) -> DatabaseInitManager: """ Factory function to create a DatabaseInitManager with auto-detected paths Args: database_manager: DatabaseManager instance service_name: Name of the service service_path: Path to service directory (auto-detected if None) force_recreate: Whether to force recreate tables (development mode) allow_create_all_fallback: Allow create_all() if no migrations (auto-detect from env if None) environment: Environment name (auto-detect from ENVIRONMENT env var if None) """ # Auto-detect environment if environment is None: environment = os.getenv('ENVIRONMENT', 'development') # Auto-detect fallback setting based on environment if allow_create_all_fallback is None: # Only allow fallback in development/local environments allow_create_all_fallback = environment.lower() in ['development', 'dev', 'local', 'test'] allow_create_all_fallback = False # Auto-detect paths if not provided if service_path is None: # Try Docker container path first (service files at root level) if os.path.exists("alembic.ini"): service_path = "." else: # Fallback to development path service_path = f"services/{service_name}" # Set up paths based on environment if service_path == ".": # Docker container environment alembic_ini_path = "alembic.ini" models_module = "app.models" else: # Development environment alembic_ini_path = f"{service_path}/alembic.ini" models_module = f"services.{service_name}.app.models" # Check if paths exist if not os.path.exists(alembic_ini_path): logger.warning("Alembic config not found", path=alembic_ini_path) alembic_ini_path = None return DatabaseInitManager( database_manager=database_manager, service_name=service_name, alembic_ini_path=alembic_ini_path, models_module=models_module, force_recreate=force_recreate, allow_create_all_fallback=allow_create_all_fallback, environment=environment ) async def initialize_service_database( database_manager: DatabaseManager, service_name: str, force_recreate: bool = False, allow_create_all_fallback: Optional[bool] = None, environment: Optional[str] = None ) -> Dict[str, Any]: """ Convenience function for service database initialization Args: database_manager: DatabaseManager instance service_name: Name of the service force_recreate: Whether to force recreate (development mode) allow_create_all_fallback: Allow create_all() if no migrations (auto-detect from env if None) environment: Environment name (auto-detect from ENVIRONMENT env var if None) Returns: Dict with initialization results """ init_manager = create_init_manager( database_manager=database_manager, service_name=service_name, force_recreate=force_recreate, allow_create_all_fallback=allow_create_all_fallback, environment=environment ) return await init_manager.initialize_database()