Improve backend

Urtzi Alfaro
2025-11-18 07:17:17 +01:00
parent d36f2ab9af
commit 5c45164c8e
61 changed files with 9846 additions and 495 deletions

View File: services/forecasting/app/jobs/__init__.py

@@ -0,0 +1,29 @@
"""
Forecasting Service Jobs Package
Scheduled and background jobs for the forecasting service
"""
from .daily_validation import daily_validation_job, validate_date_range_job
from .sales_data_listener import (
handle_sales_import_completion,
handle_pos_sync_completion,
process_pending_validations
)
from .auto_backfill_job import (
auto_backfill_all_tenants,
process_all_pending_validations,
daily_validation_maintenance_job,
run_validation_maintenance_for_tenant
)
__all__ = [
"daily_validation_job",
"validate_date_range_job",
"handle_sales_import_completion",
"handle_pos_sync_completion",
"process_pending_validations",
"auto_backfill_all_tenants",
"process_all_pending_validations",
"daily_validation_maintenance_job",
"run_validation_maintenance_for_tenant",
]
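
With these re-exports in place, schedulers and the orchestrator get a single import surface for every job entry point. A minimal caller sketch; the tenant ID is illustrative and a configured database is assumed:

import asyncio
import uuid

from app.jobs import daily_validation_job

async def main() -> None:
    # Illustrative tenant ID; real IDs would come from tenant storage
    tenant_id = uuid.UUID("00000000-0000-0000-0000-000000000001")
    result = await daily_validation_job(tenant_id=tenant_id)
    print(result)

asyncio.run(main())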

View File: services/forecasting/app/jobs/auto_backfill_job.py

@@ -0,0 +1,275 @@
# ================================================================
# services/forecasting/app/jobs/auto_backfill_job.py
# ================================================================
"""
Automated Backfill Job

Scheduled job to automatically detect and backfill validation gaps.
Can be run daily or weekly to ensure all historical forecasts are validated.
"""
from typing import Dict, Any, List
from datetime import datetime, timezone
import structlog
import uuid

from app.services.historical_validation_service import HistoricalValidationService
from app.core.database import database_manager
from app.jobs.sales_data_listener import process_pending_validations

logger = structlog.get_logger()


async def auto_backfill_all_tenants(
    tenant_ids: List[uuid.UUID],
    lookback_days: int = 90,
    max_gaps_per_tenant: int = 5
) -> Dict[str, Any]:
    """
    Run auto backfill for multiple tenants

    Args:
        tenant_ids: List of tenant IDs to process
        lookback_days: How far back to check for gaps
        max_gaps_per_tenant: Maximum number of gaps to process per tenant

    Returns:
        Summary of backfill operations across all tenants
    """
    try:
        logger.info(
            "Starting auto backfill for all tenants",
            tenant_count=len(tenant_ids),
            lookback_days=lookback_days
        )

        results = []
        total_gaps_found = 0
        total_gaps_processed = 0
        total_successful = 0

        for tenant_id in tenant_ids:
            try:
                async with database_manager.get_session() as db:
                    service = HistoricalValidationService(db)
                    result = await service.auto_backfill_gaps(
                        tenant_id=tenant_id,
                        lookback_days=lookback_days,
                        max_gaps_to_process=max_gaps_per_tenant
                    )
                    results.append({
                        "tenant_id": str(tenant_id),
                        "status": "success",
                        **result
                    })
                    total_gaps_found += result.get("gaps_found", 0)
                    total_gaps_processed += result.get("gaps_processed", 0)
                    total_successful += result.get("validations_completed", 0)
            except Exception as e:
                logger.error(
                    "Failed to auto backfill for tenant",
                    tenant_id=tenant_id,
                    error=str(e)
                )
                results.append({
                    "tenant_id": str(tenant_id),
                    "status": "failed",
                    "error": str(e)
                })

        logger.info(
            "Auto backfill completed for all tenants",
            tenant_count=len(tenant_ids),
            total_gaps_found=total_gaps_found,
            total_gaps_processed=total_gaps_processed,
            total_successful=total_successful
        )

        return {
            "status": "completed",
            "tenants_processed": len(tenant_ids),
            "total_gaps_found": total_gaps_found,
            "total_gaps_processed": total_gaps_processed,
            "total_validations_completed": total_successful,
            "results": results
        }
    except Exception as e:
        logger.error(
            "Auto backfill job failed",
            error=str(e)
        )
        return {
            "status": "failed",
            "error": str(e)
        }


async def process_all_pending_validations(
    tenant_ids: List[uuid.UUID],
    max_per_tenant: int = 10
) -> Dict[str, Any]:
    """
    Process all pending validations for multiple tenants

    Args:
        tenant_ids: List of tenant IDs to process
        max_per_tenant: Maximum pending validations to process per tenant

    Returns:
        Summary of processing results
    """
    try:
        logger.info(
            "Processing pending validations for all tenants",
            tenant_count=len(tenant_ids)
        )

        results = []
        total_pending = 0
        total_processed = 0
        total_successful = 0

        for tenant_id in tenant_ids:
            try:
                result = await process_pending_validations(
                    tenant_id=tenant_id,
                    max_to_process=max_per_tenant
                )
                results.append({
                    "tenant_id": str(tenant_id),
                    **result
                })
                total_pending += result.get("pending_count", 0)
                total_processed += result.get("processed", 0)
                total_successful += result.get("successful", 0)
            except Exception as e:
                logger.error(
                    "Failed to process pending validations for tenant",
                    tenant_id=tenant_id,
                    error=str(e)
                )
                results.append({
                    "tenant_id": str(tenant_id),
                    "status": "failed",
                    "error": str(e)
                })

        logger.info(
            "Pending validations processed for all tenants",
            tenant_count=len(tenant_ids),
            total_pending=total_pending,
            total_processed=total_processed,
            total_successful=total_successful
        )

        return {
            "status": "completed",
            "tenants_processed": len(tenant_ids),
            "total_pending": total_pending,
            "total_processed": total_processed,
            "total_successful": total_successful,
            "results": results
        }
    except Exception as e:
        logger.error(
            "Failed to process all pending validations",
            error=str(e)
        )
        return {
            "status": "failed",
            "error": str(e)
        }


async def daily_validation_maintenance_job(
    tenant_ids: List[uuid.UUID]
) -> Dict[str, Any]:
    """
    Daily validation maintenance job

    Combines gap detection/backfill and pending validation processing.
    Recommended to run once daily (e.g., 6:00 AM after orchestrator completes).

    Args:
        tenant_ids: List of tenant IDs to process

    Returns:
        Summary of all maintenance operations
    """
    try:
        logger.info(
            "Starting daily validation maintenance",
            tenant_count=len(tenant_ids),
            timestamp=datetime.now(timezone.utc).isoformat()
        )

        # Step 1: Process pending validations (retry failures)
        pending_result = await process_all_pending_validations(
            tenant_ids=tenant_ids,
            max_per_tenant=10
        )

        # Step 2: Auto backfill detected gaps
        backfill_result = await auto_backfill_all_tenants(
            tenant_ids=tenant_ids,
            lookback_days=90,
            max_gaps_per_tenant=5
        )

        logger.info(
            "Daily validation maintenance completed",
            pending_validations_processed=pending_result.get("total_processed", 0),
            gaps_backfilled=backfill_result.get("total_validations_completed", 0)
        )

        return {
            "status": "completed",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "tenants_processed": len(tenant_ids),
            "pending_validations": pending_result,
            "gap_backfill": backfill_result,
            "summary": {
                "total_pending_processed": pending_result.get("total_processed", 0),
                "total_gaps_backfilled": backfill_result.get("total_validations_completed", 0),
                "total_validations": (
                    pending_result.get("total_processed", 0) +
                    backfill_result.get("total_validations_completed", 0)
                )
            }
        }
    except Exception as e:
        logger.error(
            "Daily validation maintenance failed",
            error=str(e)
        )
        return {
            "status": "failed",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "error": str(e)
        }


# Convenience function for single tenant
async def run_validation_maintenance_for_tenant(
    tenant_id: uuid.UUID
) -> Dict[str, Any]:
    """
    Run validation maintenance for a single tenant

    Args:
        tenant_id: Tenant identifier

    Returns:
        Maintenance results
    """
    return await daily_validation_maintenance_job([tenant_id])
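
The docstring above recommends running daily_validation_maintenance_job once per day after the orchestrator finishes. A minimal scheduling sketch, assuming APScheduler; the commit does not prescribe a scheduler, and the tenant list is illustrative:

import asyncio
import uuid

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from app.jobs import daily_validation_maintenance_job

# Illustrative tenant list; a real deployment would load this from tenant storage
TENANT_IDS = [uuid.UUID("00000000-0000-0000-0000-000000000001")]

scheduler = AsyncIOScheduler()
scheduler.add_job(
    daily_validation_maintenance_job,
    CronTrigger(hour=6, minute=0),  # 06:00, after the daily orchestrator run
    args=[TENANT_IDS],
)
scheduler.start()
asyncio.get_event_loop().run_forever()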

View File: services/forecasting/app/jobs/daily_validation.py

@@ -0,0 +1,147 @@
# ================================================================
# services/forecasting/app/jobs/daily_validation.py
# ================================================================
"""
Daily Validation Job

Scheduled job to validate previous day's forecasts against actual sales.
This job is called by the orchestrator as part of the daily workflow.
"""
from typing import Dict, Any, Optional
from datetime import datetime
import structlog
import uuid

from app.services.validation_service import ValidationService
from app.core.database import database_manager

logger = structlog.get_logger()


async def daily_validation_job(
    tenant_id: uuid.UUID,
    orchestration_run_id: Optional[uuid.UUID] = None
) -> Dict[str, Any]:
    """
    Validate yesterday's forecasts against actual sales

    This function is designed to be called by the orchestrator as part of
    the daily workflow (Step 5: validate_previous_forecasts).

    Args:
        tenant_id: Tenant identifier
        orchestration_run_id: Optional orchestration run ID for tracking

    Returns:
        Dictionary with validation results
    """
    async with database_manager.get_session() as db:
        try:
            logger.info(
                "Starting daily validation job",
                tenant_id=tenant_id,
                orchestration_run_id=orchestration_run_id
            )

            validation_service = ValidationService(db)

            # Validate yesterday's forecasts
            result = await validation_service.validate_yesterday(
                tenant_id=tenant_id,
                orchestration_run_id=orchestration_run_id,
                triggered_by="orchestrator"
            )

            logger.info(
                "Daily validation job completed",
                tenant_id=tenant_id,
                validation_run_id=result.get("validation_run_id"),
                forecasts_evaluated=result.get("forecasts_evaluated"),
                forecasts_with_actuals=result.get("forecasts_with_actuals"),
                overall_mape=result.get("overall_metrics", {}).get("mape")
            )

            return result
        except Exception as e:
            logger.error(
                "Daily validation job failed",
                tenant_id=tenant_id,
                orchestration_run_id=orchestration_run_id,
                error=str(e),
                error_type=type(e).__name__
            )
            return {
                "status": "failed",
                "error": str(e),
                "tenant_id": str(tenant_id),
                "orchestration_run_id": str(orchestration_run_id) if orchestration_run_id else None
            }


async def validate_date_range_job(
    tenant_id: uuid.UUID,
    start_date: datetime,
    end_date: datetime,
    orchestration_run_id: Optional[uuid.UUID] = None
) -> Dict[str, Any]:
    """
    Validate forecasts for a specific date range

    Useful for backfilling validation metrics when historical data is uploaded.

    Args:
        tenant_id: Tenant identifier
        start_date: Start of validation period
        end_date: End of validation period
        orchestration_run_id: Optional orchestration run ID for tracking

    Returns:
        Dictionary with validation results
    """
    async with database_manager.get_session() as db:
        try:
            logger.info(
                "Starting date range validation job",
                tenant_id=tenant_id,
                start_date=start_date.isoformat(),
                end_date=end_date.isoformat(),
                orchestration_run_id=orchestration_run_id
            )

            validation_service = ValidationService(db)
            result = await validation_service.validate_date_range(
                tenant_id=tenant_id,
                start_date=start_date,
                end_date=end_date,
                orchestration_run_id=orchestration_run_id,
                triggered_by="scheduled"
            )

            logger.info(
                "Date range validation job completed",
                tenant_id=tenant_id,
                validation_run_id=result.get("validation_run_id"),
                forecasts_evaluated=result.get("forecasts_evaluated"),
                forecasts_with_actuals=result.get("forecasts_with_actuals")
            )

            return result
        except Exception as e:
            logger.error(
                "Date range validation job failed",
                tenant_id=tenant_id,
                start_date=start_date.isoformat(),
                end_date=end_date.isoformat(),
                error=str(e),
                error_type=type(e).__name__
            )
            return {
                "status": "failed",
                "error": str(e),
                "tenant_id": str(tenant_id),
                "orchestration_run_id": str(orchestration_run_id) if orchestration_run_id else None
            }
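
For backfills, validate_date_range_job can be pointed at any historical window. A small usage sketch; the 30-day window and tenant ID are illustrative:

import asyncio
import uuid
from datetime import datetime, timedelta, timezone

from app.jobs import validate_date_range_job

async def backfill_last_30_days(tenant_id: uuid.UUID) -> dict:
    # Validate the trailing 30 days, e.g. after a historical sales upload
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=30)
    return await validate_date_range_job(
        tenant_id=tenant_id,
        start_date=start_date,
        end_date=end_date,
    )

result = asyncio.run(
    backfill_last_30_days(uuid.UUID("00000000-0000-0000-0000-000000000001"))
)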

View File: services/forecasting/app/jobs/sales_data_listener.py

@@ -0,0 +1,276 @@
# ================================================================
# services/forecasting/app/jobs/sales_data_listener.py
# ================================================================
"""
Sales Data Listener

Listens for sales data import completions and triggers validation backfill.
Can be called via webhook, message queue, or direct API call from sales service.
"""
from typing import Dict, Any, Optional
from datetime import date
import structlog
import uuid

from app.services.historical_validation_service import HistoricalValidationService
from app.core.database import database_manager

logger = structlog.get_logger()


async def handle_sales_import_completion(
    tenant_id: uuid.UUID,
    import_job_id: str,
    start_date: date,
    end_date: date,
    records_count: int,
    import_source: str = "import"
) -> Dict[str, Any]:
    """
    Handle sales data import completion event

    This function is called when the sales service completes a data import.
    It registers the update and triggers validation for the imported date range.

    Args:
        tenant_id: Tenant identifier
        import_job_id: Sales import job ID
        start_date: Start date of imported data
        end_date: End date of imported data
        records_count: Number of records imported
        import_source: Source of import (csv, xlsx, api, pos_sync)

    Returns:
        Dictionary with registration and validation results
    """
    async with database_manager.get_session() as db:
        try:
            logger.info(
                "Handling sales import completion",
                tenant_id=tenant_id,
                import_job_id=import_job_id,
                date_range=f"{start_date} to {end_date}",
                records_count=records_count
            )

            service = HistoricalValidationService(db)

            # Register the sales data update and trigger validation
            result = await service.register_sales_data_update(
                tenant_id=tenant_id,
                start_date=start_date,
                end_date=end_date,
                records_affected=records_count,
                update_source=import_source,
                import_job_id=import_job_id,
                auto_trigger_validation=True
            )

            logger.info(
                "Sales import completion handled",
                tenant_id=tenant_id,
                import_job_id=import_job_id,
                update_id=result.get("update_id"),
                validation_triggered=result.get("validation_triggered")
            )

            return {
                "status": "success",
                "tenant_id": str(tenant_id),
                "import_job_id": import_job_id,
                **result
            }
        except Exception as e:
            logger.error(
                "Failed to handle sales import completion",
                tenant_id=tenant_id,
                import_job_id=import_job_id,
                error=str(e),
                error_type=type(e).__name__
            )
            return {
                "status": "failed",
                "error": str(e),
                "tenant_id": str(tenant_id),
                "import_job_id": import_job_id
            }


async def handle_pos_sync_completion(
    tenant_id: uuid.UUID,
    sync_log_id: str,
    sync_date: date,
    records_synced: int
) -> Dict[str, Any]:
    """
    Handle POS sync completion event

    Called when POS data is synchronized to the sales service.

    Args:
        tenant_id: Tenant identifier
        sync_log_id: POS sync log ID
        sync_date: Date of synced data
        records_synced: Number of records synced

    Returns:
        Dictionary with registration and validation results
    """
    async with database_manager.get_session() as db:
        try:
            logger.info(
                "Handling POS sync completion",
                tenant_id=tenant_id,
                sync_log_id=sync_log_id,
                sync_date=sync_date.isoformat(),
                records_synced=records_synced
            )

            service = HistoricalValidationService(db)

            # For POS syncs, we typically validate just the sync date
            result = await service.register_sales_data_update(
                tenant_id=tenant_id,
                start_date=sync_date,
                end_date=sync_date,
                records_affected=records_synced,
                update_source="pos_sync",
                import_job_id=sync_log_id,
                auto_trigger_validation=True
            )

            logger.info(
                "POS sync completion handled",
                tenant_id=tenant_id,
                sync_log_id=sync_log_id,
                update_id=result.get("update_id")
            )

            return {
                "status": "success",
                "tenant_id": str(tenant_id),
                "sync_log_id": sync_log_id,
                **result
            }
        except Exception as e:
            logger.error(
                "Failed to handle POS sync completion",
                tenant_id=tenant_id,
                sync_log_id=sync_log_id,
                error=str(e)
            )
            return {
                "status": "failed",
                "error": str(e),
                "tenant_id": str(tenant_id),
                "sync_log_id": sync_log_id
            }


async def process_pending_validations(
    tenant_id: Optional[uuid.UUID] = None,
    max_to_process: int = 10
) -> Dict[str, Any]:
    """
    Process pending validation requests

    Can be run as a scheduled job to handle any pending validations
    that failed to trigger automatically.

    Args:
        tenant_id: Optional tenant ID to filter (process all tenants if None)
        max_to_process: Maximum number of pending validations to process

    Returns:
        Summary of processing results
    """
    async with database_manager.get_session() as db:
        try:
            logger.info(
                "Processing pending validations",
                tenant_id=tenant_id,
                max_to_process=max_to_process
            )

            service = HistoricalValidationService(db)

            if tenant_id:
                # Process specific tenant
                pending = await service.get_pending_validations(
                    tenant_id=tenant_id,
                    limit=max_to_process
                )
            else:
                # Would need to implement get_all_pending_validations for all tenants
                # For now, require tenant_id
                logger.warning("Processing all tenants not implemented, tenant_id required")
                return {
                    "status": "skipped",
                    "message": "tenant_id required"
                }

            if not pending:
                logger.info("No pending validations found")
                return {
                    "status": "success",
                    "pending_count": 0,
                    "processed": 0
                }

            results = []
            for update_record in pending:
                try:
                    result = await service.backfill_validation(
                        tenant_id=update_record.tenant_id,
                        start_date=update_record.update_date_start,
                        end_date=update_record.update_date_end,
                        triggered_by="pending_processor",
                        sales_data_update_id=update_record.id
                    )
                    results.append({
                        "update_id": str(update_record.id),
                        "status": "success",
                        "validation_run_id": result.get("validation_run_id")
                    })
                except Exception as e:
                    logger.error(
                        "Failed to process pending validation",
                        update_id=update_record.id,
                        error=str(e)
                    )
                    results.append({
                        "update_id": str(update_record.id),
                        "status": "failed",
                        "error": str(e)
                    })

            successful = sum(1 for r in results if r["status"] == "success")

            logger.info(
                "Pending validations processed",
                pending_count=len(pending),
                processed=len(results),
                successful=successful
            )

            return {
                "status": "success",
                "pending_count": len(pending),
                "processed": len(results),
                "successful": successful,
                "failed": len(results) - successful,
                "results": results
            }
        except Exception as e:
            logger.error(
                "Failed to process pending validations",
                error=str(e)
            )
            return {
                "status": "failed",
                "error": str(e)
            }
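
Since the listener can be reached "via webhook, message queue, or direct API call", one plausible wiring is a small HTTP endpoint that forwards the event payload. A hedged sketch assuming FastAPI and Pydantic v2, neither of which this commit confirms; the route path is made up:

import uuid
from datetime import date

from fastapi import APIRouter
from pydantic import BaseModel

from app.jobs import handle_sales_import_completion

router = APIRouter()

class SalesImportEvent(BaseModel):
    # Fields mirror the arguments of handle_sales_import_completion
    tenant_id: uuid.UUID
    import_job_id: str
    start_date: date
    end_date: date
    records_count: int
    import_source: str = "import"

@router.post("/internal/events/sales-import-completed")
async def sales_import_completed(event: SalesImportEvent) -> dict:
    # The listener opens its own DB session, so the route just delegates
    return await handle_sales_import_completion(**event.model_dump())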