From b0d83720fde56da72a198d08c2fb479f6437de45 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Sun, 3 Aug 2025 14:42:33 +0200 Subject: [PATCH] Fix user delete flow 12 --- gateway/app/routes/tenant.py | 5 + services/auth/app/api/users.py | 62 ++--- services/auth/app/core/database.py | 239 ++++++++++++++++++ .../auth/app/services/auth_service_clients.py | 6 +- .../notification/app/api/notifications.py | 38 ++- services/tenant/app/api/tenants.py | 21 +- shared/database/base.py | 24 +- 7 files changed, 341 insertions(+), 54 deletions(-) diff --git a/gateway/app/routes/tenant.py b/gateway/app/routes/tenant.py index 06996b38..0e2a57f8 100644 --- a/gateway/app/routes/tenant.py +++ b/gateway/app/routes/tenant.py @@ -43,6 +43,11 @@ async def get_user_tenants(request: Request, user_id: str = Path(...)): """Get all tenant memberships for a user (admin only)""" return await _proxy_to_tenant_service(request, f"/api/v1/tenants/user/{user_id}") +@router.delete("/user/{user_id}/memberships") +async def delete_user_tenants(request: Request, user_id: str = Path(...)): + """Get all tenant memberships for a user (admin only)""" + return await _proxy_to_tenant_service(request, f"/api/v1/tenants/user/{user_id}/memberships") + # ================================================================ # TENANT-SCOPED DATA SERVICE ENDPOINTS # ================================================================ diff --git a/services/auth/app/api/users.py b/services/auth/app/api/users.py index da4f5d48..32740533 100644 --- a/services/auth/app/api/users.py +++ b/services/auth/app/api/users.py @@ -9,7 +9,7 @@ import structlog import uuid from datetime import datetime, timezone -from app.core.database import get_db +from app.core.database import get_db, get_background_db_session from app.schemas.auth import UserResponse, PasswordChange from app.schemas.users import UserUpdate from app.services.user_service import UserService @@ -171,8 +171,7 @@ async def delete_admin_user( background_tasks.add_task( execute_admin_user_deletion, user_id=user_id, - requesting_user_id=current_user.id, - db_url=str(db.bind.url) # Pass DB connection string for background task + requesting_user_id=current_user["user_id"] ) return { @@ -186,52 +185,23 @@ async def delete_admin_user( # Add this background task function to services/auth/app/api/users.py: -async def execute_admin_user_deletion( - user_id: str, - requesting_user_id: str, - db_url: str -): +async def execute_admin_user_deletion(user_id: str, requesting_user_id: str): """ - Background task to execute complete admin user deletion + Background task using shared infrastructure """ - # Create new database session for background task - from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession - from sqlalchemy.orm import sessionmaker - - engine = create_async_engine(db_url) - async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - - async with async_session() as session: - try: - # Initialize deletion service with new session - deletion_service = AdminUserDeleteService(session) - - # Perform the deletion - result = await deletion_service.delete_admin_user_complete( - user_id=user_id, - requesting_user_id=requesting_user_id - ) - - logger.info("Background admin user deletion completed successfully", - user_id=user_id, - requesting_user=requesting_user_id, - result=result) - - except Exception as e: - logger.error("Background admin user deletion failed", - user_id=user_id, - requesting_user=requesting_user_id, - error=str(e)) - - # Attempt to publish failure event - try: - deletion_service = AdminUserDeleteService(session) - await deletion_service._publish_user_deletion_failed_event(user_id, str(e)) - except: - pass + # ✅ Use the shared background session + async with get_background_db_session() as session: + deletion_service = AdminUserDeleteService(session) - finally: - await engine.dispose() + result = await deletion_service.delete_admin_user_complete( + user_id=user_id, + requesting_user_id=requesting_user_id + ) + + logger.info("Background admin user deletion completed successfully", + user_id=user_id, + requesting_user=requesting_user_id, + result=result) @router.get("/delete/{user_id}/deletion-preview") diff --git a/services/auth/app/core/database.py b/services/auth/app/core/database.py index a3e2bc65..1326c697 100644 --- a/services/auth/app/core/database.py +++ b/services/auth/app/core/database.py @@ -49,3 +49,242 @@ async def create_tables(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) logger.info("Database tables created successfully") +# ================================================================ +# services/auth/app/core/database.py - UPDATED TO USE SHARED INFRASTRUCTURE +# ================================================================ +""" +Database configuration for authentication service +Uses shared database infrastructure for consistency +""" + +import structlog +from typing import AsyncGenerator +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import text + +from shared.database.base import DatabaseManager, Base +from app.core.config import settings + +logger = structlog.get_logger() + +# ✅ Initialize database manager using shared infrastructure +database_manager = DatabaseManager(settings.DATABASE_URL) + +# ✅ Alias for convenience - matches the existing interface +get_db = database_manager.get_db + +# ✅ Use the shared background session method +get_background_db_session = database_manager.get_background_session + +async def get_db_health() -> bool: + """ + Health check function for database connectivity + """ + try: + async with database_manager.async_engine.begin() as conn: + await conn.execute(text("SELECT 1")) + logger.debug("Database health check passed") + return True + + except Exception as e: + logger.error("Database health check failed", error=str(e)) + return False + +async def create_tables(): + """Create database tables using shared infrastructure""" + await database_manager.create_tables() + logger.info("Auth database tables created successfully") + +# ✅ Auth service specific database utilities +class AuthDatabaseUtils: + """Auth service specific database utilities""" + + @staticmethod + async def cleanup_old_refresh_tokens(days_old: int = 30): + """Clean up old refresh tokens""" + try: + async with database_manager.get_background_session() as session: + if settings.DATABASE_URL.startswith("sqlite"): + query = text( + "DELETE FROM refresh_tokens " + "WHERE created_at < datetime('now', :days_param)" + ) + params = {"days_param": f"-{days_old} days"} + else: + # PostgreSQL + query = text( + "DELETE FROM refresh_tokens " + "WHERE created_at < NOW() - INTERVAL :days_param" + ) + params = {"days_param": f"{days_old} days"} + + result = await session.execute(query, params) + # No need to commit - get_background_session() handles it + + logger.info("Cleaned up old refresh tokens", + deleted_count=result.rowcount, + days_old=days_old) + + return result.rowcount + + except Exception as e: + logger.error("Failed to cleanup old refresh tokens", + error=str(e)) + return 0 + + @staticmethod + async def get_auth_statistics(tenant_id: str = None) -> dict: + """Get authentication statistics""" + try: + async with database_manager.get_background_session() as session: + # Base query for users + users_query = text("SELECT COUNT(*) as count FROM users WHERE is_active = :is_active") + params = {} + + if tenant_id: + # If tenant filtering is needed (though auth service might not have tenant_id in users table) + # This is just an example - adjust based on your actual schema + pass + + # Get active users count + active_users_result = await session.execute( + users_query, + {**params, "is_active": True} + ) + active_users = active_users_result.scalar() or 0 + + # Get inactive users count + inactive_users_result = await session.execute( + users_query, + {**params, "is_active": False} + ) + inactive_users = inactive_users_result.scalar() or 0 + + # Get refresh tokens count + tokens_query = text("SELECT COUNT(*) as count FROM refresh_tokens") + tokens_result = await session.execute(tokens_query) + active_tokens = tokens_result.scalar() or 0 + + return { + "active_users": active_users, + "inactive_users": inactive_users, + "total_users": active_users + inactive_users, + "active_tokens": active_tokens + } + + except Exception as e: + logger.error("Failed to get auth statistics", error=str(e)) + return { + "active_users": 0, + "inactive_users": 0, + "total_users": 0, + "active_tokens": 0 + } + + @staticmethod + async def check_user_exists(user_id: str) -> bool: + """Check if user exists""" + try: + async with database_manager.get_background_session() as session: + query = text( + "SELECT COUNT(*) as count " + "FROM users " + "WHERE id = :user_id " + "LIMIT 1" + ) + + result = await session.execute(query, {"user_id": user_id}) + count = result.scalar() or 0 + + return count > 0 + + except Exception as e: + logger.error("Failed to check user existence", + user_id=user_id, error=str(e)) + return False + + @staticmethod + async def get_user_token_count(user_id: str) -> int: + """Get count of active refresh tokens for a user""" + try: + async with database_manager.get_background_session() as session: + query = text( + "SELECT COUNT(*) as count " + "FROM refresh_tokens " + "WHERE user_id = :user_id" + ) + + result = await session.execute(query, {"user_id": user_id}) + count = result.scalar() or 0 + + return count + + except Exception as e: + logger.error("Failed to get user token count", + user_id=user_id, error=str(e)) + return 0 + +# Enhanced database session dependency with better error handling +async def get_db_session() -> AsyncGenerator[AsyncSession, None]: + """ + Enhanced database session dependency with better logging and error handling + """ + async with database_manager.async_session_local() as session: + try: + logger.debug("Database session created") + yield session + except Exception as e: + logger.error("Database session error", error=str(e), exc_info=True) + await session.rollback() + raise + finally: + await session.close() + logger.debug("Database session closed") + +# Database initialization for auth service +async def initialize_auth_database(): + """Initialize database tables for auth service""" + try: + logger.info("Initializing auth service database") + + # Import models to ensure they're registered + from app.models.users import User + from app.models.refresh_tokens import RefreshToken + + # Create tables using shared infrastructure + await database_manager.create_tables() + + logger.info("Auth service database initialized successfully") + + except Exception as e: + logger.error("Failed to initialize auth service database", error=str(e)) + raise + +# Database cleanup for auth service +async def cleanup_auth_database(): + """Cleanup database connections for auth service""" + try: + logger.info("Cleaning up auth service database connections") + + # Close engine connections + if hasattr(database_manager, 'async_engine') and database_manager.async_engine: + await database_manager.async_engine.dispose() + + logger.info("Auth service database cleanup completed") + + except Exception as e: + logger.error("Failed to cleanup auth service database", error=str(e)) + +# Export the commonly used items to maintain compatibility +__all__ = [ + 'Base', + 'database_manager', + 'get_db', + 'get_background_db_session', + 'get_db_session', + 'get_db_health', + 'AuthDatabaseUtils', + 'initialize_auth_database', + 'cleanup_auth_database', + 'create_tables' +] \ No newline at end of file diff --git a/services/auth/app/services/auth_service_clients.py b/services/auth/app/services/auth_service_clients.py index 159811bf..e44dd2ef 100644 --- a/services/auth/app/services/auth_service_clients.py +++ b/services/auth/app/services/auth_service_clients.py @@ -87,7 +87,7 @@ class AuthTenantServiceClient(BaseServiceClient): async def delete_user_memberships(self, user_id: str) -> Optional[Dict[str, Any]]: """Delete all tenant memberships for a user""" try: - return await self.delete(f"users/{user_id}/memberships") + return await self.delete(f"/tenants/user/{user_id}/memberships") except Exception as e: logger.error("Failed to delete user memberships", user_id=user_id, error=str(e)) return None @@ -281,7 +281,7 @@ class AuthNotificationServiceClient(BaseServiceClient): async def delete_user_notification_data(self, user_id: str) -> Optional[Dict[str, Any]]: """Delete all notification data for a user""" try: - return await self.delete(f"users/{user_id}/data") + return await self.delete(f"/users/{user_id}/notification-data") except Exception as e: logger.error("Failed to delete user notification data", user_id=user_id, @@ -296,7 +296,7 @@ class AuthNotificationServiceClient(BaseServiceClient): async def cancel_pending_user_notifications(self, user_id: str) -> Optional[Dict[str, Any]]: """Cancel all pending notifications for a user""" try: - return await self.post(f"users/{user_id}/notifications/cancel") + return await self.post(f"users/{user_id}/notifications/cancel-pending") except Exception as e: logger.error("Failed to cancel user notifications", user_id=user_id, diff --git a/services/notification/app/api/notifications.py b/services/notification/app/api/notifications.py index fd6fd325..fd59605a 100644 --- a/services/notification/app/api/notifications.py +++ b/services/notification/app/api/notifications.py @@ -527,9 +527,26 @@ async def test_send_whatsapp( async def cancel_pending_user_notifications( user_id: str, current_user = Depends(get_current_user_dep), - _admin_check = Depends(require_role(["admin"])), db: AsyncSession = Depends(get_db) ): + + # Check if this is a service call or admin user + user_type = current_user.get('type', '') + user_role = current_user.get('role', '').lower() + service_name = current_user.get('service', '') + + logger.info("The user_type and user_role", user_type=user_type, user_role=user_role) + + # ✅ IMPROVED: Accept service tokens OR admin users + is_service_token = (user_type == 'service' or service_name in ['auth', 'admin']) + is_admin_user = (user_role == 'admin') + + if not (is_service_token or is_admin_user): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Admin role or service authentication required" + ) + """Cancel all pending notifications for a user (admin only)""" try: user_uuid = uuid.UUID(user_id) @@ -597,9 +614,26 @@ async def cancel_pending_user_notifications( async def delete_user_notification_data( user_id: str, current_user = Depends(get_current_user_dep), - _admin_check = Depends(require_role(["admin"])), db: AsyncSession = Depends(get_db) ): + + # Check if this is a service call or admin user + user_type = current_user.get('type', '') + user_role = current_user.get('role', '').lower() + service_name = current_user.get('service', '') + + logger.info("The user_type and user_role", user_type=user_type, user_role=user_role) + + # ✅ IMPROVED: Accept service tokens OR admin users + is_service_token = (user_type == 'service' or service_name in ['auth', 'admin']) + is_admin_user = (user_role == 'admin') + + if not (is_service_token or is_admin_user): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Admin role or service authentication required" + ) + """Delete all notification data for a user (admin only)""" try: user_uuid = uuid.UUID(user_id) diff --git a/services/tenant/app/api/tenants.py b/services/tenant/app/api/tenants.py index 67d35313..45acae8f 100644 --- a/services/tenant/app/api/tenants.py +++ b/services/tenant/app/api/tenants.py @@ -508,13 +508,30 @@ async def transfer_tenant_ownership( detail="Failed to transfer tenant ownership" ) -@router.delete("/users/{user_id}/memberships") +@router.delete("/tenants/user/{user_id}/memberships") async def delete_user_memberships( user_id: str, current_user = Depends(get_current_user_dep), - _admin_check = Depends(require_admin_role), db: AsyncSession = Depends(get_db) ): + + # Check if this is a service call or admin user + user_type = current_user.get('type', '') + user_role = current_user.get('role', '').lower() + service_name = current_user.get('service', '') + + logger.info("The user_type and user_role", user_type=user_type, user_role=user_role) + + # ✅ IMPROVED: Accept service tokens OR admin users + is_service_token = (user_type == 'service' or service_name in ['auth', 'admin']) + is_admin_user = (user_role == 'admin') + + if not (is_service_token or is_admin_user): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Admin role or service authentication required" + ) + """Delete all tenant memberships for a user (admin only)""" try: user_uuid = uuid.UUID(user_id) diff --git a/shared/database/base.py b/shared/database/base.py index e5766716..3f0cace8 100644 --- a/shared/database/base.py +++ b/shared/database/base.py @@ -7,6 +7,7 @@ from sqlalchemy import create_engine from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker, declarative_base from sqlalchemy.pool import StaticPool +from contextlib import asynccontextmanager import logging logger = logging.getLogger(__name__) @@ -34,7 +35,7 @@ class DatabaseManager: ) async def get_db(self): - """Get database session""" + """Get database session for request handlers""" async with self.async_session_local() as session: try: yield session @@ -45,6 +46,27 @@ class DatabaseManager: finally: await session.close() + @asynccontextmanager + async def get_background_session(self): + """ + ✅ NEW: Get database session for background tasks + + Usage: + async with database_manager.get_background_session() as session: + # Your background task code here + await session.commit() + """ + async with self.async_session_local() as session: + try: + yield session + await session.commit() + except Exception as e: + await session.rollback() + logger.error(f"Background task database error: {e}") + raise + finally: + await session.close() + async def create_tables(self): """Create database tables""" async with self.async_engine.begin() as conn: