"""
Demo Sessions API - Atomic CRUD operations on DemoSession model
"""

import asyncio
import json
from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request
from typing import Optional
from uuid import UUID
from datetime import datetime, timezone
import structlog
import jwt

from app.api.schemas import DemoSessionCreate, DemoSessionResponse
from app.services import DemoSessionManager
from app.core import get_db
from app.core.redis_wrapper import get_redis, DemoRedisWrapper
from sqlalchemy.ext.asyncio import AsyncSession

from shared.routing import RouteBuilder

router = APIRouter(tags=["demo-sessions"])
logger = structlog.get_logger()

route_builder = RouteBuilder('demo')

# TTL (seconds) applied to the "session:<id>:status" key written on failure.
_STATUS_TTL_SECONDS = 7200

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be garbage
# collected before it finishes.
_background_tasks: set = set()


def _keep_task_alive(task) -> None:
    """Hold a strong reference to *task* until it completes."""
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


async def _write_failed_status(
    redis,
    session_id: str,
    error: str,
    *,
    include_completed_at: bool = True,
) -> None:
    """Write a "failed" status document to ``session:<session_id>:status``.

    Args:
        redis: Wrapper exposing ``get_client()``; when ``None`` it is obtained
            via ``get_redis()``.
        session_id: Session identifier used in the key and in the payload.
        error: Text stored under the ``"error"`` key.
        include_completed_at: When true, add a ``"cloning_completed_at"``
            entry holding the current UTC time in ISO format.

    Raises:
        Whatever ``get_redis()``, ``get_client()`` or ``setex`` raise; callers
        treat this write as best-effort and log failures themselves.
    """
    if redis is None:
        redis = await get_redis()
    client = await redis.get_client()

    status_data = {
        "session_id": session_id,
        "status": "failed",
        "error": error,
        "progress": {},
        "total_records_cloned": 0,
    }
    if include_completed_at:
        status_data["cloning_completed_at"] = datetime.now(timezone.utc).isoformat()

    await client.setex(
        f"session:{session_id}:status",
        _STATUS_TTL_SECONDS,
        json.dumps(status_data),
    )


async def _background_cloning_task(session_id: str, session_obj_id: UUID, base_tenant_id: str):
    """Background task for orchestrated cloning - creates its own DB session.

    Loads the ``DemoSession`` row whose ``id`` equals ``session_obj_id`` and
    passes it to ``DemoSessionManager.trigger_orchestrated_cloning``. Any
    exception is logged, after which the row is marked ``FAILED`` through a
    separate DB session and a "failed" status document is written to Redis.
    Both of those follow-up writes are best-effort: their own failures are
    logged and swallowed.

    Args:
        session_id: Session identifier used for logging and the Redis key.
        session_obj_id: Value matched against ``DemoSession.id``.
        base_tenant_id: Forwarded unchanged to ``trigger_orchestrated_cloning``.
    """
    # Function-scope imports, as in the original implementation.
    from app.core.database import db_manager
    from app.models import DemoSession, DemoSessionStatus
    from sqlalchemy import select, update
    from app.core.redis_wrapper import get_redis

    logger.info(
        "Starting background cloning task",
        session_id=session_id,
        session_obj_id=str(session_obj_id),
        base_tenant_id=base_tenant_id
    )

    # Bound before the try block so the failure handler can still run when
    # get_redis() itself is what raised.
    redis = None

    # Create new database session for background task
    async with db_manager.session_factory() as db:
        try:
            redis = await get_redis()

            # Fetch the session from the database
            result = await db.execute(
                select(DemoSession).where(DemoSession.id == session_obj_id)
            )
            session = result.scalar_one_or_none()

            if not session:
                logger.error("Session not found for cloning", session_id=session_id)
                # Mark session as failed in Redis for frontend polling
                try:
                    await _write_failed_status(
                        redis,
                        session_id,
                        "Session not found in database",
                        include_completed_at=False,
                    )
                except Exception as redis_error:
                    logger.error(
                        "Failed to update Redis status for missing session",
                        error=str(redis_error)
                    )
                return

            logger.info(
                "Found session for cloning",
                session_id=session_id,
                current_status=session.status.value,
                demo_account_type=session.demo_account_type
            )

            # Create session manager with new DB session
            session_manager = DemoSessionManager(db, redis)
            await session_manager.trigger_orchestrated_cloning(session, base_tenant_id)

        except Exception as e:
            logger.error(
                "Background cloning failed",
                session_id=session_id,
                error=str(e),
                exc_info=True
            )

            # Mark the row as failed through a fresh DB session, separate from
            # the one the failed work was using.
            try:
                async with db_manager.session_factory() as update_db:
                    await update_db.execute(
                        update(DemoSession)
                        .where(DemoSession.id == session_obj_id)
                        .values(
                            status=DemoSessionStatus.FAILED,
                            cloning_completed_at=datetime.now(timezone.utc)
                        )
                    )
                    await update_db.commit()
                    logger.info("Successfully updated session status to FAILED in database")
            except Exception as update_error:
                logger.error(
                    "Failed to update session status to FAILED after background task error",
                    session_id=session_id,
                    error=str(update_error)
                )

            # Also update Redis status for frontend polling
            try:
                await _write_failed_status(redis, session_id, str(e))
                logger.info("Successfully updated Redis status to FAILED")
            except Exception as redis_error:
                logger.error(
                    "Failed to update Redis status after background task error",
                    error=str(redis_error)
                )


