Files
bakery-ia/services/demo_session/app/services/clone_orchestrator.py
2025-10-30 21:08:07 +01:00

347 lines
12 KiB
Python

"""
Demo Data Cloning Orchestrator
Coordinates asynchronous cloning across microservices
"""
import asyncio
import httpx
import structlog
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional
import os
from enum import Enum
from app.models.demo_session import CloningStatus
# Module-level structured logger used by the orchestrator below.
logger = structlog.get_logger()
class ServiceDefinition:
    """Definition of a service that can clone demo data.

    Args:
        name: Short service identifier, used as the key in result dicts.
        url: Base URL of the service (the clone endpoints are appended to it).
        required: If True, a failed clone of this service marks the whole
            cloning run as failed (blocks session creation).
        timeout: HTTP client timeout in seconds for calls to this service.
    """

    def __init__(self, name: str, url: str, required: bool = True, timeout: float = 10.0):
        self.name = name
        self.url = url
        self.required = required  # If True, failure blocks session creation
        self.timeout = timeout

    def __repr__(self) -> str:
        # Readable form for logs/debugging (e.g. when inspecting orchestrator.services).
        return (
            f"{type(self).__name__}(name={self.name!r}, url={self.url!r}, "
            f"required={self.required!r}, timeout={self.timeout!r})"
        )
