Add new alert architecture to forecasting

This commit is contained in:
Urtzi Alfaro
2025-08-23 15:48:32 +02:00
parent 4b4268d640
commit a214617815
6 changed files with 846 additions and 3 deletions

View File

@@ -0,0 +1,82 @@
# services/forecasting/app/clients/inventory_client.py
"""
Simple client for inventory service integration
Used when product names are not available locally
"""
import aiohttp
import structlog
from typing import Optional, Dict, Any
import os
logger = structlog.get_logger()
class InventoryServiceClient:
"""Simple client for inventory service interactions"""
def __init__(self, base_url: str = None):
self.base_url = base_url or os.getenv("INVENTORY_SERVICE_URL", "http://inventory-service:8000")
self.timeout = aiohttp.ClientTimeout(total=5) # 5 second timeout
async def get_product_name(self, tenant_id: str, inventory_product_id: str) -> Optional[str]:
"""
Get product name from inventory service
Returns None if service is unavailable or product not found
"""
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
url = f"{self.base_url}/api/v1/products/{inventory_product_id}"
headers = {"X-Tenant-ID": tenant_id}
async with session.get(url, headers=headers) as response:
if response.status == 200:
data = await response.json()
return data.get("name", f"Product-{inventory_product_id}")
else:
logger.debug("Product not found in inventory service",
inventory_product_id=inventory_product_id,
status=response.status)
return None
except Exception as e:
logger.debug("Failed to get product name from inventory service",
inventory_product_id=inventory_product_id,
error=str(e))
return None
async def get_multiple_product_names(self, tenant_id: str, product_ids: list) -> Dict[str, str]:
"""
Get multiple product names efficiently
Returns a mapping of product_id -> product_name
"""
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
url = f"{self.base_url}/api/v1/products/batch"
headers = {"X-Tenant-ID": tenant_id}
payload = {"product_ids": product_ids}
async with session.post(url, json=payload, headers=headers) as response:
if response.status == 200:
data = await response.json()
return {item["id"]: item["name"] for item in data.get("products", [])}
else:
logger.debug("Batch product lookup failed",
product_count=len(product_ids),
status=response.status)
return {}
except Exception as e:
logger.debug("Failed to get product names from inventory service",
product_count=len(product_ids),
error=str(e))
return {}
# Global client instance
_inventory_client = None
def get_inventory_client() -> InventoryServiceClient:
"""Get the global inventory client instance"""
global _inventory_client
if _inventory_client is None:
_inventory_client = InventoryServiceClient()
return _inventory_client

View File

