"""
|
|
Simplified Demo Data Cloning Orchestrator
|
|
Coordinates direct HTTP calls to internal_demo endpoints across microservices
|
|
"""
|
|
|
|
import asyncio
|
|
import httpx
|
|
import structlog
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, Any, List, Optional
|
|
from uuid import UUID
|
|
import os
|
|
|
|
from shared.clients.inventory_client import InventoryServiceClient
|
|
from shared.clients.production_client import ProductionServiceClient
|
|
from shared.clients.procurement_client import ProcurementServiceClient
|
|
from shared.config.base import BaseServiceSettings
|
|
from app.monitoring.metrics import (
|
|
demo_sessions_created_total,
|
|
demo_session_creation_duration_seconds,
|
|
demo_service_clone_duration_seconds,
|
|
demo_cloning_errors_total,
|
|
demo_sessions_active,
|
|
demo_alerts_generated_total,
|
|
demo_ai_insights_generated_total,
|
|
demo_cross_service_calls_total,
|
|
demo_cross_service_call_duration_seconds
|
|
)
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class ServiceDefinition:
    """Describes one microservice that participates in demo-data cloning."""

    def __init__(self, name: str, url: str, required: bool = True, timeout: float = 30.0):
        # Identity and base URL of the service.
        self.name = name
        self.url = url
        # A required service that fails to clone blocks session creation;
        # optional services merely degrade the demo.
        self.required = required
        # Per-service HTTP timeout, in seconds.
        self.timeout = timeout
