Files
2025-12-05 20:07:01 +01:00

420 lines
13 KiB
Python

"""API endpoints for AI Insights."""
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional
from uuid import UUID
from datetime import datetime
import math
from app.core.database import get_db
from app.repositories.insight_repository import InsightRepository
from app.repositories.feedback_repository import FeedbackRepository
from app.schemas.insight import (
AIInsightCreate,
AIInsightUpdate,
AIInsightResponse,
AIInsightList,
InsightMetrics,
InsightFilters
)
from app.schemas.feedback import InsightFeedbackCreate, InsightFeedbackResponse
# Router on which every AI Insights endpoint in this module is registered.
router = APIRouter()
@router.post("/tenants/{tenant_id}/insights", response_model=AIInsightResponse, status_code=status.HTTP_201_CREATED)
async def create_insight(
    tenant_id: UUID,
    insight_data: AIInsightCreate,
    db: AsyncSession = Depends(get_db)
):
    """Persist a new AI Insight for the tenant named in the URL.

    Raises:
        HTTPException: 400 when the tenant id in the payload differs from
            the tenant id in the path.
    """
    # The payload carries its own tenant id; it must agree with the path.
    payload_matches_path = insight_data.tenant_id == tenant_id
    if not payload_matches_path:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Tenant ID mismatch"
        )

    created = await InsightRepository(db).create(insight_data)
    await db.commit()
    return created
@router.get("/tenants/{tenant_id}/insights", response_model=AIInsightList)
async def get_insights(
    tenant_id: UUID,
    category: Optional[str] = Query(None),
    priority: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    actionable_only: bool = Query(False),
    min_confidence: int = Query(0, ge=0, le=100),
    source_service: Optional[str] = Query(None),
    from_date: Optional[datetime] = Query(None),
    to_date: Optional[datetime] = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db)
):
    """Return one page of a tenant's insights, narrowed by the query filters.

    The query parameters are bundled into an ``InsightFilters`` object and
    handed to the repository together with the offset/limit derived from
    ``page`` and ``page_size``.
    """
    # NOTE: inside this function `status` is the query parameter; it shadows
    # the fastapi `status` module imported at the top of the file.
    criteria = InsightFilters(
        category=category,
        priority=priority,
        status=status,
        actionable_only=actionable_only,
        min_confidence=min_confidence,
        source_service=source_service,
        from_date=from_date,
        to_date=to_date
    )

    # Pages are 1-based, so page 1 starts at offset 0.
    offset = (page - 1) * page_size
    rows, total = await InsightRepository(db).get_by_tenant(
        tenant_id, criteria, offset, page_size
    )

    if total > 0:
        page_count = math.ceil(total / page_size)
    else:
        page_count = 0

    return AIInsightList(
        items=rows,
        total=total,
        page=page,
        page_size=page_size,
        total_pages=page_count
    )
@router.get("/tenants/{tenant_id}/insights/orchestration-ready")
async def get_orchestration_ready_insights(
    tenant_id: UUID,
    target_date: datetime = Query(...),
    min_confidence: int = Query(70, ge=0, le=100),
    db: AsyncSession = Depends(get_db)
):
    """Fetch the insights the repository reports as orchestration-ready.

    The repository result is returned to the client unchanged.
    """
    insight_repo = InsightRepository(db)
    return await insight_repo.get_orchestration_ready_insights(
        tenant_id, target_date, min_confidence
    )
@router.get("/tenants/{tenant_id}/insights/{insight_id}", response_model=AIInsightResponse)
async def get_insight(
    tenant_id: UUID,
    insight_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Look up one insight by id and return it if it belongs to the tenant.

    Raises:
        HTTPException: 404 when no insight has this id; 403 when the
            insight's tenant differs from the tenant in the path.
    """
    found = await InsightRepository(db).get_by_id(insight_id)

    if not found:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Insight not found"
        )

    owned_by_tenant = found.tenant_id == tenant_id
    if not owned_by_tenant:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied"
        )

    return found
@router.patch("/tenants/{tenant_id}/insights/{insight_id}", response_model=AIInsightResponse)
async def update_insight(
    tenant_id: UUID,
    insight_id: UUID,
    update_data: AIInsightUpdate,
    db: AsyncSession = Depends(get_db)
):
    """Apply a partial update to an insight owned by the tenant.

    Raises:
        HTTPException: 404 when no insight has this id; 403 when the
            insight's tenant differs from the tenant in the path.
    """
    insight_repo = InsightRepository(db)

    # Existence and ownership are checked before anything is written.
    existing = await insight_repo.get_by_id(insight_id)
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Insight not found"
        )
    if existing.tenant_id != tenant_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied"
        )

    result = await insight_repo.update(insight_id, update_data)
    await db.commit()
    return result
@router.delete("/tenants/{tenant_id}/insights/{insight_id}", status_code=status.HTTP_204_NO_CONTENT)
async def dismiss_insight(
    tenant_id: UUID,
    insight_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Dismiss an insight by delegating to the repository's ``delete``.

    Responds with 204 and no body on success.

    Raises:
        HTTPException: 404 when no insight has this id; 403 when the
            insight's tenant differs from the tenant in the path.
    """
    insight_repo = InsightRepository(db)

    # Existence and ownership are checked before anything is removed.
    target = await insight_repo.get_by_id(insight_id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Insight not found"
        )
    if target.tenant_id != tenant_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied"
        )

    await insight_repo.delete(insight_id)
    await db.commit()
