77 lines
2.7 KiB
Python
77 lines
2.7 KiB
Python
# services/production/app/api/production_dashboard.py
"""
Production Dashboard API - Dashboard endpoints for production overview
"""
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Path, Query
|
|
from typing import Optional
|
|
from datetime import date, datetime
|
|
from uuid import UUID
|
|
import structlog
|
|
|
|
from shared.auth.decorators import get_current_user_dep
|
|
from shared.routing import RouteBuilder
|
|
from app.services.production_service import ProductionService
|
|
from app.schemas.production import ProductionDashboardSummary
|
|
from app.core.config import settings
|
|
|
|
# Module-wide structured logger used by every endpoint below.
logger = structlog.get_logger()

# Produces the URL patterns for this service's dashboard routes
# (see the build_dashboard_route(...) calls in the route decorators).
route_builder = RouteBuilder("production")

# Router that the dashboard endpoints register themselves on.
router = APIRouter(tags=["production-dashboard"])
|
|
|
|
|
|
def get_production_service() -> ProductionService:
    """Create the ProductionService injected into the dashboard endpoints.

    Used as a FastAPI dependency (``Depends(get_production_service)``).

    Returns:
        A new ``ProductionService`` built from the shared
        ``database_manager`` and the application ``settings``.
    """
    # Resolved at call time instead of at module import.
    # NOTE(review): presumably this avoids an import cycle with
    # app.core.database - confirm before hoisting it to the top of the file.
    from app.core.database import database_manager as db_manager

    service = ProductionService(db_manager, settings)
    return service
|
|
|
|
|
|
@router.get(
    route_builder.build_dashboard_route("summary"),
    response_model=ProductionDashboardSummary,
)
async def get_dashboard_summary(
    tenant_id: UUID = Path(...),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service),
):
    """Get production dashboard summary.

    Args:
        tenant_id: Tenant identifier taken from the URL path.
        current_user: Value produced by ``get_current_user_dep``. It is not
            read in the body; declaring it makes the dependency run for
            every request (presumably enforcing authentication - confirm
            against ``shared.auth.decorators``).
        production_service: Service instance supplied by
            ``get_production_service``.

    Returns:
        The value returned by ``production_service.get_dashboard_summary``,
        which FastAPI serializes through ``ProductionDashboardSummary``.

    Raises:
        HTTPException: Re-raised unchanged when the service layer raises
            one; any other failure is logged and reported as a 500 with a
            generic detail message.
    """
    try:
        summary = await production_service.get_dashboard_summary(tenant_id)

        logger.info(
            "Retrieved production dashboard summary",
            tenant_id=str(tenant_id),
        )

        return summary

    except HTTPException:
        # A deliberate HTTP error (e.g. 404/403) must keep its own status
        # code instead of being rewritten into a generic 500 below.
        raise
    except Exception as e:
        logger.error(
            "Error getting dashboard summary",
            error=str(e),
            tenant_id=str(tenant_id),
        )
        # Chain the original exception so the root cause stays attached
        # to the traceback.
        raise HTTPException(
            status_code=500, detail="Failed to get dashboard summary"
        ) from e
|
|
|
|
|
|
@router.get(
    route_builder.build_dashboard_route("requirements"),
    response_model=dict,
)
async def get_production_requirements(
    tenant_id: UUID = Path(...),
    # NOTE: this parameter shadows the imported ``datetime.date`` class inside
    # the function body. The name is kept because it is the public query
    # parameter; the annotation still resolves to the class because it is
    # evaluated in module scope.
    date: Optional[date] = Query(None, description="Target date for production requirements"),
    current_user: dict = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service),
):
    """Get production requirements for procurement planning.

    Args:
        tenant_id: Tenant identifier taken from the URL path.
        date: Optional target date from the query string. When omitted,
            today's date is used.
        current_user: Value produced by ``get_current_user_dep``. It is not
            read in the body; declaring it makes the dependency run for
            every request (presumably enforcing authentication - confirm
            against ``shared.auth.decorators``).
        production_service: Service instance supplied by
            ``get_production_service``.

    Returns:
        The value returned by
        ``production_service.get_production_requirements`` for the tenant
        and target date (declared response model: ``dict``).

    Raises:
        HTTPException: Re-raised unchanged when the service layer raises
            one; any other failure is logged and reported as a 500 with a
            generic detail message.
    """
    try:
        # NOTE(review): datetime.now() is naive server-local time, so the
        # default "today" depends on the host's timezone - confirm this is
        # the intended behavior for tenants in other timezones.
        target_date = date or datetime.now().date()
        requirements = await production_service.get_production_requirements(
            tenant_id, target_date
        )

        logger.info(
            "Retrieved production requirements for procurement",
            tenant_id=str(tenant_id),
            date=target_date.isoformat(),
        )

        return requirements

    except HTTPException:
        # A deliberate HTTP error (e.g. 404/403) must keep its own status
        # code instead of being rewritten into a generic 500 below.
        raise
    except Exception as e:
        logger.error(
            "Error getting production requirements",
            error=str(e),
            tenant_id=str(tenant_id),
        )
        # Chain the original exception so the root cause stays attached
        # to the traceback.
        raise HTTPException(
            status_code=500, detail="Failed to get production requirements"
        ) from e
|