diff --git a/frontend/src/api/hooks/useDashboardData.ts b/frontend/src/api/hooks/useDashboardData.ts index c56dfe44..82f8cabf 100644 --- a/frontend/src/api/hooks/useDashboardData.ts +++ b/frontend/src/api/hooks/useDashboardData.ts @@ -122,6 +122,7 @@ export function useDashboardData(tenantId: string) { // Get reasoning data from multiple possible locations const reasoningData = alert.event_metadata?.reasoning_data || + alert.metadata?.reasoning_data || alert.ai_reasoning_details || alert.reasoning_data || alert.ai_reasoning || @@ -150,8 +151,9 @@ export function useDashboardData(tenantId: string) { return { ...po, supplier_name: supplierName, // Enrich with actual supplier name - reasoning_data: reasoningInfo?.reasoning_data, - ai_reasoning_summary: reasoningInfo?.ai_reasoning_summary, + // Prioritize reasoning_data from PO itself, then fall back to alert + reasoning_data: po.reasoning_data || reasoningInfo?.reasoning_data, + ai_reasoning_summary: po.ai_reasoning_summary || reasoningInfo?.ai_reasoning_summary, }; }); diff --git a/frontend/src/api/services/purchase_orders.ts b/frontend/src/api/services/purchase_orders.ts index 6645c857..3faf2bf7 100644 --- a/frontend/src/api/services/purchase_orders.ts +++ b/frontend/src/api/services/purchase_orders.ts @@ -60,6 +60,8 @@ export interface PurchaseOrderSummary { total_amount: string; // Decimal as string currency: string; created_at: string; + reasoning_data?: any; // AI reasoning data for dashboard display + ai_reasoning_summary?: string; // Human-readable summary } export interface PurchaseOrderDetail extends PurchaseOrderSummary { diff --git a/frontend/src/components/dashboard/blocks/PendingPurchasesBlock.tsx b/frontend/src/components/dashboard/blocks/PendingPurchasesBlock.tsx index cef5bfdb..e792565b 100644 --- a/frontend/src/components/dashboard/blocks/PendingPurchasesBlock.tsx +++ b/frontend/src/components/dashboard/blocks/PendingPurchasesBlock.tsx @@ -138,6 +138,58 @@ export function PendingPurchasesBlock({ }); } + // Handle production requirement reasoning + if (reasoningData.type === 'production_requirement') { + const params = reasoningData.parameters || {}; + const productNames = params.product_names || []; + const productsStr = productNames.length > 0 ? productNames.join(', ') : 'products'; + + return t('dashboard:new_dashboard.pending_purchases.reasoning.production_requirement', { + products: productsStr, + batches: params.production_batches || params.batches_required || 0, + days: params.days_until_required || 0, + }); + } + + // Handle safety stock replenishment reasoning + if (reasoningData.type === 'safety_stock_replenishment') { + const params = reasoningData.parameters || {}; + return t('dashboard:new_dashboard.pending_purchases.reasoning.safety_stock', { + count: params.product_count || 0, + current: params.current_safety_stock || 0, + target: params.target_safety_stock || 0, + }); + } + + // Handle supplier contract reasoning + if (reasoningData.type === 'supplier_contract') { + const params = reasoningData.parameters || {}; + return t('dashboard:new_dashboard.pending_purchases.reasoning.supplier_contract', { + supplier: params.supplier_name || 'supplier', + products: params.product_categories?.join(', ') || 'products', + }); + } + + // Handle seasonal demand reasoning + if (reasoningData.type === 'seasonal_demand') { + const params = reasoningData.parameters || {}; + return t('dashboard:new_dashboard.pending_purchases.reasoning.seasonal_demand', { + season: params.season || 'season', + increase: params.expected_demand_increase_pct || 0, + products: params.product_names?.join(', ') || 'products', + }); + } + + // Handle forecast demand reasoning + if (reasoningData.type === 'forecast_demand') { + const params = reasoningData.parameters || {}; + return t('dashboard:new_dashboard.pending_purchases.reasoning.forecast_demand', { + product: params.product_name || 'product', + confidence: params.confidence_score ? Math.round(params.confidence_score * 100) : 0, + period: params.forecast_period_days || 0, + }); + } + if (reasoningData.summary) return reasoningData.summary; // Fallback to ai_reasoning_summary if structured data doesn't have a matching type diff --git a/frontend/src/locales/en/dashboard.json b/frontend/src/locales/en/dashboard.json index c4faa7ba..440d9dc3 100644 --- a/frontend/src/locales/en/dashboard.json +++ b/frontend/src/locales/en/dashboard.json @@ -458,7 +458,12 @@ "reasoning": { "low_stock": "{ingredient} will run out in {days, plural, =0 {less than a day} one {# day} other {# days}}", "low_stock_detailed": "{count, plural, one {# critical ingredient} other {# critical ingredients}} at risk: {products}. Earliest depletion in {days, plural, =0 {<1 day} one {1 day} other {# days}}, affecting {batches, plural, one {# batch} other {# batches}}. Potential loss: €{loss}", - "demand_forecast": "Demand for {product} is expected to increase by {increase}%" + "demand_forecast": "Demand for {product} is expected to increase by {increase}%", + "production_requirement": "{products} needed for {batches, plural, one {# batch} other {# batches}} of production in {days, plural, =0 {less than a day} one {# day} other {# days}}", + "safety_stock": "Safety stock replenishment: {count, plural, one {# product} other {# products}} (current: {current}, target: {target})", + "supplier_contract": "Contract with {supplier} for {products}", + "seasonal_demand": "Seasonal increase of {increase}% in {products} for {season}", + "forecast_demand": "Forecasted demand for {product} with {confidence}% confidence for next {period, plural, one {# day} other {# days}}" } }, "pending_deliveries": { diff --git a/frontend/src/locales/es/dashboard.json b/frontend/src/locales/es/dashboard.json index e98036ff..1c891c03 100644 --- a/frontend/src/locales/es/dashboard.json +++ b/frontend/src/locales/es/dashboard.json @@ -507,7 +507,12 @@ "reasoning": { "low_stock": "{ingredient} se agotará en {days, plural, =0 {menos de un día} one {# día} other {# días}}", "low_stock_detailed": "{count, plural, one {# ingrediente crítico} other {# ingredientes críticos}} en riesgo: {products}. Agotamiento más temprano en {days, plural, =0 {<1 día} one {1 día} other {# días}}, afectando {batches, plural, one {# lote} other {# lotes}}. Pérdida potencial: €{loss}", - "demand_forecast": "Se espera que la demanda de {product} aumente un {increase}%" + "demand_forecast": "Se espera que la demanda de {product} aumente un {increase}%", + "production_requirement": "Se necesitan {products} para {batches, plural, one {# lote} other {# lotes}} de producción en {days, plural, =0 {menos de un día} one {# día} other {# días}}", + "safety_stock": "Reabastecimiento de stock de seguridad: {count, plural, one {# producto} other {# productos}} (actual: {current}, objetivo: {target})", + "supplier_contract": "Contrato con {supplier} para {products}", + "seasonal_demand": "Aumento estacional del {increase}% en {products} para {season}", + "forecast_demand": "Demanda prevista de {product} con {confidence}% de confianza para los próximos {period, plural, one {# día} other {# días}}" } }, "pending_deliveries": { diff --git a/gateway/app/main.py b/gateway/app/main.py index 2cbaa022..decca6d8 100644 --- a/gateway/app/main.py +++ b/gateway/app/main.py @@ -6,6 +6,8 @@ Handles routing, authentication, rate limiting, and cross-cutting concerns import asyncio import json import structlog +import resource +import os from fastapi import FastAPI, Request, HTTPException, Depends, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, StreamingResponse @@ -30,6 +32,38 @@ from shared.monitoring.metrics import MetricsCollector setup_logging("gateway", settings.LOG_LEVEL) logger = structlog.get_logger() +# Check file descriptor limits and warn if too low +try: + soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) + if soft_limit < 1024: + logger.warning(f"Low file descriptor limit detected: {soft_limit}. Gateway may experience 'too many open files' errors.") + logger.warning(f"Recommended: Increase limit with 'ulimit -n 4096' or higher for production.") + if soft_limit < 256: + logger.error(f"Critical: File descriptor limit ({soft_limit}) is too low for gateway operation!") + else: + logger.info(f"File descriptor limit: {soft_limit} (sufficient)") +except Exception as e: + logger.debug(f"Could not check file descriptor limits: {e}") + +# Check and log current working directory and permissions +try: + cwd = os.getcwd() + logger.info(f"Current working directory: {cwd}") + + # Check if we can write to common log locations + test_locations = ["/var/log", "./logs", "."] + for location in test_locations: + try: + test_file = os.path.join(location, ".gateway_permission_test") + with open(test_file, 'w') as f: + f.write("test") + os.remove(test_file) + logger.info(f"Write permission confirmed for: {location}") + except Exception as e: + logger.warning(f"Cannot write to {location}: {e}") +except Exception as e: + logger.debug(f"Could not check directory permissions: {e}") + # Create FastAPI app app = FastAPI( title="Bakery Forecasting API Gateway", @@ -390,7 +424,18 @@ async def events_stream( """Generate server-sent events from Redis pub/sub with multi-channel support""" pubsub = None try: + # Create pubsub connection with resource monitoring pubsub = redis_client.pubsub() + logger.debug(f"Created Redis pubsub connection for tenant: {tenant_id}") + + # Monitor connection count + try: + connection_info = await redis_client.info('clients') + connected_clients = connection_info.get('connected_clients', 'unknown') + logger.debug(f"Redis connected clients: {connected_clients}") + except Exception: + # Don't fail if we can't get connection info + pass # Determine which channels to subscribe to subscription_channels = _get_subscription_channels(tenant_id, channel_filters) @@ -460,10 +505,24 @@ async def events_stream( except Exception as e: logger.error(f"SSE error for tenant {tenant_id}: {e}", exc_info=True) finally: - if pubsub: - await pubsub.unsubscribe() - await pubsub.close() - logger.info(f"SSE connection closed for tenant: {tenant_id}") + try: + if pubsub: + try: + # Unsubscribe from all channels + await pubsub.unsubscribe() + logger.debug(f"Unsubscribed from Redis channels for tenant: {tenant_id}") + except Exception as unsubscribe_error: + logger.error(f"Failed to unsubscribe Redis pubsub for tenant {tenant_id}: {unsubscribe_error}") + + try: + # Close pubsub connection + await pubsub.close() + logger.debug(f"Closed Redis pubsub connection for tenant: {tenant_id}") + except Exception as close_error: + logger.error(f"Failed to close Redis pubsub for tenant {tenant_id}: {close_error}") + logger.info(f"SSE connection closed for tenant: {tenant_id}") + except Exception as finally_error: + logger.error(f"Error in SSE cleanup for tenant {tenant_id}: {finally_error}") return StreamingResponse( event_generator(), diff --git a/gateway/app/routes/tenant.py b/gateway/app/routes/tenant.py index 603ee419..a98f6230 100644 --- a/gateway/app/routes/tenant.py +++ b/gateway/app/routes/tenant.py @@ -385,6 +385,12 @@ async def proxy_tenant_ingredients_base(request: Request, tenant_id: str = Path( target_path = f"/api/v1/tenants/{tenant_id}/ingredients" return await _proxy_to_inventory_service(request, target_path, tenant_id=tenant_id) +@router.api_route("/{tenant_id}/ingredients/count", methods=["GET"]) +async def proxy_tenant_ingredients_count(request: Request, tenant_id: str = Path(...)): + """Proxy tenant ingredient count requests to inventory service""" + target_path = f"/api/v1/tenants/{tenant_id}/ingredients/count" + return await _proxy_to_inventory_service(request, target_path, tenant_id=tenant_id) + @router.api_route("/{tenant_id}/ingredients/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"]) async def proxy_tenant_ingredients_with_path(request: Request, tenant_id: str = Path(...), path: str = ""): """Proxy tenant ingredient requests to inventory service (with additional path)""" @@ -493,6 +499,12 @@ async def proxy_tenant_suppliers_base(request: Request, tenant_id: str = Path(.. target_path = f"/api/v1/tenants/{tenant_id}/suppliers" return await _proxy_to_suppliers_service(request, target_path, tenant_id=tenant_id) +@router.api_route("/{tenant_id}/suppliers/count", methods=["GET"]) +async def proxy_tenant_suppliers_count(request: Request, tenant_id: str = Path(...)): + """Proxy tenant supplier count requests to suppliers service""" + target_path = f"/api/v1/tenants/{tenant_id}/suppliers/count" + return await _proxy_to_suppliers_service(request, target_path, tenant_id=tenant_id) + @router.api_route("/{tenant_id}/suppliers/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"]) async def proxy_tenant_suppliers_with_path(request: Request, tenant_id: str = Path(...), path: str = ""): """Proxy tenant supplier requests to suppliers service (with additional path)""" @@ -545,6 +557,12 @@ async def proxy_tenant_recipes_base(request: Request, tenant_id: str = Path(...) target_path = f"/api/v1/tenants/{tenant_id}/recipes" return await _proxy_to_recipes_service(request, target_path, tenant_id=tenant_id) +@router.api_route("/{tenant_id}/recipes/count", methods=["GET"]) +async def proxy_tenant_recipes_count(request: Request, tenant_id: str = Path(...)): + """Proxy tenant recipes count requests to recipes service""" + target_path = f"/api/v1/tenants/{tenant_id}/recipes/count" + return await _proxy_to_recipes_service(request, target_path, tenant_id=tenant_id) + @router.api_route("/{tenant_id}/recipes/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"]) async def proxy_tenant_recipes_with_path(request: Request, tenant_id: str = Path(...), path: str = ""): """Proxy tenant recipes requests to recipes service (with additional path)""" diff --git a/services/forecasting/app/api/internal_demo.py b/services/forecasting/app/api/internal_demo.py index 02ea74ce..535b215f 100644 --- a/services/forecasting/app/api/internal_demo.py +++ b/services/forecasting/app/api/internal_demo.py @@ -251,26 +251,44 @@ async def clone_demo_data( # Set default location if not provided in seed data location = forecast_data.get('location') or "Main Bakery" - + + # Get or calculate forecast date + forecast_date = forecast_data.get('forecast_date') + if not forecast_date: + forecast_date = session_time + + # Calculate day_of_week from forecast_date if not provided + # day_of_week should be 0-6 (Monday=0, Sunday=6) + day_of_week = forecast_data.get('day_of_week') + if day_of_week is None and forecast_date: + day_of_week = forecast_date.weekday() + + # Calculate is_weekend from day_of_week if not provided + is_weekend = forecast_data.get('is_weekend') + if is_weekend is None and day_of_week is not None: + is_weekend = day_of_week >= 5 # Saturday=5, Sunday=6 + else: + is_weekend = False + new_forecast = Forecast( id=transformed_id, tenant_id=virtual_uuid, inventory_product_id=inventory_product_id, product_name=forecast_data.get('product_name'), location=location, - forecast_date=forecast_data.get('forecast_date'), + forecast_date=forecast_date, created_at=forecast_data.get('created_at', session_time), predicted_demand=predicted_demand, confidence_lower=forecast_data.get('confidence_lower', max(0.0, float(predicted_demand or 0.0) * 0.8)), confidence_upper=forecast_data.get('confidence_upper', max(0.0, float(predicted_demand or 0.0) * 1.2)), confidence_level=forecast_data.get('confidence_level', 0.8), - model_id=forecast_data.get('model_id'), - model_version=forecast_data.get('model_version'), + model_id=forecast_data.get('model_id') or 'default-fallback-model', + model_version=forecast_data.get('model_version') or '1.0', algorithm=forecast_data.get('algorithm', 'prophet'), business_type=forecast_data.get('business_type', 'individual'), - day_of_week=forecast_data.get('day_of_week'), + day_of_week=day_of_week, is_holiday=forecast_data.get('is_holiday', False), - is_weekend=forecast_data.get('is_weekend', False), + is_weekend=is_weekend, weather_temperature=forecast_data.get('weather_temperature'), weather_precipitation=forecast_data.get('weather_precipitation'), weather_description=forecast_data.get('weather_description'), diff --git a/services/forecasting/app/services/forecasting_service.py b/services/forecasting/app/services/forecasting_service.py index 93b42b12..0de68d47 100644 --- a/services/forecasting/app/services/forecasting_service.py +++ b/services/forecasting/app/services/forecasting_service.py @@ -385,7 +385,7 @@ class EnhancedForecastingService: "confidence_lower": adjusted_prediction.get('lower_bound', max(0.0, float(adjusted_prediction.get('prediction') or 0.0) * 0.8)), "confidence_upper": adjusted_prediction.get('upper_bound', max(0.0, float(adjusted_prediction.get('prediction') or 0.0) * 1.2)), "confidence_level": request.confidence_level, - "model_id": model_data['model_id'], + "model_id": model_data.get('model_id') or 'default-fallback-model', "model_version": str(model_data.get('version', '1.0')), "algorithm": model_data.get('algorithm', 'prophet'), "business_type": features.get('business_type', 'individual'), diff --git a/services/inventory/app/api/ml_insights.py b/services/inventory/app/api/ml_insights.py index 5586a5e4..0c44064d 100644 --- a/services/inventory/app/api/ml_insights.py +++ b/services/inventory/app/api/ml_insights.py @@ -157,21 +157,18 @@ async def trigger_safety_stock_optimization( try: # Fetch sales data for this product - sales_response = await sales_client.get_sales_data( + sales_data = await sales_client.get_sales_data( tenant_id=tenant_id, product_id=product_id, start_date=start_date.strftime('%Y-%m-%d'), end_date=end_date.strftime('%Y-%m-%d') ) - if not sales_response or not sales_response.get('sales'): + if not sales_data: logger.warning( f"No sales history for product {product_id}, skipping" ) continue - - # Convert sales data to daily demand - sales_data = sales_response.get('sales', []) demand_data = [] for sale in sales_data: diff --git a/services/inventory/app/services/inventory_scheduler.py b/services/inventory/app/services/inventory_scheduler.py index 433fda84..af08fde6 100644 --- a/services/inventory/app/services/inventory_scheduler.py +++ b/services/inventory/app/services/inventory_scheduler.py @@ -179,8 +179,21 @@ class InventoryScheduler: for shortage in stock_shortages: try: - ingredient_id = UUID(shortage["ingredient_id"]) - tenant_id = UUID(shortage["tenant_id"]) + # Handle asyncpg UUID objects properly + ingredient_id_val = shortage["ingredient_id"] + tenant_id_val = shortage["tenant_id"] + + # Convert asyncpg UUID to string first, then to UUID + if hasattr(ingredient_id_val, 'hex'): + ingredient_id = UUID(hex=ingredient_id_val.hex) + else: + ingredient_id = UUID(str(ingredient_id_val)) + + if hasattr(tenant_id_val, 'hex'): + tenant_id = UUID(hex=tenant_id_val.hex) + else: + tenant_id = UUID(str(tenant_id_val)) + current_quantity = float(shortage["current_quantity"]) required_quantity = float(shortage["required_quantity"]) shortage_amount = float(shortage["shortage_amount"]) @@ -515,7 +528,12 @@ class InventoryScheduler: for shortage in critical_shortages: try: - ingredient_id = UUID(str(shortage["id"])) # Use 'id' instead of 'ingredient_id' + # Handle asyncpg UUID objects properly + ingredient_id_val = shortage["id"] + if hasattr(ingredient_id_val, 'hex'): + ingredient_id = UUID(hex=ingredient_id_val.hex) + else: + ingredient_id = UUID(str(ingredient_id_val)) # Extract values with defaults current_quantity = float(shortage.get("current_stock", 0)) @@ -732,8 +750,19 @@ class InventoryScheduler: for shortage in critical_shortages: try: - ingredient_id = UUID(str(shortage["id"])) - tenant_id = UUID(shortage["tenant_id"]) + # Handle asyncpg UUID objects properly + ingredient_id_val = shortage["id"] + tenant_id_val = shortage["tenant_id"] + + if hasattr(ingredient_id_val, 'hex'): + ingredient_id = UUID(hex=ingredient_id_val.hex) + else: + ingredient_id = UUID(str(ingredient_id_val)) + + if hasattr(tenant_id_val, 'hex'): + tenant_id = UUID(hex=tenant_id_val.hex) + else: + tenant_id = UUID(str(tenant_id_val)) # Extract values with defaults current_quantity = float(shortage.get("current_stock", 0)) diff --git a/services/procurement/app/api/internal_demo.py b/services/procurement/app/api/internal_demo.py index 76bcd54e..e6f97fe7 100644 --- a/services/procurement/app/api/internal_demo.py +++ b/services/procurement/app/api/internal_demo.py @@ -9,7 +9,7 @@ from sqlalchemy import select, delete, func import structlog import uuid from datetime import datetime, timezone, timedelta, date -from typing import Optional +from typing import Optional, Dict, Any import os import json from pathlib import Path @@ -26,6 +26,7 @@ from shared.schemas.reasoning_types import ( create_po_reasoning_supplier_contract ) from app.core.config import settings +from shared.clients.suppliers_client import SuppliersServiceClient logger = structlog.get_logger() router = APIRouter(prefix="/internal/demo", tags=["internal"]) @@ -42,6 +43,155 @@ def verify_internal_api_key(x_internal_api_key: Optional[str] = Header(None)): return True +async def _emit_po_approval_alerts_for_demo( + virtual_tenant_id: uuid.UUID, + pending_pos: list[PurchaseOrder] +) -> int: + """ + Emit alerts for pending approval POs during demo cloning. + Creates clients internally to avoid dependency injection issues. + Returns the number of alerts successfully emitted. + """ + if not pending_pos: + return 0 + + alerts_emitted = 0 + + try: + # Initialize clients locally for this operation + from shared.clients.suppliers_client import SuppliersServiceClient + from shared.messaging import RabbitMQClient + + # Use the existing settings instead of creating a new config + # This avoids issues with property-based configuration + suppliers_client = SuppliersServiceClient(settings, "procurement-service") + rabbitmq_client = RabbitMQClient(settings.RABBITMQ_URL, "procurement-service") + + # Connect to RabbitMQ + await rabbitmq_client.connect() + + logger.info( + "Emitting PO approval alerts for demo", + pending_po_count=len(pending_pos), + virtual_tenant_id=str(virtual_tenant_id) + ) + + # Emit alerts for each pending PO + for po in pending_pos: + try: + # Get supplier details + supplier_details = await suppliers_client.get_supplier_by_id( + tenant_id=str(virtual_tenant_id), + supplier_id=str(po.supplier_id) + ) + + # Skip if supplier not found + if not supplier_details: + logger.warning( + "Supplier not found for PO, skipping alert", + po_id=str(po.id), + supplier_id=str(po.supplier_id) + ) + continue + + # Calculate urgency fields + now = datetime.utcnow() + hours_until_consequence = None + deadline = None + + if po.required_delivery_date: + supplier_lead_time_days = supplier_details.get('standard_lead_time', 7) + approval_deadline = po.required_delivery_date - timedelta(days=supplier_lead_time_days) + deadline = approval_deadline + hours_until_consequence = (approval_deadline - now).total_seconds() / 3600 + + # Prepare alert payload + alert_data = { + 'id': str(uuid.uuid4()), + 'tenant_id': str(virtual_tenant_id), + 'service': 'procurement', + 'type': 'po_approval_needed', + 'alert_type': 'po_approval_needed', + 'type_class': 'action_needed', + 'severity': 'high' if po.priority == 'critical' else 'medium', + 'title': '', + 'message': '', + 'timestamp': datetime.utcnow().isoformat(), + 'metadata': { + 'po_id': str(po.id), + 'po_number': po.po_number, + 'supplier_id': str(po.supplier_id), + 'supplier_name': supplier_details.get('name', ''), + 'total_amount': float(po.total_amount), + 'currency': po.currency, + 'priority': po.priority, + 'required_delivery_date': po.required_delivery_date.isoformat() if po.required_delivery_date else None, + 'created_at': po.created_at.isoformat(), + 'financial_impact': float(po.total_amount), + 'urgency_score': 85, + 'deadline': deadline.isoformat() if deadline else None, + 'hours_until_consequence': round(hours_until_consequence, 1) if hours_until_consequence else None, + 'reasoning_data': po.reasoning_data or {} + }, + 'message_params': { + 'po_number': po.po_number, + 'supplier_name': supplier_details.get('name', ''), + 'total_amount': float(po.total_amount), + 'currency': po.currency, + 'priority': po.priority, + 'required_delivery_date': po.required_delivery_date.isoformat() if po.required_delivery_date else None, + 'items_count': 0, + 'created_at': po.created_at.isoformat() + }, + 'actions': ['approve_po', 'reject_po', 'modify_po'], + 'item_type': 'alert' + } + + # Publish to RabbitMQ + await rabbitmq_client.publish_event( + exchange_name='alerts.exchange', + routing_key=f'alert.{alert_data["severity"]}.procurement', + event_data=alert_data + ) + + alerts_emitted += 1 + logger.debug( + "PO approval alert emitted", + po_id=str(po.id), + po_number=po.po_number + ) + + except Exception as po_error: + logger.warning( + "Failed to emit alert for PO", + po_id=str(po.id), + po_number=po.po_number, + error=str(po_error) + ) + # Continue with other POs + + # Close RabbitMQ connection + await rabbitmq_client.close() + + logger.info( + "PO approval alerts emission completed", + alerts_emitted=alerts_emitted, + total_pending=len(pending_pos) + ) + + return alerts_emitted + + except Exception as e: + logger.error( + "Failed to emit PO approval alerts", + error=str(e), + virtual_tenant_id=str(virtual_tenant_id), + exc_info=True + ) + # Don't fail the cloning process + return alerts_emitted + + @router.post("/clone") async def clone_demo_data( base_tenant_id: str, @@ -420,6 +570,39 @@ async def clone_demo_data( # Commit all loaded data await db.commit() + # Emit alerts for pending approval POs (CRITICAL for demo dashboard) + alerts_emitted = 0 + try: + # Get all pending approval POs that were just created + pending_approval_pos = await db.execute( + select(PurchaseOrder).where( + PurchaseOrder.tenant_id == virtual_uuid, + PurchaseOrder.status == 'pending_approval' + ) + ) + pending_pos = pending_approval_pos.scalars().all() + + logger.info( + "Found pending approval POs for alert emission", + count=len(pending_pos), + virtual_tenant_id=virtual_tenant_id + ) + + # Emit alerts using refactored function + if pending_pos: + alerts_emitted = await _emit_po_approval_alerts_for_demo( + virtual_tenant_id=virtual_uuid, + pending_pos=pending_pos + ) + + except Exception as e: + logger.error( + "Failed to emit PO approval alerts during demo cloning", + error=str(e), + virtual_tenant_id=virtual_tenant_id + ) + # Don't fail the entire cloning process if alert emission fails + # Calculate total records total_records = (stats["procurement_plans"] + stats["procurement_requirements"] + stats["purchase_orders"] + stats["purchase_order_items"] + @@ -439,7 +622,8 @@ async def clone_demo_data( "status": "completed", "records_cloned": total_records, "duration_ms": duration_ms, - "details": stats + "details": stats, + "alerts_emitted": alerts_emitted } except ValueError as e: diff --git a/services/recipes/app/api/internal_demo.py b/services/recipes/app/api/internal_demo.py index 78b33824..38eefff5 100644 --- a/services/recipes/app/api/internal_demo.py +++ b/services/recipes/app/api/internal_demo.py @@ -188,6 +188,9 @@ async def clone_demo_data( "recipe_ingredients": 0 } + # First, build recipe ID map by processing all recipes + recipe_id_map = {} + # Create Recipes for recipe_data in seed_data.get('recipes', []): # Transform recipe ID using XOR @@ -263,8 +266,8 @@ async def clone_demo_data( db.add(new_recipe) stats["recipes"] += 1 - # Map recipe ID for ingredients - recipe_id_map = {recipe_data['id']: str(transformed_id)} + # Add recipe ID to map for ingredients + recipe_id_map[recipe_data['id']] = str(transformed_id) # Create Recipe Ingredients for recipe_ingredient_data in seed_data.get('recipe_ingredients', []): diff --git a/shared/clients/sales_client.py b/shared/clients/sales_client.py index c92c4a7b..f262b36a 100755 --- a/shared/clients/sales_client.py +++ b/shared/clients/sales_client.py @@ -45,7 +45,7 @@ class SalesServiceClient(BaseServiceClient): if product_id: params["product_id"] = product_id - result = await self.get("sales/sales", tenant_id=tenant_id, params=params) + result = await self.get("sales", tenant_id=tenant_id, params=params) # Handle both list and dict responses if result is None: diff --git a/shared/demo/fixtures/professional/07-procurement.json b/shared/demo/fixtures/professional/07-procurement.json index 44aa2993..a54c3d05 100644 --- a/shared/demo/fixtures/professional/07-procurement.json +++ b/shared/demo/fixtures/professional/07-procurement.json @@ -6,7 +6,7 @@ "po_number": "PO-LATE-0001", "supplier_id": "40000000-0000-0000-0000-000000000001", "order_date": "BASE_TS - 1d", - "status": "pending_approval", + "status": "confirmed", "priority": "high", "required_delivery_date": "BASE_TS - 4h", "estimated_delivery_date": "BASE_TS - 4h", @@ -60,7 +60,7 @@ "po_number": "PO-UPCOMING-0001", "supplier_id": "40000000-0000-0000-0000-000000000002", "order_date": "BASE_TS - 1h", - "status": "pending_approval", + "status": "confirmed", "priority": "medium", "required_delivery_date": "BASE_TS + 2h30m", "estimated_delivery_date": "BASE_TS + 2h30m", @@ -208,7 +208,7 @@ "tenant_id": "a1b2c3d4-e5f6-47a8-b9c0-d1e2f3a4b5c6", "po_number": "PO-2025-003", "supplier_id": "40000000-0000-0000-0000-000000000003", - "status": "approved", + "status": "pending_approval", "priority": "high", "subtotal": 490.0, "tax_amount": 102.9, @@ -221,18 +221,17 @@ "delivery_contact": "Pedro Calidad", "delivery_phone": "+34 910 123 456", "requires_approval": true, - "auto_approved": true, - "auto_approval_rule_id": "10000000-0000-0000-0000-000000000001", - "approved_by": "50000000-0000-0000-0000-000000000006", - "notes": "Pedido urgente para nueva línea de productos ecológicos - Auto-aprobado por IA", + "notes": "Pedido urgente para nueva línea de productos ecológicos - Requiere aprobación del gerente", "reasoning_data": { "type": "supplier_contract", "parameters": { "supplier_name": "Productos Ecológicos del Norte", - "product_names": ["Organic ingredients"], + "product_names": ["Harina de Espelta Ecológica"], "product_count": 1, "contract_terms": "certified_supplier", - "contract_quantity": 450.0 + "contract_quantity": 200.0, + "current_stock": 186.36, + "reorder_point": 50.0 }, "consequence": { "type": "quality_assurance", @@ -241,17 +240,14 @@ }, "metadata": { "trigger_source": "manual", - "ai_assisted": true, - "auto_approved": true, - "auto_approval_rule_id": "10000000-0000-0000-0000-000000000001" + "ai_assisted": true } }, "created_by": "50000000-0000-0000-0000-000000000005", "order_date": "BASE_TS - 3d", "required_delivery_date": "BASE_TS + 1d", "estimated_delivery_date": "BASE_TS + 2d", - "expected_delivery_date": "BASE_TS + 2d", - "approved_at": "BASE_TS - 2d" + "expected_delivery_date": "BASE_TS + 2d" }, { "id": "50000000-0000-0000-0000-000000000004", @@ -449,6 +445,55 @@ "estimated_delivery_date": "BASE_TS + 0.25d", "expected_delivery_date": "BASE_TS + 0.25d", "sent_to_supplier_at": "BASE_TS - 0.5d" + }, + { + "id": "50000000-0000-0000-0000-000000000008", + "tenant_id": "a1b2c3d4-e5f6-47a8-b9c0-d1e2f3a4b5c6", + "po_number": "PO-2025-008", + "supplier_id": "40000000-0000-0000-0000-000000000004", + "order_date": "BASE_TS - 0.25d", + "status": "pending_approval", + "priority": "medium", + "required_delivery_date": "BASE_TS + 2d", + "estimated_delivery_date": "BASE_TS + 2d", + "expected_delivery_date": "BASE_TS + 2d", + "subtotal": 180.0, + "tax_amount": 37.8, + "shipping_cost": 12.0, + "discount_amount": 0.0, + "total_amount": 229.8, + "currency": "EUR", + "delivery_address": "Calle Panadería, 45, 28001 Madrid", + "delivery_instructions": "Entrega en almacén seco - Zona A", + "delivery_contact": "Carlos Almacén", + "delivery_phone": "+34 910 123 456", + "requires_approval": true, + "notes": "Reposición de ingredientes básicos - Stock bajo en azúcar", + "reasoning_data": { + "type": "low_stock_detection", + "parameters": { + "supplier_name": "Distribuciones Alimentarias del Sur", + "product_names": ["Azúcar Blanco Refinado"], + "product_count": 1, + "current_stock": 24.98, + "required_stock": 120.0, + "days_until_stockout": 3, + "threshold_percentage": 66, + "stock_percentage": 20 + }, + "consequence": { + "type": "stockout_risk", + "severity": "medium", + "impact_days": 3, + "affected_products": ["Croissants", "Napolitanas", "Pan Dulce"], + "estimated_lost_orders": 15 + }, + "metadata": { + "trigger_source": "orchestrator_auto", + "ai_assisted": true + } + }, + "created_by": "50000000-0000-0000-0000-000000000005" } ], "purchase_order_items": [ @@ -693,6 +738,21 @@ "line_total": 71.2, "received_quantity": 0.0, "remaining_quantity": 8.0 + }, + { + "id": "51000000-0000-0000-0000-0000000000a7", + "tenant_id": "a1b2c3d4-e5f6-47a8-b9c0-d1e2f3a4b5c6", + "purchase_order_id": "50000000-0000-0000-0000-000000000008", + "inventory_product_id": "10000000-0000-0000-0000-000000000032", + "product_name": "Azúcar Blanco Refinado", + "product_code": "BAS-AZU-002", + "ordered_quantity": 200.0, + "unit_of_measure": "kilograms", + "unit_price": 0.9, + "line_total": 180.0, + "received_quantity": 0.0, + "remaining_quantity": 200.0, + "notes": "Reposición stock bajo - Nivel crítico detectado" } ] } \ No newline at end of file diff --git a/shared/monitoring/logging.py b/shared/monitoring/logging.py index f8850e0a..c6db0c10 100755 --- a/shared/monitoring/logging.py +++ b/shared/monitoring/logging.py @@ -9,6 +9,7 @@ import logging import logging.config import os import sys +import resource from typing import Dict, Any def setup_logging(service_name: str, log_level: str = "INFO", @@ -23,16 +24,37 @@ def setup_logging(service_name: str, log_level: str = "INFO", enable_file: Whether to enable file logging """ + # Check file descriptor limits + try: + soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) + if soft_limit < 1024: + print(f"Warning: Low file descriptor limit ({soft_limit}). Consider increasing with 'ulimit -n'") + if soft_limit < 256: + print("Critical: File descriptor limit is very low. File logging may fail.") + enable_file = False + except Exception: + # resource module might not be available on all platforms + pass + # Create logs directory if it doesn't exist and file logging is enabled log_dir = "/var/log" if enable_file: try: - os.makedirs(log_dir, exist_ok=True) - except PermissionError: + # First try to create/write to /var/log + test_file = os.path.join(log_dir, f".{service_name}_test") + with open(test_file, 'w') as f: + f.write("test") + os.remove(test_file) + except (PermissionError, OSError): # Fallback to local directory if can't write to /var/log log_dir = "./logs" - os.makedirs(log_dir, exist_ok=True) print(f"Warning: Could not write to /var/log, using {log_dir}") + + try: + os.makedirs(log_dir, exist_ok=True) + except Exception as e: + print(f"Warning: Could not create log directory {log_dir}: {e}") + enable_file = False # Disable file logging if we can't create directory # Define formatters formatters = { @@ -70,14 +92,25 @@ def setup_logging(service_name: str, log_level: str = "INFO", # Add file handler if enabled if enable_file: - handlers["file"] = { - "class": "logging.FileHandler", - "level": log_level, - "formatter": "detailed", - "filename": f"{log_dir}/{service_name}.log", - "mode": "a", - "encoding": "utf-8" - } + try: + # Test if we can actually write to the log file location + test_filename = f"{log_dir}/{service_name}.log" + test_dir = os.path.dirname(test_filename) + if not os.access(test_dir, os.W_OK): + print(f"Warning: Cannot write to log directory {test_dir}, disabling file logging") + enable_file = False + else: + handlers["file"] = { + "class": "logging.FileHandler", + "level": log_level, + "formatter": "detailed", + "filename": test_filename, + "mode": "a", + "encoding": "utf-8" + } + except Exception as e: + print(f"Warning: Could not configure file handler: {e}") + enable_file = False # Add logstash handler if in production logstash_host = os.getenv("LOGSTASH_HOST") @@ -141,6 +174,10 @@ def setup_logging(service_name: str, log_level: str = "INFO", logging.config.dictConfig(config) logger = logging.getLogger(__name__) logger.info(f"Logging configured for {service_name} at level {log_level}") + if enable_file: + logger.info(f"File logging enabled at {log_dir}/{service_name}.log") + else: + logger.info("File logging disabled") except Exception as e: # Fallback to basic logging if configuration fails logging.basicConfig( @@ -151,4 +188,10 @@ def setup_logging(service_name: str, log_level: str = "INFO", logger = logging.getLogger(__name__) logger.error(f"Failed to configure advanced logging for {service_name}: {e}") logger.info(f"Using basic logging configuration for {service_name}") + + # Additional debugging for file handler issues + if "file" in str(e).lower() or "handler" in str(e).lower(): + logger.error(f"File handler configuration failed. Check permissions for {log_dir}") + logger.error(f"Current working directory: {os.getcwd()}") + logger.error(f"Attempting to write to: {log_dir}/{service_name}.log")