205 lines
6.5 KiB
Python
205 lines
6.5 KiB
Python
"""
|
|
POI Feature Integrator
|
|
|
|
Integrates POI features into ML training pipeline.
|
|
Fetches POI context from External service and merges features into training data.
|
|
"""
|
|
|
|
import httpx
|
|
from typing import Dict, Any, Optional, List
|
|
import structlog
|
|
import pandas as pd
|
|
|
|
# Module-level structlog logger; POIFeatureIntegrator emits its structured events through it.
logger = structlog.get_logger()
|
|
|
|
|
|
class POIFeatureIntegrator:
    """
    POI feature integration for ML training.

    Fetches POI context from the external service and adds features
    to training dataframes for location-based demand forecasting.
    """

    def __init__(self, external_service_url: str = "http://external-service:8000"):
        """
        Initialize POI feature integrator.

        Args:
            external_service_url: Base URL for external service. Trailing
                slashes are stripped before the POI context endpoint is built.
        """
        self.external_service_url = external_service_url.rstrip("/")
        self.poi_context_endpoint = f"{self.external_service_url}/poi-context"

    async def fetch_poi_features(
        self,
        tenant_id: str,
        latitude: float,
        longitude: float,
        force_refresh: bool = False
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch POI features for tenant location.

        Unless ``force_refresh`` is set, first looks up the existing POI
        context. A fresh (non-stale) context is returned directly; a stale
        or missing (404) context triggers detection. Any other error status
        on the lookup is treated as a failure rather than silently falling
        through to detection.

        Args:
            tenant_id: Tenant UUID
            latitude: Bakery latitude
            longitude: Bakery longitude
            force_refresh: Force re-detection

        Returns:
            Dictionary with POI features, or None if the lookup or the
            detection fails (the error is logged, never raised to the caller)
        """
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                # Try to get existing POI context first
                if not force_refresh:
                    response = await client.get(
                        f"{self.poi_context_endpoint}/{tenant_id}"
                    )

                    if response.status_code == 200:
                        data = response.json()
                        poi_context = data.get("poi_context", {})

                        if not data.get("is_stale", False):
                            logger.info(
                                "Using existing POI context",
                                tenant_id=tenant_id
                            )
                            return poi_context.get("ml_features", {})

                        logger.info(
                            "POI context is stale, refreshing",
                            tenant_id=tenant_id
                        )
                        force_refresh = True
                    elif response.status_code == 404:
                        # A missing context is expected for new tenants:
                        # fall through to detection.
                        logger.info(
                            "No existing POI context, will detect",
                            tenant_id=tenant_id
                        )
                    else:
                        # httpx does not raise on error statuses by itself;
                        # without this call a failing lookup (e.g. 500) would
                        # be indistinguishable from "no context yet".
                        response.raise_for_status()

                # Detect or refresh POIs
                logger.info(
                    "Detecting POIs for tenant",
                    tenant_id=tenant_id,
                    location=(latitude, longitude)
                )

                response = await client.post(
                    f"{self.poi_context_endpoint}/{tenant_id}/detect",
                    params={
                        "latitude": latitude,
                        "longitude": longitude,
                        "force_refresh": force_refresh
                    }
                )
                response.raise_for_status()

                result = response.json()
                poi_context = result.get("poi_context", {})
                ml_features = poi_context.get("ml_features", {})

                logger.info(
                    "POI detection completed",
                    tenant_id=tenant_id,
                    total_pois=poi_context.get("total_pois_detected", 0),
                    feature_count=len(ml_features)
                )

                return ml_features

        except httpx.HTTPError as e:
            logger.error(
                "Failed to fetch POI features",
                tenant_id=tenant_id,
                error=str(e),
                exc_info=True
            )
            return None
        except Exception as e:
            # Best-effort boundary: malformed payloads and similar problems
            # must not abort the training pipeline, so log and return None.
            logger.error(
                "Unexpected error fetching POI features",
                tenant_id=tenant_id,
                error=str(e),
                exc_info=True
            )
            return None

    def add_poi_features_to_dataframe(
        self,
        df: pd.DataFrame,
        poi_features: Dict[str, Any]
    ) -> pd.DataFrame:
        """
        Add POI features to training dataframe.

        POI features are static (don't vary by date), so they're
        broadcast to all rows in the dataframe.

        Note:
            The columns are assigned on ``df`` itself, so the caller's
            dataframe is modified in place and the same object is returned.

        Args:
            df: Training dataframe
            poi_features: Dictionary of POI ML features

        Returns:
            Dataframe with POI features added as columns (``df`` unchanged
            when ``poi_features`` is empty or None)
        """
        if not poi_features:
            logger.warning("No POI features to add")
            return df

        logger.info(
            "Adding POI features to dataframe",
            feature_count=len(poi_features),
            dataframe_rows=len(df)
        )

        # Add each POI feature as a column with constant value
        for feature_name, feature_value in poi_features.items():
            df[feature_name] = feature_value

        logger.info(
            "POI features added successfully",
            new_columns=list(poi_features.keys())
        )

        return df

    def get_poi_feature_names(self, poi_features: Dict[str, Any]) -> List[str]:
        """
        Get list of POI feature names for model registration.

        Args:
            poi_features: Dictionary of POI ML features

        Returns:
            List of feature names in dictionary order; empty list when
            ``poi_features`` is empty or None
        """
        return list(poi_features.keys()) if poi_features else []

    async def check_poi_service_health(self) -> bool:
        """
        Check if POI service is accessible.

        Issues a GET against the ``/health`` path under the POI context
        endpoint with a 5-second timeout.

        Returns:
            True if the service answers with HTTP 200, False otherwise
            (including when the request itself raises)
        """
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(
                    f"{self.poi_context_endpoint}/health"
                )
                return response.status_code == 200
        except Exception as e:
            logger.error(
                "POI service health check failed",
                error=str(e)
            )
            return False
|