# ================================================================
# services/auth/app/core/database.py (ENHANCED VERSION)
# ================================================================
"""
Database configuration for authentication service
"""

import structlog
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.pool import NullPool

from app.core.config import settings
from shared.database.base import Base

logger = structlog.get_logger()

# NOTE(review): this file holds two successive versions of the module.
# `logger`, `get_db` and `create_tables` from this first section are rebound
# by the second section further down. `engine` and `AsyncSessionLocal` are
# not rebound anywhere in the visible file, so they are kept for importers.

# Create async engine
# NullPool disables connection pooling: each checkout opens a fresh
# connection and closes it on release.
engine = create_async_engine(
    settings.DATABASE_URL,
    poolclass=NullPool,
    echo=settings.DEBUG,
    future=True,
)

# Create session factory
# expire_on_commit=False keeps ORM objects readable after commit without an
# implicit refresh, which would otherwise need an awaited round-trip.
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
    autocommit=False,
)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Database dependency.

    Yields a session from ``AsyncSessionLocal`` and commits it once the
    consumer finishes without error. If an exception is raised, the session
    is rolled back, the error is logged and the exception is re-raised.

    Yields:
        AsyncSession: the session for the duration of the request.
    """
    # The ``async with`` block closes the session on exit, so no explicit
    # ``session.close()`` is needed.
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception as e:
            await session.rollback()
            # Structured field instead of an interpolated message, matching
            # how the rest of this module logs errors.
            logger.error("Database session error", error=str(e))
            raise


async def create_tables():
    """Create all tables registered on ``Base.metadata`` using ``engine``."""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    logger.info("Database tables created successfully")


# ================================================================
# services/auth/app/core/database.py - UPDATED TO USE SHARED INFRASTRUCTURE
# ================================================================
"""
Database configuration for authentication service
Uses shared database infrastructure for consistency
"""

import structlog
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text

from shared.database.base import DatabaseManager, Base
from app.core.config import settings

logger = structlog.get_logger()

# ✅ Initialize database manager using shared infrastructure
database_manager = DatabaseManager(settings.DATABASE_URL)

# ✅ Alias for convenience - matches the existing interface
get_db = database_manager.get_db

# ✅ Use the shared background session method
get_background_db_session = database_manager.get_background_session


async def get_db_health() -> bool:
    """
    Health check function for database connectivity.

    Runs ``SELECT 1`` on a connection from ``database_manager.async_engine``.

    Returns:
        True if the statement executed, False if any exception was raised
        (the exception is logged, not propagated).
    """
    try:
        async with database_manager.async_engine.begin() as conn:
            await conn.execute(text("SELECT 1"))
            logger.debug("Database health check passed")
            return True
    except Exception as e:
        logger.error("Database health check failed", error=str(e))
        return False


async def create_tables():
    """Create database tables using shared infrastructure"""
    await database_manager.create_tables()
    logger.info("Auth database tables created successfully")


# ✅ Auth service specific database utilities
class AuthDatabaseUtils:
    """
    Auth service specific database utilities.

    Every helper is a best-effort static coroutine: database errors are
    logged and turned into a neutral return value (``0``, ``False`` or a
    zero-filled dict) rather than raised.
    """

    @staticmethod
    async def cleanup_old_refresh_tokens(days_old: int = 30):
        """
        Delete refresh tokens whose ``created_at`` is older than ``days_old`` days.

        Args:
            days_old: Age threshold in days. Must be non-negative. A negative
                value would put the cutoff in the future on PostgreSQL and
                delete every token, so it is rejected.

        Returns:
            Number of rows deleted, or 0 if the input was invalid or the
            statement failed (the failure is logged).
        """
        try:
            if days_old < 0:
                raise ValueError(f"days_old must be non-negative, got {days_old}")

            async with database_manager.get_background_session() as session:
                if settings.DATABASE_URL.startswith("sqlite"):
                    query = text(
                        "DELETE FROM refresh_tokens "
                        "WHERE created_at < datetime('now', :days_param)"
                    )
                    params = {"days_param": f"-{days_old} days"}
                else:
                    # PostgreSQL. `INTERVAL :param` is not valid because the
                    # INTERVAL '<literal>' syntax only accepts a string
                    # constant, so it fails under server-side binding.
                    # Build the interval from an integer parameter instead.
                    query = text(
                        "DELETE FROM refresh_tokens "
                        "WHERE created_at < NOW() - "
                        "make_interval(days => CAST(:days_param AS INTEGER))"
                    )
                    params = {"days_param": days_old}

                result = await session.execute(query, params)
                # No explicit commit here: the original code relies on
                # get_background_session() to handle it (helper not visible
                # in this file).

                logger.info(
                    "Cleaned up old refresh tokens",
                    deleted_count=result.rowcount,
                    days_old=days_old,
                )

                return result.rowcount

        except Exception as e:
            logger.error("Failed to cleanup old refresh tokens", error=str(e))
            return 0

    @staticmethod
    async def get_auth_statistics(tenant_id: str = None) -> dict:
        """
        Get authentication statistics.

        Args:
            tenant_id: Accepted for interface compatibility but currently
                ignored. No tenant filter is applied to any query.

        Returns:
            Dict with ``active_users``, ``inactive_users``, ``total_users``
            and ``active_tokens``. ``active_tokens`` is the total row count
            of ``refresh_tokens``; no expiry or revocation filter is applied.
            All values are 0 if a query fails (the failure is logged).
        """
        try:
            async with database_manager.get_background_session() as session:
                # Base query for users
                users_query = text(
                    "SELECT COUNT(*) as count FROM users WHERE is_active = :is_active"
                )

                # TODO: tenant filtering is not implemented. The original
                # comment notes that the users table may not have a
                # tenant_id column, so this depends on the actual schema.

                # Get active users count
                active_users_result = await session.execute(
                    users_query, {"is_active": True}
                )
                active_users = active_users_result.scalar() or 0

                # Get inactive users count
                inactive_users_result = await session.execute(
                    users_query, {"is_active": False}
                )
                inactive_users = inactive_users_result.scalar() or 0

                # Get refresh tokens count
                tokens_query = text("SELECT COUNT(*) as count FROM refresh_tokens")
                tokens_result = await session.execute(tokens_query)
                active_tokens = tokens_result.scalar() or 0

                return {
                    "active_users": active_users,
                    "inactive_users": inactive_users,
                    "total_users": active_users + inactive_users,
                    "active_tokens": active_tokens,
                }

        except Exception as e:
            logger.error("Failed to get auth statistics", error=str(e))
            return {
                "active_users": 0,
                "inactive_users": 0,
                "total_users": 0,
                "active_tokens": 0,
            }

    @staticmethod
    async def check_user_exists(user_id: str) -> bool:
        """
        Check if user exists.

        Args:
            user_id: Value matched against ``users.id``.

        Returns:
            True if a matching row exists. False if there is none or if the
            query fails, so a database error cannot be told apart from a
            missing user (the failure is logged).
        """
        try:
            async with database_manager.get_background_session() as session:
                # Existence probe: select a constant and stop at the first
                # match. LIMIT has no effect on a COUNT(*) aggregate.
                query = text(
                    "SELECT 1 "
                    "FROM users "
                    "WHERE id = :user_id "
                    "LIMIT 1"
                )

                result = await session.execute(query, {"user_id": user_id})
                # scalar() returns None when no row was found.
                return result.scalar() is not None

        except Exception as e:
            logger.error("Failed to check user existence",
                         user_id=user_id, error=str(e))
            return False

    @staticmethod
    async def get_user_token_count(user_id: str) -> int:
        """
        Get count of refresh tokens for a user.

        Counts every ``refresh_tokens`` row with the given ``user_id``; no
        expiry or revocation filter is applied.

        Args:
            user_id: Value matched against ``refresh_tokens.user_id``.

        Returns:
            The row count, or 0 if the query fails (the failure is logged).
        """
        try:
            async with database_manager.get_background_session() as session:
                query = text(
                    "SELECT COUNT(*) as count "
                    "FROM refresh_tokens "
                    "WHERE user_id = :user_id"
                )

                result = await session.execute(query, {"user_id": user_id})
                count = result.scalar() or 0
                return count

        except Exception as e:
            logger.error("Failed to get user token count",
                         user_id=user_id, error=str(e))
            return 0


# Enhanced database session dependency with better error
# handling
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    """
    Session dependency with debug logging around the session lifecycle.

    Yields a session from ``database_manager.async_session_local()``. This
    function never commits. If the consumer raises, the error is logged with
    traceback, the session is rolled back and the exception is re-raised.
    The session is closed in every case.
    """
    async with database_manager.async_session_local() as db_session:
        try:
            logger.debug("Database session created")
            yield db_session
        except Exception as exc:
            logger.error("Database session error", error=str(exc), exc_info=True)
            await db_session.rollback()
            raise
        finally:
            await db_session.close()
            logger.debug("Database session closed")


# Startup hook: table creation for the auth service
async def initialize_auth_database():
    """
    Import the auth models and create their tables.

    Any failure is logged and then re-raised to the caller.
    """
    try:
        logger.info("Initializing auth service database")

        # Imported only for their side effect of loading the model modules
        # before the tables are created.
        from app.models.users import User  # noqa: F401
        from app.models.refresh_tokens import RefreshToken  # noqa: F401

        # Table creation is delegated to the shared database manager.
        await database_manager.create_tables()
    except Exception as exc:
        logger.error("Failed to initialize auth service database", error=str(exc))
        raise
    else:
        logger.info("Auth service database initialized successfully")


# Shutdown hook: release the auth service's database connections
async def cleanup_auth_database():
    """
    Dispose of the manager's async engine, if it has one.

    Best effort: a failure is logged and not propagated.
    """
    try:
        logger.info("Cleaning up auth service database connections")

        # A missing attribute and a falsy engine are both skipped.
        async_engine = getattr(database_manager, "async_engine", None)
        if async_engine:
            await async_engine.dispose()

        logger.info("Auth service database cleanup completed")
    except Exception as exc:
        logger.error("Failed to cleanup auth service database", error=str(exc))


# Public names of this module, kept stable for existing importers
__all__ = [
    "Base",
    "database_manager",
    "get_db",
    "get_background_db_session",
    "get_db_session",
    "get_db_health",
    "AuthDatabaseUtils",
    "initialize_auth_database",
    "cleanup_auth_database",
    "create_tables",
]