# ================================================================
# services/production/app/main.py
# ================================================================
"""
Production Service - FastAPI Application
Production planning and batch management service
"""

import time
from fastapi import FastAPI, Request
from sqlalchemy import text
from app.core.config import settings
from app.core.database import database_manager
from app.services.production_alert_service import ProductionAlertService
from app.services.production_scheduler_service import ProductionSchedulerService
from shared.service_base import StandardFastAPIService

# Import standardized routers
from app.api import (
    production_batches,
    production_schedules,
    production_operations,
    production_dashboard,
    analytics,
    quality_templates,
    equipment,
    internal_demo
)


class ProductionService(StandardFastAPIService):
    """Production Service with standardized setup.

    Owns the alert and scheduler background services, exposes them through
    ``app.state``, and registers health checks for both.
    """

    # Value expected in ``alembic_version.version_num`` at startup.
    expected_migration_version = "00001"

    def __init__(self):
        # Define expected database tables for health checks
        production_expected_tables = [
            'production_batches',
            'production_schedules',
            'production_capacity',
            'quality_check_templates',
            'quality_checks',
            'equipment'
        ]

        # Populated in on_startup(); the health checks below report False
        # until then.
        self.alert_service = None
        self.scheduler_service = None

        # Create custom checks for services
        async def check_alert_service():
            """Check production alert service health"""
            try:
                return bool(self.alert_service)
            except Exception as e:
                self.logger.error("Alert service health check failed", error=str(e))
                return False

        async def check_scheduler_service():
            """Check production scheduler service health"""
            try:
                return bool(self.scheduler_service)
            except Exception as e:
                self.logger.error("Scheduler service health check failed", error=str(e))
                return False

        super().__init__(
            service_name=settings.SERVICE_NAME,
            app_name=settings.APP_NAME,
            description=settings.DESCRIPTION,
            version=settings.VERSION,
            api_prefix="",  # Empty because RouteBuilder already includes /api/v1
            database_manager=database_manager,
            expected_tables=production_expected_tables,
            custom_health_checks={
                "alert_service": check_alert_service,
                "scheduler_service": check_scheduler_service
            }
        )

    async def verify_migrations(self):
        """Verify database schema matches the latest migrations.

        Reads ``version_num`` from the ``alembic_version`` table and compares
        it with ``expected_migration_version``.

        Raises:
            RuntimeError: If the stored version differs from the expected one.
            Exception: Any error raised while querying the database is logged
                and re-raised unchanged.
        """
        try:
            async with self.database_manager.get_session() as session:
                result = await session.execute(text("SELECT version_num FROM alembic_version"))
                version = result.scalar()
        except Exception as e:
            self.logger.error(f"Migration verification failed: {e}")
            raise

        # Compared outside the try block so a mismatch is logged exactly once
        # instead of being caught and logged again by the handler above.
        if version != self.expected_migration_version:
            message = f"Migration version mismatch: expected {self.expected_migration_version}, got {version}"
            self.logger.error(message)
            raise RuntimeError(message)

        self.logger.info(f"Migration verification successful: {version}")

    async def on_startup(self, app: FastAPI):
        """Custom startup logic for production service.

        Verifies the migration version, runs the base class startup hook,
        then starts the alert and scheduler services and stores them on
        ``app.state``.
        """
        # NOTE(review): this class used to define on_startup twice; the later
        # definition replaced the earlier one, so the migration check and the
        # base class hook never ran. Confirm expected_migration_version matches
        # the deployed alembic head before rolling this out.
        await self.verify_migrations()
        await super().on_startup(app)

        # Initialize alert service
        self.alert_service = ProductionAlertService(settings)
        await self.alert_service.start()
        self.logger.info("Production alert service started")

        # Initialize production scheduler service
        self.scheduler_service = ProductionSchedulerService(settings)
        await self.scheduler_service.start()
        self.logger.info("Production scheduler service started")

        # Store services in app state
        app.state.alert_service = self.alert_service
        app.state.scheduler_service = self.scheduler_service

    async def on_shutdown(self, app: FastAPI):
        """Custom shutdown logic for production service.

        Stops the scheduler service first, then the alert service; a service
        that was never started is skipped.
        """
        # Stop scheduler service
        if self.scheduler_service:
            await self.scheduler_service.stop()
            self.logger.info("Scheduler service stopped")

        # Stop alert service
        if self.alert_service:
            await self.alert_service.stop()
            self.logger.info("Alert service stopped")

    def get_service_features(self):
        """Return production-specific features"""
        return [
            "production_planning",
            "batch_management",
            "production_scheduling",
            "automated_daily_scheduling",  # NEW: Automated scheduler
            "quality_control",
            "equipment_management",
            "capacity_planning",
            "alert_notifications"
        ]

    def setup_custom_middleware(self):
        """Setup custom middleware for production service"""
        @self.app.middleware("http")
        async def logging_middleware(request: Request, call_next):
            """Log method, URL, status code and duration of each request."""
            # perf_counter is monotonic, so the measured duration cannot be
            # skewed by system clock adjustments.
            start_time = time.perf_counter()
            response = await call_next(request)
            process_time = time.perf_counter() - start_time

            self.logger.info("HTTP request processed",
                             method=request.method,
                             url=str(request.url),
                             status_code=response.status_code,
                             process_time=round(process_time, 4))
            return response


# Create service instance
service = ProductionService()

# Create FastAPI app with standardized setup
app = service.create_app()

# Setup standard endpoints
service.setup_standard_endpoints()

# Setup custom middleware
service.setup_custom_middleware()

# Include standardized routers
# NOTE: Register more specific routes before generic parameterized routes
service.add_router(quality_templates.router)  # Register first to avoid route conflicts
service.add_router(equipment.router)
service.add_router(production_batches.router)
service.add_router(production_schedules.router)
service.add_router(production_operations.router)
service.add_router(production_dashboard.router)
service.add_router(analytics.router)
service.add_router(internal_demo.router)


# NOTE(review): this endpoint is registered unconditionally and failures are
# reported in the response body rather than via an HTTP error status; confirm
# that it is meant to be reachable outside of development.
@app.post("/test/production-scheduler")
async def test_production_scheduler():
    """Test endpoint to manually trigger production scheduler.

    Returns a dict with a ``message`` key on success, or an ``error`` key when
    the scheduler service is not on ``app.state`` or the trigger raises.
    """
    try:
        if hasattr(app.state, 'scheduler_service'):
            scheduler_service = app.state.scheduler_service
            await scheduler_service.test_production_schedule_generation()
            return {"message": "Production scheduler test triggered successfully"}
        else:
            return {"error": "Scheduler service not available"}
    except Exception as e:
        service.logger.error("Error testing production scheduler", error=str(e))
        return {"error": f"Failed to trigger scheduler test: {str(e)}"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=settings.DEBUG
    )