Add POI feature and improve the overall backend implementation

This commit is contained in:
Urtzi Alfaro
2025-11-12 15:34:10 +01:00
parent e8096cd979
commit 5783c7ed05
173 changed files with 16862 additions and 9078 deletions

View File

@@ -116,20 +116,22 @@ class EnhancedBakeryDataProcessor:
weather_data: pd.DataFrame,
traffic_data: pd.DataFrame,
inventory_product_id: str,
poi_features: Dict[str, Any] = None,
tenant_id: str = None,
job_id: str = None,
session=None) -> pd.DataFrame:
"""
Prepare comprehensive training data for a specific product with repository logging.
Args:
sales_data: Historical sales data for the product
weather_data: Weather data
traffic_data: Traffic data
inventory_product_id: Inventory product UUID for logging
poi_features: POI features (location-based, static)
tenant_id: Optional tenant ID for tracking
job_id: Optional job ID for tracking
Returns:
DataFrame ready for Prophet training with 'ds' and 'y' columns plus features
"""
@@ -250,6 +252,18 @@ class EnhancedBakeryDataProcessor:
inventory_product_id=inventory_product_id,
total_features=len(daily_sales.columns))
logger.debug("Starting Step 8b: Add POI features",
inventory_product_id=inventory_product_id)
# Step 8b: Add POI features (static, location-based)
if poi_features:
daily_sales = self._add_poi_features(daily_sales, poi_features)
logger.debug("Step 8b completed: Add POI features",
inventory_product_id=inventory_product_id,
poi_feature_count=len(poi_features))
else:
logger.debug("Step 8b skipped: No POI features available",
inventory_product_id=inventory_product_id)
logger.debug("Starting Step 9: Handle missing values",
inventory_product_id=inventory_product_id)
# Step 9: Handle missing values
@@ -331,6 +345,7 @@ class EnhancedBakeryDataProcessor:
future_dates: pd.DatetimeIndex,
weather_forecast: pd.DataFrame = None,
traffic_forecast: pd.DataFrame = None,
poi_features: Dict[str, Any] = None,
historical_data: pd.DataFrame = None) -> pd.DataFrame:
"""
Create features for future predictions with proper date handling.
@@ -339,6 +354,7 @@ class EnhancedBakeryDataProcessor:
future_dates: Future dates to predict
weather_forecast: Weather forecast data
traffic_forecast: Traffic forecast data
poi_features: POI features (location-based, static)
historical_data: Historical data for creating lagged and rolling features
Returns:
@@ -390,6 +406,10 @@ class EnhancedBakeryDataProcessor:
logger.warning("No historical data provided, lagged features will be NaN")
future_df = self._add_advanced_features(future_df)
# Add POI features (static, location-based)
if poi_features:
future_df = self._add_poi_features(future_df, poi_features)
future_df = future_df.rename(columns={'date': 'ds'})
# Handle missing values in future data
@@ -1171,7 +1191,42 @@ class EnhancedBakeryDataProcessor:
df[col] = df[col].fillna(default_value)
return df
def _add_poi_features(self, df: pd.DataFrame, poi_features: Dict[str, Any]) -> pd.DataFrame:
"""
Add POI features to training dataframe.
POI features are static (location-based, not time-varying),
so they're broadcast to all rows in the dataframe.
Args:
df: Training dataframe
poi_features: Dictionary of POI ML features
Returns:
Dataframe with POI features added as columns
"""
if not poi_features:
logger.warning("No POI features to add")
return df
logger.info(f"Adding {len(poi_features)} POI features to dataframe")
# Add each POI feature as a column with constant value
for feature_name, feature_value in poi_features.items():
# Convert boolean to int for ML compatibility
if isinstance(feature_value, bool):
feature_value = 1 if feature_value else 0
df[feature_name] = feature_value
logger.info(
"POI features added successfully",
feature_count=len(poi_features),
feature_names=list(poi_features.keys())[:5] # Log first 5 for brevity
)
return df
def _prepare_prophet_format(self, df: pd.DataFrame) -> pd.DataFrame:
"""Prepare data in Prophet format with enhanced validation"""
prophet_df = df.copy()
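For reference, a minimal sketch of the broadcast behavior `_add_poi_features` implements; the feature names and values below are hypothetical, not taken from the service:

```python
import pandas as pd

# Hypothetical POI feature dict, shaped like the External service's
# ml_features payload; names and values are illustrative only.
poi_features = {
    "poi_has_school_nearby": True,  # bool -> converted to 1 for ML compatibility
    "poi_restaurant_count": 12,
    "poi_transit_score": 0.85,
}

df = pd.DataFrame({"ds": pd.date_range("2025-01-01", periods=3), "y": [10, 12, 9]})

# Static features are broadcast: every row gets the same constant value.
for name, value in poi_features.items():
    if isinstance(value, bool):
        value = int(value)
    df[name] = value

print(df.columns.tolist())
# ['ds', 'y', 'poi_has_school_nearby', 'poi_restaurant_count', 'poi_transit_score']
```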

View File

@@ -0,0 +1,204 @@
"""
POI Feature Integrator
Integrates POI features into the ML training pipeline.
Fetches POI context from the External service and merges the features into training data.
"""
import httpx
from typing import Dict, Any, Optional, List
import structlog
import pandas as pd
logger = structlog.get_logger()
class POIFeatureIntegrator:
"""
POI feature integration for ML training.
Fetches POI context from the External service and adds features
to training dataframes for location-based demand forecasting.
"""
def __init__(self, external_service_url: str = "http://external-service:8000"):
"""
Initialize POI feature integrator.
Args:
external_service_url: Base URL for external service
"""
self.external_service_url = external_service_url.rstrip("/")
self.poi_context_endpoint = f"{self.external_service_url}/poi-context"
async def fetch_poi_features(
self,
tenant_id: str,
latitude: float,
longitude: float,
force_refresh: bool = False
) -> Optional[Dict[str, Any]]:
"""
Fetch POI features for tenant location.
Checks whether a POI context already exists; if not, triggers detection.
Args:
tenant_id: Tenant UUID
latitude: Bakery latitude
longitude: Bakery longitude
force_refresh: Force re-detection
Returns:
Dictionary with POI features or None if detection fails
"""
try:
async with httpx.AsyncClient(timeout=60.0) as client:
# Try to get existing POI context first
if not force_refresh:
try:
response = await client.get(
f"{self.poi_context_endpoint}/{tenant_id}"
)
if response.status_code == 200:
data = response.json()
poi_context = data.get("poi_context", {})
# Check if stale
if not data.get("is_stale", False):
logger.info(
"Using existing POI context",
tenant_id=tenant_id
)
return poi_context.get("ml_features", {})
else:
logger.info(
"POI context is stale, refreshing",
tenant_id=tenant_id
)
force_refresh = True
except httpx.HTTPStatusError as e:
if e.response.status_code != 404:
raise
logger.info(
"No existing POI context, will detect",
tenant_id=tenant_id
)
# Detect or refresh POIs
logger.info(
"Detecting POIs for tenant",
tenant_id=tenant_id,
location=(latitude, longitude)
)
response = await client.post(
f"{self.poi_context_endpoint}/{tenant_id}/detect",
params={
"latitude": latitude,
"longitude": longitude,
"force_refresh": force_refresh
}
)
response.raise_for_status()
result = response.json()
poi_context = result.get("poi_context", {})
ml_features = poi_context.get("ml_features", {})
logger.info(
"POI detection completed",
tenant_id=tenant_id,
total_pois=poi_context.get("total_pois_detected", 0),
feature_count=len(ml_features)
)
return ml_features
except httpx.HTTPError as e:
logger.error(
"Failed to fetch POI features",
tenant_id=tenant_id,
error=str(e),
exc_info=True
)
return None
except Exception as e:
logger.error(
"Unexpected error fetching POI features",
tenant_id=tenant_id,
error=str(e),
exc_info=True
)
return None
def add_poi_features_to_dataframe(
self,
df: pd.DataFrame,
poi_features: Dict[str, Any]
) -> pd.DataFrame:
"""
Add POI features to training dataframe.
POI features are static (don't vary by date), so they're
broadcast to all rows in the dataframe.
Args:
df: Training dataframe
poi_features: Dictionary of POI ML features
Returns:
Dataframe with POI features added as columns
"""
if not poi_features:
logger.warning("No POI features to add")
return df
logger.info(
"Adding POI features to dataframe",
feature_count=len(poi_features),
dataframe_rows=len(df)
)
# Add each POI feature as a column with constant value
for feature_name, feature_value in poi_features.items():
df[feature_name] = feature_value
logger.info(
"POI features added successfully",
new_columns=list(poi_features.keys())
)
return df
def get_poi_feature_names(self, poi_features: Dict[str, Any]) -> List[str]:
"""
Get list of POI feature names for model registration.
Args:
poi_features: Dictionary of POI ML features
Returns:
List of feature names
"""
return list(poi_features.keys()) if poi_features else []
async def check_poi_service_health(self) -> bool:
"""
Check if POI service is accessible.
Returns:
True if service is healthy, False otherwise
"""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(
f"{self.poi_context_endpoint}/health"
)
return response.status_code == 200
except Exception as e:
logger.error(
"POI service health check failed",
error=str(e)
)
return False
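A usage sketch for the integrator, assuming it runs inside the training service's event loop; the tenant UUID and coordinates are placeholders:

```python
import asyncio

async def main():
    integrator = POIFeatureIntegrator()  # defaults to http://external-service:8000

    # fetch_poi_features degrades gracefully: it returns None on HTTP or
    # unexpected errors, so training can proceed without POI features.
    features = await integrator.fetch_poi_features(
        tenant_id="00000000-0000-0000-0000-000000000000",  # placeholder UUID
        latitude=40.4168,   # placeholder coordinates (central Madrid)
        longitude=-3.7038,
        force_refresh=False,
    )
    features = features or {}
    print(f"Fetched {len(features)} POI features")

asyncio.run(main())
```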

View File

@@ -210,9 +210,23 @@ class EnhancedBakeryMLTrainer:
# Process data for each product using enhanced processor
logger.info("Processing data using enhanced processor")
processed_data = await self._process_all_products_enhanced(
sales_df, weather_df, traffic_df, products, tenant_id, job_id, session
sales_df, weather_df, traffic_df, products, tenant_id, job_id, training_dataset.poi_features, session
)
# Validate that we have processed data
if not processed_data or len(processed_data) == 0:
error_msg = f"No products could be processed successfully. Found {len(products)} products in sales data but all failed during processing."
logger.error("Training aborted - no processed data",
tenant_id=tenant_id,
job_id=job_id,
products_found=len(products),
products_processed=0)
raise ValueError(error_msg)
logger.info(f"Successfully processed {len(processed_data)} out of {len(products)} products",
products_processed=len(processed_data),
products_found=len(products))
# Categorize all products for category-specific forecasting
logger.info("Categorizing products for optimized forecasting")
product_categories = await self._categorize_all_products(
@@ -491,6 +505,7 @@ class EnhancedBakeryMLTrainer:
products: List[str],
tenant_id: str,
job_id: str,
poi_features: Dict[str, Any],
session=None) -> Dict[str, pd.DataFrame]:
"""Process data for all products using enhanced processor with repository tracking"""
processed_data = {}
@@ -515,6 +530,7 @@ class EnhancedBakeryMLTrainer:
weather_data=weather_df,
traffic_data=traffic_df,
inventory_product_id=inventory_product_id,
poi_features=poi_features, # POI features for location-based forecasting
tenant_id=tenant_id,
job_id=job_id,
session=session # Pass the session to avoid creating new ones
@@ -1132,6 +1148,7 @@ class EnhancedBakeryMLTrainer:
weather_data=test_weather_df,
traffic_data=test_traffic_df,
inventory_product_id=inventory_product_id,
poi_features=test_dataset.poi_features, # POI features for location-based forecasting
tenant_id=tenant_id
)

View File

@@ -33,18 +33,22 @@ class ParallelProductProgressTracker:
def __init__(self, job_id: str, tenant_id: str, total_products: int):
self.job_id = job_id
self.tenant_id = tenant_id
self.total_products = total_products
self.total_products = max(total_products, 1) # Ensure at least 1 to avoid division by zero
self.products_completed = 0
self._lock = asyncio.Lock()
self.start_time = datetime.now(timezone.utc)
# Calculate progress increment per product
# Training range (from PROGRESS_TRAINING_RANGE_START to PROGRESS_TRAINING_RANGE_END) divided by number of products
self.progress_per_product = PROGRESS_TRAINING_RANGE_WIDTH / total_products if total_products > 0 else 0
self.progress_per_product = PROGRESS_TRAINING_RANGE_WIDTH / self.total_products if self.total_products > 0 else 0
if total_products == 0:
logger.warning("ParallelProductProgressTracker initialized with zero products",
job_id=job_id)
logger.info("ParallelProductProgressTracker initialized",
job_id=job_id,
total_products=total_products,
total_products=self.total_products,
progress_per_product=f"{self.progress_per_product:.2f}%")
async def mark_product_completed(self, product_name: str) -> int:
@@ -87,7 +91,10 @@ class ParallelProductProgressTracker:
# Calculate overall progress (PROGRESS_TRAINING_RANGE_START% base + progress from completed products)
# This calculation is done on the frontend/consumer side based on the event data
overall_progress = PROGRESS_TRAINING_RANGE_START + int((current_progress / self.total_products) * PROGRESS_TRAINING_RANGE_WIDTH)
if self.total_products > 0:
overall_progress = PROGRESS_TRAINING_RANGE_START + int((current_progress / self.total_products) * PROGRESS_TRAINING_RANGE_WIDTH)
else:
overall_progress = PROGRESS_TRAINING_RANGE_START
logger.info("Product training completed",
job_id=self.job_id,
@@ -101,8 +108,13 @@ class ParallelProductProgressTracker:
def get_progress(self) -> dict:
"""Get current progress summary"""
if self.total_products > 0:
progress_percentage = PROGRESS_TRAINING_RANGE_START + int((self.products_completed / self.total_products) * PROGRESS_TRAINING_RANGE_WIDTH)
else:
progress_percentage = PROGRESS_TRAINING_RANGE_START
return {
"products_completed": self.products_completed,
"total_products": self.total_products,
"progress_percentage": PROGRESS_TRAINING_RANGE_START + int((self.products_completed / self.total_products) * PROGRESS_TRAINING_RANGE_WIDTH)
"progress_percentage": progress_percentage
}
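As a worked example of the guarded progress math; the two constants are assumed values for illustration (the real ones live in the progress configuration):

```python
# Assumed constants: a training window from 40% to 90% of overall progress.
PROGRESS_TRAINING_RANGE_START = 40
PROGRESS_TRAINING_RANGE_WIDTH = 50

def overall_progress(products_completed: int, total_products: int) -> int:
    # Mirrors the tracker's guard: clamp to at least 1 product so a
    # zero-product job reports the range start instead of dividing by zero.
    total = max(total_products, 1)
    return PROGRESS_TRAINING_RANGE_START + int(
        (products_completed / total) * PROGRESS_TRAINING_RANGE_WIDTH
    )

assert overall_progress(0, 10) == 40   # nothing completed yet
assert overall_progress(5, 10) == 65   # halfway through the training window
assert overall_progress(0, 0) == 40    # zero products no longer divides by zero
```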

View File

@@ -15,7 +15,7 @@ import pandas as pd
from app.services.data_client import DataClient
from app.services.date_alignment_service import DateAlignmentService, DateRange, DataSourceType, AlignedDateRange
from app.ml.poi_feature_integrator import POIFeatureIntegrator
from app.services.training_events import publish_training_failed
logger = structlog.get_logger()
@@ -26,6 +26,7 @@ class TrainingDataSet:
sales_data: List[Dict[str, Any]]
weather_data: List[Dict[str, Any]]
traffic_data: List[Dict[str, Any]]
poi_features: Dict[str, Any] # POI features for location-based forecasting
date_range: AlignedDateRange
metadata: Dict[str, Any]
@@ -36,10 +37,12 @@ class TrainingDataOrchestrator:
Uses the new abstracted traffic service layer for multi-city support.
"""
def __init__(self,
date_alignment_service: DateAlignmentService = None):
def __init__(self,
date_alignment_service: DateAlignmentService = None,
poi_feature_integrator: POIFeatureIntegrator = None):
self.data_client = DataClient()
self.date_alignment_service = date_alignment_service or DateAlignmentService()
self.poi_feature_integrator = poi_feature_integrator or POIFeatureIntegrator()
self.max_concurrent_requests = 5 # Increased for better performance
async def prepare_training_data(
@@ -122,20 +125,21 @@ class TrainingDataOrchestrator:
# Step 5: Collect external data sources concurrently
logger.info("Collecting external data sources...")
weather_data, traffic_data = await self._collect_external_data(
weather_data, traffic_data, poi_features = await self._collect_external_data(
aligned_range, bakery_location, tenant_id
)
# Step 6: Validate data quality
data_quality_results = self._validate_data_sources(
filtered_sales, weather_data, traffic_data, aligned_range
)
# Step 7: Create comprehensive training dataset
training_dataset = TrainingDataSet(
sales_data=filtered_sales,
weather_data=weather_data,
traffic_data=traffic_data,
poi_features=poi_features or {}, # POI features (static, location-based)
date_range=aligned_range,
metadata={
"tenant_id": tenant_id,
@@ -148,7 +152,8 @@ class TrainingDataOrchestrator:
"original_sales_range": {
"start": sales_date_range.start.isoformat(),
"end": sales_date_range.end.isoformat()
}
},
"poi_features_count": len(poi_features) if poi_features else 0
}
)
@@ -160,6 +165,7 @@ class TrainingDataOrchestrator:
logger.info(f" - Sales records: {len(filtered_sales)}")
logger.info(f" - Weather records: {len(weather_data)}")
logger.info(f" - Traffic records: {len(traffic_data)}")
logger.info(f" - POI features: {len(poi_features) if poi_features else 0}")
logger.info(f" - Data quality score: {final_validation.get('data_quality_score', 'N/A')}")
return training_dataset
@@ -329,21 +335,21 @@ class TrainingDataOrchestrator:
aligned_range: AlignedDateRange,
bakery_location: Tuple[float, float],
tenant_id: str
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""Collect weather and traffic data concurrently with enhanced error handling"""
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], Dict[str, Any]]:
"""Collect weather, traffic, and POI data concurrently with enhanced error handling"""
lat, lon = bakery_location
# Create collection tasks with timeout
tasks = []
# Weather data collection
if DataSourceType.WEATHER_FORECAST in aligned_range.available_sources:
weather_task = asyncio.create_task(
self._collect_weather_data_with_timeout(lat, lon, aligned_range, tenant_id)
)
tasks.append(("weather", weather_task))
# Enhanced Traffic data collection (supports multiple cities)
if DataSourceType.MADRID_TRAFFIC in aligned_range.available_sources:
logger.info(f"🚛 Traffic data source available for multiple cities, creating collection task for date range: {aligned_range.start} to {aligned_range.end}")
@@ -353,7 +359,13 @@ class TrainingDataOrchestrator:
tasks.append(("traffic", traffic_task))
else:
logger.warning(f"🚫 Traffic data source NOT available in sources: {[s.value for s in aligned_range.available_sources]}")
# POI features collection (static, location-based)
poi_task = asyncio.create_task(
self._collect_poi_features(lat, lon, tenant_id)
)
tasks.append(("poi", poi_task))
# Execute tasks concurrently with proper error handling
results = {}
if tasks:
@@ -362,24 +374,76 @@ class TrainingDataOrchestrator:
*[task for _, task in tasks],
return_exceptions=True
)
for i, (task_name, _) in enumerate(tasks):
result = completed_tasks[i]
if isinstance(result, Exception):
logger.warning(f"{task_name} data collection failed: {result}")
results[task_name] = []
results[task_name] = [] if task_name != "poi" else {}
else:
results[task_name] = result
logger.info(f"{task_name} data collection completed: {len(result)} records")
if task_name == "poi":
logger.info(f"{task_name} features collected: {len(result) if result else 0} features")
else:
logger.info(f"{task_name} data collection completed: {len(result)} records")
except Exception as e:
logger.error(f"Error in concurrent data collection: {str(e)}")
results = {"weather": [], "traffic": []}
results = {"weather": [], "traffic": [], "poi": {}}
weather_data = results.get("weather", [])
traffic_data = results.get("traffic", [])
return weather_data, traffic_data
poi_features = results.get("poi", {})
return weather_data, traffic_data, poi_features
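The per-task fallback pattern used above can be reduced to a self-contained sketch; the source names, payloads, and defaults are illustrative:

```python
import asyncio
from typing import Any, Dict

async def collect_all() -> Dict[str, Any]:
    async def fake_source(payload: Any, fail: bool = False) -> Any:
        if fail:
            raise RuntimeError("source unavailable")
        return payload

    # Each source carries a name, a task, and a type-appropriate default,
    # so one failing source never poisons the others.
    tasks = [
        ("weather", asyncio.create_task(fake_source([{"temp": 21}])), []),
        ("traffic", asyncio.create_task(fake_source(None, fail=True)), []),
        ("poi", asyncio.create_task(fake_source({"poi_count": 3})), {}),
    ]

    completed = await asyncio.gather(*(t for _, t, _ in tasks), return_exceptions=True)
    results = {}
    for (name, _, default), result in zip(tasks, completed):
        # With return_exceptions=True, failures come back as exception
        # objects; swap them for the source's own empty default.
        results[name] = default if isinstance(result, Exception) else result
    return results

print(asyncio.run(collect_all()))
# {'weather': [{'temp': 21}], 'traffic': [], 'poi': {'poi_count': 3}}
```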
async def _collect_poi_features(
self,
lat: float,
lon: float,
tenant_id: str
) -> Dict[str, Any]:
"""
Collect POI features for bakery location.
POI features are static (location-based, not time-varying).
"""
try:
logger.info(
"Collecting POI features",
tenant_id=tenant_id,
location=(lat, lon)
)
poi_features = await self.poi_feature_integrator.fetch_poi_features(
tenant_id=tenant_id,
latitude=lat,
longitude=lon,
force_refresh=False
)
if poi_features:
logger.info(
"POI features collected successfully",
tenant_id=tenant_id,
feature_count=len(poi_features)
)
else:
logger.warning(
"No POI features collected (service may be unavailable)",
tenant_id=tenant_id
)
return poi_features or {}
except Exception as e:
logger.error(
"Failed to collect POI features, continuing without them",
tenant_id=tenant_id,
error=str(e),
exc_info=True
)
return {}
async def _collect_weather_data_with_timeout(
self,