Files

168 lines
6.1 KiB
Python
Raw Permalink Normal View History

2025-12-05 20:07:01 +01:00
# services/production/app/api/batch.py
"""
Production Batch API - Batch operations for enterprise dashboards
Phase 2 optimization: Eliminate N+1 query patterns by fetching production data
for multiple tenants in a single request.
"""
from fastapi import APIRouter, Depends, HTTPException, Body
from typing import List, Dict, Any
from uuid import UUID
from pydantic import BaseModel, Field
import structlog
import asyncio
2025-12-13 23:57:54 +01:00
from fastapi import Request
2025-12-05 20:07:01 +01:00
from app.services.production_service import ProductionService
from app.core.config import settings
from shared.auth.decorators import get_current_user_dep
router = APIRouter(tags=["production-batch"])
logger = structlog.get_logger()
2025-12-13 23:57:54 +01:00
def get_production_service(request: Request) -> ProductionService:
    """Dependency injection for production service"""
    # The database manager is imported at call time, inside the function,
    # rather than at module import time.
    from app.core.database import database_manager

    # The notification service is optional: when the application state does
    # not carry one, None is handed to ProductionService instead.
    app_state = request.app.state
    notifier = getattr(app_state, 'notification_service', None)

    return ProductionService(database_manager, settings, notifier)
2025-12-05 20:07:01 +01:00
class ProductionSummaryBatchRequest(BaseModel):
    """Request model for batch production summary"""

    # Tenant identifiers are accepted as plain strings here; the list is
    # capped at 100 entries by the field constraint below.
    # NOTE(review): the docstring above may be exposed as the schema
    # description by Pydantic, so it is kept verbatim.
    tenant_ids: List[str] = Field(
        ...,
        description="List of tenant IDs",
        max_length=100,
    )
class ProductionSummary(BaseModel):
    """Production summary for a single tenant"""

    # Identifier of the tenant, as a string.
    tenant_id: str

    # Batch counts: an overall total followed by one count per status.
    total_batches: int
    pending_batches: int
    in_progress_batches: int
    completed_batches: int
    on_hold_batches: int
    cancelled_batches: int

    # Planned and actual quantities, plus the efficiency rate reported
    # alongside them.
    total_planned_quantity: float
    total_actual_quantity: float
    efficiency_rate: float
def _empty_production_summary(tenant_id: str) -> ProductionSummary:
    """Return the all-zero placeholder summary used when a tenant's fetch fails."""
    return ProductionSummary(
        tenant_id=tenant_id,
        total_batches=0,
        pending_batches=0,
        in_progress_batches=0,
        completed_batches=0,
        on_hold_batches=0,
        cancelled_batches=0,
        total_planned_quantity=0.0,
        total_actual_quantity=0.0,
        efficiency_rate=0.0
    )


@router.post("/batch/production-summary", response_model=Dict[str, ProductionSummary])
async def get_production_summary_batch(
    request: ProductionSummaryBatchRequest = Body(...),
    current_user: Dict[str, Any] = Depends(get_current_user_dep),
    production_service: ProductionService = Depends(get_production_service)
):
    """
    Get production summary for multiple tenants in a single request.

    Optimized for enterprise dashboards to eliminate N+1 query patterns.
    Fetches production data for all tenants concurrently.

    Per-tenant failures (invalid UUID, service error) do not fail the batch:
    the tenant is still present in the response, with an all-zero summary,
    and a warning is logged.

    Args:
        request: Batch request with tenant IDs (at most 100). Duplicate IDs
            are fetched once.
        current_user: Resolved by the `get_current_user_dep` dependency; the
            value is not read in this handler.
        production_service: Service used to load each tenant's dashboard summary.

    Returns:
        Dictionary mapping tenant_id -> production summary. An empty request
        list yields an empty dictionary.

    Raises:
        HTTPException: 400 when more than 100 tenant IDs are supplied,
            500 on an unexpected error outside the per-tenant fetch.

    Example:
        POST /api/v1/production/batch/production-summary
        {
            "tenant_ids": ["tenant-1", "tenant-2", "tenant-3"]
        }

    Response:
        {
            "tenant-1": {"tenant_id": "tenant-1", "total_batches": 25, ...},
            "tenant-2": {"tenant_id": "tenant-2", "total_batches": 18, ...},
            "tenant-3": {"tenant_id": "tenant-3", "total_batches": 32, ...}
        }
    """
    # NOTE(review): this handler does not compare `current_user` against the
    # requested tenant IDs - confirm tenant-level authorization is enforced
    # elsewhere (gateway / dependency).
    try:
        if len(request.tenant_ids) > 100:
            raise HTTPException(
                status_code=400,
                detail="Maximum 100 tenant IDs allowed per batch request"
            )

        if not request.tenant_ids:
            return {}

        # The response is keyed by tenant ID, so duplicates would collapse
        # into a single entry anyway; fetch each tenant only once while
        # keeping first-seen order.
        unique_tenant_ids = list(dict.fromkeys(request.tenant_ids))

        logger.info(
            "Batch fetching production summaries",
            tenant_count=len(request.tenant_ids)
        )

        async def fetch_tenant_production(tenant_id: str) -> tuple[str, ProductionSummary]:
            """Fetch production summary for a single tenant"""
            try:
                tenant_uuid = UUID(tenant_id)
                summary = await production_service.get_dashboard_summary(tenant_uuid)

                # Normalise possibly-None quantities BEFORE comparing them:
                # `None > 0` raises TypeError, which would otherwise discard
                # the tenant's real batch counts via the fallback below.
                planned_quantity = float(summary.total_planned_quantity or 0)
                actual_quantity = float(summary.total_actual_quantity or 0)

                # Calculate efficiency rate (guarding against division by zero)
                efficiency_rate = 0.0
                if planned_quantity > 0:
                    efficiency_rate = (actual_quantity / planned_quantity) * 100

                return tenant_id, ProductionSummary(
                    tenant_id=tenant_id,
                    total_batches=int(summary.total_batches or 0),
                    pending_batches=int(summary.pending_batches or 0),
                    in_progress_batches=int(summary.in_progress_batches or 0),
                    completed_batches=int(summary.completed_batches or 0),
                    on_hold_batches=int(summary.on_hold_batches or 0),
                    cancelled_batches=int(summary.cancelled_batches or 0),
                    total_planned_quantity=planned_quantity,
                    total_actual_quantity=actual_quantity,
                    efficiency_rate=efficiency_rate
                )
            except Exception as e:
                # Deliberate best-effort: one bad tenant must not fail the batch.
                logger.warning(
                    "Failed to fetch production for tenant in batch",
                    tenant_id=tenant_id,
                    error=str(e)
                )
                return tenant_id, _empty_production_summary(tenant_id)

        # Fetch all tenant production data in parallel
        tasks = [fetch_tenant_production(tid) for tid in unique_tenant_ids]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Build result dictionary
        result_dict: Dict[str, ProductionSummary] = {}
        for result in results:
            # With return_exceptions=True a child's CancelledError (a
            # BaseException, not an Exception) can appear here; checking
            # BaseException keeps it from reaching the tuple unpacking.
            if isinstance(result, BaseException):
                logger.error("Exception in batch production fetch", error=str(result))
                continue
            tenant_id, summary = result
            result_dict[tenant_id] = summary

        logger.info(
            "Batch production summaries retrieved",
            requested_count=len(request.tenant_ids),
            successful_count=len(result_dict)
        )

        return result_dict

    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error in batch production summary", error=str(e), exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to fetch batch production summaries: {str(e)}"
        ) from e