Files
bakery-ia/services/forecasting/app/services/forecasting_alert_service.py
2026-01-12 22:15:11 +01:00

338 lines
10 KiB
Python

"""
Forecasting Alert Service - Simplified
Emits minimal events using EventPublisher.
All enrichment handled by alert_processor.
"""
import json
from typing import List, Dict, Any, Optional
from uuid import UUID
from datetime import datetime, timedelta
import structlog
from shared.messaging import UnifiedEventPublisher
# Module-level structlog logger used by ForecastingAlertService for its
# lifecycle, health-check, and per-event log lines.
logger = structlog.get_logger()
class ForecastingAlertService:
    """Simplified forecasting alert service using EventPublisher.

    Every ``emit_*`` coroutine follows the same three steps: build a flat
    payload from its arguments, pick a severity where one applies, await the
    publisher, and finally write one structured log line.
    """

    def __init__(self, event_publisher: "UnifiedEventPublisher") -> None:
        """Keep a reference to the publisher used by every ``emit_*`` method.

        Args:
            event_publisher: Object whose awaitable ``publish_alert`` and
                ``publish_recommendation`` methods are called by this class.
        """
        # The annotation is a forward-reference string so it is not evaluated
        # while the class body is being executed.
        self.publisher = event_publisher

    @staticmethod
    def _severity_for(value: float, high_above: float, medium_above: float) -> str:
        """Map a magnitude onto ``'high'``, ``'medium'`` or ``'low'``.

        Both comparisons are strict, so a value exactly equal to a threshold
        falls into the lower band.

        Args:
            value: Magnitude to classify.
            high_above: Values strictly greater than this are ``'high'``.
            medium_above: Values strictly greater than this (and not high)
                are ``'medium'``.

        Returns:
            The severity label.
        """
        if value > high_above:
            return 'high'
        if value > medium_above:
            return 'medium'
        return 'low'

    async def start(self) -> None:
        """Start the forecasting alert service (currently only logs)."""
        logger.info("ForecastingAlertService started")
        # Add any initialization logic here if needed

    async def stop(self) -> None:
        """Stop the forecasting alert service (currently only logs)."""
        logger.info("ForecastingAlertService stopped")
        # Add any cleanup logic here if needed

    async def health_check(self) -> bool:
        """Report whether a publisher is attached to this service.

        Returns:
            True when ``self.publisher`` exists and is truthy; False when it
            is missing, falsy, or when evaluating it raises.
        """
        try:
            # Presence check only: nothing is sent through the publisher.
            return bool(getattr(self, 'publisher', None))
        except Exception as e:
            logger.error("ForecastingAlertService health check failed", error=str(e))
            return False

    async def emit_demand_surge_weekend(
        self,
        tenant_id: UUID,
        product_name: str,
        inventory_product_id: str,
        predicted_demand: float,
        growth_percentage: float,
        forecast_date: str,
        weather_favorable: bool = False
    ) -> None:
        """Emit weekend demand surge alert.

        Severity is ``'high'`` above 100 % growth, ``'medium'`` above 75 %,
        otherwise ``'low'``.

        Args:
            tenant_id: Tenant the alert is published for.
            product_name: Product name placed in the payload and the log line.
            inventory_product_id: Product identifier; stringified in the payload.
            predicted_demand: Forecast demand; coerced to ``float``.
            growth_percentage: Growth figure driving the severity; coerced to
                ``float`` in the payload.
            forecast_date: Forecast date, forwarded unchanged.
            weather_favorable: Flag forwarded unchanged in the payload.
        """
        severity = self._severity_for(growth_percentage, high_above=100, medium_above=75)

        metadata = {
            "product_name": product_name,
            "inventory_product_id": str(inventory_product_id),
            "predicted_demand": float(predicted_demand),
            "growth_percentage": float(growth_percentage),
            "forecast_date": forecast_date,
            "weather_favorable": weather_favorable,
        }

        await self.publisher.publish_alert(
            event_type="forecasting.demand_surge_weekend",
            tenant_id=tenant_id,
            severity=severity,
            data=metadata
        )

        logger.info(
            "demand_surge_weekend_emitted",
            tenant_id=str(tenant_id),
            product_name=product_name,
            growth_percentage=growth_percentage
        )

    async def emit_weather_impact_alert(
        self,
        tenant_id: UUID,
        forecast_date: str,
        precipitation: float,
        expected_demand_change: float,
        traffic_volume: int,
        weather_type: str = "general",
        product_name: Optional[str] = None
    ) -> None:
        """Emit weather impact alert.

        Severity looks only at demand drops: ``'high'`` below -20,
        ``'medium'`` below -10, otherwise ``'low'``.

        Args:
            tenant_id: Tenant the alert is published for.
            forecast_date: Forecast date, forwarded unchanged.
            precipitation: Published as ``precipitation_mm`` (coerced to
                ``float``); any value above zero adds the ``rain_forecast``
                trigger.
            expected_demand_change: Signed demand change driving the severity;
                below -15 it also adds the ``outdoor_events_cancelled`` trigger.
            traffic_volume: Forwarded unchanged in the payload.
            weather_type: Label placed in the payload and the log line.
            product_name: Added to the payload only when truthy.
        """
        # Kept inline rather than routed through _severity_for: the ladder is
        # directional (more negative means more severe).
        if expected_demand_change < -20:
            severity = 'high'
        elif expected_demand_change < -10:
            severity = 'medium'
        else:
            severity = 'low'

        metadata = {
            "forecast_date": forecast_date,
            "precipitation_mm": float(precipitation),
            "expected_demand_change": float(expected_demand_change),
            "traffic_volume": traffic_volume,
            "weather_type": weather_type,
        }
        if product_name:
            metadata["product_name"] = product_name

        # Add triggers information
        triggers = ['weather_conditions', 'demand_forecast']
        if precipitation > 0:
            triggers.append('rain_forecast')
        if expected_demand_change < -15:
            triggers.append('outdoor_events_cancelled')
        metadata["triggers"] = triggers

        await self.publisher.publish_alert(
            event_type="forecasting.weather_impact_alert",
            tenant_id=tenant_id,
            severity=severity,
            data=metadata
        )

        logger.info(
            "weather_impact_alert_emitted",
            tenant_id=str(tenant_id),
            weather_type=weather_type,
            expected_demand_change=expected_demand_change
        )

    async def emit_holiday_preparation(
        self,
        tenant_id: UUID,
        holiday_name: str,
        days_until_holiday: int,
        product_name: str,
        spike_percentage: float,
        avg_holiday_demand: float,
        avg_normal_demand: float,
        holiday_date: str
    ) -> None:
        """Emit holiday preparation alert.

        Severity combines spike size and lead time: ``'high'`` when the spike
        exceeds 75 and at most 3 days remain; ``'medium'`` when the spike
        exceeds 50 or at most 3 days remain; otherwise ``'low'``.

        Args:
            tenant_id: Tenant the alert is published for.
            holiday_name: Holiday name placed in the payload and the log line.
            days_until_holiday: Days remaining; also embedded in the first
                trigger string.
            product_name: Product name placed in the payload.
            spike_percentage: Spike figure; above 25 it adds the
                ``historical_demand_spike`` trigger.
            avg_holiday_demand: Coerced to ``float`` in the payload.
            avg_normal_demand: Coerced to ``float`` in the payload.
            holiday_date: Holiday date, forwarded unchanged.
        """
        imminent = days_until_holiday <= 3
        if spike_percentage > 75 and imminent:
            severity = 'high'
        elif spike_percentage > 50 or imminent:
            severity = 'medium'
        else:
            severity = 'low'

        metadata = {
            "holiday_name": holiday_name,
            "days_until_holiday": days_until_holiday,
            "product_name": product_name,
            "spike_percentage": float(spike_percentage),
            "avg_holiday_demand": float(avg_holiday_demand),
            "avg_normal_demand": float(avg_normal_demand),
            "holiday_date": holiday_date,
        }

        # Add triggers information
        triggers = [f'spanish_holiday_in_{days_until_holiday}_days']
        if spike_percentage > 25:
            triggers.append('historical_demand_spike')
        metadata["triggers"] = triggers

        await self.publisher.publish_alert(
            event_type="forecasting.holiday_preparation",
            tenant_id=tenant_id,
            severity=severity,
            data=metadata
        )

        logger.info(
            "holiday_preparation_emitted",
            tenant_id=str(tenant_id),
            holiday_name=holiday_name,
            spike_percentage=spike_percentage
        )

    async def emit_demand_optimization_recommendation(
        self,
        tenant_id: UUID,
        product_name: str,
        optimization_potential: float,
        peak_demand: float,
        min_demand: float,
        demand_range: float
    ) -> None:
        """Emit demand pattern optimization recommendation.

        Unlike the alert emitters this goes through
        ``publish_recommendation`` and carries no severity.

        Args:
            tenant_id: Tenant the recommendation is published for.
            product_name: Product name placed in the payload and the log line.
            optimization_potential: Coerced to ``float`` in the payload.
            peak_demand: Coerced to ``float`` in the payload.
            min_demand: Coerced to ``float`` in the payload.
            demand_range: Coerced to ``float`` in the payload.
        """
        metadata = {
            "product_name": product_name,
            "optimization_potential": float(optimization_potential),
            "peak_demand": float(peak_demand),
            "min_demand": float(min_demand),
            "demand_range": float(demand_range),
        }

        await self.publisher.publish_recommendation(
            event_type="forecasting.demand_pattern_optimization",
            tenant_id=tenant_id,
            data=metadata
        )

        logger.info(
            "demand_pattern_optimization_emitted",
            tenant_id=str(tenant_id),
            product_name=product_name,
            optimization_potential=optimization_potential
        )

    async def emit_demand_spike_detected(
        self,
        tenant_id: UUID,
        product_name: str,
        spike_percentage: float
    ) -> None:
        """Emit demand spike detected event.

        Severity is ``'high'`` above 50, ``'medium'`` above 20, otherwise
        ``'low'``. The payload always carries ``detection_source="database"``.

        Args:
            tenant_id: Tenant the alert is published for.
            product_name: Product name placed in the payload and the log line.
            spike_percentage: Spike figure driving the severity.
        """
        severity = self._severity_for(spike_percentage, high_above=50, medium_above=20)

        metadata = {
            "product_name": product_name,
            "spike_percentage": float(spike_percentage),
            "detection_source": "database",
        }

        await self.publisher.publish_alert(
            event_type="forecasting.demand_spike_detected",
            tenant_id=tenant_id,
            severity=severity,
            data=metadata
        )

        logger.info(
            "demand_spike_detected_emitted",
            tenant_id=str(tenant_id),
            product_name=product_name,
            spike_percentage=spike_percentage
        )

    async def emit_severe_weather_impact(
        self,
        tenant_id: UUID,
        weather_type: str,
        severity_level: str,
        duration_hours: int
    ) -> None:
        """Emit severe weather impact event.

        Alert severity is ``'urgent'`` when the weather level is
        ``'critical'`` or the duration exceeds 24 hours; ``'high'`` when the
        level is ``'high'`` or the duration exceeds 12 hours; otherwise
        ``'medium'``. This emitter never produces ``'low'``.

        Args:
            tenant_id: Tenant the alert is published for.
            weather_type: Label placed in the payload and the log line.
            severity_level: Weather severity label, forwarded unchanged.
            duration_hours: Duration, forwarded unchanged.
        """
        if severity_level == 'critical' or duration_hours > 24:
            alert_severity = 'urgent'
        elif severity_level == 'high' or duration_hours > 12:
            alert_severity = 'high'
        else:
            alert_severity = 'medium'

        metadata = {
            "weather_type": weather_type,
            "severity_level": severity_level,
            "duration_hours": duration_hours,
        }

        await self.publisher.publish_alert(
            event_type="forecasting.severe_weather_impact",
            tenant_id=tenant_id,
            severity=alert_severity,
            data=metadata
        )

        logger.info(
            "severe_weather_impact_emitted",
            tenant_id=str(tenant_id),
            weather_type=weather_type,
            severity_level=severity_level
        )

    async def emit_unexpected_demand_spike(
        self,
        tenant_id: UUID,
        product_name: str,
        spike_percentage: float,
        current_sales: float,
        forecasted_sales: float
    ) -> None:
        """Emit unexpected sales spike event.

        Severity is ``'high'`` above 75, ``'medium'`` above 40, otherwise
        ``'low'``.

        Args:
            tenant_id: Tenant the alert is published for.
            product_name: Product name placed in the payload and the log line.
            spike_percentage: Spike figure driving the severity.
            current_sales: Coerced to ``float`` in the payload.
            forecasted_sales: Coerced to ``float`` in the payload.
        """
        severity = self._severity_for(spike_percentage, high_above=75, medium_above=40)

        metadata = {
            "product_name": product_name,
            "spike_percentage": float(spike_percentage),
            "current_sales": float(current_sales),
            "forecasted_sales": float(forecasted_sales),
        }

        await self.publisher.publish_alert(
            event_type="forecasting.unexpected_demand_spike",
            tenant_id=tenant_id,
            severity=severity,
            data=metadata
        )

        logger.info(
            "unexpected_demand_spike_emitted",
            tenant_id=str(tenant_id),
            product_name=product_name,
            spike_percentage=spike_percentage
        )