Files
bakery-ia/services/production/app/api/analytics.py

429 lines
14 KiB
Python
Raw Normal View History

2025-10-06 15:27:01 +02:00
# services/production/app/api/analytics.py
"""
Analytics API endpoints for Production Service
Following standardized URL structure: /api/v1/tenants/{tenant_id}/production/analytics/{operation}
Requires: Professional or Enterprise subscription tier
"""
from datetime import date, datetime, timedelta
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Path, Query
import structlog
from shared.auth.decorators import get_current_user_dep
from shared.auth.access_control import analytics_tier_required
from app.services.production_service import ProductionService
from app.core.config import settings
from shared.routing import RouteBuilder
# Module-level structured logger used by every endpoint below
logger = structlog.get_logger()
# Create route builder for consistent URL structure
route_builder = RouteBuilder('production')
# Router that the analytics endpoints below register on, tagged "production-analytics"
router = APIRouter(tags=["production-analytics"])
def get_production_service() -> ProductionService:
    """Build the ProductionService handed to the analytics endpoints.

    Used as a FastAPI dependency (``Depends(get_production_service)``).

    Returns:
        A ProductionService constructed from the application's
        ``database_manager`` and the module-level ``settings``.
    """
    # Imported at call time rather than at module load.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from app.core.database import database_manager as db_manager

    service = ProductionService(db_manager, settings)
    return service
# ===== ANALYTICS ENDPOINTS (Professional/Enterprise Only) =====
@router.get(
    route_builder.build_analytics_route("equipment-efficiency"),
    response_model=dict
)
@analytics_tier_required
async def get_equipment_efficiency(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None, description="Start date for analysis"),
    end_date: Optional[date] = Query(None, description="End date for analysis"),
    equipment_id: Optional[UUID] = Query(None, description="Filter by equipment"),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze equipment efficiency (Professional/Enterprise only)

    Metrics:
    - Overall Equipment Effectiveness (OEE)
    - Availability rate
    - Performance rate
    - Quality rate
    - Downtime analysis

    Args:
        tenant_id: Tenant whose equipment is analyzed.
        start_date: Accepted for API compatibility; currently not forwarded
            to the service.
        end_date: Accepted for API compatibility; currently not forwarded
            to the service.
        equipment_id: Only used for logging; currently not forwarded to the
            service.
        current_user: Authenticated user; its ``user_id`` is logged.
        production_service: Injected production service.

    Returns:
        Whatever ``get_equipment_efficiency_analytics`` returns for the tenant.

    Raises:
        HTTPException: Re-raised unchanged if the service raises one;
            otherwise status 500 for any other failure.
    """
    try:
        # Use existing method: get_equipment_efficiency_analytics
        # NOTE(review): the service call only receives tenant_id, so the
        # date range and equipment filter have no effect on the result.
        # The previously computed default dates were dead stores and have
        # been removed - wire the filters through once the service
        # supports them.
        efficiency_data = await production_service.get_equipment_efficiency_analytics(tenant_id)
        logger.info("Equipment efficiency analyzed",
                    tenant_id=str(tenant_id),
                    equipment_id=str(equipment_id) if equipment_id else "all",
                    user_id=current_user.get('user_id'))
        return efficiency_data
    except HTTPException:
        # Preserve deliberate HTTP errors instead of masking them as a 500.
        raise
    except Exception as e:
        # exc_info keeps the traceback available to the log pipeline.
        logger.error("Error analyzing equipment efficiency",
                     tenant_id=str(tenant_id),
                     error=str(e),
                     exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze equipment efficiency"
        ) from e
@router.get(
    route_builder.build_analytics_route("production-trends"),
    response_model=dict
)
@analytics_tier_required
async def get_production_trends(
    tenant_id: UUID = Path(...),
    days_back: int = Query(90, ge=7, le=365, description="Days to analyze"),
    product_id: Optional[UUID] = Query(None, description="Filter by product"),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production trends (Professional/Enterprise only)

    Provides:
    - Production volume trends
    - Batch completion rates
    - Cycle time analysis
    - Quality trends
    - Seasonal patterns

    Args:
        tenant_id: Tenant whose production data is analyzed.
        days_back: Size of the look-back window in days (7-365), ending today.
        product_id: Echoed back as ``product_filter``; currently not
            forwarded to the service calls.
        current_user: Authenticated user; its ``user_id`` is logged.
        production_service: Injected production service.

    Returns:
        dict with ``performance_metrics``, ``yield_trends``,
        ``days_analyzed`` and ``product_filter``.

    Raises:
        HTTPException: Re-raised unchanged if the service raises one;
            otherwise status 500 for any other failure.
    """
    try:
        # Use existing methods: get_performance_analytics + get_yield_trends_analytics
        window_end = date.today()
        window_start = window_end - timedelta(days=days_back)
        performance = await production_service.get_performance_analytics(
            tenant_id, window_start, window_end
        )

        # Map days_back to period string for yield trends
        period = "weekly" if days_back <= 30 else "monthly"
        yield_trends = await production_service.get_yield_trends_analytics(tenant_id, period)

        trends = {
            "performance_metrics": performance,
            "yield_trends": yield_trends,
            "days_analyzed": days_back,
            "product_filter": str(product_id) if product_id else None
        }
        logger.info("Production trends analyzed",
                    tenant_id=str(tenant_id),
                    days_analyzed=days_back,
                    user_id=current_user.get('user_id'))
        return trends
    except HTTPException:
        # Preserve deliberate HTTP errors instead of masking them as a 500.
        raise
    except Exception as e:
        # exc_info keeps the traceback available to the log pipeline.
        logger.error("Error analyzing production trends",
                     tenant_id=str(tenant_id),
                     error=str(e),
                     exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze production trends"
        ) from e
