# ================================================================ # services/training/app/main.py - FIXED VERSION # ================================================================ """ Training Service Main Application Enhanced with proper error handling, monitoring, and lifecycle management """ import structlog import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi.responses import JSONResponse import uvicorn from app.core.config import settings from app.core.database import initialize_training_database, cleanup_training_database from app.api import training, models from app.services.messaging import setup_messaging, cleanup_messaging from shared.monitoring.logging import setup_logging from shared.monitoring.metrics import MetricsCollector # REMOVED: from shared.auth.decorators import require_auth # Setup structured logging setup_logging("training-service", settings.LOG_LEVEL) logger = structlog.get_logger() # Initialize metrics collector metrics_collector = MetricsCollector("training-service") @asynccontextmanager async def lifespan(app: FastAPI): """ Application lifespan manager for startup and shutdown events """ # Startup logger.info("Starting Training Service", version="1.0.0") try: # Initialize database logger.info("Initializing database connection") await initialize_training_database() logger.info("Database initialized successfully") # Initialize messaging logger.info("Setting up messaging") await setup_messaging() logger.info("Messaging setup completed") # Start metrics server logger.info("Starting metrics server") metrics_collector.start_metrics_server(8080) logger.info("Metrics server started on port 8080") # Store metrics collector in app state app.state.metrics_collector = metrics_collector # Mark service as ready app.state.ready = True logger.info("Training Service startup completed successfully") yield except Exception as e: logger.error("Failed to start Training Service", error=str(e)) app.state.ready = False raise # Shutdown logger.info("Shutting down Training Service") try: # Stop metrics server if hasattr(app.state, 'metrics_collector'): await app.state.metrics_collector.shutdown() # Cleanup messaging await cleanup_messaging() logger.info("Messaging cleanup completed") # Close database connections await cleanup_training_database() logger.info("Database connections closed") except Exception as e: logger.error("Error during shutdown", error=str(e)) logger.info("Training Service shutdown completed") # Create FastAPI application with lifespan app = FastAPI( title="Bakery Training Service", description="ML training service for bakery demand forecasting", version="1.0.0", docs_url="/docs", redoc_url="/redoc", lifespan=lifespan ) # Add middleware app.add_middleware( CORSMiddleware, allow_origins=settings.CORS_ORIGINS_LIST, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Request middleware for logging and metrics @app.middleware("http") async def process_request(request: Request, call_next): """Process requests with logging and metrics""" start_time = asyncio.get_event_loop().time() try: response = await call_next(request) duration = asyncio.get_event_loop().time() - start_time logger.info( "Request completed", method=request.method, path=request.url.path, status_code=response.status_code, duration_ms=round(duration * 1000, 2) ) # Update metrics metrics_collector.record_request( method=request.method, endpoint=request.url.path, status_code=response.status_code, duration=duration ) return response except Exception as e: duration = asyncio.get_event_loop().time() - start_time logger.error( "Request failed", method=request.method, path=request.url.path, error=str(e), duration_ms=round(duration * 1000, 2) ) metrics_collector.increment_counter("http_requests_failed_total") raise # Exception handlers @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): """Global exception handler for unhandled errors""" logger.error( "Unhandled exception", path=request.url.path, method=request.method, error=str(exc), exc_info=True ) metrics_collector.increment_counter("unhandled_exceptions_total") return JSONResponse( status_code=500, content={ "detail": "Internal server error", "error_id": structlog.get_logger().new().info("Error logged", error=str(exc)) } ) # Include API routers app.include_router(training.router, prefix="/api/v1", tags=["training"]) app.include_router(models.router, prefix="/api/v1", tags=["models"]) # Health check endpoints @app.get("/health") async def health_check(): """Basic health check endpoint""" return { "status": "healthy" if app.state.ready else "starting", "service": "training-service", "version": "1.0.0", "timestamp": structlog.get_logger().new().info("Health check") } @app.get("/health/ready") async def readiness_check(): """Kubernetes readiness probe endpoint""" checks = { "database": await get_db_health(), "application": getattr(app.state, 'ready', False) } if all(checks.values()): return {"status": "ready", "checks": checks} else: return JSONResponse( status_code=503, content={"status": "not ready", "checks": checks} ) @app.get("/metrics") async def get_metrics(): """Prometheus metrics endpoint""" if hasattr(app.state, 'metrics_collector'): return app.state.metrics_collector.get_metrics() return {"status": "metrics not available"} @app.get("/health/live") async def liveness_check(): return {"status": "alive"} @app.get("/health/ready") async def readiness_check(): ready = getattr(app.state, 'ready', True) return {"status": "ready" if ready else "not ready"} @app.get("/") async def root(): return {"service": "training-service", "version": "1.0.0"} if __name__ == "__main__": uvicorn.run( "app.main:app", host="0.0.0.0", port=settings.PORT, reload=settings.DEBUG, log_level=settings.LOG_LEVEL.lower() )