Files
bakery-ia/services/production/app/api/quality_templates.py

174 lines
6.6 KiB
Python
Raw Normal View History

2025-09-23 19:24:22 +02:00
# services/production/app/api/quality_templates.py
"""
Quality Check Template API endpoints for production service
"""
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import List, Optional
from uuid import UUID
from ..core.database import get_db
from ..models.production import QualityCheckTemplate, ProcessStage
from ..services.quality_template_service import QualityTemplateService
from ..schemas.quality_templates import (
QualityCheckTemplateCreate,
QualityCheckTemplateUpdate,
QualityCheckTemplateResponse,
QualityCheckTemplateList
)
from shared.auth.tenant_access import get_current_tenant_id
# Router collecting the quality check template endpoints defined below;
# every route path is relative to the "/quality-templates" prefix.
router = APIRouter(prefix="/quality-templates", tags=["quality-templates"])
@router.post("", response_model=QualityCheckTemplateResponse, status_code=status.HTTP_201_CREATED)
async def create_quality_template(
    template_data: QualityCheckTemplateCreate,
    tenant_id: str = Depends(get_current_tenant_id),
    db: Session = Depends(get_db)
):
    """Create a new quality check template.

    Args:
        template_data: Request body describing the template to create.
        tenant_id: Tenant identifier resolved by ``get_current_tenant_id``.
        db: Database session provided by ``get_db``.

    Returns:
        The created template converted with
        ``QualityCheckTemplateResponse.from_orm``.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised inside the
            handler; 500 for any other unexpected exception.
    """
    try:
        service = QualityTemplateService(db)
        template = await service.create_template(tenant_id, template_data)
        return QualityCheckTemplateResponse.from_orm(template)
    except HTTPException:
        # Let HTTP errors raised further down propagate unchanged instead of
        # being rewrapped as a 500 by the generic handler below.
        raise
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    except Exception as exc:
        # NOTE(review): str(exc) is returned to the client and may expose
        # internal details; consider logging it and sending a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
@router.get("", response_model=QualityCheckTemplateList)
async def get_quality_templates(
    tenant_id: str = Depends(get_current_tenant_id),
    stage: Optional[ProcessStage] = Query(None, description="Filter by process stage"),
    check_type: Optional[str] = Query(None, description="Filter by check type"),
    is_active: Optional[bool] = Query(True, description="Filter by active status"),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db)
):
    """Get quality check templates for the current tenant.

    Args:
        tenant_id: Tenant identifier resolved by ``get_current_tenant_id``.
        stage: Optional process stage filter.
        check_type: Optional check type filter.
        is_active: Active-status filter; defaults to ``True``.
        skip: Number of records to skip (>= 0).
        limit: Maximum number of records to return (1 to 1000, default 100).
        db: Database session provided by ``get_db``.

    Returns:
        A ``QualityCheckTemplateList`` holding the converted templates, the
        total reported by the service, and the ``skip``/``limit`` values used.

    Raises:
        HTTPException: 500 for any unexpected exception.
    """
    try:
        service = QualityTemplateService(db)
        templates, total = await service.get_templates(
            tenant_id=tenant_id,
            stage=stage,
            check_type=check_type,
            is_active=is_active,
            skip=skip,
            limit=limit
        )
        return QualityCheckTemplateList(
            templates=[QualityCheckTemplateResponse.from_orm(t) for t in templates],
            total=total,
            skip=skip,
            limit=limit
        )
    except HTTPException:
        # Let HTTP errors raised further down propagate unchanged instead of
        # being rewrapped as a 500 by the generic handler below.
        raise
    except Exception as exc:
        # NOTE(review): str(exc) is returned to the client and may expose
        # internal details; consider logging it and sending a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
@router.get("/{template_id}", response_model=QualityCheckTemplateResponse)
async def get_quality_template(
    template_id: UUID,
    tenant_id: str = Depends(get_current_tenant_id),
    db: Session = Depends(get_db)
):
    """Get a specific quality check template.

    Args:
        template_id: Identifier of the template to fetch.
        tenant_id: Tenant identifier resolved by ``get_current_tenant_id``.
        db: Database session provided by ``get_db``.

    Returns:
        The template converted with ``QualityCheckTemplateResponse.from_orm``.

    Raises:
        HTTPException: 404 when the service returns a falsy result; 500 for
            any other unexpected exception.
    """
    try:
        service = QualityTemplateService(db)
        template = await service.get_template(tenant_id, template_id)
        if not template:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Template not found")
        return QualityCheckTemplateResponse.from_orm(template)
    except HTTPException:
        # Keep the 404 raised above (and any other HTTP error) intact.
        raise
    except Exception as exc:
        # NOTE(review): str(exc) is returned to the client and may expose
        # internal details; consider logging it and sending a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
