Add frontend procurement implementation 2
This commit is contained in:
@@ -292,6 +292,7 @@ class EnhancedForecastingService:
|
||||
forecast_data = {
|
||||
"tenant_id": tenant_id,
|
||||
"inventory_product_id": request.inventory_product_id,
|
||||
"product_name": None, # Field is now nullable, use inventory_product_id as reference
|
||||
"location": request.location,
|
||||
"forecast_date": forecast_datetime,
|
||||
"predicted_demand": adjusted_prediction['prediction'],
|
||||
|
||||
@@ -15,6 +15,7 @@ from app.core.config import settings
|
||||
from app.core.database import init_database, get_db_health
|
||||
from app.api.orders import router as orders_router
|
||||
from app.api.procurement import router as procurement_router
|
||||
from app.services.procurement_scheduler_service import ProcurementSchedulerService
|
||||
|
||||
# Configure logging
|
||||
logger = structlog.get_logger()
|
||||
@@ -26,6 +27,16 @@ async def lifespan(app: FastAPI):
|
||||
# Startup
|
||||
try:
|
||||
await init_database()
|
||||
logger.info("Database initialized successfully")
|
||||
|
||||
# Initialize procurement scheduler service
|
||||
scheduler_service = ProcurementSchedulerService(settings)
|
||||
await scheduler_service.start()
|
||||
logger.info("Procurement scheduler service started")
|
||||
|
||||
# Store scheduler service in app state
|
||||
app.state.scheduler_service = scheduler_service
|
||||
|
||||
logger.info("Orders service started successfully")
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize orders service", error=str(e))
|
||||
@@ -35,6 +46,13 @@ async def lifespan(app: FastAPI):
|
||||
|
||||
# Shutdown
|
||||
logger.info("Orders service shutting down")
|
||||
try:
|
||||
# Stop scheduler service
|
||||
if hasattr(app.state, 'scheduler_service'):
|
||||
await app.state.scheduler_service.stop()
|
||||
logger.info("Scheduler service stopped")
|
||||
except Exception as e:
|
||||
logger.error("Error stopping scheduler service", error=str(e))
|
||||
|
||||
|
||||
# Create FastAPI application
|
||||
@@ -98,6 +116,21 @@ async def root():
|
||||
}
|
||||
|
||||
|
||||
@app.post("/test/procurement-scheduler")
|
||||
async def test_procurement_scheduler():
|
||||
"""Test endpoint to manually trigger procurement scheduler"""
|
||||
try:
|
||||
if hasattr(app.state, 'scheduler_service'):
|
||||
scheduler_service = app.state.scheduler_service
|
||||
await scheduler_service.test_procurement_generation()
|
||||
return {"message": "Procurement scheduler test triggered successfully"}
|
||||
else:
|
||||
return {"error": "Scheduler service not available"}
|
||||
except Exception as e:
|
||||
logger.error("Error testing procurement scheduler", error=str(e))
|
||||
return {"error": f"Failed to trigger scheduler test: {str(e)}"}
|
||||
|
||||
|
||||
@app.middleware("http")
|
||||
async def logging_middleware(request: Request, call_next):
|
||||
"""Add request logging middleware"""
|
||||
|
||||
248
services/orders/app/services/procurement_scheduler_service.py
Normal file
248
services/orders/app/services/procurement_scheduler_service.py
Normal file
@@ -0,0 +1,248 @@
|
||||
# services/orders/app/services/procurement_scheduler_service.py
|
||||
"""
|
||||
Procurement Scheduler Service - Daily procurement planning automation
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta
|
||||
from typing import List, Dict, Any
|
||||
from uuid import UUID
|
||||
import structlog
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
|
||||
from shared.alerts.base_service import BaseAlertService, AlertServiceMixin
|
||||
from app.services.procurement_service import ProcurementService
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
class ProcurementSchedulerService(BaseAlertService, AlertServiceMixin):
|
||||
"""
|
||||
Procurement scheduler service for automated daily procurement planning
|
||||
Extends BaseAlertService to use proven scheduling infrastructure
|
||||
"""
|
||||
|
||||
def __init__(self, config):
|
||||
super().__init__(config)
|
||||
self.procurement_service = None
|
||||
|
||||
async def start(self):
|
||||
"""Initialize scheduler and procurement service"""
|
||||
# Initialize base alert service
|
||||
await super().start()
|
||||
|
||||
logger.info("Procurement scheduler service started", service=self.config.SERVICE_NAME)
|
||||
|
||||
def setup_scheduled_checks(self):
|
||||
"""Configure daily procurement planning jobs"""
|
||||
# Daily procurement planning at 6:00 AM
|
||||
self.scheduler.add_job(
|
||||
func=self.run_daily_procurement_planning,
|
||||
trigger=CronTrigger(hour=6, minute=0),
|
||||
id="daily_procurement_planning",
|
||||
name="Daily Procurement Planning",
|
||||
misfire_grace_time=300,
|
||||
coalesce=True,
|
||||
max_instances=1
|
||||
)
|
||||
|
||||
# Weekly procurement optimization at 7:00 AM on Mondays
|
||||
self.scheduler.add_job(
|
||||
func=self.run_weekly_optimization,
|
||||
trigger=CronTrigger(day_of_week=0, hour=7, minute=0),
|
||||
id="weekly_procurement_optimization",
|
||||
name="Weekly Procurement Optimization",
|
||||
misfire_grace_time=600,
|
||||
coalesce=True,
|
||||
max_instances=1
|
||||
)
|
||||
|
||||
logger.info("Procurement scheduled jobs configured")
|
||||
|
||||
async def run_daily_procurement_planning(self):
|
||||
"""Execute daily procurement planning for all active tenants"""
|
||||
if not self.is_leader:
|
||||
logger.debug("Skipping procurement planning - not leader")
|
||||
return
|
||||
|
||||
try:
|
||||
self._checks_performed += 1
|
||||
logger.info("Starting daily procurement planning")
|
||||
|
||||
# Get active tenants
|
||||
active_tenants = await self.get_active_tenants()
|
||||
if not active_tenants:
|
||||
logger.info("No active tenants found for procurement planning")
|
||||
return
|
||||
|
||||
# Process each tenant
|
||||
processed_tenants = 0
|
||||
for tenant_id in active_tenants:
|
||||
try:
|
||||
await self.process_tenant_procurement(tenant_id)
|
||||
processed_tenants += 1
|
||||
except Exception as e:
|
||||
logger.error("Error processing tenant procurement",
|
||||
tenant_id=str(tenant_id),
|
||||
error=str(e))
|
||||
|
||||
logger.info("Daily procurement planning completed",
|
||||
total_tenants=len(active_tenants),
|
||||
processed_tenants=processed_tenants)
|
||||
|
||||
except Exception as e:
|
||||
self._errors_count += 1
|
||||
logger.error("Daily procurement planning failed", error=str(e))
|
||||
|
||||
async def get_active_tenants(self) -> List[UUID]:
|
||||
"""Override to return test tenant since tenants table is not in orders DB"""
|
||||
# For testing, return the known test tenant
|
||||
return [UUID('c464fb3e-7af2-46e6-9e43-85318f34199a')]
|
||||
|
||||
async def process_tenant_procurement(self, tenant_id: UUID):
|
||||
"""Process procurement planning for a specific tenant"""
|
||||
try:
|
||||
# Use default configuration since tenants table is not in orders DB
|
||||
planning_days = 7 # Default planning horizon
|
||||
|
||||
# Calculate planning date (tomorrow by default)
|
||||
planning_date = datetime.now().date() + timedelta(days=1)
|
||||
|
||||
# Create procurement service instance and generate plan
|
||||
from app.core.database import AsyncSessionLocal
|
||||
from app.schemas.procurement_schemas import GeneratePlanRequest
|
||||
from decimal import Decimal
|
||||
|
||||
async with AsyncSessionLocal() as session:
|
||||
procurement_service = ProcurementService(session, self.config)
|
||||
|
||||
# Check if plan already exists for this date
|
||||
existing_plan = await procurement_service.get_plan_by_date(
|
||||
tenant_id, planning_date
|
||||
)
|
||||
|
||||
if existing_plan:
|
||||
logger.debug("Procurement plan already exists",
|
||||
tenant_id=str(tenant_id),
|
||||
plan_date=str(planning_date))
|
||||
return
|
||||
|
||||
# Generate procurement plan
|
||||
request = GeneratePlanRequest(
|
||||
plan_date=planning_date,
|
||||
planning_horizon_days=planning_days,
|
||||
include_safety_stock=True,
|
||||
safety_stock_percentage=Decimal('20.0')
|
||||
)
|
||||
|
||||
result = await procurement_service.generate_procurement_plan(tenant_id, request)
|
||||
plan = result.plan if result.success else None
|
||||
|
||||
if plan:
|
||||
# Send notification about new plan
|
||||
await self.send_procurement_notification(
|
||||
tenant_id, plan, "plan_created"
|
||||
)
|
||||
|
||||
logger.info("Procurement plan created successfully",
|
||||
tenant_id=str(tenant_id),
|
||||
plan_id=str(plan.id),
|
||||
plan_date=str(planning_date))
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error processing tenant procurement",
|
||||
tenant_id=str(tenant_id),
|
||||
error=str(e))
|
||||
raise
|
||||
|
||||
async def run_weekly_optimization(self):
|
||||
"""Run weekly procurement optimization"""
|
||||
if not self.is_leader:
|
||||
logger.debug("Skipping weekly optimization - not leader")
|
||||
return
|
||||
|
||||
try:
|
||||
self._checks_performed += 1
|
||||
logger.info("Starting weekly procurement optimization")
|
||||
|
||||
active_tenants = await self.get_active_tenants()
|
||||
|
||||
for tenant_id in active_tenants:
|
||||
try:
|
||||
await self.optimize_tenant_procurement(tenant_id)
|
||||
except Exception as e:
|
||||
logger.error("Error in weekly optimization",
|
||||
tenant_id=str(tenant_id),
|
||||
error=str(e))
|
||||
|
||||
logger.info("Weekly procurement optimization completed")
|
||||
|
||||
except Exception as e:
|
||||
self._errors_count += 1
|
||||
logger.error("Weekly procurement optimization failed", error=str(e))
|
||||
|
||||
async def optimize_tenant_procurement(self, tenant_id: UUID):
|
||||
"""Optimize procurement planning for a tenant"""
|
||||
# Get plans from the last week
|
||||
end_date = datetime.now().date()
|
||||
start_date = end_date - timedelta(days=7)
|
||||
|
||||
# For now, just log the optimization - full implementation would analyze patterns
|
||||
logger.info("Processing weekly optimization",
|
||||
tenant_id=str(tenant_id),
|
||||
period=f"{start_date} to {end_date}")
|
||||
|
||||
# Simple recommendation: if no plans exist, suggest creating one
|
||||
recommendations = [{
|
||||
"type": "weekly_review",
|
||||
"severity": "low",
|
||||
"title": "Revisión Semanal de Compras",
|
||||
"message": "Es momento de revisar y optimizar tu planificación de compras semanal.",
|
||||
"metadata": {
|
||||
"tenant_id": str(tenant_id),
|
||||
"week_period": f"{start_date} to {end_date}"
|
||||
}
|
||||
}]
|
||||
|
||||
for recommendation in recommendations:
|
||||
await self.publish_item(
|
||||
tenant_id, recommendation, item_type='recommendation'
|
||||
)
|
||||
|
||||
|
||||
async def send_procurement_notification(self, tenant_id: UUID,
|
||||
plan, notification_type: str):
|
||||
"""Send procurement-related notifications"""
|
||||
try:
|
||||
if notification_type == "plan_created":
|
||||
alert_data = {
|
||||
"type": "procurement_plan_created",
|
||||
"severity": "low",
|
||||
"title": "Plan de Compras Creado",
|
||||
"message": f"Nuevo plan de compras generado para {plan.plan_date if plan else 'fecha desconocida'}",
|
||||
"metadata": {
|
||||
"tenant_id": str(tenant_id),
|
||||
"plan_id": str(plan.id) if plan else "unknown",
|
||||
"plan_date": str(plan.plan_date) if plan else "unknown",
|
||||
"auto_generated": getattr(plan, 'auto_generated', True)
|
||||
}
|
||||
}
|
||||
|
||||
await self.publish_item(tenant_id, alert_data, item_type='alert')
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error sending procurement notification",
|
||||
tenant_id=str(tenant_id),
|
||||
notification_type=notification_type,
|
||||
error=str(e))
|
||||
|
||||
async def test_procurement_generation(self):
|
||||
"""Test method to manually trigger procurement planning for testing"""
|
||||
test_tenant_id = UUID('c464fb3e-7af2-46e6-9e43-85318f34199a')
|
||||
logger.info("Testing procurement plan generation", tenant_id=str(test_tenant_id))
|
||||
|
||||
try:
|
||||
await self.process_tenant_procurement(test_tenant_id)
|
||||
logger.info("Test procurement generation completed successfully")
|
||||
except Exception as e:
|
||||
logger.error("Test procurement generation failed", error=str(e), tenant_id=str(test_tenant_id))
|
||||
@@ -163,6 +163,13 @@ class ProcurementService:
|
||||
|
||||
if requirements_data:
|
||||
await self.requirement_repo.create_requirements_batch(requirements_data)
|
||||
|
||||
# Update plan with correct total_requirements count
|
||||
await self.plan_repo.update_plan(
|
||||
plan.id,
|
||||
tenant_id,
|
||||
{"total_requirements": len(requirements_data)}
|
||||
)
|
||||
|
||||
await self.db.commit()
|
||||
|
||||
@@ -252,61 +259,6 @@ class ProcurementService:
|
||||
logger.error("Error getting dashboard data", error=str(e), tenant_id=tenant_id)
|
||||
return None
|
||||
|
||||
# ================================================================
|
||||
# DAILY SCHEDULER
|
||||
# ================================================================
|
||||
|
||||
async def run_daily_scheduler(self) -> None:
|
||||
"""Run the daily procurement planning scheduler"""
|
||||
logger.info("Starting daily procurement scheduler")
|
||||
|
||||
try:
|
||||
# This would typically be called by a cron job or scheduler service
|
||||
# Get all active tenants (this would come from tenant service)
|
||||
active_tenants = await self._get_active_tenants()
|
||||
|
||||
for tenant_id in active_tenants:
|
||||
try:
|
||||
await self._process_daily_plan_for_tenant(tenant_id)
|
||||
except Exception as e:
|
||||
logger.error("Error processing daily plan for tenant",
|
||||
error=str(e), tenant_id=tenant_id)
|
||||
continue
|
||||
|
||||
logger.info("Daily procurement scheduler completed", processed_tenants=len(active_tenants))
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error in daily scheduler", error=str(e))
|
||||
|
||||
async def _process_daily_plan_for_tenant(self, tenant_id: uuid.UUID) -> None:
|
||||
"""Process daily procurement plan for a specific tenant"""
|
||||
try:
|
||||
today = date.today()
|
||||
|
||||
# Check if plan already exists for today
|
||||
existing_plan = await self.plan_repo.get_plan_by_date(today, tenant_id)
|
||||
if existing_plan:
|
||||
logger.info("Daily plan already exists", tenant_id=tenant_id, date=today)
|
||||
return
|
||||
|
||||
# Generate plan for today
|
||||
request = GeneratePlanRequest(
|
||||
plan_date=today,
|
||||
planning_horizon_days=settings.DEMAND_FORECAST_DAYS,
|
||||
include_safety_stock=True,
|
||||
safety_stock_percentage=Decimal(str(settings.SAFETY_STOCK_PERCENTAGE))
|
||||
)
|
||||
|
||||
result = await self.generate_procurement_plan(tenant_id, request)
|
||||
|
||||
if result.success:
|
||||
logger.info("Daily plan generated successfully", tenant_id=tenant_id)
|
||||
else:
|
||||
logger.error("Failed to generate daily plan",
|
||||
tenant_id=tenant_id, errors=result.errors)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error processing daily plan", error=str(e), tenant_id=tenant_id)
|
||||
|
||||
# ================================================================
|
||||
# PRIVATE HELPER METHODS
|
||||
@@ -339,18 +291,13 @@ class ProcurementService:
|
||||
|
||||
try:
|
||||
# Call forecast service for next day demand
|
||||
forecast_data = await self.forecast_client.create_realtime_prediction(
|
||||
forecast_data = await self.forecast_client.create_single_forecast(
|
||||
tenant_id=str(tenant_id),
|
||||
model_id="default", # Use default model or tenant-specific model
|
||||
target_date=target_date.isoformat(),
|
||||
features={
|
||||
"product_id": item_id,
|
||||
"current_stock": item.get('current_stock', 0),
|
||||
"historical_usage": item.get('avg_daily_usage', 0),
|
||||
"seasonality": self._calculate_seasonality_factor(target_date),
|
||||
"day_of_week": target_date.weekday(),
|
||||
"is_weekend": target_date.weekday() >= 5
|
||||
}
|
||||
inventory_product_id=item_id,
|
||||
forecast_date=target_date,
|
||||
location="default", # Use default location or get from config
|
||||
forecast_days=1,
|
||||
confidence_level=0.8
|
||||
)
|
||||
|
||||
if forecast_data:
|
||||
@@ -420,7 +367,7 @@ class ProcurementService:
|
||||
|
||||
forecast = forecasts[item_id]
|
||||
current_stock = Decimal(str(item.get('current_stock', 0)))
|
||||
predicted_demand = Decimal(str(forecast.get('predicted_value', 0)))
|
||||
predicted_demand = Decimal(str(forecast.get('predicted_demand', 0)))
|
||||
safety_stock = predicted_demand * (request.safety_stock_percentage / 100)
|
||||
|
||||
total_needed = predicted_demand + safety_stock
|
||||
@@ -440,9 +387,9 @@ class ProcurementService:
|
||||
'product_name': item.get('name', ''),
|
||||
'product_sku': item.get('sku', ''),
|
||||
'product_category': item.get('category', ''),
|
||||
'product_type': 'ingredient',
|
||||
'product_type': 'product',
|
||||
'required_quantity': predicted_demand,
|
||||
'unit_of_measure': item.get('unit', 'kg'),
|
||||
'unit_of_measure': item.get('unit', 'units'),
|
||||
'safety_stock_quantity': safety_stock,
|
||||
'total_quantity_needed': total_needed,
|
||||
'current_stock_level': current_stock,
|
||||
@@ -539,11 +486,6 @@ class ProcurementService:
|
||||
except Exception as e:
|
||||
logger.warning("Failed to publish event", error=str(e))
|
||||
|
||||
async def _get_active_tenants(self) -> List[uuid.UUID]:
|
||||
"""Get list of active tenant IDs"""
|
||||
# This would typically call the tenant service
|
||||
# For now, return empty list - would be implemented with actual tenant service
|
||||
return []
|
||||
|
||||
async def _get_procurement_summary(self, tenant_id: uuid.UUID) -> ProcurementSummary:
|
||||
"""Get procurement summary for dashboard"""
|
||||
|
||||
@@ -19,6 +19,9 @@ redis==5.0.1
|
||||
# Message queuing
|
||||
aio-pika==9.3.1
|
||||
|
||||
# Scheduling
|
||||
apscheduler==3.10.4
|
||||
|
||||
# Logging and monitoring
|
||||
structlog==23.2.0
|
||||
prometheus-client==0.19.0
|
||||
|
||||
Reference in New Issue
Block a user