@router.get("/tenants/{tenant_id}/insights/metrics/summary", response_model=InsightMetrics)
async def get_insights_metrics(
    tenant_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Return the tenant's aggregate insight metrics.

    The mapping produced by the repository is expanded into the
    ``InsightMetrics`` response schema.
    """
    raw_metrics = await InsightRepository(db).get_metrics(tenant_id)
    return InsightMetrics(**raw_metrics)
def _build_service_client(target_service):
    """Construct the shared client for one of the routable target services.

    Imports are done lazily so that a missing client module only fails the
    action that needs it.

    Raises:
        ValueError: when *target_service* has no client mapping.
    """
    if target_service == 'procurement':
        from shared.clients.procurement_client import ProcurementServiceClient as client_cls
    elif target_service == 'production':
        from shared.clients.production_client import ProductionServiceClient as client_cls
    elif target_service == 'inventory':
        from shared.clients.inventory_client import InventoryServiceClient as client_cls
    else:
        raise ValueError(f"No service client for target: {target_service}")

    from shared.config.base import get_settings

    return client_cls(get_settings(), "ai_insights")


@router.post("/tenants/{tenant_id}/insights/{insight_id}/apply")
async def apply_insight(
    tenant_id: UUID,
    insight_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Apply an insight recommendation (trigger action).

    Marks the insight ``in_progress``, walks its recommendation actions and
    routes each one by its ``target_service``, then stores the final status:
    ``applied`` when every action was routed, ``partially_applied`` when at
    least one action failed.

    Returns:
        A dict with the message, the insight id, the actions that were
        considered, the applied action types, the failed actions (with a
        reason each) and the final status.

    Raises:
        HTTPException: 404 when no insight has this id; 403 when the
            insight's tenant differs from the tenant in the path; 400 when
            the insight is not actionable; 500 when the routing phase fails
            as a whole (the insight is then marked ``failed``).
    """
    # Bound before any try block so the logger is available in every handler.
    import structlog

    logger = structlog.get_logger()
    repo = InsightRepository(db)

    # Verify insight exists and belongs to tenant
    insight = await repo.get_by_id(insight_id)
    if not insight:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Insight not found"
        )
    if insight.tenant_id != tenant_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied"
        )
    if not insight.actionable:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="This insight is not actionable"
        )

    # Snapshot the actions before the first commit: a commit may expire the
    # loaded attributes, and reloading them lazily is not possible from async
    # code. A missing action list is treated as "nothing to do".
    actions = list(insight.recommendation_actions or [])

    # Update status to in_progress
    update_data = AIInsightUpdate(status='in_progress', applied_at=datetime.utcnow())
    await repo.update(insight_id, update_data)
    await db.commit()

    # Route to appropriate service based on recommendation_actions
    applied_actions = []
    failed_actions = []
    try:
        for action in actions:
            # Pre-bound so the error handler never has to touch `action` again
            # (a malformed action must not raise a second time in the handler).
            action_type = None
            try:
                action_type = action.get('action_type')
                action_target = action.get('target_service')
                logger.info("Processing insight action",
                            insight_id=str(insight_id),
                            action_type=action_type,
                            target_service=action_target)

                # Route based on target service
                if action_target in ('procurement', 'production', 'inventory'):
                    # NOTE(review): the client is only constructed here; no
                    # request is sent to the target service yet.
                    _build_service_client(action_target)
                    logger.info(f"Routing action to {action_target} service", action=action)
                    applied_actions.append(action_type)
                elif action_target == 'pricing':
                    # Update pricing recommendations
                    logger.info("Price adjustment action identified", action=action)
                    applied_actions.append(action_type)
                else:
                    logger.warning("Unknown target service for action",
                                   action_type=action_type,
                                   target_service=action_target)
                    failed_actions.append({
                        'action_type': action_type,
                        'reason': f'Unknown target service: {action_target}'
                    })
            except Exception as action_error:
                # One bad action must not abort the remaining ones.
                logger.error("Failed to apply action",
                             action_type=action_type,
                             error=str(action_error))
                failed_actions.append({
                    'action_type': action_type,
                    'reason': str(action_error)
                })

        # Update final status
        final_status = 'applied' if not failed_actions else 'partially_applied'
        await repo.update(insight_id, AIInsightUpdate(status=final_status))
        await db.commit()
    except Exception as e:
        logger.error("Failed to route insight actions",
                     insight_id=str(insight_id),
                     error=str(e))
        # Clear any failed transaction state before reusing the session.
        await db.rollback()
        # Update status to failed
        await repo.update(insight_id, AIInsightUpdate(status='failed'))
        await db.commit()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to apply insight: {str(e)}"
        ) from e

    return {
        "message": "Insight application initiated",
        "insight_id": str(insight_id),
        "actions": actions,
        "applied_actions": applied_actions,
        "failed_actions": failed_actions,
        "status": final_status
    }
