196 lines
7.2 KiB
Python
196 lines
7.2 KiB
Python
# ================================================================
|
|
# services/forecasting/app/services/messaging.py
|
|
# ================================================================
|
|
"""
|
|
Messaging service for event publishing and consuming
|
|
"""
|
|
|
|
import structlog
|
|
import json
|
|
from typing import Dict, Any
|
|
import asyncio
|
|
import datetime
|
|
|
|
from shared.messaging.rabbitmq import RabbitMQClient
|
|
from shared.messaging.events import (
|
|
TrainingCompletedEvent,
|
|
DataImportedEvent,
|
|
ForecastGeneratedEvent,
|
|
)
|
|
from app.core.config import settings
|
|
|
|
# Module-level structured logger used by every function in this file.
logger = structlog.get_logger()

# Global messaging instance: stays None until setup_messaging() assigns a
# client; the publish helpers test it for truthiness before using it.
rabbitmq_client = None
|
|
|
|
async def setup_messaging():
    """Create the shared RabbitMQ client, connect it and register consumers.

    The new client is stored in the module-level ``rabbitmq_client``. Two
    subscriptions are registered, one for model updates and one for weather
    updates.

    Raises:
        Exception: Any error raised while constructing, connecting or
            subscribing is logged and then re-raised unchanged.
    """
    global rabbitmq_client

    try:
        rabbitmq_client = RabbitMQClient(
            settings.RABBITMQ_URL,
            service_name="forecasting_service",
        )
        await rabbitmq_client.connect()

        # One entry per subscription. The callbacks are the wrappers that
        # accept the raw incoming message rather than a decoded payload.
        subscriptions = (
            {
                "exchange_name": "training.events",
                "queue_name": "forecasting_model_update_queue",
                # Assumes model updates arrive as training.completed events.
                "routing_key": "training.completed",
                "callback": handle_model_updated_message,
            },
            {
                "exchange_name": "data.events",
                "queue_name": "forecasting_weather_update_queue",
                # TODO confirm: must match the routing key actually published.
                "routing_key": "data.weather.updated",
                "callback": handle_weather_updated_message,
            },
        )
        for subscription in subscriptions:
            await rabbitmq_client.consume_events(**subscription)

        logger.info("Messaging setup completed")
    except Exception as exc:
        logger.error("Failed to setup messaging", error=str(exc))
        raise
|
|
|
|
async def cleanup_messaging():
    """Disconnect the shared RabbitMQ client if one has been created.

    Errors raised while disconnecting are logged and suppressed, so this
    function does not raise to its caller.
    """
    global rabbitmq_client

    try:
        client = rabbitmq_client
        if client:
            await client.disconnect()
        # Logged whether or not a client existed.
        logger.info("Messaging cleanup completed")
    except Exception as exc:
        logger.error("Error during messaging cleanup", error=str(exc))
|
|
|
|
async def publish_forecast_completed(data: Dict[str, Any]):
    """Publish a ``forecast.completed`` event carrying ``data``.

    Does nothing when the messaging client has not been set up.

    Args:
        data: Payload wrapped in a ``ForecastGeneratedEvent`` before sending.
    """
    if not rabbitmq_client:
        return

    event = ForecastGeneratedEvent(
        service_name="forecasting_service",
        data=data,
        event_type="forecast.completed",
    )
    payload = event.to_dict()
    await rabbitmq_client.publish_forecast_event(
        event_type="completed",
        forecast_data=payload,
    )
|
|
|
|
|
|
async def publish_batch_completed(data: Dict[str, Any]):
    """Publish a ``forecast.batch.completed`` event carrying ``data``.

    Does nothing when the messaging client has not been set up.

    Args:
        data: Payload wrapped in a ``ForecastGeneratedEvent`` before sending.
    """
    if not rabbitmq_client:
        return

    event = ForecastGeneratedEvent(
        service_name="forecasting_service",
        data=data,
        event_type="forecast.batch.completed",
    )
    payload = event.to_dict()
    await rabbitmq_client.publish_forecast_event(
        event_type="batch.completed",
        forecast_data=payload,
    )
|
|
|
|
|
|
# Event handler wrappers for aio_pika messages
|
|
async def handle_model_updated_message(message: Any):
    """Decode a raw model-updated message and pass its payload on.

    The work runs inside ``message.process()``. Decoding and handling errors
    are logged together with the raw body and are not re-raised.

    Args:
        message: Incoming broker message exposing ``process()`` and ``body``
            (presumably an ``aio_pika`` incoming message — confirm).
    """
    async with message.process():
        try:
            decoded_text = message.body.decode()
            event = json.loads(decoded_text)
            # The payload is expected under the "data" key; default to {}.
            payload = event.get("data", {})
            await handle_model_updated(payload)
        except json.JSONDecodeError as exc:
            logger.error("Failed to decode model updated message JSON", error=str(exc), body=message.body)
        except Exception as exc:
            logger.error("Error processing model updated message", error=str(exc), body=message.body)
|
|
|
|
async def handle_weather_updated_message(message: Any):
    """Decode a raw weather-updated message and pass its payload on.

    The work runs inside ``message.process()``. Decoding and handling errors
    are logged together with the raw body and are not re-raised.

    Args:
        message: Incoming broker message exposing ``process()`` and ``body``
            (presumably an ``aio_pika`` incoming message — confirm).
    """
    async with message.process():
        try:
            decoded_text = message.body.decode()
            event = json.loads(decoded_text)
            # The payload is expected under the "data" key; default to {}.
            payload = event.get("data", {})
            await handle_weather_updated(payload)
        except json.JSONDecodeError as exc:
            logger.error("Failed to decode weather updated message JSON", error=str(exc), body=message.body)
        except Exception as exc:
            logger.error("Error processing weather updated message", error=str(exc), body=message.body)