def _handle_task_result(task, session_id: str):
    """Handle the result of the background cloning task.

    Intended as a done-callback. If the task raised, the exception is logged
    and a "failed" status document is written to Redis. Cancellation is only
    logged.

    Args:
        task: The finished task/future.
        session_id: Session identifier used for logging and the Redis key.
    """
    try:
        # This will raise the exception if the task failed
        task.result()
    except asyncio.CancelledError:
        # Listed before `except Exception` so it is handled on every Python
        # version, whether or not CancelledError derives from Exception.
        logger.warning("Background cloning task was cancelled", session_id=session_id)
    except Exception as e:
        logger.error(
            "Background cloning task failed with exception",
            session_id=session_id,
            error=str(e),
            exc_info=True
        )

        # Capture the text now: `e` is unbound once this except block exits,
        # and the coroutine below runs after that.
        error_text = f"Task exception: {str(e)}"

        async def update_redis_status():
            try:
                await _write_failed_status(None, session_id, error_text)
            except Exception as redis_error:
                logger.error(
                    "Failed to update Redis status in task result handler",
                    session_id=session_id,
                    error=str(redis_error)
                )

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread (direct synchronous call).
            asyncio.run(update_redis_status())
        else:
            # Done-callbacks run inside the event loop, where asyncio.run()
            # would raise RuntimeError; schedule on the running loop instead.
            _keep_task_alive(loop.create_task(update_redis_status()))


