# services/production/app/services/production_alert_service.py
"""
Production-specific alert and recommendation detection service
Monitors production capacity, delays, quality issues, and optimization opportunities
"""
import json
import asyncio
from typing import List, Dict, Any, Optional
from uuid import UUID
from datetime import datetime, timedelta
import structlog
from apscheduler.triggers.cron import CronTrigger
from shared.alerts.base_service import BaseAlertService, AlertServiceMixin
from shared.alerts.templates import format_item_message
logger = structlog.get_logger()
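# Collaborators referenced below are inherited from BaseAlertService /
# AlertServiceMixin (inferred from their usage in this file, not re-declared
# here): self.scheduler (APScheduler), self.db_manager (async session factory),
# self.publish_item(), self.format_spanish_message(), self.get_active_tenants(),
# self.get_business_hours_severity(), self.should_send_recommendation(),
# self.rabbitmq_client, and the self._checks_performed / self._errors_count counters.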
class ProductionAlertService(BaseAlertService, AlertServiceMixin):
"""Production service alert and recommendation detection"""
def setup_scheduled_checks(self):
"""Production-specific scheduled checks for alerts and recommendations"""
# Reduced frequency to prevent deadlocks and resource contention
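        # Cron hours below are evaluated in the scheduler's configured timezone
        # (assumed to be the bakery's local time). Every job is capped at one
        # concurrent instance and coalesces missed runs into a single catch-up run.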
# Production capacity checks - every 15 minutes during business hours (reduced from 10)
self.scheduler.add_job(
self.check_production_capacity,
CronTrigger(minute='*/15', hour='6-20'),
id='capacity_check',
misfire_grace_time=120, # Increased grace time
max_instances=1,
coalesce=True # Combine missed runs
)
# Production delays - every 10 minutes during production hours (reduced from 5)
self.scheduler.add_job(
self.check_production_delays,
CronTrigger(minute='*/10', hour='4-22'),
id='delay_check',
misfire_grace_time=60,
max_instances=1,
coalesce=True
)
# Quality issues check - every 20 minutes (reduced from 15)
self.scheduler.add_job(
self.check_quality_issues,
CronTrigger(minute='*/20'),
id='quality_check',
misfire_grace_time=120,
max_instances=1,
coalesce=True
)
# Equipment monitoring - check equipment status every 45 minutes (reduced from 30)
self.scheduler.add_job(
self.check_equipment_status,
CronTrigger(minute='*/45'),
id='equipment_check',
misfire_grace_time=180,
max_instances=1,
coalesce=True
)
# Efficiency recommendations - every hour (reduced from 30 minutes)
self.scheduler.add_job(
self.generate_efficiency_recommendations,
CronTrigger(minute='0'),
id='efficiency_recs',
misfire_grace_time=300,
max_instances=1,
coalesce=True
)
# Energy optimization - every 2 hours (reduced from 1 hour)
self.scheduler.add_job(
self.generate_energy_recommendations,
CronTrigger(minute='0', hour='*/2'),
id='energy_recs',
misfire_grace_time=600, # 10 minutes grace
max_instances=1,
coalesce=True
)
logger.info("Production alert schedules configured",
service=self.config.SERVICE_NAME)
async def check_production_capacity(self):
"""Check if production plan exceeds capacity (alerts)"""
try:
self._checks_performed += 1
# Use timeout and proper session handling
try:
from app.repositories.production_alert_repository import ProductionAlertRepository
async with self.db_manager.get_session() as session:
alert_repo = ProductionAlertRepository(session)
# Set statement timeout to prevent long-running queries
await alert_repo.set_statement_timeout('30s')
capacity_issues = await alert_repo.get_capacity_issues()
for issue in capacity_issues:
await self._process_capacity_issue(issue['tenant_id'], issue)
except asyncio.TimeoutError:
logger.warning("Capacity check timed out", service=self.config.SERVICE_NAME)
self._errors_count += 1
            # Any other exception propagates to the outer handler, which
            # degrades gracefully when the production tables do not exist yet.
except Exception as e:
# Skip capacity checks if tables don't exist (graceful degradation)
if "does not exist" in str(e).lower() or "relation" in str(e).lower():
logger.debug("Capacity check skipped - missing tables", error=str(e))
else:
logger.error("Capacity check failed", error=str(e))
self._errors_count += 1
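    # Illustrative shape of a capacity issue row, inferred from the consumer
    # below (not the repository's authoritative contract):
    #   {'tenant_id': UUID, 'planned_date': date, 'capacity_percentage': Decimal,
    #    'capacity_status': 'severe_overload' | 'overload' | 'near_capacity',
    #    'equipment_count': int}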
async def _process_capacity_issue(self, tenant_id: UUID, issue: Dict[str, Any]):
"""Process capacity overload issue"""
try:
status = issue['capacity_status']
percentage = issue['capacity_percentage']
if status == 'severe_overload':
template_data = self.format_spanish_message(
'order_overload',
percentage=int(percentage - 100)
)
await self.publish_item(tenant_id, {
'type': 'severe_capacity_overload',
'severity': 'urgent',
'title': template_data['title'],
'message': template_data['message'],
'actions': template_data['actions'],
'metadata': {
'planned_date': issue['planned_date'].isoformat(),
'capacity_percentage': float(percentage),
'overload_percentage': float(percentage - 100),
'equipment_count': issue['equipment_count']
}
}, item_type='alert')
elif status == 'overload':
severity = self.get_business_hours_severity('high')
await self.publish_item(tenant_id, {
'type': 'capacity_overload',
'severity': severity,
'title': f'⚠️ Capacidad Excedida: {percentage:.0f}%',
'message': f'Producción planificada para {issue["planned_date"]} excede capacidad en {percentage-100:.0f}%.',
'actions': ['Redistribuir cargas', 'Ampliar turnos', 'Subcontratar', 'Posponer pedidos'],
'metadata': {
'planned_date': issue['planned_date'].isoformat(),
'capacity_percentage': float(percentage),
'equipment_count': issue['equipment_count']
}
}, item_type='alert')
elif status == 'near_capacity':
severity = self.get_business_hours_severity('medium')
await self.publish_item(tenant_id, {
'type': 'near_capacity',
'severity': severity,
'title': f'📊 Cerca de Capacidad Máxima: {percentage:.0f}%',
'message': f'Producción del {issue["planned_date"]} está al {percentage:.0f}% de capacidad. Monitorear de cerca.',
'actions': ['Revisar planificación', 'Preparar contingencias', 'Optimizar eficiencia'],
'metadata': {
'planned_date': issue['planned_date'].isoformat(),
'capacity_percentage': float(percentage)
}
}, item_type='alert')
except Exception as e:
logger.error("Error processing capacity issue", error=str(e))
async def check_production_delays(self):
"""Check for production delays (alerts)"""
try:
self._checks_performed += 1
try:
from app.repositories.production_alert_repository import ProductionAlertRepository
async with self.db_manager.get_session() as session:
alert_repo = ProductionAlertRepository(session)
# Set statement timeout
await alert_repo.set_statement_timeout('30s')
delays = await alert_repo.get_production_delays()
for delay in delays:
await self._process_production_delay(delay)
except asyncio.TimeoutError:
logger.warning("Production delay check timed out", service=self.config.SERVICE_NAME)
self._errors_count += 1
            # Any other exception propagates to the outer handler, which
            # degrades gracefully when the production tables do not exist yet.
except Exception as e:
# Skip delay checks if tables don't exist (graceful degradation)
if "does not exist" in str(e).lower() or "relation" in str(e).lower():
logger.debug("Production delay check skipped - missing tables", error=str(e))
else:
logger.error("Production delay check failed", error=str(e))
self._errors_count += 1
async def _process_production_delay(self, delay: Dict[str, Any]):
"""Process production delay"""
try:
delay_minutes = delay['delay_minutes']
priority = delay['priority_level']
affected_orders = delay['affected_orders']
# Determine severity based on delay time and priority
if delay_minutes > 120 or priority == 'urgent':
severity = 'urgent'
elif delay_minutes > 60 or priority == 'high':
severity = 'high'
elif delay_minutes > 30:
severity = 'medium'
else:
severity = 'low'
template_data = self.format_spanish_message(
'production_delay',
batch_name=f"{delay['product_name']} #{delay['batch_number']}",
delay_minutes=int(delay_minutes)
)
await self.publish_item(delay['tenant_id'], {
'type': 'production_delay',
'severity': severity,
'title': template_data['title'],
'message': template_data['message'],
'actions': template_data['actions'],
'metadata': {
'batch_id': str(delay['id']),
'product_name': delay['product_name'],
'batch_number': delay['batch_number'],
'delay_minutes': delay_minutes,
'priority_level': priority,
'affected_orders': affected_orders,
'planned_completion': delay['planned_completion_time'].isoformat()
}
}, item_type='alert')
except Exception as e:
logger.error("Error processing production delay",
batch_id=str(delay.get('id')),
error=str(e))
async def check_quality_issues(self):
"""Check for quality control issues (alerts)"""
try:
self._checks_performed += 1
from app.repositories.production_alert_repository import ProductionAlertRepository
async with self.db_manager.get_session() as session:
alert_repo = ProductionAlertRepository(session)
quality_issues = await alert_repo.get_quality_issues()
for issue in quality_issues:
await self._process_quality_issue(issue)
except Exception as e:
            # Skip quality checks if tables or columns don't exist (graceful degradation)
            # (the original condition had an and/or precedence bug and a
            # case-sensitive first check)
            if "does not exist" in str(e).lower() or "relation" in str(e).lower():
                logger.debug("Quality check skipped - missing tables or columns", error=str(e))
else:
logger.error("Quality check failed", error=str(e))
self._errors_count += 1
async def _process_quality_issue(self, issue: Dict[str, Any]):
"""Process quality control failure"""
try:
qc_severity = issue['qc_severity']
total_failures = issue['total_failures']
# Map QC severity to alert severity
if qc_severity == 'critical' or total_failures > 2:
severity = 'urgent'
elif qc_severity == 'major':
severity = 'high'
else:
severity = 'medium'
await self.publish_item(issue['tenant_id'], {
'type': 'quality_control_failure',
'severity': severity,
'title': f'❌ Fallo Control Calidad: {issue["product_name"]}',
'message': f'Lote {issue["batch_number"]} falló en {issue["test_type"]}. Valor: {issue["result_value"]} (rango: {issue["min_acceptable"]}-{issue["max_acceptable"]})',
'actions': ['Revisar lote', 'Repetir prueba', 'Ajustar proceso', 'Documentar causa'],
'metadata': {
'quality_check_id': str(issue['id']),
'batch_id': str(issue['batch_id']),
'test_type': issue['test_type'],
'result_value': float(issue['result_value']),
'min_acceptable': float(issue['min_acceptable']),
'max_acceptable': float(issue['max_acceptable']),
'qc_severity': qc_severity,
'total_failures': total_failures
}
}, item_type='alert')
# Mark as acknowledged to avoid duplicates - using proper session management
try:
from app.repositories.production_alert_repository import ProductionAlertRepository
async with self.db_manager.get_session() as session:
alert_repo = ProductionAlertRepository(session)
await alert_repo.mark_quality_check_acknowledged(issue['id'])
except Exception as e:
logger.error("Failed to update quality check acknowledged status",
quality_check_id=str(issue.get('id')),
error=str(e))
# Don't raise here to avoid breaking the main flow
except Exception as e:
logger.error("Error processing quality issue",
quality_check_id=str(issue.get('id')),
error=str(e))
async def check_equipment_status(self):
"""Check equipment status and maintenance requirements (alerts)"""
try:
self._checks_performed += 1
from app.repositories.production_alert_repository import ProductionAlertRepository
tenants = await self.get_active_tenants()
for tenant_id in tenants:
try:
# Use a separate session for each tenant to avoid connection blocking
async with self.db_manager.get_session() as session:
alert_repo = ProductionAlertRepository(session)
equipment_list = await alert_repo.get_equipment_status(tenant_id)
for equipment in equipment_list:
# Process each equipment item in a non-blocking manner
await self._process_equipment_issue(equipment)
except Exception as e:
logger.error("Error checking equipment status",
tenant_id=str(tenant_id),
error=str(e))
# Continue processing other tenants despite this error
except Exception as e:
logger.error("Equipment status check failed", error=str(e))
self._errors_count += 1
async def _process_equipment_issue(self, equipment: Dict[str, Any]):
"""Process equipment issue"""
try:
status = equipment['status']
efficiency = equipment.get('efficiency_percentage', 100)
days_to_maintenance = equipment.get('days_to_maintenance', 30)
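            # Triage order: hard failure first, then maintenance due within
            # three days, then chronic efficiency below 80%.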
if status == 'down':
template_data = self.format_spanish_message(
'equipment_failure',
equipment_name=equipment['name']
)
await self.publish_item(equipment['tenant_id'], {
'type': 'equipment_failure',
'severity': 'urgent',
'title': template_data['title'],
'message': template_data['message'],
'actions': template_data['actions'],
'metadata': {
'equipment_id': str(equipment['id']),
'equipment_name': equipment['name'],
'equipment_type': equipment['type'],
'efficiency': efficiency
}
}, item_type='alert')
elif status == 'maintenance' or (days_to_maintenance is not None and days_to_maintenance <= 3):
severity = 'high' if (days_to_maintenance is not None and days_to_maintenance <= 1) else 'medium'
template_data = self.format_spanish_message(
'maintenance_required',
equipment_name=equipment['name'],
days_until_maintenance=max(0, int(days_to_maintenance)) if days_to_maintenance is not None else 3
)
await self.publish_item(equipment['tenant_id'], {
'type': 'maintenance_required',
'severity': severity,
'title': template_data['title'],
'message': template_data['message'],
'actions': template_data['actions'],
'metadata': {
'equipment_id': str(equipment['id']),
'equipment_name': equipment['name'],
'days_to_maintenance': days_to_maintenance,
'last_maintenance': equipment.get('last_maintenance_date')
}
}, item_type='alert')
elif efficiency is not None and efficiency < 80:
severity = 'medium' if efficiency < 70 else 'low'
template_data = self.format_spanish_message(
'low_equipment_efficiency',
equipment_name=equipment['name'],
efficiency_percent=round(efficiency, 1)
)
await self.publish_item(equipment['tenant_id'], {
'type': 'low_equipment_efficiency',
'severity': severity,
'title': template_data['title'],
'message': template_data['message'],
'actions': template_data['actions'],
'metadata': {
'equipment_id': str(equipment['id']),
'equipment_name': equipment['name'],
'efficiency_percent': float(efficiency)
}
}, item_type='alert')
except Exception as e:
logger.error("Error processing equipment issue",
equipment_id=str(equipment.get('id')),
error=str(e))
async def generate_efficiency_recommendations(self):
"""Generate production efficiency recommendations"""
try:
self._checks_performed += 1
from app.repositories.production_alert_repository import ProductionAlertRepository
tenants = await self.get_active_tenants()
for tenant_id in tenants:
try:
# Use a separate session per tenant to avoid connection blocking
async with self.db_manager.get_session() as session:
alert_repo = ProductionAlertRepository(session)
recommendations = await alert_repo.get_efficiency_recommendations(tenant_id)
for rec in recommendations:
# Process each recommendation individually
await self._generate_efficiency_recommendation(tenant_id, rec)
except Exception as e:
logger.error("Error generating efficiency recommendations",
tenant_id=str(tenant_id),
error=str(e))
# Continue with other tenants despite this error
except Exception as e:
logger.error("Efficiency recommendations failed", error=str(e))
self._errors_count += 1
async def _generate_efficiency_recommendation(self, tenant_id: UUID, rec: Dict[str, Any]):
"""Generate specific efficiency recommendation"""
try:
if not self.should_send_recommendation(tenant_id, rec['recommendation_type']):
return
rec_type = rec['recommendation_type']
efficiency_loss = rec['efficiency_loss_percent']
if rec_type == 'reduce_production_time':
template_data = self.format_spanish_message(
'production_efficiency',
suggested_time=f"{rec['start_hour']:02d}:00",
savings_percent=efficiency_loss
)
await self.publish_item(tenant_id, {
'type': 'production_efficiency',
'severity': 'medium',
'title': template_data['title'],
'message': template_data['message'],
'actions': template_data['actions'],
'metadata': {
'product_name': rec['product_name'],
'avg_production_time': float(rec['avg_production_time']),
'avg_planned_duration': float(rec['avg_planned_duration']),
'efficiency_loss_percent': float(efficiency_loss),
'batch_count': rec['batch_count'],
'recommendation_type': rec_type
}
}, item_type='recommendation')
elif rec_type == 'improve_yield':
await self.publish_item(tenant_id, {
'type': 'yield_improvement',
'severity': 'medium',
'title': f'📈 Mejorar Rendimiento: {rec["product_name"]}',
'message': f'Rendimiento promedio del {rec["product_name"]} es {rec["avg_yield"]:.1f}%. Oportunidad de mejora.',
'actions': ['Revisar receta', 'Optimizar proceso', 'Entrenar personal', 'Verificar ingredientes'],
'metadata': {
'product_name': rec['product_name'],
'avg_yield': float(rec['avg_yield']),
'batch_count': rec['batch_count'],
'recommendation_type': rec_type
}
}, item_type='recommendation')
elif rec_type == 'avoid_afternoon_production':
await self.publish_item(tenant_id, {
'type': 'schedule_optimization',
'severity': 'low',
'title': f'⏰ Optimizar Horario: {rec["product_name"]}',
                    'message': f'Producción de {rec["product_name"]} en horario {rec["start_hour"]:02d}:00 muestra menor eficiencia.',
'actions': ['Cambiar horario', 'Analizar causas', 'Revisar personal', 'Optimizar ambiente'],
'metadata': {
'product_name': rec['product_name'],
'start_hour': rec['start_hour'],
'efficiency_loss_percent': float(efficiency_loss),
'recommendation_type': rec_type
}
}, item_type='recommendation')
except Exception as e:
logger.error("Error generating efficiency recommendation",
product_name=rec.get('product_name'),
error=str(e))
async def generate_energy_recommendations(self):
"""Generate energy optimization recommendations"""
        try:
            self._checks_performed += 1
            from app.repositories.production_alert_repository import ProductionAlertRepository
tenants = await self.get_active_tenants()
for tenant_id in tenants:
try:
# Use a separate session per tenant to avoid connection blocking
async with self.db_manager.get_session() as session:
alert_repo = ProductionAlertRepository(session)
energy_data = await alert_repo.get_energy_consumption_patterns(tenant_id)
# Analyze for peak hours and optimization opportunities
await self._analyze_energy_patterns(tenant_id, energy_data)
except Exception as e:
logger.error("Error generating energy recommendations",
tenant_id=str(tenant_id),
error=str(e))
# Continue with other tenants despite this error
except Exception as e:
logger.error("Energy recommendations failed", error=str(e))
self._errors_count += 1
async def _analyze_energy_patterns(self, tenant_id: UUID, energy_data: List[Dict[str, Any]]):
"""Analyze energy consumption patterns for optimization"""
try:
if not energy_data:
return
# Group by equipment and find peak hours
equipment_data = {}
for record in energy_data:
equipment = record['equipment_name']
if equipment not in equipment_data:
equipment_data[equipment] = []
equipment_data[equipment].append(record)
for equipment, records in equipment_data.items():
# Find peak consumption hours
peak_hour_record = max(records, key=lambda x: x['avg_energy'])
off_peak_records = [r for r in records if r['hour_of_day'] < 7 or r['hour_of_day'] > 22]
if off_peak_records and peak_hour_record['avg_energy'] > 0:
min_off_peak = min(off_peak_records, key=lambda x: x['avg_energy'])
potential_savings = ((peak_hour_record['avg_energy'] - min_off_peak['avg_energy']) /
peak_hour_record['avg_energy']) * 100
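                    # e.g. peak avg 12.0 vs. off-peak min 9.0 -> (12 - 9) / 12 * 100 = 25%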
if potential_savings > 15: # More than 15% potential savings
template_data = self.format_spanish_message(
'energy_optimization',
start_time=f"{min_off_peak['hour_of_day']:02d}:00",
end_time=f"{min_off_peak['hour_of_day']+2:02d}:00",
savings_euros=potential_savings * 0.15 # Rough estimate
)
await self.publish_item(tenant_id, {
'type': 'energy_optimization',
'severity': 'low',
'title': template_data['title'],
'message': template_data['message'],
'actions': template_data['actions'],
'metadata': {
'equipment_name': equipment,
'peak_hour': peak_hour_record['hour_of_day'],
'optimal_hour': min_off_peak['hour_of_day'],
'potential_savings_percent': float(potential_savings),
'peak_consumption': float(peak_hour_record['avg_energy']),
'optimal_consumption': float(min_off_peak['avg_energy'])
}
}, item_type='recommendation')
except Exception as e:
logger.error("Error analyzing energy patterns", error=str(e))
async def register_db_listeners(self, conn):
"""Register production-specific database listeners"""
try:
await conn.add_listener('production_alerts', self.handle_production_db_alert)
logger.info("Database listeners registered",
service=self.config.SERVICE_NAME)
except Exception as e:
logger.error("Failed to register database listeners",
service=self.config.SERVICE_NAME,
error=str(e))
async def handle_production_db_alert(self, connection, pid, channel, payload):
"""Handle production alert from database trigger"""
try:
data = json.loads(payload)
tenant_id = UUID(data['tenant_id'])
template_data = self.format_spanish_message(
'production_delay',
batch_name=f"{data['product_name']} #{data.get('batch_number', 'N/A')}",
delay_minutes=data['delay_minutes']
)
await self.publish_item(tenant_id, {
'type': 'production_delay',
'severity': 'high',
'title': template_data['title'],
'message': template_data['message'],
'actions': template_data['actions'],
'metadata': {
'batch_id': data['batch_id'],
'delay_minutes': data['delay_minutes'],
'trigger_source': 'database'
}
}, item_type='alert')
except Exception as e:
logger.error("Error handling production DB alert", error=str(e))
async def start_event_listener(self):
"""Listen for production-affecting events"""
try:
# Subscribe to inventory events that might affect production
await self.rabbitmq_client.consume_events(
"bakery_events",
f"production.inventory.{self.config.SERVICE_NAME}",
"inventory.critical_shortage",
self.handle_inventory_shortage
)
logger.info("Event listeners started",
service=self.config.SERVICE_NAME)
except Exception as e:
logger.error("Failed to start event listeners",
service=self.config.SERVICE_NAME,
error=str(e))
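    # Expected shortage event body (JSON), inferred from the handler below:
    #   {"tenant_id": ..., "ingredient_id": ..., "ingredient_name": ...,
    #    "shortage_amount": ...}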
async def handle_inventory_shortage(self, message):
"""Handle critical inventory shortage affecting production"""
try:
shortage = json.loads(message.body)
tenant_id = UUID(shortage['tenant_id'])
# Check if this ingredient affects any current production
affected_batches = await self.get_affected_production_batches(
shortage['ingredient_id']
)
if affected_batches:
await self.publish_item(tenant_id, {
'type': 'production_ingredient_shortage',
'severity': 'high',
                    'title': '🚨 Falta Ingrediente para Producción',
'message': f'Escasez de {shortage["ingredient_name"]} afecta {len(affected_batches)} lotes en producción.',
'actions': ['Buscar ingrediente alternativo', 'Pausar producción', 'Contactar proveedor urgente', 'Reorganizar plan'],
'metadata': {
'ingredient_id': shortage['ingredient_id'],
'ingredient_name': shortage['ingredient_name'],
'affected_batches': [str(b) for b in affected_batches],
'shortage_amount': shortage.get('shortage_amount', 0)
}
}, item_type='alert')
except Exception as e:
logger.error("Error handling inventory shortage event", error=str(e))
async def get_affected_production_batches(self, ingredient_id: str) -> List[str]:
"""Get production batches affected by ingredient shortage"""
try:
from app.repositories.production_alert_repository import ProductionAlertRepository
async with self.db_manager.get_session() as session:
alert_repo = ProductionAlertRepository(session)
return await alert_repo.get_affected_production_batches(ingredient_id)
except Exception as e:
logger.error("Error getting affected production batches",
ingredient_id=ingredient_id,
error=str(e))
return []
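
# Illustrative wiring only: the constructor and lifecycle names below are
# assumptions based on how BaseAlertService is used above, not this
# codebase's verified API.
#
#   service = ProductionAlertService(config=settings, db_manager=db_manager,
#                                    rabbitmq_client=rabbitmq_client)
#   service.setup_scheduled_checks()
#   await service.start_event_listener()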