|
|
|
|
|
|
# Original Event handlers (now called from the message wrappers)
|
|
async def handle_model_updated(data: Dict[str, Any]):
    """Handle a model-updated event coming from the training service.

    Currently this only logs the event; errors are logged and suppressed.

    Args:
        data: Event payload; ``model_id`` and ``tenant_id`` are read with
            ``.get`` and logged (``None`` when absent).
    """
    try:
        log_fields = {
            "model_id": data.get("model_id"),
            "tenant_id": data.get("tenant_id"),
        }
        logger.info("Received model updated event", **log_fields)

        # Clearing the model cache is not done here; it is left to
        # PredictionService.
    except Exception as exc:
        logger.error("Error handling model updated event", error=str(exc))
|
|
|
|
async def handle_weather_updated(data: Dict[str, Any]):
    """Handle a weather-data-updated event.

    Currently this only logs the event; errors are logged and suppressed.

    Args:
        data: Event payload; ``date`` is read with ``.get`` and logged
            (``None`` when absent).
    """
    try:
        event_date = data.get("date")
        logger.info("Received weather updated event", date=event_date)

        # Placeholder: re-forecasting could be triggered from here if needed.
    except Exception as exc:
        logger.error("Error handling weather updated event", error=str(exc))
|
|
|
|
async def publish_forecasts_deleted_event(tenant_id: str, deletion_stats: Dict[str, Any]):
    """Publish a ``tenant_forecasts_deleted`` event to the message queue.

    Best-effort: publishing failures are logged and never raised.

    Args:
        tenant_id: Tenant identifier, sent under the ``tenant_id`` key.
        deletion_stats: Deletion statistics, forwarded unchanged under the
            ``deletion_stats`` key.
    """
    # Imported locally because the module-level ``import datetime`` binds the
    # module, not the class, and ``timezone`` is not imported at all. The
    # previous ``datetime.now(timezone.utc)`` therefore raised while building
    # the message, and the event was never published.
    from datetime import datetime, timezone

    # Same guard as the other publishers: skip when messaging is not set up.
    if not rabbitmq_client:
        logger.warning(
            "Messaging client not initialized; forecasts deletion event not published",
            tenant_id=tenant_id,
        )
        return

    try:
        await rabbitmq_client.publish_event(
            exchange="forecasting_events",
            routing_key="forecasting.tenant.deleted",
            message={
                "event_type": "tenant_forecasts_deleted",
                "tenant_id": tenant_id,
                # Timezone-aware UTC timestamp in ISO 8601 form.
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "deletion_stats": deletion_stats,
            },
        )
    except Exception as e:
        logger.error("Failed to publish forecasts deletion event", error=str(e))
|
|
|
|
|
|
# Additional publishing functions for compatibility
|
|
async def publish_forecast_generated(data: dict) -> bool:
    """Publish ``data`` with routing key ``forecast.generated``.

    Args:
        data: Message body passed to the client unchanged.

    Returns:
        ``False`` if publishing raised (the error is logged), else ``True``.
    """
    # NOTE(review): True is also returned when no client is configured and
    # nothing was sent — confirm that callers expect this.
    try:
        client = rabbitmq_client
        if client:
            await client.publish_event(
                exchange="forecasting_events",
                routing_key="forecast.generated",
                message=data,
            )
    except Exception as exc:
        logger.error("Failed to publish forecast generated event", error=str(exc))
        return False
    return True
|
|
|
|
async def publish_batch_forecast_completed(data: dict) -> bool:
    """Publish ``data`` with routing key ``forecast.batch.completed``.

    Args:
        data: Message body passed to the client unchanged.

    Returns:
        ``False`` if publishing raised (the error is logged), else ``True``.
    """
    # NOTE(review): True is also returned when no client is configured and
    # nothing was sent — confirm that callers expect this.
    try:
        client = rabbitmq_client
        if client:
            await client.publish_event(
                exchange="forecasting_events",
                routing_key="forecast.batch.completed",
                message=data,
            )
    except Exception as exc:
        logger.error("Failed to publish batch forecast event", error=str(exc))
        return False
    return True
|
|
|
|
|
|
|
|
# Publisher class for compatibility
|
|
class ForecastingStatusPublisher:
    """Thin publisher for forecasting status events, kept for compatibility."""

    async def publish_status(self, status: str, data: dict) -> bool:
        """Publish ``data`` with routing key ``forecast.status.<status>``.

        Args:
            status: Suffix appended to the ``forecast.status.`` routing key.
            data: Message body passed to the client unchanged.

        Returns:
            ``False`` if publishing raised (the error is logged), else
            ``True``.
        """
        # NOTE(review): True is also returned when no client is configured
        # and nothing was sent — confirm that callers expect this.
        try:
            client = rabbitmq_client
            if client:
                await client.publish_event(
                    exchange="forecasting_events",
                    routing_key=f"forecast.status.{status}",
                    message=data,
                )
        except Exception as exc:
            logger.error(f"Failed to publish {status} status", error=str(exc))
            return False
        return True