From d4687e6375c03fca468738cc57db5a720d57e72f Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Fri, 1 Aug 2025 22:20:32 +0200 Subject: [PATCH] Add the initial user admin delete code --- services/auth/app/api/users.py | 127 +++- services/auth/app/services/admin_delete.py | 620 ++++++++++++++++++ .../auth/app/services/auth_service_clients.py | 403 ++++++++++++ 3 files changed, 1147 insertions(+), 3 deletions(-) create mode 100644 services/auth/app/services/admin_delete.py create mode 100644 services/auth/app/services/auth_service_clients.py diff --git a/services/auth/app/api/users.py b/services/auth/app/api/users.py index 476f55df..a286d0f2 100644 --- a/services/auth/app/api/users.py +++ b/services/auth/app/api/users.py @@ -2,19 +2,22 @@ User management API routes """ -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks from sqlalchemy.ext.asyncio import AsyncSession from typing import Dict, Any import structlog +import uuid from app.core.database import get_db from app.schemas.auth import UserResponse, PasswordChange from app.schemas.users import UserUpdate from app.services.user_service import UserService -from app.models.users import User +from app.models.users import User from sqlalchemy.ext.asyncio import AsyncSession +from app.services.admin_delete import AdminUserDeleteService + # Import unified authentication from shared library from shared.auth.decorators import ( get_current_user_dep, @@ -116,4 +119,122 @@ async def update_current_user( raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update user" - ) \ No newline at end of file + ) + +@router.delete("/delete/users/{user_id}") +async def delete_admin_user( + user_id: str, + background_tasks: BackgroundTasks, + current_user = Depends(get_current_user_dep), + #_admin_check = Depends(require_admin_role), + db: AsyncSession = Depends(get_db) +): + """ + Delete an admin user and all associated data across all services. + + This operation will: + 1. Cancel any active training jobs for user's tenants + 2. Delete all trained models and artifacts + 3. Delete all forecasts and predictions + 4. Delete notification preferences and logs + 5. Handle tenant ownership (transfer or delete) + 6. Delete user account and authentication data + + **Warning: This operation is irreversible!** + """ + + # Validate user_id format + try: + uuid.UUID(user_id) + except ValueError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid user ID format" + ) + + # Prevent self-deletion + if user_id == current_user.id: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Cannot delete your own account" + ) + + # Initialize deletion service + deletion_service = AdminUserDeleteService(db) + + # Perform the deletion + try: + result = await deletion_service.delete_admin_user_complete( + user_id=user_id, + requesting_user_id=current_user.id + ) + + return { + "success": True, + "message": f"Admin user {user_id} has been successfully deleted", + "deletion_details": result + } + + except HTTPException: + raise + except Exception as e: + logger.error("Unexpected error during user deletion", + user_id=user_id, + error=str(e)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="An unexpected error occurred during user deletion" + ) + + +@router.get("/delete/users/{user_id}/deletion-preview") +async def preview_user_deletion( + user_id: str, + current_user = Depends(get_current_user_dep), + #_admin_check = Depends(require_admin_role), + db: AsyncSession = Depends(get_db) +): + """ + Preview what data would be deleted for an admin user. + + This endpoint provides a dry-run preview of the deletion operation + without actually deleting any data. + """ + + try: + uuid.UUID(user_id) + except ValueError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid user ID format" + ) + + deletion_service = AdminUserDeleteService(db) + + # Get user info + user_info = await deletion_service._validate_admin_user(user_id) + if not user_info: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Admin user {user_id} not found" + ) + + # Get tenant associations + tenant_info = await deletion_service._get_user_tenant_info(user_id) + + # Build preview + preview = { + "user": user_info, + "tenant_associations": tenant_info, + "estimated_deletions": { + "training_models": "All models for associated tenants", + "forecasts": "All forecasts for associated tenants", + "notifications": "All user notification data", + "tenant_memberships": tenant_info['total_tenants'], + "owned_tenants": f"{tenant_info['owned_tenants']} (will be transferred or deleted)" + }, + "warning": "This operation is irreversible and will permanently delete all associated data" + } + + return preview + diff --git a/services/auth/app/services/admin_delete.py b/services/auth/app/services/admin_delete.py new file mode 100644 index 00000000..8d08a422 --- /dev/null +++ b/services/auth/app/services/admin_delete.py @@ -0,0 +1,620 @@ +# ================================================================ +# Admin User Delete API - Complete Implementation +# ================================================================ +""" +Complete admin user deletion API that handles all associated data +across all microservices in the bakery forecasting platform. + +This implementation ensures proper cascade deletion of: +1. User account and authentication data +2. Tenant ownership and memberships +3. All training models and artifacts +4. Forecasts and predictions +5. Notification preferences and logs +6. Refresh tokens and sessions +""" + +from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, delete, text +from typing import Dict, List, Any, Optional +import structlog +import uuid +from datetime import datetime + +from shared.auth.decorators import get_current_user_dep +from app.core.database import get_db +from app.services.messaging import auth_publisher +from app.services.auth_service_clients import AuthServiceClientFactory +from app.core.config import settings + +logger = structlog.get_logger() + +router = APIRouter() + +class AdminUserDeleteService: + """Service to handle complete admin user deletion across all microservices""" + + def __init__(self, db: AsyncSession): + self.db = db + self.clients = AuthServiceClientFactory(settings) + + async def delete_admin_user_complete(self, user_id: str, requesting_user_id: str) -> Dict[str, Any]: + """ + Complete admin user deletion with all associated data using inter-service clients + + Args: + user_id: ID of the admin user to delete + requesting_user_id: ID of the user performing the deletion + + Returns: + Dictionary with deletion results from all services + """ + + deletion_results = { + 'user_id': user_id, + 'requested_by': requesting_user_id, + 'started_at': datetime.utcnow().isoformat(), + 'services_processed': {}, + 'errors': [], + 'summary': {} + } + + try: + # Step 1: Validate user exists and is admin + user_info = await self._validate_admin_user(user_id) + if not user_info: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Admin user {user_id} not found" + ) + + deletion_results['user_info'] = user_info + + # Step 2: Get all tenant associations using tenant client + tenant_info = await self._get_user_tenant_info(user_id) + deletion_results['tenant_associations'] = tenant_info + + # Step 3: Delete in proper order to respect dependencies + + # 3.1 Stop all active training jobs and delete models + training_result = await self._delete_training_data(tenant_info['tenant_ids']) + deletion_results['services_processed']['training'] = training_result + + # 3.2 Delete all forecasts and predictions + forecasting_result = await self._delete_forecasting_data(tenant_info['tenant_ids']) + deletion_results['services_processed']['forecasting'] = forecasting_result + + # 3.3 Delete notification preferences and logs + notification_result = await self._delete_notification_data(user_id) + deletion_results['services_processed']['notification'] = notification_result + + # 3.4 Delete tenant memberships and handle owned tenants + tenant_result = await self._delete_tenant_data(user_id, tenant_info) + deletion_results['services_processed']['tenant'] = tenant_result + + # 3.5 Finally delete user account and auth data + auth_result = await self._delete_auth_data(user_id) + deletion_results['services_processed']['auth'] = auth_result + + # Step 4: Generate summary + deletion_results['summary'] = await self._generate_deletion_summary(deletion_results) + deletion_results['completed_at'] = datetime.utcnow().isoformat() + deletion_results['status'] = 'success' + + # Step 5: Publish deletion event + await self._publish_user_deleted_event(user_id, deletion_results) + + # Step 6: Send notification to admins + await self._notify_admins_of_deletion(user_info, deletion_results) + + logger.info("Admin user deletion completed successfully", + user_id=user_id, + tenants_affected=len(tenant_info['tenant_ids'])) + + return deletion_results + + except Exception as e: + deletion_results['status'] = 'failed' + deletion_results['error'] = str(e) + deletion_results['completed_at'] = datetime.utcnow().isoformat() + + logger.error("Admin user deletion failed", + user_id=user_id, + error=str(e)) + + # Attempt to publish failure event + try: + await self._publish_user_deletion_failed_event(user_id, str(e)) + except: + pass + + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"User deletion failed: {str(e)}" + ) + + async def _validate_admin_user(self, user_id: str) -> Optional[Dict[str, Any]]: + """Validate user exists and get basic info from local database""" + try: + from app.models.users import User + + # Query user from local auth database + query = select(User).where(User.id == uuid.UUID(user_id)) + result = await self.db.execute(query) + user = result.scalar_one_or_none() + + if not user: + return None + + return { + 'id': str(user.id), + 'email': user.email, + 'full_name': user.full_name, + 'created_at': user.created_at.isoformat() if user.created_at else None, + 'is_active': user.is_active, + 'is_verified': user.is_verified + } + + except Exception as e: + logger.error("Failed to validate admin user", user_id=user_id, error=str(e)) + raise + + async def _get_user_tenant_info(self, user_id: str) -> Dict[str, Any]: + """Get all tenant associations for the user using tenant client""" + try: + # Use tenant service client to get memberships + memberships = await self.clients.tenant_client.get_user_tenants(user_id) + + if not memberships: + return { + 'tenant_ids': [], + 'total_tenants': 0, + 'owned_tenants': 0, + 'memberships': [] + } + + tenant_ids = [m['tenant_id'] for m in memberships] + owned_tenants = [m for m in memberships if m.get('role') == 'owner'] + + return { + 'tenant_ids': tenant_ids, + 'total_tenants': len(tenant_ids), + 'owned_tenants': len(owned_tenants), + 'memberships': memberships + } + + except Exception as e: + logger.error("Failed to get tenant info", user_id=user_id, error=str(e)) + return {'tenant_ids': [], 'total_tenants': 0, 'owned_tenants': 0, 'memberships': []} + + async def _delete_training_data(self, tenant_ids: List[str]) -> Dict[str, Any]: + """Delete all training models, jobs, and artifacts for user's tenants""" + result = { + 'models_deleted': 0, + 'jobs_cancelled': 0, + 'artifacts_deleted': 0, + 'total_tenants_processed': 0, + 'errors': [] + } + + try: + for tenant_id in tenant_ids: + try: + # Cancel active training jobs using training client + cancel_result = await self.clients.training_client.cancel_tenant_training_jobs(tenant_id) + if cancel_result: + result['jobs_cancelled'] += cancel_result.get('jobs_cancelled', 0) + if cancel_result.get('errors'): + result['errors'].extend(cancel_result['errors']) + + # Delete all models and artifacts using training client + delete_result = await self.clients.training_client.delete_tenant_models(tenant_id) + if delete_result: + result['models_deleted'] += delete_result.get('models_deleted', 0) + result['artifacts_deleted'] += delete_result.get('artifacts_deleted', 0) + if delete_result.get('errors'): + result['errors'].extend(delete_result['errors']) + + result['total_tenants_processed'] += 1 + + logger.debug("Training data deleted for tenant", + tenant_id=tenant_id, + models=delete_result.get('models_deleted', 0) if delete_result else 0) + + except Exception as e: + error_msg = f"Error deleting training data for tenant {tenant_id}: {str(e)}" + result['errors'].append(error_msg) + logger.error(error_msg) + + except Exception as e: + result['errors'].append(f"Training service communication error: {str(e)}") + + return result + + async def _delete_forecasting_data(self, tenant_ids: List[str]) -> Dict[str, Any]: + """Delete all forecasts, predictions, and caches for user's tenants""" + result = { + 'forecasts_deleted': 0, + 'predictions_deleted': 0, + 'cache_cleared': 0, + 'batches_cancelled': 0, + 'total_tenants_processed': 0, + 'errors': [] + } + + try: + for tenant_id in tenant_ids: + try: + # Cancel any active prediction batches + batch_result = await self.clients.forecasting_client.cancel_tenant_prediction_batches(tenant_id) + if batch_result: + result['batches_cancelled'] += batch_result.get('batches_cancelled', 0) + if batch_result.get('errors'): + result['errors'].extend(batch_result['errors']) + + # Clear prediction cache + cache_result = await self.clients.forecasting_client.clear_tenant_prediction_cache(tenant_id) + if cache_result: + result['cache_cleared'] += cache_result.get('cache_cleared', 0) + if cache_result.get('errors'): + result['errors'].extend(cache_result['errors']) + + # Delete all forecasts for tenant + delete_result = await self.clients.forecasting_client.delete_tenant_forecasts(tenant_id) + if delete_result: + result['forecasts_deleted'] += delete_result.get('forecasts_deleted', 0) + result['predictions_deleted'] += delete_result.get('predictions_deleted', 0) + if delete_result.get('errors'): + result['errors'].extend(delete_result['errors']) + + result['total_tenants_processed'] += 1 + + logger.debug("Forecasting data deleted for tenant", + tenant_id=tenant_id, + forecasts=delete_result.get('forecasts_deleted', 0) if delete_result else 0) + + except Exception as e: + error_msg = f"Error deleting forecasting data for tenant {tenant_id}: {str(e)}" + result['errors'].append(error_msg) + logger.error(error_msg) + + except Exception as e: + result['errors'].append(f"Forecasting service communication error: {str(e)}") + + return result + + async def _delete_notification_data(self, user_id: str) -> Dict[str, Any]: + """Delete notification preferences, logs, and pending notifications""" + result = { + 'preferences_deleted': 0, + 'notifications_deleted': 0, + 'notifications_cancelled': 0, + 'logs_deleted': 0, + 'errors': [] + } + + try: + # Cancel pending notifications first + cancel_result = await self.clients.notification_client.cancel_pending_user_notifications(user_id) + if cancel_result: + result['notifications_cancelled'] = cancel_result.get('notifications_cancelled', 0) + if cancel_result.get('errors'): + result['errors'].extend(cancel_result['errors']) + + # Delete all notification data for user + delete_result = await self.clients.notification_client.delete_user_notification_data(user_id) + if delete_result: + result['preferences_deleted'] = delete_result.get('preferences_deleted', 0) + result['notifications_deleted'] = delete_result.get('notifications_deleted', 0) + result['logs_deleted'] = delete_result.get('logs_deleted', 0) + if delete_result.get('errors'): + result['errors'].extend(delete_result['errors']) + + logger.debug("Notification data deleted for user", + user_id=user_id, + notifications=result['notifications_deleted']) + + except Exception as e: + result['errors'].append(f"Notification service communication error: {str(e)}") + + return result + + async def _delete_tenant_data(self, user_id: str, tenant_info: Dict[str, Any]) -> Dict[str, Any]: + """Delete tenant memberships and handle owned tenants using tenant client""" + result = { + 'memberships_deleted': 0, + 'tenants_deleted': 0, + 'tenants_transferred': 0, + 'errors': [] + } + + try: + # Handle owned tenants - either delete or transfer ownership + for membership in tenant_info['memberships']: + if membership.get('role') == 'owner': + tenant_id = membership['tenant_id'] + + try: + # Check if tenant has other admin members who can take ownership + has_other_admins = await self.clients.tenant_client.check_tenant_has_other_admins( + tenant_id, user_id + ) + + if has_other_admins: + # Get tenant members to find first admin + members = await self.clients.tenant_client.get_tenant_members(tenant_id) + admin_members = [ + m for m in members + if m.get('role') == 'admin' and m.get('user_id') != user_id + ] + + if admin_members: + # Transfer ownership to first admin + transfer_result = await self.clients.tenant_client.transfer_tenant_ownership( + tenant_id, user_id, admin_members[0]['user_id'] + ) + + if transfer_result: + result['tenants_transferred'] += 1 + logger.info("Transferred tenant ownership", + tenant_id=tenant_id, + new_owner=admin_members[0]['user_id']) + else: + result['errors'].append(f"Failed to transfer ownership of tenant {tenant_id}") + else: + result['errors'].append(f"No admin members found for tenant {tenant_id}") + else: + # No other admins, delete the tenant completely + delete_result = await self.clients.tenant_client.delete_tenant(tenant_id) + + if delete_result: + result['tenants_deleted'] += 1 + logger.info("Deleted tenant", tenant_id=tenant_id) + else: + result['errors'].append(f"Failed to delete tenant {tenant_id}") + + except Exception as e: + error_msg = f"Error handling owned tenant {tenant_id}: {str(e)}" + result['errors'].append(error_msg) + logger.error(error_msg) + + # Delete user's memberships + delete_result = await self.clients.tenant_client.delete_user_memberships(user_id) + if delete_result: + result['memberships_deleted'] = delete_result.get('memberships_deleted', 0) + if delete_result.get('errors'): + result['errors'].extend(delete_result['errors']) + else: + result['errors'].append("Failed to delete user memberships") + + except Exception as e: + result['errors'].append(f"Tenant service communication error: {str(e)}") + + return result + + async def _delete_auth_data(self, user_id: str) -> Dict[str, Any]: + """Delete user account, refresh tokens, and auth data from local database""" + result = { + 'user_deleted': False, + 'refresh_tokens_deleted': 0, + 'sessions_invalidated': 0, + 'errors': [] + } + + try: + from app.models.users import User, RefreshToken + + # Delete refresh tokens + token_delete_query = delete(RefreshToken).where(RefreshToken.user_id == uuid.UUID(user_id)) + token_result = await self.db.execute(token_delete_query) + result['refresh_tokens_deleted'] = token_result.rowcount + + # Delete user account + user_delete_query = delete(User).where(User.id == uuid.UUID(user_id)) + user_result = await self.db.execute(user_delete_query) + + if user_result.rowcount > 0: + result['user_deleted'] = True + await self.db.commit() + logger.info("User and tokens deleted from auth database", + user_id=user_id, + tokens_deleted=result['refresh_tokens_deleted']) + else: + result['errors'].append("User not found in auth database") + await self.db.rollback() + + except Exception as e: + await self.db.rollback() + error_msg = f"Auth database error: {str(e)}" + result['errors'].append(error_msg) + logger.error(error_msg) + + return result + + async def _generate_deletion_summary(self, deletion_results: Dict[str, Any]) -> Dict[str, Any]: + """Generate summary of deletion operation""" + summary = { + 'total_tenants_affected': deletion_results['tenant_associations']['total_tenants'], + 'total_models_deleted': deletion_results['services_processed']['training']['models_deleted'], + 'total_forecasts_deleted': deletion_results['services_processed']['forecasting']['forecasts_deleted'], + 'total_notifications_deleted': deletion_results['services_processed']['notification']['notifications_deleted'], + 'tenants_transferred': deletion_results['services_processed']['tenant']['tenants_transferred'], + 'tenants_deleted': deletion_results['services_processed']['tenant']['tenants_deleted'], + 'user_deleted': deletion_results['services_processed']['auth']['user_deleted'], + 'total_errors': 0 + } + + # Count total errors across all services + for service_result in deletion_results['services_processed'].values(): + if isinstance(service_result, dict) and 'errors' in service_result: + summary['total_errors'] += len(service_result['errors']) + + # Add success indicator + summary['deletion_successful'] = ( + summary['user_deleted'] and + summary['total_errors'] == 0 + ) + + return summary + + async def _publish_user_deleted_event(self, user_id: str, deletion_results: Dict[str, Any]): + """Publish user deletion event to message queue""" + try: + await auth_publisher.publish_event( + exchange="user_events", + routing_key="user.admin.deleted", + message={ + "event_type": "admin_user_deleted", + "user_id": user_id, + "timestamp": datetime.utcnow().isoformat(), + "deletion_summary": deletion_results['summary'], + "services_affected": list(deletion_results['services_processed'].keys()) + } + ) + logger.info("Published user deletion event", user_id=user_id) + except Exception as e: + logger.error("Failed to publish user deletion event", error=str(e)) + + async def _publish_user_deletion_failed_event(self, user_id: str, error: str): + """Publish user deletion failure event""" + try: + await auth_publisher.publish_event( + exchange="user_events", + routing_key="user.deletion.failed", + message={ + "event_type": "admin_user_deletion_failed", + "user_id": user_id, + "error": error, + "timestamp": datetime.utcnow().isoformat() + } + ) + logger.info("Published user deletion failure event", user_id=user_id) + except Exception as e: + logger.error("Failed to publish deletion failure event", error=str(e)) + + async def _notify_admins_of_deletion(self, user_info: Dict[str, Any], deletion_results: Dict[str, Any]): + """Send notification to other admins about the user deletion""" + try: + # Get requesting user info for notification + requesting_user_id = deletion_results['requested_by'] + requesting_user = await self._validate_admin_user(requesting_user_id) + + if requesting_user: + await self.clients.notification_client.send_user_deletion_notification( + admin_email=requesting_user['email'], + deleted_user_email=user_info['email'], + deletion_summary=deletion_results['summary'] + ) + logger.info("Sent user deletion notification", + deleted_user=user_info['email'], + notified_admin=requesting_user['email']) + except Exception as e: + logger.error("Failed to send admin notification", error=str(e)) + + async def preview_user_deletion(self, user_id: str) -> Dict[str, Any]: + """ + Preview what data would be deleted for an admin user without actually deleting + """ + try: + # Get user info + user_info = await self._validate_admin_user(user_id) + if not user_info: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Admin user {user_id} not found" + ) + + # Get tenant associations + tenant_info = await self._get_user_tenant_info(user_id) + + # Get counts from each service + training_models_count = 0 + forecasts_count = 0 + notifications_count = 0 + + for tenant_id in tenant_info['tenant_ids']: + try: + # Get training models count + models_count = await self.clients.training_client.get_tenant_models_count(tenant_id) + training_models_count += models_count + + # Get forecasts count + tenant_forecasts = await self.clients.forecasting_client.get_tenant_forecasts_count(tenant_id) + forecasts_count += tenant_forecasts + + except Exception as e: + logger.warning("Could not get counts for tenant", tenant_id=tenant_id, error=str(e)) + + try: + # Get user notifications count + notifications_count = await self.clients.notification_client.get_user_notification_count(user_id) + except Exception as e: + logger.warning("Could not get notification count", user_id=user_id, error=str(e)) + + # Build preview + preview = { + "user": user_info, + "tenant_associations": tenant_info, + "estimated_deletions": { + "training_models": training_models_count, + "forecasts": forecasts_count, + "notifications": notifications_count, + "tenant_memberships": tenant_info['total_tenants'], + "owned_tenants": tenant_info['owned_tenants'] + }, + "tenant_handling": await self._preview_tenant_handling(user_id, tenant_info), + "warning": "This operation is irreversible and will permanently delete all associated data" + } + + return preview + + except HTTPException: + raise + except Exception as e: + logger.error("Error generating deletion preview", user_id=user_id, error=str(e)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to generate deletion preview" + ) + + async def _preview_tenant_handling(self, user_id: str, tenant_info: Dict[str, Any]) -> List[Dict[str, Any]]: + """Preview how each owned tenant would be handled""" + tenant_handling = [] + + for membership in tenant_info['memberships']: + if membership.get('role') == 'owner': + tenant_id = membership['tenant_id'] + + try: + has_other_admins = await self.clients.tenant_client.check_tenant_has_other_admins( + tenant_id, user_id + ) + + if has_other_admins: + members = await self.clients.tenant_client.get_tenant_members(tenant_id) + admin_members = [ + m for m in members + if m.get('role') == 'admin' and m.get('user_id') != user_id + ] + + tenant_handling.append({ + "tenant_id": tenant_id, + "action": "transfer_ownership", + "details": f"Ownership will be transferred to admin: {admin_members[0]['user_id'] if admin_members else 'Unknown'}" + }) + else: + tenant_handling.append({ + "tenant_id": tenant_id, + "action": "delete_tenant", + "details": "Tenant will be deleted completely (no other admins found)" + }) + + except Exception as e: + tenant_handling.append({ + "tenant_id": tenant_id, + "action": "error", + "details": f"Could not determine action: {str(e)}" + }) + + return tenant_handling diff --git a/services/auth/app/services/auth_service_clients.py b/services/auth/app/services/auth_service_clients.py new file mode 100644 index 00000000..ee5cc9f2 --- /dev/null +++ b/services/auth/app/services/auth_service_clients.py @@ -0,0 +1,403 @@ +# ================================================================ +# Auth Service Inter-Service Communication Clients +# ================================================================ +""" +Inter-service communication clients for the Auth Service to communicate +with other microservices in the bakery forecasting platform. + +These clients handle authenticated API calls to: +- Tenant Service +- Training Service +- Forecasting Service +- Notification Service +""" + +import structlog +from typing import Dict, Any, Optional, List, Union +from shared.clients.base_service_client import BaseServiceClient +from shared.config.base import BaseServiceSettings + +logger = structlog.get_logger() + +# ================================================================ +# TENANT SERVICE CLIENT +# ================================================================ + +class AuthTenantServiceClient(BaseServiceClient): + """Client for Auth Service to communicate with Tenant Service""" + + def __init__(self, config: BaseServiceSettings): + super().__init__("auth", config) + self.service_url = config.TENANT_SERVICE_URL + + def get_service_base_path(self) -> str: + return "/api/v1" + + # ================================================================ + # USER TENANT OPERATIONS + # ================================================================ + + async def get_user_tenants(self, user_id: str) -> Optional[List[Dict[str, Any]]]: + """Get all tenant memberships for a user""" + try: + result = await self.get(f"users/{user_id}/tenants") + return result.get("memberships", []) if result else [] + except Exception as e: + logger.error("Failed to get user tenants", user_id=user_id, error=str(e)) + return [] + + async def get_user_owned_tenants(self, user_id: str) -> Optional[List[Dict[str, Any]]]: + """Get tenants owned by a user""" + try: + memberships = await self.get_user_tenants(user_id) + if memberships: + return [m for m in memberships if m.get('role') == 'owner'] + return [] + except Exception as e: + logger.error("Failed to get owned tenants", user_id=user_id, error=str(e)) + return [] + + async def transfer_tenant_ownership( + self, + tenant_id: str, + current_owner_id: str, + new_owner_id: str + ) -> Optional[Dict[str, Any]]: + """Transfer tenant ownership from one user to another""" + try: + data = { + "current_owner_id": current_owner_id, + "new_owner_id": new_owner_id + } + return await self.post(f"tenants/{tenant_id}/transfer-ownership", data=data) + except Exception as e: + logger.error("Failed to transfer tenant ownership", + tenant_id=tenant_id, + error=str(e)) + return None + + async def delete_tenant(self, tenant_id: str) -> Optional[Dict[str, Any]]: + """Delete a tenant completely""" + try: + return await self.delete(f"tenants/{tenant_id}") + except Exception as e: + logger.error("Failed to delete tenant", tenant_id=tenant_id, error=str(e)) + return None + + async def delete_user_memberships(self, user_id: str) -> Optional[Dict[str, Any]]: + """Delete all tenant memberships for a user""" + try: + return await self.delete(f"users/{user_id}/memberships") + except Exception as e: + logger.error("Failed to delete user memberships", user_id=user_id, error=str(e)) + return None + + async def get_tenant_members(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]: + """Get all members of a tenant""" + try: + result = await self.get(f"tenants/{tenant_id}/members") + return result.get("members", []) if result else [] + except Exception as e: + logger.error("Failed to get tenant members", tenant_id=tenant_id, error=str(e)) + return [] + + async def check_tenant_has_other_admins(self, tenant_id: str, excluding_user_id: str) -> bool: + """Check if tenant has other admin users besides the excluded one""" + try: + members = await self.get_tenant_members(tenant_id) + if members: + admin_members = [ + m for m in members + if m.get('role') in ['admin', 'owner'] and m.get('user_id') != excluding_user_id + ] + return len(admin_members) > 0 + return False + except Exception as e: + logger.error("Failed to check tenant admins", + tenant_id=tenant_id, + error=str(e)) + return False + +# ================================================================ +# TRAINING SERVICE CLIENT +# ================================================================ + +class AuthTrainingServiceClient(BaseServiceClient): + """Client for Auth Service to communicate with Training Service""" + + def __init__(self, config: BaseServiceSettings): + super().__init__("auth", config) + self.service_url = config.TRAINING_SERVICE_URL + + def get_service_base_path(self) -> str: + return "/api/v1" + + # ================================================================ + # TRAINING JOB OPERATIONS + # ================================================================ + + async def cancel_tenant_training_jobs(self, tenant_id: str) -> Optional[Dict[str, Any]]: + """Cancel all active training jobs for a tenant""" + try: + data = {"tenant_id": tenant_id} + return await self.post("jobs/cancel-tenant", data=data) + except Exception as e: + logger.error("Failed to cancel tenant training jobs", + tenant_id=tenant_id, + error=str(e)) + return {"jobs_cancelled": 0, "errors": [str(e)]} + + async def get_tenant_active_jobs(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]: + """Get all active training jobs for a tenant""" + try: + params = {"status": "running,queued,pending", "tenant_id": tenant_id} + result = await self.get("jobs", params=params) + return result.get("jobs", []) if result else [] + except Exception as e: + logger.error("Failed to get tenant active jobs", + tenant_id=tenant_id, + error=str(e)) + return [] + + # ================================================================ + # MODEL OPERATIONS + # ================================================================ + + async def delete_tenant_models(self, tenant_id: str) -> Optional[Dict[str, Any]]: + """Delete all trained models and artifacts for a tenant""" + try: + return await self.delete(f"models/tenant/{tenant_id}") + except Exception as e: + logger.error("Failed to delete tenant models", + tenant_id=tenant_id, + error=str(e)) + return {"models_deleted": 0, "artifacts_deleted": 0, "errors": [str(e)]} + + async def get_tenant_models_count(self, tenant_id: str) -> int: + """Get count of trained models for a tenant""" + try: + result = await self.get(f"models/tenant/{tenant_id}/count") + return result.get("count", 0) if result else 0 + except Exception as e: + logger.error("Failed to get tenant models count", + tenant_id=tenant_id, + error=str(e)) + return 0 + + async def get_tenant_model_artifacts(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]: + """Get all model artifacts for a tenant""" + try: + result = await self.get(f"models/tenant/{tenant_id}/artifacts") + return result.get("artifacts", []) if result else [] + except Exception as e: + logger.error("Failed to get tenant model artifacts", + tenant_id=tenant_id, + error=str(e)) + return [] + +# ================================================================ +# FORECASTING SERVICE CLIENT +# ================================================================ + +class AuthForecastingServiceClient(BaseServiceClient): + """Client for Auth Service to communicate with Forecasting Service""" + + def __init__(self, config: BaseServiceSettings): + super().__init__("auth", config) + self.service_url = config.FORECASTING_SERVICE_URL + + def get_service_base_path(self) -> str: + return "/api/v1" + + # ================================================================ + # FORECAST OPERATIONS + # ================================================================ + + async def delete_tenant_forecasts(self, tenant_id: str) -> Optional[Dict[str, Any]]: + """Delete all forecasts and predictions for a tenant""" + try: + return await self.delete(f"forecasts/tenant/{tenant_id}") + except Exception as e: + logger.error("Failed to delete tenant forecasts", + tenant_id=tenant_id, + error=str(e)) + return { + "forecasts_deleted": 0, + "predictions_deleted": 0, + "cache_cleared": 0, + "errors": [str(e)] + } + + async def get_tenant_forecasts_count(self, tenant_id: str) -> int: + """Get count of forecasts for a tenant""" + try: + result = await self.get(f"forecasts/tenant/{tenant_id}/count") + return result.get("count", 0) if result else 0 + except Exception as e: + logger.error("Failed to get tenant forecasts count", + tenant_id=tenant_id, + error=str(e)) + return 0 + + async def clear_tenant_prediction_cache(self, tenant_id: str) -> Optional[Dict[str, Any]]: + """Clear prediction cache for a tenant""" + try: + return await self.post(f"predictions/cache/clear/{tenant_id}") + except Exception as e: + logger.error("Failed to clear tenant prediction cache", + tenant_id=tenant_id, + error=str(e)) + return {"cache_cleared": 0, "errors": [str(e)]} + + async def cancel_tenant_prediction_batches(self, tenant_id: str) -> Optional[Dict[str, Any]]: + """Cancel any active prediction batches for a tenant""" + try: + data = {"tenant_id": tenant_id} + return await self.post("predictions/batches/cancel", data=data) + except Exception as e: + logger.error("Failed to cancel tenant prediction batches", + tenant_id=tenant_id, + error=str(e)) + return {"batches_cancelled": 0, "errors": [str(e)]} + +# ================================================================ +# NOTIFICATION SERVICE CLIENT +# ================================================================ + +class AuthNotificationServiceClient(BaseServiceClient): + """Client for Auth Service to communicate with Notification Service""" + + def __init__(self, config: BaseServiceSettings): + super().__init__("auth", config) + self.service_url = config.NOTIFICATION_SERVICE_URL + + def get_service_base_path(self) -> str: + return "/api/v1" + + # ================================================================ + # USER NOTIFICATION OPERATIONS + # ================================================================ + + async def delete_user_notification_data(self, user_id: str) -> Optional[Dict[str, Any]]: + """Delete all notification data for a user""" + try: + return await self.delete(f"users/{user_id}/data") + except Exception as e: + logger.error("Failed to delete user notification data", + user_id=user_id, + error=str(e)) + return { + "preferences_deleted": 0, + "notifications_deleted": 0, + "logs_deleted": 0, + "errors": [str(e)] + } + + async def cancel_pending_user_notifications(self, user_id: str) -> Optional[Dict[str, Any]]: + """Cancel all pending notifications for a user""" + try: + return await self.post(f"users/{user_id}/notifications/cancel") + except Exception as e: + logger.error("Failed to cancel user notifications", + user_id=user_id, + error=str(e)) + return {"notifications_cancelled": 0, "errors": [str(e)]} + + async def get_user_notification_count(self, user_id: str) -> int: + """Get count of notifications for a user""" + try: + result = await self.get(f"users/{user_id}/notifications/count") + return result.get("count", 0) if result else 0 + except Exception as e: + logger.error("Failed to get user notification count", + user_id=user_id, + error=str(e)) + return 0 + + async def send_user_deletion_notification( + self, + admin_email: str, + deleted_user_email: str, + deletion_summary: Dict[str, Any] + ) -> Optional[Dict[str, Any]]: + """Send notification about user deletion to administrators""" + try: + data = { + "type": "email", + "recipient_email": admin_email, + "template_key": "user_deletion_notification", + "template_data": { + "deleted_user_email": deleted_user_email, + "deletion_summary": deletion_summary + }, + "priority": "high" + } + return await self.post("notifications/send", data=data) + except Exception as e: + logger.error("Failed to send user deletion notification", + admin_email=admin_email, + error=str(e)) + return None + +# ================================================================ +# CLIENT FACTORY +# ================================================================ + +class AuthServiceClientFactory: + """Factory for creating inter-service clients for Auth Service""" + + def __init__(self, config: BaseServiceSettings): + self.config = config + self._tenant_client = None + self._training_client = None + self._forecasting_client = None + self._notification_client = None + + @property + def tenant_client(self) -> AuthTenantServiceClient: + """Get or create tenant service client""" + if self._tenant_client is None: + self._tenant_client = AuthTenantServiceClient(self.config) + return self._tenant_client + + @property + def training_client(self) -> AuthTrainingServiceClient: + """Get or create training service client""" + if self._training_client is None: + self._training_client = AuthTrainingServiceClient(self.config) + return self._training_client + + @property + def forecasting_client(self) -> AuthForecastingServiceClient: + """Get or create forecasting service client""" + if self._forecasting_client is None: + self._forecasting_client = AuthForecastingServiceClient(self.config) + return self._forecasting_client + + @property + def notification_client(self) -> AuthNotificationServiceClient: + """Get or create notification service client""" + if self._notification_client is None: + self._notification_client = AuthNotificationServiceClient(self.config) + return self._notification_client + + async def health_check_all_services(self) -> Dict[str, bool]: + """Check health of all services""" + results = {} + + clients = [ + ("tenant", self.tenant_client), + ("training", self.training_client), + ("forecasting", self.forecasting_client), + ("notification", self.notification_client) + ] + + for service_name, client in clients: + try: + health = await client.get("health") + results[service_name] = health is not None and health.get("status") == "healthy" + except Exception as e: + logger.error(f"Health check failed for {service_name} service", error=str(e)) + results[service_name] = False + + return results \ No newline at end of file