@@ -18,6 +18,7 @@ from app.api import forecasts, predictions
from app.services.messaging import setup_messaging, cleanup_messaging
from app.services.forecasting_alert_service import ForecastingAlertService
from shared.monitoring.logging import setup_logging
from shared.monitoring.metrics import MetricsCollector
@@ -28,6 +29,9 @@ logger = structlog.get_logger()
# Initialize metrics collector
metrics_collector = MetricsCollector("forecasting-service")
# Initialize alert service
alert_service = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager for startup and shutdown events"""
@@ -45,6 +49,13 @@ async def lifespan(app: FastAPI):
await setup_messaging()
logger.info("Messaging initialized")
# Initialize forecasting alert service
logger.info("Setting up forecasting alert service")
global alert_service
alert_service = ForecastingAlertService(settings)
await alert_service.start()
logger.info("Forecasting alert service initialized")
# Register custom metrics
metrics_collector.register_counter("forecasts_generated_total", "Total forecasts generated")
metrics_collector.register_counter("predictions_served_total", "Total predictions served")
@@ -70,6 +81,11 @@ async def lifespan(app: FastAPI):
logger.info("Shutting down Forecasting Service")
try:
# Cleanup alert service
if alert_service:
await alert_service.stop()
logger.info("Alert service cleanup completed")
await cleanup_messaging()
logger.info("Messaging cleanup completed")
except Exception as e:
@@ -104,12 +120,16 @@ app.include_router(predictions.router, prefix="/api/v1", tags=["predictions"])
async def health_check():
"""Health check endpoint"""
db_health = await get_db_health()
alert_health = await alert_service.health_check() if alert_service else {"status": "not_initialized"}
overall_health = db_health and alert_health.get("status") == "healthy"
return {
"status": "healthy" if db_health else "unhealthy",
"status": "healthy" if overall_health else "unhealthy",
"service": "forecasting-service",
"version": "1.0.0",
"database": "connected" if db_health else "disconnected",
"alert_service": alert_health,
"timestamp": structlog.get_logger().info("Health check requested")
}
@@ -118,6 +138,13 @@ async def get_metrics():
"""Metrics endpoint for Prometheus"""
return metrics_collector.get_metrics()
@app.get("/alert-metrics")
async def get_alert_metrics():
"""Alert service metrics endpoint"""
if alert_service:
return alert_service.get_metrics()
return {"error": "Alert service not initialized"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -19,6 +19,7 @@ class Forecast(Base):
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True)
inventory_product_id = Column(UUID(as_uuid=True), nullable=False, index=True) # Reference to inventory service
product_name = Column(String(255), nullable=False, index=True) # Product name stored locally
location = Column(String(255), nullable=False, index=True)
# Forecast period

View File

@@ -0,0 +1,688 @@
# services/forecasting/app/services/forecasting_alert_service.py
"""
Forecasting-specific alert and recommendation detection service
Monitors demand patterns, weather impacts, and holiday preparations
"""
import json
from typing import List, Dict, Any, Optional
from uuid import UUID
from datetime import datetime, timedelta
import structlog
from apscheduler.triggers.cron import CronTrigger
from shared.alerts.base_service import BaseAlertService, AlertServiceMixin
from shared.alerts.templates import format_item_message
from app.clients.inventory_client import get_inventory_client
logger = structlog.get_logger()
class ForecastingAlertService(BaseAlertService, AlertServiceMixin):
"""Forecasting service alert and recommendation detection"""
def setup_scheduled_checks(self):
"""Forecasting-specific scheduled checks for alerts and recommendations"""
# Weekend demand surge analysis - every Friday at 3 PM
self.scheduler.add_job(
self.check_weekend_demand_surge,
CronTrigger(day_of_week=4, hour=15, minute=0), # Friday 3 PM
id='weekend_surge_check',
misfire_grace_time=3600,
max_instances=1
)
# Weather impact analysis - every 6 hours during business days
self.scheduler.add_job(
self.check_weather_impact,
CronTrigger(hour='6,12,18', day_of_week='0-6'),
id='weather_impact_check',
misfire_grace_time=300,
max_instances=1
)
# Holiday preparation analysis - daily at 9 AM
self.scheduler.add_job(
self.check_holiday_preparation,
CronTrigger(hour=9, minute=0),
id='holiday_prep_check',
misfire_grace_time=3600,
max_instances=1
)
# Demand pattern analysis - every Monday at 8 AM
self.scheduler.add_job(
self.analyze_demand_patterns,
CronTrigger(day_of_week=0, hour=8, minute=0),
id='demand_pattern_analysis',
misfire_grace_time=3600,
max_instances=1
)
logger.info("Forecasting alert schedules configured",
service=self.config.SERVICE_NAME)
async def check_weekend_demand_surge(self):
"""Check for predicted weekend demand surges (alerts)"""
try:
self._checks_performed += 1
query = """
WITH weekend_forecast AS (
SELECT
f.tenant_id,
f.inventory_product_id,
f.product_name,
f.predicted_demand,
f.forecast_date,
LAG(f.predicted_demand, 7) OVER (
PARTITION BY f.tenant_id, f.inventory_product_id
ORDER BY f.forecast_date
) as prev_week_demand,
AVG(f.predicted_demand) OVER (
PARTITION BY f.tenant_id, f.inventory_product_id
ORDER BY f.forecast_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as avg_weekly_demand
FROM forecasts f
WHERE f.forecast_date >= CURRENT_DATE + INTERVAL '1 day'
AND f.forecast_date <= CURRENT_DATE + INTERVAL '3 days'
AND EXTRACT(DOW FROM f.forecast_date) IN (6, 0) -- Saturday, Sunday
AND f.tenant_id = $1
),
surge_analysis AS (
SELECT *,
CASE
WHEN prev_week_demand > 0 THEN
(predicted_demand - prev_week_demand) / prev_week_demand * 100
ELSE 0
END as growth_percentage,
CASE
WHEN avg_weekly_demand > 0 THEN
(predicted_demand - avg_weekly_demand) / avg_weekly_demand * 100
ELSE 0
END as avg_growth_percentage
FROM weekend_forecast
)
SELECT * FROM surge_analysis
WHERE growth_percentage > 50 OR avg_growth_percentage > 50
ORDER BY growth_percentage DESC
"""
tenants = await self.get_active_tenants()
for tenant_id in tenants:
try:
from sqlalchemy import text
async with self.db_manager.get_session() as session:
result = await session.execute(text(query), {"tenant_id": tenant_id})
surges = result.fetchall()
for surge in surges:
await self._process_weekend_surge(tenant_id, surge)
except Exception as e:
logger.error("Error checking weekend demand surge",
tenant_id=str(tenant_id),
error=str(e))
except Exception as e:
logger.error("Weekend demand surge check failed", error=str(e))
self._errors_count += 1
async def _process_weekend_surge(self, tenant_id: UUID, surge: Dict[str, Any]):
"""Process weekend demand surge alert"""
try:
growth_percentage = surge['growth_percentage']
avg_growth_percentage = surge['avg_growth_percentage']
max_growth = max(growth_percentage, avg_growth_percentage)
# Resolve product name with fallback
product_name = await self._resolve_product_name(
tenant_id,
str(surge['inventory_product_id']),
surge.get('product_name')
)
# Determine severity based on growth magnitude
if max_growth > 100:
severity = 'high'
elif max_growth > 75:
severity = 'medium'
else:
severity = 'low'
# Format message based on weather conditions (simplified check)
weather_favorable = await self._check_favorable_weather(surge['forecast_date'])
await self.publish_item(tenant_id, {
'type': 'demand_surge_weekend',
'severity': severity,
'title': f'📈 Fin de semana con alta demanda: {product_name}',
'message': f'📈 Fin de semana con alta demanda: {product_name} +{max_growth:.0f}%',
'actions': ['increase_production', 'order_extra_ingredients', 'schedule_staff'],
'triggers': [
f'weekend_forecast > {max_growth:.0f}%_normal',
'weather_favorable' if weather_favorable else 'weather_normal'
],
'metadata': {
'product_name': product_name,
'inventory_product_id': str(surge['inventory_product_id']),
'predicted_demand': float(surge['predicted_demand']),
'growth_percentage': float(max_growth),
'forecast_date': surge['forecast_date'].isoformat(),
'weather_favorable': weather_favorable
}
}, item_type='alert')
except Exception as e:
logger.error("Error processing weekend surge",
product_name=surge.get('product_name'),
error=str(e))
async def check_weather_impact(self):
"""Check for weather impact on demand (alerts)"""
try:
self._checks_performed += 1
# Get weather forecast data and correlate with demand patterns
query = """
WITH weather_impact AS (
SELECT
f.tenant_id,
f.inventory_product_id,
f.product_name,
f.predicted_demand,
f.forecast_date,
f.weather_precipitation,
f.weather_temperature,
f.traffic_volume,
AVG(f.predicted_demand) OVER (
PARTITION BY f.tenant_id, f.inventory_product_id
ORDER BY f.forecast_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as avg_demand
FROM forecasts f
WHERE f.forecast_date >= CURRENT_DATE + INTERVAL '1 day'
AND f.forecast_date <= CURRENT_DATE + INTERVAL '2 days'
AND f.tenant_id = $1
),
rain_impact AS (
SELECT *,
CASE
WHEN weather_precipitation > 2.0 THEN true
ELSE false
END as rain_forecast,
CASE
WHEN traffic_volume < 80 THEN true
ELSE false
END as low_traffic_expected,
(predicted_demand - avg_demand) / avg_demand * 100 as demand_change
FROM weather_impact
)
SELECT * FROM rain_impact
WHERE rain_forecast = true OR demand_change < -15
ORDER BY demand_change ASC
"""
tenants = await self.get_active_tenants()
for tenant_id in tenants:
try:
from sqlalchemy import text
async with self.db_manager.get_session() as session:
result = await session.execute(text(query), {"tenant_id": tenant_id})
weather_impacts = result.fetchall()
for impact in weather_impacts:
await self._process_weather_impact(tenant_id, impact)
except Exception as e:
logger.error("Error checking weather impact",
tenant_id=str(tenant_id),
error=str(e))
except Exception as e:
logger.error("Weather impact check failed", error=str(e))
self._errors_count += 1
async def _process_weather_impact(self, tenant_id: UUID, impact: Dict[str, Any]):
"""Process weather impact alert"""
try:
rain_forecast = impact['rain_forecast']
demand_change = impact['demand_change']
precipitation = impact['weather_precipitation'] or 0.0
if rain_forecast:
# Rain impact alert
triggers = ['rain_forecast']
if demand_change < -15:
triggers.append('outdoor_events_cancelled')
await self.publish_item(tenant_id, {
'type': 'weather_impact_alert',
'severity': 'low',
'title': '🌧️ Impacto climático previsto',
'message': '🌧️ Lluvia prevista: -20% tráfico peatonal esperado',
'actions': ['reduce_fresh_production', 'focus_comfort_products', 'delivery_promo'],
'triggers': triggers,
'metadata': {
'forecast_date': impact['forecast_date'].isoformat(),
'precipitation_mm': float(precipitation),
'expected_demand_change': float(demand_change),
'traffic_volume': impact.get('traffic_volume', 100),
'weather_type': 'rain'
}
}, item_type='alert')
elif demand_change < -20:
# General weather impact alert
product_name = await self._resolve_product_name(
tenant_id,
str(impact['inventory_product_id']),
impact.get('product_name')
)
await self.publish_item(tenant_id, {
'type': 'weather_impact_alert',
'severity': 'low',
'title': f'🌤️ Impacto climático: {product_name}',
'message': f'Condiciones climáticas pueden reducir demanda de {product_name} en {abs(demand_change):.0f}%',
'actions': ['adjust_production', 'focus_indoor_products', 'plan_promotions'],
'triggers': ['weather_conditions', 'demand_forecast_low'],
'metadata': {
'product_name': product_name,
'forecast_date': impact['forecast_date'].isoformat(),
'expected_demand_change': float(demand_change),
'temperature': impact.get('weather_temperature'),
'weather_type': 'general'
}
}, item_type='alert')
except Exception as e:
logger.error("Error processing weather impact",
product_name=impact.get('product_name'),
error=str(e))
async def check_holiday_preparation(self):
"""Check for upcoming Spanish holidays requiring preparation (alerts)"""
try:
self._checks_performed += 1
# Check for Spanish holidays in the next 3-7 days
upcoming_holidays = await self._get_upcoming_spanish_holidays(3, 7)
if not upcoming_holidays:
return
# Analyze historical demand spikes for holidays
query = """
WITH holiday_demand AS (
SELECT
f.tenant_id,
f.inventory_product_id,
f.product_name,
AVG(f.predicted_demand) as avg_holiday_demand,
AVG(CASE WHEN f.is_holiday = false THEN f.predicted_demand END) as avg_normal_demand,
COUNT(*) as forecast_count
FROM forecasts f
WHERE f.created_at > CURRENT_DATE - INTERVAL '365 days'
AND f.tenant_id = $1
GROUP BY f.tenant_id, f.inventory_product_id, f.product_name
HAVING COUNT(*) >= 10
),
demand_spike_analysis AS (
SELECT *,
CASE
WHEN avg_normal_demand > 0 THEN
(avg_holiday_demand - avg_normal_demand) / avg_normal_demand * 100
ELSE 0
END as spike_percentage
FROM holiday_demand
)
SELECT * FROM demand_spike_analysis
WHERE spike_percentage > 25
ORDER BY spike_percentage DESC
"""
tenants = await self.get_active_tenants()
for tenant_id in tenants:
try:
from sqlalchemy import text
async with self.db_manager.get_session() as session:
result = await session.execute(text(query), {"tenant_id": tenant_id})
demand_spikes = result.fetchall()
for holiday_info in upcoming_holidays:
for spike in demand_spikes:
await self._process_holiday_preparation(
tenant_id, holiday_info, spike
)
except Exception as e:
logger.error("Error checking holiday preparation",
tenant_id=str(tenant_id),
error=str(e))
except Exception as e:
logger.error("Holiday preparation check failed", error=str(e))
self._errors_count += 1
async def _process_holiday_preparation(self, tenant_id: UUID, holiday: Dict[str, Any], spike: Dict[str, Any]):
"""Process holiday preparation alert"""
try:
days_until_holiday = holiday['days_until']
holiday_name = holiday['name']
spike_percentage = spike['spike_percentage']
# Determine severity based on spike magnitude and preparation time
if spike_percentage > 75 and days_until_holiday <= 3:
severity = 'high'
elif spike_percentage > 50 or days_until_holiday <= 3:
severity = 'medium'
else:
severity = 'low'
triggers = [f'spanish_holiday_in_{days_until_holiday}_days']
if spike_percentage > 25:
triggers.append('historical_demand_spike')
await self.publish_item(tenant_id, {
'type': 'holiday_preparation',
'severity': severity,
'title': f'🎉 Preparación para {holiday_name}',
'message': f'🎉 {holiday_name} en {days_until_holiday} días: pedidos especiales aumentan {spike_percentage:.0f}%',
'actions': ['prepare_special_menu', 'stock_decorations', 'extend_hours'],
'triggers': triggers,
'metadata': {
'holiday_name': holiday_name,
'days_until_holiday': days_until_holiday,
'product_name': spike['product_name'],
'spike_percentage': float(spike_percentage),
'avg_holiday_demand': float(spike['avg_holiday_demand']),
'avg_normal_demand': float(spike['avg_normal_demand']),
'holiday_date': holiday['date'].isoformat()
}
}, item_type='alert')
except Exception as e:
logger.error("Error processing holiday preparation",
holiday_name=holiday.get('name'),
error=str(e))
async def analyze_demand_patterns(self):
"""Analyze demand patterns for recommendations"""
try:
self._checks_performed += 1
# Analyze weekly patterns for optimization opportunities
query = """
WITH weekly_patterns AS (
SELECT
f.tenant_id,
f.inventory_product_id,
f.product_name,
EXTRACT(DOW FROM f.forecast_date) as day_of_week,
AVG(f.predicted_demand) as avg_demand,
STDDEV(f.predicted_demand) as demand_variance,
COUNT(*) as data_points
FROM forecasts f
WHERE f.created_at > CURRENT_DATE - INTERVAL '60 days'
AND f.tenant_id = $1
GROUP BY f.tenant_id, f.inventory_product_id, f.product_name, EXTRACT(DOW FROM f.forecast_date)
HAVING COUNT(*) >= 5
),
pattern_analysis AS (
SELECT
tenant_id, inventory_product_id, product_name,
MAX(avg_demand) as peak_demand,
MIN(avg_demand) as min_demand,
AVG(avg_demand) as overall_avg,
MAX(avg_demand) - MIN(avg_demand) as demand_range
FROM weekly_patterns
GROUP BY tenant_id, inventory_product_id, product_name
)
SELECT * FROM pattern_analysis
WHERE demand_range > overall_avg * 0.3
AND peak_demand > overall_avg * 1.5
ORDER BY demand_range DESC
"""
tenants = await self.get_active_tenants()
for tenant_id in tenants:
try:
from sqlalchemy import text
async with self.db_manager.get_session() as session:
result = await session.execute(text(query), {"tenant_id": tenant_id})
patterns = result.fetchall()
for pattern in patterns:
await self._generate_demand_pattern_recommendation(tenant_id, pattern)
except Exception as e:
logger.error("Error analyzing demand patterns",
tenant_id=str(tenant_id),
error=str(e))
except Exception as e:
logger.error("Demand pattern analysis failed", error=str(e))
self._errors_count += 1
async def _generate_demand_pattern_recommendation(self, tenant_id: UUID, pattern: Dict[str, Any]):
"""Generate demand pattern optimization recommendation"""
try:
if not self.should_send_recommendation(tenant_id, 'demand_optimization'):
return
demand_range = pattern['demand_range']
peak_demand = pattern['peak_demand']
overall_avg = pattern['overall_avg']
optimization_potential = (demand_range / overall_avg) * 100
await self.publish_item(tenant_id, {
'type': 'demand_pattern_optimization',
'severity': 'medium',
'title': f'📊 Optimización de Patrones: {pattern["product_name"]}',
'message': f'Demanda de {pattern["product_name"]} varía {optimization_potential:.0f}% durante la semana. Oportunidad de optimización.',
'actions': ['Analizar patrones semanales', 'Ajustar producción diaria', 'Optimizar inventario', 'Planificar promociones'],
'metadata': {
'product_name': pattern['product_name'],
'optimization_potential': float(optimization_potential),
'peak_demand': float(peak_demand),
'min_demand': float(pattern['min_demand']),
'demand_range': float(demand_range),
'recommendation_type': 'demand_optimization'
}
}, item_type='recommendation')
except Exception as e:
logger.error("Error generating demand pattern recommendation",
product_name=pattern.get('product_name'),
error=str(e))
# Helper methods
async def _resolve_product_name(self, tenant_id: UUID, inventory_product_id: str, fallback_name: Optional[str] = None) -> str:
"""
Resolve product name, with fallbacks for when inventory service is unavailable
"""
# If we already have a product name, use it
if fallback_name:
return fallback_name
# Try to get from inventory service
try:
inventory_client = get_inventory_client()
product_name = await inventory_client.get_product_name(str(tenant_id), inventory_product_id)
if product_name:
return product_name
except Exception as e:
logger.debug("Failed to resolve product name from inventory service",
inventory_product_id=inventory_product_id,
error=str(e))
# Fallback to generic name
return f"Product-{inventory_product_id[:8]}"
async def _check_favorable_weather(self, forecast_date: datetime) -> bool:
"""Simple weather favorability check"""
# In a real implementation, this would check actual weather APIs
# For now, return a simple heuristic based on season
month = forecast_date.month
return month in [4, 5, 6, 7, 8, 9] # Spring/Summer months
async def _get_upcoming_spanish_holidays(self, min_days: int, max_days: int) -> List[Dict[str, Any]]:
"""Get upcoming Spanish holidays within date range"""
today = datetime.now().date()
holidays = []
# Major Spanish holidays
spanish_holidays = [
{"name": "Año Nuevo", "month": 1, "day": 1},
{"name": "Reyes Magos", "month": 1, "day": 6},
{"name": "Día del Trabajador", "month": 5, "day": 1},
{"name": "Asunción", "month": 8, "day": 15},
{"name": "Fiesta Nacional", "month": 10, "day": 12},
{"name": "Todos los Santos", "month": 11, "day": 1},
{"name": "Constitución", "month": 12, "day": 6},
{"name": "Inmaculada", "month": 12, "day": 8},
{"name": "Navidad", "month": 12, "day": 25}
]
current_year = today.year
for holiday in spanish_holidays:
# Check current year
holiday_date = datetime(current_year, holiday["month"], holiday["day"]).date()
days_until = (holiday_date - today).days
if min_days <= days_until <= max_days:
holidays.append({
"name": holiday["name"],
"date": holiday_date,
"days_until": days_until
})
# Check next year if needed
if holiday_date < today:
next_year_date = datetime(current_year + 1, holiday["month"], holiday["day"]).date()
days_until = (next_year_date - today).days
if min_days <= days_until <= max_days:
holidays.append({
"name": holiday["name"],
"date": next_year_date,
"days_until": days_until
})
return holidays
async def register_db_listeners(self, conn):
"""Register forecasting-specific database listeners"""
try:
await conn.add_listener('forecasting_alerts', self.handle_forecasting_db_alert)
logger.info("Database listeners registered",
service=self.config.SERVICE_NAME)
except Exception as e:
logger.error("Failed to register database listeners",
service=self.config.SERVICE_NAME,
error=str(e))
async def handle_forecasting_db_alert(self, connection, pid, channel, payload):
"""Handle forecasting alert from database trigger"""
try:
data = json.loads(payload)
tenant_id = UUID(data['tenant_id'])
if data['alert_type'] == 'demand_spike':
await self.publish_item(tenant_id, {
'type': 'demand_spike_detected',
'severity': 'medium',
'title': f'📈 Pico de Demanda Detectado',
'message': f'Demanda inesperada de {data["product_name"]}: {data["spike_percentage"]:.0f}% sobre lo normal.',
'actions': ['Revisar inventario', 'Aumentar producción', 'Notificar equipo'],
'metadata': {
'product_name': data['product_name'],
'spike_percentage': data['spike_percentage'],
'trigger_source': 'database'
}
}, item_type='alert')
except Exception as e:
logger.error("Error handling forecasting DB alert", error=str(e))
async def start_event_listener(self):
"""Listen for forecasting-affecting events"""
try:
# Subscribe to weather events that might affect forecasting
await self.rabbitmq_client.consume_events(
"bakery_events",
f"forecasting.weather.{self.config.SERVICE_NAME}",
"weather.severe_change",
self.handle_weather_event
)
# Subscribe to sales events that might trigger demand alerts
await self.rabbitmq_client.consume_events(
"bakery_events",
f"forecasting.sales.{self.config.SERVICE_NAME}",
"sales.unexpected_spike",
self.handle_sales_spike_event
)
logger.info("Event listeners started",
service=self.config.SERVICE_NAME)
except Exception as e:
logger.error("Failed to start event listeners",
service=self.config.SERVICE_NAME,
error=str(e))
async def handle_weather_event(self, message):
"""Handle severe weather change event"""
try:
weather_data = json.loads(message.body)
tenant_id = UUID(weather_data['tenant_id'])
if weather_data['change_type'] == 'severe_storm':
await self.publish_item(tenant_id, {
'type': 'severe_weather_impact',
'severity': 'high',
'title': '⛈️ Impacto Climático Severo',
'message': f'Tormenta severa prevista: reducir producción de productos frescos y activar delivery.',
'actions': ['reduce_fresh_production', 'activate_delivery', 'secure_outdoor_displays'],
'metadata': {
'weather_type': weather_data['change_type'],
'severity_level': weather_data.get('severity', 'high'),
'duration_hours': weather_data.get('duration_hours', 0)
}
}, item_type='alert')
except Exception as e:
logger.error("Error handling weather event", error=str(e))
async def handle_sales_spike_event(self, message):
"""Handle unexpected sales spike event"""
try:
sales_data = json.loads(message.body)
tenant_id = UUID(sales_data['tenant_id'])
await self.publish_item(tenant_id, {
'type': 'unexpected_demand_spike',
'severity': 'medium',
'title': '📈 Pico de Ventas Inesperado',
'message': f'Ventas de {sales_data["product_name"]} {sales_data["spike_percentage"]:.0f}% sobre pronóstico.',
'actions': ['increase_production', 'check_inventory', 'update_forecast'],
'metadata': {
'product_name': sales_data['product_name'],
'spike_percentage': sales_data['spike_percentage'],
'current_sales': sales_data.get('current_sales', 0),
'forecasted_sales': sales_data.get('forecasted_sales', 0)
}
}, item_type='alert')
except Exception as e:
logger.error("Error handling sales spike event", error=str(e))