From a21461781586eb226da2f015cf5620aa8a28dab4 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Sat, 23 Aug 2025 15:48:32 +0200 Subject: [PATCH] Add new alert architecture to forecasting --- docker-compose.yml | 5 +- .../app/clients/inventory_client.py | 82 +++ services/forecasting/app/main.py | 29 +- services/forecasting/app/models/forecasts.py | 1 + .../app/services/forecasting_alert_service.py | 688 ++++++++++++++++++ shared/alerts/templates.py | 44 ++ 6 files changed, 846 insertions(+), 3 deletions(-) create mode 100644 services/forecasting/app/clients/inventory_client.py create mode 100644 services/forecasting/app/services/forecasting_alert_service.py diff --git a/docker-compose.yml b/docker-compose.yml index 45d3cbfc..79ae2ab3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -88,10 +88,11 @@ services: bakery-network: ipv4_address: 172.20.0.11 healthcheck: - test: ["CMD", "rabbitmq-diagnostics", "ping"] + test: ["CMD", "timeout", "5", "bash", "-c", " Optional[str]: + """ + Get product name from inventory service + Returns None if service is unavailable or product not found + """ + try: + async with aiohttp.ClientSession(timeout=self.timeout) as session: + url = f"{self.base_url}/api/v1/products/{inventory_product_id}" + headers = {"X-Tenant-ID": tenant_id} + + async with session.get(url, headers=headers) as response: + if response.status == 200: + data = await response.json() + return data.get("name", f"Product-{inventory_product_id}") + else: + logger.debug("Product not found in inventory service", + inventory_product_id=inventory_product_id, + status=response.status) + return None + + except Exception as e: + logger.debug("Failed to get product name from inventory service", + inventory_product_id=inventory_product_id, + error=str(e)) + return None + + async def get_multiple_product_names(self, tenant_id: str, product_ids: list) -> Dict[str, str]: + """ + Get multiple product names efficiently + Returns a mapping of product_id -> product_name + """ + try: + async with aiohttp.ClientSession(timeout=self.timeout) as session: + url = f"{self.base_url}/api/v1/products/batch" + headers = {"X-Tenant-ID": tenant_id} + payload = {"product_ids": product_ids} + + async with session.post(url, json=payload, headers=headers) as response: + if response.status == 200: + data = await response.json() + return {item["id"]: item["name"] for item in data.get("products", [])} + else: + logger.debug("Batch product lookup failed", + product_count=len(product_ids), + status=response.status) + return {} + + except Exception as e: + logger.debug("Failed to get product names from inventory service", + product_count=len(product_ids), + error=str(e)) + return {} + +# Global client instance +_inventory_client = None + +def get_inventory_client() -> InventoryServiceClient: + """Get the global inventory client instance""" + global _inventory_client + if _inventory_client is None: + _inventory_client = InventoryServiceClient() + return _inventory_client \ No newline at end of file diff --git a/services/forecasting/app/main.py b/services/forecasting/app/main.py index 9e6a5a89..7800ba35 100644 --- a/services/forecasting/app/main.py +++ b/services/forecasting/app/main.py @@ -18,6 +18,7 @@ from app.api import forecasts, predictions from app.services.messaging import setup_messaging, cleanup_messaging +from app.services.forecasting_alert_service import ForecastingAlertService from shared.monitoring.logging import setup_logging from shared.monitoring.metrics import MetricsCollector @@ -28,6 +29,9 @@ logger = structlog.get_logger() # Initialize metrics collector metrics_collector = MetricsCollector("forecasting-service") +# Initialize alert service +alert_service = None + @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager for startup and shutdown events""" @@ -45,6 +49,13 @@ async def lifespan(app: FastAPI): await setup_messaging() logger.info("Messaging initialized") + # Initialize forecasting alert service + logger.info("Setting up forecasting alert service") + global alert_service + alert_service = ForecastingAlertService(settings) + await alert_service.start() + logger.info("Forecasting alert service initialized") + # Register custom metrics metrics_collector.register_counter("forecasts_generated_total", "Total forecasts generated") metrics_collector.register_counter("predictions_served_total", "Total predictions served") @@ -70,6 +81,11 @@ async def lifespan(app: FastAPI): logger.info("Shutting down Forecasting Service") try: + # Cleanup alert service + if alert_service: + await alert_service.stop() + logger.info("Alert service cleanup completed") + await cleanup_messaging() logger.info("Messaging cleanup completed") except Exception as e: @@ -104,12 +120,16 @@ app.include_router(predictions.router, prefix="/api/v1", tags=["predictions"]) async def health_check(): """Health check endpoint""" db_health = await get_db_health() + alert_health = await alert_service.health_check() if alert_service else {"status": "not_initialized"} + + overall_health = db_health and alert_health.get("status") == "healthy" return { - "status": "healthy" if db_health else "unhealthy", + "status": "healthy" if overall_health else "unhealthy", "service": "forecasting-service", "version": "1.0.0", "database": "connected" if db_health else "disconnected", + "alert_service": alert_health, "timestamp": structlog.get_logger().info("Health check requested") } @@ -118,6 +138,13 @@ async def get_metrics(): """Metrics endpoint for Prometheus""" return metrics_collector.get_metrics() +@app.get("/alert-metrics") +async def get_alert_metrics(): + """Alert service metrics endpoint""" + if alert_service: + return alert_service.get_metrics() + return {"error": "Alert service not initialized"} + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/services/forecasting/app/models/forecasts.py b/services/forecasting/app/models/forecasts.py index 625855bd..3570bae5 100644 --- a/services/forecasting/app/models/forecasts.py +++ b/services/forecasting/app/models/forecasts.py @@ -19,6 +19,7 @@ class Forecast(Base): id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True) inventory_product_id = Column(UUID(as_uuid=True), nullable=False, index=True) # Reference to inventory service + product_name = Column(String(255), nullable=False, index=True) # Product name stored locally location = Column(String(255), nullable=False, index=True) # Forecast period diff --git a/services/forecasting/app/services/forecasting_alert_service.py b/services/forecasting/app/services/forecasting_alert_service.py new file mode 100644 index 00000000..07c8c63b --- /dev/null +++ b/services/forecasting/app/services/forecasting_alert_service.py @@ -0,0 +1,688 @@ +# services/forecasting/app/services/forecasting_alert_service.py +""" +Forecasting-specific alert and recommendation detection service +Monitors demand patterns, weather impacts, and holiday preparations +""" + +import json +from typing import List, Dict, Any, Optional +from uuid import UUID +from datetime import datetime, timedelta +import structlog +from apscheduler.triggers.cron import CronTrigger + +from shared.alerts.base_service import BaseAlertService, AlertServiceMixin +from shared.alerts.templates import format_item_message +from app.clients.inventory_client import get_inventory_client + +logger = structlog.get_logger() + +class ForecastingAlertService(BaseAlertService, AlertServiceMixin): + """Forecasting service alert and recommendation detection""" + + def setup_scheduled_checks(self): + """Forecasting-specific scheduled checks for alerts and recommendations""" + + # Weekend demand surge analysis - every Friday at 3 PM + self.scheduler.add_job( + self.check_weekend_demand_surge, + CronTrigger(day_of_week=4, hour=15, minute=0), # Friday 3 PM + id='weekend_surge_check', + misfire_grace_time=3600, + max_instances=1 + ) + + # Weather impact analysis - every 6 hours during business days + self.scheduler.add_job( + self.check_weather_impact, + CronTrigger(hour='6,12,18', day_of_week='0-6'), + id='weather_impact_check', + misfire_grace_time=300, + max_instances=1 + ) + + # Holiday preparation analysis - daily at 9 AM + self.scheduler.add_job( + self.check_holiday_preparation, + CronTrigger(hour=9, minute=0), + id='holiday_prep_check', + misfire_grace_time=3600, + max_instances=1 + ) + + # Demand pattern analysis - every Monday at 8 AM + self.scheduler.add_job( + self.analyze_demand_patterns, + CronTrigger(day_of_week=0, hour=8, minute=0), + id='demand_pattern_analysis', + misfire_grace_time=3600, + max_instances=1 + ) + + logger.info("Forecasting alert schedules configured", + service=self.config.SERVICE_NAME) + + async def check_weekend_demand_surge(self): + """Check for predicted weekend demand surges (alerts)""" + try: + self._checks_performed += 1 + + query = """ + WITH weekend_forecast AS ( + SELECT + f.tenant_id, + f.inventory_product_id, + f.product_name, + f.predicted_demand, + f.forecast_date, + LAG(f.predicted_demand, 7) OVER ( + PARTITION BY f.tenant_id, f.inventory_product_id + ORDER BY f.forecast_date + ) as prev_week_demand, + AVG(f.predicted_demand) OVER ( + PARTITION BY f.tenant_id, f.inventory_product_id + ORDER BY f.forecast_date + ROWS BETWEEN 6 PRECEDING AND CURRENT ROW + ) as avg_weekly_demand + FROM forecasts f + WHERE f.forecast_date >= CURRENT_DATE + INTERVAL '1 day' + AND f.forecast_date <= CURRENT_DATE + INTERVAL '3 days' + AND EXTRACT(DOW FROM f.forecast_date) IN (6, 0) -- Saturday, Sunday + AND f.tenant_id = $1 + ), + surge_analysis AS ( + SELECT *, + CASE + WHEN prev_week_demand > 0 THEN + (predicted_demand - prev_week_demand) / prev_week_demand * 100 + ELSE 0 + END as growth_percentage, + CASE + WHEN avg_weekly_demand > 0 THEN + (predicted_demand - avg_weekly_demand) / avg_weekly_demand * 100 + ELSE 0 + END as avg_growth_percentage + FROM weekend_forecast + ) + SELECT * FROM surge_analysis + WHERE growth_percentage > 50 OR avg_growth_percentage > 50 + ORDER BY growth_percentage DESC + """ + + tenants = await self.get_active_tenants() + + for tenant_id in tenants: + try: + from sqlalchemy import text + async with self.db_manager.get_session() as session: + result = await session.execute(text(query), {"tenant_id": tenant_id}) + surges = result.fetchall() + + for surge in surges: + await self._process_weekend_surge(tenant_id, surge) + + except Exception as e: + logger.error("Error checking weekend demand surge", + tenant_id=str(tenant_id), + error=str(e)) + + except Exception as e: + logger.error("Weekend demand surge check failed", error=str(e)) + self._errors_count += 1 + + async def _process_weekend_surge(self, tenant_id: UUID, surge: Dict[str, Any]): + """Process weekend demand surge alert""" + try: + growth_percentage = surge['growth_percentage'] + avg_growth_percentage = surge['avg_growth_percentage'] + max_growth = max(growth_percentage, avg_growth_percentage) + + # Resolve product name with fallback + product_name = await self._resolve_product_name( + tenant_id, + str(surge['inventory_product_id']), + surge.get('product_name') + ) + + # Determine severity based on growth magnitude + if max_growth > 100: + severity = 'high' + elif max_growth > 75: + severity = 'medium' + else: + severity = 'low' + + # Format message based on weather conditions (simplified check) + weather_favorable = await self._check_favorable_weather(surge['forecast_date']) + + await self.publish_item(tenant_id, { + 'type': 'demand_surge_weekend', + 'severity': severity, + 'title': f'📈 Fin de semana con alta demanda: {product_name}', + 'message': f'📈 Fin de semana con alta demanda: {product_name} +{max_growth:.0f}%', + 'actions': ['increase_production', 'order_extra_ingredients', 'schedule_staff'], + 'triggers': [ + f'weekend_forecast > {max_growth:.0f}%_normal', + 'weather_favorable' if weather_favorable else 'weather_normal' + ], + 'metadata': { + 'product_name': product_name, + 'inventory_product_id': str(surge['inventory_product_id']), + 'predicted_demand': float(surge['predicted_demand']), + 'growth_percentage': float(max_growth), + 'forecast_date': surge['forecast_date'].isoformat(), + 'weather_favorable': weather_favorable + } + }, item_type='alert') + + except Exception as e: + logger.error("Error processing weekend surge", + product_name=surge.get('product_name'), + error=str(e)) + + async def check_weather_impact(self): + """Check for weather impact on demand (alerts)""" + try: + self._checks_performed += 1 + + # Get weather forecast data and correlate with demand patterns + query = """ + WITH weather_impact AS ( + SELECT + f.tenant_id, + f.inventory_product_id, + f.product_name, + f.predicted_demand, + f.forecast_date, + f.weather_precipitation, + f.weather_temperature, + f.traffic_volume, + AVG(f.predicted_demand) OVER ( + PARTITION BY f.tenant_id, f.inventory_product_id + ORDER BY f.forecast_date + ROWS BETWEEN 6 PRECEDING AND CURRENT ROW + ) as avg_demand + FROM forecasts f + WHERE f.forecast_date >= CURRENT_DATE + INTERVAL '1 day' + AND f.forecast_date <= CURRENT_DATE + INTERVAL '2 days' + AND f.tenant_id = $1 + ), + rain_impact AS ( + SELECT *, + CASE + WHEN weather_precipitation > 2.0 THEN true + ELSE false + END as rain_forecast, + CASE + WHEN traffic_volume < 80 THEN true + ELSE false + END as low_traffic_expected, + (predicted_demand - avg_demand) / avg_demand * 100 as demand_change + FROM weather_impact + ) + SELECT * FROM rain_impact + WHERE rain_forecast = true OR demand_change < -15 + ORDER BY demand_change ASC + """ + + tenants = await self.get_active_tenants() + + for tenant_id in tenants: + try: + from sqlalchemy import text + async with self.db_manager.get_session() as session: + result = await session.execute(text(query), {"tenant_id": tenant_id}) + weather_impacts = result.fetchall() + + for impact in weather_impacts: + await self._process_weather_impact(tenant_id, impact) + + except Exception as e: + logger.error("Error checking weather impact", + tenant_id=str(tenant_id), + error=str(e)) + + except Exception as e: + logger.error("Weather impact check failed", error=str(e)) + self._errors_count += 1 + + async def _process_weather_impact(self, tenant_id: UUID, impact: Dict[str, Any]): + """Process weather impact alert""" + try: + rain_forecast = impact['rain_forecast'] + demand_change = impact['demand_change'] + precipitation = impact['weather_precipitation'] or 0.0 + + if rain_forecast: + # Rain impact alert + triggers = ['rain_forecast'] + if demand_change < -15: + triggers.append('outdoor_events_cancelled') + + await self.publish_item(tenant_id, { + 'type': 'weather_impact_alert', + 'severity': 'low', + 'title': '🌧️ Impacto climático previsto', + 'message': '🌧️ Lluvia prevista: -20% tráfico peatonal esperado', + 'actions': ['reduce_fresh_production', 'focus_comfort_products', 'delivery_promo'], + 'triggers': triggers, + 'metadata': { + 'forecast_date': impact['forecast_date'].isoformat(), + 'precipitation_mm': float(precipitation), + 'expected_demand_change': float(demand_change), + 'traffic_volume': impact.get('traffic_volume', 100), + 'weather_type': 'rain' + } + }, item_type='alert') + + elif demand_change < -20: + # General weather impact alert + product_name = await self._resolve_product_name( + tenant_id, + str(impact['inventory_product_id']), + impact.get('product_name') + ) + + await self.publish_item(tenant_id, { + 'type': 'weather_impact_alert', + 'severity': 'low', + 'title': f'🌤️ Impacto climático: {product_name}', + 'message': f'Condiciones climáticas pueden reducir demanda de {product_name} en {abs(demand_change):.0f}%', + 'actions': ['adjust_production', 'focus_indoor_products', 'plan_promotions'], + 'triggers': ['weather_conditions', 'demand_forecast_low'], + 'metadata': { + 'product_name': product_name, + 'forecast_date': impact['forecast_date'].isoformat(), + 'expected_demand_change': float(demand_change), + 'temperature': impact.get('weather_temperature'), + 'weather_type': 'general' + } + }, item_type='alert') + + except Exception as e: + logger.error("Error processing weather impact", + product_name=impact.get('product_name'), + error=str(e)) + + async def check_holiday_preparation(self): + """Check for upcoming Spanish holidays requiring preparation (alerts)""" + try: + self._checks_performed += 1 + + # Check for Spanish holidays in the next 3-7 days + upcoming_holidays = await self._get_upcoming_spanish_holidays(3, 7) + + if not upcoming_holidays: + return + + # Analyze historical demand spikes for holidays + query = """ + WITH holiday_demand AS ( + SELECT + f.tenant_id, + f.inventory_product_id, + f.product_name, + AVG(f.predicted_demand) as avg_holiday_demand, + AVG(CASE WHEN f.is_holiday = false THEN f.predicted_demand END) as avg_normal_demand, + COUNT(*) as forecast_count + FROM forecasts f + WHERE f.created_at > CURRENT_DATE - INTERVAL '365 days' + AND f.tenant_id = $1 + GROUP BY f.tenant_id, f.inventory_product_id, f.product_name + HAVING COUNT(*) >= 10 + ), + demand_spike_analysis AS ( + SELECT *, + CASE + WHEN avg_normal_demand > 0 THEN + (avg_holiday_demand - avg_normal_demand) / avg_normal_demand * 100 + ELSE 0 + END as spike_percentage + FROM holiday_demand + ) + SELECT * FROM demand_spike_analysis + WHERE spike_percentage > 25 + ORDER BY spike_percentage DESC + """ + + tenants = await self.get_active_tenants() + + for tenant_id in tenants: + try: + from sqlalchemy import text + async with self.db_manager.get_session() as session: + result = await session.execute(text(query), {"tenant_id": tenant_id}) + demand_spikes = result.fetchall() + + for holiday_info in upcoming_holidays: + for spike in demand_spikes: + await self._process_holiday_preparation( + tenant_id, holiday_info, spike + ) + + except Exception as e: + logger.error("Error checking holiday preparation", + tenant_id=str(tenant_id), + error=str(e)) + + except Exception as e: + logger.error("Holiday preparation check failed", error=str(e)) + self._errors_count += 1 + + async def _process_holiday_preparation(self, tenant_id: UUID, holiday: Dict[str, Any], spike: Dict[str, Any]): + """Process holiday preparation alert""" + try: + days_until_holiday = holiday['days_until'] + holiday_name = holiday['name'] + spike_percentage = spike['spike_percentage'] + + # Determine severity based on spike magnitude and preparation time + if spike_percentage > 75 and days_until_holiday <= 3: + severity = 'high' + elif spike_percentage > 50 or days_until_holiday <= 3: + severity = 'medium' + else: + severity = 'low' + + triggers = [f'spanish_holiday_in_{days_until_holiday}_days'] + if spike_percentage > 25: + triggers.append('historical_demand_spike') + + await self.publish_item(tenant_id, { + 'type': 'holiday_preparation', + 'severity': severity, + 'title': f'🎉 Preparación para {holiday_name}', + 'message': f'🎉 {holiday_name} en {days_until_holiday} días: pedidos especiales aumentan {spike_percentage:.0f}%', + 'actions': ['prepare_special_menu', 'stock_decorations', 'extend_hours'], + 'triggers': triggers, + 'metadata': { + 'holiday_name': holiday_name, + 'days_until_holiday': days_until_holiday, + 'product_name': spike['product_name'], + 'spike_percentage': float(spike_percentage), + 'avg_holiday_demand': float(spike['avg_holiday_demand']), + 'avg_normal_demand': float(spike['avg_normal_demand']), + 'holiday_date': holiday['date'].isoformat() + } + }, item_type='alert') + + except Exception as e: + logger.error("Error processing holiday preparation", + holiday_name=holiday.get('name'), + error=str(e)) + + async def analyze_demand_patterns(self): + """Analyze demand patterns for recommendations""" + try: + self._checks_performed += 1 + + # Analyze weekly patterns for optimization opportunities + query = """ + WITH weekly_patterns AS ( + SELECT + f.tenant_id, + f.inventory_product_id, + f.product_name, + EXTRACT(DOW FROM f.forecast_date) as day_of_week, + AVG(f.predicted_demand) as avg_demand, + STDDEV(f.predicted_demand) as demand_variance, + COUNT(*) as data_points + FROM forecasts f + WHERE f.created_at > CURRENT_DATE - INTERVAL '60 days' + AND f.tenant_id = $1 + GROUP BY f.tenant_id, f.inventory_product_id, f.product_name, EXTRACT(DOW FROM f.forecast_date) + HAVING COUNT(*) >= 5 + ), + pattern_analysis AS ( + SELECT + tenant_id, inventory_product_id, product_name, + MAX(avg_demand) as peak_demand, + MIN(avg_demand) as min_demand, + AVG(avg_demand) as overall_avg, + MAX(avg_demand) - MIN(avg_demand) as demand_range + FROM weekly_patterns + GROUP BY tenant_id, inventory_product_id, product_name + ) + SELECT * FROM pattern_analysis + WHERE demand_range > overall_avg * 0.3 + AND peak_demand > overall_avg * 1.5 + ORDER BY demand_range DESC + """ + + tenants = await self.get_active_tenants() + + for tenant_id in tenants: + try: + from sqlalchemy import text + async with self.db_manager.get_session() as session: + result = await session.execute(text(query), {"tenant_id": tenant_id}) + patterns = result.fetchall() + + for pattern in patterns: + await self._generate_demand_pattern_recommendation(tenant_id, pattern) + + except Exception as e: + logger.error("Error analyzing demand patterns", + tenant_id=str(tenant_id), + error=str(e)) + + except Exception as e: + logger.error("Demand pattern analysis failed", error=str(e)) + self._errors_count += 1 + + async def _generate_demand_pattern_recommendation(self, tenant_id: UUID, pattern: Dict[str, Any]): + """Generate demand pattern optimization recommendation""" + try: + if not self.should_send_recommendation(tenant_id, 'demand_optimization'): + return + + demand_range = pattern['demand_range'] + peak_demand = pattern['peak_demand'] + overall_avg = pattern['overall_avg'] + + optimization_potential = (demand_range / overall_avg) * 100 + + await self.publish_item(tenant_id, { + 'type': 'demand_pattern_optimization', + 'severity': 'medium', + 'title': f'📊 Optimización de Patrones: {pattern["product_name"]}', + 'message': f'Demanda de {pattern["product_name"]} varía {optimization_potential:.0f}% durante la semana. Oportunidad de optimización.', + 'actions': ['Analizar patrones semanales', 'Ajustar producción diaria', 'Optimizar inventario', 'Planificar promociones'], + 'metadata': { + 'product_name': pattern['product_name'], + 'optimization_potential': float(optimization_potential), + 'peak_demand': float(peak_demand), + 'min_demand': float(pattern['min_demand']), + 'demand_range': float(demand_range), + 'recommendation_type': 'demand_optimization' + } + }, item_type='recommendation') + + except Exception as e: + logger.error("Error generating demand pattern recommendation", + product_name=pattern.get('product_name'), + error=str(e)) + + # Helper methods + async def _resolve_product_name(self, tenant_id: UUID, inventory_product_id: str, fallback_name: Optional[str] = None) -> str: + """ + Resolve product name, with fallbacks for when inventory service is unavailable + """ + # If we already have a product name, use it + if fallback_name: + return fallback_name + + # Try to get from inventory service + try: + inventory_client = get_inventory_client() + product_name = await inventory_client.get_product_name(str(tenant_id), inventory_product_id) + + if product_name: + return product_name + except Exception as e: + logger.debug("Failed to resolve product name from inventory service", + inventory_product_id=inventory_product_id, + error=str(e)) + + # Fallback to generic name + return f"Product-{inventory_product_id[:8]}" + + async def _check_favorable_weather(self, forecast_date: datetime) -> bool: + """Simple weather favorability check""" + # In a real implementation, this would check actual weather APIs + # For now, return a simple heuristic based on season + month = forecast_date.month + return month in [4, 5, 6, 7, 8, 9] # Spring/Summer months + + async def _get_upcoming_spanish_holidays(self, min_days: int, max_days: int) -> List[Dict[str, Any]]: + """Get upcoming Spanish holidays within date range""" + today = datetime.now().date() + holidays = [] + + # Major Spanish holidays + spanish_holidays = [ + {"name": "Año Nuevo", "month": 1, "day": 1}, + {"name": "Reyes Magos", "month": 1, "day": 6}, + {"name": "Día del Trabajador", "month": 5, "day": 1}, + {"name": "Asunción", "month": 8, "day": 15}, + {"name": "Fiesta Nacional", "month": 10, "day": 12}, + {"name": "Todos los Santos", "month": 11, "day": 1}, + {"name": "Constitución", "month": 12, "day": 6}, + {"name": "Inmaculada", "month": 12, "day": 8}, + {"name": "Navidad", "month": 12, "day": 25} + ] + + current_year = today.year + + for holiday in spanish_holidays: + # Check current year + holiday_date = datetime(current_year, holiday["month"], holiday["day"]).date() + days_until = (holiday_date - today).days + + if min_days <= days_until <= max_days: + holidays.append({ + "name": holiday["name"], + "date": holiday_date, + "days_until": days_until + }) + + # Check next year if needed + if holiday_date < today: + next_year_date = datetime(current_year + 1, holiday["month"], holiday["day"]).date() + days_until = (next_year_date - today).days + + if min_days <= days_until <= max_days: + holidays.append({ + "name": holiday["name"], + "date": next_year_date, + "days_until": days_until + }) + + return holidays + + async def register_db_listeners(self, conn): + """Register forecasting-specific database listeners""" + try: + await conn.add_listener('forecasting_alerts', self.handle_forecasting_db_alert) + + logger.info("Database listeners registered", + service=self.config.SERVICE_NAME) + except Exception as e: + logger.error("Failed to register database listeners", + service=self.config.SERVICE_NAME, + error=str(e)) + + async def handle_forecasting_db_alert(self, connection, pid, channel, payload): + """Handle forecasting alert from database trigger""" + try: + data = json.loads(payload) + tenant_id = UUID(data['tenant_id']) + + if data['alert_type'] == 'demand_spike': + await self.publish_item(tenant_id, { + 'type': 'demand_spike_detected', + 'severity': 'medium', + 'title': f'📈 Pico de Demanda Detectado', + 'message': f'Demanda inesperada de {data["product_name"]}: {data["spike_percentage"]:.0f}% sobre lo normal.', + 'actions': ['Revisar inventario', 'Aumentar producción', 'Notificar equipo'], + 'metadata': { + 'product_name': data['product_name'], + 'spike_percentage': data['spike_percentage'], + 'trigger_source': 'database' + } + }, item_type='alert') + + except Exception as e: + logger.error("Error handling forecasting DB alert", error=str(e)) + + async def start_event_listener(self): + """Listen for forecasting-affecting events""" + try: + # Subscribe to weather events that might affect forecasting + await self.rabbitmq_client.consume_events( + "bakery_events", + f"forecasting.weather.{self.config.SERVICE_NAME}", + "weather.severe_change", + self.handle_weather_event + ) + + # Subscribe to sales events that might trigger demand alerts + await self.rabbitmq_client.consume_events( + "bakery_events", + f"forecasting.sales.{self.config.SERVICE_NAME}", + "sales.unexpected_spike", + self.handle_sales_spike_event + ) + + logger.info("Event listeners started", + service=self.config.SERVICE_NAME) + except Exception as e: + logger.error("Failed to start event listeners", + service=self.config.SERVICE_NAME, + error=str(e)) + + async def handle_weather_event(self, message): + """Handle severe weather change event""" + try: + weather_data = json.loads(message.body) + tenant_id = UUID(weather_data['tenant_id']) + + if weather_data['change_type'] == 'severe_storm': + await self.publish_item(tenant_id, { + 'type': 'severe_weather_impact', + 'severity': 'high', + 'title': '⛈️ Impacto Climático Severo', + 'message': f'Tormenta severa prevista: reducir producción de productos frescos y activar delivery.', + 'actions': ['reduce_fresh_production', 'activate_delivery', 'secure_outdoor_displays'], + 'metadata': { + 'weather_type': weather_data['change_type'], + 'severity_level': weather_data.get('severity', 'high'), + 'duration_hours': weather_data.get('duration_hours', 0) + } + }, item_type='alert') + + except Exception as e: + logger.error("Error handling weather event", error=str(e)) + + async def handle_sales_spike_event(self, message): + """Handle unexpected sales spike event""" + try: + sales_data = json.loads(message.body) + tenant_id = UUID(sales_data['tenant_id']) + + await self.publish_item(tenant_id, { + 'type': 'unexpected_demand_spike', + 'severity': 'medium', + 'title': '📈 Pico de Ventas Inesperado', + 'message': f'Ventas de {sales_data["product_name"]} {sales_data["spike_percentage"]:.0f}% sobre pronóstico.', + 'actions': ['increase_production', 'check_inventory', 'update_forecast'], + 'metadata': { + 'product_name': sales_data['product_name'], + 'spike_percentage': sales_data['spike_percentage'], + 'current_sales': sales_data.get('current_sales', 0), + 'forecasted_sales': sales_data.get('forecasted_sales', 0) + } + }, item_type='alert') + + except Exception as e: + logger.error("Error handling sales spike event", error=str(e)) \ No newline at end of file diff --git a/shared/alerts/templates.py b/shared/alerts/templates.py index 1483a386..2c5c9f86 100644 --- a/shared/alerts/templates.py +++ b/shared/alerts/templates.py @@ -142,6 +142,50 @@ ITEM_TEMPLATES = { 'message': 'Picos de trabajo los {days} a las {hours}. Considerar ajustar turnos para mejor eficiencia.', 'actions': ['Analizar cargas trabajo', 'Reorganizar turnos', 'Entrenar polivalencia', 'Contratar temporal'] } + }, + + # FORECASTING ALERTS - Demand prediction and planning alerts + 'demand_surge_weekend': { + 'es': { + 'title': '📈 Fin de semana con alta demanda: {product_name}', + 'message': '📈 Fin de semana con alta demanda: {product_name} +{percentage}%', + 'actions': ['Aumentar producción', 'Pedir ingredientes extra', 'Programar personal'] + } + }, + 'weather_impact_alert': { + 'es': { + 'title': '🌧️ Impacto climático previsto', + 'message': '🌧️ Lluvia prevista: -20% tráfico peatonal esperado', + 'actions': ['Reducir producción fresca', 'Enfoque productos comfort', 'Promoción delivery'] + } + }, + 'holiday_preparation': { + 'es': { + 'title': '🎉 {holiday_name} en {days} días', + 'message': '🎉 {holiday_name} en {days} días: pedidos especiales aumentan {percentage}%', + 'actions': ['Preparar menú especial', 'Stock decoraciones', 'Extender horarios'] + } + }, + 'demand_pattern_optimization': { + 'es': { + 'title': '📊 Optimización de Patrones: {product_name}', + 'message': 'Demanda de {product_name} varía {variation_percent}% durante la semana. Oportunidad de optimización.', + 'actions': ['Analizar patrones semanales', 'Ajustar producción diaria', 'Optimizar inventario', 'Planificar promociones'] + } + }, + 'severe_weather_impact': { + 'es': { + 'title': '⛈️ Impacto Climático Severo', + 'message': 'Tormenta severa prevista: reducir producción de productos frescos y activar delivery.', + 'actions': ['Reducir producción fresca', 'Activar delivery', 'Asegurar displays exteriores'] + } + }, + 'unexpected_demand_spike': { + 'es': { + 'title': '📈 Pico de Demanda Inesperado', + 'message': 'Ventas de {product_name} {spike_percentage}% sobre pronóstico.', + 'actions': ['Aumentar producción', 'Revisar inventario', 'Actualizar pronóstico'] + } } }