@router.post(
    route_builder.build_base_route("sessions", include_tenant_prefix=False),
    response_model=DemoSessionResponse,
    status_code=201
)
async def create_demo_session(
    request: DemoSessionCreate,
    http_request: Request,
    db: AsyncSession = Depends(get_db),
    redis: DemoRedisWrapper = Depends(get_redis)
):
    """Create a new isolated demo session (ATOMIC).

    Creates the session through ``DemoSessionManager.create_session``, starts
    ``_background_cloning_task`` as a fire-and-forget task, and returns the
    session details together with a JWT session token and the user/tenant
    data read from ``settings.DEMO_ACCOUNTS``.

    Raises:
        HTTPException: 500 when any step fails.
    """
    logger.info("Creating demo session", demo_account_type=request.demo_account_type)

    try:
        # `http_request.client` can be None (no peer address available).
        peer = http_request.client
        ip_address = request.ip_address or (peer.host if peer else None)
        user_agent = request.user_agent or http_request.headers.get("user-agent", "")

        session_manager = DemoSessionManager(db, redis)
        session = await session_manager.create_session(
            demo_account_type=request.demo_account_type,
            subscription_tier=request.subscription_tier,
            user_id=request.user_id,
            ip_address=ip_address,
            user_agent=user_agent
        )

        # Trigger async orchestrated cloning in background
        from app.core.config import settings

        # Get base tenant ID from config
        demo_config = settings.DEMO_ACCOUNTS.get(request.demo_account_type, {})
        base_tenant_id = demo_config.get("base_tenant_id", str(session.base_demo_tenant_id))

        # Read the identifier now so the done-callback does not touch the ORM
        # object after this request has finished.
        new_session_id = session.session_id

        # Start cloning in background task with session ID (not session object)
        task = asyncio.create_task(
            _background_cloning_task(new_session_id, session.id, base_tenant_id)
        )
        _keep_task_alive(task)

        # Add error handling for the task to prevent silent failures
        task.add_done_callback(
            lambda t, sid=new_session_id: _handle_task_result(t, sid)
        )

        # Get complete demo account data from config (includes user, tenant, subscription info)
        subscription_tier = demo_config.get("subscription_tier", "professional")
        user_data = demo_config.get("user", {})
        tenant_data = demo_config.get("tenant", {})

        # Generate session token with subscription data
        session_token = jwt.encode(
            {
                "session_id": session.session_id,
                "virtual_tenant_id": str(session.virtual_tenant_id),
                "demo_account_type": request.demo_account_type,
                "exp": session.expires_at.timestamp(),
                "tenant_id": str(session.virtual_tenant_id),
                "subscription": {
                    "tier": subscription_tier,
                    "status": "active",
                    "valid_until": session.expires_at.isoformat()
                },
                "is_demo": True
            },
            settings.JWT_SECRET_KEY,
            algorithm=settings.JWT_ALGORITHM
        )

        # Build complete response like a real login would return
        return {
            "session_id": session.session_id,
            "virtual_tenant_id": str(session.virtual_tenant_id),
            "demo_account_type": session.demo_account_type,
            "status": session.status.value,
            "created_at": session.created_at,
            "expires_at": session.expires_at,
            "demo_config": session.session_metadata.get("demo_config", {}),
            "session_token": session_token,
            "subscription_tier": subscription_tier,
            "is_enterprise": session.demo_account_type == "enterprise",
            # Complete user data (like a real login response)
            "user": {
                "id": user_data.get("id"),
                "email": user_data.get("email"),
                "full_name": user_data.get("full_name"),
                "role": user_data.get("role", "owner"),
                "is_active": user_data.get("is_active", True),
                "is_verified": user_data.get("is_verified", True),
                "tenant_id": str(session.virtual_tenant_id),
                "created_at": session.created_at.isoformat()
            },
            # Complete tenant data
            "tenant": {
                "id": str(session.virtual_tenant_id),
                "name": demo_config.get("name"),
                "subdomain": demo_config.get("subdomain"),
                "subscription_tier": subscription_tier,
                "tenant_type": demo_config.get("tenant_type", "standalone"),
                "business_type": tenant_data.get("business_type"),
                "business_model": tenant_data.get("business_model"),
                "description": tenant_data.get("description"),
                "is_active": True
            }
        }

    except Exception as e:
        logger.error("Failed to create demo session", error=str(e))
        raise HTTPException(status_code=500, detail=f"Failed to create demo session: {str(e)}")


@router.get(
    route_builder.build_resource_detail_route("sessions", "session_id", include_tenant_prefix=False),
    response_model=dict
)
async def get_session_info(
    session_id: str = Path(...),
    db: AsyncSession = Depends(get_db),
    redis: DemoRedisWrapper = Depends(get_redis)
):
    """Get demo session information (ATOMIC READ).

    Returns:
        The session's ``to_dict()`` representation.

    Raises:
        HTTPException: 404 when the manager returns no session.
    """
    session_manager = DemoSessionManager(db, redis)
    session = await session_manager.get_session(session_id)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    return session.to_dict()


