# services/production/app/api/analytics.py
"""
Analytics API endpoints for Production Service
Following standardized URL structure: /api/v1/tenants/{tenant_id}/production/analytics/{operation}
Requires: Professional or Enterprise subscription tier
"""

from datetime import date, datetime, timedelta
from typing import Optional, Tuple
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Path, Query
import structlog

from shared.auth.decorators import get_current_user_dep
from shared.auth.access_control import analytics_tier_required
from app.services.production_service import ProductionService
from app.core.config import settings
from shared.routing import RouteBuilder

logger = structlog.get_logger()

# Create route builder for consistent URL structure
route_builder = RouteBuilder('production')
router = APIRouter(tags=["production-analytics"])

# Size of the look-back window used when a caller omits start_date.
_DEFAULT_WINDOW_DAYS = 30


def get_production_service() -> ProductionService:
    """Dependency injection for production service"""
    # Imported lazily inside the function rather than at module import time.
    from app.core.database import database_manager
    return ProductionService(database_manager, settings)


def _resolve_date_range(
    start_date: Optional[date],
    end_date: Optional[date],
    window_days: int = _DEFAULT_WINDOW_DAYS,
) -> Tuple[date, date]:
    """Return ``(start_date, end_date)`` with missing bounds filled in.

    A missing ``end_date`` becomes today's date (server local time, as
    returned by ``datetime.now()``); a missing ``start_date`` becomes
    ``end_date - window_days``. Bounds that were supplied are returned
    unchanged.
    """
    if end_date is None:
        end_date = datetime.now().date()
    if start_date is None:
        start_date = end_date - timedelta(days=window_days)
    return start_date, end_date


# ===== ANALYTICS ENDPOINTS (Professional/Enterprise Only) =====

@router.get(
    route_builder.build_analytics_route("equipment-efficiency"),
    response_model=dict
)
@analytics_tier_required
async def get_equipment_efficiency(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None, description="Start date for analysis"),
    end_date: Optional[date] = Query(None, description="End date for analysis"),
    equipment_id: Optional[UUID] = Query(None, description="Filter by equipment"),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze equipment efficiency (Professional/Enterprise only)

    Metrics:
    - Overall Equipment Effectiveness (OEE)
    - Availability rate
    - Performance rate
    - Quality rate
    - Downtime analysis
    """
    try:
        # Use existing method: get_equipment_efficiency_analytics
        # NOTE: the service call below takes only tenant_id, so start_date,
        # end_date and equipment_id are accepted for API compatibility but
        # are not applied to the result.
        efficiency_data = await production_service.get_equipment_efficiency_analytics(tenant_id)

        logger.info("Equipment efficiency analyzed",
                    tenant_id=str(tenant_id),
                    equipment_id=str(equipment_id) if equipment_id else "all",
                    user_id=current_user.get('user_id'))

        return efficiency_data

    except Exception as e:
        logger.error("Error analyzing equipment efficiency",
                     tenant_id=str(tenant_id), error=str(e))
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze equipment efficiency"
        ) from e


@router.get(
    route_builder.build_analytics_route("production-trends"),
    response_model=dict
)
@analytics_tier_required
async def get_production_trends(
    tenant_id: UUID = Path(...),
    days_back: int = Query(90, ge=7, le=365, description="Days to analyze"),
    product_id: Optional[UUID] = Query(None, description="Filter by product"),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production trends (Professional/Enterprise only)

    Provides:
    - Production volume trends
    - Batch completion rates
    - Cycle time analysis
    - Quality trends
    - Seasonal patterns
    """
    try:
        # Use existing methods: get_performance_analytics + get_yield_trends_analytics
        end_date_calc = datetime.now().date()
        start_date_calc = end_date_calc - timedelta(days=days_back)

        performance = await production_service.get_performance_analytics(
            tenant_id, start_date_calc, end_date_calc
        )

        # Map days_back to period string for yield trends
        # (only the period granularity is passed on; days_back itself does
        # not bound the yield-trend query here).
        period = "weekly" if days_back <= 30 else "monthly"
        yield_trends = await production_service.get_yield_trends_analytics(tenant_id, period)

        # product_id is echoed back in the response but is not passed to
        # either service call above.
        trends = {
            "performance_metrics": performance,
            "yield_trends": yield_trends,
            "days_analyzed": days_back,
            "product_filter": str(product_id) if product_id else None
        }

        logger.info("Production trends analyzed",
                    tenant_id=str(tenant_id),
                    days_analyzed=days_back,
                    user_id=current_user.get('user_id'))

        return trends

    except Exception as e:
        logger.error("Error analyzing production trends",
                     tenant_id=str(tenant_id), error=str(e))
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze production trends"
        ) from e


