Files
bakery-ia/services/auth/app/services/auth_service_clients.py
2025-08-02 18:38:14 +02:00

403 lines
16 KiB
Python

# ================================================================
# Auth Service Inter-Service Communication Clients
# ================================================================
"""
Inter-service communication clients for the Auth Service to communicate
with other microservices in the bakery forecasting platform.
These clients handle authenticated API calls to:
- Tenant Service
- Training Service
- Forecasting Service
- Notification Service
"""
import structlog
from typing import Dict, Any, Optional, List, Union
from shared.clients.base_service_client import BaseServiceClient
from shared.config.base import BaseServiceSettings
logger = structlog.get_logger()
# ================================================================
# TENANT SERVICE CLIENT
# ================================================================
class AuthTenantServiceClient(BaseServiceClient):
"""Client for Auth Service to communicate with Tenant Service"""
def __init__(self, config: BaseServiceSettings):
super().__init__("auth", config)
self.service_url = config.TENANT_SERVICE_URL
def get_service_base_path(self) -> str:
return "/api/v1"
# ================================================================
# USER TENANT OPERATIONS
# ================================================================
async def get_user_tenants(self, user_id: str) -> Optional[List[Dict[str, Any]]]:
"""Get all tenant memberships for a user"""
try:
result = await self.get(f"tenants/user/{user_id}")
return result.get("memberships", []) if result else []
except Exception as e:
logger.error("Failed to get user tenants", user_id=user_id, error=str(e))
return []
async def get_user_owned_tenants(self, user_id: str) -> Optional[List[Dict[str, Any]]]:
"""Get tenants owned by a user"""
try:
memberships = await self.get_user_tenants(user_id)
if memberships:
return [m for m in memberships if m.get('role') == 'owner']
return []
except Exception as e:
logger.error("Failed to get owned tenants", user_id=user_id, error=str(e))
return []
async def transfer_tenant_ownership(
self,
tenant_id: str,
current_owner_id: str,
new_owner_id: str
) -> Optional[Dict[str, Any]]:
"""Transfer tenant ownership from one user to another"""
try:
data = {
"current_owner_id": current_owner_id,
"new_owner_id": new_owner_id
}
return await self.post(f"tenants/{tenant_id}/transfer-ownership", data=data)
except Exception as e:
logger.error("Failed to transfer tenant ownership",
tenant_id=tenant_id,
error=str(e))
return None
async def delete_tenant(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Delete a tenant completely"""
try:
return await self.delete(f"tenants/{tenant_id}")
except Exception as e:
logger.error("Failed to delete tenant", tenant_id=tenant_id, error=str(e))
return None
async def delete_user_memberships(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Delete all tenant memberships for a user"""
try:
return await self.delete(f"users/{user_id}/memberships")
except Exception as e:
logger.error("Failed to delete user memberships", user_id=user_id, error=str(e))
return None
async def get_tenant_members(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
"""Get all members of a tenant"""
try:
result = await self.get(f"tenants/{tenant_id}/members")
return result.get("members", []) if result else []
except Exception as e:
logger.error("Failed to get tenant members", tenant_id=tenant_id, error=str(e))
return []
async def check_tenant_has_other_admins(self, tenant_id: str, excluding_user_id: str) -> bool:
"""Check if tenant has other admin users besides the excluded one"""
try:
members = await self.get_tenant_members(tenant_id)
if members:
admin_members = [
m for m in members
if m.get('role') in ['admin', 'owner'] and m.get('user_id') != excluding_user_id
]
return len(admin_members) > 0
return False
except Exception as e:
logger.error("Failed to check tenant admins",
tenant_id=tenant_id,
error=str(e))
return False
# ================================================================
# TRAINING SERVICE CLIENT
# ================================================================
class AuthTrainingServiceClient(BaseServiceClient):
"""Client for Auth Service to communicate with Training Service"""
def __init__(self, config: BaseServiceSettings):
super().__init__("auth", config)
self.service_url = config.TRAINING_SERVICE_URL
def get_service_base_path(self) -> str:
return "/api/v1"
# ================================================================
# TRAINING JOB OPERATIONS
# ================================================================
async def cancel_tenant_training_jobs(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Cancel all active training jobs for a tenant"""
try:
data = {"tenant_id": tenant_id}
return await self.post("/tenants/{tenant_id}/training/jobs/cancel", data=data)
except Exception as e:
logger.error("Failed to cancel tenant training jobs",
tenant_id=tenant_id,
error=str(e))
return {"jobs_cancelled": 0, "errors": [str(e)]}
async def get_tenant_active_jobs(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
"""Get all active training jobs for a tenant"""
try:
params = {"status": "running,queued,pending", "tenant_id": tenant_id}
result = await self.get("/tenants/{tenant_id}/training/jobs/active", params=params)
return result.get("jobs", []) if result else []
except Exception as e:
logger.error("Failed to get tenant active jobs",
tenant_id=tenant_id,
error=str(e))
return []
# ================================================================
# MODEL OPERATIONS
# ================================================================
async def delete_tenant_models(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Delete all trained models and artifacts for a tenant"""
try:
return await self.delete(f"models/tenant/{tenant_id}")
except Exception as e:
logger.error("Failed to delete tenant models",
tenant_id=tenant_id,
error=str(e))
return {"models_deleted": 0, "artifacts_deleted": 0, "errors": [str(e)]}
async def get_tenant_models_count(self, tenant_id: str) -> int:
"""Get count of trained models for a tenant"""
try:
result = await self.get(f"models/tenant/{tenant_id}/count")
return result.get("count", 0) if result else 0
except Exception as e:
logger.error("Failed to get tenant models count",
tenant_id=tenant_id,
error=str(e))
return 0
async def get_tenant_model_artifacts(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
"""Get all model artifacts for a tenant"""
try:
result = await self.get(f"models/tenant/{tenant_id}/artifacts")
return result.get("artifacts", []) if result else []
except Exception as e:
logger.error("Failed to get tenant model artifacts",
tenant_id=tenant_id,
error=str(e))
return []
# ================================================================
# FORECASTING SERVICE CLIENT
# ================================================================
class AuthForecastingServiceClient(BaseServiceClient):
"""Client for Auth Service to communicate with Forecasting Service"""
def __init__(self, config: BaseServiceSettings):
super().__init__("auth", config)
self.service_url = config.FORECASTING_SERVICE_URL
def get_service_base_path(self) -> str:
return "/api/v1"
# ================================================================
# FORECAST OPERATIONS
# ================================================================
async def delete_tenant_forecasts(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Delete all forecasts and predictions for a tenant"""
try:
return await self.delete(f"forecasts/tenant/{tenant_id}")
except Exception as e:
logger.error("Failed to delete tenant forecasts",
tenant_id=tenant_id,
error=str(e))
return {
"forecasts_deleted": 0,
"predictions_deleted": 0,
"cache_cleared": 0,
"errors": [str(e)]
}
async def get_tenant_forecasts_count(self, tenant_id: str) -> int:
"""Get count of forecasts for a tenant"""
try:
result = await self.get(f"forecasts/tenant/{tenant_id}/count")
return result.get("count", 0) if result else 0
except Exception as e:
logger.error("Failed to get tenant forecasts count",
tenant_id=tenant_id,
error=str(e))
return 0
async def clear_tenant_prediction_cache(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Clear prediction cache for a tenant"""
try:
return await self.post(f"predictions/cache/clear/{tenant_id}")
except Exception as e:
logger.error("Failed to clear tenant prediction cache",
tenant_id=tenant_id,
error=str(e))
return {"cache_cleared": 0, "errors": [str(e)]}
async def cancel_tenant_prediction_batches(self, tenant_id: str) -> Optional[Dict[str, Any]]:
"""Cancel any active prediction batches for a tenant"""
try:
data = {"tenant_id": tenant_id}
return await self.post("predictions/batches/cancel", data=data)
except Exception as e:
logger.error("Failed to cancel tenant prediction batches",
tenant_id=tenant_id,
error=str(e))
return {"batches_cancelled": 0, "errors": [str(e)]}
# ================================================================
# NOTIFICATION SERVICE CLIENT
# ================================================================
class AuthNotificationServiceClient(BaseServiceClient):
"""Client for Auth Service to communicate with Notification Service"""
def __init__(self, config: BaseServiceSettings):
super().__init__("auth", config)
self.service_url = config.NOTIFICATION_SERVICE_URL
def get_service_base_path(self) -> str:
return "/api/v1"
# ================================================================
# USER NOTIFICATION OPERATIONS
# ================================================================
async def delete_user_notification_data(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Delete all notification data for a user"""
try:
return await self.delete(f"users/{user_id}/data")
except Exception as e:
logger.error("Failed to delete user notification data",
user_id=user_id,
error=str(e))
return {
"preferences_deleted": 0,
"notifications_deleted": 0,
"logs_deleted": 0,
"errors": [str(e)]
}
async def cancel_pending_user_notifications(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Cancel all pending notifications for a user"""
try:
return await self.post(f"users/{user_id}/notifications/cancel")
except Exception as e:
logger.error("Failed to cancel user notifications",
user_id=user_id,
error=str(e))
return {"notifications_cancelled": 0, "errors": [str(e)]}
async def get_user_notification_count(self, user_id: str) -> int:
"""Get count of notifications for a user"""
try:
result = await self.get(f"users/{user_id}/notifications/count")
return result.get("count", 0) if result else 0
except Exception as e:
logger.error("Failed to get user notification count",
user_id=user_id,
error=str(e))
return 0
async def send_user_deletion_notification(
self,
admin_email: str,
deleted_user_email: str,
deletion_summary: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""Send notification about user deletion to administrators"""
try:
data = {
"type": "email",
"recipient_email": admin_email,
"template_key": "user_deletion_notification",
"template_data": {
"deleted_user_email": deleted_user_email,
"deletion_summary": deletion_summary
},
"priority": "high"
}
return await self.post("notifications/send", data=data)
except Exception as e:
logger.error("Failed to send user deletion notification",
admin_email=admin_email,
error=str(e))
return None
# ================================================================
# CLIENT FACTORY
# ================================================================
class AuthServiceClientFactory:
"""Factory for creating inter-service clients for Auth Service"""
def __init__(self, config: BaseServiceSettings):
self.config = config
self._tenant_client = None
self._training_client = None
self._forecasting_client = None
self._notification_client = None
@property
def tenant_client(self) -> AuthTenantServiceClient:
"""Get or create tenant service client"""
if self._tenant_client is None:
self._tenant_client = AuthTenantServiceClient(self.config)
return self._tenant_client
@property
def training_client(self) -> AuthTrainingServiceClient:
"""Get or create training service client"""
if self._training_client is None:
self._training_client = AuthTrainingServiceClient(self.config)
return self._training_client
@property
def forecasting_client(self) -> AuthForecastingServiceClient:
"""Get or create forecasting service client"""
if self._forecasting_client is None:
self._forecasting_client = AuthForecastingServiceClient(self.config)
return self._forecasting_client
@property
def notification_client(self) -> AuthNotificationServiceClient:
"""Get or create notification service client"""
if self._notification_client is None:
self._notification_client = AuthNotificationServiceClient(self.config)
return self._notification_client
async def health_check_all_services(self) -> Dict[str, bool]:
"""Check health of all services"""
results = {}
clients = [
("tenant", self.tenant_client),
("training", self.training_client),
("forecasting", self.forecasting_client),
("notification", self.notification_client)
]
for service_name, client in clients:
try:
health = await client.get("health")
results[service_name] = health is not None and health.get("status") == "healthy"
except Exception as e:
logger.error(f"Health check failed for {service_name} service", error=str(e))
results[service_name] = False
return results