Files
bakery-ia/services/production/app/api/analytics.py
2025-10-23 07:44:54 +02:00

528 lines
17 KiB
Python

# services/production/app/api/analytics.py
"""
Analytics API endpoints for Production Service
Following standardized URL structure: /api/v1/tenants/{tenant_id}/production/analytics/{operation}
Requires: Professional or Enterprise subscription tier
"""
from datetime import date, datetime, timedelta
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Path, Query
import structlog
from shared.auth.decorators import get_current_user_dep
from shared.auth.access_control import analytics_tier_required
from app.services.production_service import ProductionService
from app.core.config import settings
from shared.routing import RouteBuilder
logger = structlog.get_logger()
# Create route builder for consistent URL structure
route_builder = RouteBuilder('production')
router = APIRouter(tags=["production-analytics"])
def get_production_service() -> ProductionService:
"""Dependency injection for production service"""
from app.core.database import database_manager
return ProductionService(database_manager, settings)
# ===== ANALYTICS ENDPOINTS (Professional/Enterprise Only) =====
@router.get(
route_builder.build_analytics_route("equipment-efficiency"),
response_model=dict
)
@analytics_tier_required
async def get_equipment_efficiency(
tenant_id: UUID = Path(...),
start_date: Optional[date] = Query(None, description="Start date for analysis"),
end_date: Optional[date] = Query(None, description="End date for analysis"),
equipment_id: Optional[UUID] = Query(None, description="Filter by equipment"),
current_user: dict = Depends(get_current_user_dep),
production_service: ProductionService = Depends(get_production_service)
):
"""
Analyze equipment efficiency (Professional/Enterprise only)
Metrics:
- Overall Equipment Effectiveness (OEE)
- Availability rate
- Performance rate
- Quality rate
- Downtime analysis
"""
try:
# Set default dates
if not end_date:
end_date = datetime.now().date()
if not start_date:
start_date = end_date - timedelta(days=30)
# Use existing method: get_equipment_efficiency_analytics
efficiency_data = await production_service.get_equipment_efficiency_analytics(tenant_id)
logger.info("Equipment efficiency analyzed",
tenant_id=str(tenant_id),
equipment_id=str(equipment_id) if equipment_id else "all",
user_id=current_user.get('user_id'))
return efficiency_data
except Exception as e:
logger.error("Error analyzing equipment efficiency",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=500,
detail="Failed to analyze equipment efficiency"
)
@router.get(
route_builder.build_analytics_route("production-trends"),
response_model=dict
)
@analytics_tier_required
async def get_production_trends(
tenant_id: UUID = Path(...),
days_back: int = Query(90, ge=7, le=365, description="Days to analyze"),
product_id: Optional[UUID] = Query(None, description="Filter by product"),
current_user: dict = Depends(get_current_user_dep),
production_service: ProductionService = Depends(get_production_service)
):
"""
Analyze production trends (Professional/Enterprise only)
Provides:
- Production volume trends
- Batch completion rates
- Cycle time analysis
- Quality trends
- Seasonal patterns
"""
try:
# Use existing methods: get_performance_analytics + get_yield_trends_analytics
end_date_calc = datetime.now().date()
start_date_calc = end_date_calc - timedelta(days=days_back)
performance = await production_service.get_performance_analytics(
tenant_id, start_date_calc, end_date_calc
)
# Map days_back to period string for yield trends
period = "weekly" if days_back <= 30 else "monthly"
yield_trends = await production_service.get_yield_trends_analytics(tenant_id, period)
trends = {
"performance_metrics": performance,
"yield_trends": yield_trends,
"days_analyzed": days_back,
"product_filter": str(product_id) if product_id else None
}
logger.info("Production trends analyzed",
tenant_id=str(tenant_id),
days_analyzed=days_back,
user_id=current_user.get('user_id'))
return trends
except Exception as e:
logger.error("Error analyzing production trends",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=500,
detail="Failed to analyze production trends"
)
@router.get(
route_builder.build_analytics_route("capacity-utilization"),
response_model=dict
)
@analytics_tier_required
async def get_capacity_utilization(
tenant_id: UUID = Path(...),
start_date: Optional[date] = Query(None),
end_date: Optional[date] = Query(None),
current_user: dict = Depends(get_current_user_dep),
production_service: ProductionService = Depends(get_production_service)
):
"""
Analyze production capacity utilization (Professional/Enterprise only)
Metrics:
- Capacity utilization percentage
- Bottleneck identification
- Resource allocation efficiency
- Optimization recommendations
"""
try:
if not end_date:
end_date = datetime.now().date()
if not start_date:
start_date = end_date - timedelta(days=30)
# Use existing method: get_capacity_usage_report
utilization = await production_service.get_capacity_usage_report(
tenant_id, start_date, end_date
)
logger.info("Capacity utilization analyzed",
tenant_id=str(tenant_id),
user_id=current_user.get('user_id'))
return utilization
except Exception as e:
logger.error("Error analyzing capacity utilization",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=500,
detail="Failed to analyze capacity utilization"
)
@router.get(
route_builder.build_analytics_route("quality-metrics"),
response_model=dict
)
@analytics_tier_required
async def get_quality_metrics(
tenant_id: UUID = Path(...),
start_date: Optional[date] = Query(None),
end_date: Optional[date] = Query(None),
product_id: Optional[UUID] = Query(None),
current_user: dict = Depends(get_current_user_dep),
production_service: ProductionService = Depends(get_production_service)
):
"""
Analyze quality control metrics (Professional/Enterprise only)
Metrics:
- First pass yield
- Defect rates by type
- Quality trends over time
- Root cause analysis
"""
try:
if not end_date:
end_date = datetime.now().date()
if not start_date:
start_date = end_date - timedelta(days=30)
# Use existing methods: get_quality_trends + get_top_defects_analytics
quality_trends = await production_service.get_quality_trends(
tenant_id, start_date, end_date
)
top_defects = await production_service.get_top_defects_analytics(tenant_id)
quality_data = {
"quality_trends": quality_trends,
"top_defects": top_defects,
"period": {
"start_date": start_date.isoformat(),
"end_date": end_date.isoformat()
},
"product_filter": str(product_id) if product_id else None
}
logger.info("Quality metrics analyzed",
tenant_id=str(tenant_id),
user_id=current_user.get('user_id'))
return quality_data
except Exception as e:
logger.error("Error analyzing quality metrics",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=500,
detail="Failed to analyze quality metrics"
)
@router.get(
route_builder.build_analytics_route("waste-analysis"),
response_model=dict
)
@analytics_tier_required
async def get_production_waste_analysis(
tenant_id: UUID = Path(...),
start_date: Optional[date] = Query(None),
end_date: Optional[date] = Query(None),
current_user: dict = Depends(get_current_user_dep),
production_service: ProductionService = Depends(get_production_service)
):
"""
Analyze production waste (Professional/Enterprise only)
Provides:
- Material waste percentages
- Waste by category/product
- Cost impact analysis
- Reduction recommendations
"""
try:
if not end_date:
end_date = datetime.now().date()
if not start_date:
start_date = end_date - timedelta(days=30)
# Use existing method: get_batch_statistics to calculate waste from yield data
batch_stats = await production_service.get_batch_statistics(
tenant_id, start_date, end_date
)
# Calculate waste metrics from batch statistics
waste_analysis = {
"batch_statistics": batch_stats,
"waste_metrics": {
"calculated_from": "yield_variance",
"note": "Waste derived from planned vs actual quantity differences"
},
"period": {
"start_date": start_date.isoformat(),
"end_date": end_date.isoformat()
}
}
logger.info("Production waste analyzed",
tenant_id=str(tenant_id),
user_id=current_user.get('user_id'))
return waste_analysis
except Exception as e:
logger.error("Error analyzing production waste",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=500,
detail="Failed to analyze production waste"
)
@router.get(
route_builder.build_analytics_route("cost-analysis"),
response_model=dict
)
@analytics_tier_required
async def get_production_cost_analysis(
tenant_id: UUID = Path(...),
start_date: Optional[date] = Query(None),
end_date: Optional[date] = Query(None),
product_id: Optional[UUID] = Query(None),
current_user: dict = Depends(get_current_user_dep),
production_service: ProductionService = Depends(get_production_service)
):
"""
Analyze production costs (Professional/Enterprise only)
Metrics:
- Cost per unit
- Direct vs indirect costs
- Cost trends over time
- Cost variance analysis
- Profitability insights
"""
try:
if not end_date:
end_date = datetime.now().date()
if not start_date:
start_date = end_date - timedelta(days=30)
# Use existing method: get_batch_statistics for cost-related data
batch_stats = await production_service.get_batch_statistics(
tenant_id, start_date, end_date
)
cost_analysis = {
"batch_statistics": batch_stats,
"cost_metrics": {
"note": "Cost analysis requires additional cost tracking data",
"available_metrics": ["batch_count", "production_volume", "efficiency"]
},
"period": {
"start_date": start_date.isoformat(),
"end_date": end_date.isoformat()
},
"product_filter": str(product_id) if product_id else None
}
logger.info("Production cost analyzed",
tenant_id=str(tenant_id),
product_id=str(product_id) if product_id else "all",
user_id=current_user.get('user_id'))
return cost_analysis
except Exception as e:
logger.error("Error analyzing production costs",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=500,
detail="Failed to analyze production costs"
)
@router.get(
route_builder.build_analytics_route("predictive-maintenance"),
response_model=dict
)
@analytics_tier_required
async def get_predictive_maintenance_insights(
tenant_id: UUID = Path(...),
equipment_id: Optional[UUID] = Query(None),
current_user: dict = Depends(get_current_user_dep),
production_service: ProductionService = Depends(get_production_service)
):
"""
Get predictive maintenance insights (Professional/Enterprise only)
Provides:
- Equipment failure predictions
- Maintenance schedule recommendations
- Parts replacement forecasts
- Downtime risk assessment
"""
try:
# Use existing method: predict_capacity_bottlenecks as proxy for maintenance insights
days_ahead = 7 # Predict one week ahead
bottlenecks = await production_service.predict_capacity_bottlenecks(
tenant_id, days_ahead
)
maintenance_insights = {
"capacity_bottlenecks": bottlenecks,
"maintenance_recommendations": {
"note": "Derived from capacity predictions and bottleneck analysis",
"days_predicted": days_ahead
},
"equipment_filter": str(equipment_id) if equipment_id else None
}
logger.info("Predictive maintenance insights generated",
tenant_id=str(tenant_id),
equipment_id=str(equipment_id) if equipment_id else "all",
user_id=current_user.get('user_id'))
return maintenance_insights
except Exception as e:
logger.error("Error generating predictive maintenance insights",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=500,
detail="Failed to generate predictive maintenance insights"
)
# ===== SUSTAINABILITY / WASTE ANALYTICS ENDPOINT =====
# Called by Inventory Service for sustainability metrics
@router.get(
"/api/v1/tenants/{tenant_id}/production/waste-analytics",
response_model=dict
)
async def get_waste_analytics_for_sustainability(
tenant_id: UUID = Path(...),
start_date: datetime = Query(..., description="Start date for waste analysis"),
end_date: datetime = Query(..., description="End date for waste analysis"),
production_service: ProductionService = Depends(get_production_service)
):
"""
Get production waste analytics for sustainability tracking
This endpoint is called by the Inventory Service's sustainability module
to calculate environmental impact and SDG 12.3 compliance.
Does NOT require analytics tier - this is core sustainability data.
Returns:
- total_production_waste: Sum of waste_quantity from all batches
- total_defects: Sum of defect_quantity from all batches
- total_planned: Sum of planned_quantity
- total_actual: Sum of actual_quantity
"""
try:
waste_data = await production_service.get_waste_analytics(
tenant_id,
start_date,
end_date
)
logger.info(
"Production waste analytics retrieved for sustainability",
tenant_id=str(tenant_id),
total_waste=waste_data.get('total_production_waste', 0),
start_date=start_date.isoformat(),
end_date=end_date.isoformat()
)
return waste_data
except Exception as e:
logger.error(
"Error getting waste analytics for sustainability",
tenant_id=str(tenant_id),
error=str(e)
)
raise HTTPException(
status_code=500,
detail=f"Failed to retrieve waste analytics: {str(e)}"
)
@router.get(
"/api/v1/tenants/{tenant_id}/production/baseline",
response_model=dict
)
async def get_baseline_metrics(
tenant_id: UUID = Path(...),
production_service: ProductionService = Depends(get_production_service)
):
"""
Get baseline production metrics from first 90 days
Used by sustainability service to establish waste baseline
for SDG 12.3 compliance tracking.
Returns:
- waste_percentage: Baseline waste percentage from first 90 days
- total_production_kg: Total production in first 90 days
- total_waste_kg: Total waste in first 90 days
- period: Date range of baseline period
"""
try:
baseline_data = await production_service.get_baseline_metrics(tenant_id)
logger.info(
"Baseline metrics retrieved",
tenant_id=str(tenant_id),
baseline_percentage=baseline_data.get('waste_percentage', 0)
)
return baseline_data
except Exception as e:
logger.error(
"Error getting baseline metrics",
tenant_id=str(tenant_id),
error=str(e)
)
raise HTTPException(
status_code=500,
detail=f"Failed to retrieve baseline metrics: {str(e)}"
)