Files
bakery-ia/services/orchestrator/app/api/internal_demo.py

278 lines
9.7 KiB
Python
Raw Permalink Normal View History

2025-11-21 16:15:09 +01:00
"""
Internal Demo API Endpoints for Orchestrator Service
Used by demo_session service to clone data for virtual demo tenants
"""
from fastapi import APIRouter, Depends, HTTPException, Header
from typing import Dict, Any
from uuid import UUID
import structlog
import os
2025-12-13 23:57:54 +01:00
import json
2025-11-21 16:15:09 +01:00
from app.core.database import get_db
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete, func
Fix critical bugs and standardize service integrations Critical Fixes: - Orchestrator: Add missing OrchestrationStatus import (fixes HTTP 500 during demo clone) - Procurement: Migrate from custom cache utils to shared Redis utils - Suppliers: Use proper Settings for Redis configuration with TLS/auth - Recipes/Suppliers clients: Fix endpoint paths (remove duplicate path segments) - Procurement client: Use suppliers service directly for supplier details Details: 1. services/orchestrator/app/api/internal_demo.py: - Added OrchestrationStatus import to fix cloning error - This was causing HTTP 500 errors during demo session cloning 2. services/procurement/app/api/purchase_orders.py + service: - Replaced app.utils.cache with shared.redis_utils - Standardizes caching across all services - Removed custom cache utilities (deleted app/utils/cache.py) 3. services/suppliers/app/consumers/alert_event_consumer.py: - Use Settings().REDIS_URL instead of os.getenv - Ensures proper Redis connection with TLS and authentication 4. shared/clients/recipes_client.py: - Fixed endpoint paths: recipes/recipes/{id} → recipes/{id} - Applied to all recipe methods (by_id, by_products, instructions, yield) 5. shared/clients/suppliers_client.py: - Fixed endpoint path: suppliers/suppliers/{id} → suppliers/{id} 6. shared/clients/procurement_client.py: - get_supplier_by_id now uses SuppliersServiceClient directly - Removes incorrect call to procurement service for supplier details Impact: - Demo session cloning now works without orchestrator errors ✅ - Consistent Redis usage across all services - Correct service boundaries (suppliers data from suppliers service) - Clean client endpoint paths 🤖 Generated with Claude Code (https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-16 11:33:22 +01:00
from app.models.orchestration_run import OrchestrationRun, OrchestrationStatus
2025-11-21 16:15:09 +01:00
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional
import sys
from pathlib import Path
# Add shared utilities to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
2025-12-14 11:58:14 +01:00
from shared.utils.demo_dates import adjust_date_for_demo
2025-11-21 16:15:09 +01:00
2025-11-30 09:12:40 +01:00
from app.core.config import settings
2026-01-02 11:12:50 +01:00
router = APIRouter(prefix="/internal/demo", tags=["internal"])
2025-11-21 16:15:09 +01:00
logger = structlog.get_logger()
async def ensure_unique_run_number(db: AsyncSession, base_run_number: str) -> str:
    """Return a run number that does not collide with any existing run.

    Starts from ``base_run_number``; while a run with the candidate number
    exists, retries with a random 4-char suffix appended (truncating the
    base so the result stays within 50 characters).
    """
    candidate = base_run_number
    while True:
        lookup = await db.execute(
            select(OrchestrationRun).where(OrchestrationRun.run_number == candidate)
        )
        if lookup.scalar_one_or_none() is None:
            # No run uses this number — safe to hand out.
            return candidate
        # Collision: derive a fresh candidate with a random suffix.
        suffix = str(uuid.uuid4())[:4].upper()
        candidate = f"{base_run_number[:50 - len(suffix) - 1]}-{suffix}"
2025-12-14 20:13:59 +01:00
async def load_fixture_data_for_tenant(
    db: AsyncSession,
    tenant_uuid: UUID,
    demo_account_type: str,
    reference_time: datetime,
    base_tenant_id: Optional[str] = None
) -> int:
    """
    Load orchestration run data from the JSON fixture directly into the
    virtual tenant.

    Args:
        db: Async session; the new run is added and flushed, NOT committed —
            the caller owns the transaction.
        tenant_uuid: UUID of the virtual demo tenant that will own the run.
        demo_account_type: Fixture profile to load (e.g. "professional",
            "enterprise_child").
        reference_time: Anchor time; fixture dates are shifted so the run
            appears recent relative to this moment.
        base_tenant_id: For "enterprise_child" accounts, selects the per-child
            fixture directory.

    Returns:
        Number of runs created: 0 if the fixture has no orchestration_run
        section, otherwise 1.
    """
    from shared.utils.seed_data_paths import get_seed_data_path
    from shared.utils.demo_dates import resolve_time_marker, adjust_date_for_demo

    # Enterprise child tenants have per-child fixture directories.
    if demo_account_type == "enterprise_child" and base_tenant_id:
        json_file = get_seed_data_path("enterprise", "11-orchestrator.json", child_id=base_tenant_id)
    else:
        json_file = get_seed_data_path(demo_account_type, "11-orchestrator.json")

    with open(json_file, 'r', encoding='utf-8') as f:
        fixture_data = json.load(f)

    orchestration_run_data = fixture_data.get("orchestration_run")
    if not orchestration_run_data:
        logger.warning("No orchestration_run data in fixture")
        return 0

    # Resolve symbolic time markers from the fixture, then shift them so the
    # run looks recent relative to the demo session's creation time.
    base_started_at = resolve_time_marker(orchestration_run_data.get("started_at"), reference_time)
    base_completed_at = resolve_time_marker(orchestration_run_data.get("completed_at"), reference_time)
    started_at = adjust_date_for_demo(base_started_at, reference_time) if base_started_at else reference_time - timedelta(hours=2)
    completed_at = adjust_date_for_demo(base_completed_at, reference_time) if base_completed_at else started_at + timedelta(minutes=15)

    # Unique run number: random suffix avoids collisions across demo sessions.
    current_year = reference_time.year
    unique_suffix = str(uuid.uuid4())[:8].upper()
    run_number = f"ORCH-DEMO-PROF-{current_year}-001-{unique_suffix}"

    # Hoisted: the results section is read several times below.
    results = fixture_data.get("orchestration_results", {})

    new_run = OrchestrationRun(
        id=uuid.uuid4(),  # new identity — never reuse fixture IDs
        tenant_id=tenant_uuid,
        run_number=run_number,
        status=OrchestrationStatus[orchestration_run_data["status"]],
        run_type=orchestration_run_data.get("run_type", "daily"),
        priority="normal",
        started_at=started_at,
        completed_at=completed_at,
        duration_seconds=orchestration_run_data.get("duration_seconds", 900),
        # Per-step timeline is synthesized from started_at/completed_at.
        forecasting_status="success",
        forecasting_started_at=started_at,
        forecasting_completed_at=started_at + timedelta(minutes=2),
        production_status="success",
        production_started_at=started_at + timedelta(minutes=2),
        production_completed_at=started_at + timedelta(minutes=5),
        procurement_status="success",
        procurement_started_at=started_at + timedelta(minutes=5),
        procurement_completed_at=started_at + timedelta(minutes=8),
        notification_status="success",
        notification_started_at=started_at + timedelta(minutes=8),
        notification_completed_at=completed_at,
        # Result counters from the fixture's orchestration_results section.
        forecasts_generated=results.get("forecasts_generated", 10),
        production_batches_created=results.get("production_batches_created", 18),
        procurement_plans_created=0,
        purchase_orders_created=results.get("purchase_orders_created", 6),
        notifications_sent=results.get("notifications_sent", 8),
        # Metadata
        triggered_by="system",
        created_at=started_at,
        updated_at=completed_at
    )
    db.add(new_run)
    await db.flush()

    logger.info(
        "Loaded orchestration run from fixture",
        tenant_id=str(tenant_uuid),
        run_number=new_run.run_number,
        started_at=started_at.isoformat()
    )
    return 1
2026-01-02 11:12:50 +01:00
@router.post("/clone")
2025-11-21 16:15:09 +01:00
async def clone_demo_data(
    base_tenant_id: str,
    virtual_tenant_id: str,
    demo_account_type: str,
    session_id: Optional[str] = None,
    session_created_at: Optional[str] = None,
    db: AsyncSession = Depends(get_db)
):
    """
    Clone orchestration run demo data from base tenant to virtual tenant.

    Called by the demo_session service during session initialization. Loads
    orchestration runs from the seed fixture (with dates shifted so they
    appear recent) directly into the virtual tenant.

    Args:
        base_tenant_id: Source tenant id; also selects per-child fixtures for
            enterprise_child accounts.
        virtual_tenant_id: Target virtual tenant id (must be a valid UUID).
        demo_account_type: Demo profile, e.g. "professional" or "enterprise_child".
        session_id: Optional demo session id — logged for traceability only.
        session_created_at: Optional ISO-8601 timestamp used as the reference
            time for date adjustment; falls back to "now" if absent or invalid.

    Raises:
        HTTPException: 500 when the fixture load or the DB write fails.
    """
    start_time = datetime.now(timezone.utc)

    # Anchor date adjustment on the session's creation time when provided.
    # fromisoformat raises ValueError on bad input; the previous bare
    # `except:` also swallowed KeyboardInterrupt/SystemExit — narrowed here.
    if session_created_at:
        try:
            reference_time = datetime.fromisoformat(session_created_at.replace('Z', '+00:00'))
        except ValueError:
            reference_time = datetime.now(timezone.utc)
    else:
        reference_time = datetime.now(timezone.utc)

    logger.info(
        "Starting orchestration runs cloning with date adjustment",
        base_tenant_id=base_tenant_id,
        virtual_tenant_id=virtual_tenant_id,
        demo_account_type=demo_account_type,
        session_id=session_id,
        reference_time=reference_time.isoformat()
    )
    try:
        virtual_uuid = uuid.UUID(virtual_tenant_id)

        # Load fixture data directly into the virtual tenant (no base tenant
        # row-by-row cloning).
        runs_created = await load_fixture_data_for_tenant(
            db,
            virtual_uuid,
            demo_account_type,
            reference_time,
            base_tenant_id
        )
        await db.commit()

        duration_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
        logger.info(
            "Orchestration runs loaded from fixture successfully",
            virtual_tenant_id=str(virtual_tenant_id),
            runs_created=runs_created,
            duration_ms=duration_ms
        )
        return {
            "service": "orchestrator",
            "status": "completed",
            "success": True,
            "records_cloned": runs_created,
            "runs_cloned": runs_created,
            "duration_ms": duration_ms
        }
    except Exception as e:
        # Boundary handler: log with traceback, roll back, surface as 500.
        logger.error("Failed to clone orchestration runs", error=str(e), exc_info=True)
        await db.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to clone orchestration runs: {str(e)}")
2026-01-02 12:18:46 +01:00
@router.delete("/tenant/{virtual_tenant_id}")
2025-11-21 16:15:09 +01:00
async def delete_demo_data(
    virtual_tenant_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Delete all orchestration runs for a virtual demo tenant."""
    logger.info("Deleting orchestration runs for virtual tenant", virtual_tenant_id=virtual_tenant_id)
    start_time = datetime.now(timezone.utc)
    try:
        virtual_uuid = uuid.UUID(virtual_tenant_id)

        # Count first so the response can report how many rows were removed.
        run_count = await db.scalar(
            select(func.count(OrchestrationRun.id)).where(OrchestrationRun.tenant_id == virtual_uuid)
        )

        # Bulk-delete every run owned by the virtual tenant.
        await db.execute(
            delete(OrchestrationRun).where(OrchestrationRun.tenant_id == virtual_uuid)
        )
        await db.commit()

        elapsed_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
        logger.info(
            "Orchestration runs deleted successfully",
            virtual_tenant_id=virtual_tenant_id,
            duration_ms=elapsed_ms
        )
        return {
            "service": "orchestrator",
            "status": "deleted",
            "virtual_tenant_id": virtual_tenant_id,
            "records_deleted": {
                "orchestration_runs": run_count,
                "total": run_count
            },
            "duration_ms": elapsed_ms
        }
    except Exception as e:
        # Boundary handler: log with traceback, roll back, surface as 500.
        logger.error("Failed to delete orchestration runs", error=str(e), exc_info=True)
        await db.rollback()
        raise HTTPException(status_code=500, detail=str(e))
2026-01-02 12:18:46 +01:00
@router.get("/clone/health")
2026-01-12 14:24:14 +01:00
async def health_check():
    """Health check for demo cloning endpoint."""
    # Static payload: the endpoint exists purely so callers can probe liveness.
    payload = {"status": "healthy", "service": "orchestrator"}
    return payload