@router.get(
    route_builder.build_analytics_route("capacity-utilization"),
    response_model=dict
)
@analytics_tier_required
async def get_capacity_utilization(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production capacity utilization (Professional/Enterprise only)

    Metrics:
    - Capacity utilization percentage
    - Bottleneck identification
    - Resource allocation efficiency
    - Optimization recommendations
    """
    try:
        start_date, end_date = _resolve_date_range(start_date, end_date)

        # Use existing method: get_capacity_usage_report
        utilization = await production_service.get_capacity_usage_report(
            tenant_id, start_date, end_date
        )

        logger.info("Capacity utilization analyzed",
                    tenant_id=str(tenant_id),
                    user_id=current_user.get('user_id'))

        return utilization

    except Exception as e:
        logger.error("Error analyzing capacity utilization",
                     tenant_id=str(tenant_id), error=str(e))
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze capacity utilization"
        ) from e


@router.get(
    route_builder.build_analytics_route("quality-metrics"),
    response_model=dict
)
@analytics_tier_required
async def get_quality_metrics(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    product_id: Optional[UUID] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze quality control metrics (Professional/Enterprise only)

    Metrics:
    - First pass yield
    - Defect rates by type
    - Quality trends over time
    - Root cause analysis
    """
    try:
        start_date, end_date = _resolve_date_range(start_date, end_date)

        # Use existing methods: get_quality_trends + get_top_defects_analytics
        quality_trends = await production_service.get_quality_trends(
            tenant_id, start_date, end_date
        )
        # The top-defects call is not given the date range.
        top_defects = await production_service.get_top_defects_analytics(tenant_id)

        # product_id is echoed back but not applied to either query above.
        quality_data = {
            "quality_trends": quality_trends,
            "top_defects": top_defects,
            "period": {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat()
            },
            "product_filter": str(product_id) if product_id else None
        }

        logger.info("Quality metrics analyzed",
                    tenant_id=str(tenant_id),
                    user_id=current_user.get('user_id'))

        return quality_data

    except Exception as e:
        logger.error("Error analyzing quality metrics",
                     tenant_id=str(tenant_id), error=str(e))
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze quality metrics"
        ) from e


@router.get(
    route_builder.build_analytics_route("waste-analysis"),
    response_model=dict
)
@analytics_tier_required
async def get_production_waste_analysis(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production waste (Professional/Enterprise only)

    Provides:
    - Material waste percentages
    - Waste by category/product
    - Cost impact analysis
    - Reduction recommendations
    """
    try:
        start_date, end_date = _resolve_date_range(start_date, end_date)

        # Use existing method: get_batch_statistics to calculate waste from yield data
        batch_stats = await production_service.get_batch_statistics(
            tenant_id, start_date, end_date
        )

        # The batch statistics are returned as-is; "waste_metrics" only
        # carries descriptive notes, no computed figures.
        waste_analysis = {
            "batch_statistics": batch_stats,
            "waste_metrics": {
                "calculated_from": "yield_variance",
                "note": "Waste derived from planned vs actual quantity differences"
            },
            "period": {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat()
            }
        }

        logger.info("Production waste analyzed",
                    tenant_id=str(tenant_id),
                    user_id=current_user.get('user_id'))

        return waste_analysis

    except Exception as e:
        logger.error("Error analyzing production waste",
                     tenant_id=str(tenant_id), error=str(e))
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze production waste"
        ) from e


@router.get(
    route_builder.build_analytics_route("cost-analysis"),
    response_model=dict
)
@analytics_tier_required
async def get_production_cost_analysis(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    product_id: Optional[UUID] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production costs (Professional/Enterprise only)

    Metrics:
    - Cost per unit
    - Direct vs indirect costs
    - Cost trends over time
    - Cost variance analysis
    - Profitability insights
    """
    try:
        start_date, end_date = _resolve_date_range(start_date, end_date)

        # Use existing method: get_batch_statistics for cost-related data
        batch_stats = await production_service.get_batch_statistics(
            tenant_id, start_date, end_date
        )

        # product_id is echoed back but not applied to the query above.
        cost_analysis = {
            "batch_statistics": batch_stats,
            "cost_metrics": {
                "note": "Cost analysis requires additional cost tracking data",
                "available_metrics": ["batch_count", "production_volume", "efficiency"]
            },
            "period": {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat()
            },
            "product_filter": str(product_id) if product_id else None
        }

        logger.info("Production cost analyzed",
                    tenant_id=str(tenant_id),
                    product_id=str(product_id) if product_id else "all",
                    user_id=current_user.get('user_id'))

        return cost_analysis

    except Exception as e:
        logger.error("Error analyzing production costs",
                     tenant_id=str(tenant_id), error=str(e))
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze production costs"
        ) from e


