Files
bakery-ia/shared/database/base.py
2025-09-25 21:00:40 +02:00

331 lines
13 KiB
Python

"""
Enhanced Base Database Configuration for All Microservices
Provides DatabaseManager with connection pooling, health checks, and multi-database support
"""
import os
from typing import Optional, Dict, Any, List
from sqlalchemy import create_engine, text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import StaticPool, QueuePool
from contextlib import asynccontextmanager
import structlog
import time
from .exceptions import DatabaseError, ConnectionError, HealthCheckError
from .utils import DatabaseUtils
# Module-level structlog logger used by DatabaseManager and the legacy helpers below.
logger = structlog.get_logger()
# Declarative base whose metadata DatabaseManager.create_tables/drop_tables
# fall back to when no explicit metadata is passed.
Base = declarative_base()
class DatabaseManager:
    """Enhanced Database Manager for Microservices

    Provides:
    - Connection pooling with configurable settings
    - Health checks and monitoring
    - Multi-database support
    - Session lifecycle management
    - Background task session support
    """

    # Leading keywords that mark a raw SQL string as modifying, so that
    # ``execute`` commits it when the caller does not decide explicitly.
    _MODIFYING_SQL_PREFIXES = ("UPDATE", "DELETE", "INSERT")

    def __init__(
        self,
        database_url: str,
        service_name: str = "unknown",
        pool_size: int = 20,
        max_overflow: int = 30,
        pool_recycle: int = 3600,
        pool_pre_ping: bool = True,
        echo: bool = False,
        connect_timeout: int = 30,
        **engine_kwargs
    ):
        """Create the async engine and the session factory.

        Args:
            database_url: SQLAlchemy async database URL.
            service_name: Name attached to log records and connection info.
            pool_size: Persistent pooled connections (forced to 1 for SQLite).
            max_overflow: Connections allowed beyond ``pool_size`` (forced to 0
                for SQLite).
            pool_recycle: Seconds after which a pooled connection is recycled.
            pool_pre_ping: Check connection liveness on checkout.
            echo: Log every SQL statement emitted by the engine.
            connect_timeout: Sent to the driver as
                ``connect_args["command_timeout"]`` (non-SQLite URLs only).
            **engine_kwargs: Extra ``create_async_engine`` options; they
                override the computed configuration.
        """
        # Local import: the asyncio-compatible queue pool is only needed here.
        from sqlalchemy.pool import AsyncAdaptedQueuePool

        self.database_url = database_url
        self.service_name = service_name

        is_sqlite = "sqlite" in database_url.lower()
        if is_sqlite:
            # One shared connection for SQLite.
            pool_size = 1
            max_overflow = 0
        # Store the effective sizes so get_connection_info reports what the
        # engine actually uses.
        self.pool_size = pool_size
        self.max_overflow = max_overflow

        engine_config: Dict[str, Any] = {
            "echo": echo,
            "pool_pre_ping": pool_pre_ping,
            "pool_recycle": pool_recycle,
        }
        if is_sqlite:
            # StaticPool takes no sizing arguments (create_engine raises for
            # unconsumed pool_size/max_overflow), and the SQLite driver does
            # not accept asyncpg's ``command_timeout`` connect argument.
            engine_config["poolclass"] = StaticPool
        else:
            # QueuePool is thread-based and cannot back an asyncio engine;
            # AsyncAdaptedQueuePool is its asyncio-compatible counterpart.
            engine_config.update(
                poolclass=AsyncAdaptedQueuePool,
                pool_size=pool_size,
                max_overflow=max_overflow,
                # NOTE(review): ``command_timeout`` is an asyncpg option for the
                # default per-operation timeout, not the connect timeout
                # (asyncpg's ``timeout``) - confirm this is the intent.
                connect_args={"command_timeout": connect_timeout},
            )
        # Caller-supplied engine options take precedence, as before.
        engine_config.update(engine_kwargs)
        self.async_engine = create_async_engine(database_url, **engine_config)

        # Session factory: objects stay usable after commit; flushing is manual.
        self.async_session_local = async_sessionmaker(
            self.async_engine,
            class_=AsyncSession,
            expire_on_commit=False,
            autoflush=False,
            autocommit=False
        )
        logger.info(f"DatabaseManager initialized for {service_name}",
                    pool_size=pool_size,
                    max_overflow=max_overflow,
                    database_type=self._get_database_type())

    async def get_db(self):
        """Yield a database session for request handlers (FastAPI dependency).

        If an exception is thrown back into the generator, the session is
        rolled back. Exceptions that expose both ``status_code`` and ``detail``
        (HTTPException-like) are re-raised unchanged. Anything else is logged
        and re-raised as ``DatabaseError``, chained to the original. The
        session is always closed.
        """
        async with self.async_session_local() as session:
            try:
                logger.debug("Database session created for request")
                yield session
            except Exception as e:
                # Don't wrap HTTPExceptions - let them pass through
                if hasattr(e, 'status_code') and hasattr(e, 'detail'):
                    await session.rollback()
                    raise
                error_msg = str(e) if str(e) else f"{type(e).__name__}: {repr(e)}"
                logger.error(f"Database session error: {error_msg}", service=self.service_name)
                await session.rollback()
                # Handle specific ASGI stream issues more gracefully
                if "EndOfStream" in str(type(e)) or "WouldBlock" in str(type(e)):
                    raise DatabaseError(
                        f"Session error: Request stream disconnected ({type(e).__name__})"
                    ) from e
                raise DatabaseError(f"Session error: {error_msg}") from e
            finally:
                await session.close()
                logger.debug("Database session closed")

    @asynccontextmanager
    async def get_background_session(self):
        """
        Get database session for background tasks with auto-commit

        Usage:
            async with database_manager.get_background_session() as session:
                # Your background task code here
                # Auto-commits on success, rolls back on exception

        Raises:
            DatabaseError: wrapping (and chained to) any exception raised in
                the block or by the commit, after a rollback.
        """
        async with self.async_session_local() as session:
            try:
                logger.debug("Background session created", service=self.service_name)
                yield session
                await session.commit()
                logger.debug("Background session committed")
            except Exception as e:
                await session.rollback()
                logger.error(f"Background task database error: {e}",
                             service=self.service_name)
                raise DatabaseError(f"Background task failed: {str(e)}") from e
            finally:
                await session.close()
                logger.debug("Background session closed")

    @asynccontextmanager
    async def get_session(self):
        """Get a plain database session (no auto-commit).

        Any exception raised in the block triggers a rollback and is re-raised
        as ``DatabaseError``, chained to the original.
        """
        async with self.async_session_local() as session:
            try:
                yield session
            except Exception as e:
                await session.rollback()
                logger.error(f"Session error: {e}", service=self.service_name)
                raise DatabaseError(f"Session error: {str(e)}") from e
            finally:
                await session.close()

    # ===== TABLE MANAGEMENT =====

    async def create_tables(self, metadata=None):
        """Create database tables for ``metadata`` (default: ``Base.metadata``).

        Errors whose message mentions "already exists" or "duplicate" are
        logged and tolerated. Any other failure raises ``DatabaseError``.
        """
        try:
            target_metadata = metadata or Base.metadata
            async with self.async_engine.begin() as conn:
                await conn.run_sync(target_metadata.create_all, checkfirst=True)
            logger.info("Database tables created successfully", service=self.service_name)
        except Exception as e:
            # Check if it's a "relation already exists" error which can be safely ignored
            error_str = str(e).lower()
            if "already exists" in error_str or "duplicate" in error_str:
                logger.warning(f"Some database objects already exist - continuing: {e}", service=self.service_name)
                logger.info("Database tables creation completed (some already existed)", service=self.service_name)
            else:
                logger.error(f"Failed to create tables: {e}", service=self.service_name)
                raise DatabaseError(f"Table creation failed: {str(e)}") from e

    async def drop_tables(self, metadata=None):
        """Drop database tables for ``metadata`` (default: ``Base.metadata``)."""
        try:
            target_metadata = metadata or Base.metadata
            async with self.async_engine.begin() as conn:
                await conn.run_sync(target_metadata.drop_all)
            logger.info("Database tables dropped successfully", service=self.service_name)
        except Exception as e:
            logger.error(f"Failed to drop tables: {e}", service=self.service_name)
            raise DatabaseError(f"Table drop failed: {str(e)}") from e

    # ===== HEALTH CHECKS AND MONITORING =====

    async def health_check(self) -> Dict[str, Any]:
        """Comprehensive health check for the database.

        Delegates to ``DatabaseUtils.execute_health_check``. Any failure is
        raised as ``HealthCheckError``.
        """
        try:
            async with self.get_session() as session:
                return await DatabaseUtils.execute_health_check(session)
        except Exception as e:
            logger.error(f"Health check failed: {e}", service=self.service_name)
            raise HealthCheckError(f"Health check failed: {str(e)}") from e

    async def get_connection_info(self) -> Dict[str, Any]:
        """Get pool configuration and live pool counters.

        A counter the active pool class does not implement is reported as 0
        instead of failing the whole report. Unexpected errors are logged and
        returned as ``{"error": message}``.
        """
        try:
            pool = self.async_engine.pool

            def counter(method_name: str) -> int:
                # Pool classes expose different subsets of these methods.
                method = getattr(pool, method_name, None)
                return method() if callable(method) else 0

            return {
                "service_name": self.service_name,
                "database_type": self._get_database_type(),
                "pool_size": self.pool_size,
                "max_overflow": self.max_overflow,
                "current_checked_in": counter("checkedin"),
                "current_checked_out": counter("checkedout"),
                "current_overflow": counter("overflow"),
                "invalid_connections": counter("invalid"),
            }
        except Exception as e:
            logger.error(f"Failed to get connection info: {e}", service=self.service_name)
            return {"error": str(e)}

    def _get_database_type(self) -> str:
        """Return the URL scheme (e.g. ``postgresql+asyncpg``), or "unknown"."""
        return self.database_url.split("://")[0].lower() if "://" in self.database_url else "unknown"

    # ===== CLEANUP AND MAINTENANCE =====

    async def close_connections(self):
        """Dispose the engine, closing all pooled connections."""
        try:
            await self.async_engine.dispose()
            logger.info("Database connections closed", service=self.service_name)
        except Exception as e:
            logger.error(f"Failed to close connections: {e}", service=self.service_name)
            raise DatabaseError(f"Connection cleanup failed: {str(e)}") from e

    async def execute_maintenance(self) -> Dict[str, Any]:
        """Execute database maintenance tasks via ``DatabaseUtils.execute_maintenance``."""
        try:
            async with self.get_session() as session:
                return await DatabaseUtils.execute_maintenance(session)
        except Exception as e:
            logger.error(f"Maintenance failed: {e}", service=self.service_name)
            raise DatabaseError(f"Maintenance failed: {str(e)}") from e

    # ===== UTILITY METHODS =====

    async def test_connection(self) -> bool:
        """Return True if ``SELECT 1`` succeeds; log and return False otherwise."""
        try:
            async with self.async_engine.begin() as conn:
                await conn.execute(text("SELECT 1"))
            logger.debug("Connection test successful", service=self.service_name)
            return True
        except Exception as e:
            logger.error(f"Connection test failed: {e}", service=self.service_name)
            return False

    def __repr__(self) -> str:
        return f"DatabaseManager(service='{self.service_name}', type='{self._get_database_type()}')"

    @classmethod
    def _is_modifying(cls, statement: Any) -> bool:
        """Return True if ``execute`` should commit ``statement`` by default."""
        sql = getattr(statement, "text", None)
        if isinstance(sql, str):
            # Raw SQL (text()): decide from the leading keyword.
            return sql.strip().upper().startswith(cls._MODIFYING_SQL_PREFIXES)
        # SQLAlchemy constructs such as insert()/update()/delete() flag themselves.
        return bool(getattr(statement, "is_dml", False))

    async def execute(self, query: Any, *args, commit: Optional[bool] = None, **kwargs):
        """
        Execute a SQL statement in its own short-lived session.

        Args:
            query: Raw SQL string (wrapped in ``text()``) or an executable
                SQLAlchemy construct.
            *args, **kwargs: Forwarded to ``AsyncSession.execute`` (e.g. bound
                parameters).
            commit: Force (True) or suppress (False) a commit. When None, the
                statement is committed only if it is raw SQL starting with
                UPDATE/DELETE/INSERT, or a construct flagged ``is_dml``. Other
                raw SQL (e.g. DDL) is not committed and is therefore discarded
                when the session closes, unless ``commit=True``.

        Returns:
            The result returned by ``AsyncSession.execute``.

        Raises:
            DatabaseError: raised by ``get_session`` (after a rollback) when
                execution or commit fails.

        Note: Use this method carefully to avoid transaction conflicts
        """
        statement = text(query) if isinstance(query, str) else query
        should_commit = self._is_modifying(statement) if commit is None else commit
        # A fresh session avoids conflicts with sessions held by the caller.
        async with self.get_session() as session:
            try:
                result = await session.execute(statement, *args, **kwargs)
                if should_commit:
                    await session.commit()
                return result
            except Exception as e:
                # get_session performs the rollback and wraps the error.
                logger.error("Database execute failed", error=str(e), service=self.service_name)
                raise
