""" Internal Demo API Endpoints for Orchestrator Service Used by demo_session service to clone data for virtual demo tenants """ from fastapi import APIRouter, Depends, HTTPException, Header from typing import Dict, Any from uuid import UUID import structlog import os from app.core.database import get_db from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, delete, func from app.models.orchestration_run import OrchestrationRun import uuid from datetime import datetime, timezone, timedelta from typing import Optional router = APIRouter() logger = structlog.get_logger() # Internal API key for service-to-service communication INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY", "dev-internal-key-change-in-production") def verify_internal_api_key(x_internal_api_key: str = Header(...)): """Verify internal API key for service-to-service communication""" if x_internal_api_key != INTERNAL_API_KEY: raise HTTPException(status_code=403, detail="Invalid internal API key") return True @router.post("/internal/demo/clone") async def clone_demo_data( base_tenant_id: str, virtual_tenant_id: str, demo_account_type: str, session_id: Optional[str] = None, session_created_at: Optional[str] = None, db: AsyncSession = Depends(get_db), _: bool = Depends(verify_internal_api_key) ): """ Clone orchestration run demo data from base tenant to virtual tenant This endpoint is called by the demo_session service during session initialization. It clones orchestration runs with date adjustments to make them appear recent. """ start_time = datetime.now(timezone.utc) # Parse session_created_at or use current time if session_created_at: try: reference_time = datetime.fromisoformat(session_created_at.replace('Z', '+00:00')) except: reference_time = datetime.now(timezone.utc) else: reference_time = datetime.now(timezone.utc) logger.info( "Starting orchestration runs cloning with date adjustment", base_tenant_id=base_tenant_id, virtual_tenant_id=virtual_tenant_id, demo_account_type=demo_account_type, session_id=session_id, reference_time=reference_time.isoformat() ) try: base_uuid = uuid.UUID(base_tenant_id) virtual_uuid = uuid.UUID(virtual_tenant_id) # Fetch base tenant orchestration runs # Get all completed and partial_success runs from the base tenant result = await db.execute( select(OrchestrationRun) .where(OrchestrationRun.tenant_id == base_uuid) .order_by(OrchestrationRun.started_at.desc()) .limit(10) # Clone last 10 runs for demo ) base_runs = list(result.scalars().all()) runs_cloned = 0 # Clone each orchestration run with date adjustment for base_run in base_runs: # Calculate time offset: how old was this run relative to when it was created # We'll adjust all timestamps to be relative to the session creation time if base_run.started_at: # Calculate how many days ago this run was from a reference point # Use a fixed reference date for consistency reference_date = datetime(2025, 1, 15, 12, 0, 0, tzinfo=timezone.utc) time_offset = base_run.started_at - reference_date # Apply this offset to the current reference time new_started_at = reference_time + time_offset else: new_started_at = reference_time - timedelta(hours=2) # Adjust completed_at if it exists if base_run.completed_at and base_run.started_at: duration = base_run.completed_at - base_run.started_at new_completed_at = new_started_at + duration else: new_completed_at = None # Adjust all step timestamps proportionally def adjust_timestamp(original_timestamp): if not original_timestamp or not base_run.started_at: return None step_offset = original_timestamp - base_run.started_at return new_started_at + step_offset # Create new orchestration run for virtual tenant new_run = OrchestrationRun( id=uuid.uuid4(), tenant_id=virtual_uuid, run_number=f"{base_run.run_number}-DEMO", status=base_run.status, run_type=base_run.run_type, priority=base_run.priority, started_at=new_started_at, completed_at=new_completed_at, duration_seconds=base_run.duration_seconds, # Forecasting step forecasting_started_at=adjust_timestamp(base_run.forecasting_started_at), forecasting_completed_at=adjust_timestamp(base_run.forecasting_completed_at), forecasting_status=base_run.forecasting_status, forecasting_error=base_run.forecasting_error, # Production step production_started_at=adjust_timestamp(base_run.production_started_at), production_completed_at=adjust_timestamp(base_run.production_completed_at), production_status=base_run.production_status, production_error=base_run.production_error, # Procurement step procurement_started_at=adjust_timestamp(base_run.procurement_started_at), procurement_completed_at=adjust_timestamp(base_run.procurement_completed_at), procurement_status=base_run.procurement_status, procurement_error=base_run.procurement_error, # Notification step notification_started_at=adjust_timestamp(base_run.notification_started_at), notification_completed_at=adjust_timestamp(base_run.notification_completed_at), notification_status=base_run.notification_status, notification_error=base_run.notification_error, # AI Insights (if exists) ai_insights_started_at=adjust_timestamp(base_run.ai_insights_started_at) if hasattr(base_run, 'ai_insights_started_at') else None, ai_insights_completed_at=adjust_timestamp(base_run.ai_insights_completed_at) if hasattr(base_run, 'ai_insights_completed_at') else None, ai_insights_status=base_run.ai_insights_status if hasattr(base_run, 'ai_insights_status') else None, ai_insights_generated=base_run.ai_insights_generated if hasattr(base_run, 'ai_insights_generated') else None, ai_insights_posted=base_run.ai_insights_posted if hasattr(base_run, 'ai_insights_posted') else None, # Results summary forecasts_generated=base_run.forecasts_generated, production_batches_created=base_run.production_batches_created, procurement_plans_created=base_run.procurement_plans_created, purchase_orders_created=base_run.purchase_orders_created, notifications_sent=base_run.notifications_sent, # Performance metrics fulfillment_rate=base_run.fulfillment_rate, on_time_delivery_rate=base_run.on_time_delivery_rate, cost_accuracy=base_run.cost_accuracy, quality_score=base_run.quality_score, # Data forecast_data=base_run.forecast_data, run_metadata=base_run.run_metadata, # Metadata triggered_by=base_run.triggered_by, created_at=reference_time, updated_at=reference_time ) db.add(new_run) await db.flush() runs_cloned += 1 await db.commit() duration_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000) logger.info( "Orchestration runs cloned successfully", virtual_tenant_id=str(virtual_tenant_id), runs_cloned=runs_cloned, duration_ms=duration_ms ) return { "service": "orchestrator", "status": "completed", "success": True, "records_cloned": runs_cloned, "runs_cloned": runs_cloned, "duration_ms": duration_ms } except Exception as e: logger.error("Failed to clone orchestration runs", error=str(e), exc_info=True) await db.rollback() raise HTTPException(status_code=500, detail=f"Failed to clone orchestration runs: {str(e)}") @router.delete("/internal/demo/tenant/{virtual_tenant_id}") async def delete_demo_data( virtual_tenant_id: str, db: AsyncSession = Depends(get_db), _: bool = Depends(verify_internal_api_key) ): """Delete all orchestration runs for a virtual demo tenant""" logger.info("Deleting orchestration runs for virtual tenant", virtual_tenant_id=virtual_tenant_id) start_time = datetime.now(timezone.utc) try: virtual_uuid = uuid.UUID(virtual_tenant_id) # Count records run_count = await db.scalar( select(func.count(OrchestrationRun.id)) .where(OrchestrationRun.tenant_id == virtual_uuid) ) # Delete orchestration runs await db.execute( delete(OrchestrationRun) .where(OrchestrationRun.tenant_id == virtual_uuid) ) await db.commit() duration_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000) logger.info( "Orchestration runs deleted successfully", virtual_tenant_id=virtual_tenant_id, duration_ms=duration_ms ) return { "service": "orchestrator", "status": "deleted", "virtual_tenant_id": virtual_tenant_id, "records_deleted": { "orchestration_runs": run_count, "total": run_count }, "duration_ms": duration_ms } except Exception as e: logger.error("Failed to delete orchestration runs", error=str(e), exc_info=True) await db.rollback() raise HTTPException(status_code=500, detail=str(e)) @router.get("/internal/demo/clone/health") async def health_check(_: bool = Depends(verify_internal_api_key)): """Health check for demo cloning endpoint""" return {"status": "healthy", "service": "orchestrator"}