# ================================================================ # services/forecasting/app/api/forecasts.py # ================================================================ """ Forecast API endpoints """ import structlog from fastapi import APIRouter, Depends, HTTPException, status, Query, Path from sqlalchemy.ext.asyncio import AsyncSession from typing import List, Optional from datetime import date, datetime from sqlalchemy import select, delete, func import uuid from app.core.database import get_db from shared.auth.decorators import ( get_current_user_dep, require_admin_role ) from app.services.forecasting_service import ForecastingService from app.schemas.forecasts import ( ForecastRequest, ForecastResponse, BatchForecastRequest, BatchForecastResponse, AlertResponse ) from app.models.forecasts import Forecast, PredictionBatch, ForecastAlert from app.services.messaging import publish_forecasts_deleted_event logger = structlog.get_logger() router = APIRouter() # Initialize service forecasting_service = ForecastingService() @router.post("/tenants/{tenant_id}/forecasts/single", response_model=ForecastResponse) async def create_single_forecast( request: ForecastRequest, db: AsyncSession = Depends(get_db), tenant_id: str = Path(..., description="Tenant ID") ): """Generate a single product forecast""" try: # Generate forecast forecast = await forecasting_service.generate_forecast(tenant_id, request, db) # Convert to response model return ForecastResponse( id=str(forecast.id), tenant_id=tenant_id, product_name=forecast.product_name, location=forecast.location, forecast_date=forecast.forecast_date, predicted_demand=forecast.predicted_demand, confidence_lower=forecast.confidence_lower, confidence_upper=forecast.confidence_upper, confidence_level=forecast.confidence_level, model_id=str(forecast.model_id), model_version=forecast.model_version, algorithm=forecast.algorithm, business_type=forecast.business_type, is_holiday=forecast.is_holiday, is_weekend=forecast.is_weekend, day_of_week=forecast.day_of_week, weather_temperature=forecast.weather_temperature, weather_precipitation=forecast.weather_precipitation, weather_description=forecast.weather_description, traffic_volume=forecast.traffic_volume, created_at=forecast.created_at, processing_time_ms=forecast.processing_time_ms, features_used=forecast.features_used ) except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e) ) except Exception as e: logger.error("Error creating single forecast", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error" ) @router.post("/tenants/{tenant_id}/forecasts/batch", response_model=BatchForecastResponse) async def create_batch_forecast( request: BatchForecastRequest, db: AsyncSession = Depends(get_db), tenant_id: str = Path(..., description="Tenant ID"), current_user: dict = Depends(get_current_user_dep) ): """Generate batch forecasts for multiple products""" try: # Verify tenant access if str(request.tenant_id) != tenant_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this tenant" ) # Generate batch forecast batch = await forecasting_service.generate_batch_forecast(request, db) # Get associated forecasts forecasts = await forecasting_service.get_forecasts( tenant_id=request.tenant_id, location=request.location, db=db ) # Convert forecasts to response models forecast_responses = [] for forecast in forecasts[:batch.total_products]: # Limit to batch size forecast_responses.append(ForecastResponse( id=str(forecast.id), tenant_id=str(forecast.tenant_id), product_name=forecast.product_name, location=forecast.location, forecast_date=forecast.forecast_date, predicted_demand=forecast.predicted_demand, confidence_lower=forecast.confidence_lower, confidence_upper=forecast.confidence_upper, confidence_level=forecast.confidence_level, model_id=str(forecast.model_id), model_version=forecast.model_version, algorithm=forecast.algorithm, business_type=forecast.business_type, is_holiday=forecast.is_holiday, is_weekend=forecast.is_weekend, day_of_week=forecast.day_of_week, weather_temperature=forecast.weather_temperature, weather_precipitation=forecast.weather_precipitation, weather_description=forecast.weather_description, traffic_volume=forecast.traffic_volume, created_at=forecast.created_at, processing_time_ms=forecast.processing_time_ms, features_used=forecast.features_used )) return BatchForecastResponse( id=str(batch.id), tenant_id=str(batch.tenant_id), batch_name=batch.batch_name, status=batch.status, total_products=batch.total_products, completed_products=batch.completed_products, failed_products=batch.failed_products, requested_at=batch.requested_at, completed_at=batch.completed_at, processing_time_ms=batch.processing_time_ms, forecasts=forecast_responses ) except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e) ) except Exception as e: logger.error("Error creating batch forecast", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error" ) @router.get("/tenants/{tenant_id}/forecasts/list", response_model=List[ForecastResponse]) async def list_forecasts( location: str, start_date: Optional[date] = Query(None), end_date: Optional[date] = Query(None), product_name: Optional[str] = Query(None), db: AsyncSession = Depends(get_db), tenant_id: str = Path(..., description="Tenant ID") ): """List forecasts with filtering""" try: # Get forecasts forecasts = await forecasting_service.get_forecasts( tenant_id=tenant_id, location=location, start_date=start_date, end_date=end_date, product_name=product_name, db=db ) # Convert to response models return [ ForecastResponse( id=str(forecast.id), tenant_id=str(forecast.tenant_id), product_name=forecast.product_name, location=forecast.location, forecast_date=forecast.forecast_date, predicted_demand=forecast.predicted_demand, confidence_lower=forecast.confidence_lower, confidence_upper=forecast.confidence_upper, confidence_level=forecast.confidence_level, model_id=str(forecast.model_id), model_version=forecast.model_version, algorithm=forecast.algorithm, business_type=forecast.business_type, is_holiday=forecast.is_holiday, is_weekend=forecast.is_weekend, day_of_week=forecast.day_of_week, weather_temperature=forecast.weather_temperature, weather_precipitation=forecast.weather_precipitation, weather_description=forecast.weather_description, traffic_volume=forecast.traffic_volume, created_at=forecast.created_at, processing_time_ms=forecast.processing_time_ms, features_used=forecast.features_used ) for forecast in forecasts ] except Exception as e: logger.error("Error listing forecasts", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error" ) @router.get("/tenants/{tenant_id}/forecasts/alerts", response_model=List[AlertResponse]) async def get_forecast_alerts( active_only: bool = Query(True), db: AsyncSession = Depends(get_db), tenant_id: str = Path(..., description="Tenant ID"), current_user: dict = Depends(get_current_user_dep) ): """Get forecast alerts for tenant""" try: from sqlalchemy import select, and_ # Build query query = select(ForecastAlert).where( ForecastAlert.tenant_id == tenant_id ) if active_only: query = query.where(ForecastAlert.is_active == True) query = query.order_by(ForecastAlert.created_at.desc()) # Execute query result = await db.execute(query) alerts = result.scalars().all() # Convert to response models return [ AlertResponse( id=str(alert.id), tenant_id=str(alert.tenant_id), forecast_id=str(alert.forecast_id), alert_type=alert.alert_type, severity=alert.severity, message=alert.message, is_active=alert.is_active, created_at=alert.created_at, acknowledged_at=alert.acknowledged_at, notification_sent=alert.notification_sent ) for alert in alerts ] except Exception as e: logger.error("Error getting forecast alerts", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error" ) @router.put("/tenants/{tenant_id}/forecasts/alerts/{alert_id}/acknowledge") async def acknowledge_alert( alert_id: str, db: AsyncSession = Depends(get_db), tenant_id: str = Path(..., description="Tenant ID"), current_user: dict = Depends(get_current_user_dep) ): """Acknowledge a forecast alert""" try: from sqlalchemy import select, update from datetime import datetime # Get alert result = await db.execute( select(ForecastAlert).where( and_( ForecastAlert.id == alert_id, ForecastAlert.tenant_id == tenant_id ) ) ) alert = result.scalar_one_or_none() if not alert: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Alert not found" ) # Update alert alert.acknowledged_at = datetime.now() alert.is_active = False await db.commit() return {"message": "Alert acknowledged successfully"} except HTTPException: raise except Exception as e: logger.error("Error acknowledging alert", error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error" ) @router.delete("/tenants/{tenant_id}/forecasts") async def delete_tenant_forecasts( tenant_id: str, current_user = Depends(get_current_user_dep), _admin_check = Depends(require_admin_role), db: AsyncSession = Depends(get_db) ): """Delete all forecasts and predictions for a tenant (admin only)""" try: tenant_uuid = uuid.UUID(tenant_id) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid tenant ID format" ) try: from app.models.forecasts import Forecast, Prediction, PredictionBatch deletion_stats = { "tenant_id": tenant_id, "deleted_at": datetime.utcnow().isoformat(), "forecasts_deleted": 0, "predictions_deleted": 0, "batches_deleted": 0, "errors": [] } # Count before deletion forecasts_count_query = select(func.count(Forecast.id)).where( Forecast.tenant_id == tenant_uuid ) forecasts_count_result = await db.execute(forecasts_count_query) forecasts_count = forecasts_count_result.scalar() predictions_count_query = select(func.count(Prediction.id)).where( Prediction.tenant_id == tenant_uuid ) predictions_count_result = await db.execute(predictions_count_query) predictions_count = predictions_count_result.scalar() batches_count_query = select(func.count(PredictionBatch.id)).where( PredictionBatch.tenant_id == tenant_uuid ) batches_count_result = await db.execute(batches_count_query) batches_count = batches_count_result.scalar() # Delete predictions first (they may reference forecasts) try: predictions_delete_query = delete(Prediction).where( Prediction.tenant_id == tenant_uuid ) predictions_delete_result = await db.execute(predictions_delete_query) deletion_stats["predictions_deleted"] = predictions_delete_result.rowcount except Exception as e: error_msg = f"Error deleting predictions: {str(e)}" deletion_stats["errors"].append(error_msg) logger.error(error_msg) # Delete prediction batches try: batches_delete_query = delete(PredictionBatch).where( PredictionBatch.tenant_id == tenant_uuid ) batches_delete_result = await db.execute(batches_delete_query) deletion_stats["batches_deleted"] = batches_delete_result.rowcount except Exception as e: error_msg = f"Error deleting prediction batches: {str(e)}" deletion_stats["errors"].append(error_msg) logger.error(error_msg) # Delete forecasts try: forecasts_delete_query = delete(Forecast).where( Forecast.tenant_id == tenant_uuid ) forecasts_delete_result = await db.execute(forecasts_delete_query) deletion_stats["forecasts_deleted"] = forecasts_delete_result.rowcount except Exception as e: error_msg = f"Error deleting forecasts: {str(e)}" deletion_stats["errors"].append(error_msg) logger.error(error_msg) await db.commit() logger.info("Deleted tenant forecasting data", tenant_id=tenant_id, forecasts=deletion_stats["forecasts_deleted"], predictions=deletion_stats["predictions_deleted"], batches=deletion_stats["batches_deleted"]) deletion_stats["success"] = len(deletion_stats["errors"]) == 0 deletion_stats["expected_counts"] = { "forecasts": forecasts_count, "predictions": predictions_count, "batches": batches_count } return deletion_stats except Exception as e: await db.rollback() logger.error("Failed to delete tenant forecasts", tenant_id=tenant_id, error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete tenant forecasts" ) @router.get("/tenants/{tenant_id}/forecasts/count") async def get_tenant_forecasts_count( tenant_id: str, current_user = Depends(get_current_user_dep), _admin_check = Depends(require_admin_role), db: AsyncSession = Depends(get_db) ): """Get count of forecasts and predictions for a tenant (admin only)""" try: tenant_uuid = uuid.UUID(tenant_id) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid tenant ID format" ) try: from app.models.forecasts import Forecast, Prediction, PredictionBatch # Count forecasts forecasts_count_query = select(func.count(Forecast.id)).where( Forecast.tenant_id == tenant_uuid ) forecasts_count_result = await db.execute(forecasts_count_query) forecasts_count = forecasts_count_result.scalar() # Count predictions predictions_count_query = select(func.count(Prediction.id)).where( Prediction.tenant_id == tenant_uuid ) predictions_count_result = await db.execute(predictions_count_query) predictions_count = predictions_count_result.scalar() # Count batches batches_count_query = select(func.count(PredictionBatch.id)).where( PredictionBatch.tenant_id == tenant_uuid ) batches_count_result = await db.execute(batches_count_query) batches_count = batches_count_result.scalar() return { "tenant_id": tenant_id, "forecasts_count": forecasts_count, "predictions_count": predictions_count, "batches_count": batches_count, "total_forecasting_assets": forecasts_count + predictions_count + batches_count } except Exception as e: logger.error("Failed to get tenant forecasts count", tenant_id=tenant_id, error=str(e)) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to get forecasts count" )