@router.get(
    route_builder.build_analytics_route("capacity-utilization"),
    response_model=dict
)
@analytics_tier_required
async def get_capacity_utilization(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production capacity utilization (Professional/Enterprise only)

    Metrics:
    - Capacity utilization percentage
    - Bottleneck identification
    - Resource allocation efficiency
    - Optimization recommendations

    Args:
        tenant_id: Tenant whose capacity usage is analyzed.
        start_date: First day of the window; defaults to 30 days before
            ``end_date``.
        end_date: Last day of the window; defaults to today.
        current_user: Authenticated user; its ``user_id`` is logged.
        production_service: Injected production service.

    Returns:
        Whatever ``get_capacity_usage_report`` returns for the window.

    Raises:
        HTTPException: 400 if ``start_date`` is after ``end_date``;
            re-raised unchanged if the service raises one; otherwise
            status 500 for any other failure.
    """
    if end_date is None:
        end_date = date.today()
    if start_date is None:
        start_date = end_date - timedelta(days=30)
    # Reject inverted windows up front (e.g. only a future start_date given)
    # rather than sending a nonsensical range to the service.
    if start_date > end_date:
        raise HTTPException(
            status_code=400,
            detail="start_date must be on or before end_date"
        )

    try:
        # Use existing method: get_capacity_usage_report
        utilization = await production_service.get_capacity_usage_report(
            tenant_id, start_date, end_date
        )
        logger.info("Capacity utilization analyzed",
                    tenant_id=str(tenant_id),
                    user_id=current_user.get('user_id'))
        return utilization
    except HTTPException:
        # Preserve deliberate HTTP errors instead of masking them as a 500.
        raise
    except Exception as e:
        # exc_info keeps the traceback available to the log pipeline.
        logger.error("Error analyzing capacity utilization",
                     tenant_id=str(tenant_id),
                     error=str(e),
                     exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze capacity utilization"
        ) from e
@router.get(
    route_builder.build_analytics_route("quality-metrics"),
    response_model=dict
)
@analytics_tier_required
async def get_quality_metrics(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    product_id: Optional[UUID] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze quality control metrics (Professional/Enterprise only)

    Metrics:
    - First pass yield
    - Defect rates by type
    - Quality trends over time
    - Root cause analysis

    Args:
        tenant_id: Tenant whose quality data is analyzed.
        start_date: First day of the window; defaults to 30 days before
            ``end_date``.
        end_date: Last day of the window; defaults to today.
        product_id: Echoed back as ``product_filter``; currently not
            forwarded to the service calls.
        current_user: Authenticated user; its ``user_id`` is logged.
        production_service: Injected production service.

    Returns:
        dict with ``quality_trends``, ``top_defects``, ``period`` (ISO
        formatted ``start_date``/``end_date``) and ``product_filter``.

    Raises:
        HTTPException: 400 if ``start_date`` is after ``end_date``;
            re-raised unchanged if the service raises one; otherwise
            status 500 for any other failure.
    """
    if end_date is None:
        end_date = date.today()
    if start_date is None:
        start_date = end_date - timedelta(days=30)
    # Reject inverted windows up front (e.g. only a future start_date given)
    # rather than sending a nonsensical range to the service.
    if start_date > end_date:
        raise HTTPException(
            status_code=400,
            detail="start_date must be on or before end_date"
        )

    try:
        # Use existing methods: get_quality_trends + get_top_defects_analytics
        quality_trends = await production_service.get_quality_trends(
            tenant_id, start_date, end_date
        )
        # Called with the tenant only, so the defect list is not limited to
        # the requested window.
        top_defects = await production_service.get_top_defects_analytics(tenant_id)

        quality_data = {
            "quality_trends": quality_trends,
            "top_defects": top_defects,
            "period": {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat()
            },
            "product_filter": str(product_id) if product_id else None
        }
        logger.info("Quality metrics analyzed",
                    tenant_id=str(tenant_id),
                    user_id=current_user.get('user_id'))
        return quality_data
    except HTTPException:
        # Preserve deliberate HTTP errors instead of masking them as a 500.
        raise
    except Exception as e:
        # exc_info keeps the traceback available to the log pipeline.
        logger.error("Error analyzing quality metrics",
                     tenant_id=str(tenant_id),
                     error=str(e),
                     exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze quality metrics"
        ) from e
@router.get(
    route_builder.build_analytics_route("waste-analysis"),
    response_model=dict
)
@analytics_tier_required
async def get_production_waste_analysis(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production waste (Professional/Enterprise only)

    Provides:
    - Material waste percentages
    - Waste by category/product
    - Cost impact analysis
    - Reduction recommendations

    Args:
        tenant_id: Tenant whose batches are analyzed.
        start_date: First day of the window; defaults to 30 days before
            ``end_date``.
        end_date: Last day of the window; defaults to today.
        current_user: Authenticated user; its ``user_id`` is logged.
        production_service: Injected production service.

    Returns:
        dict with ``batch_statistics`` (from ``get_batch_statistics``),
        a static descriptive ``waste_metrics`` section and ``period``
        (ISO formatted ``start_date``/``end_date``).

    Raises:
        HTTPException: 400 if ``start_date`` is after ``end_date``;
            re-raised unchanged if the service raises one; otherwise
            status 500 for any other failure.
    """
    if end_date is None:
        end_date = date.today()
    if start_date is None:
        start_date = end_date - timedelta(days=30)
    # Reject inverted windows up front (e.g. only a future start_date given)
    # rather than sending a nonsensical range to the service.
    if start_date > end_date:
        raise HTTPException(
            status_code=400,
            detail="start_date must be on or before end_date"
        )

    try:
        # Use existing method: get_batch_statistics to calculate waste from yield data
        batch_stats = await production_service.get_batch_statistics(
            tenant_id, start_date, end_date
        )

        # The waste_metrics section is descriptive only; no waste figures
        # are computed in this endpoint.
        waste_analysis = {
            "batch_statistics": batch_stats,
            "waste_metrics": {
                "calculated_from": "yield_variance",
                "note": "Waste derived from planned vs actual quantity differences"
            },
            "period": {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat()
            }
        }
        logger.info("Production waste analyzed",
                    tenant_id=str(tenant_id),
                    user_id=current_user.get('user_id'))
        return waste_analysis
    except HTTPException:
        # Preserve deliberate HTTP errors instead of masking them as a 500.
        raise
    except Exception as e:
        # exc_info keeps the traceback available to the log pipeline.
        logger.error("Error analyzing production waste",
                     tenant_id=str(tenant_id),
                     error=str(e),
                     exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze production waste"
        ) from e
@router.get(
    route_builder.build_analytics_route("cost-analysis"),
    response_model=dict
)
@analytics_tier_required
async def get_production_cost_analysis(
    tenant_id: UUID = Path(...),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    product_id: Optional[UUID] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Analyze production costs (Professional/Enterprise only)

    Metrics:
    - Cost per unit
    - Direct vs indirect costs
    - Cost trends over time
    - Cost variance analysis
    - Profitability insights

    Args:
        tenant_id: Tenant whose batches are analyzed.
        start_date: First day of the window; defaults to 30 days before
            ``end_date``.
        end_date: Last day of the window; defaults to today.
        product_id: Echoed back as ``product_filter`` and logged; currently
            not forwarded to the service call.
        current_user: Authenticated user; its ``user_id`` is logged.
        production_service: Injected production service.

    Returns:
        dict with ``batch_statistics`` (from ``get_batch_statistics``),
        a static descriptive ``cost_metrics`` section, ``period`` (ISO
        formatted ``start_date``/``end_date``) and ``product_filter``.

    Raises:
        HTTPException: 400 if ``start_date`` is after ``end_date``;
            re-raised unchanged if the service raises one; otherwise
            status 500 for any other failure.
    """
    if end_date is None:
        end_date = date.today()
    if start_date is None:
        start_date = end_date - timedelta(days=30)
    # Reject inverted windows up front (e.g. only a future start_date given)
    # rather than sending a nonsensical range to the service.
    if start_date > end_date:
        raise HTTPException(
            status_code=400,
            detail="start_date must be on or before end_date"
        )

    try:
        # Use existing method: get_batch_statistics for cost-related data
        batch_stats = await production_service.get_batch_statistics(
            tenant_id, start_date, end_date
        )

        # The cost_metrics section is descriptive only; no cost figures
        # are computed in this endpoint.
        cost_analysis = {
            "batch_statistics": batch_stats,
            "cost_metrics": {
                "note": "Cost analysis requires additional cost tracking data",
                "available_metrics": ["batch_count", "production_volume", "efficiency"]
            },
            "period": {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat()
            },
            "product_filter": str(product_id) if product_id else None
        }
        logger.info("Production cost analyzed",
                    tenant_id=str(tenant_id),
                    product_id=str(product_id) if product_id else "all",
                    user_id=current_user.get('user_id'))
        return cost_analysis
    except HTTPException:
        # Preserve deliberate HTTP errors instead of masking them as a 500.
        raise
    except Exception as e:
        # exc_info keeps the traceback available to the log pipeline.
        logger.error("Error analyzing production costs",
                     tenant_id=str(tenant_id),
                     error=str(e),
                     exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="Failed to analyze production costs"
        ) from e
@router.get(
    route_builder.build_analytics_route("predictive-maintenance"),
    response_model=dict
)
@analytics_tier_required
async def get_predictive_maintenance_insights(
    tenant_id: UUID = Path(...),
    equipment_id: Optional[UUID] = Query(None),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Get predictive maintenance insights (Professional/Enterprise only)

    Provides:
    - Equipment failure predictions
    - Maintenance schedule recommendations
    - Parts replacement forecasts
    - Downtime risk assessment

    Args:
        tenant_id: Tenant whose capacity is analyzed.
        equipment_id: Echoed back as ``equipment_filter`` and logged;
            currently not forwarded to the service call.
        current_user: Authenticated user; its ``user_id`` is logged.
        production_service: Injected production service.

    Returns:
        dict with ``capacity_bottlenecks`` (from
        ``predict_capacity_bottlenecks``), a descriptive
        ``maintenance_recommendations`` section and ``equipment_filter``.

    Raises:
        HTTPException: Re-raised unchanged if the service raises one;
            otherwise status 500 for any other failure.
    """
    try:
        # Use existing method: predict_capacity_bottlenecks as proxy for maintenance insights
        days_ahead = 7  # Predict one week ahead
        bottlenecks = await production_service.predict_capacity_bottlenecks(
            tenant_id, days_ahead
        )

        maintenance_insights = {
            "capacity_bottlenecks": bottlenecks,
            "maintenance_recommendations": {
                "note": "Derived from capacity predictions and bottleneck analysis",
                "days_predicted": days_ahead
            },
            "equipment_filter": str(equipment_id) if equipment_id else None
        }
        logger.info("Predictive maintenance insights generated",
                    tenant_id=str(tenant_id),
                    equipment_id=str(equipment_id) if equipment_id else "all",
                    user_id=current_user.get('user_id'))
        return maintenance_insights
    except HTTPException:
        # Preserve deliberate HTTP errors instead of masking them as a 500.
        raise
    except Exception as e:
        # exc_info keeps the traceback available to the log pipeline.
        logger.error("Error generating predictive maintenance insights",
                     tenant_id=str(tenant_id),
                     error=str(e),
                     exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="Failed to generate predictive maintenance insights"
        ) from e