# ===== CONVENIENCE FUNCTIONS =====
def create_database_manager(
    database_url: str,
    service_name: str,
    **kwargs
) -> DatabaseManager:
    """Build a DatabaseManager for one service.

    A thin factory over the constructor: every extra keyword (pool sizing,
    timeouts, engine options) is forwarded unchanged.
    """
    manager = DatabaseManager(
        database_url=database_url,
        service_name=service_name,
        **kwargs,
    )
    return manager
# ===== LEGACY COMPATIBILITY =====
# Module-level engine and session factory kept for code written before
# DatabaseManager; both stay None until init_legacy_compatibility() binds them.
engine = AsyncSessionLocal = None
def init_legacy_compatibility(database_url: str):
    """Initialize legacy global variables for backward compatibility.

    Rebinds the module-level ``engine`` and ``AsyncSessionLocal`` to a new
    async engine and session factory for ``database_url``, then logs a
    migration warning. Calling it again replaces the globals without
    disposing the previously created engine.

    Args:
        database_url: SQLAlchemy async database URL.
    """
    global engine, AsyncSessionLocal
    engine_options: Dict[str, Any] = {
        "echo": False,
        "pool_pre_ping": True,
        "pool_recycle": 300,
    }
    # Pool sizing only applies to queue-style pools. SQLite URLs can resolve
    # to a pool class (e.g. StaticPool for in-memory databases) that rejects
    # pool_size/max_overflow, so they are left at the dialect defaults.
    if "sqlite" not in database_url.lower():
        engine_options.update(pool_size=20, max_overflow=30)
    engine = create_async_engine(database_url, **engine_options)
    AsyncSessionLocal = async_sessionmaker(
        engine,
        class_=AsyncSession,
        expire_on_commit=False
    )
    logger.warning("Using legacy database configuration - consider migrating to DatabaseManager")
async def get_legacy_db():
    """Yield a session from the legacy ``AsyncSessionLocal`` factory.

    Any exception thrown into the generator is logged, the session is rolled
    back, and the exception is re-raised. The session is always closed.

    Raises:
        RuntimeError: if ``AsyncSessionLocal`` has not been set up by
            ``init_legacy_compatibility``.
    """
    session_factory = AsyncSessionLocal
    if not session_factory:
        raise RuntimeError("Legacy database not initialized - call init_legacy_compatibility first")

    async with session_factory() as legacy_session:
        try:
            yield legacy_session
        except Exception as err:
            logger.error(f"Legacy database session error: {err}")
            await legacy_session.rollback()
            raise
        finally:
            await legacy_session.close()