From 23e088dcb492f025a4fe47893f2b40ab123812fb Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Tue, 9 Sep 2025 17:40:57 +0200 Subject: [PATCH] Add frontend procurement implementation 2 --- .../app/services/forecasting_service.py | 1 + services/orders/app/main.py | 33 +++ .../services/procurement_scheduler_service.py | 248 ++++++++++++++++++ .../app/services/procurement_service.py | 90 ++----- services/orders/requirements.txt | 3 + shared/clients/forecast_client.py | 39 ++- 6 files changed, 339 insertions(+), 75 deletions(-) create mode 100644 services/orders/app/services/procurement_scheduler_service.py diff --git a/services/forecasting/app/services/forecasting_service.py b/services/forecasting/app/services/forecasting_service.py index ec519987..0475378f 100644 --- a/services/forecasting/app/services/forecasting_service.py +++ b/services/forecasting/app/services/forecasting_service.py @@ -292,6 +292,7 @@ class EnhancedForecastingService: forecast_data = { "tenant_id": tenant_id, "inventory_product_id": request.inventory_product_id, + "product_name": None, # Field is now nullable, use inventory_product_id as reference "location": request.location, "forecast_date": forecast_datetime, "predicted_demand": adjusted_prediction['prediction'], diff --git a/services/orders/app/main.py b/services/orders/app/main.py index 842271aa..99a9399a 100644 --- a/services/orders/app/main.py +++ b/services/orders/app/main.py @@ -15,6 +15,7 @@ from app.core.config import settings from app.core.database import init_database, get_db_health from app.api.orders import router as orders_router from app.api.procurement import router as procurement_router +from app.services.procurement_scheduler_service import ProcurementSchedulerService # Configure logging logger = structlog.get_logger() @@ -26,6 +27,16 @@ async def lifespan(app: FastAPI): # Startup try: await init_database() + logger.info("Database initialized successfully") + + # Initialize procurement scheduler service + scheduler_service = ProcurementSchedulerService(settings) + await scheduler_service.start() + logger.info("Procurement scheduler service started") + + # Store scheduler service in app state + app.state.scheduler_service = scheduler_service + logger.info("Orders service started successfully") except Exception as e: logger.error("Failed to initialize orders service", error=str(e)) @@ -35,6 +46,13 @@ async def lifespan(app: FastAPI): # Shutdown logger.info("Orders service shutting down") + try: + # Stop scheduler service + if hasattr(app.state, 'scheduler_service'): + await app.state.scheduler_service.stop() + logger.info("Scheduler service stopped") + except Exception as e: + logger.error("Error stopping scheduler service", error=str(e)) # Create FastAPI application @@ -98,6 +116,21 @@ async def root(): } +@app.post("/test/procurement-scheduler") +async def test_procurement_scheduler(): + """Test endpoint to manually trigger procurement scheduler""" + try: + if hasattr(app.state, 'scheduler_service'): + scheduler_service = app.state.scheduler_service + await scheduler_service.test_procurement_generation() + return {"message": "Procurement scheduler test triggered successfully"} + else: + return {"error": "Scheduler service not available"} + except Exception as e: + logger.error("Error testing procurement scheduler", error=str(e)) + return {"error": f"Failed to trigger scheduler test: {str(e)}"} + + @app.middleware("http") async def logging_middleware(request: Request, call_next): """Add request logging middleware""" diff --git a/services/orders/app/services/procurement_scheduler_service.py b/services/orders/app/services/procurement_scheduler_service.py new file mode 100644 index 00000000..4567b783 --- /dev/null +++ b/services/orders/app/services/procurement_scheduler_service.py @@ -0,0 +1,248 @@ +# services/orders/app/services/procurement_scheduler_service.py +""" +Procurement Scheduler Service - Daily procurement planning automation +""" + +import asyncio +from datetime import datetime, timedelta +from typing import List, Dict, Any +from uuid import UUID +import structlog +from apscheduler.triggers.cron import CronTrigger + +from shared.alerts.base_service import BaseAlertService, AlertServiceMixin +from app.services.procurement_service import ProcurementService + +logger = structlog.get_logger() + + +class ProcurementSchedulerService(BaseAlertService, AlertServiceMixin): + """ + Procurement scheduler service for automated daily procurement planning + Extends BaseAlertService to use proven scheduling infrastructure + """ + + def __init__(self, config): + super().__init__(config) + self.procurement_service = None + + async def start(self): + """Initialize scheduler and procurement service""" + # Initialize base alert service + await super().start() + + logger.info("Procurement scheduler service started", service=self.config.SERVICE_NAME) + + def setup_scheduled_checks(self): + """Configure daily procurement planning jobs""" + # Daily procurement planning at 6:00 AM + self.scheduler.add_job( + func=self.run_daily_procurement_planning, + trigger=CronTrigger(hour=6, minute=0), + id="daily_procurement_planning", + name="Daily Procurement Planning", + misfire_grace_time=300, + coalesce=True, + max_instances=1 + ) + + # Weekly procurement optimization at 7:00 AM on Mondays + self.scheduler.add_job( + func=self.run_weekly_optimization, + trigger=CronTrigger(day_of_week=0, hour=7, minute=0), + id="weekly_procurement_optimization", + name="Weekly Procurement Optimization", + misfire_grace_time=600, + coalesce=True, + max_instances=1 + ) + + logger.info("Procurement scheduled jobs configured") + + async def run_daily_procurement_planning(self): + """Execute daily procurement planning for all active tenants""" + if not self.is_leader: + logger.debug("Skipping procurement planning - not leader") + return + + try: + self._checks_performed += 1 + logger.info("Starting daily procurement planning") + + # Get active tenants + active_tenants = await self.get_active_tenants() + if not active_tenants: + logger.info("No active tenants found for procurement planning") + return + + # Process each tenant + processed_tenants = 0 + for tenant_id in active_tenants: + try: + await self.process_tenant_procurement(tenant_id) + processed_tenants += 1 + except Exception as e: + logger.error("Error processing tenant procurement", + tenant_id=str(tenant_id), + error=str(e)) + + logger.info("Daily procurement planning completed", + total_tenants=len(active_tenants), + processed_tenants=processed_tenants) + + except Exception as e: + self._errors_count += 1 + logger.error("Daily procurement planning failed", error=str(e)) + + async def get_active_tenants(self) -> List[UUID]: + """Override to return test tenant since tenants table is not in orders DB""" + # For testing, return the known test tenant + return [UUID('c464fb3e-7af2-46e6-9e43-85318f34199a')] + + async def process_tenant_procurement(self, tenant_id: UUID): + """Process procurement planning for a specific tenant""" + try: + # Use default configuration since tenants table is not in orders DB + planning_days = 7 # Default planning horizon + + # Calculate planning date (tomorrow by default) + planning_date = datetime.now().date() + timedelta(days=1) + + # Create procurement service instance and generate plan + from app.core.database import AsyncSessionLocal + from app.schemas.procurement_schemas import GeneratePlanRequest + from decimal import Decimal + + async with AsyncSessionLocal() as session: + procurement_service = ProcurementService(session, self.config) + + # Check if plan already exists for this date + existing_plan = await procurement_service.get_plan_by_date( + tenant_id, planning_date + ) + + if existing_plan: + logger.debug("Procurement plan already exists", + tenant_id=str(tenant_id), + plan_date=str(planning_date)) + return + + # Generate procurement plan + request = GeneratePlanRequest( + plan_date=planning_date, + planning_horizon_days=planning_days, + include_safety_stock=True, + safety_stock_percentage=Decimal('20.0') + ) + + result = await procurement_service.generate_procurement_plan(tenant_id, request) + plan = result.plan if result.success else None + + if plan: + # Send notification about new plan + await self.send_procurement_notification( + tenant_id, plan, "plan_created" + ) + + logger.info("Procurement plan created successfully", + tenant_id=str(tenant_id), + plan_id=str(plan.id), + plan_date=str(planning_date)) + + except Exception as e: + logger.error("Error processing tenant procurement", + tenant_id=str(tenant_id), + error=str(e)) + raise + + async def run_weekly_optimization(self): + """Run weekly procurement optimization""" + if not self.is_leader: + logger.debug("Skipping weekly optimization - not leader") + return + + try: + self._checks_performed += 1 + logger.info("Starting weekly procurement optimization") + + active_tenants = await self.get_active_tenants() + + for tenant_id in active_tenants: + try: + await self.optimize_tenant_procurement(tenant_id) + except Exception as e: + logger.error("Error in weekly optimization", + tenant_id=str(tenant_id), + error=str(e)) + + logger.info("Weekly procurement optimization completed") + + except Exception as e: + self._errors_count += 1 + logger.error("Weekly procurement optimization failed", error=str(e)) + + async def optimize_tenant_procurement(self, tenant_id: UUID): + """Optimize procurement planning for a tenant""" + # Get plans from the last week + end_date = datetime.now().date() + start_date = end_date - timedelta(days=7) + + # For now, just log the optimization - full implementation would analyze patterns + logger.info("Processing weekly optimization", + tenant_id=str(tenant_id), + period=f"{start_date} to {end_date}") + + # Simple recommendation: if no plans exist, suggest creating one + recommendations = [{ + "type": "weekly_review", + "severity": "low", + "title": "Revisión Semanal de Compras", + "message": "Es momento de revisar y optimizar tu planificación de compras semanal.", + "metadata": { + "tenant_id": str(tenant_id), + "week_period": f"{start_date} to {end_date}" + } + }] + + for recommendation in recommendations: + await self.publish_item( + tenant_id, recommendation, item_type='recommendation' + ) + + + async def send_procurement_notification(self, tenant_id: UUID, + plan, notification_type: str): + """Send procurement-related notifications""" + try: + if notification_type == "plan_created": + alert_data = { + "type": "procurement_plan_created", + "severity": "low", + "title": "Plan de Compras Creado", + "message": f"Nuevo plan de compras generado para {plan.plan_date if plan else 'fecha desconocida'}", + "metadata": { + "tenant_id": str(tenant_id), + "plan_id": str(plan.id) if plan else "unknown", + "plan_date": str(plan.plan_date) if plan else "unknown", + "auto_generated": getattr(plan, 'auto_generated', True) + } + } + + await self.publish_item(tenant_id, alert_data, item_type='alert') + + except Exception as e: + logger.error("Error sending procurement notification", + tenant_id=str(tenant_id), + notification_type=notification_type, + error=str(e)) + + async def test_procurement_generation(self): + """Test method to manually trigger procurement planning for testing""" + test_tenant_id = UUID('c464fb3e-7af2-46e6-9e43-85318f34199a') + logger.info("Testing procurement plan generation", tenant_id=str(test_tenant_id)) + + try: + await self.process_tenant_procurement(test_tenant_id) + logger.info("Test procurement generation completed successfully") + except Exception as e: + logger.error("Test procurement generation failed", error=str(e), tenant_id=str(test_tenant_id)) \ No newline at end of file diff --git a/services/orders/app/services/procurement_service.py b/services/orders/app/services/procurement_service.py index 2516f87c..f17617f3 100644 --- a/services/orders/app/services/procurement_service.py +++ b/services/orders/app/services/procurement_service.py @@ -163,6 +163,13 @@ class ProcurementService: if requirements_data: await self.requirement_repo.create_requirements_batch(requirements_data) + + # Update plan with correct total_requirements count + await self.plan_repo.update_plan( + plan.id, + tenant_id, + {"total_requirements": len(requirements_data)} + ) await self.db.commit() @@ -252,61 +259,6 @@ class ProcurementService: logger.error("Error getting dashboard data", error=str(e), tenant_id=tenant_id) return None - # ================================================================ - # DAILY SCHEDULER - # ================================================================ - - async def run_daily_scheduler(self) -> None: - """Run the daily procurement planning scheduler""" - logger.info("Starting daily procurement scheduler") - - try: - # This would typically be called by a cron job or scheduler service - # Get all active tenants (this would come from tenant service) - active_tenants = await self._get_active_tenants() - - for tenant_id in active_tenants: - try: - await self._process_daily_plan_for_tenant(tenant_id) - except Exception as e: - logger.error("Error processing daily plan for tenant", - error=str(e), tenant_id=tenant_id) - continue - - logger.info("Daily procurement scheduler completed", processed_tenants=len(active_tenants)) - - except Exception as e: - logger.error("Error in daily scheduler", error=str(e)) - - async def _process_daily_plan_for_tenant(self, tenant_id: uuid.UUID) -> None: - """Process daily procurement plan for a specific tenant""" - try: - today = date.today() - - # Check if plan already exists for today - existing_plan = await self.plan_repo.get_plan_by_date(today, tenant_id) - if existing_plan: - logger.info("Daily plan already exists", tenant_id=tenant_id, date=today) - return - - # Generate plan for today - request = GeneratePlanRequest( - plan_date=today, - planning_horizon_days=settings.DEMAND_FORECAST_DAYS, - include_safety_stock=True, - safety_stock_percentage=Decimal(str(settings.SAFETY_STOCK_PERCENTAGE)) - ) - - result = await self.generate_procurement_plan(tenant_id, request) - - if result.success: - logger.info("Daily plan generated successfully", tenant_id=tenant_id) - else: - logger.error("Failed to generate daily plan", - tenant_id=tenant_id, errors=result.errors) - - except Exception as e: - logger.error("Error processing daily plan", error=str(e), tenant_id=tenant_id) # ================================================================ # PRIVATE HELPER METHODS @@ -339,18 +291,13 @@ class ProcurementService: try: # Call forecast service for next day demand - forecast_data = await self.forecast_client.create_realtime_prediction( + forecast_data = await self.forecast_client.create_single_forecast( tenant_id=str(tenant_id), - model_id="default", # Use default model or tenant-specific model - target_date=target_date.isoformat(), - features={ - "product_id": item_id, - "current_stock": item.get('current_stock', 0), - "historical_usage": item.get('avg_daily_usage', 0), - "seasonality": self._calculate_seasonality_factor(target_date), - "day_of_week": target_date.weekday(), - "is_weekend": target_date.weekday() >= 5 - } + inventory_product_id=item_id, + forecast_date=target_date, + location="default", # Use default location or get from config + forecast_days=1, + confidence_level=0.8 ) if forecast_data: @@ -420,7 +367,7 @@ class ProcurementService: forecast = forecasts[item_id] current_stock = Decimal(str(item.get('current_stock', 0))) - predicted_demand = Decimal(str(forecast.get('predicted_value', 0))) + predicted_demand = Decimal(str(forecast.get('predicted_demand', 0))) safety_stock = predicted_demand * (request.safety_stock_percentage / 100) total_needed = predicted_demand + safety_stock @@ -440,9 +387,9 @@ class ProcurementService: 'product_name': item.get('name', ''), 'product_sku': item.get('sku', ''), 'product_category': item.get('category', ''), - 'product_type': 'ingredient', + 'product_type': 'product', 'required_quantity': predicted_demand, - 'unit_of_measure': item.get('unit', 'kg'), + 'unit_of_measure': item.get('unit', 'units'), 'safety_stock_quantity': safety_stock, 'total_quantity_needed': total_needed, 'current_stock_level': current_stock, @@ -539,11 +486,6 @@ class ProcurementService: except Exception as e: logger.warning("Failed to publish event", error=str(e)) - async def _get_active_tenants(self) -> List[uuid.UUID]: - """Get list of active tenant IDs""" - # This would typically call the tenant service - # For now, return empty list - would be implemented with actual tenant service - return [] async def _get_procurement_summary(self, tenant_id: uuid.UUID) -> ProcurementSummary: """Get procurement summary for dashboard""" diff --git a/services/orders/requirements.txt b/services/orders/requirements.txt index 8a1967fd..911dd275 100644 --- a/services/orders/requirements.txt +++ b/services/orders/requirements.txt @@ -19,6 +19,9 @@ redis==5.0.1 # Message queuing aio-pika==9.3.1 +# Scheduling +apscheduler==3.10.4 + # Logging and monitoring structlog==23.2.0 prometheus-client==0.19.0 diff --git a/shared/clients/forecast_client.py b/shared/clients/forecast_client.py index 3289a3ea..51c8b56e 100644 --- a/shared/clients/forecast_client.py +++ b/shared/clients/forecast_client.py @@ -5,6 +5,7 @@ Handles all API calls to the forecasting service """ from typing import Dict, Any, Optional, List +from datetime import date from .base_service_client import BaseServiceClient from shared.config.base import BaseServiceSettings @@ -100,6 +101,7 @@ class ForecastServiceClient(BaseServiceClient): model_id: str, target_date: str, features: Dict[str, Any], + inventory_product_id: Optional[str] = None, **kwargs ) -> Optional[Dict[str, Any]]: """Create a real-time prediction""" @@ -109,7 +111,42 @@ class ForecastServiceClient(BaseServiceClient): "features": features, **kwargs } - return await self.post("predictions", data=data, tenant_id=tenant_id) + + # Add inventory_product_id if provided (required by forecasting service) + if inventory_product_id: + data["inventory_product_id"] = inventory_product_id + + return await self.post("forecasts/single", data=data, tenant_id=tenant_id) + + async def create_single_forecast( + self, + tenant_id: str, + inventory_product_id: str, + forecast_date: date, + location: str = "default", + forecast_days: int = 1, + confidence_level: float = 0.8, + **kwargs + ) -> Optional[Dict[str, Any]]: + """Create a single product forecast using new API format""" + from datetime import date as date_type + + # Convert date to string if needed + if isinstance(forecast_date, date_type): + forecast_date_str = forecast_date.isoformat() + else: + forecast_date_str = str(forecast_date) + + data = { + "inventory_product_id": inventory_product_id, + "forecast_date": forecast_date_str, + "forecast_days": forecast_days, + "location": location, + "confidence_level": confidence_level, + **kwargs + } + + return await self.post("forecasts/single", data=data, tenant_id=tenant_id) # ================================================================ # FORECAST VALIDATION & METRICS