Files
bakery-ia/services/inventory/app/api/analytics.py

315 lines
9.9 KiB
Python
Raw Permalink Normal View History

2025-10-06 15:27:01 +02:00
# services/inventory/app/api/analytics.py
"""
Analytics API endpoints for Inventory Service
Following standardized URL structure: /api/v1/tenants/{tenant_id}/inventory/analytics/{operation}
Requires: Professional or Enterprise subscription tier
"""
from datetime import datetime, timedelta
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Path, status
from sqlalchemy.ext.asyncio import AsyncSession
import structlog
from shared.auth.decorators import get_current_user_dep
from shared.auth.access_control import analytics_tier_required
from app.core.database import get_db
from app.services.inventory_service import InventoryService
from app.services.dashboard_service import DashboardService
from app.services.food_safety_service import FoodSafetyService
from app.schemas.dashboard import (
InventoryAnalytics,
BusinessModelInsights,
)
from shared.routing import RouteBuilder
logger = structlog.get_logger()
# Create route builder for consistent URL structure
route_builder = RouteBuilder('inventory')
router = APIRouter(tags=["inventory-analytics"])
# ===== Dependency Injection =====
async def get_dashboard_service(db: AsyncSession = Depends(get_db)) -> DashboardService:
"""Get dashboard service with dependencies"""
return DashboardService(
inventory_service=InventoryService(),
food_safety_service=FoodSafetyService()
)
# ===== ANALYTICS ENDPOINTS (Professional/Enterprise Only) =====
@router.get(
route_builder.build_analytics_route("inventory-insights"),
response_model=InventoryAnalytics
)
@analytics_tier_required
async def get_inventory_analytics(
tenant_id: UUID = Path(...),
days_back: int = Query(30, ge=1, le=365, description="Number of days to analyze"),
current_user: dict = Depends(get_current_user_dep),
dashboard_service: DashboardService = Depends(get_dashboard_service),
db: AsyncSession = Depends(get_db)
):
"""
Get advanced inventory analytics (Professional/Enterprise only)
Provides:
- Stock turnover rates
- Inventory valuation trends
- ABC analysis
- Stockout risk predictions
- Seasonal patterns
"""
try:
analytics = await dashboard_service.get_inventory_analytics(db, tenant_id, days_back)
logger.info("Inventory analytics retrieved",
tenant_id=str(tenant_id),
days_analyzed=days_back,
user_id=current_user.get('user_id'))
return analytics
except Exception as e:
logger.error("Error getting inventory analytics",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to retrieve inventory analytics"
)
@router.get(
route_builder.build_analytics_route("business-model"),
response_model=BusinessModelInsights
)
@analytics_tier_required
async def get_business_model_insights(
tenant_id: UUID = Path(...),
current_user: dict = Depends(get_current_user_dep),
dashboard_service: DashboardService = Depends(get_dashboard_service),
db: AsyncSession = Depends(get_db)
):
"""
Get business model insights based on inventory patterns (Professional/Enterprise only)
Analyzes inventory patterns to provide insights on:
- Detected business model (retail, wholesale, production, etc.)
- Product mix recommendations
- Inventory optimization suggestions
"""
try:
insights = await dashboard_service.get_business_model_insights(db, tenant_id)
logger.info("Business model insights retrieved",
tenant_id=str(tenant_id),
detected_model=insights.detected_model,
user_id=current_user.get('user_id'))
return insights
except Exception as e:
logger.error("Error getting business model insights",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to retrieve business model insights"
)
@router.get(
route_builder.build_analytics_route("turnover-rate"),
response_model=dict
)
@analytics_tier_required
async def get_inventory_turnover_rate(
tenant_id: UUID = Path(...),
start_date: Optional[datetime] = Query(None, description="Start date for analysis"),
end_date: Optional[datetime] = Query(None, description="End date for analysis"),
category: Optional[str] = Query(None, description="Filter by category"),
current_user: dict = Depends(get_current_user_dep),
db: AsyncSession = Depends(get_db)
):
"""
Calculate inventory turnover rate (Professional/Enterprise only)
Metrics:
- Overall turnover rate
- By category
- By product
- Trend analysis
"""
try:
service = InventoryService()
# Set default dates if not provided
if not end_date:
end_date = datetime.now()
if not start_date:
start_date = end_date - timedelta(days=90)
# Calculate turnover metrics
turnover_data = await service.calculate_turnover_rate(
tenant_id,
start_date,
end_date,
category
)
logger.info("Turnover rate calculated",
tenant_id=str(tenant_id),
category=category,
user_id=current_user.get('user_id'))
return turnover_data
except Exception as e:
logger.error("Error calculating turnover rate",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to calculate turnover rate"
)
@router.get(
route_builder.build_analytics_route("abc-analysis"),
response_model=dict
)
@analytics_tier_required
async def get_abc_analysis(
tenant_id: UUID = Path(...),
days_back: int = Query(90, ge=30, le=365, description="Days to analyze"),
current_user: dict = Depends(get_current_user_dep),
db: AsyncSession = Depends(get_db)
):
"""
Perform ABC analysis on inventory (Professional/Enterprise only)
Categorizes inventory items by:
- A: High-value items requiring tight control
- B: Moderate-value items with moderate control
- C: Low-value items with simple control
"""
try:
service = InventoryService()
abc_analysis = await service.perform_abc_analysis(tenant_id, days_back)
logger.info("ABC analysis completed",
tenant_id=str(tenant_id),
days_analyzed=days_back,
user_id=current_user.get('user_id'))
return abc_analysis
except Exception as e:
logger.error("Error performing ABC analysis",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to perform ABC analysis"
)
@router.get(
route_builder.build_analytics_route("stockout-predictions"),
response_model=dict
)
@analytics_tier_required
async def get_stockout_predictions(
tenant_id: UUID = Path(...),
forecast_days: int = Query(30, ge=7, le=90, description="Days to forecast"),
current_user: dict = Depends(get_current_user_dep),
db: AsyncSession = Depends(get_db)
):
"""
Predict potential stockouts (Professional/Enterprise only)
Provides:
- Items at risk of stockout
- Predicted stockout dates
- Recommended reorder quantities
- Lead time considerations
"""
try:
service = InventoryService()
predictions = await service.predict_stockouts(tenant_id, forecast_days)
logger.info("Stockout predictions generated",
tenant_id=str(tenant_id),
forecast_days=forecast_days,
at_risk_items=len(predictions.get('items_at_risk', [])),
user_id=current_user.get('user_id'))
return predictions
except Exception as e:
logger.error("Error predicting stockouts",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to predict stockouts"
)
@router.get(
route_builder.build_analytics_route("waste-analysis"),
response_model=dict
)
@analytics_tier_required
async def get_waste_analysis(
tenant_id: UUID = Path(...),
start_date: Optional[datetime] = Query(None, description="Start date"),
end_date: Optional[datetime] = Query(None, description="End date"),
current_user: dict = Depends(get_current_user_dep),
db: AsyncSession = Depends(get_db)
):
"""
Analyze inventory waste and expiration (Professional/Enterprise only)
Metrics:
- Total waste value
- Waste by category
- Expiration patterns
- Optimization recommendations
"""
try:
service = InventoryService()
# Set default dates
if not end_date:
end_date = datetime.now()
if not start_date:
start_date = end_date - timedelta(days=30)
waste_analysis = await service.analyze_waste(tenant_id, start_date, end_date)
logger.info("Waste analysis completed",
tenant_id=str(tenant_id),
total_waste_value=waste_analysis.get('total_waste_value', 0),
user_id=current_user.get('user_id'))
return waste_analysis
except Exception as e:
logger.error("Error analyzing waste",
tenant_id=str(tenant_id),
error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to analyze waste"
)