Add the initial user admin delete code

This commit is contained in:
Urtzi Alfaro
2025-08-01 22:20:32 +02:00
parent 6e6c2cc42e
commit d4687e6375
3 changed files with 1147 additions and 3 deletions

View File

@@ -2,19 +2,22 @@
User management API routes
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Dict, Any
import structlog
import uuid
from app.core.database import get_db
from app.schemas.auth import UserResponse, PasswordChange
from app.schemas.users import UserUpdate
from app.services.user_service import UserService
from app.models.users import User
from app.models.users import User
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.admin_delete import AdminUserDeleteService
# Import unified authentication from shared library
from shared.auth.decorators import (
get_current_user_dep,
@@ -116,4 +119,122 @@ async def update_current_user(
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to update user"
)
)
@router.delete("/delete/users/{user_id}")
async def delete_admin_user(
user_id: str,
background_tasks: BackgroundTasks,
current_user = Depends(get_current_user_dep),
#_admin_check = Depends(require_admin_role),
db: AsyncSession = Depends(get_db)
):
"""
Delete an admin user and all associated data across all services.
This operation will:
1. Cancel any active training jobs for user's tenants
2. Delete all trained models and artifacts
3. Delete all forecasts and predictions
4. Delete notification preferences and logs
5. Handle tenant ownership (transfer or delete)
6. Delete user account and authentication data
**Warning: This operation is irreversible!**
"""
# Validate user_id format
try:
uuid.UUID(user_id)
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid user ID format"
)
# Prevent self-deletion
if user_id == current_user.id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete your own account"
)
# Initialize deletion service
deletion_service = AdminUserDeleteService(db)
# Perform the deletion
try:
result = await deletion_service.delete_admin_user_complete(
user_id=user_id,
requesting_user_id=current_user.id
)
return {
"success": True,
"message": f"Admin user {user_id} has been successfully deleted",
"deletion_details": result
}
except HTTPException:
raise
except Exception as e:
logger.error("Unexpected error during user deletion",
user_id=user_id,
error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="An unexpected error occurred during user deletion"
)
@router.get("/delete/users/{user_id}/deletion-preview")
async def preview_user_deletion(
user_id: str,
current_user = Depends(get_current_user_dep),
#_admin_check = Depends(require_admin_role),
db: AsyncSession = Depends(get_db)
):
"""
Preview what data would be deleted for an admin user.
This endpoint provides a dry-run preview of the deletion operation
without actually deleting any data.
"""
try:
uuid.UUID(user_id)
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid user ID format"
)
deletion_service = AdminUserDeleteService(db)
# Get user info
user_info = await deletion_service._validate_admin_user(user_id)
if not user_info:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Admin user {user_id} not found"
)
# Get tenant associations
tenant_info = await deletion_service._get_user_tenant_info(user_id)
# Build preview
preview = {
"user": user_info,
"tenant_associations": tenant_info,
"estimated_deletions": {
"training_models": "All models for associated tenants",
"forecasts": "All forecasts for associated tenants",
"notifications": "All user notification data",
"tenant_memberships": tenant_info['total_tenants'],
"owned_tenants": f"{tenant_info['owned_tenants']} (will be transferred or deleted)"
},
"warning": "This operation is irreversible and will permanently delete all associated data"
}
return preview

View File

