Files
bakery-ia/services/auth/app/services/data_export_service.py
2025-10-27 16:33:26 +01:00

201 lines
7.6 KiB
Python

"""
User data export service for GDPR compliance
Implements Article 15 (Right to Access) and Article 20 (Right to Data Portability)
"""
from typing import Dict, Any, List, Optional
from uuid import UUID
from datetime import datetime, timezone

import structlog
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.models.users import User
from app.models.tokens import RefreshToken, LoginAttempt
from app.models.consent import UserConsent, ConsentHistory
from app.models.onboarding import UserOnboardingProgress
from app.models import AuditLog
# Module-level structlog logger; DataExportService uses it to record
# export completion ("data_export_completed") and failure ("data_export_failed").
logger = structlog.get_logger()
class DataExportService:
    """Service to export all user data in machine-readable format.

    Every ``_export_*`` method queries one category of data for a single
    user through the shared session and returns a plain dictionary made of
    strings, numbers, booleans, lists and ``None``; timestamps are rendered
    as ISO 8601 strings.
    """

    def __init__(self, db: "AsyncSession"):
        """Keep the database session that all export queries run through.

        Args:
            db: Async SQLAlchemy session used for every query of this service.
        """
        self.db = db

    @staticmethod
    def _iso(value: Optional[datetime]) -> Optional[str]:
        """Return ``value`` as an ISO 8601 string, or ``None`` when it is unset."""
        return value.isoformat() if value else None

    @staticmethod
    def _is_active_session(token: Any, now: datetime) -> bool:
        """Tell whether a refresh token still represents a usable session.

        A token is active when it is not revoked and its expiry lies after
        ``now``.

        Args:
            token: Object exposing ``expires_at`` and ``is_revoked``.
            now: Reference instant shared by every token of one export.

        Returns:
            ``True`` if the token is neither revoked nor expired.
        """
        if token.is_revoked:
            return False
        # A missing expiry cannot be shown to be in the past, so the token is
        # reported rather than crashing the whole export on ``None > now``.
        # NOTE(review): confirm that "no expiry" should count as active.
        if token.expires_at is None:
            return True
        # NOTE(review): ``now`` is timezone-aware; this comparison raises
        # TypeError if ``expires_at`` is stored naive -- confirm the column
        # is timezone-aware.
        return token.expires_at > now

    async def export_user_data(self, user_id: UUID) -> Dict[str, Any]:
        """Export all user data from the auth service.

        Args:
            user_id: Identifier of the user whose data is exported.

        Returns:
            A dictionary with an ``export_metadata`` header followed by one
            entry per data category (personal, account, consent, security,
            onboarding, audit logs).

        Raises:
            Exception: Any error raised while collecting a section is logged
                as ``data_export_failed`` and re-raised unchanged.
        """
        try:
            # The sections are awaited one after another on purpose: they all
            # share ``self.db`` and an AsyncSession must not be used by
            # several tasks concurrently.
            export_data = {
                "export_metadata": {
                    "user_id": str(user_id),
                    "export_date": datetime.now(timezone.utc).isoformat(),
                    "data_controller": "Panadería IA",
                    "format_version": "1.0",
                    "gdpr_article": "Article 15 (Right to Access) & Article 20 (Data Portability)",
                },
                "personal_data": await self._export_personal_data(user_id),
                "account_data": await self._export_account_data(user_id),
                "consent_data": await self._export_consent_data(user_id),
                "security_data": await self._export_security_data(user_id),
                "onboarding_data": await self._export_onboarding_data(user_id),
                "audit_logs": await self._export_audit_logs(user_id),
            }
            logger.info("data_export_completed", user_id=str(user_id))
            return export_data
        except Exception as e:
            logger.error("data_export_failed", user_id=str(user_id), error=str(e))
            raise

    async def _export_personal_data(self, user_id: UUID) -> Dict[str, Any]:
        """Export personal identifiable information.

        Returns:
            The user's profile fields, or an empty dictionary when no user
            row matches ``user_id``.
        """
        result = await self.db.execute(select(User).where(User.id == user_id))
        user = result.scalar_one_or_none()
        if not user:
            return {}

        return {
            "user_id": str(user.id),
            "email": user.email,
            "full_name": user.full_name,
            "phone": user.phone,
            "language": user.language,
            "timezone": user.timezone,
            "is_active": user.is_active,
            "is_verified": user.is_verified,
            "role": user.role,
            "created_at": self._iso(user.created_at),
            "updated_at": self._iso(user.updated_at),
            "last_login": self._iso(user.last_login),
        }

    async def _export_account_data(self, user_id: UUID) -> Dict[str, Any]:
        """Export account-related data (refresh-token sessions).

        Returns:
            The list and count of tokens that are still active, plus the
            total number of token rows found for the user.
        """
        result = await self.db.execute(
            select(RefreshToken).where(RefreshToken.user_id == user_id)
        )
        tokens = result.scalars().all()

        # One reference instant so every token is judged against the same time.
        now = datetime.now(timezone.utc)
        active_sessions = [
            {
                "token_id": str(token.id),
                "created_at": self._iso(token.created_at),
                "expires_at": self._iso(token.expires_at),
                "is_revoked": token.is_revoked,
            }
            for token in tokens
            if self._is_active_session(token, now)
        ]

        return {
            "active_sessions_count": len(active_sessions),
            "active_sessions": active_sessions,
            "total_tokens_issued": len(tokens),
        }

    async def _export_consent_data(self, user_id: UUID) -> Dict[str, Any]:
        """Export consent history.

        Returns:
            The first consent row (or ``None``), every consent-history row
            serialized with ``to_dict()``, and the number of history rows.
        """
        consent_result = await self.db.execute(
            select(UserConsent).where(UserConsent.user_id == user_id)
        )
        consents = consent_result.scalars().all()

        history_result = await self.db.execute(
            select(ConsentHistory).where(ConsentHistory.user_id == user_id)
        )
        history = history_result.scalars().all()

        return {
            # NOTE(review): the consent query has no ORDER BY, so which row
            # lands at index 0 depends on the order the database returns
            # rows -- confirm this is really the user's latest consent.
            "current_consent": consents[0].to_dict() if consents else None,
            "consent_history": [entry.to_dict() for entry in history],
            "total_consent_changes": len(history),
        }

    async def _export_security_data(self, user_id: UUID) -> Dict[str, Any]:
        """Export security-related data (recent login attempts).

        Returns:
            Up to 50 login attempts, newest first, or an empty result with a
            "User not found" note when no user row matches ``user_id``.
        """
        # Login attempts are keyed by email, so the user row is needed first.
        user_result = await self.db.execute(select(User).where(User.id == user_id))
        user = user_result.scalar_one_or_none()
        if not user:
            return {
                "recent_login_attempts": [],
                "total_attempts_exported": 0,
                "note": "User not found",
            }

        # LoginAttempt uses email, not user_id
        query = (
            select(LoginAttempt)
            .where(LoginAttempt.email == user.email)
            .order_by(LoginAttempt.created_at.desc())
            .limit(50)
        )
        result = await self.db.execute(query)

        login_attempts = [
            {
                "attempted_at": self._iso(attempt.created_at),
                "success": attempt.success,
                "ip_address": attempt.ip_address,
                "user_agent": attempt.user_agent,
                "failure_reason": attempt.failure_reason,
            }
            for attempt in result.scalars().all()
        ]

        return {
            "recent_login_attempts": login_attempts,
            "total_attempts_exported": len(login_attempts),
            "note": "Only last 50 login attempts included for data minimization",
        }

    async def _export_onboarding_data(self, user_id: UUID) -> Dict[str, Any]:
        """Export onboarding progress.

        Returns:
            One entry per onboarding-progress row found for the user.
        """
        result = await self.db.execute(
            select(UserOnboardingProgress).where(
                UserOnboardingProgress.user_id == user_id
            )
        )
        progress = result.scalars().all()

        return {
            "onboarding_steps": [
                {
                    "step_id": str(step.id),
                    "step_name": step.step_name,
                    "completed": step.completed,
                    "completed_at": self._iso(step.completed_at),
                }
                for step in progress
            ]
        }

    async def _export_audit_logs(self, user_id: UUID) -> Dict[str, Any]:
        """Export audit logs related to the user.

        Returns:
            Up to 100 audit-log entries, newest first, with their count.
        """
        query = (
            select(AuditLog)
            .where(AuditLog.user_id == user_id)
            .order_by(AuditLog.created_at.desc())
            .limit(100)
        )
        result = await self.db.execute(query)
        logs = result.scalars().all()

        return {
            "audit_trail": [
                {
                    "log_id": str(log.id),
                    "action": log.action,
                    "resource_type": log.resource_type,
                    "resource_id": log.resource_id,
                    "severity": log.severity,
                    "description": log.description,
                    "ip_address": log.ip_address,
                    "created_at": self._iso(log.created_at),
                }
                for log in logs
            ],
            "total_logs_exported": len(logs),
            "note": "Only last 100 audit logs included for data minimization",
        }