Files
bakery-ia/services/forecasting/app/services/forecasting_alert_service.py
2025-10-23 07:44:54 +02:00

547 lines
24 KiB
Python

# services/forecasting/app/services/forecasting_alert_service.py
"""
Forecasting-specific alert and recommendation detection service
Monitors demand patterns, weather impacts, and holiday preparations
"""
import json
from typing import List, Dict, Any, Optional
from uuid import UUID
from datetime import datetime, timedelta
import structlog
from apscheduler.triggers.cron import CronTrigger
from shared.alerts.base_service import BaseAlertService, AlertServiceMixin
from shared.alerts.templates import format_item_message
from app.clients.inventory_client import get_inventory_client
logger = structlog.get_logger()
class ForecastingAlertService(BaseAlertService, AlertServiceMixin):
"""Forecasting service alert and recommendation detection"""
def setup_scheduled_checks(self):
"""Forecasting-specific scheduled checks for alerts and recommendations"""
# Weekend demand surge analysis - every Friday at 3 PM
self.scheduler.add_job(
self.check_weekend_demand_surge,
CronTrigger(day_of_week=4, hour=15, minute=0), # Friday 3 PM
id='weekend_surge_check',
misfire_grace_time=3600,
max_instances=1
)
# Weather impact analysis - every 6 hours during business days
self.scheduler.add_job(
self.check_weather_impact,
CronTrigger(hour='6,12,18', day_of_week='0-6'),
id='weather_impact_check',
misfire_grace_time=300,
max_instances=1
)
# Holiday preparation analysis - daily at 9 AM
self.scheduler.add_job(
self.check_holiday_preparation,
CronTrigger(hour=9, minute=0),
id='holiday_prep_check',
misfire_grace_time=3600,
max_instances=1
)
# Demand pattern analysis - every Monday at 8 AM
self.scheduler.add_job(
self.analyze_demand_patterns,
CronTrigger(day_of_week=0, hour=8, minute=0),
id='demand_pattern_analysis',
misfire_grace_time=3600,
max_instances=1
)
logger.info("Forecasting alert schedules configured",
service=self.config.SERVICE_NAME)
async def check_weekend_demand_surge(self):
"""Check for predicted weekend demand surges (alerts)"""
try:
self._checks_performed += 1
from app.repositories.forecasting_alert_repository import ForecastingAlertRepository
tenants = await self.get_active_tenants()
for tenant_id in tenants:
try:
async with self.db_manager.get_session() as session:
alert_repo = ForecastingAlertRepository(session)
surges = await alert_repo.get_weekend_demand_surges(tenant_id)
for surge in surges:
await self._process_weekend_surge(tenant_id, surge)
except Exception as e:
logger.error("Error checking weekend demand surge",
tenant_id=str(tenant_id),
error=str(e))
except Exception as e:
logger.error("Weekend demand surge check failed", error=str(e))
self._errors_count += 1
async def _process_weekend_surge(self, tenant_id: UUID, surge: Dict[str, Any]):
"""Process weekend demand surge alert"""
try:
growth_percentage = surge['growth_percentage']
avg_growth_percentage = surge['avg_growth_percentage']
max_growth = max(growth_percentage, avg_growth_percentage)
# Resolve product name with fallback
product_name = await self._resolve_product_name(
tenant_id,
str(surge['inventory_product_id']),
surge.get('product_name')
)
# Determine severity based on growth magnitude
if max_growth > 100:
severity = 'high'
elif max_growth > 75:
severity = 'medium'
else:
severity = 'low'
# Format message based on weather conditions (simplified check)
weather_favorable = await self._check_favorable_weather(surge['forecast_date'])
await self.publish_item(tenant_id, {
'type': 'demand_surge_weekend',
'severity': severity,
'title': f'📈 Fin de semana con alta demanda: {product_name}',
'message': f'📈 Fin de semana con alta demanda: {product_name} +{max_growth:.0f}%',
'actions': ['increase_production', 'order_extra_ingredients', 'schedule_staff'],
'triggers': [
f'weekend_forecast > {max_growth:.0f}%_normal',
'weather_favorable' if weather_favorable else 'weather_normal'
],
'metadata': {
'product_name': product_name,
'inventory_product_id': str(surge['inventory_product_id']),
'predicted_demand': float(surge['predicted_demand']),
'growth_percentage': float(max_growth),
'forecast_date': surge['forecast_date'].isoformat(),
'weather_favorable': weather_favorable
}
}, item_type='alert')
except Exception as e:
logger.error("Error processing weekend surge",
product_name=surge.get('product_name'),
error=str(e))
async def check_weather_impact(self):
"""Check for weather impact on demand (alerts)"""
try:
self._checks_performed += 1
from app.repositories.forecasting_alert_repository import ForecastingAlertRepository
tenants = await self.get_active_tenants()
for tenant_id in tenants:
try:
async with self.db_manager.get_session() as session:
alert_repo = ForecastingAlertRepository(session)
weather_impacts = await alert_repo.get_weather_impact_forecasts(tenant_id)
for impact in weather_impacts:
await self._process_weather_impact(tenant_id, impact)
except Exception as e:
logger.error("Error checking weather impact",
tenant_id=str(tenant_id),
error=str(e))
except Exception as e:
logger.error("Weather impact check failed", error=str(e))
self._errors_count += 1
async def _process_weather_impact(self, tenant_id: UUID, impact: Dict[str, Any]):
"""Process weather impact alert"""
try:
rain_forecast = impact['rain_forecast']
demand_change = impact['demand_change']
precipitation = impact['weather_precipitation'] or 0.0
if rain_forecast:
# Rain impact alert
triggers = ['rain_forecast']
if demand_change < -15:
triggers.append('outdoor_events_cancelled')
await self.publish_item(tenant_id, {
'type': 'weather_impact_alert',
'severity': 'low',
'title': '🌧️ Impacto climático previsto',
'message': '🌧️ Lluvia prevista: -20% tráfico peatonal esperado',
'actions': ['reduce_fresh_production', 'focus_comfort_products', 'delivery_promo'],
'triggers': triggers,
'metadata': {
'forecast_date': impact['forecast_date'].isoformat(),
'precipitation_mm': float(precipitation),
'expected_demand_change': float(demand_change),
'traffic_volume': impact.get('traffic_volume', 100),
'weather_type': 'rain'
}
}, item_type='alert')
elif demand_change < -20:
# General weather impact alert
product_name = await self._resolve_product_name(
tenant_id,
str(impact['inventory_product_id']),
impact.get('product_name')
)
await self.publish_item(tenant_id, {
'type': 'weather_impact_alert',
'severity': 'low',
'title': f'🌤️ Impacto climático: {product_name}',
'message': f'Condiciones climáticas pueden reducir demanda de {product_name} en {abs(demand_change):.0f}%',
'actions': ['adjust_production', 'focus_indoor_products', 'plan_promotions'],
'triggers': ['weather_conditions', 'demand_forecast_low'],
'metadata': {
'product_name': product_name,
'forecast_date': impact['forecast_date'].isoformat(),
'expected_demand_change': float(demand_change),
'temperature': impact.get('weather_temperature'),
'weather_type': 'general'
}
}, item_type='alert')
except Exception as e:
logger.error("Error processing weather impact",
product_name=impact.get('product_name'),
error=str(e))
async def check_holiday_preparation(self):
"""Check for upcoming Spanish holidays requiring preparation (alerts)"""
try:
self._checks_performed += 1
# Check for Spanish holidays in the next 3-7 days
upcoming_holidays = await self._get_upcoming_spanish_holidays(3, 7)
if not upcoming_holidays:
return
from app.repositories.forecasting_alert_repository import ForecastingAlertRepository
tenants = await self.get_active_tenants()
for tenant_id in tenants:
try:
async with self.db_manager.get_session() as session:
alert_repo = ForecastingAlertRepository(session)
demand_spikes = await alert_repo.get_holiday_demand_spikes(tenant_id)
for holiday_info in upcoming_holidays:
for spike in demand_spikes:
await self._process_holiday_preparation(
tenant_id, holiday_info, spike
)
except Exception as e:
logger.error("Error checking holiday preparation",
tenant_id=str(tenant_id),
error=str(e))
except Exception as e:
logger.error("Holiday preparation check failed", error=str(e))
self._errors_count += 1
async def _process_holiday_preparation(self, tenant_id: UUID, holiday: Dict[str, Any], spike: Dict[str, Any]):
"""Process holiday preparation alert"""
try:
days_until_holiday = holiday['days_until']
holiday_name = holiday['name']
spike_percentage = spike['spike_percentage']
# Determine severity based on spike magnitude and preparation time
if spike_percentage > 75 and days_until_holiday <= 3:
severity = 'high'
elif spike_percentage > 50 or days_until_holiday <= 3:
severity = 'medium'
else:
severity = 'low'
triggers = [f'spanish_holiday_in_{days_until_holiday}_days']
if spike_percentage > 25:
triggers.append('historical_demand_spike')
await self.publish_item(tenant_id, {
'type': 'holiday_preparation',
'severity': severity,
'title': f'🎉 Preparación para {holiday_name}',
'message': f'🎉 {holiday_name} en {days_until_holiday} días: pedidos especiales aumentan {spike_percentage:.0f}%',
'actions': ['prepare_special_menu', 'stock_decorations', 'extend_hours'],
'triggers': triggers,
'metadata': {
'holiday_name': holiday_name,
'days_until_holiday': days_until_holiday,
'product_name': spike['product_name'],
'spike_percentage': float(spike_percentage),
'avg_holiday_demand': float(spike['avg_holiday_demand']),
'avg_normal_demand': float(spike['avg_normal_demand']),
'holiday_date': holiday['date'].isoformat()
}
}, item_type='alert')
except Exception as e:
logger.error("Error processing holiday preparation",
holiday_name=holiday.get('name'),
error=str(e))
async def analyze_demand_patterns(self):
"""Analyze demand patterns for recommendations"""
try:
self._checks_performed += 1
from app.repositories.forecasting_alert_repository import ForecastingAlertRepository
tenants = await self.get_active_tenants()
for tenant_id in tenants:
try:
async with self.db_manager.get_session() as session:
alert_repo = ForecastingAlertRepository(session)
patterns = await alert_repo.get_demand_pattern_analysis(tenant_id)
for pattern in patterns:
await self._generate_demand_pattern_recommendation(tenant_id, pattern)
except Exception as e:
logger.error("Error analyzing demand patterns",
tenant_id=str(tenant_id),
error=str(e))
except Exception as e:
logger.error("Demand pattern analysis failed", error=str(e))
self._errors_count += 1
async def _generate_demand_pattern_recommendation(self, tenant_id: UUID, pattern: Dict[str, Any]):
"""Generate demand pattern optimization recommendation"""
try:
if not self.should_send_recommendation(tenant_id, 'demand_optimization'):
return
demand_range = pattern['demand_range']
peak_demand = pattern['peak_demand']
overall_avg = pattern['overall_avg']
optimization_potential = (demand_range / overall_avg) * 100
await self.publish_item(tenant_id, {
'type': 'demand_pattern_optimization',
'severity': 'medium',
'title': f'📊 Optimización de Patrones: {pattern["product_name"]}',
'message': f'Demanda de {pattern["product_name"]} varía {optimization_potential:.0f}% durante la semana. Oportunidad de optimización.',
'actions': ['Analizar patrones semanales', 'Ajustar producción diaria', 'Optimizar inventario', 'Planificar promociones'],
'metadata': {
'product_name': pattern['product_name'],
'optimization_potential': float(optimization_potential),
'peak_demand': float(peak_demand),
'min_demand': float(pattern['min_demand']),
'demand_range': float(demand_range),
'recommendation_type': 'demand_optimization'
}
}, item_type='recommendation')
except Exception as e:
logger.error("Error generating demand pattern recommendation",
product_name=pattern.get('product_name'),
error=str(e))
# Helper methods
async def _resolve_product_name(self, tenant_id: UUID, inventory_product_id: str, fallback_name: Optional[str] = None) -> str:
"""
Resolve product name, with fallbacks for when inventory service is unavailable
"""
# If we already have a product name, use it
if fallback_name:
return fallback_name
# Try to get from inventory service
try:
inventory_client = get_inventory_client()
product_name = await inventory_client.get_product_name(str(tenant_id), inventory_product_id)
if product_name:
return product_name
except Exception as e:
logger.debug("Failed to resolve product name from inventory service",
inventory_product_id=inventory_product_id,
error=str(e))
# Fallback to generic name
return f"Product-{inventory_product_id[:8]}"
async def _check_favorable_weather(self, forecast_date: datetime) -> bool:
"""Simple weather favorability check"""
# In a real implementation, this would check actual weather APIs
# For now, return a simple heuristic based on season
month = forecast_date.month
return month in [4, 5, 6, 7, 8, 9] # Spring/Summer months
async def _get_upcoming_spanish_holidays(self, min_days: int, max_days: int) -> List[Dict[str, Any]]:
"""Get upcoming Spanish holidays within date range"""
today = datetime.now().date()
holidays = []
# Major Spanish holidays
spanish_holidays = [
{"name": "Año Nuevo", "month": 1, "day": 1},
{"name": "Reyes Magos", "month": 1, "day": 6},
{"name": "Día del Trabajador", "month": 5, "day": 1},
{"name": "Asunción", "month": 8, "day": 15},
{"name": "Fiesta Nacional", "month": 10, "day": 12},
{"name": "Todos los Santos", "month": 11, "day": 1},
{"name": "Constitución", "month": 12, "day": 6},
{"name": "Inmaculada", "month": 12, "day": 8},
{"name": "Navidad", "month": 12, "day": 25}
]
current_year = today.year
for holiday in spanish_holidays:
# Check current year
holiday_date = datetime(current_year, holiday["month"], holiday["day"]).date()
days_until = (holiday_date - today).days
if min_days <= days_until <= max_days:
holidays.append({
"name": holiday["name"],
"date": holiday_date,
"days_until": days_until
})
# Check next year if needed
if holiday_date < today:
next_year_date = datetime(current_year + 1, holiday["month"], holiday["day"]).date()
days_until = (next_year_date - today).days
if min_days <= days_until <= max_days:
holidays.append({
"name": holiday["name"],
"date": next_year_date,
"days_until": days_until
})
return holidays
async def register_db_listeners(self, conn):
"""Register forecasting-specific database listeners"""
try:
await conn.add_listener('forecasting_alerts', self.handle_forecasting_db_alert)
logger.info("Database listeners registered",
service=self.config.SERVICE_NAME)
except Exception as e:
logger.error("Failed to register database listeners",
service=self.config.SERVICE_NAME,
error=str(e))
async def handle_forecasting_db_alert(self, connection, pid, channel, payload):
"""Handle forecasting alert from database trigger"""
try:
data = json.loads(payload)
tenant_id = UUID(data['tenant_id'])
if data['alert_type'] == 'demand_spike':
await self.publish_item(tenant_id, {
'type': 'demand_spike_detected',
'severity': 'medium',
'title': f'📈 Pico de Demanda Detectado',
'message': f'Demanda inesperada de {data["product_name"]}: {data["spike_percentage"]:.0f}% sobre lo normal.',
'actions': ['Revisar inventario', 'Aumentar producción', 'Notificar equipo'],
'metadata': {
'product_name': data['product_name'],
'spike_percentage': data['spike_percentage'],
'trigger_source': 'database'
}
}, item_type='alert')
except Exception as e:
logger.error("Error handling forecasting DB alert", error=str(e))
async def start_event_listener(self):
"""Listen for forecasting-affecting events"""
try:
# Subscribe to weather events that might affect forecasting
await self.rabbitmq_client.consume_events(
"bakery_events",
f"forecasting.weather.{self.config.SERVICE_NAME}",
"weather.severe_change",
self.handle_weather_event
)
# Subscribe to sales events that might trigger demand alerts
await self.rabbitmq_client.consume_events(
"bakery_events",
f"forecasting.sales.{self.config.SERVICE_NAME}",
"sales.unexpected_spike",
self.handle_sales_spike_event
)
logger.info("Event listeners started",
service=self.config.SERVICE_NAME)
except Exception as e:
logger.error("Failed to start event listeners",
service=self.config.SERVICE_NAME,
error=str(e))
async def handle_weather_event(self, message):
"""Handle severe weather change event"""
try:
weather_data = json.loads(message.body)
tenant_id = UUID(weather_data['tenant_id'])
if weather_data['change_type'] == 'severe_storm':
await self.publish_item(tenant_id, {
'type': 'severe_weather_impact',
'severity': 'high',
'title': '⛈️ Impacto Climático Severo',
'message': f'Tormenta severa prevista: reducir producción de productos frescos y activar delivery.',
'actions': ['reduce_fresh_production', 'activate_delivery', 'secure_outdoor_displays'],
'metadata': {
'weather_type': weather_data['change_type'],
'severity_level': weather_data.get('severity', 'high'),
'duration_hours': weather_data.get('duration_hours', 0)
}
}, item_type='alert')
except Exception as e:
logger.error("Error handling weather event", error=str(e))
async def handle_sales_spike_event(self, message):
"""Handle unexpected sales spike event"""
try:
sales_data = json.loads(message.body)
tenant_id = UUID(sales_data['tenant_id'])
await self.publish_item(tenant_id, {
'type': 'unexpected_demand_spike',
'severity': 'medium',
'title': '📈 Pico de Ventas Inesperado',
'message': f'Ventas de {sales_data["product_name"]} {sales_data["spike_percentage"]:.0f}% sobre pronóstico.',
'actions': ['increase_production', 'check_inventory', 'update_forecast'],
'metadata': {
'product_name': sales_data['product_name'],
'spike_percentage': sales_data['spike_percentage'],
'current_sales': sales_data.get('current_sales', 0),
'forecasted_sales': sales_data.get('forecasted_sales', 0)
}
}, item_type='alert')
except Exception as e:
logger.error("Error handling sales spike event", error=str(e))