Files
bakery-ia/services/forecasting/app/api/scenario_operations.py

422 lines
17 KiB
Python
Raw Normal View History

2025-10-07 07:15:07 +02:00
"""
Scenario Simulation Operations API - PROFESSIONAL/ENTERPRISE ONLY
Business operations for "what-if" scenario testing and strategic planning
"""
import structlog
from fastapi import APIRouter, Depends, HTTPException, status, Path, Request
from typing import List, Dict, Any
from datetime import date, datetime, timedelta
import uuid
from app.schemas.forecasts import (
ScenarioSimulationRequest,
ScenarioSimulationResponse,
ScenarioComparisonRequest,
ScenarioComparisonResponse,
ScenarioType,
ScenarioImpact,
ForecastResponse,
ForecastRequest
)
from app.services.forecasting_service import EnhancedForecastingService
from shared.auth.decorators import get_current_user_dep
from shared.database.base import create_database_manager
from shared.monitoring.decorators import track_execution_time
from shared.monitoring.metrics import get_metrics_collector
from app.core.config import settings
from shared.routing import RouteBuilder
from shared.auth.access_control import require_user_role
route_builder = RouteBuilder('forecasting')
logger = structlog.get_logger()
router = APIRouter(tags=["scenario-simulation"])
def get_enhanced_forecasting_service():
"""Dependency injection for EnhancedForecastingService"""
database_manager = create_database_manager(settings.DATABASE_URL, "forecasting-service")
return EnhancedForecastingService(database_manager)
@router.post(
route_builder.build_analytics_route("scenario-simulation"),
response_model=ScenarioSimulationResponse
)
@require_user_role(['viewer', 'member', 'admin', 'owner'])
@track_execution_time("scenario_simulation_duration_seconds", "forecasting-service")
async def simulate_scenario(
request: ScenarioSimulationRequest,
tenant_id: str = Path(..., description="Tenant ID"),
request_obj: Request = None,
forecasting_service: EnhancedForecastingService = Depends(get_enhanced_forecasting_service)
):
"""
Run a "what-if" scenario simulation on forecasts
This endpoint allows users to test how different scenarios might impact demand:
- Weather events (heatwaves, cold snaps, rain)
- Competition (new competitors opening nearby)
- Events (festivals, concerts, sports events)
- Pricing changes
- Promotions
- Supply disruptions
**PROFESSIONAL/ENTERPRISE ONLY**
"""
metrics = get_metrics_collector(request_obj)
start_time = datetime.utcnow()
try:
logger.info("Starting scenario simulation",
tenant_id=tenant_id,
scenario_name=request.scenario_name,
scenario_type=request.scenario_type.value,
products=len(request.inventory_product_ids))
if metrics:
metrics.increment_counter(f"scenario_simulations_total")
metrics.increment_counter(f"scenario_simulations_{request.scenario_type.value}_total")
# Generate simulation ID
simulation_id = str(uuid.uuid4())
end_date = request.start_date + timedelta(days=request.duration_days - 1)
# Step 1: Generate baseline forecasts
baseline_forecasts = []
if request.include_baseline:
logger.info("Generating baseline forecasts", tenant_id=tenant_id)
for product_id in request.inventory_product_ids:
forecast_request = ForecastRequest(
inventory_product_id=product_id,
forecast_date=request.start_date,
forecast_days=request.duration_days,
location="default" # TODO: Get from tenant settings
)
multi_day_result = await forecasting_service.generate_multi_day_forecast(
tenant_id=tenant_id,
request=forecast_request
)
baseline_forecasts.extend(multi_day_result.get("forecasts", []))
# Step 2: Apply scenario adjustments to generate scenario forecasts
scenario_forecasts = await _apply_scenario_adjustments(
tenant_id=tenant_id,
request=request,
baseline_forecasts=baseline_forecasts if request.include_baseline else [],
forecasting_service=forecasting_service
)
# Step 3: Calculate impacts
product_impacts = _calculate_product_impacts(
baseline_forecasts,
scenario_forecasts,
request.inventory_product_ids
)
# Step 4: Calculate totals
total_baseline_demand = sum(f.predicted_demand for f in baseline_forecasts) if baseline_forecasts else 0
total_scenario_demand = sum(f.predicted_demand for f in scenario_forecasts)
overall_impact_percent = (
((total_scenario_demand - total_baseline_demand) / total_baseline_demand * 100)
if total_baseline_demand > 0 else 0
)
# Step 5: Generate insights and recommendations
insights, recommendations, risk_level = _generate_insights(
request.scenario_type,
request,
product_impacts,
overall_impact_percent
)
# Calculate processing time
processing_time_ms = int((datetime.utcnow() - start_time).total_seconds() * 1000)
if metrics:
metrics.increment_counter("scenario_simulations_success_total")
metrics.observe_histogram("scenario_simulation_processing_time_ms", processing_time_ms)
logger.info("Scenario simulation completed successfully",
tenant_id=tenant_id,
simulation_id=simulation_id,
overall_impact=f"{overall_impact_percent:.2f}%",
processing_time_ms=processing_time_ms)
return ScenarioSimulationResponse(
id=simulation_id,
tenant_id=tenant_id,
scenario_name=request.scenario_name,
scenario_type=request.scenario_type,
start_date=request.start_date,
end_date=end_date,
duration_days=request.duration_days,
baseline_forecasts=baseline_forecasts if request.include_baseline else None,
scenario_forecasts=scenario_forecasts,
total_baseline_demand=total_baseline_demand,
total_scenario_demand=total_scenario_demand,
overall_impact_percent=overall_impact_percent,
product_impacts=product_impacts,
insights=insights,
recommendations=recommendations,
risk_level=risk_level,
created_at=datetime.utcnow(),
processing_time_ms=processing_time_ms
)
except ValueError as e:
if metrics:
metrics.increment_counter("scenario_simulation_validation_errors_total")
logger.error("Scenario simulation validation error", error=str(e), tenant_id=tenant_id)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except Exception as e:
if metrics:
metrics.increment_counter("scenario_simulations_errors_total")
logger.error("Scenario simulation failed", error=str(e), tenant_id=tenant_id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Scenario simulation failed"
)
async def _apply_scenario_adjustments(
tenant_id: str,
request: ScenarioSimulationRequest,
baseline_forecasts: List[ForecastResponse],
forecasting_service: EnhancedForecastingService
) -> List[ForecastResponse]:
"""
Apply scenario-specific adjustments to forecasts
"""
scenario_forecasts = []
# If no baseline, generate fresh forecasts
if not baseline_forecasts:
for product_id in request.inventory_product_ids:
forecast_request = ForecastRequest(
inventory_product_id=product_id,
forecast_date=request.start_date,
forecast_days=request.duration_days,
location="default"
)
multi_day_result = await forecasting_service.generate_multi_day_forecast(
tenant_id=tenant_id,
request=forecast_request
)
baseline_forecasts = multi_day_result.get("forecasts", [])
# Apply multipliers based on scenario type
for forecast in baseline_forecasts:
adjusted_forecast = forecast.copy()
multiplier = _get_scenario_multiplier(request)
# Adjust predicted demand
adjusted_forecast.predicted_demand *= multiplier
adjusted_forecast.confidence_lower *= multiplier
adjusted_forecast.confidence_upper *= multiplier
scenario_forecasts.append(adjusted_forecast)
return scenario_forecasts
def _get_scenario_multiplier(request: ScenarioSimulationRequest) -> float:
"""
Calculate demand multiplier based on scenario type and parameters
"""
if request.scenario_type == ScenarioType.WEATHER:
if request.weather_params:
# Heatwave increases demand for cold items, decreases for hot items
if request.weather_params.temperature_change and request.weather_params.temperature_change > 10:
return 1.25 # 25% increase during heatwave
elif request.weather_params.temperature_change and request.weather_params.temperature_change < -10:
return 0.85 # 15% decrease during cold snap
elif request.weather_params.precipitation_change and request.weather_params.precipitation_change > 10:
return 0.90 # 10% decrease during heavy rain
return 1.0
elif request.scenario_type == ScenarioType.COMPETITION:
if request.competition_params:
# New competition reduces demand based on market share loss
return 1.0 - request.competition_params.estimated_market_share_loss
return 0.85 # Default 15% reduction
elif request.scenario_type == ScenarioType.EVENT:
if request.event_params:
# Events increase demand based on attendance and proximity
if request.event_params.distance_km < 1.0:
return 1.5 # 50% increase for very close events
elif request.event_params.distance_km < 5.0:
return 1.2 # 20% increase for nearby events
return 1.15 # Default 15% increase
elif request.scenario_type == ScenarioType.PRICING:
if request.pricing_params:
# Price elasticity: typically -0.5 to -2.0
# 10% price increase = 5-20% demand decrease
elasticity = -1.0 # Average elasticity
return 1.0 + (request.pricing_params.price_change_percent / 100) * elasticity
return 1.0
elif request.scenario_type == ScenarioType.PROMOTION:
if request.promotion_params:
# Promotions increase traffic and conversion
traffic_boost = 1.0 + request.promotion_params.expected_traffic_increase
discount_boost = 1.0 + (request.promotion_params.discount_percent / 100) * 0.5
return traffic_boost * discount_boost
return 1.3 # Default 30% increase
elif request.scenario_type == ScenarioType.SUPPLY_DISRUPTION:
return 0.6 # 40% reduction due to limited supply
elif request.scenario_type == ScenarioType.CUSTOM:
if request.custom_multipliers and 'demand' in request.custom_multipliers:
return request.custom_multipliers['demand']
return 1.0
return 1.0
def _calculate_product_impacts(
baseline_forecasts: List[ForecastResponse],
scenario_forecasts: List[ForecastResponse],
product_ids: List[str]
) -> List[ScenarioImpact]:
"""
Calculate per-product impact of the scenario
"""
impacts = []
for product_id in product_ids:
baseline_total = sum(
f.predicted_demand for f in baseline_forecasts
if f.inventory_product_id == product_id
)
scenario_total = sum(
f.predicted_demand for f in scenario_forecasts
if f.inventory_product_id == product_id
)
if baseline_total > 0:
change_percent = ((scenario_total - baseline_total) / baseline_total) * 100
else:
change_percent = 0
# Get confidence ranges
scenario_product_forecasts = [
f for f in scenario_forecasts if f.inventory_product_id == product_id
]
avg_lower = sum(f.confidence_lower for f in scenario_product_forecasts) / len(scenario_product_forecasts) if scenario_product_forecasts else 0
avg_upper = sum(f.confidence_upper for f in scenario_product_forecasts) / len(scenario_product_forecasts) if scenario_product_forecasts else 0
impacts.append(ScenarioImpact(
inventory_product_id=product_id,
baseline_demand=baseline_total,
simulated_demand=scenario_total,
demand_change_percent=change_percent,
confidence_range=(avg_lower, avg_upper),
impact_factors={"primary_driver": "scenario_adjustment"}
))
return impacts
def _generate_insights(
scenario_type: ScenarioType,
request: ScenarioSimulationRequest,
impacts: List[ScenarioImpact],
overall_impact: float
) -> tuple[List[str], List[str], str]:
"""
Generate AI-powered insights and recommendations
"""
insights = []
recommendations = []
risk_level = "low"
# Determine risk level
if abs(overall_impact) > 30:
risk_level = "high"
elif abs(overall_impact) > 15:
risk_level = "medium"
# Generate scenario-specific insights
if scenario_type == ScenarioType.WEATHER:
if request.weather_params:
if request.weather_params.temperature_change and request.weather_params.temperature_change > 10:
insights.append(f"Heatwave of +{request.weather_params.temperature_change}°C expected to increase demand by {overall_impact:.1f}%")
recommendations.append("Increase inventory of cold beverages and refrigerated items")
recommendations.append("Extend operating hours to capture increased evening traffic")
elif request.weather_params.temperature_change and request.weather_params.temperature_change < -10:
insights.append(f"Cold snap of {request.weather_params.temperature_change}°C expected to decrease demand by {abs(overall_impact):.1f}%")
recommendations.append("Increase production of warm comfort foods")
recommendations.append("Reduce inventory of cold items")
elif scenario_type == ScenarioType.COMPETITION:
insights.append(f"New competitor expected to reduce demand by {abs(overall_impact):.1f}%")
recommendations.append("Consider launching loyalty program to retain customers")
recommendations.append("Differentiate with unique product offerings")
recommendations.append("Focus on customer service excellence")
elif scenario_type == ScenarioType.EVENT:
insights.append(f"Local event expected to increase demand by {overall_impact:.1f}%")
recommendations.append("Increase staffing for the event period")
recommendations.append("Stock additional inventory of popular items")
recommendations.append("Consider event-specific promotions")
elif scenario_type == ScenarioType.PRICING:
if overall_impact < 0:
insights.append(f"Price increase expected to reduce demand by {abs(overall_impact):.1f}%")
recommendations.append("Consider smaller price increases")
recommendations.append("Communicate value proposition to customers")
else:
insights.append(f"Price decrease expected to increase demand by {overall_impact:.1f}%")
recommendations.append("Ensure adequate inventory to meet increased demand")
elif scenario_type == ScenarioType.PROMOTION:
insights.append(f"Promotion expected to increase demand by {overall_impact:.1f}%")
recommendations.append("Stock additional inventory before promotion starts")
recommendations.append("Increase staffing during promotion period")
recommendations.append("Prepare marketing materials and signage")
# Add product-specific insights
high_impact_products = [
impact for impact in impacts
if abs(impact.demand_change_percent) > 20
]
if high_impact_products:
insights.append(f"{len(high_impact_products)} products show significant impact (>20% change)")
# Add general recommendation
if risk_level == "high":
recommendations.append("⚠️ High-impact scenario - review and adjust operational plans immediately")
elif risk_level == "medium":
recommendations.append("Monitor situation closely and prepare contingency plans")
return insights, recommendations, risk_level
@router.post(
route_builder.build_analytics_route("scenario-comparison"),
response_model=ScenarioComparisonResponse
)
@require_user_role(['viewer', 'member', 'admin', 'owner'])
async def compare_scenarios(
request: ScenarioComparisonRequest,
tenant_id: str = Path(..., description="Tenant ID")
):
"""
Compare multiple scenario simulations
**PROFESSIONAL/ENTERPRISE ONLY**
"""
# TODO: Implement scenario comparison
# This would retrieve saved scenarios and compare them
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="Scenario comparison not yet implemented"
)