Fix and UI imporvements
This commit is contained in:
@@ -12,7 +12,7 @@ import structlog
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.database import AsyncSessionLocal
|
||||
from shared.schemas.events import MinimalEvent
|
||||
from shared.messaging import MinimalEvent
|
||||
from app.services.enrichment_orchestrator import EnrichmentOrchestrator
|
||||
from app.repositories.event_repository import EventRepository
|
||||
from shared.clients.notification_client import create_notification_client
|
||||
|
||||
@@ -115,9 +115,18 @@ class BusinessImpactAnalyzer:
|
||||
"""Analyze impact of procurement-related alerts"""
|
||||
impact = {}
|
||||
|
||||
# PO amount as financial impact
|
||||
po_amount = metadata.get("po_amount", metadata.get("total_amount", 0))
|
||||
impact["financial_impact_eur"] = float(po_amount)
|
||||
# Extract potential_loss_eur from reasoning_data.parameters
|
||||
reasoning_data = metadata.get("reasoning_data", {})
|
||||
parameters = reasoning_data.get("parameters", {})
|
||||
potential_loss_eur = parameters.get("potential_loss_eur")
|
||||
|
||||
# Use potential loss from reasoning as financial impact (what's at risk)
|
||||
# Fallback to PO amount only if reasoning data is not available
|
||||
if potential_loss_eur is not None:
|
||||
impact["financial_impact_eur"] = float(potential_loss_eur)
|
||||
else:
|
||||
po_amount = metadata.get("po_amount", metadata.get("total_amount", 0))
|
||||
impact["financial_impact_eur"] = float(po_amount)
|
||||
|
||||
# Days overdue affects customer impact
|
||||
days_overdue = metadata.get("days_overdue", 0)
|
||||
|
||||
@@ -48,6 +48,9 @@ class UrgencyAnalyzer:
|
||||
elif "delivery" in event_type or "overdue" in event_type:
|
||||
urgency.update(self._analyze_delivery_urgency(metadata))
|
||||
|
||||
elif "po_approval" in event_type:
|
||||
urgency.update(self._analyze_po_approval_urgency(metadata))
|
||||
|
||||
# Check for explicit deadlines
|
||||
if "required_delivery_date" in metadata:
|
||||
urgency.update(self._calculate_deadline_urgency(metadata["required_delivery_date"]))
|
||||
@@ -115,6 +118,38 @@ class UrgencyAnalyzer:
|
||||
|
||||
return urgency
|
||||
|
||||
def _analyze_po_approval_urgency(self, metadata: Dict[str, Any]) -> dict:
|
||||
"""
|
||||
Analyze urgency for PO approval alerts.
|
||||
|
||||
Uses stockout time (when you run out of stock) instead of delivery date
|
||||
to determine true urgency.
|
||||
"""
|
||||
urgency = {}
|
||||
|
||||
# Extract min_depletion_hours from reasoning_data.parameters
|
||||
reasoning_data = metadata.get("reasoning_data", {})
|
||||
parameters = reasoning_data.get("parameters", {})
|
||||
min_depletion_hours = parameters.get("min_depletion_hours")
|
||||
|
||||
if min_depletion_hours is not None:
|
||||
urgency["hours_until_consequence"] = max(0, round(min_depletion_hours, 1))
|
||||
urgency["can_wait_until_tomorrow"] = min_depletion_hours > 24
|
||||
|
||||
# Set deadline_utc to when stock runs out
|
||||
now = datetime.now(timezone.utc)
|
||||
stockout_time = now + timedelta(hours=min_depletion_hours)
|
||||
urgency["deadline_utc"] = stockout_time.isoformat()
|
||||
|
||||
logger.info(
|
||||
"po_approval_urgency_calculated",
|
||||
min_depletion_hours=min_depletion_hours,
|
||||
stockout_deadline=urgency["deadline_utc"],
|
||||
can_wait=urgency["can_wait_until_tomorrow"]
|
||||
)
|
||||
|
||||
return urgency
|
||||
|
||||
def _calculate_deadline_urgency(self, deadline_str: str) -> dict:
|
||||
"""Calculate urgency based on deadline"""
|
||||
try:
|
||||
|
||||
@@ -8,7 +8,7 @@ from typing import Dict, Any
|
||||
import structlog
|
||||
from uuid import uuid4
|
||||
|
||||
from shared.schemas.events import MinimalEvent
|
||||
from shared.messaging import MinimalEvent
|
||||
from app.schemas.events import EnrichedEvent, I18nContent, BusinessImpact, Urgency, UserAgency, OrchestratorContext
|
||||
from app.enrichment.message_generator import MessageGenerator
|
||||
from app.enrichment.priority_scorer import PriorityScorer
|
||||
|
||||
@@ -130,50 +130,85 @@ class ProfessionalCloningStrategy(CloningStrategy):
|
||||
tasks.append(task)
|
||||
service_map[task] = service_def.name
|
||||
|
||||
# Wait for all tasks to complete
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Process results
|
||||
# Process tasks as they complete for real-time progress updates
|
||||
service_results = {}
|
||||
total_records = 0
|
||||
failed_services = []
|
||||
required_service_failed = False
|
||||
completed_count = 0
|
||||
total_count = len(tasks)
|
||||
|
||||
for task, result in zip(tasks, results):
|
||||
service_name = service_map[task]
|
||||
service_def = next(s for s in services_to_clone if s.name == service_name)
|
||||
# Create a mapping from futures to service names to properly identify completed tasks
|
||||
# We'll use asyncio.wait approach instead of as_completed to access the original tasks
|
||||
pending = set(tasks)
|
||||
completed_tasks_info = {task: service_map[task] for task in tasks} # Map tasks to service names
|
||||
|
||||
if isinstance(result, Exception):
|
||||
logger.error(
|
||||
f"Service {service_name} cloning failed with exception",
|
||||
session_id=context.session_id,
|
||||
error=str(result)
|
||||
)
|
||||
service_results[service_name] = {
|
||||
"status": "failed",
|
||||
"error": str(result),
|
||||
"records_cloned": 0
|
||||
}
|
||||
failed_services.append(service_name)
|
||||
if service_def.required:
|
||||
required_service_failed = True
|
||||
else:
|
||||
service_results[service_name] = result
|
||||
if result.get("status") == "failed":
|
||||
while pending:
|
||||
# Wait for at least one task to complete
|
||||
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
|
||||
|
||||
# Process each completed task
|
||||
for completed_task in done:
|
||||
try:
|
||||
# Get the result from the completed task
|
||||
result = await completed_task
|
||||
# Get the service name from our mapping
|
||||
service_name = completed_tasks_info[completed_task]
|
||||
service_def = next(s for s in services_to_clone if s.name == service_name)
|
||||
|
||||
service_results[service_name] = result
|
||||
completed_count += 1
|
||||
|
||||
if result.get("status") == "failed":
|
||||
failed_services.append(service_name)
|
||||
if service_def.required:
|
||||
required_service_failed = True
|
||||
else:
|
||||
total_records += result.get("records_cloned", 0)
|
||||
|
||||
# Track successful services for rollback
|
||||
if result.get("status") == "completed":
|
||||
rollback_stack.append({
|
||||
"type": "service",
|
||||
"service_name": service_name,
|
||||
"tenant_id": context.virtual_tenant_id,
|
||||
"session_id": context.session_id
|
||||
})
|
||||
|
||||
# Update Redis with granular progress after each service completes
|
||||
await context.orchestrator._update_progress_in_redis(context.session_id, {
|
||||
"completed_services": completed_count,
|
||||
"total_services": total_count,
|
||||
"progress_percentage": int((completed_count / total_count) * 100),
|
||||
"services": service_results,
|
||||
"total_records_cloned": total_records
|
||||
})
|
||||
|
||||
logger.info(
|
||||
f"Service {service_name} completed ({completed_count}/{total_count})",
|
||||
session_id=context.session_id,
|
||||
records_cloned=result.get("records_cloned", 0)
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# Handle exceptions from the task itself
|
||||
service_name = completed_tasks_info[completed_task]
|
||||
service_def = next(s for s in services_to_clone if s.name == service_name)
|
||||
|
||||
logger.error(
|
||||
f"Service {service_name} cloning failed with exception",
|
||||
session_id=context.session_id,
|
||||
error=str(e)
|
||||
)
|
||||
service_results[service_name] = {
|
||||
"status": "failed",
|
||||
"error": str(e),
|
||||
"records_cloned": 0
|
||||
}
|
||||
failed_services.append(service_name)
|
||||
completed_count += 1
|
||||
if service_def.required:
|
||||
required_service_failed = True
|
||||
else:
|
||||
total_records += result.get("records_cloned", 0)
|
||||
|
||||
# Track successful services for rollback
|
||||
if result.get("status") == "completed":
|
||||
rollback_stack.append({
|
||||
"type": "service",
|
||||
"service_name": service_name,
|
||||
"tenant_id": context.virtual_tenant_id,
|
||||
"session_id": context.session_id
|
||||
})
|
||||
|
||||
# Determine overall status
|
||||
if required_service_failed:
|
||||
@@ -475,7 +510,7 @@ class EnterpriseCloningStrategy(CloningStrategy):
|
||||
elif failed_children > 0:
|
||||
overall_status = "partial"
|
||||
else:
|
||||
overall_status = "ready"
|
||||
overall_status = "completed" # Changed from "ready" to match professional strategy
|
||||
|
||||
# Calculate total records cloned (parent + all children)
|
||||
total_records_cloned = parent_result.get("total_records", 0)
|
||||
|
||||
@@ -464,6 +464,14 @@ class DemoSessionManager:
|
||||
"""Cache session status in Redis for fast status checks"""
|
||||
status_key = f"session:{session.session_id}:status"
|
||||
|
||||
# Calculate estimated remaining time based on demo tier
|
||||
estimated_remaining_seconds = None
|
||||
if session.cloning_started_at and not session.cloning_completed_at:
|
||||
elapsed = (datetime.now(timezone.utc) - session.cloning_started_at).total_seconds()
|
||||
# Professional: ~40s average, Enterprise: ~75s average
|
||||
avg_duration = 75 if session.demo_account_type == 'enterprise' else 40
|
||||
estimated_remaining_seconds = max(0, int(avg_duration - elapsed))
|
||||
|
||||
status_data = {
|
||||
"session_id": session.session_id,
|
||||
"status": session.status.value,
|
||||
@@ -471,7 +479,9 @@ class DemoSessionManager:
|
||||
"total_records_cloned": session.total_records_cloned,
|
||||
"cloning_started_at": session.cloning_started_at.isoformat() if session.cloning_started_at else None,
|
||||
"cloning_completed_at": session.cloning_completed_at.isoformat() if session.cloning_completed_at else None,
|
||||
"expires_at": session.expires_at.isoformat()
|
||||
"expires_at": session.expires_at.isoformat(),
|
||||
"estimated_remaining_seconds": estimated_remaining_seconds,
|
||||
"demo_account_type": session.demo_account_type
|
||||
}
|
||||
|
||||
import json as json_module
|
||||
@@ -508,6 +518,14 @@ class DemoSessionManager:
|
||||
|
||||
await self._cache_session_status(session)
|
||||
|
||||
# Calculate estimated remaining time for database fallback
|
||||
estimated_remaining_seconds = None
|
||||
if session.cloning_started_at and not session.cloning_completed_at:
|
||||
elapsed = (datetime.now(timezone.utc) - session.cloning_started_at).total_seconds()
|
||||
# Professional: ~40s average, Enterprise: ~75s average
|
||||
avg_duration = 75 if session.demo_account_type == 'enterprise' else 40
|
||||
estimated_remaining_seconds = max(0, int(avg_duration - elapsed))
|
||||
|
||||
return {
|
||||
"session_id": session.session_id,
|
||||
"status": session.status.value,
|
||||
@@ -515,7 +533,9 @@ class DemoSessionManager:
|
||||
"total_records_cloned": session.total_records_cloned,
|
||||
"cloning_started_at": session.cloning_started_at.isoformat() if session.cloning_started_at else None,
|
||||
"cloning_completed_at": session.cloning_completed_at.isoformat() if session.cloning_completed_at else None,
|
||||
"expires_at": session.expires_at.isoformat()
|
||||
"expires_at": session.expires_at.isoformat(),
|
||||
"estimated_remaining_seconds": estimated_remaining_seconds,
|
||||
"demo_account_type": session.demo_account_type
|
||||
}
|
||||
|
||||
async def retry_failed_cloning(
|
||||
|
||||
@@ -39,8 +39,7 @@ from typing import List, Dict, Any
|
||||
# Add project root to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
|
||||
|
||||
from shared.messaging import RabbitMQClient
|
||||
from shared.schemas.alert_types import AlertTypeConstants
|
||||
from shared.messaging import RabbitMQClient, AlertTypeConstants
|
||||
import structlog
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
@@ -114,16 +114,45 @@ async def seed_distribution_history(db: AsyncSession):
|
||||
total_distance_km = random.uniform(75.0, 95.0) # Realistic for 3 retail outlets in region
|
||||
estimated_duration_minutes = random.randint(180, 240) # 3-4 hours for 3 stops
|
||||
|
||||
# Route sequence (order of deliveries)
|
||||
# Route sequence (order of deliveries) with full GPS coordinates for map display
|
||||
# Determine status based on date
|
||||
is_past = delivery_date < BASE_REFERENCE_DATE
|
||||
point_status = "delivered" if is_past else "pending"
|
||||
|
||||
route_sequence = [
|
||||
{"stop": 1, "tenant_id": str(DEMO_TENANT_CHILD_1), "location": "Madrid Centro"},
|
||||
{"stop": 2, "tenant_id": str(DEMO_TENANT_CHILD_2), "location": "Barcelona Gràcia"},
|
||||
{"stop": 3, "tenant_id": str(DEMO_TENANT_CHILD_3), "location": "Valencia Ruzafa"}
|
||||
{
|
||||
"tenant_id": str(DEMO_TENANT_CHILD_1),
|
||||
"name": "Madrid Centro",
|
||||
"address": "Calle Gran Vía 28, 28013 Madrid, Spain",
|
||||
"latitude": 40.4168,
|
||||
"longitude": -3.7038,
|
||||
"status": point_status,
|
||||
"id": str(uuid.uuid4()),
|
||||
"sequence": 1
|
||||
},
|
||||
{
|
||||
"tenant_id": str(DEMO_TENANT_CHILD_2),
|
||||
"name": "Barcelona Gràcia",
|
||||
"address": "Carrer Gran de Gràcia 15, 08012 Barcelona, Spain",
|
||||
"latitude": 41.4036,
|
||||
"longitude": 2.1561,
|
||||
"status": point_status,
|
||||
"id": str(uuid.uuid4()),
|
||||
"sequence": 2
|
||||
},
|
||||
{
|
||||
"tenant_id": str(DEMO_TENANT_CHILD_3),
|
||||
"name": "Valencia Ruzafa",
|
||||
"address": "Carrer de Sueca 51, 46006 Valencia, Spain",
|
||||
"latitude": 39.4647,
|
||||
"longitude": -0.3679,
|
||||
"status": point_status,
|
||||
"id": str(uuid.uuid4()),
|
||||
"sequence": 3
|
||||
}
|
||||
]
|
||||
|
||||
# Determine status based on whether the date is in the past or future
|
||||
# Past routes are completed, today and future routes are planned
|
||||
is_past = delivery_date < BASE_REFERENCE_DATE
|
||||
# Route status (already determined is_past above)
|
||||
route_status = DeliveryRouteStatus.completed if is_past else DeliveryRouteStatus.planned
|
||||
|
||||
route = DeliveryRoute(
|
||||
|
||||
@@ -40,6 +40,8 @@ class OrchestratorSettings(BaseServiceSettings):
|
||||
# Orchestration Settings
|
||||
ORCHESTRATION_ENABLED: bool = os.getenv("ORCHESTRATION_ENABLED", "true").lower() == "true"
|
||||
ORCHESTRATION_SCHEDULE: str = os.getenv("ORCHESTRATION_SCHEDULE", "30 5 * * *") # 5:30 AM daily (cron format)
|
||||
ORCHESTRATION_HOUR: int = int(os.getenv("ORCHESTRATION_HOUR", "2")) # Hour to run daily orchestration (default: 2 AM)
|
||||
ORCHESTRATION_MINUTE: int = int(os.getenv("ORCHESTRATION_MINUTE", "0")) # Minute to run (default: :00)
|
||||
ORCHESTRATION_TIMEOUT_SECONDS: int = int(os.getenv("ORCHESTRATION_TIMEOUT_SECONDS", "600")) # 10 minutes
|
||||
|
||||
# Tenant Processing
|
||||
|
||||
@@ -19,6 +19,7 @@ from datetime import datetime, date, timezone
|
||||
from decimal import Decimal
|
||||
from typing import List, Dict, Any, Optional
|
||||
import structlog
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
|
||||
# Updated imports - removed old alert system
|
||||
@@ -51,6 +52,9 @@ class OrchestratorSchedulerService:
|
||||
self.publisher = event_publisher
|
||||
self.config = config
|
||||
|
||||
# APScheduler instance for running daily orchestration
|
||||
self.scheduler = None
|
||||
|
||||
# Service clients
|
||||
self.forecast_client = ForecastServiceClient(config, "orchestrator-service")
|
||||
self.production_client = ProductionServiceClient(config, "orchestrator-service")
|
||||
@@ -670,13 +674,46 @@ class OrchestratorSchedulerService:
|
||||
|
||||
async def start(self):
|
||||
"""Start the orchestrator scheduler service"""
|
||||
logger.info("OrchestratorSchedulerService started")
|
||||
# Add any initialization logic here if needed
|
||||
if not settings.ORCHESTRATION_ENABLED:
|
||||
logger.info("Orchestration disabled via config")
|
||||
return
|
||||
|
||||
# Initialize APScheduler
|
||||
self.scheduler = AsyncIOScheduler()
|
||||
|
||||
# Add daily orchestration job
|
||||
self.scheduler.add_job(
|
||||
self.run_daily_orchestration,
|
||||
trigger=CronTrigger(
|
||||
hour=settings.ORCHESTRATION_HOUR,
|
||||
minute=settings.ORCHESTRATION_MINUTE
|
||||
),
|
||||
id='daily_orchestration',
|
||||
name='Daily Orchestration Workflow',
|
||||
replace_existing=True,
|
||||
max_instances=1,
|
||||
coalesce=True
|
||||
)
|
||||
|
||||
# Start the scheduler
|
||||
self.scheduler.start()
|
||||
|
||||
# Log next run time
|
||||
next_run = self.scheduler.get_job('daily_orchestration').next_run_time
|
||||
logger.info(
|
||||
"OrchestratorSchedulerService started with daily job",
|
||||
orchestration_hour=settings.ORCHESTRATION_HOUR,
|
||||
orchestration_minute=settings.ORCHESTRATION_MINUTE,
|
||||
next_run=next_run.isoformat() if next_run else None
|
||||
)
|
||||
|
||||
async def stop(self):
|
||||
"""Stop the orchestrator scheduler service"""
|
||||
logger.info("OrchestratorSchedulerService stopped")
|
||||
# Add any cleanup logic here if needed
|
||||
if self.scheduler and self.scheduler.running:
|
||||
self.scheduler.shutdown(wait=True)
|
||||
logger.info("OrchestratorSchedulerService stopped")
|
||||
else:
|
||||
logger.info("OrchestratorSchedulerService already stopped")
|
||||
|
||||
def get_circuit_breaker_stats(self) -> Dict[str, Any]:
|
||||
"""Get circuit breaker statistics for monitoring"""
|
||||
|
||||
@@ -42,18 +42,41 @@ class DeliveryTrackingService:
|
||||
|
||||
async def start(self):
|
||||
"""Start the delivery tracking scheduler"""
|
||||
# Initialize and start scheduler if not already running
|
||||
if not self.scheduler.running:
|
||||
# Add hourly job to check deliveries
|
||||
self.scheduler.add_job(
|
||||
self._check_all_tenants,
|
||||
trigger=CronTrigger(minute=30), # Run every hour at :30 (00:30, 01:30, 02:30, etc.)
|
||||
id='hourly_delivery_check',
|
||||
name='Hourly Delivery Tracking',
|
||||
replace_existing=True,
|
||||
max_instances=1, # Ensure no overlapping runs
|
||||
coalesce=True # Combine missed runs
|
||||
)
|
||||
|
||||
self.scheduler.start()
|
||||
logger.info(
|
||||
"Delivery tracking scheduler started",
|
||||
instance_id=self.instance_id
|
||||
)
|
||||
|
||||
# Log next run time
|
||||
next_run = self.scheduler.get_job('hourly_delivery_check').next_run_time
|
||||
logger.info(
|
||||
"Delivery tracking scheduler started with hourly checks",
|
||||
instance_id=self.instance_id,
|
||||
next_run=next_run.isoformat() if next_run else None
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"Delivery tracking scheduler already running",
|
||||
instance_id=self.instance_id
|
||||
)
|
||||
|
||||
async def stop(self):
|
||||
"""Stop the scheduler and release leader lock"""
|
||||
if self.scheduler.running:
|
||||
self.scheduler.shutdown(wait=False)
|
||||
self.scheduler.shutdown(wait=True) # Graceful shutdown
|
||||
logger.info("Delivery tracking scheduler stopped", instance_id=self.instance_id)
|
||||
else:
|
||||
logger.info("Delivery tracking scheduler already stopped", instance_id=self.instance_id)
|
||||
|
||||
async def _check_all_tenants(self):
|
||||
"""
|
||||
|
||||
@@ -176,7 +176,7 @@ async def get_batch_details(
|
||||
from app.repositories.production_batch_repository import ProductionBatchRepository
|
||||
batch_repo = ProductionBatchRepository(db)
|
||||
|
||||
batch = await batch_repo.get(batch_id)
|
||||
batch = await batch_repo.get_by_id(batch_id)
|
||||
if not batch or str(batch.tenant_id) != str(tenant_id):
|
||||
raise HTTPException(status_code=404, detail="Production batch not found")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user