""" Demo Data Cloning Orchestrator Coordinates asynchronous cloning across microservices """ import asyncio import httpx import structlog from datetime import datetime, timezone from typing import Dict, Any, List, Optional import os from enum import Enum from app.models.demo_session import CloningStatus logger = structlog.get_logger() class ServiceDefinition: """Definition of a service that can clone demo data""" def __init__(self, name: str, url: str, required: bool = True, timeout: float = 10.0): self.name = name self.url = url self.required = required # If True, failure blocks session creation self.timeout = timeout class CloneOrchestrator: """Orchestrates parallel demo data cloning across services""" def __init__(self): self.internal_api_key = os.getenv("INTERNAL_API_KEY", "dev-internal-key-change-in-production") # Define services that participate in cloning # URLs should be internal Kubernetes service names self.services = [ ServiceDefinition( name="tenant", url=os.getenv("TENANT_SERVICE_URL", "http://tenant-service:8000"), required=True, # Tenant must succeed - critical for session timeout=5.0 ), ServiceDefinition( name="inventory", url=os.getenv("INVENTORY_SERVICE_URL", "http://inventory-service:8000"), required=False, # Optional - provides ingredients/recipes timeout=30.0 # Increased for inventory data cloning ), ServiceDefinition( name="recipes", url=os.getenv("RECIPES_SERVICE_URL", "http://recipes-service:8000"), required=False, # Optional - provides recipes and production batches timeout=15.0 ), ServiceDefinition( name="suppliers", url=os.getenv("SUPPLIERS_SERVICE_URL", "http://suppliers-service:8000"), required=False, # Optional - provides supplier data and purchase orders timeout=20.0 # Longer - clones many entities ), ServiceDefinition( name="sales", url=os.getenv("SALES_SERVICE_URL", "http://sales-service:8000"), required=False, # Optional - provides sales history timeout=30.0 # Increased for sales data cloning ), ServiceDefinition( name="orders", url=os.getenv("ORDERS_SERVICE_URL", "http://orders-service:8000"), required=False, # Optional - provides customer orders & procurement timeout=15.0 # Slightly longer - clones more entities ), ServiceDefinition( name="production", url=os.getenv("PRODUCTION_SERVICE_URL", "http://production-service:8000"), required=False, # Optional - provides production batches and quality checks timeout=20.0 # Longer - clones many entities ), ServiceDefinition( name="forecasting", url=os.getenv("FORECASTING_SERVICE_URL", "http://forecasting-service:8000"), required=False, # Optional - provides historical forecasts timeout=15.0 ), ServiceDefinition( name="pos", url=os.getenv("POS_SERVICE_URL", "http://pos-service:8000"), required=False, # Optional - provides POS configurations timeout=30.0 # Increased for POS configurations cloning ), ServiceDefinition( name="procurement", url=os.getenv("PROCUREMENT_SERVICE_URL", "http://procurement-service:8000"), required=False, # Optional - provides procurement and purchase orders timeout=25.0 # Longer - clones many procurement entities ), ] async def clone_all_services( self, base_tenant_id: str, virtual_tenant_id: str, demo_account_type: str, session_id: str ) -> Dict[str, Any]: """ Orchestrate cloning across all services in parallel Args: base_tenant_id: Template tenant UUID virtual_tenant_id: Target virtual tenant UUID demo_account_type: Type of demo account session_id: Session ID for tracing Returns: Dictionary with overall status and per-service results """ logger.info( "Starting orchestrated cloning", 
            session_id=session_id,
            virtual_tenant_id=virtual_tenant_id,
            demo_account_type=demo_account_type,
            service_count=len(self.services)
        )

        start_time = datetime.now(timezone.utc)

        # Create tasks for all services
        tasks = []
        service_map = {}

        for service_def in self.services:
            task = asyncio.create_task(
                self._clone_service(
                    service_def=service_def,
                    base_tenant_id=base_tenant_id,
                    virtual_tenant_id=virtual_tenant_id,
                    demo_account_type=demo_account_type,
                    session_id=session_id
                )
            )
            tasks.append(task)
            service_map[task] = service_def.name

        # Wait for all tasks to complete (with individual timeouts)
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        service_results = {}
        total_records = 0
        failed_services = []
        required_service_failed = False

        for task, result in zip(tasks, results):
            service_name = service_map[task]
            service_def = next(s for s in self.services if s.name == service_name)

            if isinstance(result, Exception):
                logger.error(
                    "Service cloning failed with exception",
                    service=service_name,
                    error=str(result)
                )
                service_results[service_name] = {
                    "status": CloningStatus.FAILED.value,
                    "records_cloned": 0,
                    "error": str(result),
                    "duration_ms": 0
                }
                failed_services.append(service_name)
                if service_def.required:
                    required_service_failed = True
            else:
                service_results[service_name] = result
                if result.get("status") == "completed":
                    total_records += result.get("records_cloned", 0)
                elif result.get("status") == "failed":
                    failed_services.append(service_name)
                    if service_def.required:
                        required_service_failed = True

        # Determine overall status
        if required_service_failed:
            overall_status = "failed"
        elif failed_services:
            overall_status = "partial"
        else:
            overall_status = "ready"

        duration_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)

        result = {
            "overall_status": overall_status,
            "total_records_cloned": total_records,
            "duration_ms": duration_ms,
            "services": service_results,
            "failed_services": failed_services,
            "completed_at": datetime.now(timezone.utc).isoformat()
        }

        logger.info(
            "Orchestrated cloning completed",
            session_id=session_id,
            overall_status=overall_status,
            total_records=total_records,
            duration_ms=duration_ms,
            failed_services=failed_services
        )

        return result

    async def _clone_service(
        self,
        service_def: ServiceDefinition,
        base_tenant_id: str,
        virtual_tenant_id: str,
        demo_account_type: str,
        session_id: str
    ) -> Dict[str, Any]:
        """
        Clone data from a single service

        Args:
            service_def: Service definition
            base_tenant_id: Template tenant UUID
            virtual_tenant_id: Target virtual tenant UUID
            demo_account_type: Type of demo account
            session_id: Session ID for tracing

        Returns:
            Cloning result for this service
        """
        logger.info(
            "Cloning service data",
            service=service_def.name,
            url=service_def.url,
            session_id=session_id
        )

        try:
            async with httpx.AsyncClient(timeout=service_def.timeout) as client:
                # Get session creation time for date adjustment
                session_created_at = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')

                response = await client.post(
                    f"{service_def.url}/internal/demo/clone",
                    params={
                        "base_tenant_id": base_tenant_id,
                        "virtual_tenant_id": virtual_tenant_id,
                        "demo_account_type": demo_account_type,
                        "session_id": session_id,
                        "session_created_at": session_created_at
                    },
                    headers={
                        "X-Internal-API-Key": self.internal_api_key
                    }
                )

                if response.status_code == 200:
                    result = response.json()
                    logger.info(
                        "Service cloning succeeded",
                        service=service_def.name,
                        records=result.get("records_cloned", 0),
                        duration_ms=result.get("duration_ms", 0)
                    )
                    return result
                else:
                    error_msg = f"HTTP {response.status_code}: {response.text}"
                    logger.error(
                        "Service cloning failed",
                        service=service_def.name,
                        error=error_msg
                    )
                    return {
                        "service": service_def.name,
                        "status": "failed",
                        "records_cloned": 0,
                        "error": error_msg,
                        "duration_ms": 0
                    }

        except (httpx.TimeoutException, asyncio.TimeoutError):
            # httpx raises its own TimeoutException subclasses when the client
            # timeout elapses, so catch those alongside asyncio.TimeoutError
            error_msg = f"Timeout after {service_def.timeout}s"
            logger.error(
                "Service cloning timeout",
                service=service_def.name,
                timeout=service_def.timeout
            )
            return {
                "service": service_def.name,
                "status": "failed",
                "records_cloned": 0,
                "error": error_msg,
                "duration_ms": int(service_def.timeout * 1000)
            }
        except Exception as e:
            logger.error(
                "Service cloning exception",
                service=service_def.name,
                error=str(e),
                exc_info=True
            )
            return {
                "service": service_def.name,
                "status": "failed",
                "records_cloned": 0,
                "error": str(e),
                "duration_ms": 0
            }

    async def health_check_services(self) -> Dict[str, bool]:
        """
        Check health of all cloning endpoints

        Returns:
            Dictionary mapping service names to availability status
        """
        tasks = []
        service_names = []

        for service_def in self.services:
            task = asyncio.create_task(self._check_service_health(service_def))
            tasks.append(task)
            service_names.append(service_def.name)

        results = await asyncio.gather(*tasks, return_exceptions=True)

        return {
            name: (result is True)
            for name, result in zip(service_names, results)
        }

    async def _check_service_health(self, service_def: ServiceDefinition) -> bool:
        """Check if a service's clone endpoint is available"""
        try:
            async with httpx.AsyncClient(timeout=2.0) as client:
                response = await client.get(
                    f"{service_def.url}/internal/demo/clone/health",
                    headers={"X-Internal-API-Key": self.internal_api_key}
                )
                return response.status_code == 200
        except Exception:
            return False
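

# --- Illustrative usage sketch ---
# Not part of the orchestrator itself: a minimal example of how this module
# might be exercised from an async entry point. The tenant UUIDs, account type,
# and session ID below are hypothetical placeholders, and the calls assume the
# services defined above are reachable and accept the configured INTERNAL_API_KEY.
if __name__ == "__main__":
    async def _demo_run() -> None:
        orchestrator = CloneOrchestrator()

        # Optionally check which clone endpoints are reachable before cloning
        availability = await orchestrator.health_check_services()
        logger.info("Clone endpoint availability", availability=availability)

        # Kick off parallel cloning for a hypothetical demo session
        result = await orchestrator.clone_all_services(
            base_tenant_id="00000000-0000-0000-0000-000000000001",     # hypothetical template tenant
            virtual_tenant_id="11111111-1111-1111-1111-111111111111",  # hypothetical virtual tenant
            demo_account_type="standard",                              # hypothetical account type
            session_id="demo-session-local-test"                       # hypothetical session ID
        )
        logger.info(
            "Cloning finished",
            overall_status=result["overall_status"],
            total_records=result["total_records_cloned"],
            failed_services=result["failed_services"]
        )

    asyncio.run(_demo_run())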