@router.put("/{template_id}", response_model=QualityCheckTemplateResponse)
async def update_quality_template(
    template_id: UUID,
    template_data: QualityCheckTemplateUpdate,
    tenant_id: str = Depends(get_current_tenant_id),
    db: Session = Depends(get_db)
):
    """Update a quality check template.

    Args:
        template_id: Identifier of the template to update.
        template_data: Request body with the fields to change.
        tenant_id: Tenant identifier resolved by ``get_current_tenant_id``.
        db: Database session provided by ``get_db``.

    Returns:
        The updated template converted with
        ``QualityCheckTemplateResponse.from_orm``.

    Raises:
        HTTPException: 404 when the service returns a falsy result; 400 when
            a ``ValueError`` is raised inside the handler; 500 for any other
            unexpected exception.
    """
    try:
        service = QualityTemplateService(db)
        template = await service.update_template(tenant_id, template_id, template_data)
        if not template:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Template not found")
        return QualityCheckTemplateResponse.from_orm(template)
    except HTTPException:
        # Keep the 404 raised above (and any other HTTP error) intact.
        raise
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    except Exception as exc:
        # NOTE(review): str(exc) is returned to the client and may expose
        # internal details; consider logging it and sending a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
@router.delete("/{template_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_quality_template(
    template_id: UUID,
    tenant_id: str = Depends(get_current_tenant_id),
    db: Session = Depends(get_db)
):
    """Delete a quality check template.

    Args:
        template_id: Identifier of the template to delete.
        tenant_id: Tenant identifier resolved by ``get_current_tenant_id``.
        db: Database session provided by ``get_db``.

    Returns:
        ``None``; the route is declared with a 204 status code.

    Raises:
        HTTPException: 404 when the service reports a falsy result; 500 for
            any other unexpected exception.
    """
    try:
        service = QualityTemplateService(db)
        success = await service.delete_template(tenant_id, template_id)
        if not success:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Template not found")
    except HTTPException:
        # Keep the 404 raised above (and any other HTTP error) intact.
        raise
    except Exception as exc:
        # NOTE(review): str(exc) is returned to the client and may expose
        # internal details; consider logging it and sending a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
@router.get("/stages/{stage}", response_model=QualityCheckTemplateList)
async def get_templates_for_stage(
    stage: ProcessStage,
    tenant_id: str = Depends(get_current_tenant_id),
    is_active: Optional[bool] = Query(True, description="Filter by active status"),
    db: Session = Depends(get_db)
):
    """Get quality check templates applicable to a specific process stage.

    Args:
        stage: Process stage taken from the URL path.
        tenant_id: Tenant identifier resolved by ``get_current_tenant_id``.
        is_active: Active-status filter; defaults to ``True``.
        db: Database session provided by ``get_db``.

    Returns:
        A ``QualityCheckTemplateList`` containing every template returned by
        the service; ``total`` and ``limit`` are both the number of templates
        and ``skip`` is 0, since this endpoint does not paginate.

    Raises:
        HTTPException: 500 for any unexpected exception.
    """
    try:
        service = QualityTemplateService(db)
        templates = await service.get_templates_for_stage(tenant_id, stage, is_active)
        return QualityCheckTemplateList(
            templates=[QualityCheckTemplateResponse.from_orm(t) for t in templates],
            total=len(templates),
            skip=0,
            limit=len(templates)
        )
    except HTTPException:
        # Let HTTP errors raised further down propagate unchanged instead of
        # being rewrapped as a 500 by the generic handler below.
        raise
    except Exception as exc:
        # NOTE(review): str(exc) is returned to the client and may expose
        # internal details; consider logging it and sending a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
@router.post("/{template_id}/duplicate", response_model=QualityCheckTemplateResponse, status_code=status.HTTP_201_CREATED)
async def duplicate_quality_template(
    template_id: UUID,
    tenant_id: str = Depends(get_current_tenant_id),
    db: Session = Depends(get_db)
):
    """Duplicate an existing quality check template.

    Args:
        template_id: Identifier of the template to duplicate.
        tenant_id: Tenant identifier resolved by ``get_current_tenant_id``.
        db: Database session provided by ``get_db``.

    Returns:
        The template returned by the service's ``duplicate_template`` call,
        converted with ``QualityCheckTemplateResponse.from_orm``.

    Raises:
        HTTPException: 404 when the service returns a falsy result; 500 for
            any other unexpected exception.
    """
    try:
        service = QualityTemplateService(db)
        template = await service.duplicate_template(tenant_id, template_id)
        if not template:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Template not found")
        return QualityCheckTemplateResponse.from_orm(template)
    except HTTPException:
        # Keep the 404 raised above (and any other HTTP error) intact.
        raise
    except Exception as exc:
        # NOTE(review): str(exc) is returned to the client and may expose
        # internal details; consider logging it and sending a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc