# ================================================================
# services/forecasting/app/core/database.py
# ================================================================
"""
Database configuration for forecasting service
"""

from typing import AsyncGenerator

import structlog
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

from app.core.config import settings
from shared.database.base import Base, DatabaseManager

logger = structlog.get_logger()

# Connection pool sizing. Defined once so that the engine, the
# DatabaseManager and the pool statistics can never disagree.
POOL_SIZE = 10
MAX_OVERFLOW = 20

# Create async engine
async_engine = create_async_engine(
    settings.DATABASE_URL,
    echo=settings.DEBUG,
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    pool_pre_ping=True,
    pool_recycle=3600,
)

# Create async session factory
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for the duration of one unit of work.

    The session is rolled back and the error is logged and re-raised if the
    consumer raises while the session is in use. The session is always closed
    when the generator finishes.

    Yields:
        An ``AsyncSession`` created by ``AsyncSessionLocal``.

    Raises:
        Exception: Whatever the consumer raised, after rollback and logging.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception as e:
            await session.rollback()
            logger.error("Database session error", error=str(e))
            raise
        finally:
            # Explicit close kept from the original implementation; the
            # surrounding ``async with`` also releases the session on exit.
            await session.close()


async def init_database():
    """Create all tables registered on ``Base.metadata``.

    Raises:
        Exception: Any error raised while importing the models or creating
            the tables, after it has been logged.
    """
    try:
        # Import all models to ensure they are registered on Base.metadata.
        # The names are unused on purpose; the import is the side effect.
        from app.models.forecast import ForecastBatch, Forecast  # noqa: F401
        from app.models.prediction import PredictionBatch, Prediction  # noqa: F401

        async with async_engine.begin() as conn:
            # Create all tables
            await conn.run_sync(Base.metadata.create_all)

        logger.info("Forecasting database initialized successfully")

    except Exception as e:
        logger.error("Failed to initialize forecasting database", error=str(e))
        raise


async def get_db_health() -> bool:
    """Check database health by executing ``SELECT 1``.

    Returns:
        ``True`` if the statement executed, ``False`` if any exception was
        raised (the exception is logged, not propagated).
    """
    try:
        async with async_engine.begin() as conn:
            await conn.execute(text("SELECT 1"))
        return True
    except Exception as e:
        logger.error("Database health check failed", error=str(e))
        return False


async def get_connection_pool_stats() -> dict:
    """
    Get current connection pool statistics for monitoring.

    Returns:
        Dictionary with pool statistics including usage and capacity, plus a
        ``status`` ("healthy", "warning" above 80% usage, "critical" above
        90% usage) and a human-readable ``message``. If the statistics cannot
        be read, a dictionary with ``status`` set to "error" is returned
        instead; no exception is propagated.
    """
    try:
        pool = async_engine.pool

        # Read each counter once so that every derived figure in this
        # snapshot is computed from the same values.
        size = pool.size()
        overflow = pool.overflow()
        # NOTE(review): the pool's overflow counter is understood to start
        # below zero and rise as connections are opened, making
        # size + overflow the number of open connections -- confirm against
        # the installed SQLAlchemy version.
        total_connections = size + overflow
        max_capacity = POOL_SIZE + MAX_OVERFLOW

        # Get pool stats
        stats = {
            "pool_size": size,
            "checked_in_connections": pool.checkedin(),
            "checked_out_connections": pool.checkedout(),
            "overflow_connections": overflow,
            "total_connections": total_connections,
            "max_capacity": max_capacity,
            "usage_percentage": round((total_connections / max_capacity) * 100, 2),
        }

        # Add health status
        if stats["usage_percentage"] > 90:
            stats["status"] = "critical"
            stats["message"] = "Connection pool near capacity"
        elif stats["usage_percentage"] > 80:
            stats["status"] = "warning"
            stats["message"] = "Connection pool usage high"
        else:
            stats["status"] = "healthy"
            stats["message"] = "Connection pool healthy"

        return stats

    except Exception as e:
        logger.error("Failed to get connection pool stats", error=str(e))
        return {
            "status": "error",
            "message": f"Failed to get pool stats: {str(e)}"
        }


# Database manager instance for service_base compatibility
database_manager = DatabaseManager(
    database_url=settings.DATABASE_URL,
    service_name="forecasting-service",
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    echo=settings.DEBUG,
)