"""
Enhanced Base Database Configuration for All Microservices

Provides DatabaseManager with connection pooling, health checks,
and multi-database support.
"""

import os
from typing import Optional, Dict, Any, List, AsyncIterator
from sqlalchemy import create_engine, text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import StaticPool, QueuePool
from contextlib import asynccontextmanager
import structlog
import time

from .exceptions import DatabaseError, ConnectionError, HealthCheckError
from .utils import DatabaseUtils

logger = structlog.get_logger()

Base = declarative_base()


class DatabaseManager:
    """Enhanced Database Manager for Microservices.

    Provides:
    - Connection pooling with configurable settings
    - Health checks and monitoring
    - Multi-database support
    - Session lifecycle management
    - Background task session support
    """

    def __init__(
        self,
        database_url: str,
        service_name: str = "unknown",
        pool_size: int = 20,
        max_overflow: int = 30,
        pool_recycle: int = 3600,
        pool_pre_ping: bool = True,
        echo: bool = False,
        connect_timeout: int = 30,
        **engine_kwargs
    ):
        """Create the async engine and the session factory.

        Args:
            database_url: SQLAlchemy URL handed to ``create_async_engine``.
            service_name: Label attached to log records emitted by this manager.
            pool_size: Requested pool size (not applied for SQLite URLs).
            max_overflow: Requested pool overflow (not applied for SQLite URLs).
            pool_recycle: Forwarded to the engine as ``pool_recycle``.
            pool_pre_ping: Forwarded to the engine as ``pool_pre_ping``.
            echo: Forwarded to the engine as ``echo``.
            connect_timeout: Sent as the ``command_timeout`` connect argument,
                and only when the URL scheme names the asyncpg driver.
            **engine_kwargs: Extra ``create_async_engine`` keyword arguments;
                they override any default built here (including ``connect_args``).
        """
        self.database_url = database_url
        self.service_name = service_name
        self.pool_size = pool_size
        self.max_overflow = max_overflow

        database_type = self._get_database_type()

        engine_config: Dict[str, Any] = {
            "echo": echo,
            "pool_pre_ping": pool_pre_ping,
            "pool_recycle": pool_recycle,
        }

        # Decide on the scheme only, so a database *name* containing "sqlite"
        # cannot flip a non-SQLite URL onto the SQLite path.
        if "sqlite" in database_type:
            # StaticPool does not accept pool_size / max_overflow; passing
            # them makes engine creation fail with "Invalid argument(s)".
            pool_size = 1
            max_overflow = 0
            engine_config["poolclass"] = StaticPool
        else:
            # Deliberately no poolclass: the plain QueuePool is rejected by
            # asyncio engines, while the async engine's default pool accepts
            # these sizing options.
            engine_config["pool_size"] = pool_size
            engine_config["max_overflow"] = max_overflow

        # "command_timeout" is an asyncpg connect() keyword; other drivers
        # reject unknown connect arguments, so only send it to asyncpg.
        if "asyncpg" in database_type:
            engine_config["connect_args"] = {"command_timeout": connect_timeout}

        # Caller-supplied options win over the defaults above.
        engine_config.update(engine_kwargs)

        self.async_engine = create_async_engine(database_url, **engine_config)

        # Session factory shared by every session helper on this manager.
        self.async_session_local = async_sessionmaker(
            self.async_engine,
            class_=AsyncSession,
            expire_on_commit=False,
            autoflush=False,
            autocommit=False
        )

        logger.info(
            f"DatabaseManager initialized for {service_name}",
            pool_size=pool_size,
            max_overflow=max_overflow,
            database_type=database_type,
        )

    async def get_db(self) -> AsyncIterator[AsyncSession]:
        """Get database session for request handlers (FastAPI dependency).

        Yields:
            An ``AsyncSession`` that is rolled back on error and always closed.

        Raises:
            DatabaseError: For any exception raised while the session is in
                use, except exceptions that look like an HTTPException
                (they carry ``status_code`` and ``detail``), which are
                re-raised unchanged after a rollback.
        """
        async with self.async_session_local() as session:
            try:
                logger.debug("Database session created for request")
                yield session
            except Exception as e:
                # Don't wrap HTTPExceptions - let them pass through
                if hasattr(e, 'status_code') and hasattr(e, 'detail'):
                    await session.rollback()
                    raise

                error_msg = str(e) if str(e) else f"{type(e).__name__}: {repr(e)}"
                logger.error(f"Database session error: {error_msg}", service=self.service_name)
                await session.rollback()

                # Handle specific ASGI stream issues more gracefully
                exc_type_repr = str(type(e))
                if "EndOfStream" in exc_type_repr or "WouldBlock" in exc_type_repr:
                    raise DatabaseError(
                        f"Session error: Request stream disconnected ({type(e).__name__})"
                    ) from e
                raise DatabaseError(f"Session error: {error_msg}") from e
            finally:
                await session.close()
                logger.debug("Database session closed")

    @asynccontextmanager
    async def get_background_session(self) -> AsyncIterator[AsyncSession]:
        """Get database session for background tasks with auto-commit.

        Usage:
            async with database_manager.get_background_session() as session:
                # Your background task code here
                # Auto-commits on success, rolls back on exception

        Raises:
            DatabaseError: If the body or the commit raises; the session is
                rolled back first and the original exception is chained.
        """
        async with self.async_session_local() as session:
            try:
                logger.debug("Background session created", service=self.service_name)
                yield session
                await session.commit()
                logger.debug("Background session committed")
            except Exception as e:
                await session.rollback()
                logger.error(f"Background task database error: {e}", service=self.service_name)
                raise DatabaseError(f"Background task failed: {str(e)}") from e
            finally:
                await session.close()
                logger.debug("Background session closed")

    @asynccontextmanager
    async def get_session(self) -> AsyncIterator[AsyncSession]:
        """Get a plain database session (no auto-commit).

        Raises:
            DatabaseError: If the body raises; the session is rolled back
                first and the original exception is chained.
        """
        async with self.async_session_local() as session:
            try:
                yield session
            except Exception as e:
                await session.rollback()
                logger.error(f"Session error: {e}", service=self.service_name)
                raise DatabaseError(f"Session error: {str(e)}") from e
            finally:
                await session.close()

    # ===== TABLE MANAGEMENT =====

    async def create_tables(self, metadata=None):
        """Create database tables.

        Args:
            metadata: Metadata whose tables are created; falls back to
                ``Base.metadata`` when omitted or falsy.

        Raises:
            DatabaseError: On failure, unless the error text contains
                "already exists" or "duplicate", in which case the error is
                logged as a warning and swallowed.
        """
        try:
            target_metadata = metadata or Base.metadata
            async with self.async_engine.begin() as conn:
                await conn.run_sync(target_metadata.create_all, checkfirst=True)
            logger.info("Database tables created successfully", service=self.service_name)
        except Exception as e:
            # "relation already exists"-style errors can be safely ignored
            error_str = str(e).lower()
            if "already exists" in error_str or "duplicate" in error_str:
                logger.warning(
                    f"Some database objects already exist - continuing: {e}",
                    service=self.service_name,
                )
                logger.info(
                    "Database tables creation completed (some already existed)",
                    service=self.service_name,
                )
            else:
                logger.error(f"Failed to create tables: {e}", service=self.service_name)
                raise DatabaseError(f"Table creation failed: {str(e)}") from e

    async def drop_tables(self, metadata=None):
        """Drop database tables.

        Args:
            metadata: Metadata whose tables are dropped; falls back to
                ``Base.metadata`` when omitted or falsy.

        Raises:
            DatabaseError: If dropping fails.
        """
        try:
            target_metadata = metadata or Base.metadata
            async with self.async_engine.begin() as conn:
                await conn.run_sync(target_metadata.drop_all)
            logger.info("Database tables dropped successfully", service=self.service_name)
        except Exception as e:
            logger.error(f"Failed to drop tables: {e}", service=self.service_name)
            raise DatabaseError(f"Table drop failed: {str(e)}") from e

    # ===== HEALTH CHECKS AND MONITORING =====

    async def health_check(self) -> Dict[str, Any]:
        """Run the health check provided by ``DatabaseUtils.execute_health_check``.

        Returns:
            Whatever ``DatabaseUtils.execute_health_check`` returns for a
            fresh session.

        Raises:
            HealthCheckError: If opening the session or the check itself fails.
        """
        try:
            async with self.get_session() as session:
                return await DatabaseUtils.execute_health_check(session)
        except Exception as e:
            logger.error(f"Health check failed: {e}", service=self.service_name)
            raise HealthCheckError(f"Health check failed: {str(e)}") from e

    @staticmethod
    def _pool_stat(pool: Any, name: str) -> int:
        """Return ``pool.<name>()`` or 0 when the pool lacks that counter."""
        counter = getattr(pool, name, None)
        return counter() if callable(counter) else 0

    async def get_connection_info(self) -> Dict[str, Any]:
        """Get database connection information.

        Pool counters that the active pool implementation does not expose
        are reported as 0 instead of failing the whole call.

        Returns:
            A dict of service/pool details, or ``{"error": <message>}`` if
            gathering them raises.
        """
        try:
            pool = self.async_engine.pool
            return {
                "service_name": self.service_name,
                "database_type": self._get_database_type(),
                "pool_size": self.pool_size,
                "max_overflow": self.max_overflow,
                "current_checked_in": self._pool_stat(pool, "checkedin"),
                "current_checked_out": self._pool_stat(pool, "checkedout"),
                "current_overflow": self._pool_stat(pool, "overflow"),
                "invalid_connections": self._pool_stat(pool, "invalid"),
            }
        except Exception as e:
            logger.error(f"Failed to get connection info: {e}", service=self.service_name)
            return {"error": str(e)}

    def _get_database_type(self) -> str:
        """Return the lower-cased URL scheme, or "unknown" without "://"."""
        if "://" not in self.database_url:
            return "unknown"
        return self.database_url.split("://")[0].lower()

    # ===== CLEANUP AND MAINTENANCE =====

    async def close_connections(self):
        """Dispose of the engine, closing all database connections.

        Raises:
            DatabaseError: If disposing the engine fails.
        """
        try:
            await self.async_engine.dispose()
            logger.info("Database connections closed", service=self.service_name)
        except Exception as e:
            logger.error(f"Failed to close connections: {e}", service=self.service_name)
            raise DatabaseError(f"Connection cleanup failed: {str(e)}") from e

    async def execute_maintenance(self) -> Dict[str, Any]:
        """Run the maintenance routine from ``DatabaseUtils.execute_maintenance``.

        Raises:
            DatabaseError: If opening the session or the routine itself fails.
        """
        try:
            async with self.get_session() as session:
                return await DatabaseUtils.execute_maintenance(session)
        except Exception as e:
            logger.error(f"Maintenance failed: {e}", service=self.service_name)
            raise DatabaseError(f"Maintenance failed: {str(e)}") from e

    # ===== UTILITY METHODS =====

    async def test_connection(self) -> bool:
        """Test database connectivity by executing ``SELECT 1``.

        Returns:
            True on success; False (after logging) on any exception.
        """
        try:
            async with self.async_engine.begin() as conn:
                await conn.execute(text("SELECT 1"))
            logger.debug("Connection test successful", service=self.service_name)
            return True
        except Exception as e:
            logger.error(f"Connection test failed: {e}", service=self.service_name)
            return False

    def __repr__(self) -> str:
        return f"DatabaseManager(service='{self.service_name}', type='{self._get_database_type()}')"