@@ -0,0 +1,620 @@
# ================================================================
# Admin User Delete API - Complete Implementation
# ================================================================
"""
Complete admin user deletion API that handles all associated data
across all microservices in the bakery forecasting platform.
This implementation ensures proper cascade deletion of:
1. User account and authentication data
2. Tenant ownership and memberships
3. All training models and artifacts
4. Forecasts and predictions
5. Notification preferences and logs
6. Refresh tokens and sessions
"""
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, text
from typing import Dict, List, Any, Optional
import structlog
import uuid
from datetime import datetime
from shared.auth.decorators import get_current_user_dep
from app.core.database import get_db
from app.services.messaging import auth_publisher
from app.services.auth_service_clients import AuthServiceClientFactory
from app.core.config import settings
logger = structlog.get_logger()
router = APIRouter()
class AdminUserDeleteService:
"""Service to handle complete admin user deletion across all microservices"""
def __init__(self, db: AsyncSession):
self.db = db
self.clients = AuthServiceClientFactory(settings)
async def delete_admin_user_complete(self, user_id: str, requesting_user_id: str) -> Dict[str, Any]:
"""
Complete admin user deletion with all associated data using inter-service clients
Args:
user_id: ID of the admin user to delete
requesting_user_id: ID of the user performing the deletion
Returns:
Dictionary with deletion results from all services
"""
deletion_results = {
'user_id': user_id,
'requested_by': requesting_user_id,
'started_at': datetime.utcnow().isoformat(),
'services_processed': {},
'errors': [],
'summary': {}
}
try:
# Step 1: Validate user exists and is admin
user_info = await self._validate_admin_user(user_id)
if not user_info:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Admin user {user_id} not found"
)
deletion_results['user_info'] = user_info
# Step 2: Get all tenant associations using tenant client
tenant_info = await self._get_user_tenant_info(user_id)
deletion_results['tenant_associations'] = tenant_info
# Step 3: Delete in proper order to respect dependencies
# 3.1 Stop all active training jobs and delete models
training_result = await self._delete_training_data(tenant_info['tenant_ids'])
deletion_results['services_processed']['training'] = training_result
# 3.2 Delete all forecasts and predictions
forecasting_result = await self._delete_forecasting_data(tenant_info['tenant_ids'])
deletion_results['services_processed']['forecasting'] = forecasting_result
# 3.3 Delete notification preferences and logs
notification_result = await self._delete_notification_data(user_id)
deletion_results['services_processed']['notification'] = notification_result
# 3.4 Delete tenant memberships and handle owned tenants
tenant_result = await self._delete_tenant_data(user_id, tenant_info)
deletion_results['services_processed']['tenant'] = tenant_result
# 3.5 Finally delete user account and auth data
auth_result = await self._delete_auth_data(user_id)
deletion_results['services_processed']['auth'] = auth_result
# Step 4: Generate summary
deletion_results['summary'] = await self._generate_deletion_summary(deletion_results)
deletion_results['completed_at'] = datetime.utcnow().isoformat()
deletion_results['status'] = 'success'
# Step 5: Publish deletion event
await self._publish_user_deleted_event(user_id, deletion_results)
# Step 6: Send notification to admins
await self._notify_admins_of_deletion(user_info, deletion_results)
logger.info("Admin user deletion completed successfully",
user_id=user_id,
tenants_affected=len(tenant_info['tenant_ids']))
return deletion_results
except Exception as e:
deletion_results['status'] = 'failed'
deletion_results['error'] = str(e)
deletion_results['completed_at'] = datetime.utcnow().isoformat()
logger.error("Admin user deletion failed",
user_id=user_id,
error=str(e))
# Attempt to publish failure event
try:
await self._publish_user_deletion_failed_event(user_id, str(e))
except:
pass
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"User deletion failed: {str(e)}"
)
async def _validate_admin_user(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Validate user exists and get basic info from local database"""
try:
from app.models.users import User
# Query user from local auth database
query = select(User).where(User.id == uuid.UUID(user_id))
result = await self.db.execute(query)
user = result.scalar_one_or_none()
if not user:
return None
return {
'id': str(user.id),
'email': user.email,
'full_name': user.full_name,
'created_at': user.created_at.isoformat() if user.created_at else None,
'is_active': user.is_active,
'is_verified': user.is_verified
}
except Exception as e:
logger.error("Failed to validate admin user", user_id=user_id, error=str(e))
raise
async def _get_user_tenant_info(self, user_id: str) -> Dict[str, Any]:
"""Get all tenant associations for the user using tenant client"""
try:
# Use tenant service client to get memberships
memberships = await self.clients.tenant_client.get_user_tenants(user_id)
if not memberships:
return {
'tenant_ids': [],
'total_tenants': 0,
'owned_tenants': 0,
'memberships': []
}
tenant_ids = [m['tenant_id'] for m in memberships]
owned_tenants = [m for m in memberships if m.get('role') == 'owner']
return {
'tenant_ids': tenant_ids,
'total_tenants': len(tenant_ids),
'owned_tenants': len(owned_tenants),
'memberships': memberships
}
except Exception as e:
logger.error("Failed to get tenant info", user_id=user_id, error=str(e))
return {'tenant_ids': [], 'total_tenants': 0, 'owned_tenants': 0, 'memberships': []}
async def _delete_training_data(self, tenant_ids: List[str]) -> Dict[str, Any]:
"""Delete all training models, jobs, and artifacts for user's tenants"""
result = {
'models_deleted': 0,
'jobs_cancelled': 0,
'artifacts_deleted': 0,
'total_tenants_processed': 0,
'errors': []
}
try:
for tenant_id in tenant_ids:
try:
# Cancel active training jobs using training client
cancel_result = await self.clients.training_client.cancel_tenant_training_jobs(tenant_id)
if cancel_result:
result['jobs_cancelled'] += cancel_result.get('jobs_cancelled', 0)
if cancel_result.get('errors'):
result['errors'].extend(cancel_result['errors'])
# Delete all models and artifacts using training client
delete_result = await self.clients.training_client.delete_tenant_models(tenant_id)
if delete_result:
result['models_deleted'] += delete_result.get('models_deleted', 0)
result['artifacts_deleted'] += delete_result.get('artifacts_deleted', 0)
if delete_result.get('errors'):
result['errors'].extend(delete_result['errors'])
result['total_tenants_processed'] += 1
logger.debug("Training data deleted for tenant",
tenant_id=tenant_id,
models=delete_result.get('models_deleted', 0) if delete_result else 0)
except Exception as e:
error_msg = f"Error deleting training data for tenant {tenant_id}: {str(e)}"
result['errors'].append(error_msg)
logger.error(error_msg)
except Exception as e:
result['errors'].append(f"Training service communication error: {str(e)}")
return result
async def _delete_forecasting_data(self, tenant_ids: List[str]) -> Dict[str, Any]:
"""Delete all forecasts, predictions, and caches for user's tenants"""
result = {
'forecasts_deleted': 0,
'predictions_deleted': 0,
'cache_cleared': 0,
'batches_cancelled': 0,
'total_tenants_processed': 0,
'errors': []
}
try:
for tenant_id in tenant_ids:
try:
# Cancel any active prediction batches
batch_result = await self.clients.forecasting_client.cancel_tenant_prediction_batches(tenant_id)
if batch_result:
result['batches_cancelled'] += batch_result.get('batches_cancelled', 0)
if batch_result.get('errors'):
result['errors'].extend(batch_result['errors'])
# Clear prediction cache
cache_result = await self.clients.forecasting_client.clear_tenant_prediction_cache(tenant_id)
if cache_result:
result['cache_cleared'] += cache_result.get('cache_cleared', 0)
if cache_result.get('errors'):
result['errors'].extend(cache_result['errors'])
# Delete all forecasts for tenant
delete_result = await self.clients.forecasting_client.delete_tenant_forecasts(tenant_id)
if delete_result:
result['forecasts_deleted'] += delete_result.get('forecasts_deleted', 0)
result['predictions_deleted'] += delete_result.get('predictions_deleted', 0)
if delete_result.get('errors'):
result['errors'].extend(delete_result['errors'])
result['total_tenants_processed'] += 1
logger.debug("Forecasting data deleted for tenant",
tenant_id=tenant_id,
forecasts=delete_result.get('forecasts_deleted', 0) if delete_result else 0)
except Exception as e:
error_msg = f"Error deleting forecasting data for tenant {tenant_id}: {str(e)}"
result['errors'].append(error_msg)
logger.error(error_msg)
except Exception as e:
result['errors'].append(f"Forecasting service communication error: {str(e)}")
return result
async def _delete_notification_data(self, user_id: str) -> Dict[str, Any]:
"""Delete notification preferences, logs, and pending notifications"""
result = {
'preferences_deleted': 0,
'notifications_deleted': 0,
'notifications_cancelled': 0,
'logs_deleted': 0,
'errors': []
}
try:
# Cancel pending notifications first
cancel_result = await self.clients.notification_client.cancel_pending_user_notifications(user_id)
if cancel_result:
result['notifications_cancelled'] = cancel_result.get('notifications_cancelled', 0)
if cancel_result.get('errors'):
result['errors'].extend(cancel_result['errors'])
# Delete all notification data for user
delete_result = await self.clients.notification_client.delete_user_notification_data(user_id)
if delete_result:
result['preferences_deleted'] = delete_result.get('preferences_deleted', 0)
result['notifications_deleted'] = delete_result.get('notifications_deleted', 0)
result['logs_deleted'] = delete_result.get('logs_deleted', 0)
if delete_result.get('errors'):
result['errors'].extend(delete_result['errors'])
logger.debug("Notification data deleted for user",
user_id=user_id,
notifications=result['notifications_deleted'])
except Exception as e:
result['errors'].append(f"Notification service communication error: {str(e)}")
return result
async def _delete_tenant_data(self, user_id: str, tenant_info: Dict[str, Any]) -> Dict[str, Any]:
"""Delete tenant memberships and handle owned tenants using tenant client"""
result = {
'memberships_deleted': 0,
'tenants_deleted': 0,
'tenants_transferred': 0,
'errors': []
}
try:
# Handle owned tenants - either delete or transfer ownership
for membership in tenant_info['memberships']:
if membership.get('role') == 'owner':
tenant_id = membership['tenant_id']
try:
# Check if tenant has other admin members who can take ownership
has_other_admins = await self.clients.tenant_client.check_tenant_has_other_admins(
tenant_id, user_id
)
if has_other_admins:
# Get tenant members to find first admin
members = await self.clients.tenant_client.get_tenant_members(tenant_id)
admin_members = [
m for m in members
if m.get('role') == 'admin' and m.get('user_id') != user_id
]
if admin_members:
# Transfer ownership to first admin
transfer_result = await self.clients.tenant_client.transfer_tenant_ownership(
tenant_id, user_id, admin_members[0]['user_id']
)
if transfer_result:
result['tenants_transferred'] += 1
logger.info("Transferred tenant ownership",
tenant_id=tenant_id,
new_owner=admin_members[0]['user_id'])
else:
result['errors'].append(f"Failed to transfer ownership of tenant {tenant_id}")
else:
result['errors'].append(f"No admin members found for tenant {tenant_id}")
else:
# No other admins, delete the tenant completely
delete_result = await self.clients.tenant_client.delete_tenant(tenant_id)
if delete_result:
result['tenants_deleted'] += 1
logger.info("Deleted tenant", tenant_id=tenant_id)
else:
result['errors'].append(f"Failed to delete tenant {tenant_id}")
except Exception as e:
error_msg = f"Error handling owned tenant {tenant_id}: {str(e)}"
result['errors'].append(error_msg)
logger.error(error_msg)
# Delete user's memberships
delete_result = await self.clients.tenant_client.delete_user_memberships(user_id)
if delete_result:
result['memberships_deleted'] = delete_result.get('memberships_deleted', 0)
if delete_result.get('errors'):
result['errors'].extend(delete_result['errors'])
else:
result['errors'].append("Failed to delete user memberships")
except Exception as e:
result['errors'].append(f"Tenant service communication error: {str(e)}")
return result
async def _delete_auth_data(self, user_id: str) -> Dict[str, Any]:
"""Delete user account, refresh tokens, and auth data from local database"""
result = {
'user_deleted': False,
'refresh_tokens_deleted': 0,
'sessions_invalidated': 0,
'errors': []
}
try:
from app.models.users import User, RefreshToken
# Delete refresh tokens
token_delete_query = delete(RefreshToken).where(RefreshToken.user_id == uuid.UUID(user_id))
token_result = await self.db.execute(token_delete_query)
result['refresh_tokens_deleted'] = token_result.rowcount
# Delete user account
user_delete_query = delete(User).where(User.id == uuid.UUID(user_id))
user_result = await self.db.execute(user_delete_query)
if user_result.rowcount > 0:
result['user_deleted'] = True
await self.db.commit()
logger.info("User and tokens deleted from auth database",
user_id=user_id,
tokens_deleted=result['refresh_tokens_deleted'])
else:
result['errors'].append("User not found in auth database")
await self.db.rollback()
except Exception as e:
await self.db.rollback()
error_msg = f"Auth database error: {str(e)}"
result['errors'].append(error_msg)
logger.error(error_msg)
return result
async def _generate_deletion_summary(self, deletion_results: Dict[str, Any]) -> Dict[str, Any]:
"""Generate summary of deletion operation"""
summary = {
'total_tenants_affected': deletion_results['tenant_associations']['total_tenants'],
'total_models_deleted': deletion_results['services_processed']['training']['models_deleted'],
'total_forecasts_deleted': deletion_results['services_processed']['forecasting']['forecasts_deleted'],
'total_notifications_deleted': deletion_results['services_processed']['notification']['notifications_deleted'],
'tenants_transferred': deletion_results['services_processed']['tenant']['tenants_transferred'],
'tenants_deleted': deletion_results['services_processed']['tenant']['tenants_deleted'],
'user_deleted': deletion_results['services_processed']['auth']['user_deleted'],
'total_errors': 0
}
# Count total errors across all services
for service_result in deletion_results['services_processed'].values():
if isinstance(service_result, dict) and 'errors' in service_result:
summary['total_errors'] += len(service_result['errors'])
# Add success indicator
summary['deletion_successful'] = (
summary['user_deleted'] and
summary['total_errors'] == 0
)
return summary
async def _publish_user_deleted_event(self, user_id: str, deletion_results: Dict[str, Any]):
"""Publish user deletion event to message queue"""
try:
await auth_publisher.publish_event(
exchange="user_events",
routing_key="user.admin.deleted",
message={
"event_type": "admin_user_deleted",
"user_id": user_id,
"timestamp": datetime.utcnow().isoformat(),
"deletion_summary": deletion_results['summary'],
"services_affected": list(deletion_results['services_processed'].keys())
}
)
logger.info("Published user deletion event", user_id=user_id)
except Exception as e:
logger.error("Failed to publish user deletion event", error=str(e))
async def _publish_user_deletion_failed_event(self, user_id: str, error: str):
"""Publish user deletion failure event"""
try:
await auth_publisher.publish_event(
exchange="user_events",
routing_key="user.deletion.failed",
message={
"event_type": "admin_user_deletion_failed",
"user_id": user_id,
"error": error,
"timestamp": datetime.utcnow().isoformat()
}
)
logger.info("Published user deletion failure event", user_id=user_id)
except Exception as e:
logger.error("Failed to publish deletion failure event", error=str(e))
async def _notify_admins_of_deletion(self, user_info: Dict[str, Any], deletion_results: Dict[str, Any]):
"""Send notification to other admins about the user deletion"""
try:
# Get requesting user info for notification
requesting_user_id = deletion_results['requested_by']
requesting_user = await self._validate_admin_user(requesting_user_id)
if requesting_user:
await self.clients.notification_client.send_user_deletion_notification(
admin_email=requesting_user['email'],
deleted_user_email=user_info['email'],
deletion_summary=deletion_results['summary']
)
logger.info("Sent user deletion notification",
deleted_user=user_info['email'],
notified_admin=requesting_user['email'])
except Exception as e:
logger.error("Failed to send admin notification", error=str(e))
async def preview_user_deletion(self, user_id: str) -> Dict[str, Any]:
"""
Preview what data would be deleted for an admin user without actually deleting
"""
try:
# Get user info
user_info = await self._validate_admin_user(user_id)
if not user_info:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Admin user {user_id} not found"
)
# Get tenant associations
tenant_info = await self._get_user_tenant_info(user_id)
# Get counts from each service
training_models_count = 0
forecasts_count = 0
notifications_count = 0
for tenant_id in tenant_info['tenant_ids']:
try:
# Get training models count
models_count = await self.clients.training_client.get_tenant_models_count(tenant_id)
training_models_count += models_count
# Get forecasts count
tenant_forecasts = await self.clients.forecasting_client.get_tenant_forecasts_count(tenant_id)
forecasts_count += tenant_forecasts
except Exception as e:
logger.warning("Could not get counts for tenant", tenant_id=tenant_id, error=str(e))
try:
# Get user notifications count
notifications_count = await self.clients.notification_client.get_user_notification_count(user_id)
except Exception as e:
logger.warning("Could not get notification count", user_id=user_id, error=str(e))
# Build preview
preview = {
"user": user_info,
"tenant_associations": tenant_info,
"estimated_deletions": {
"training_models": training_models_count,
"forecasts": forecasts_count,
"notifications": notifications_count,
"tenant_memberships": tenant_info['total_tenants'],
"owned_tenants": tenant_info['owned_tenants']
},
"tenant_handling": await self._preview_tenant_handling(user_id, tenant_info),
"warning": "This operation is irreversible and will permanently delete all associated data"
}
return preview
except HTTPException:
raise
except Exception as e:
logger.error("Error generating deletion preview", user_id=user_id, error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to generate deletion preview"
)
async def _preview_tenant_handling(self, user_id: str, tenant_info: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Preview how each owned tenant would be handled"""
tenant_handling = []
for membership in tenant_info['memberships']:
if membership.get('role') == 'owner':
tenant_id = membership['tenant_id']
try:
has_other_admins = await self.clients.tenant_client.check_tenant_has_other_admins(
tenant_id, user_id
)
if has_other_admins:
members = await self.clients.tenant_client.get_tenant_members(tenant_id)
admin_members = [
m for m in members
if m.get('role') == 'admin' and m.get('user_id') != user_id
]
tenant_handling.append({
"tenant_id": tenant_id,
"action": "transfer_ownership",
"details": f"Ownership will be transferred to admin: {admin_members[0]['user_id'] if admin_members else 'Unknown'}"
})
else:
tenant_handling.append({
"tenant_id": tenant_id,
"action": "delete_tenant",
"details": "Tenant will be deleted completely (no other admins found)"
})
except Exception as e:
tenant_handling.append({
"tenant_id": tenant_id,
"action": "error",
"details": f"Could not determine action: {str(e)}"
})
return tenant_handling

View File

@@ -0,0 +1,403 @@
# ================================================================
# Auth Service Inter-Service Communication Clients
# ================================================================
"""
Inter-service communication clients for the Auth Service to communicate
with other microservices in the bakery forecasting platform.
These clients handle authenticated API calls to:
- Tenant Service
- Training Service
- Forecasting Service
- Notification Service
"""
import structlog
from typing import Dict, Any, Optional, List, Union
from shared.clients.base_service_client import BaseServiceClient
from shared.config.base import BaseServiceSettings
logger = structlog.get_logger()
# ================================================================
# TENANT SERVICE CLIENT
# ================================================================
class AuthTenantServiceClient(BaseServiceClient):
"""Client for Auth Service to communicate with Tenant Service"""
def __init__(self, config: BaseServiceSettings):
super().__init__("auth", config)
self.service_url = config.TENANT_SERVICE_URL
def get_service_base_path(self) -> str:
return "/api/v1"
# ================================================================
# USER TENANT OPERATIONS
# ================================================================
async def get_user_tenants(self, user_id: str) -> Optional[List[Dict[str, Any]]]:
"""Get all tenant memberships for a user"""
try:
result = await self.get(f"users/{user_id}/tenants")
return result.get("memberships", []) if result else []
except Exception as e:
logger.error("Failed to get user tenants", user_id=user_id, error=str(e))
return []
async def get_user_owned_tenants(self, user_id: str) -> Optional[List[Dict[str, Any]]]:
"""Get tenants owned by a user"""
try:
memberships = await self.get_user_tenants(user_id)
if memberships:
return [m for m in memberships if m.get('role') == 'owner']
return []
except Exception as e:
logger.error("Failed to get owned tenants", user_id=user_id, error=str(e))
return []
async def transfer_tenant_ownership(
self,
tenant_id: str,
current_owner_id: str,
new_owner_id: str
) -> Optional[Dict[str, Any]]:
"""Transfer tenant ownership from one user to another"""
try:
data = {
"current_owner_id": current_owner_id,
"new_owner_id": new_owner_id
}
return await self.post(f"tenants/{tenant_id}/transfer-ownership", data=data)
except Exception as e:
logger.error("Failed to transfer tenant ownership",
tenant_id=tenant_id,
error=str(e))
return None
async def delete_tenant(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Delete a tenant completely"""
try:
return await self.delete(f"tenants/{tenant_id}")
except Exception as e:
logger.error("Failed to delete tenant", tenant_id=tenant_id, error=str(e))
return None
async def delete_user_memberships(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Delete all tenant memberships for a user"""
try:
return await self.delete(f"users/{user_id}/memberships")
except Exception as e:
logger.error("Failed to delete user memberships", user_id=user_id, error=str(e))
return None
async def get_tenant_members(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
"""Get all members of a tenant"""
try:
result = await self.get(f"tenants/{tenant_id}/members")
return result.get("members", []) if result else []
except Exception as e:
logger.error("Failed to get tenant members", tenant_id=tenant_id, error=str(e))
return []
async def check_tenant_has_other_admins(self, tenant_id: str, excluding_user_id: str) -> bool:
"""Check if tenant has other admin users besides the excluded one"""
try:
members = await self.get_tenant_members(tenant_id)
if members:
admin_members = [
m for m in members
if m.get('role') in ['admin', 'owner'] and m.get('user_id') != excluding_user_id
]
return len(admin_members) > 0
return False
except Exception as e:
logger.error("Failed to check tenant admins",
tenant_id=tenant_id,
error=str(e))
return False
# ================================================================
# TRAINING SERVICE CLIENT
# ================================================================
class AuthTrainingServiceClient(BaseServiceClient):
"""Client for Auth Service to communicate with Training Service"""
def __init__(self, config: BaseServiceSettings):
super().__init__("auth", config)
self.service_url = config.TRAINING_SERVICE_URL
def get_service_base_path(self) -> str:
return "/api/v1"
# ================================================================
# TRAINING JOB OPERATIONS
# ================================================================
async def cancel_tenant_training_jobs(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Cancel all active training jobs for a tenant"""
try:
data = {"tenant_id": tenant_id}
return await self.post("jobs/cancel-tenant", data=data)
except Exception as e:
logger.error("Failed to cancel tenant training jobs",
tenant_id=tenant_id,
error=str(e))
return {"jobs_cancelled": 0, "errors": [str(e)]}
async def get_tenant_active_jobs(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
"""Get all active training jobs for a tenant"""
try:
params = {"status": "running,queued,pending", "tenant_id": tenant_id}
result = await self.get("jobs", params=params)
return result.get("jobs", []) if result else []
except Exception as e:
logger.error("Failed to get tenant active jobs",
tenant_id=tenant_id,
error=str(e))
return []
# ================================================================
# MODEL OPERATIONS
# ================================================================
async def delete_tenant_models(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Delete all trained models and artifacts for a tenant"""
try:
return await self.delete(f"models/tenant/{tenant_id}")
except Exception as e:
logger.error("Failed to delete tenant models",
tenant_id=tenant_id,
error=str(e))
return {"models_deleted": 0, "artifacts_deleted": 0, "errors": [str(e)]}
async def get_tenant_models_count(self, tenant_id: str) -> int:
"""Get count of trained models for a tenant"""
try:
result = await self.get(f"models/tenant/{tenant_id}/count")
return result.get("count", 0) if result else 0
except Exception as e:
logger.error("Failed to get tenant models count",
tenant_id=tenant_id,
error=str(e))
return 0
async def get_tenant_model_artifacts(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
"""Get all model artifacts for a tenant"""
try:
result = await self.get(f"models/tenant/{tenant_id}/artifacts")
return result.get("artifacts", []) if result else []
except Exception as e:
logger.error("Failed to get tenant model artifacts",
tenant_id=tenant_id,
error=str(e))
return []
# ================================================================
# FORECASTING SERVICE CLIENT
# ================================================================
class AuthForecastingServiceClient(BaseServiceClient):
"""Client for Auth Service to communicate with Forecasting Service"""
def __init__(self, config: BaseServiceSettings):
super().__init__("auth", config)
self.service_url = config.FORECASTING_SERVICE_URL
def get_service_base_path(self) -> str:
return "/api/v1"
# ================================================================
# FORECAST OPERATIONS
# ================================================================
async def delete_tenant_forecasts(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Delete all forecasts and predictions for a tenant"""
try:
return await self.delete(f"forecasts/tenant/{tenant_id}")
except Exception as e:
logger.error("Failed to delete tenant forecasts",
tenant_id=tenant_id,
error=str(e))
return {
"forecasts_deleted": 0,
"predictions_deleted": 0,
"cache_cleared": 0,
"errors": [str(e)]
}
async def get_tenant_forecasts_count(self, tenant_id: str) -> int:
"""Get count of forecasts for a tenant"""
try:
result = await self.get(f"forecasts/tenant/{tenant_id}/count")
return result.get("count", 0) if result else 0
except Exception as e:
logger.error("Failed to get tenant forecasts count",
tenant_id=tenant_id,
error=str(e))
return 0
async def clear_tenant_prediction_cache(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Clear prediction cache for a tenant"""
try:
return await self.post(f"predictions/cache/clear/{tenant_id}")
except Exception as e:
logger.error("Failed to clear tenant prediction cache",
tenant_id=tenant_id,
error=str(e))
return {"cache_cleared": 0, "errors": [str(e)]}
async def cancel_tenant_prediction_batches(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Cancel any active prediction batches for a tenant"""
try:
data = {"tenant_id": tenant_id}
return await self.post("predictions/batches/cancel", data=data)
except Exception as e:
logger.error("Failed to cancel tenant prediction batches",
tenant_id=tenant_id,
error=str(e))
return {"batches_cancelled": 0, "errors": [str(e)]}
# ================================================================
# NOTIFICATION SERVICE CLIENT
# ================================================================
class AuthNotificationServiceClient(BaseServiceClient):
"""Client for Auth Service to communicate with Notification Service"""
def __init__(self, config: BaseServiceSettings):
super().__init__("auth", config)
self.service_url = config.NOTIFICATION_SERVICE_URL
def get_service_base_path(self) -> str:
return "/api/v1"
# ================================================================
# USER NOTIFICATION OPERATIONS
# ================================================================
async def delete_user_notification_data(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Delete all notification data for a user"""
try:
return await self.delete(f"users/{user_id}/data")
except Exception as e:
logger.error("Failed to delete user notification data",
user_id=user_id,
error=str(e))
return {
"preferences_deleted": 0,
"notifications_deleted": 0,
"logs_deleted": 0,
"errors": [str(e)]
}
async def cancel_pending_user_notifications(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Cancel all pending notifications for a user"""
try:
return await self.post(f"users/{user_id}/notifications/cancel")
except Exception as e:
logger.error("Failed to cancel user notifications",
user_id=user_id,
error=str(e))
return {"notifications_cancelled": 0, "errors": [str(e)]}
async def get_user_notification_count(self, user_id: str) -> int:
"""Get count of notifications for a user"""
try:
result = await self.get(f"users/{user_id}/notifications/count")
return result.get("count", 0) if result else 0
except Exception as e:
logger.error("Failed to get user notification count",
user_id=user_id,
error=str(e))
return 0
async def send_user_deletion_notification(
self,
admin_email: str,
deleted_user_email: str,
deletion_summary: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""Send notification about user deletion to administrators"""
try:
data = {
"type": "email",
"recipient_email": admin_email,
"template_key": "user_deletion_notification",
"template_data": {
"deleted_user_email": deleted_user_email,
"deletion_summary": deletion_summary
},
"priority": "high"
}
return await self.post("notifications/send", data=data)
except Exception as e:
logger.error("Failed to send user deletion notification",
admin_email=admin_email,
error=str(e))
return None
# ================================================================
# CLIENT FACTORY
# ================================================================
class AuthServiceClientFactory:
"""Factory for creating inter-service clients for Auth Service"""
def __init__(self, config: BaseServiceSettings):
self.config = config
self._tenant_client = None
self._training_client = None
self._forecasting_client = None
self._notification_client = None
@property
def tenant_client(self) -> AuthTenantServiceClient:
"""Get or create tenant service client"""
if self._tenant_client is None:
self._tenant_client = AuthTenantServiceClient(self.config)
return self._tenant_client
@property
def training_client(self) -> AuthTrainingServiceClient:
"""Get or create training service client"""
if self._training_client is None:
self._training_client = AuthTrainingServiceClient(self.config)
return self._training_client
@property
def forecasting_client(self) -> AuthForecastingServiceClient:
"""Get or create forecasting service client"""
if self._forecasting_client is None:
self._forecasting_client = AuthForecastingServiceClient(self.config)
return self._forecasting_client
@property
def notification_client(self) -> AuthNotificationServiceClient:
"""Get or create notification service client"""
if self._notification_client is None:
self._notification_client = AuthNotificationServiceClient(self.config)
return self._notification_client
async def health_check_all_services(self) -> Dict[str, bool]:
"""Check health of all services"""
results = {}
clients = [
("tenant", self.tenant_client),
("training", self.training_client),
("forecasting", self.forecasting_client),
("notification", self.notification_client)
]
for service_name, client in clients:
try:
health = await client.get("health")
results[service_name] = health is not None and health.get("status") == "healthy"
except Exception as e:
logger.error(f"Health check failed for {service_name} service", error=str(e))
results[service_name] = False
return results