@router.get(
    route_builder.build_resource_detail_route("sessions", "session_id", include_tenant_prefix=False) + "/status",
    response_model=dict
)
async def get_session_status(
    session_id: str = Path(...),
    db: AsyncSession = Depends(get_db),
    redis: DemoRedisWrapper = Depends(get_redis)
):
    """
    Get demo session provisioning status

    Returns current status of data cloning and readiness.
    Use this endpoint for polling (recommended interval: 1-2 seconds).

    Raises:
        HTTPException: 404 when the manager returns no status.
    """
    session_manager = DemoSessionManager(db, redis)
    status = await session_manager.get_session_status(session_id)

    if not status:
        raise HTTPException(status_code=404, detail="Session not found")

    return status


def _collect_failed_services(cloning_progress):
    """Return ``(failed_service_names, error_entries)`` from a cloning-progress mapping.

    Only entries that are dicts with ``status == "failed"`` are reported.
    """
    failed_services = []
    error_details = []

    for service_name, service_data in (cloning_progress or {}).items():
        if not isinstance(service_data, dict) or service_data.get("status") != "failed":
            continue
        failed_services.append(service_name)
        error_details.append({
            "service": service_name,
            "error": service_data.get("error", "Unknown error"),
            "response_status": service_data.get("response_status"),
            "response_text": service_data.get("response_text", ""),
            "duration_ms": service_data.get("duration_ms", 0)
        })

    return failed_services, error_details


def _build_troubleshooting_suggestions(failed_services, error_details):
    """Return troubleshooting hints derived from failed service names and error texts."""
    suggestions = []

    if "tenant" in failed_services:
        suggestions.append("Check if tenant service is running and accessible")
        suggestions.append("Verify base tenant ID configuration")

    if "auth" in failed_services:
        suggestions.append("Check if auth service is running and accessible")
        suggestions.append("Verify seed data files for auth service")

    if any(svc in failed_services for svc in ["inventory", "recipes", "suppliers", "production"]):
        suggestions.append("Check if the specific service is running and accessible")
        suggestions.append("Verify seed data files exist and are valid")

    # Entries merged in from Redis are not guaranteed to be dicts, and an
    # "error" value may be None, so normalise to lowercase text first.
    error_texts = [
        str(entry.get("error") or "").lower()
        for entry in error_details
        if isinstance(entry, dict)
    ]

    if any("timeout" in text for text in error_texts):
        suggestions.append("Check service response times and consider increasing timeouts")
        suggestions.append("Verify network connectivity between services")

    if any("network" in text for text in error_texts):
        suggestions.append("Check network connectivity between demo-session and other services")
        suggestions.append("Verify DNS resolution and service discovery")

    return suggestions