# ===== CONVENIENCE FUNCTIONS =====

def create_database_manager(
    database_url: str,
    service_name: str,
    **kwargs
) -> DatabaseManager:
    """Factory function to create DatabaseManager instances."""
    return DatabaseManager(database_url, service_name, **kwargs)


# ===== LEGACY COMPATIBILITY =====
# Keep backward compatibility for existing code

engine = None
AsyncSessionLocal = None


def init_legacy_compatibility(database_url: str):
    """Initialize legacy global variables for backward compatibility.

    Rebinds the module-level ``engine`` and ``AsyncSessionLocal`` and logs a
    warning recommending migration to DatabaseManager.
    """
    global engine, AsyncSessionLocal

    engine = create_async_engine(
        database_url,
        echo=False,
        pool_pre_ping=True,
        pool_recycle=300,
        pool_size=20,
        max_overflow=30
    )

    AsyncSessionLocal = async_sessionmaker(
        engine,
        class_=AsyncSession,
        expire_on_commit=False
    )

    logger.warning("Using legacy database configuration - consider migrating to DatabaseManager")


async def get_legacy_db():
    """Legacy database session getter for backward compatibility.

    Raises:
        RuntimeError: If ``init_legacy_compatibility`` has not been called.
    """
    if AsyncSessionLocal is None:
        raise RuntimeError("Legacy database not initialized - call init_legacy_compatibility first")

    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception as e:
            logger.error(f"Legacy database session error: {e}")
            await session.rollback()
            raise
        finally:
            await session.close()