Files
bakery-ia/services/production/app/api/analytics.py

529 lines
17 KiB
Python
Raw Permalink Normal View History

2025-10-06 15:27:01 +02:00
# services/production/app/api/analytics.py
"""
Analytics API endpoints for Production Service
Following standardized URL structure: /api/v1/tenants/{tenant_id}/production/analytics/{operation}
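Requires: Professional or Enterprise subscription tier for the analytics endpoints; the sustainability endpoints (waste-analytics, baseline) are not tier-gated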
"""
from datetime import date, datetime, timedelta
from typing import Optional
from uuid import UUID
2025-12-13 23:57:54 +01:00
from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request
2025-10-06 15:27:01 +02:00
import structlog
from shared.auth.decorators import get_current_user_dep
from shared.auth.access_control import analytics_tier_required
from app.services.production_service import ProductionService
from app.core.config import settings
from shared.routing import RouteBuilder
# Module-level structured logger shared by every endpoint in this file.
logger = structlog.get_logger()
# Route builder for the 'production' service; the analytics endpoints below
# derive their paths from it so they share one consistent URL structure.
route_builder = RouteBuilder('production')
# Router that collects the endpoints defined below under the
# "production-analytics" tag.
router = APIRouter(tags=["production-analytics"])
2025-12-13 23:57:54 +01:00
def get_production_service(request: Request) -> ProductionService:
    """Build the ProductionService dependency for the current request.

    The notification service is read from ``request.app.state``; when the
    application has not registered one, ``None`` is passed instead.
    """
    # Imported at call time rather than at module load.
    # NOTE(review): presumably this avoids a circular import - confirm.
    from app.core.database import database_manager

    app_state = request.app.state
    return ProductionService(
        database_manager,
        settings,
        getattr(app_state, 'notification_service', None),
    )
2025-10-06 15:27:01 +02:00
# ===== ANALYTICS ENDPOINTS (Professional/Enterprise Only) =====
@router.get(
    route_builder.build_analytics_route("equipment-efficiency"),
    response_model=dict
)
@analytics_tier_required
async def get_equipment_efficiency(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None, description="Start date for analysis"),
    end_date: Optional[date] = Query(None, description="End date for analysis"),
    equipment_id: Optional[UUID] = Query(None, description="Filter by equipment"),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze equipment efficiency (Professional/Enterprise only)

    Intended metrics (the payload is whatever
    ``ProductionService.get_equipment_efficiency_analytics`` returns):
    - Overall Equipment Effectiveness (OEE)
    - Availability rate
    - Performance rate
    - Quality rate
    - Downtime analysis

    NOTE(review): ``start_date``, ``end_date`` and ``equipment_id`` are
    accepted for API compatibility but are not applied: the service method
    is called with ``tenant_id`` only. ``equipment_id`` is only recorded in
    the log entry. Forward them once the service supports these filters.

    Raises:
        HTTPException: 500 when the analysis fails for any reason.
    """
    try:
        # The service method takes only the tenant; no date range or
        # equipment filter is passed, so no default dates are computed here.
        efficiency_data = await production_service.get_equipment_efficiency_analytics(
            tenant_id
        )
        logger.info(
            "Equipment efficiency analyzed",
            tenant_id=str(tenant_id),
            equipment_id=str(equipment_id) if equipment_id else "all",
            user_id=current_user.get('user_id'),
        )
        return efficiency_data
    except Exception as exc:
        logger.error(
            "Error analyzing equipment efficiency",
            tenant_id=str(tenant_id),
            error=str(exc),
        )
        # Chain the cause so the original failure stays attached for debugging.
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze equipment efficiency"
        ) from exc
@router.get(
    route_builder.build_analytics_route("production-trends"),
    response_model=dict
)
@analytics_tier_required
async def get_production_trends(
    tenant_id: UUID = Path(...),
    days_back: int = Query(90, ge=7, le=365, description="Days to analyze"),
    product_id: Optional[UUID] = Query(None, description="Filter by product"),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production trends (Professional/Enterprise only)

    Combines two service results into one response:
    - ``performance_metrics``: performance analytics for the window ending
      today and reaching back ``days_back`` days
    - ``yield_trends``: yield trends for a "weekly" period when
      ``days_back`` is at most 30, otherwise "monthly"

    Intended coverage: production volume trends, batch completion rates,
    cycle time analysis, quality trends and seasonal patterns.

    ``product_id`` is echoed back as ``product_filter`` but is not passed
    to either service call.

    Raises:
        HTTPException: 500 when the analysis fails for any reason.
    """
    try:
        # Analysis window: [today - days_back, today].
        window_end = datetime.now().date()
        window_start = window_end - timedelta(days=days_back)
        performance_metrics = await production_service.get_performance_analytics(
            tenant_id, window_start, window_end
        )

        # Yield trends are requested by period label, not by day count.
        if days_back <= 30:
            yield_period = "weekly"
        else:
            yield_period = "monthly"
        yield_trend_data = await production_service.get_yield_trends_analytics(
            tenant_id, yield_period
        )

        response_payload = {
            "performance_metrics": performance_metrics,
            "yield_trends": yield_trend_data,
            "days_analyzed": days_back,
            "product_filter": str(product_id) if product_id else None
        }
        logger.info(
            "Production trends analyzed",
            tenant_id=str(tenant_id),
            days_analyzed=days_back,
            user_id=current_user.get('user_id'),
        )
        return response_payload
    except Exception as exc:
        logger.error(
            "Error analyzing production trends",
            tenant_id=str(tenant_id),
            error=str(exc),
        )
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze production trends"
        )
@router.get(
    route_builder.build_analytics_route("capacity-utilization"),
    response_model=dict
)
@analytics_tier_required
async def get_capacity_utilization(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production capacity utilization (Professional/Enterprise only)

    Returns the service's capacity usage report for the requested range.
    ``end_date`` defaults to today and ``start_date`` to 30 days before
    ``end_date``.

    Intended metrics (actual content is whatever the service returns):
    - Capacity utilization percentage
    - Bottleneck identification
    - Resource allocation efficiency
    - Optimization recommendations

    Raises:
        HTTPException: 500 when the analysis fails for any reason.
    """
    try:
        # Fill in the default 30-day window for any missing bound.
        end_date = end_date or datetime.now().date()
        start_date = start_date or end_date - timedelta(days=30)

        usage_report = await production_service.get_capacity_usage_report(
            tenant_id, start_date, end_date
        )
        logger.info(
            "Capacity utilization analyzed",
            tenant_id=str(tenant_id),
            user_id=current_user.get('user_id'),
        )
        return usage_report
    except Exception as exc:
        logger.error(
            "Error analyzing capacity utilization",
            tenant_id=str(tenant_id),
            error=str(exc),
        )
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze capacity utilization"
        )
@router.get(
    route_builder.build_analytics_route("quality-metrics"),
    response_model=dict
)
@analytics_tier_required
async def get_quality_metrics(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    product_id: Optional[UUID] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze quality control metrics (Professional/Enterprise only)

    The response bundles:
    - ``quality_trends``: service quality trends for the date range
    - ``top_defects``: service top-defects analytics (requested per tenant,
      without a date range)
    - ``period``: the effective ISO-formatted start and end dates
    - ``product_filter``: the ``product_id`` echoed back; it is not passed
      to either service call

    ``end_date`` defaults to today and ``start_date`` to 30 days before
    ``end_date``.

    Intended coverage: first pass yield, defect rates by type, quality
    trends over time and root cause analysis.

    Raises:
        HTTPException: 500 when the analysis fails for any reason.
    """
    try:
        # Fill in the default 30-day window for any missing bound.
        end_date = end_date or datetime.now().date()
        start_date = start_date or end_date - timedelta(days=30)

        trend_data = await production_service.get_quality_trends(
            tenant_id, start_date, end_date
        )
        defect_data = await production_service.get_top_defects_analytics(tenant_id)

        effective_period = {
            "start_date": start_date.isoformat(),
            "end_date": end_date.isoformat()
        }
        response_payload = {
            "quality_trends": trend_data,
            "top_defects": defect_data,
            "period": effective_period,
            "product_filter": str(product_id) if product_id else None
        }
        logger.info(
            "Quality metrics analyzed",
            tenant_id=str(tenant_id),
            user_id=current_user.get('user_id'),
        )
        return response_payload
    except Exception as exc:
        logger.error(
            "Error analyzing quality metrics",
            tenant_id=str(tenant_id),
            error=str(exc),
        )
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze quality metrics"
        )
@router.get(
    route_builder.build_analytics_route("waste-analysis"),
    response_model=dict
)
@analytics_tier_required
async def get_production_waste_analysis(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production waste (Professional/Enterprise only)

    The response wraps the service's batch statistics for the date range
    together with a static ``waste_metrics`` note and the effective
    ``period``. No waste figures are computed in this handler itself.

    ``end_date`` defaults to today and ``start_date`` to 30 days before
    ``end_date``.

    Intended coverage: material waste percentages, waste by
    category/product, cost impact analysis and reduction recommendations.

    Raises:
        HTTPException: 500 when the analysis fails for any reason.
    """
    try:
        # Fill in the default 30-day window for any missing bound.
        end_date = end_date or datetime.now().date()
        start_date = start_date or end_date - timedelta(days=30)

        # Batch statistics are the only data source for this report.
        statistics = await production_service.get_batch_statistics(
            tenant_id, start_date, end_date
        )

        waste_note = {
            "calculated_from": "yield_variance",
            "note": "Waste derived from planned vs actual quantity differences"
        }
        effective_period = {
            "start_date": start_date.isoformat(),
            "end_date": end_date.isoformat()
        }
        response_payload = {
            "batch_statistics": statistics,
            "waste_metrics": waste_note,
            "period": effective_period
        }
        logger.info(
            "Production waste analyzed",
            tenant_id=str(tenant_id),
            user_id=current_user.get('user_id'),
        )
        return response_payload
    except Exception as exc:
        logger.error(
            "Error analyzing production waste",
            tenant_id=str(tenant_id),
            error=str(exc),
        )
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze production waste"
        )
@router.get(
    route_builder.build_analytics_route("cost-analysis"),
    response_model=dict
)
@analytics_tier_required
async def get_production_cost_analysis(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    product_id: Optional[UUID] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production costs (Professional/Enterprise only)

    The response wraps the service's batch statistics for the date range
    together with a static ``cost_metrics`` note, the effective ``period``
    and the ``product_id`` echoed back as ``product_filter`` (it is not
    passed to the service call).

    ``end_date`` defaults to today and ``start_date`` to 30 days before
    ``end_date``.

    Intended coverage: cost per unit, direct vs indirect costs, cost trends
    over time, cost variance analysis and profitability insights.

    Raises:
        HTTPException: 500 when the analysis fails for any reason.
    """
    try:
        # Fill in the default 30-day window for any missing bound.
        end_date = end_date or datetime.now().date()
        start_date = start_date or end_date - timedelta(days=30)

        # Batch statistics stand in for cost data in this report.
        statistics = await production_service.get_batch_statistics(
            tenant_id, start_date, end_date
        )

        cost_note = {
            "note": "Cost analysis requires additional cost tracking data",
            "available_metrics": ["batch_count", "production_volume", "efficiency"]
        }
        effective_period = {
            "start_date": start_date.isoformat(),
            "end_date": end_date.isoformat()
        }
        response_payload = {
            "batch_statistics": statistics,
            "cost_metrics": cost_note,
            "period": effective_period,
            "product_filter": str(product_id) if product_id else None
        }
        logger.info(
            "Production cost analyzed",
            tenant_id=str(tenant_id),
            product_id=str(product_id) if product_id else "all",
            user_id=current_user.get('user_id'),
        )
        return response_payload
    except Exception as exc:
        logger.error(
            "Error analyzing production costs",
            tenant_id=str(tenant_id),
            error=str(exc),
        )
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze production costs"
        )
@router.get(
    route_builder.build_analytics_route("predictive-maintenance"),
    response_model=dict
)
@analytics_tier_required
async def get_predictive_maintenance_insights(
    tenant_id: UUID = Path(...),
    equipment_id: Optional[UUID] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Get predictive maintenance insights (Professional/Enterprise only)

    Capacity bottleneck predictions for the next 7 days are used as a proxy
    for maintenance insights. The response contains those predictions, a
    static recommendations note and the ``equipment_id`` echoed back as
    ``equipment_filter`` (it is not passed to the service call).

    Intended coverage: equipment failure predictions, maintenance schedule
    recommendations, parts replacement forecasts and downtime risk
    assessment.

    Raises:
        HTTPException: 500 when the insights cannot be generated.
    """
    try:
        # Fixed one-week prediction horizon.
        forecast_horizon_days = 7
        predicted_bottlenecks = await production_service.predict_capacity_bottlenecks(
            tenant_id, forecast_horizon_days
        )

        recommendations = {
            "note": "Derived from capacity predictions and bottleneck analysis",
            "days_predicted": forecast_horizon_days
        }
        response_payload = {
            "capacity_bottlenecks": predicted_bottlenecks,
            "maintenance_recommendations": recommendations,
            "equipment_filter": str(equipment_id) if equipment_id else None
        }
        logger.info(
            "Predictive maintenance insights generated",
            tenant_id=str(tenant_id),
            equipment_id=str(equipment_id) if equipment_id else "all",
            user_id=current_user.get('user_id'),
        )
        return response_payload
    except Exception as exc:
        logger.error(
            "Error generating predictive maintenance insights",
            tenant_id=str(tenant_id),
            error=str(exc),
        )
        raise HTTPException(
            status_code=500,
            detail="Failed to generate predictive maintenance insights"
        )
# ===== SUSTAINABILITY / WASTE ANALYTICS ENDPOINT =====
# Called by Inventory Service for sustainability metrics
@router.get(
    "/api/v1/tenants/{tenant_id}/production/waste-analytics",
    response_model=dict
)
async def get_waste_analytics_for_sustainability(
    tenant_id: UUID = Path(...),
    start_date: datetime = Query(..., description="Start date for waste analysis"),
    end_date: datetime = Query(..., description="End date for waste analysis"),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Get production waste analytics for sustainability tracking

    This endpoint is called by the Inventory Service's sustainability module
    to calculate environmental impact and SDG 12.3 compliance.
    Does NOT require analytics tier - this is core sustainability data.

    NOTE(review): unlike the analytics endpoints in this module, this handler
    declares no ``get_current_user_dep`` dependency - confirm that caller
    authentication is enforced elsewhere (e.g. at the gateway).

    Returns:
    - total_production_waste: Sum of waste_quantity from all batches
    - total_defects: Sum of defect_quantity from all batches
    - total_planned: Sum of planned_quantity
    - total_actual: Sum of actual_quantity

    Raises:
        HTTPException: 500 with a generic message when retrieval fails; the
            underlying error is logged, not returned to the caller.
    """
    try:
        waste_data = await production_service.get_waste_analytics(
            tenant_id,
            start_date,
            end_date
        )
        logger.info(
            "Production waste analytics retrieved for sustainability",
            tenant_id=str(tenant_id),
            total_waste=waste_data.get('total_production_waste', 0),
            start_date=start_date.isoformat(),
            end_date=end_date.isoformat()
        )
        return waste_data
    except Exception as exc:
        logger.error(
            "Error getting waste analytics for sustainability",
            tenant_id=str(tenant_id),
            error=str(exc)
        )
        # Keep internal exception text out of the HTTP response; it is
        # already captured in the log entry above. This also matches the
        # generic 500 messages used by the other endpoints in this module.
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve waste analytics"
        ) from exc
@router.get(
    "/api/v1/tenants/{tenant_id}/production/baseline",
    response_model=dict
)
async def get_baseline_metrics(
    tenant_id: UUID = Path(...),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Get baseline production metrics from first 90 days

    Used by sustainability service to establish waste baseline
    for SDG 12.3 compliance tracking.

    NOTE(review): unlike the analytics endpoints in this module, this handler
    declares no ``get_current_user_dep`` dependency - confirm that caller
    authentication is enforced elsewhere (e.g. at the gateway).

    Returns:
    - waste_percentage: Baseline waste percentage from first 90 days
    - total_production_kg: Total production in first 90 days
    - total_waste_kg: Total waste in first 90 days
    - period: Date range of baseline period

    Raises:
        HTTPException: 500 with a generic message when retrieval fails; the
            underlying error is logged, not returned to the caller.
    """
    try:
        baseline_data = await production_service.get_baseline_metrics(tenant_id)
        logger.info(
            "Baseline metrics retrieved",
            tenant_id=str(tenant_id),
            baseline_percentage=baseline_data.get('waste_percentage', 0)
        )
        return baseline_data
    except Exception as exc:
        logger.error(
            "Error getting baseline metrics",
            tenant_id=str(tenant_id),
            error=str(exc)
        )
        # Keep internal exception text out of the HTTP response; it is
        # already captured in the log entry above. This also matches the
        # generic 500 messages used by the other endpoints in this module.
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve baseline metrics"
        ) from exc