Files
bakery-ia/shared/clients/alert_processor_client.py

221 lines
7.1 KiB
Python
Raw Permalink Normal View History

2025-12-05 20:07:01 +01:00
# shared/clients/alert_processor_client.py
"""
Alert Processor Service Client - Inter-service communication
Handles communication with the alert processor service for alert lifecycle management
"""
import structlog
from typing import Dict, Any, List, Optional
from uuid import UUID
from shared.clients.base_service_client import BaseServiceClient
from shared.config.base import BaseServiceSettings
logger = structlog.get_logger()
class AlertProcessorClient(BaseServiceClient):
"""Client for communicating with the alert processor service via gateway"""
def __init__(self, config: BaseServiceSettings, calling_service_name: str = "unknown"):
super().__init__(calling_service_name, config)
def get_service_base_path(self) -> str:
"""Return the base path for alert processor service APIs"""
return "/api/v1"
# ================================================================
# ALERT LIFECYCLE MANAGEMENT
# ================================================================
async def acknowledge_alerts_by_metadata(
self,
tenant_id: UUID,
alert_type: str,
metadata_filter: Dict[str, Any],
acknowledged_by: Optional[str] = None
) -> Dict[str, Any]:
"""
Acknowledge all active alerts matching alert type and metadata.
Used when user actions trigger alert acknowledgment (e.g., approving a PO).
Args:
tenant_id: Tenant UUID
alert_type: Alert type to filter (e.g., 'po_approval_needed')
metadata_filter: Metadata fields to match (e.g., {'po_id': 'uuid'})
acknowledged_by: Optional user ID who acknowledged
Returns:
{
"success": true,
"acknowledged_count": 2,
"alert_ids": ["uuid1", "uuid2"]
}
"""
try:
payload = {
"alert_type": alert_type,
"metadata_filter": metadata_filter
}
if acknowledged_by:
payload["acknowledged_by"] = acknowledged_by
result = await self.post(
f"tenants/{tenant_id}/alerts/acknowledge-by-metadata",
tenant_id=str(tenant_id),
2026-01-13 22:22:38 +01:00
data=payload
2025-12-05 20:07:01 +01:00
)
if result and result.get("success"):
logger.info(
"Acknowledged alerts by metadata",
tenant_id=str(tenant_id),
alert_type=alert_type,
count=result.get("acknowledged_count", 0),
calling_service=self.calling_service_name
)
return result or {"success": False, "acknowledged_count": 0, "alert_ids": []}
except Exception as e:
logger.error(
"Error acknowledging alerts by metadata",
error=str(e),
tenant_id=str(tenant_id),
alert_type=alert_type,
metadata_filter=metadata_filter,
calling_service=self.calling_service_name
)
return {"success": False, "acknowledged_count": 0, "alert_ids": [], "error": str(e)}
async def resolve_alerts_by_metadata(
self,
tenant_id: UUID,
alert_type: str,
metadata_filter: Dict[str, Any],
resolved_by: Optional[str] = None
) -> Dict[str, Any]:
"""
Resolve all active alerts matching alert type and metadata.
Used when user actions complete an alert's underlying issue (e.g., marking delivery received).
Args:
tenant_id: Tenant UUID
alert_type: Alert type to filter (e.g., 'delivery_overdue')
metadata_filter: Metadata fields to match (e.g., {'po_id': 'uuid'})
resolved_by: Optional user ID who resolved
Returns:
{
"success": true,
"resolved_count": 1,
"alert_ids": ["uuid1"]
}
"""
try:
payload = {
"alert_type": alert_type,
"metadata_filter": metadata_filter
}
if resolved_by:
payload["resolved_by"] = resolved_by
result = await self.post(
f"tenants/{tenant_id}/alerts/resolve-by-metadata",
tenant_id=str(tenant_id),
2026-01-13 22:22:38 +01:00
data=payload
2025-12-05 20:07:01 +01:00
)
if result and result.get("success"):
logger.info(
"Resolved alerts by metadata",
tenant_id=str(tenant_id),
alert_type=alert_type,
count=result.get("resolved_count", 0),
calling_service=self.calling_service_name
)
return result or {"success": False, "resolved_count": 0, "alert_ids": []}
except Exception as e:
logger.error(
"Error resolving alerts by metadata",
error=str(e),
tenant_id=str(tenant_id),
alert_type=alert_type,
metadata_filter=metadata_filter,
calling_service=self.calling_service_name
)
return {"success": False, "resolved_count": 0, "alert_ids": [], "error": str(e)}
async def get_active_alerts(
self,
tenant_id: UUID,
priority_level: Optional[str] = None,
limit: int = 100
) -> List[Dict[str, Any]]:
"""
Get active alerts for a tenant.
Args:
tenant_id: Tenant UUID
priority_level: Optional priority filter (critical, important, standard, info)
limit: Maximum number of alerts to return
Returns:
List of alert dictionaries
"""
try:
params = {
"status": "active",
"limit": limit
}
if priority_level:
params["priority_level"] = priority_level
result = await self.get(
f"tenants/{tenant_id}/alerts",
tenant_id=str(tenant_id),
params=params
)
alerts = result.get("alerts", []) if isinstance(result, dict) else []
logger.info(
"Retrieved active alerts",
tenant_id=str(tenant_id),
count=len(alerts),
calling_service=self.calling_service_name
)
return alerts
except Exception as e:
logger.error(
"Error fetching active alerts",
error=str(e),
tenant_id=str(tenant_id),
calling_service=self.calling_service_name
)
return []
# Factory function for easy import
def get_alert_processor_client(config: BaseServiceSettings, calling_service_name: str) -> AlertProcessorClient:
"""
Factory function to create an AlertProcessorClient instance.
Args:
config: Service configuration with gateway URL
calling_service_name: Name of the service making the call (for logging)
Returns:
AlertProcessorClient instance
"""
return AlertProcessorClient(config, calling_service_name)