|
|
|
|
|
|
class CloneOrchestrator:
|
|
"""Orchestrates parallel demo data cloning via direct HTTP calls to internal_demo endpoints"""
|
|
|
|
def __init__(self, redis_manager=None):
|
|
from app.core.config import settings
|
|
self.internal_api_key = settings.INTERNAL_API_KEY
|
|
self.redis_manager = redis_manager # For real-time progress updates
|
|
|
|
# Shared HTTP client with connection pooling
|
|
self._http_client: Optional[httpx.AsyncClient] = None
|
|
|
|
# Define services that participate in cloning
|
|
# URLs should be internal Kubernetes service names
|
|
self.services = [
|
|
ServiceDefinition(
|
|
name="tenant",
|
|
url=os.getenv("TENANT_SERVICE_URL", "http://tenant-service:8000"),
|
|
required=True, # Tenant must succeed - critical for session
|
|
timeout=10.0
|
|
),
|
|
ServiceDefinition(
|
|
name="auth",
|
|
url=os.getenv("AUTH_SERVICE_URL", "http://auth-service:8000"),
|
|
required=True, # Auth must succeed - users needed for demo
|
|
timeout=10.0
|
|
),
|
|
ServiceDefinition(
|
|
name="inventory",
|
|
url=os.getenv("INVENTORY_SERVICE_URL", "http://inventory-service:8000"),
|
|
required=False, # Optional - provides ingredients/stock
|
|
timeout=30.0
|
|
),
|
|
ServiceDefinition(
|
|
name="recipes",
|
|
url=os.getenv("RECIPES_SERVICE_URL", "http://recipes-service:8000"),
|
|
required=False, # Optional - provides recipes
|
|
timeout=15.0
|
|
),
|
|
ServiceDefinition(
|
|
name="suppliers",
|
|
url=os.getenv("SUPPLIERS_SERVICE_URL", "http://suppliers-service:8000"),
|
|
required=False, # Optional - provides supplier data
|
|
timeout=20.0
|
|
),
|
|
ServiceDefinition(
|
|
name="production",
|
|
url=os.getenv("PRODUCTION_SERVICE_URL", "http://production-service:8000"),
|
|
required=False, # Optional - provides production batches
|
|
timeout=30.0
|
|
),
|
|
ServiceDefinition(
|
|
name="procurement",
|
|
url=os.getenv("PROCUREMENT_SERVICE_URL", "http://procurement-service:8000"),
|
|
required=False, # Optional - provides purchase orders
|
|
timeout=25.0
|
|
),
|
|
ServiceDefinition(
|
|
name="sales",
|
|
url=os.getenv("SALES_SERVICE_URL", "http://sales-service:8000"),
|
|
required=False, # Optional - provides sales history
|
|
timeout=30.0
|
|
),
|
|
ServiceDefinition(
|
|
name="orders",
|
|
url=os.getenv("ORDERS_SERVICE_URL", "http://orders-service:8000"),
|
|
required=False, # Optional - provides customer orders
|
|
timeout=15.0
|
|
),
|
|
ServiceDefinition(
|
|
name="forecasting",
|
|
url=os.getenv("FORECASTING_SERVICE_URL", "http://forecasting-service:8000"),
|
|
required=False, # Optional - provides forecasts
|
|
timeout=15.0
|
|
),
|
|
ServiceDefinition(
|
|
name="orchestrator",
|
|
url=os.getenv("ORCHESTRATOR_SERVICE_URL", "http://orchestrator-service:8000"),
|
|
required=False, # Optional - provides orchestration history
|
|
timeout=15.0
|
|
),
|
|
ServiceDefinition(
|
|
name="distribution",
|
|
url=os.getenv("DISTRIBUTION_SERVICE_URL", "http://distribution-service:8000"),
|
|
required=False, # Optional - provides distribution routes and shipments
|
|
timeout=20.0
|
|
),
|
|
]
|
|
|
|
async def _get_http_client(self) -> httpx.AsyncClient:
|
|
"""Get or create shared HTTP client with connection pooling"""
|
|
if self._http_client is None or self._http_client.is_closed:
|
|
self._http_client = httpx.AsyncClient(
|
|
timeout=httpx.Timeout(30.0, connect=5.0),
|
|
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
|
|
)
|
|
return self._http_client
|
|
|
|
async def close(self):
|
|
"""Close the HTTP client"""
|
|
if self._http_client and not self._http_client.is_closed:
|
|
await self._http_client.aclose()
|
|
|
|
async def _update_progress_in_redis(
|
|
self,
|
|
session_id: str,
|
|
progress_data: Dict[str, Any]
|
|
) -> None:
|
|
"""Update cloning progress in Redis for real-time frontend polling"""
|
|
if not self.redis_manager:
|
|
return
|
|
|
|
try:
|
|
status_key = f"session:{session_id}:status"
|
|
client = await self.redis_manager.get_client()
|
|
|
|
# Get existing status data or create new
|
|
existing_data_str = await client.get(status_key)
|
|
if existing_data_str:
|
|
import json
|
|
status_data = json.loads(existing_data_str)
|
|
else:
|
|
status_data = {
|
|
"session_id": session_id,
|
|
"status": "cloning",
|
|
"progress": {},
|
|
"total_records_cloned": 0
|
|
}
|
|
|
|
# Update progress field with new data
|
|
status_data["progress"] = progress_data
|
|
|
|
# Calculate total records from services
|
|
total_records = sum(
|
|
service.get("records_cloned", 0)
|
|
for service in progress_data.values()
|
|
if isinstance(service, dict)
|
|
)
|
|
status_data["total_records_cloned"] = total_records
|
|
|
|
# Update Redis with 2-hour TTL
|
|
import json
|
|
await client.setex(
|
|
status_key,
|
|
7200, # 2 hours
|
|
json.dumps(status_data)
|
|
)
|
|
|
|
logger.debug(
|
|
"Updated progress in Redis",
|
|
session_id=session_id,
|
|
services_completed=len(progress_data),
|
|
total_records=total_records
|
|
)
|
|
except Exception as e:
|
|
logger.warning(
|
|
"Failed to update progress in Redis",
|
|
session_id=session_id,
|
|
error=str(e)
|
|
)
|
|
|
|
async def _store_error_details_in_redis(
|
|
self,
|
|
session_id: str,
|
|
failed_services: List[str],
|
|
services_status: Dict[str, Any],
|
|
demo_account_type: str
|
|
) -> None:
|
|
"""Store detailed error information in Redis for failed cloning operations"""
|
|
if not self.redis_manager:
|
|
return
|
|
|
|
try:
|
|
error_key = f"session:{session_id}:errors"
|
|
client = await self.redis_manager.get_client()
|
|
|
|
# Extract detailed error information for each failed service
|
|
error_details = []
|
|
for service_name in failed_services:
|
|
if service_name in services_status:
|
|
service_data = services_status[service_name]
|
|
if isinstance(service_data, dict):
|
|
error_details.append({
|
|
"service": service_name,
|
|
"error": service_data.get("error", "Unknown error"),
|
|
"response_status": service_data.get("response_status"),
|
|
"response_text": service_data.get("response_text", ""),
|
|
"duration_ms": service_data.get("duration_ms", 0),
|
|
"records_cloned": service_data.get("records_cloned", 0)
|
|
})
|
|
|
|
# Create comprehensive error report
|
|
error_report = {
|
|
"session_id": session_id,
|
|
"demo_account_type": demo_account_type,
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"failed_services": failed_services,
|
|
"error_count": len(error_details),
|
|
"errors": error_details
|
|
}
|
|
|
|
# Store in Redis with 2-hour TTL
|
|
import json
|
|
await client.setex(
|
|
error_key,
|
|
7200, # 2 hours
|
|
json.dumps(error_report)
|
|
)
|
|
|
|
logger.info(
|
|
"Stored error details in Redis",
|
|
session_id=session_id,
|
|
failed_services=failed_services,
|
|
error_count=len(error_details)
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Failed to store error details in Redis",
|
|
session_id=session_id,
|
|
error=str(e),
|
|
exc_info=True
|
|
)
|
|
|
|
async def clone_all_services(
|
|
self,
|
|
base_tenant_id: str,
|
|
virtual_tenant_id: str,
|
|
demo_account_type: str,
|
|
session_id: str,
|
|
session_metadata: Optional[Dict[str, Any]] = None,
|
|
services_filter: Optional[List[str]] = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Orchestrate cloning via direct HTTP calls to internal_demo endpoints
|
|
|
|
Args:
|
|
base_tenant_id: Template tenant UUID
|
|
virtual_tenant_id: Target virtual tenant UUID
|
|
demo_account_type: Type of demo account ("professional" or "enterprise")
|
|
session_id: Session ID for tracing
|
|
session_metadata: Additional session metadata (required for enterprise demos)
|
|
services_filter: Optional list of service names to clone
|
|
|
|
Returns:
|
|
Dictionary with overall status and service results
|
|
"""
|
|
logger.info(
|
|
"Starting simplified cloning via direct HTTP calls",
|
|
session_id=session_id,
|
|
virtual_tenant_id=virtual_tenant_id,
|
|
demo_account_type=demo_account_type,
|
|
is_enterprise=demo_account_type == "enterprise"
|
|
)
|
|
|
|
start_time = datetime.now(timezone.utc)
|
|
|
|
# Update active sessions metric
|
|
demo_sessions_active.labels(tier=demo_account_type).inc()
|
|
|
|
# Filter services if specified
|
|
services_to_clone = self.services
|
|
if services_filter:
|
|
services_to_clone = [s for s in self.services if s.name in services_filter]
|
|
|
|
# Extract session creation time for date adjustments
|
|
session_created_at = datetime.now(timezone.utc)
|
|
if session_metadata:
|
|
created_at_str = session_metadata.get("created_at")
|
|
if created_at_str:
|
|
if isinstance(created_at_str, str):
|
|
session_created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
|
|
elif isinstance(created_at_str, datetime):
|
|
session_created_at = created_at_str
|
|
|
|
# Clone parent tenant first (for both professional and enterprise)
|
|
parent_results = await self._clone_parent_tenant(
|
|
base_tenant_id=base_tenant_id,
|
|
virtual_tenant_id=virtual_tenant_id,
|
|
demo_account_type=demo_account_type,
|
|
session_id=session_id,
|
|
session_created_at=session_created_at,
|
|
services_to_clone=services_to_clone
|
|
)
|
|
|
|
# For enterprise, clone child outlets
|
|
child_results = []
|
|
if demo_account_type == "enterprise" and session_metadata:
|
|
child_results = await self._clone_child_outlets(
|
|
session_id=session_id,
|
|
virtual_parent_id=virtual_tenant_id,
|
|
session_metadata=session_metadata,
|
|
session_created_at=session_created_at
|
|
)
|
|
|
|
# Aggregate results
|
|
all_services = parent_results["services"]
|
|
failed_services = parent_results["failed_services"]
|
|
total_records = parent_results["total_records"]
|
|
|
|
# Add child results if any
|
|
if child_results:
|
|
all_services["children"] = child_results
|
|
for child in child_results:
|
|
if child.get("status") == "failed":
|
|
failed_services.append(f"child_{child.get('child_name')}")
|
|
total_records += child.get("records_cloned", 0)
|
|
|
|
# Determine overall status
|
|
if failed_services:
|
|
# Check if any required services failed
|
|
required_failed = any(
|
|
svc.name in failed_services
|
|
for svc in self.services
|
|
if svc.required
|
|
)
|
|
overall_status = "failed" if required_failed else "partial"
|
|
else:
|
|
overall_status = "completed"
|
|
|
|
duration_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
|
|
|
|
result = {
|
|
"overall_status": overall_status,
|
|
"services": all_services,
|
|
"total_records": total_records,
|
|
"failed_services": failed_services,
|
|
"duration_ms": duration_ms
|
|
}
|
|
|
|
# If cloning completed successfully, trigger post-clone operations in background
|
|
if overall_status in ["completed", "partial"]:
|
|
asyncio.create_task(self._run_post_clone_enrichments(
|
|
virtual_tenant_id=virtual_tenant_id,
|
|
demo_account_type=demo_account_type,
|
|
session_id=session_id
|
|
))
|
|
|
|
logger.info(
|
|
"Cloning completed",
|
|
session_id=session_id,
|
|
overall_status=overall_status,
|
|
total_records=total_records,
|
|
duration_ms=duration_ms,
|
|
failed_services_count=len(failed_services)
|
|
)
|
|
|
|
# Store detailed error information in Redis if cloning failed
|
|
if overall_status in ["failed", "partial"] and failed_services:
|
|
await self._store_error_details_in_redis(
|
|
session_id=session_id,
|
|
failed_services=failed_services,
|
|
services_status=all_services,
|
|
demo_account_type=demo_account_type
|
|
)
|
|
|
|
# Update Prometheus metrics
|
|
demo_session_creation_duration_seconds.labels(tier=demo_account_type).observe(duration_ms / 1000)
|
|
demo_sessions_created_total.labels(tier=demo_account_type, status=overall_status).inc()
|
|
|
|
# Update alert and insight metrics if available
|
|
if result.get("alert_generation"):
|
|
alert_gen = result["alert_generation"]
|
|
for alert_type, alerts in alert_gen.items():
|
|
if isinstance(alerts, dict) and alerts.get("alerts_generated"):
|
|
demo_alerts_generated_total.labels(
|
|
tier=demo_account_type,
|
|
alert_type=alert_type
|
|
).inc(alerts["alerts_generated"])
|
|
|
|
if result.get("ai_insights_generation"):
|
|
insights_gen = result["ai_insights_generation"]
|
|
for insight_type, insights in insights_gen.items():
|
|
if isinstance(insights, dict) and insights.get("insights_posted"):
|
|
demo_ai_insights_generated_total.labels(
|
|
tier=demo_account_type,
|
|
insight_type=insight_type
|
|
).inc(insights["insights_posted"])
|
|
|
|
return result
|
|
|
|
async def _clone_parent_tenant(
|
|
self,
|
|
base_tenant_id: str,
|
|
virtual_tenant_id: str,
|
|
demo_account_type: str,
|
|
session_id: str,
|
|
session_created_at: datetime,
|
|
services_to_clone: List[ServiceDefinition]
|
|
) -> Dict[str, Any]:
|
|
"""Clone data for parent tenant across all services"""
|
|
logger.info(
|
|
"Cloning parent tenant data",
|
|
session_id=session_id,
|
|
virtual_tenant_id=virtual_tenant_id,
|
|
services_count=len(services_to_clone)
|
|
)
|
|
|
|
# Clone all services in parallel
|
|
tasks = [
|
|
self._clone_service(
|
|
service=svc,
|
|
base_tenant_id=base_tenant_id,
|
|
virtual_tenant_id=virtual_tenant_id,
|
|
demo_account_type=demo_account_type,
|
|
session_id=session_id,
|
|
session_created_at=session_created_at
|
|
)
|
|
for svc in services_to_clone
|
|
]
|
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# Process results
|
|
services_status = {}
|
|
failed_services = []
|
|
total_records = 0
|
|
|
|
for svc, result in zip(services_to_clone, results):
|
|
if isinstance(result, Exception):
|
|
logger.error(
|
|
"Service cloning failed with exception",
|
|
service=svc.name,
|
|
error=str(result),
|
|
exc_info=result
|
|
)
|
|
services_status[svc.name] = {
|
|
"status": "failed",
|
|
"error": str(result),
|
|
"records_cloned": 0
|
|
}
|
|
if svc.required:
|
|
failed_services.append(svc.name)
|
|
else:
|
|
services_status[svc.name] = result
|
|
if result.get("status") == "failed" and svc.required:
|
|
failed_services.append(svc.name)
|
|
total_records += result.get("records_cloned", 0)
|
|
|
|
# Update progress in Redis
|
|
await self._update_progress_in_redis(session_id, services_status)
|
|
|
|
return {
|
|
"services": services_status,
|
|
"failed_services": failed_services,
|
|
"total_records": total_records
|
|
}
|
|
|
|
async def _clone_service(
|
|
self,
|
|
service: ServiceDefinition,
|
|
base_tenant_id: str,
|
|
virtual_tenant_id: str,
|
|
demo_account_type: str,
|
|
session_id: str,
|
|
session_created_at: datetime
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Clone data from a single service via internal_demo endpoint
|
|
|
|
Args:
|
|
service: Service definition
|
|
base_tenant_id: Template tenant UUID
|
|
virtual_tenant_id: Target virtual tenant UUID
|
|
demo_account_type: Type of demo account
|
|
session_id: Session ID for tracing
|
|
session_created_at: Session creation timestamp
|
|
|
|
Returns:
|
|
Cloning result for this service
|
|
"""
|
|
logger.info(
|
|
"Cloning service data",
|
|
service=service.name,
|
|
virtual_tenant_id=virtual_tenant_id,
|
|
session_id=session_id,
|
|
service_url=service.url
|
|
)
|
|
|
|
start_time = datetime.now(timezone.utc)
|
|
|
|
try:
|
|
logger.debug(
|
|
"Attempting HTTP connection to service",
|
|
service=service.name,
|
|
url=f"{service.url}/internal/demo/clone",
|
|
timeout=service.timeout
|
|
)
|
|
|
|
client = await self._get_http_client()
|
|
logger.debug(
|
|
"Sending clone request",
|
|
service=service.name,
|
|
base_tenant_id=base_tenant_id,
|
|
virtual_tenant_id=virtual_tenant_id,
|
|
demo_account_type=demo_account_type
|
|
)
|
|
|
|
response = await client.post(
|
|
f"{service.url}/internal/demo/clone",
|
|
params={
|
|
"base_tenant_id": base_tenant_id,
|
|
"virtual_tenant_id": virtual_tenant_id,
|
|
"demo_account_type": demo_account_type,
|
|
"session_id": session_id,
|
|
"session_created_at": session_created_at.isoformat()
|
|
},
|
|
headers={"X-Internal-API-Key": self.internal_api_key},
|
|
timeout=service.timeout
|
|
)
|
|
|
|
duration_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
|
|
duration_seconds = duration_ms / 1000
|
|
|
|
logger.debug(
|
|
"Received response from service",
|
|
service=service.name,
|
|
status_code=response.status_code,
|
|
duration_ms=duration_ms
|
|
)
|
|
|
|
demo_cross_service_calls_total.labels(
|
|
source_service="demo-session",
|
|
target_service=service.name,
|
|
status="success"
|
|
).inc()
|
|
demo_cross_service_call_duration_seconds.labels(
|
|
source_service="demo-session",
|
|
target_service=service.name
|
|
).observe(duration_seconds)
|
|
demo_service_clone_duration_seconds.labels(
|
|
tier=demo_account_type,
|
|
service=service.name
|
|
).observe(duration_seconds)
|
|
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
logger.info(
|
|
"Service cloning completed",
|
|
service=service.name,
|
|
records_cloned=result.get("records_cloned", 0),
|
|
duration_ms=duration_ms
|
|
)
|
|
return result
|
|
else:
|
|
error_msg = f"HTTP {response.status_code}: {response.text}"
|
|
logger.error(
|
|
"Service cloning failed",
|
|
service=service.name,
|
|
status_code=response.status_code,
|
|
error=error_msg,
|
|
response_text=response.text
|
|
)
|
|
|
|
demo_cross_service_calls_total.labels(
|
|
source_service="demo-session",
|
|
target_service=service.name,
|
|
status="failed"
|
|
).inc()
|
|
demo_cloning_errors_total.labels(
|
|
tier=demo_account_type,
|
|
service=service.name,
|
|
error_type="http_error"
|
|
).inc()
|
|
|
|
return {
|
|
"service": service.name,
|
|
"status": "failed",
|
|
"error": error_msg,
|
|
"records_cloned": 0,
|
|
"duration_ms": duration_ms,
|
|
"response_status": response.status_code,
|
|
"response_text": response.text
|
|
}
|
|
|
|
except httpx.TimeoutException:
|
|
duration_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
|
|
duration_seconds = duration_ms / 1000
|
|
error_msg = f"Timeout after {service.timeout}s"
|
|
logger.error(
|
|
"Service cloning timeout",
|
|
service=service.name,
|
|
timeout=service.timeout,
|
|
url=service.url
|
|
)
|
|
|
|
# Update error metrics
|
|
demo_cross_service_calls_total.labels(
|
|
source_service="demo-session",
|
|
target_service=service.name,
|
|
status="failed"
|
|
).inc()
|
|
demo_cloning_errors_total.labels(
|
|
tier=demo_account_type,
|
|
service=service.name,
|
|
error_type="timeout"
|
|
).inc()
|
|
demo_service_clone_duration_seconds.labels(
|
|
tier=demo_account_type,
|
|
service=service.name
|
|
).observe(duration_seconds)
|
|
|
|
return {
|
|
"service": service.name,
|
|
"status": "failed",
|
|
"error": error_msg,
|
|
"records_cloned": 0,
|
|
"duration_ms": duration_ms
|
|
}
|
|
|
|
except httpx.NetworkError as e:
|
|
duration_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
|
|
duration_seconds = duration_ms / 1000
|
|
error_msg = f"Network error: {str(e)}"
|
|
logger.error(
|
|
"Service cloning network error",
|
|
service=service.name,
|
|
error=str(e),
|
|
url=service.url,
|
|
exc_info=True
|
|
)
|
|
|
|
# Update error metrics
|
|
demo_cross_service_calls_total.labels(
|
|
source_service="demo-session",
|
|
target_service=service.name,
|
|
status="failed"
|
|
).inc()
|
|
demo_cloning_errors_total.labels(
|
|
tier=demo_account_type,
|
|
service=service.name,
|
|
error_type="network_error"
|
|
).inc()
|
|
demo_service_clone_duration_seconds.labels(
|
|
tier=demo_account_type,
|
|
service=service.name
|
|
).observe(duration_seconds)
|
|
|
|
return {
|
|
"service": service.name,
|
|
"status": "failed",
|
|
"error": error_msg,
|
|
"records_cloned": 0,
|
|
"duration_ms": duration_ms
|
|
}
|
|
|
|
except Exception as e:
|
|
duration_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
|
|
duration_seconds = duration_ms / 1000
|
|
error_msg = f"Unexpected error: {str(e)}"
|
|
logger.error(
|
|
"Service cloning exception",
|
|
service=service.name,
|
|
error=str(e),
|
|
url=service.url,
|
|
exc_info=True
|
|
)
|
|
|
|
# Update error metrics
|
|
demo_cross_service_calls_total.labels(
|
|
source_service="demo-session",
|
|
target_service=service.name,
|
|
status="failed"
|
|
).inc()
|
|
demo_cloning_errors_total.labels(
|
|
tier=demo_account_type,
|
|
service=service.name,
|
|
error_type="exception"
|
|
).inc()
|
|
demo_service_clone_duration_seconds.labels(
|
|
tier=demo_account_type,
|
|
service=service.name
|
|
).observe(duration_seconds)
|
|
|
|
return {
|
|
"service": service.name,
|
|
"status": "failed",
|
|
"error": error_msg,
|
|
"records_cloned": 0,
|
|
"duration_ms": duration_ms
|
|
}
|
|
|
|
async def _clone_child_outlets(
|
|
self,
|
|
session_id: str,
|
|
virtual_parent_id: str,
|
|
session_metadata: Dict[str, Any],
|
|
session_created_at: datetime
|
|
) -> List[Dict[str, Any]]:
|
|
"""Clone child outlets for enterprise demos"""
|
|
child_configs = session_metadata.get("child_configs", [])
|
|
child_tenant_ids = session_metadata.get("child_tenant_ids", [])
|
|
|
|
if not child_configs or not child_tenant_ids:
|
|
logger.warning("No child configs or IDs found for enterprise demo")
|
|
return []
|
|
|
|
logger.info(
|
|
"Cloning child outlets",
|
|
session_id=session_id,
|
|
child_count=len(child_configs)
|
|
)
|
|
|
|
tasks = [
|
|
self._clone_child_outlet(
|
|
session_id=session_id,
|
|
virtual_parent_id=virtual_parent_id,
|
|
virtual_child_id=child_tenant_ids[i],
|
|
child_config=child_config,
|
|
session_created_at=session_created_at
|
|
)
|
|
for i, child_config in enumerate(child_configs)
|
|
]
|
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
# Process results
|
|
child_results = []
|
|
for result in results:
|
|
if isinstance(result, Exception):
|
|
logger.error(
|
|
"Child outlet cloning failed",
|
|
error=str(result),
|
|
exc_info=result
|
|
)
|
|
child_results.append({
|
|
"status": "failed",
|
|
"error": str(result),
|
|
"records_cloned": 0
|
|
})
|
|
else:
|
|
child_results.append(result)
|
|
|
|
return child_results
|
|
|
|
async def _clone_child_outlet(
|
|
self,
|
|
session_id: str,
|
|
virtual_parent_id: str,
|
|
virtual_child_id: str,
|
|
child_config: Dict[str, Any],
|
|
session_created_at: datetime
|
|
) -> Dict[str, Any]:
|
|
"""Clone a single child outlet"""
|
|
child_name = child_config.get("name", "Unknown")
|
|
child_base_id = child_config.get("base_tenant_id")
|
|
location = child_config.get("location", {})
|
|
|
|
logger.info(
|
|
"Cloning child outlet",
|
|
session_id=session_id,
|
|
child_name=child_name,
|
|
virtual_child_id=virtual_child_id
|
|
)
|
|
|
|
try:
|
|
# First, create child tenant via tenant service
|
|
tenant_url = os.getenv("TENANT_SERVICE_URL", "http://tenant-service:8000")
|
|
client = await self._get_http_client()
|
|
response = await client.post(
|
|
f"{tenant_url}/internal/demo/create-child",
|
|
json={
|
|
"base_tenant_id": child_base_id,
|
|
"virtual_tenant_id": virtual_child_id,
|
|
"parent_tenant_id": virtual_parent_id,
|
|
"child_name": child_name,
|
|
"location": location,
|
|
"session_id": session_id
|
|
},
|
|
headers={"X-Internal-API-Key": self.internal_api_key},
|
|
timeout=30.0
|
|
)
|
|
|
|
if response.status_code != 200:
|
|
return {
|
|
"child_id": virtual_child_id,
|
|
"child_name": child_name,
|
|
"status": "failed",
|
|
"error": f"Tenant creation failed: HTTP {response.status_code}",
|
|
"records_cloned": 0
|
|
}
|
|
|
|
# Then clone data from all services for this child
|
|
records_cloned = 0
|
|
for service in self.services:
|
|
if service.name == "tenant":
|
|
continue # Already created
|
|
|
|
try:
|
|
result = await self._clone_service(
|
|
service=service,
|
|
base_tenant_id=child_base_id,
|
|
virtual_tenant_id=virtual_child_id,
|
|
demo_account_type="enterprise_child",
|
|
session_id=session_id,
|
|
session_created_at=session_created_at
|
|
)
|
|
records_cloned += result.get("records_cloned", 0)
|
|
except Exception as e:
|
|
logger.warning(
|
|
"Child service cloning failed (non-fatal)",
|
|
child_name=child_name,
|
|
service=service.name,
|
|
error=str(e)
|
|
)
|
|
|
|
return {
|
|
"child_id": virtual_child_id,
|
|
"child_name": child_name,
|
|
"status": "completed",
|
|
"records_cloned": records_cloned
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Child outlet cloning failed",
|
|
child_name=child_name,
|
|
error=str(e),
|
|
exc_info=True
|
|
)
|
|
return {
|
|
"child_id": virtual_child_id,
|
|
"child_name": child_name,
|
|
"status": "failed",
|
|
"error": str(e),
|
|
"records_cloned": 0
|
|
}
|
|
|
|
    async def _trigger_alert_generation_post_clone(
        self,
        virtual_tenant_id: str,
        demo_account_type: str
    ) -> Dict[str, Any]:
        """
        Trigger alert generation after demo data cloning completes.

        Makes exactly 3 calls as required by orchestration demo:
        1. Call to procurement service to check delivery status
        2. Call to production service to trigger scheduler functionality
        3. Call to inventory service to trigger inventory alerts

        Each call is independent and best-effort: a failure is recorded under
        an "error" key in the returned dict and never aborts the other calls.
        Delivery tracking runs for every demo type; inventory and production
        alerts run only for "professional" and "enterprise".

        Args:
            virtual_tenant_id: Virtual tenant whose freshly cloned data should
                be analyzed for alerts
            demo_account_type: Demo tier controlling which calls are made

        Returns:
            Mapping of call name ("delivery_tracking", "inventory_alerts",
            "production_alerts") to the service's result dict, or to
            {"error": ...} on failure.
        """
        results = {}

        # Initialize shared clients
        config = BaseServiceSettings()
        inventory_client = InventoryServiceClient(config, calling_service_name="demo-session")
        production_client = ProductionServiceClient(config, calling_service_name="demo-session")
        # NOTE(review): ProcurementServiceClient is constructed with service_name=
        # while the other clients take calling_service_name= — confirm this
        # matches ProcurementServiceClient's actual signature.
        procurement_client = ProcurementServiceClient(config, service_name="demo-session")

        # Call 1: Trigger delivery tracking via procurement service (for all demo types)
        try:
            logger.info("Triggering delivery tracking", tenant_id=virtual_tenant_id)
            result = await procurement_client.trigger_delivery_tracking_internal(virtual_tenant_id)
            if result:
                results["delivery_tracking"] = result
                logger.info(
                    "Delivery tracking triggered",
                    tenant_id=virtual_tenant_id,
                    alerts_generated=result.get("alerts_generated", 0)
                )
            else:
                # Falsy result (e.g. None) is treated as a soft failure.
                results["delivery_tracking"] = {"error": "No result returned"}
        except Exception as e:
            logger.error("Failed to trigger delivery tracking", tenant_id=virtual_tenant_id, error=str(e))
            results["delivery_tracking"] = {"error": str(e)}

        # Calls 2 & 3: For professional/enterprise only
        if demo_account_type in ["professional", "enterprise"]:

            # Call 2: Trigger inventory alerts
            try:
                logger.info("Triggering inventory alerts", tenant_id=virtual_tenant_id)
                result = await inventory_client.trigger_inventory_alerts_internal(virtual_tenant_id)
                if result:
                    results["inventory_alerts"] = result
                    logger.info(
                        "Inventory alerts triggered",
                        tenant_id=virtual_tenant_id,
                        alerts_generated=result.get("alerts_generated", 0)
                    )
                else:
                    results["inventory_alerts"] = {"error": "No result returned"}
            except Exception as e:
                logger.error("Failed to trigger inventory alerts", tenant_id=virtual_tenant_id, error=str(e))
                results["inventory_alerts"] = {"error": str(e)}

            # Call 3: Trigger production alerts
            try:
                logger.info("Triggering production alerts", tenant_id=virtual_tenant_id)
                result = await production_client.trigger_production_alerts_internal(virtual_tenant_id)
                if result:
                    results["production_alerts"] = result
                    logger.info(
                        "Production alerts triggered",
                        tenant_id=virtual_tenant_id,
                        alerts_generated=result.get("alerts_generated", 0)
                    )
                else:
                    results["production_alerts"] = {"error": "No result returned"}
            except Exception as e:
                logger.error("Failed to trigger production alerts", tenant_id=virtual_tenant_id, error=str(e))
                results["production_alerts"] = {"error": str(e)}

        # Summary log; .get chains default to 0 when a call failed or was skipped.
        logger.info(
            "Alert generation post-clone completed",
            tenant_id=virtual_tenant_id,
            delivery_alerts=results.get("delivery_tracking", {}).get("alerts_generated", 0),
            production_alerts=results.get("production_alerts", {}).get("alerts_generated", 0),
            inventory_alerts=results.get("inventory_alerts", {}).get("alerts_generated", 0)
        )

        return results
|
|
|
|
    async def _trigger_ai_insights_generation_post_clone(
        self,
        virtual_tenant_id: str,
        demo_account_type: str
    ) -> Dict[str, Any]:
        """
        Trigger AI insights generation after demo data cloning completes.

        This invokes the ML orchestrators in each service to analyze the seeded data
        and generate actionable insights.

        All four calls (price, safety stock, yield, demand) run only for
        "professional" and "enterprise" demos; each is independent and
        best-effort — failures are recorded under an "error" key and never
        abort the remaining calls.

        Args:
            virtual_tenant_id: Virtual tenant whose seeded data should be analyzed
            demo_account_type: Demo tier gating insight generation

        Returns:
            Mapping of insight name to each service's result dict (or
            {"error": ...}), plus "total_insights_generated" summing the
            insights_posted counts across successful calls.
        """
        results = {}
        total_insights = 0

        # Initialize shared clients
        config = BaseServiceSettings()
        inventory_client = InventoryServiceClient(config, calling_service_name="demo-session")
        production_client = ProductionServiceClient(config, calling_service_name="demo-session")
        # NOTE(review): service_name= here vs calling_service_name= for the
        # other clients — confirm against ProcurementServiceClient's signature.
        procurement_client = ProcurementServiceClient(config, service_name="demo-session")
        from shared.clients.forecast_client import ForecastServiceClient
        forecasting_client = ForecastServiceClient(config, calling_service_name="demo-session")

        # For professional/enterprise demos, trigger all AI insights
        if demo_account_type in ["professional", "enterprise"]:

            # 1. Trigger price forecasting insights
            try:
                logger.info("Triggering price forecasting insights", tenant_id=virtual_tenant_id)
                result = await procurement_client.trigger_price_insights_internal(virtual_tenant_id)
                if result:
                    results["price_insights"] = result
                    total_insights += result.get("insights_posted", 0)
                    logger.info(
                        "Price insights generated",
                        tenant_id=virtual_tenant_id,
                        insights_posted=result.get("insights_posted", 0)
                    )
                else:
                    # Falsy result (e.g. None) is treated as a soft failure.
                    results["price_insights"] = {"error": "No response from service"}
            except Exception as e:
                logger.error("Failed to trigger price insights", tenant_id=virtual_tenant_id, error=str(e))
                results["price_insights"] = {"error": str(e)}

            # 2. Trigger safety stock optimization insights
            try:
                logger.info("Triggering safety stock optimization insights", tenant_id=virtual_tenant_id)
                result = await inventory_client.trigger_safety_stock_insights_internal(virtual_tenant_id)
                if result:
                    results["safety_stock_insights"] = result
                    total_insights += result.get("insights_posted", 0)
                    logger.info(
                        "Safety stock insights generated",
                        tenant_id=virtual_tenant_id,
                        insights_posted=result.get("insights_posted", 0)
                    )
                else:
                    results["safety_stock_insights"] = {"error": "No response from service"}
            except Exception as e:
                logger.error("Failed to trigger safety stock insights", tenant_id=virtual_tenant_id, error=str(e))
                results["safety_stock_insights"] = {"error": str(e)}

            # 3. Trigger yield improvement insights
            try:
                logger.info("Triggering yield improvement insights", tenant_id=virtual_tenant_id)
                result = await production_client.trigger_yield_insights_internal(virtual_tenant_id)
                if result:
                    results["yield_insights"] = result
                    total_insights += result.get("insights_posted", 0)
                    logger.info(
                        "Yield insights generated",
                        tenant_id=virtual_tenant_id,
                        insights_posted=result.get("insights_posted", 0)
                    )
                else:
                    results["yield_insights"] = {"error": "No response from service"}
            except Exception as e:
                logger.error("Failed to trigger yield insights", tenant_id=virtual_tenant_id, error=str(e))
                results["yield_insights"] = {"error": str(e)}

            # 4. Trigger demand forecasting insights
            try:
                logger.info("Triggering demand forecasting insights", tenant_id=virtual_tenant_id)
                result = await forecasting_client.trigger_demand_insights_internal(virtual_tenant_id)
                if result:
                    results["demand_insights"] = result
                    total_insights += result.get("insights_posted", 0)
                    logger.info(
                        "Demand insights generated",
                        tenant_id=virtual_tenant_id,
                        insights_posted=result.get("insights_posted", 0)
                    )
                else:
                    results["demand_insights"] = {"error": "No response from service"}
            except Exception as e:
                logger.error("Failed to trigger demand insights", tenant_id=virtual_tenant_id, error=str(e))
                results["demand_insights"] = {"error": str(e)}

        logger.info(
            "AI insights generation post-clone completed",
            tenant_id=virtual_tenant_id,
            total_insights_generated=total_insights
        )

        results["total_insights_generated"] = total_insights
        return results
|
|
|
|
async def _run_post_clone_enrichments(
|
|
self,
|
|
virtual_tenant_id: str,
|
|
demo_account_type: str,
|
|
session_id: str
|
|
) -> None:
|
|
"""
|
|
Background task for non-blocking enrichments (alerts and AI insights).
|
|
Runs in fire-and-forget mode to avoid blocking session readiness.
|
|
"""
|
|
try:
|
|
logger.info(
|
|
"Starting background enrichments",
|
|
session_id=session_id,
|
|
tenant_id=virtual_tenant_id
|
|
)
|
|
|
|
await asyncio.gather(
|
|
self._trigger_alert_generation_post_clone(virtual_tenant_id, demo_account_type),
|
|
self._trigger_ai_insights_generation_post_clone(virtual_tenant_id, demo_account_type),
|
|
return_exceptions=True
|
|
)
|
|
|
|
if self.redis_manager:
|
|
client = await self.redis_manager.get_client()
|
|
await client.set(
|
|
f"session:{session_id}:enrichments_complete",
|
|
"true",
|
|
ex=7200
|
|
)
|
|
|
|
logger.info(
|
|
"Background enrichments completed",
|
|
session_id=session_id,
|
|
tenant_id=virtual_tenant_id
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Background enrichments failed",
|
|
session_id=session_id,
|
|
error=str(e)
|
|
)
|