@router.post("/tenants/{tenant_id}/insights/{insight_id}/feedback", response_model=InsightFeedbackResponse)
async def record_feedback(
    tenant_id: UUID,
    insight_id: UUID,
    feedback_data: InsightFeedbackCreate,
    db: AsyncSession = Depends(get_db)
):
    """Store feedback for an insight and update the insight's status from it.

    The insight becomes ``applied`` when the stored feedback reports
    success and ``dismissed`` otherwise.

    Raises:
        HTTPException: 404 when no insight has this id; 403 when the
            insight's tenant differs from the tenant in the path; 400 when
            the insight id in the payload differs from the one in the path.
    """
    insights = InsightRepository(db)

    # Existence and ownership come first, then payload consistency.
    target = await insights.get_by_id(insight_id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Insight not found"
        )
    if target.tenant_id != tenant_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied"
        )
    if feedback_data.insight_id != insight_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Insight ID mismatch"
        )

    saved = await FeedbackRepository(db).create(feedback_data)

    # The feedback outcome decides the insight's next status.
    if saved.success:
        outcome = 'applied'
    else:
        outcome = 'dismissed'
    await insights.update(insight_id, AIInsightUpdate(status=outcome))

    await db.commit()
    return saved
@router.post("/tenants/{tenant_id}/insights/refresh")
async def refresh_insights(
    tenant_id: UUID,
    db: AsyncSession = Depends(get_db)
):
    """Expire old insights and report how many the repository expired.

    Only the expiry step is performed here; no new insights are generated
    by this handler.
    """
    # NOTE(review): tenant_id is not passed to expire_old_insights(), so the
    # expiry may not be limited to this tenant — confirm against the repository.
    insight_repo = InsightRepository(db)
    num_expired = await insight_repo.expire_old_insights()
    await db.commit()

    response = {"message": "Insights refreshed"}
    response["expired_count"] = num_expired
    return response
@router.get("/tenants/{tenant_id}/insights/export")
async def export_insights(
    tenant_id: UUID,
    format: str = Query("json", pattern="^(json|csv)$"),
    db: AsyncSession = Depends(get_db)
):
    """Export up to 1000 of the tenant's insights.

    Only ``format=json`` is implemented; the insights are returned under the
    ``"insights"`` key, each validated into ``AIInsightResponse``.

    Raises:
        HTTPException: 501 when a format other than JSON is requested
            (CSV export is not implemented).
    """
    # `pattern` replaces the deprecated `regex` keyword of Query; the accepted
    # values are unchanged.
    #
    # NOTE(review): this route is declared after
    # GET /tenants/{tenant_id}/insights/{insight_id}. If routes are matched in
    # declaration order, ".../insights/export" is captured by that route first
    # and rejected as an invalid UUID — confirm, and declare this route earlier
    # if so.
    repo = InsightRepository(db)
    insights, _ = await repo.get_by_tenant(tenant_id, filters=None, skip=0, limit=1000)

    if format != "json":
        # CSV export would be implemented here
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail="CSV export not yet implemented"
        )

    return {"insights": [AIInsightResponse.model_validate(i) for i in insights]}