"""
Log Repository
Repository for notification log operations
"""
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from datetime import datetime, timedelta
import structlog
import json
from .base import NotificationBaseRepository
from app.models.notifications import NotificationLog, NotificationStatus
from shared.database.exceptions import DatabaseError, ValidationError
logger = structlog.get_logger()
class LogRepository(NotificationBaseRepository):
"""Repository for notification log operations"""
    def __init__(self, session: AsyncSession, cache_ttl: Optional[int] = 120):
        # Logs are highly dynamic, so use a short cache TTL (2 minutes)
        super().__init__(NotificationLog, session, cache_ttl)
async def create_log_entry(self, log_data: Dict[str, Any]) -> NotificationLog:
"""Create a new notification log entry"""
try:
# Validate log data
validation_result = self._validate_notification_data(
log_data,
["notification_id", "attempt_number", "status"]
)
if not validation_result["is_valid"]:
raise ValidationError(f"Invalid log data: {validation_result['errors']}")
# Set default values
if "attempted_at" not in log_data:
log_data["attempted_at"] = datetime.utcnow()
# Serialize metadata if it's a dict
if "log_metadata" in log_data and isinstance(log_data["log_metadata"], dict):
log_data["log_metadata"] = json.dumps(log_data["log_metadata"])
# Serialize provider response if it's a dict
if "provider_response" in log_data and isinstance(log_data["provider_response"], dict):
log_data["provider_response"] = json.dumps(log_data["provider_response"])
# Create log entry
log_entry = await self.create(log_data)
logger.debug("Notification log entry created",
log_id=log_entry.id,
notification_id=log_entry.notification_id,
attempt_number=log_entry.attempt_number,
status=log_entry.status.value)
return log_entry
except ValidationError:
raise
except Exception as e:
logger.error("Failed to create log entry",
notification_id=log_data.get("notification_id"),
error=str(e))
raise DatabaseError(f"Failed to create log entry: {str(e)}")
async def get_logs_for_notification(
self,
notification_id: str,
skip: int = 0,
limit: int = 50
) -> List[NotificationLog]:
"""Get all log entries for a specific notification"""
try:
return await self.get_multi(
filters={"notification_id": notification_id},
skip=skip,
limit=limit,
order_by="attempt_number",
order_desc=False
)
except Exception as e:
logger.error("Failed to get logs for notification",
notification_id=notification_id,
error=str(e))
return []
async def get_latest_log_for_notification(
self,
notification_id: str
) -> Optional[NotificationLog]:
"""Get the most recent log entry for a notification"""
try:
logs = await self.get_multi(
filters={"notification_id": notification_id},
limit=1,
order_by="attempt_number",
order_desc=True
)
return logs[0] if logs else None
except Exception as e:
logger.error("Failed to get latest log for notification",
notification_id=notification_id,
error=str(e))
return None
    async def get_failed_delivery_logs(
        self,
        hours_back: int = 24,
        provider: Optional[str] = None,
        limit: int = 100
    ) -> List[NotificationLog]:
"""Get failed delivery logs for analysis"""
try:
cutoff_time = datetime.utcnow() - timedelta(hours=hours_back)
conditions = [
"status = 'failed'",
"attempted_at >= :cutoff_time"
]
params = {"cutoff_time": cutoff_time, "limit": limit}
if provider:
conditions.append("provider = :provider")
params["provider"] = provider
query_text = f"""
SELECT * FROM notification_logs
WHERE {' AND '.join(conditions)}
ORDER BY attempted_at DESC
LIMIT :limit
"""
result = await self.session.execute(text(query_text), params)
logs = []
for row in result.fetchall():
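                # Row._mapping (SQLAlchemy 1.4+) exposes the row as a dict-like view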
record_dict = dict(row._mapping)
# Convert enum string back to enum object
record_dict["status"] = NotificationStatus(record_dict["status"])
log_entry = self.model(**record_dict)
logs.append(log_entry)
return logs
except Exception as e:
logger.error("Failed to get failed delivery logs",
hours_back=hours_back,
provider=provider,
error=str(e))
return []
    async def get_delivery_performance_stats(
        self,
        hours_back: int = 24,
        provider: Optional[str] = None
    ) -> Dict[str, Any]:
"""Get delivery performance statistics"""
try:
cutoff_time = datetime.utcnow() - timedelta(hours=hours_back)
conditions = ["attempted_at >= :cutoff_time"]
params = {"cutoff_time": cutoff_time}
if provider:
conditions.append("provider = :provider")
params["provider"] = provider
where_clause = " AND ".join(conditions)
# Get overall statistics
stats_query = text(f"""
SELECT
COUNT(*) as total_attempts,
COUNT(CASE WHEN status = 'sent' OR status = 'delivered' THEN 1 END) as successful_attempts,
COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed_attempts,
AVG(response_time_ms) as avg_response_time_ms,
MIN(response_time_ms) as min_response_time_ms,
MAX(response_time_ms) as max_response_time_ms
FROM notification_logs
WHERE {where_clause}
""")
result = await self.session.execute(stats_query, params)
stats = result.fetchone()
total = stats.total_attempts or 0
successful = stats.successful_attempts or 0
failed = stats.failed_attempts or 0
success_rate = (successful / total * 100) if total > 0 else 0
failure_rate = (failed / total * 100) if total > 0 else 0
# Get error breakdown
error_query = text(f"""
SELECT error_code, COUNT(*) as count
FROM notification_logs
WHERE {where_clause} AND status = 'failed' AND error_code IS NOT NULL
GROUP BY error_code
ORDER BY count DESC
LIMIT 10
""")
result = await self.session.execute(error_query, params)
error_breakdown = {row.error_code: row.count for row in result.fetchall()}
# Get provider breakdown if not filtering by provider
provider_breakdown = {}
if not provider:
provider_query = text(f"""
SELECT provider,
COUNT(*) as total,
COUNT(CASE WHEN status = 'sent' OR status = 'delivered' THEN 1 END) as successful
FROM notification_logs
WHERE {where_clause} AND provider IS NOT NULL
GROUP BY provider
ORDER BY total DESC
""")
result = await self.session.execute(provider_query, params)
for row in result.fetchall():
provider_success_rate = (row.successful / row.total * 100) if row.total > 0 else 0
provider_breakdown[row.provider] = {
"total": row.total,
"successful": row.successful,
"success_rate_percent": round(provider_success_rate, 2)
}
return {
"total_attempts": total,
"successful_attempts": successful,
"failed_attempts": failed,
"success_rate_percent": round(success_rate, 2),
"failure_rate_percent": round(failure_rate, 2),
"avg_response_time_ms": float(stats.avg_response_time_ms or 0),
"min_response_time_ms": int(stats.min_response_time_ms or 0),
"max_response_time_ms": int(stats.max_response_time_ms or 0),
"error_breakdown": error_breakdown,
"provider_breakdown": provider_breakdown,
"hours_analyzed": hours_back
}
except Exception as e:
logger.error("Failed to get delivery performance stats",
hours_back=hours_back,
provider=provider,
error=str(e))
return {
"total_attempts": 0,
"successful_attempts": 0,
"failed_attempts": 0,
"success_rate_percent": 0.0,
"failure_rate_percent": 0.0,
"avg_response_time_ms": 0.0,
"min_response_time_ms": 0,
"max_response_time_ms": 0,
"error_breakdown": {},
"provider_breakdown": {},
"hours_analyzed": hours_back
}
    async def get_logs_by_provider(
        self,
        provider: str,
        hours_back: int = 24,
        status: Optional[NotificationStatus] = None,
        limit: int = 100
    ) -> List[NotificationLog]:
"""Get logs for a specific provider"""
try:
cutoff_time = datetime.utcnow() - timedelta(hours=hours_back)
conditions = [
"provider = :provider",
"attempted_at >= :cutoff_time"
]
params = {"provider": provider, "cutoff_time": cutoff_time, "limit": limit}
if status:
conditions.append("status = :status")
params["status"] = status.value
query_text = f"""
SELECT * FROM notification_logs
WHERE {' AND '.join(conditions)}
ORDER BY attempted_at DESC
LIMIT :limit
"""
result = await self.session.execute(text(query_text), params)
logs = []
for row in result.fetchall():
record_dict = dict(row._mapping)
# Convert enum string back to enum object
record_dict["status"] = NotificationStatus(record_dict["status"])
log_entry = self.model(**record_dict)
logs.append(log_entry)
return logs
except Exception as e:
logger.error("Failed to get logs by provider",
provider=provider,
error=str(e))
return []
async def cleanup_old_logs(self, days_old: int = 30) -> int:
"""Clean up old notification logs"""
try:
cutoff_date = datetime.utcnow() - timedelta(days=days_old)
# Only delete logs for successfully delivered or permanently failed notifications
query_text = """
DELETE FROM notification_logs
WHERE attempted_at < :cutoff_date
AND status IN ('delivered', 'failed')
"""
result = await self.session.execute(text(query_text), {"cutoff_date": cutoff_date})
deleted_count = result.rowcount
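            # Commit/rollback is assumed to be handled by the caller or the
            # base repository's unit of work; nothing is committed here.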
logger.info("Cleaned up old notification logs",
deleted_count=deleted_count,
days_old=days_old)
return deleted_count
except Exception as e:
logger.error("Failed to cleanup old logs", error=str(e))
raise DatabaseError(f"Cleanup failed: {str(e)}")
async def get_notification_timeline(
self,
notification_id: str
) -> Dict[str, Any]:
"""Get complete timeline for a notification including all attempts"""
try:
logs = await self.get_logs_for_notification(notification_id)
timeline = []
for log in logs:
entry = {
"attempt_number": log.attempt_number,
"status": log.status.value,
"attempted_at": log.attempted_at.isoformat() if log.attempted_at else None,
"provider": log.provider,
"provider_message_id": log.provider_message_id,
"response_time_ms": log.response_time_ms,
"error_code": log.error_code,
"error_message": log.error_message
}
# Parse metadata if present
if log.log_metadata:
try:
entry["metadata"] = json.loads(log.log_metadata)
except json.JSONDecodeError:
entry["metadata"] = log.log_metadata
# Parse provider response if present
if log.provider_response:
try:
entry["provider_response"] = json.loads(log.provider_response)
except json.JSONDecodeError:
entry["provider_response"] = log.provider_response
timeline.append(entry)
# Calculate summary statistics
total_attempts = len(logs)
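            # SENT is counted as success on the assumption that delivery
            # confirmation (DELIVERED) may lag or never arrive for some providers.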
successful_attempts = len([log for log in logs if log.status in [NotificationStatus.SENT, NotificationStatus.DELIVERED]])
failed_attempts = len([log for log in logs if log.status == NotificationStatus.FAILED])
avg_response_time = 0
if logs:
response_times = [log.response_time_ms for log in logs if log.response_time_ms is not None]
avg_response_time = sum(response_times) / len(response_times) if response_times else 0
return {
"notification_id": notification_id,
"total_attempts": total_attempts,
"successful_attempts": successful_attempts,
"failed_attempts": failed_attempts,
"avg_response_time_ms": round(avg_response_time, 2),
"timeline": timeline
}
except Exception as e:
logger.error("Failed to get notification timeline",
notification_id=notification_id,
error=str(e))
return {
"notification_id": notification_id,
"error": str(e),
"timeline": []
}
async def get_retry_analysis(self, days_back: int = 7) -> Dict[str, Any]:
"""Analyze retry patterns and success rates"""
try:
cutoff_date = datetime.utcnow() - timedelta(days=days_back)
# Get retry statistics
retry_query = text("""
SELECT
attempt_number,
COUNT(*) as total_attempts,
COUNT(CASE WHEN status = 'sent' OR status = 'delivered' THEN 1 END) as successful_attempts
FROM notification_logs
WHERE attempted_at >= :cutoff_date
GROUP BY attempt_number
ORDER BY attempt_number
""")
result = await self.session.execute(retry_query, {"cutoff_date": cutoff_date})
retry_stats = {}
for row in result.fetchall():
success_rate = (row.successful_attempts / row.total_attempts * 100) if row.total_attempts > 0 else 0
retry_stats[row.attempt_number] = {
"total_attempts": row.total_attempts,
"successful_attempts": row.successful_attempts,
"success_rate_percent": round(success_rate, 2)
}
# Get common failure patterns
failure_query = text("""
SELECT
error_code,
attempt_number,
COUNT(*) as count
FROM notification_logs
WHERE attempted_at >= :cutoff_date
AND status = 'failed'
AND error_code IS NOT NULL
GROUP BY error_code, attempt_number
ORDER BY count DESC
LIMIT 20
""")
result = await self.session.execute(failure_query, {"cutoff_date": cutoff_date})
failure_patterns = []
for row in result.fetchall():
failure_patterns.append({
"error_code": row.error_code,
"attempt_number": row.attempt_number,
"count": row.count
})
return {
"retry_statistics": retry_stats,
"failure_patterns": failure_patterns,
"days_analyzed": days_back
}
except Exception as e:
logger.error("Failed to get retry analysis", error=str(e))
return {
"retry_statistics": {},
"failure_patterns": [],
"days_analyzed": days_back,
"error": str(e)
}
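
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; the DSN and notification id below are
# placeholders, and session/engine wiring may differ in this codebase):
#
#     from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
#
#     engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
#     async with async_sessionmaker(engine)() as session:
#         repo = LogRepository(session)
#         stats = await repo.get_delivery_performance_stats(hours_back=24)
#         timeline = await repo.get_notification_timeline("ntf_123")
# ---------------------------------------------------------------------------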