# Source: bakery-ia/services/external/app/services/poi_detection_service.py
"""
POI Detection Service
Automated Point of Interest detection using Overpass API (OpenStreetMap).
Detects nearby POIs around bakery locations and generates ML features
for location-based demand forecasting.
"""
import overpy
from typing import List, Dict, Any, Tuple, Optional
from datetime import datetime, timezone, timedelta
import asyncio
import structlog
import httpx
from math import radians, sin, cos, sqrt, atan2
import random
from app.core.poi_config import (
POI_CATEGORIES,
OVERPASS_API_URL,
OVERPASS_TIMEOUT_SECONDS,
OVERPASS_MAX_RETRIES,
OVERPASS_RETRY_DELAY_SECONDS,
DISTANCE_BANDS
)
logger = structlog.get_logger()
class POIDetectionService:
"""
Automated POI detection using Overpass API (OpenStreetMap).
Detects points of interest near bakery locations and calculates
ML features for demand forecasting with location-specific context.
"""
def __init__(self, overpass_url: str = OVERPASS_API_URL):
self.overpass_url = overpass_url
self.api = overpy.Overpass(url=overpass_url)
self.timeout = OVERPASS_TIMEOUT_SECONDS
async def detect_pois_for_bakery(
self,
latitude: float,
longitude: float,
tenant_id: str
) -> Dict[str, Any]:
"""
Detect all POIs around a bakery location.
Args:
latitude: Bakery latitude
longitude: Bakery longitude
tenant_id: Tenant identifier for logging
Returns:
Complete POI detection results with ML features
"""
logger.info(
"Starting POI detection",
tenant_id=tenant_id,
location=(latitude, longitude)
)
poi_results = {}
detection_errors = []
# Query each POI category with inter-query delays
category_items = list(POI_CATEGORIES.items())
for idx, (category_key, category) in enumerate(category_items):
try:
pois = await self._query_pois_with_retry(
latitude,
longitude,
category.osm_query,
category.search_radius_m,
category_key
)
# Calculate features for this category
features = self._calculate_poi_features(
pois,
(latitude, longitude),
category
)
poi_results[category_key] = {
"pois": pois,
"features": features,
"count": len(pois)
}
logger.info(
f"Detected {category_key}",
count=len(pois),
proximity_score=features["proximity_score"]
)
# Add delay between categories to respect rate limits
# (except after the last category)
if idx < len(category_items) - 1:
inter_query_delay = 2.0 + random.uniform(0.5, 1.5)
await asyncio.sleep(inter_query_delay)
except Exception as e:
logger.error(
f"Failed to detect {category_key}",
error=str(e),
tenant_id=tenant_id
)
detection_errors.append({
"category": category_key,
"error": str(e)
})
poi_results[category_key] = {
"pois": [],
"features": self._get_empty_features(),
"count": 0,
"error": str(e)
}
# Add a longer delay after an error before continuing
if idx < len(category_items) - 1:
error_recovery_delay = 3.0 + random.uniform(1.0, 2.0)
await asyncio.sleep(error_recovery_delay)
# Generate combined ML features
ml_features = self._generate_ml_features(poi_results)
# Generate summary
summary = self._generate_summary(poi_results)
detection_status = "completed" if not detection_errors else (
"partial" if len(detection_errors) < len(POI_CATEGORIES) else "failed"
)
return {
"tenant_id": tenant_id,
"location": {"latitude": latitude, "longitude": longitude},
"detection_timestamp": datetime.now(timezone.utc).isoformat(),
"detection_status": detection_status,
"detection_errors": detection_errors if detection_errors else None,
"poi_categories": poi_results,
"ml_features": ml_features,
"summary": summary
}
async def _query_pois_with_retry(
self,
latitude: float,
longitude: float,
osm_query: str,
radius_m: int,
category_key: str
) -> List[Dict[str, Any]]:
"""
Query Overpass API with exponential backoff retry logic.
Implements:
- Exponential backoff with jitter
- Extended delays for rate limiting errors
- Proper error type detection
"""
last_error = None
base_delay = OVERPASS_RETRY_DELAY_SECONDS
for attempt in range(OVERPASS_MAX_RETRIES):
try:
return await self._query_pois(
latitude, longitude, osm_query, radius_m
)
except Exception as e:
last_error = e
error_message = str(e).lower()
# Determine if this is a rate limiting error
is_rate_limit = any(phrase in error_message for phrase in [
'too many requests',
'rate limit',
'server load too high',
'quota exceeded',
'retry later',
'429',
'503',
'504'
])
if attempt < OVERPASS_MAX_RETRIES - 1:
# Calculate exponential backoff with jitter
# For rate limiting: use longer delays (10-30 seconds)
# For other errors: use standard backoff (2-8 seconds)
if is_rate_limit:
delay = base_delay * (3 ** attempt) + random.uniform(1, 5)
delay = min(delay, 30) # Cap at 30 seconds
else:
delay = base_delay * (2 ** attempt) + random.uniform(0.5, 1.5)
delay = min(delay, 10) # Cap at 10 seconds
logger.warning(
f"POI query retry {attempt + 1}/{OVERPASS_MAX_RETRIES}",
category=category_key,
error=str(e),
is_rate_limit=is_rate_limit,
retry_delay=f"{delay:.1f}s"
)
await asyncio.sleep(delay)
else:
logger.error(
"POI query failed after all retries",
category=category_key,
error=str(e),
is_rate_limit=is_rate_limit
)
raise last_error
async def _query_pois(
self,
latitude: float,
longitude: float,
osm_query: str,
radius_m: int
) -> List[Dict[str, Any]]:
"""
Query Overpass API for POIs in radius.
Raises:
Exception: With descriptive error message from Overpass API
"""
# Build Overpass QL query
query = f"""
[out:json][timeout:{self.timeout}];
(
node{osm_query}(around:{radius_m},{latitude},{longitude});
way{osm_query}(around:{radius_m},{latitude},{longitude});
);
out center;
"""
# Execute query (use asyncio thread pool for blocking overpy)
loop = asyncio.get_event_loop()
try:
result = await loop.run_in_executor(
None,
self.api.query,
query
)
except overpy.exception.OverpassTooManyRequests as e:
# Explicitly handle rate limiting
raise Exception("Too many requests - Overpass API rate limit exceeded") from e
except overpy.exception.OverpassGatewayTimeout as e:
# Query took too long
raise Exception("Gateway timeout - query too complex or server busy") from e
except overpy.exception.OverpassBadRequest as e:
# Query syntax error
raise Exception(f"Bad request - invalid query syntax: {str(e)}") from e
except Exception as e:
# Check if it's an HTTP error with status code
error_msg = str(e).lower()
if '429' in error_msg or 'too many' in error_msg:
raise Exception("Too many requests - rate limit exceeded") from e
elif '503' in error_msg or 'load too high' in error_msg:
raise Exception("Server load too high - Overpass API overloaded") from e
elif '504' in error_msg or 'timeout' in error_msg:
raise Exception("Gateway timeout - server busy") from e
else:
# Re-raise with original message
raise
# Parse results
pois = []
# Process nodes
for node in result.nodes:
pois.append({
"osm_id": str(node.id),
"type": "node",
"lat": float(node.lat),
"lon": float(node.lon),
"tags": dict(node.tags),
"name": node.tags.get("name", "Unnamed")
})
# Process ways (buildings, areas)
for way in result.ways:
# Get center point
if hasattr(way, 'center_lat') and way.center_lat:
lat, lon = float(way.center_lat), float(way.center_lon)
else:
# Calculate centroid from nodes
if way.nodes:
lats = [float(node.lat) for node in way.nodes]
lons = [float(node.lon) for node in way.nodes]
lat = sum(lats) / len(lats)
lon = sum(lons) / len(lons)
else:
continue
pois.append({
"osm_id": str(way.id),
"type": "way",
"lat": lat,
"lon": lon,
"tags": dict(way.tags),
"name": way.tags.get("name", "Unnamed")
})
return pois
def _calculate_poi_features(
self,
pois: List[Dict[str, Any]],
bakery_location: Tuple[float, float],
category
) -> Dict[str, float]:
"""Calculate ML features for POI category"""
if not pois:
return self._get_empty_features()
# Calculate distances
distances = []
for poi in pois:
dist_km = self._haversine_distance(
bakery_location,
(poi["lat"], poi["lon"])
)
distances.append(dist_km * 1000) # Convert to meters
# Feature Tier 1: Proximity Scores (PRIMARY)
proximity_score = sum(1.0 / (1.0 + d/1000) for d in distances)
weighted_proximity_score = proximity_score * category.weight
# Feature Tier 2: Distance Band Counts
count_0_100m = sum(1 for d in distances if d <= 100)
count_100_300m = sum(1 for d in distances if 100 < d <= 300)
count_300_500m = sum(1 for d in distances if 300 < d <= 500)
count_500_1000m = sum(1 for d in distances if 500 < d <= 1000)
# Feature Tier 3: Distance to Nearest
distance_to_nearest_m = min(distances) if distances else 9999.0
# Feature Tier 4: Binary Flags
has_within_100m = any(d <= 100 for d in distances)
has_within_300m = any(d <= 300 for d in distances)
has_within_500m = any(d <= 500 for d in distances)
return {
# Tier 1: Proximity scores (PRIMARY for ML)
"proximity_score": round(proximity_score, 4),
"weighted_proximity_score": round(weighted_proximity_score, 4),
# Tier 2: Distance bands
"count_0_100m": count_0_100m,
"count_100_300m": count_100_300m,
"count_300_500m": count_300_500m,
"count_500_1000m": count_500_1000m,
"total_count": len(pois),
# Tier 3: Distance to nearest
"distance_to_nearest_m": round(distance_to_nearest_m, 1),
# Tier 4: Binary flags
"has_within_100m": has_within_100m,
"has_within_300m": has_within_300m,
"has_within_500m": has_within_500m
}
def _generate_ml_features(self, poi_results: Dict[str, Any]) -> Dict[str, float]:
"""
Generate flat feature dictionary for ML model ingestion.
These features will be added to Prophet/XGBoost as regressors.
"""
ml_features = {}
for category_key, data in poi_results.items():
features = data.get("features", {})
# Flatten with category prefix
for feature_name, value in features.items():
ml_feature_name = f"poi_{category_key}_{feature_name}"
# Convert boolean to int for ML
if isinstance(value, bool):
value = 1 if value else 0
ml_features[ml_feature_name] = value
return ml_features
def _get_empty_features(self) -> Dict[str, float]:
"""Return zero features when no POIs found"""
return {
"proximity_score": 0.0,
"weighted_proximity_score": 0.0,
"count_0_100m": 0,
"count_100_300m": 0,
"count_300_500m": 0,
"count_500_1000m": 0,
"total_count": 0,
"distance_to_nearest_m": 9999.0,
"has_within_100m": False,
"has_within_300m": False,
"has_within_500m": False
}
def _haversine_distance(
self,
coord1: Tuple[float, float],
coord2: Tuple[float, float]
) -> float:
"""
Calculate distance between two coordinates in kilometers.
Uses Haversine formula for great-circle distance.
"""
lat1, lon1 = coord1
lat2, lon2 = coord2
R = 6371 # Earth radius in km
dlat = radians(lat2 - lat1)
dlon = radians(lon2 - lon1)
a = (sin(dlat/2)**2 +
cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon/2)**2)
c = 2 * atan2(sqrt(a), sqrt(1-a))
return R * c
def _generate_summary(self, poi_results: Dict[str, Any]) -> Dict[str, Any]:
"""Generate human-readable summary"""
total_pois = sum(r["count"] for r in poi_results.values())
categories_with_pois = [
k for k, v in poi_results.items() if v["count"] > 0
]
high_impact_categories = [
k for k, v in poi_results.items()
if v["features"]["proximity_score"] > 2.0
]
return {
"total_pois_detected": total_pois,
"categories_with_pois": categories_with_pois,
"high_impact_categories": high_impact_categories,
"categories_count": len(categories_with_pois)
}
async def health_check(self) -> Dict[str, Any]:
"""Check if Overpass API is accessible"""
try:
async with httpx.AsyncClient(timeout=5) as client:
response = await client.get(f"{self.overpass_url}/status")
is_healthy = response.status_code == 200
return {
"healthy": is_healthy,
"status_code": response.status_code,
"url": self.overpass_url
}
except Exception as e:
return {
"healthy": False,
"error": str(e),
"url": self.overpass_url
}