@router.get(
    route_builder.build_analytics_route("predictive-maintenance"),
    response_model=dict
)
@analytics_tier_required
async def get_predictive_maintenance_insights(
    tenant_id: UUID = Path(...),
    equipment_id: Optional[UUID] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Get predictive maintenance insights (Professional/Enterprise only)

    Provides:
    - Equipment failure predictions
    - Maintenance schedule recommendations
    - Parts replacement forecasts
    - Downtime risk assessment
    """
    try:
        # Use existing method: predict_capacity_bottlenecks as proxy for maintenance insights
        days_ahead = 7  # Predict one week ahead
        bottlenecks = await production_service.predict_capacity_bottlenecks(
            tenant_id, days_ahead
        )

        # equipment_id is echoed back but not applied to the prediction above.
        maintenance_insights = {
            "capacity_bottlenecks": bottlenecks,
            "maintenance_recommendations": {
                "note": "Derived from capacity predictions and bottleneck analysis",
                "days_predicted": days_ahead
            },
            "equipment_filter": str(equipment_id) if equipment_id else None
        }

        logger.info("Predictive maintenance insights generated",
                    tenant_id=str(tenant_id),
                    equipment_id=str(equipment_id) if equipment_id else "all",
                    user_id=current_user.get('user_id'))

        return maintenance_insights

    except Exception as e:
        logger.error("Error generating predictive maintenance insights",
                     tenant_id=str(tenant_id), error=str(e))
        raise HTTPException(
            status_code=500,
            detail="Failed to generate predictive maintenance insights"
        ) from e


# ===== SUSTAINABILITY / WASTE ANALYTICS ENDPOINT =====
# Called by Inventory Service for sustainability metrics
# NOTE(review): the two endpoints below declare no get_current_user_dep
# dependency and no tier check - confirm access is restricted elsewhere.

@router.get(
    "/api/v1/tenants/{tenant_id}/production/waste-analytics",
    response_model=dict
)
async def get_waste_analytics_for_sustainability(
    tenant_id: UUID = Path(...),
    start_date: datetime = Query(..., description="Start date for waste analysis"),
    end_date: datetime = Query(..., description="End date for waste analysis"),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Get production waste analytics for sustainability tracking

    This endpoint is called by the Inventory Service's sustainability module
    to calculate environmental impact and SDG 12.3 compliance.

    Does NOT require analytics tier - this is core sustainability data.

    Returns:
    - total_production_waste: Sum of waste_quantity from all batches
    - total_defects: Sum of defect_quantity from all batches
    - total_planned: Sum of planned_quantity
    - total_actual: Sum of actual_quantity
    """
    try:
        waste_data = await production_service.get_waste_analytics(
            tenant_id,
            start_date,
            end_date
        )

        logger.info(
            "Production waste analytics retrieved for sustainability",
            tenant_id=str(tenant_id),
            total_waste=waste_data.get('total_production_waste', 0),
            start_date=start_date.isoformat(),
            end_date=end_date.isoformat()
        )

        return waste_data

    except Exception as e:
        logger.error(
            "Error getting waste analytics for sustainability",
            tenant_id=str(tenant_id),
            error=str(e)
        )
        # Keep the client-facing detail generic, matching the analytics
        # endpoints in this module; the exception text is in the log entry.
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve waste analytics"
        ) from e


@router.get(
    "/api/v1/tenants/{tenant_id}/production/baseline",
    response_model=dict
)
async def get_baseline_metrics(
    tenant_id: UUID = Path(...),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Get baseline production metrics from first 90 days

    Used by sustainability service to establish waste baseline
    for SDG 12.3 compliance tracking.

    Returns:
    - waste_percentage: Baseline waste percentage from first 90 days
    - total_production_kg: Total production in first 90 days
    - total_waste_kg: Total waste in first 90 days
    - period: Date range of baseline period
    """
    try:
        baseline_data = await production_service.get_baseline_metrics(tenant_id)

        logger.info(
            "Baseline metrics retrieved",
            tenant_id=str(tenant_id),
            baseline_percentage=baseline_data.get('waste_percentage', 0)
        )

        return baseline_data

    except Exception as e:
        logger.error(
            "Error getting baseline metrics",
            tenant_id=str(tenant_id),
            error=str(e)
        )
        # Keep the client-facing detail generic, matching the analytics
        # endpoints in this module; the exception text is in the log entry.
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve baseline metrics"
        ) from e