class CloneOrchestrator:
    """Orchestrates parallel demo data cloning across services.

    Every participating service is called over HTTP at
    ``POST {url}/internal/demo/clone`` (cloning) and
    ``GET {url}/internal/demo/clone/health`` (availability), authenticated
    with the ``X-Internal-API-Key`` header.
    """

    def __init__(self):
        # NOTE(review): the fallback key is a development default; production
        # deployments are expected to set INTERNAL_API_KEY.
        self.internal_api_key = os.getenv("INTERNAL_API_KEY", "dev-internal-key-change-in-production")

        # Define services that participate in cloning
        # URLs should be internal Kubernetes service names
        self.services = [
            ServiceDefinition(
                name="tenant",
                url=os.getenv("TENANT_SERVICE_URL", "http://tenant-service:8000"),
                required=True,  # Tenant must succeed - critical for session
                timeout=5.0
            ),
            ServiceDefinition(
                name="inventory",
                url=os.getenv("INVENTORY_SERVICE_URL", "http://inventory-service:8000"),
                required=False,  # Optional - provides ingredients/recipes
                timeout=30.0  # Increased for inventory data cloning
            ),
            ServiceDefinition(
                name="recipes",
                url=os.getenv("RECIPES_SERVICE_URL", "http://recipes-service:8000"),
                required=False,  # Optional - provides recipes and production batches
                timeout=15.0
            ),
            ServiceDefinition(
                name="suppliers",
                url=os.getenv("SUPPLIERS_SERVICE_URL", "http://suppliers-service:8000"),
                required=False,  # Optional - provides supplier data and purchase orders
                timeout=20.0  # Longer - clones many entities
            ),
            ServiceDefinition(
                name="sales",
                url=os.getenv("SALES_SERVICE_URL", "http://sales-service:8000"),
                required=False,  # Optional - provides sales history
                timeout=30.0  # Increased for sales data cloning
            ),
            ServiceDefinition(
                name="orders",
                url=os.getenv("ORDERS_SERVICE_URL", "http://orders-service:8000"),
                required=False,  # Optional - provides customer orders & procurement
                timeout=15.0  # Slightly longer - clones more entities
            ),
            ServiceDefinition(
                name="production",
                url=os.getenv("PRODUCTION_SERVICE_URL", "http://production-service:8000"),
                required=False,  # Optional - provides production batches and quality checks
                timeout=20.0  # Longer - clones many entities
            ),
            ServiceDefinition(
                name="forecasting",
                url=os.getenv("FORECASTING_SERVICE_URL", "http://forecasting-service:8000"),
                required=False,  # Optional - provides historical forecasts
                timeout=15.0
            ),
            ServiceDefinition(
                name="pos",
                url=os.getenv("POS_SERVICE_URL", "http://pos-service:8000"),
                required=False,  # Optional - provides POS configurations
                timeout=30.0  # Increased for POS configurations cloning
            ),
            ServiceDefinition(
                name="procurement",
                url=os.getenv("PROCUREMENT_SERVICE_URL", "http://procurement-service:8000"),
                required=False,  # Optional - provides procurement and purchase orders
                timeout=25.0  # Longer - clones many procurement entities
            ),
        ]

    async def clone_all_services(
        self,
        base_tenant_id: str,
        virtual_tenant_id: str,
        demo_account_type: str,
        session_id: str
    ) -> Dict[str, Any]:
        """
        Orchestrate cloning across all services in parallel.

        Args:
            base_tenant_id: Template tenant UUID
            virtual_tenant_id: Target virtual tenant UUID
            demo_account_type: Type of demo account
            session_id: Session ID for tracing

        Returns:
            Dictionary with keys ``overall_status`` ("ready", "partial" or
            "failed"), ``total_records_cloned``, ``duration_ms``,
            ``services`` (per-service result dicts keyed by service name),
            ``failed_services`` and ``completed_at`` (ISO-8601, UTC).
            "failed" means at least one required service failed; "partial"
            means only optional services failed.
        """
        # Snapshot the list so results can be matched to it positionally even
        # if self.services is modified while we await.
        services = list(self.services)

        logger.info(
            "Starting orchestrated cloning",
            session_id=session_id,
            virtual_tenant_id=virtual_tenant_id,
            demo_account_type=demo_account_type,
            service_count=len(services)
        )

        start_time = datetime.now(timezone.utc)
        # One shared reference timestamp for date adjustment, so every service
        # shifts its cloned data relative to the same instant.
        session_created_at = start_time.isoformat().replace('+00:00', 'Z')

        tasks = [
            asyncio.create_task(
                self._clone_service(
                    service_def=service_def,
                    base_tenant_id=base_tenant_id,
                    virtual_tenant_id=virtual_tenant_id,
                    demo_account_type=demo_account_type,
                    session_id=session_id,
                    session_created_at=session_created_at
                )
            )
            for service_def in services
        ]

        # Wait for all tasks to complete (each has its own HTTP timeout).
        # gather() returns results in the same order as `tasks`.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        service_results: Dict[str, Any] = {}
        total_records = 0
        failed_services: List[str] = []
        required_service_failed = False

        for service_def, result in zip(services, results):
            service_name = service_def.name

            # BaseException (not Exception) so a returned CancelledError is
            # recorded as a failure instead of crashing on `.get` below.
            if isinstance(result, BaseException):
                error_text = str(result) or type(result).__name__
                logger.error(
                    "Service cloning failed with exception",
                    service=service_name,
                    error=error_text
                )
                # NOTE(review): presumably CloningStatus.FAILED.value == "failed",
                # matching the dicts built by _clone_service — confirm.
                service_results[service_name] = {
                    "status": CloningStatus.FAILED.value,
                    "records_cloned": 0,
                    "error": error_text,
                    "duration_ms": 0
                }
                failed_services.append(service_name)
                if service_def.required:
                    required_service_failed = True
                continue

            service_results[service_name] = result
            status = result.get("status")
            if status == "completed":
                total_records += result.get("records_cloned", 0)
            elif status == "failed":
                failed_services.append(service_name)
                if service_def.required:
                    required_service_failed = True

        # Determine overall status
        if required_service_failed:
            overall_status = "failed"
        elif failed_services:
            overall_status = "partial"
        else:
            overall_status = "ready"

        completed_at = datetime.now(timezone.utc)
        duration_ms = int((completed_at - start_time).total_seconds() * 1000)

        summary = {
            "overall_status": overall_status,
            "total_records_cloned": total_records,
            "duration_ms": duration_ms,
            "services": service_results,
            "failed_services": failed_services,
            "completed_at": completed_at.isoformat()
        }

        logger.info(
            "Orchestrated cloning completed",
            session_id=session_id,
            overall_status=overall_status,
            total_records=total_records,
            duration_ms=duration_ms,
            failed_services=failed_services
        )

        return summary

    @staticmethod
    def _failed_result(service_def: ServiceDefinition, error: str, duration_ms: int) -> Dict[str, Any]:
        """Build the per-service result dict reported when a clone call fails."""
        return {
            "service": service_def.name,
            "status": "failed",
            "records_cloned": 0,
            "error": error,
            "duration_ms": duration_ms
        }

    async def _clone_service(
        self,
        service_def: ServiceDefinition,
        base_tenant_id: str,
        virtual_tenant_id: str,
        demo_account_type: str,
        session_id: str,
        session_created_at: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Clone data from a single service.

        Never raises for ordinary errors: HTTP errors, timeouts and other
        exceptions are logged and converted into a result dict with
        ``status == "failed"``.

        Args:
            service_def: Service definition
            base_tenant_id: Template tenant UUID
            virtual_tenant_id: Target virtual tenant UUID
            demo_account_type: Type of demo account
            session_id: Session ID for tracing
            session_created_at: ISO-8601 UTC timestamp ("...Z") sent to the
                service for date adjustment. Defaults to the current time.

        Returns:
            The service's JSON response on HTTP 200, otherwise a failure dict.
        """
        logger.info(
            "Cloning service data",
            service=service_def.name,
            url=service_def.url,
            session_id=session_id
        )

        if session_created_at is None:
            session_created_at = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')

        # Monotonic clock, so measured durations are unaffected by wall-clock jumps.
        loop = asyncio.get_running_loop()
        started = loop.time()

        def elapsed_ms() -> int:
            return int((loop.time() - started) * 1000)

        try:
            async with httpx.AsyncClient(timeout=service_def.timeout) as client:
                response = await client.post(
                    f"{service_def.url}/internal/demo/clone",
                    params={
                        "base_tenant_id": base_tenant_id,
                        "virtual_tenant_id": virtual_tenant_id,
                        "demo_account_type": demo_account_type,
                        "session_id": session_id,
                        "session_created_at": session_created_at
                    },
                    headers={
                        "X-Internal-API-Key": self.internal_api_key
                    }
                )

                if response.status_code != 200:
                    error_msg = f"HTTP {response.status_code}: {response.text}"
                    logger.error(
                        "Service cloning failed",
                        service=service_def.name,
                        error=error_msg
                    )
                    return self._failed_result(service_def, error_msg, elapsed_ms())

                result = response.json()
                # The aggregator calls .get() on this; reject non-object bodies here.
                if not isinstance(result, dict):
                    raise ValueError(
                        f"Expected a JSON object from clone endpoint, got {type(result).__name__}"
                    )

                logger.info(
                    "Service cloning succeeded",
                    service=service_def.name,
                    records=result.get("records_cloned", 0),
                    duration_ms=result.get("duration_ms", 0)
                )
                return result

        # httpx signals timeouts with httpx.TimeoutException (not asyncio.TimeoutError);
        # asyncio.TimeoutError is kept in case the call is wrapped in wait_for upstream.
        except (httpx.TimeoutException, asyncio.TimeoutError):
            error_msg = f"Timeout after {service_def.timeout}s"
            logger.error(
                "Service cloning timeout",
                service=service_def.name,
                timeout=service_def.timeout
            )
            return self._failed_result(service_def, error_msg, elapsed_ms())

        except Exception as e:
            # Some exceptions stringify to ""; fall back to the type name so the
            # reported error is never empty.
            error_msg = str(e) or type(e).__name__
            logger.error(
                "Service cloning exception",
                service=service_def.name,
                error=error_msg,
                exc_info=True
            )
            return self._failed_result(service_def, error_msg, elapsed_ms())

    async def health_check_services(self) -> Dict[str, bool]:
        """
        Check health of all cloning endpoints concurrently.

        Returns:
            Dictionary mapping service names to availability status (True only
            when the health probe returned HTTP 200).
        """
        services = list(self.services)
        results = await asyncio.gather(
            *(self._check_service_health(service_def) for service_def in services),
            return_exceptions=True
        )
        # Anything other than a literal True (False or a returned exception) is unhealthy.
        return {
            service_def.name: (result is True)
            for service_def, result in zip(services, results)
        }

    async def _check_service_health(self, service_def: ServiceDefinition) -> bool:
        """Check if a service's clone endpoint is available.

        Best-effort probe with a 2-second timeout: any error is reported as
        unavailable (False) rather than raised.
        """
        try:
            async with httpx.AsyncClient(timeout=2.0) as client:
                response = await client.get(
                    f"{service_def.url}/internal/demo/clone/health",
                    headers={"X-Internal-API-Key": self.internal_api_key}
                )
                return response.status_code == 200
        except Exception:
            return False