@router.get(
    route_builder.build_resource_detail_route("sessions", "session_id", include_tenant_prefix=False) + "/errors",
    response_model=dict
)
async def get_session_errors(
    session_id: str = Path(...),
    db: AsyncSession = Depends(get_db),
    redis: DemoRedisWrapper = Depends(get_redis)
):
    """
    Get detailed error information for a failed demo session

    Returns comprehensive error details including:
    - Failed services and their specific errors
    - Network connectivity issues
    - Timeout problems
    - Service-specific error messages

    Raises:
        HTTPException: 404 when the session does not exist; 500 on any other
            failure while building the report.
    """
    # Imported here, as _background_cloning_task does: the name is not
    # available at module scope.
    from app.models import DemoSessionStatus

    try:
        # Try to get the session first
        session_manager = DemoSessionManager(db, redis)
        session = await session_manager.get_session(session_id)

        if not session:
            raise HTTPException(status_code=404, detail="Session not found")

        # Check if session has failed status
        if session.status != DemoSessionStatus.FAILED:
            return {
                "session_id": session_id,
                "status": session.status.value,
                "has_errors": False,
                "message": "Session has not failed - no error details available"
            }

        # Get detailed error information from cloning progress
        failed_services, error_details = _collect_failed_services(session.cloning_progress)

        # Check Redis for additional error information
        client = await redis.get_client()
        error_key = f"session:{session_id}:errors"
        redis_errors = await client.get(error_key)

        if redis_errors:
            try:
                additional_errors = json.loads(redis_errors)
                if isinstance(additional_errors, list):
                    error_details.extend(additional_errors)
                elif isinstance(additional_errors, dict):
                    error_details.append(additional_errors)
            except (json.JSONDecodeError, UnicodeDecodeError):
                logger.warning("Failed to parse Redis error data", session_id=session_id)

        # Create comprehensive error report
        error_report = {
            "session_id": session_id,
            "status": session.status.value,
            "has_errors": True,
            "failed_services": failed_services,
            "error_count": len(error_details),
            "errors": error_details,
            "cloning_started_at": session.cloning_started_at.isoformat() if session.cloning_started_at else None,
            "cloning_completed_at": session.cloning_completed_at.isoformat() if session.cloning_completed_at else None,
            "total_records_cloned": session.total_records_cloned,
            "demo_account_type": session.demo_account_type
        }

        # Add troubleshooting suggestions
        suggestions = _build_troubleshooting_suggestions(failed_services, error_details)
        if suggestions:
            error_report["troubleshooting_suggestions"] = suggestions

        return error_report

    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) through instead of
        # converting them to 500 in the blanket handler below.
        raise
    except Exception as e:
        logger.error(
            "Failed to retrieve session errors",
            session_id=session_id,
            error=str(e),
            exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve error details: {str(e)}"
        )


@router.post(
    route_builder.build_resource_detail_route("sessions", "session_id", include_tenant_prefix=False) + "/retry",
    response_model=dict
)
async def retry_session_cloning(
    session_id: str = Path(...),
    db: AsyncSession = Depends(get_db),
    redis: DemoRedisWrapper = Depends(get_redis)
):
    """
    Retry failed cloning operations

    Only available for sessions in "failed" or "partial" status.

    Raises:
        HTTPException: 400 when the manager raises ``ValueError``; 500 on any
            other failure.
    """
    try:
        session_manager = DemoSessionManager(db, redis)
        result = await session_manager.retry_failed_cloning(session_id)

        return {
            "message": "Cloning retry initiated",
            "session_id": session_id,
            "result": result
        }

    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error("Failed to retry cloning", error=str(e))
        raise HTTPException(status_code=500, detail=str(e))


@router.delete(
    route_builder.build_resource_detail_route("sessions", "session_id", include_tenant_prefix=False),
    response_model=dict
)
async def destroy_demo_session(
    session_id: str = Path(...),
    db: AsyncSession = Depends(get_db),
    redis: DemoRedisWrapper = Depends(get_redis)
):
    """Destroy demo session and cleanup resources (ATOMIC DELETE).

    Raises:
        HTTPException: 500 when ``DemoSessionManager.destroy_session`` fails.
    """
    try:
        session_manager = DemoSessionManager(db, redis)
        await session_manager.destroy_session(session_id)

        return {"message": "Session destroyed successfully", "session_id": session_id}

    except Exception as e:
        logger.error("Failed to destroy session", error=str(e))
        raise HTTPException(status_code=500, detail=str(e))


@router.post(
    route_builder.build_resource_detail_route("sessions", "session_id", include_tenant_prefix=False) + "/destroy",
    response_model=dict
)
async def destroy_demo_session_post(
    session_id: str = Path(...),
    db: AsyncSession = Depends(get_db),
    redis: DemoRedisWrapper = Depends(get_redis)
):
    """Destroy demo session via POST (for frontend compatibility).

    Delegates to ``destroy_demo_session`` so both routes share one
    implementation and the same error handling.
    """
    return await destroy_demo_session(session_id=session_id, db=db, redis=redis)