# External Data Service Architectural Redesign
**Project:** Bakery IA - External Data Service
**Version:** 2.0.0
**Date:** 2025-10-07
**Status:** Complete Architecture & Implementation Plan
---
## Executive Summary
This document provides a complete architectural redesign of the external data service to eliminate redundant per-tenant data fetching, enable multi-city support, implement automated 24-month rolling windows, and leverage Kubernetes for lifecycle management.
### Key Problems Addressed
1. **Per-tenant redundant fetching** → Centralized city-based data storage
2. **Geographic limitation (Madrid only)** → Multi-city extensible architecture
3. **Redundant downloads for same city** → Shared data layer with geolocation mapping
4. **Slow training pipeline** → Pre-populated historical datasets via K8s Jobs
5. **Static data windows** → Automated 24-month rolling updates via CronJobs
---
## Part 1: High-Level Architecture
### 1.1 Architecture Overview
```
┌─────────────────────────────────────────────────────────────────────┐
│ KUBERNETES ORCHESTRATION │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Init Job │ │ Monthly CronJob │ │
│ │ (One-time) │ │ (Scheduled) │ │
│ ├──────────────────┤ ├──────────────────┤ │
│ │ • Load 24 months │ │ • Expire old │ │
│ │ • All cities │ │ • Ingest new │ │
│ │ • Traffic + Wx │ │ • Rotate window │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
│ └────────┬────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Data Ingestion Manager │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │ Madrid │ │ Valencia │ │ Barcelona │ │ │
│ │ │ Adapter │ │ Adapter │ │ Adapter │ ... │ │
│ │ └────────────┘ └────────────┘ └────────────┘ │ │
│ └─────────────────────────┬────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Shared Storage Layer (PostgreSQL + Redis) │ │
│ │ - City-based historical data (24-month window) │ │
│ │ - Traffic: city_traffic_data table │ │
│ │ - Weather: city_weather_data table │ │
│ │ - Redis cache for fast access during training │ │
│ └─────────────────────────┬────────────────────────────────┘ │
│ │ │
└────────────────────────────┼─────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ External Data Service (FastAPI) │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Geolocation Mapper: Tenant → City │ │
│ │ - Maps (lat, lon) to nearest supported city │ │
│ │ - Returns city-specific cached data │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ API Endpoints │ │
│ │ GET /api/v1/tenants/{id}/external/historical-weather │ │
│ │ GET /api/v1/tenants/{id}/external/historical-traffic │ │
│ │ GET /api/v1/cities │ │
│ │ GET /api/v1/cities/{city_id}/data-availability │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└───────────────────────────────┬───────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Training Service Consumer │
│ - Requests historical data for tenant location │
│ - Receives pre-populated city data (instant response) │
│ - No waiting for external API calls │
└─────────────────────────────────────────────────────────────────────┘
```
### 1.2 Data Flow
#### **Initialization Phase (Kubernetes Job)**
```
1. Job starts → Read city registry config
2. For each city:
a. Instantiate city-specific adapter (Madrid, Valencia, etc.)
b. Fetch last 24 months of traffic data
c. Fetch last 24 months of weather data
d. Store in shared PostgreSQL tables (city_id indexed)
e. Warm Redis cache
3. Job completes → Service deployment readiness probe passes
```
#### **Monthly Maintenance (Kubernetes CronJob)**
```
1. CronJob triggers (1st of month, 2am UTC)
2. For each city:
a. Delete data older than 24 months
b. Fetch latest available month's data
c. Append to shared tables
d. Invalidate old cache entries
3. Log completion metrics
```
#### **Runtime Request Flow**
```
1. Training service → GET /api/v1/tenants/{id}/external/historical-traffic
2. External service:
a. Extract tenant lat/lon from tenant profile
b. Geolocation mapper → Find nearest city
c. Query city_traffic_data WHERE city_id=X AND date BETWEEN ...
d. Return cached results (< 100ms)
3. Training service receives data instantly
```
---
## Part 2: Component Breakdown
### 2.1 City Registry & Geolocation Mapper
**File:** `services/external/app/registry/city_registry.py`
```python
# services/external/app/registry/city_registry.py
"""
City Registry - Configuration-driven multi-city support
"""
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from enum import Enum
import math
class Country(str, Enum):
SPAIN = "ES"
FRANCE = "FR"
# Extensible
class WeatherProvider(str, Enum):
AEMET = "aemet" # Spain
METEO_FRANCE = "meteo_france" # France
OPEN_WEATHER = "open_weather" # Global fallback
class TrafficProvider(str, Enum):
MADRID_OPENDATA = "madrid_opendata"
VALENCIA_OPENDATA = "valencia_opendata"
BARCELONA_OPENDATA = "barcelona_opendata"
@dataclass
class CityDefinition:
"""City configuration with data source specifications"""
city_id: str
name: str
country: Country
latitude: float
longitude: float
radius_km: float # Coverage radius
# Data providers
weather_provider: WeatherProvider
weather_config: Dict[str, Any] # Provider-specific config
traffic_provider: TrafficProvider
traffic_config: Dict[str, Any]
# Metadata
timezone: str
population: int
enabled: bool = True
class CityRegistry:
"""Central registry of supported cities"""
CITIES: List[CityDefinition] = [
CityDefinition(
city_id="madrid",
name="Madrid",
country=Country.SPAIN,
latitude=40.4168,
longitude=-3.7038,
radius_km=30.0,
weather_provider=WeatherProvider.AEMET,
weather_config={
"station_ids": ["3195", "3129", "3197"],
"municipality_code": "28079"
},
traffic_provider=TrafficProvider.MADRID_OPENDATA,
traffic_config={
"current_xml_url": "https://datos.madrid.es/egob/catalogo/...",
"historical_base_url": "https://datos.madrid.es/...",
"measurement_points_csv": "https://datos.madrid.es/..."
},
timezone="Europe/Madrid",
population=3_200_000
),
CityDefinition(
city_id="valencia",
name="Valencia",
country=Country.SPAIN,
latitude=39.4699,
longitude=-0.3763,
radius_km=25.0,
weather_provider=WeatherProvider.AEMET,
weather_config={
"station_ids": ["8416"],
"municipality_code": "46250"
},
traffic_provider=TrafficProvider.VALENCIA_OPENDATA,
traffic_config={
"api_endpoint": "https://valencia.opendatasoft.com/api/..."
},
timezone="Europe/Madrid",
population=800_000,
enabled=False # Not yet implemented
),
CityDefinition(
city_id="barcelona",
name="Barcelona",
country=Country.SPAIN,
latitude=41.3851,
longitude=2.1734,
radius_km=30.0,
weather_provider=WeatherProvider.AEMET,
weather_config={
"station_ids": ["0076"],
"municipality_code": "08019"
},
traffic_provider=TrafficProvider.BARCELONA_OPENDATA,
traffic_config={
"api_endpoint": "https://opendata-ajuntament.barcelona.cat/..."
},
timezone="Europe/Madrid",
population=1_600_000,
enabled=False # Not yet implemented
)
]
@classmethod
def get_enabled_cities(cls) -> List[CityDefinition]:
"""Get all enabled cities"""
return [city for city in cls.CITIES if city.enabled]
@classmethod
def get_city(cls, city_id: str) -> Optional[CityDefinition]:
"""Get city by ID"""
for city in cls.CITIES:
if city.city_id == city_id:
return city
return None
@classmethod
def find_nearest_city(cls, latitude: float, longitude: float) -> Optional[CityDefinition]:
"""Find nearest enabled city to coordinates"""
enabled_cities = cls.get_enabled_cities()
if not enabled_cities:
return None
min_distance = float('inf')
nearest_city = None
for city in enabled_cities:
distance = cls._haversine_distance(
latitude, longitude,
city.latitude, city.longitude
)
if distance <= city.radius_km and distance < min_distance:
min_distance = distance
nearest_city = city
return nearest_city
@staticmethod
def _haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate distance in km between two coordinates"""
R = 6371 # Earth radius in km
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat/2) ** 2 +
math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
math.sin(dlon/2) ** 2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
return R * c
```
**File:** `services/external/app/registry/geolocation_mapper.py`
```python
# services/external/app/registry/geolocation_mapper.py
"""
Geolocation Mapper - Maps tenant locations to cities
"""
from typing import Optional, Tuple
import structlog
from .city_registry import CityRegistry, CityDefinition
logger = structlog.get_logger()
class GeolocationMapper:
"""Maps tenant coordinates to nearest supported city"""
def __init__(self):
self.registry = CityRegistry()
def map_tenant_to_city(
self,
latitude: float,
longitude: float
) -> Optional[Tuple[CityDefinition, float]]:
"""
Map tenant coordinates to nearest city
Returns:
Tuple of (CityDefinition, distance_km) or None if no match
"""
nearest_city = self.registry.find_nearest_city(latitude, longitude)
if not nearest_city:
logger.warning(
"No supported city found for coordinates",
lat=latitude,
lon=longitude
)
return None
distance = self.registry._haversine_distance(
latitude, longitude,
nearest_city.latitude, nearest_city.longitude
)
logger.info(
"Mapped tenant to city",
lat=latitude,
lon=longitude,
city=nearest_city.name,
distance_km=round(distance, 2)
)
return (nearest_city, distance)
def validate_location_support(self, latitude: float, longitude: float) -> bool:
"""Check if coordinates are supported"""
result = self.map_tenant_to_city(latitude, longitude)
return result is not None
```
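For reference, a minimal usage sketch of the mapper (the coordinates are illustrative examples, not tenant data):

```python
# Illustrative usage of GeolocationMapper (example coordinates only)
mapper = GeolocationMapper()

# A bakery near Madrid's centre resolves to the Madrid definition
result = mapper.map_tenant_to_city(40.4200, -3.7000)
if result:
    city, distance_km = result
    print(city.city_id, round(distance_km, 2))  # "madrid", well under 30 km

# A location outside every enabled city's radius returns None
assert mapper.map_tenant_to_city(48.8566, 2.3522) is None  # Paris: unsupported
```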
### 2.2 Data Ingestion Manager with Adapter Pattern
**File:** `services/external/app/ingestion/base_adapter.py`
```python
# services/external/app/ingestion/base_adapter.py
"""
Base adapter interface for city-specific data sources
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from datetime import datetime
class CityDataAdapter(ABC):
"""Abstract base class for city-specific data adapters"""
def __init__(self, city_id: str, config: Dict[str, Any]):
self.city_id = city_id
self.config = config
@abstractmethod
async def fetch_historical_weather(
self,
start_date: datetime,
end_date: datetime
) -> List[Dict[str, Any]]:
"""Fetch historical weather data for date range"""
pass
@abstractmethod
async def fetch_historical_traffic(
self,
start_date: datetime,
end_date: datetime
) -> List[Dict[str, Any]]:
"""Fetch historical traffic data for date range"""
pass
@abstractmethod
async def validate_connection(self) -> bool:
"""Validate connection to data source"""
pass
def get_city_id(self) -> str:
"""Get city identifier"""
return self.city_id
```
**File:** `services/external/app/ingestion/adapters/madrid_adapter.py`
```python
# services/external/app/ingestion/adapters/madrid_adapter.py
"""
Madrid city data adapter - Uses existing AEMET and Madrid OpenData clients
"""
from typing import List, Dict, Any
from datetime import datetime
import structlog
from ..base_adapter import CityDataAdapter
from app.external.aemet import AEMETClient
from app.external.apis.madrid_traffic_client import MadridTrafficClient
logger = structlog.get_logger()
class MadridAdapter(CityDataAdapter):
"""Adapter for Madrid using AEMET + Madrid OpenData"""
def __init__(self, city_id: str, config: Dict[str, Any]):
super().__init__(city_id, config)
self.aemet_client = AEMETClient()
self.traffic_client = MadridTrafficClient()
# Madrid center coordinates
self.madrid_lat = 40.4168
self.madrid_lon = -3.7038
async def fetch_historical_weather(
self,
start_date: datetime,
end_date: datetime
) -> List[Dict[str, Any]]:
"""Fetch historical weather from AEMET"""
try:
logger.info(
"Fetching Madrid historical weather",
start=start_date.isoformat(),
end=end_date.isoformat()
)
weather_data = await self.aemet_client.get_historical_weather(
self.madrid_lat,
self.madrid_lon,
start_date,
end_date
)
# Enrich with city_id
for record in weather_data:
record['city_id'] = self.city_id
record['city_name'] = 'Madrid'
logger.info(
"Madrid weather data fetched",
records=len(weather_data)
)
return weather_data
except Exception as e:
logger.error("Error fetching Madrid weather", error=str(e))
return []
async def fetch_historical_traffic(
self,
start_date: datetime,
end_date: datetime
) -> List[Dict[str, Any]]:
"""Fetch historical traffic from Madrid OpenData"""
try:
logger.info(
"Fetching Madrid historical traffic",
start=start_date.isoformat(),
end=end_date.isoformat()
)
traffic_data = await self.traffic_client.get_historical_traffic(
self.madrid_lat,
self.madrid_lon,
start_date,
end_date
)
# Enrich with city_id
for record in traffic_data:
record['city_id'] = self.city_id
record['city_name'] = 'Madrid'
logger.info(
"Madrid traffic data fetched",
records=len(traffic_data)
)
return traffic_data
except Exception as e:
logger.error("Error fetching Madrid traffic", error=str(e))
return []
async def validate_connection(self) -> bool:
"""Validate connection to AEMET and Madrid OpenData"""
try:
# Test weather connection
test_weather = await self.aemet_client.get_current_weather(
self.madrid_lat,
self.madrid_lon
)
# Test traffic connection
test_traffic = await self.traffic_client.get_current_traffic(
self.madrid_lat,
self.madrid_lon
)
return test_weather is not None and test_traffic is not None
except Exception as e:
logger.error("Madrid adapter connection validation failed", error=str(e))
return False
```
**File:** `services/external/app/ingestion/adapters/__init__.py`
```python
# services/external/app/ingestion/adapters/__init__.py
"""
Adapter registry - Maps city IDs to adapter implementations
"""
from typing import Dict, Type
from ..base_adapter import CityDataAdapter
from .madrid_adapter import MadridAdapter
# Registry: city_id → Adapter class
ADAPTER_REGISTRY: Dict[str, Type[CityDataAdapter]] = {
"madrid": MadridAdapter,
# "valencia": ValenciaAdapter, # Future
# "barcelona": BarcelonaAdapter, # Future
}
def get_adapter(city_id: str, config: Dict) -> CityDataAdapter:
"""Factory to instantiate appropriate adapter"""
adapter_class = ADAPTER_REGISTRY.get(city_id)
if not adapter_class:
raise ValueError(f"No adapter registered for city: {city_id}")
return adapter_class(city_id, config)
```
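To illustrate the extension path, a hypothetical skeleton for a future city adapter is shown below; only the three abstract methods plus a registry entry are needed. This is a sketch, not implemented code.

```python
# services/external/app/ingestion/adapters/valencia_adapter.py (hypothetical sketch)
from typing import List, Dict, Any
from datetime import datetime

from ..base_adapter import CityDataAdapter


class ValenciaAdapter(CityDataAdapter):
    """Sketch of a future Valencia adapter; the data-source client does not exist yet."""

    async def fetch_historical_weather(
        self, start_date: datetime, end_date: datetime
    ) -> List[Dict[str, Any]]:
        # Would call AEMET using the Valencia station IDs in self.config["weather_config"]
        raise NotImplementedError

    async def fetch_historical_traffic(
        self, start_date: datetime, end_date: datetime
    ) -> List[Dict[str, Any]]:
        # Would call the Valencia OpenData endpoint in self.config["traffic_config"]
        raise NotImplementedError

    async def validate_connection(self) -> bool:
        return False  # Until a real client is wired in


# Registration in adapters/__init__.py:
# ADAPTER_REGISTRY["valencia"] = ValenciaAdapter
```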
**File:** `services/external/app/ingestion/ingestion_manager.py`
```python
# services/external/app/ingestion/ingestion_manager.py
"""
Data Ingestion Manager - Coordinates multi-city data collection
"""
from typing import List, Dict, Any
from datetime import datetime, timedelta
import structlog
import asyncio
from app.registry.city_registry import CityRegistry
from .adapters import get_adapter
from app.repositories.city_data_repository import CityDataRepository
from app.core.database import database_manager
logger = structlog.get_logger()
class DataIngestionManager:
"""Orchestrates data ingestion across all cities"""
def __init__(self):
self.registry = CityRegistry()
self.database_manager = database_manager
async def initialize_all_cities(self, months: int = 24):
"""
Initialize historical data for all enabled cities
Called by Kubernetes Init Job
"""
enabled_cities = self.registry.get_enabled_cities()
logger.info(
"Starting full data initialization",
cities=len(enabled_cities),
months=months
)
# Calculate date range
end_date = datetime.now()
start_date = end_date - timedelta(days=months * 30)
# Process cities concurrently
tasks = [
self.initialize_city(city.city_id, start_date, end_date)
for city in enabled_cities
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Log results
successes = sum(1 for r in results if r is True)
failures = len(results) - successes
logger.info(
"Data initialization complete",
total=len(results),
successes=successes,
failures=failures
)
return successes == len(results)
async def initialize_city(
self,
city_id: str,
start_date: datetime,
end_date: datetime
) -> bool:
"""Initialize historical data for a single city"""
try:
city = self.registry.get_city(city_id)
if not city:
logger.error("City not found", city_id=city_id)
return False
logger.info(
"Initializing city data",
city=city.name,
start=start_date.date(),
end=end_date.date()
)
# Get appropriate adapter
adapter = get_adapter(
city_id,
{
"weather_config": city.weather_config,
"traffic_config": city.traffic_config
}
)
# Validate connection
if not await adapter.validate_connection():
logger.error("Adapter validation failed", city=city.name)
return False
# Fetch weather data
weather_data = await adapter.fetch_historical_weather(
start_date, end_date
)
# Fetch traffic data
traffic_data = await adapter.fetch_historical_traffic(
start_date, end_date
)
# Store in database
async with self.database_manager.get_session() as session:
repo = CityDataRepository(session)
weather_stored = await repo.bulk_store_weather(
city_id, weather_data
)
traffic_stored = await repo.bulk_store_traffic(
city_id, traffic_data
)
logger.info(
"City initialization complete",
city=city.name,
weather_records=weather_stored,
traffic_records=traffic_stored
)
return True
except Exception as e:
logger.error(
"City initialization failed",
city_id=city_id,
error=str(e)
)
return False
async def rotate_monthly_data(self):
"""
Rotate 24-month window: delete old, ingest new
Called by Kubernetes CronJob monthly
"""
enabled_cities = self.registry.get_enabled_cities()
logger.info("Starting monthly data rotation", cities=len(enabled_cities))
now = datetime.now()
cutoff_date = now - timedelta(days=24 * 30) # 24 months ago
# Last month's date range
last_month_end = now.replace(day=1) - timedelta(days=1)
last_month_start = last_month_end.replace(day=1)
tasks = []
for city in enabled_cities:
tasks.append(
self._rotate_city_data(
city.city_id,
cutoff_date,
last_month_start,
last_month_end
)
)
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = sum(1 for r in results if r is True)
logger.info(
"Monthly rotation complete",
total=len(results),
successes=successes
)
async def _rotate_city_data(
self,
city_id: str,
cutoff_date: datetime,
new_start: datetime,
new_end: datetime
) -> bool:
"""Rotate data for a single city"""
try:
city = self.registry.get_city(city_id)
if not city:
return False
logger.info(
"Rotating city data",
city=city.name,
cutoff=cutoff_date.date(),
new_month=new_start.strftime("%Y-%m")
)
async with self.database_manager.get_session() as session:
repo = CityDataRepository(session)
# Delete old data
deleted_weather = await repo.delete_weather_before(
city_id, cutoff_date
)
deleted_traffic = await repo.delete_traffic_before(
city_id, cutoff_date
)
logger.info(
"Old data deleted",
city=city.name,
weather_deleted=deleted_weather,
traffic_deleted=deleted_traffic
)
# Fetch new month's data
adapter = get_adapter(city_id, {
"weather_config": city.weather_config,
"traffic_config": city.traffic_config
})
new_weather = await adapter.fetch_historical_weather(
new_start, new_end
)
new_traffic = await adapter.fetch_historical_traffic(
new_start, new_end
)
# Store new data
async with self.database_manager.get_session() as session:
repo = CityDataRepository(session)
weather_stored = await repo.bulk_store_weather(
city_id, new_weather
)
traffic_stored = await repo.bulk_store_traffic(
city_id, new_traffic
)
logger.info(
"New data ingested",
city=city.name,
weather_added=weather_stored,
traffic_added=traffic_stored
)
return True
except Exception as e:
logger.error(
"City rotation failed",
city_id=city_id,
error=str(e)
)
return False
```
### 2.3 Shared Storage/Cache Interface
**File:** `services/external/app/repositories/city_data_repository.py`
```python
# services/external/app/repositories/city_data_repository.py
"""
City Data Repository - Manages shared city-based data storage
"""
from typing import List, Dict, Any, Optional
from datetime import datetime
from sqlalchemy import select, delete, and_
from sqlalchemy.ext.asyncio import AsyncSession
import structlog
from app.models.city_weather import CityWeatherData
from app.models.city_traffic import CityTrafficData
logger = structlog.get_logger()
class CityDataRepository:
"""Repository for city-based historical data"""
def __init__(self, session: AsyncSession):
self.session = session
# ============= WEATHER OPERATIONS =============
async def bulk_store_weather(
self,
city_id: str,
weather_records: List[Dict[str, Any]]
) -> int:
"""Bulk insert weather records for a city"""
if not weather_records:
return 0
try:
objects = []
for record in weather_records:
obj = CityWeatherData(
city_id=city_id,
date=record.get('date'),
temperature=record.get('temperature'),
precipitation=record.get('precipitation'),
humidity=record.get('humidity'),
wind_speed=record.get('wind_speed'),
pressure=record.get('pressure'),
description=record.get('description'),
source=record.get('source', 'ingestion'),
raw_data=record.get('raw_data')
)
objects.append(obj)
self.session.add_all(objects)
await self.session.commit()
logger.info(
"Weather data stored",
city_id=city_id,
records=len(objects)
)
return len(objects)
except Exception as e:
await self.session.rollback()
logger.error(
"Error storing weather data",
city_id=city_id,
error=str(e)
)
raise
async def get_weather_by_city_and_range(
self,
city_id: str,
start_date: datetime,
end_date: datetime
) -> List[CityWeatherData]:
"""Get weather data for city within date range"""
stmt = select(CityWeatherData).where(
and_(
CityWeatherData.city_id == city_id,
CityWeatherData.date >= start_date,
CityWeatherData.date <= end_date
)
).order_by(CityWeatherData.date)
result = await self.session.execute(stmt)
return result.scalars().all()
async def delete_weather_before(
self,
city_id: str,
cutoff_date: datetime
) -> int:
"""Delete weather records older than cutoff date"""
stmt = delete(CityWeatherData).where(
and_(
CityWeatherData.city_id == city_id,
CityWeatherData.date < cutoff_date
)
)
result = await self.session.execute(stmt)
await self.session.commit()
return result.rowcount
# ============= TRAFFIC OPERATIONS =============
async def bulk_store_traffic(
self,
city_id: str,
traffic_records: List[Dict[str, Any]]
) -> int:
"""Bulk insert traffic records for a city"""
if not traffic_records:
return 0
try:
objects = []
for record in traffic_records:
obj = CityTrafficData(
city_id=city_id,
date=record.get('date'),
traffic_volume=record.get('traffic_volume'),
pedestrian_count=record.get('pedestrian_count'),
congestion_level=record.get('congestion_level'),
average_speed=record.get('average_speed'),
source=record.get('source', 'ingestion'),
raw_data=record.get('raw_data')
)
objects.append(obj)
self.session.add_all(objects)
await self.session.commit()
logger.info(
"Traffic data stored",
city_id=city_id,
records=len(objects)
)
return len(objects)
except Exception as e:
await self.session.rollback()
logger.error(
"Error storing traffic data",
city_id=city_id,
error=str(e)
)
raise
async def get_traffic_by_city_and_range(
self,
city_id: str,
start_date: datetime,
end_date: datetime
) -> List[CityTrafficData]:
"""Get traffic data for city within date range"""
stmt = select(CityTrafficData).where(
and_(
CityTrafficData.city_id == city_id,
CityTrafficData.date >= start_date,
CityTrafficData.date <= end_date
)
).order_by(CityTrafficData.date)
result = await self.session.execute(stmt)
return result.scalars().all()
async def delete_traffic_before(
self,
city_id: str,
cutoff_date: datetime
) -> int:
"""Delete traffic records older than cutoff date"""
stmt = delete(CityTrafficData).where(
and_(
CityTrafficData.city_id == city_id,
CityTrafficData.date < cutoff_date
)
)
result = await self.session.execute(stmt)
await self.session.commit()
return result.rowcount
```
**Database Models:**
**File:** `services/external/app/models/city_weather.py`
```python
# services/external/app/models/city_weather.py
"""
City Weather Data Model - Shared city-based weather storage
"""
from sqlalchemy import Column, String, Float, DateTime, Text, Index
from sqlalchemy.dialects.postgresql import UUID, JSONB
from datetime import datetime
import uuid
from app.core.database import Base
class CityWeatherData(Base):
"""City-based historical weather data"""
__tablename__ = "city_weather_data"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
city_id = Column(String(50), nullable=False, index=True)
date = Column(DateTime(timezone=True), nullable=False, index=True)
# Weather metrics
temperature = Column(Float, nullable=True)
precipitation = Column(Float, nullable=True)
humidity = Column(Float, nullable=True)
wind_speed = Column(Float, nullable=True)
pressure = Column(Float, nullable=True)
description = Column(String(200), nullable=True)
# Metadata
source = Column(String(50), nullable=False)
raw_data = Column(JSONB, nullable=True)
# Timestamps
created_at = Column(DateTime(timezone=True), default=datetime.utcnow)
updated_at = Column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow)
# Composite index for fast queries
__table_args__ = (
Index('idx_city_weather_lookup', 'city_id', 'date'),
)
```
**File:** `services/external/app/models/city_traffic.py`
```python
# services/external/app/models/city_traffic.py
"""
City Traffic Data Model - Shared city-based traffic storage
"""
from sqlalchemy import Column, String, Integer, Float, DateTime, Text, Index
from sqlalchemy.dialects.postgresql import UUID, JSONB
from datetime import datetime
import uuid
from app.core.database import Base
class CityTrafficData(Base):
"""City-based historical traffic data"""
__tablename__ = "city_traffic_data"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
city_id = Column(String(50), nullable=False, index=True)
date = Column(DateTime(timezone=True), nullable=False, index=True)
# Traffic metrics
traffic_volume = Column(Integer, nullable=True)
pedestrian_count = Column(Integer, nullable=True)
congestion_level = Column(String(20), nullable=True)
average_speed = Column(Float, nullable=True)
# Metadata
source = Column(String(50), nullable=False)
raw_data = Column(JSONB, nullable=True)
# Timestamps
created_at = Column(DateTime(timezone=True), default=datetime.utcnow)
updated_at = Column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow)
# Composite index for fast queries
__table_args__ = (
Index('idx_city_traffic_lookup', 'city_id', 'date'),
)
```
### 2.4 Redis Cache Layer
**File:** `services/external/app/cache/redis_cache.py`
```python
# services/external/app/cache/redis_cache.py
"""
Redis cache layer for fast training data access
"""
from typing import List, Dict, Any, Optional
import json
from datetime import datetime, timedelta
import structlog
import redis.asyncio as redis
from app.core.config import settings
logger = structlog.get_logger()
class ExternalDataCache:
"""Redis cache for external data service"""
def __init__(self):
self.redis_client = redis.from_url(
settings.REDIS_URL,
encoding="utf-8",
decode_responses=True
)
self.ttl = 86400 * 7 # 7 days
# ============= WEATHER CACHE =============
def _weather_cache_key(
self,
city_id: str,
start_date: datetime,
end_date: datetime
) -> str:
"""Generate cache key for weather data"""
return f"weather:{city_id}:{start_date.date()}:{end_date.date()}"
async def get_cached_weather(
self,
city_id: str,
start_date: datetime,
end_date: datetime
) -> Optional[List[Dict[str, Any]]]:
"""Get cached weather data"""
try:
key = self._weather_cache_key(city_id, start_date, end_date)
cached = await self.redis_client.get(key)
if cached:
logger.debug("Weather cache hit", city_id=city_id, key=key)
return json.loads(cached)
logger.debug("Weather cache miss", city_id=city_id, key=key)
return None
except Exception as e:
logger.error("Error reading weather cache", error=str(e))
return None
async def set_cached_weather(
self,
city_id: str,
start_date: datetime,
end_date: datetime,
data: List[Dict[str, Any]]
):
"""Set cached weather data"""
try:
key = self._weather_cache_key(city_id, start_date, end_date)
# Serialize datetime objects
serializable_data = []
for record in data:
record_copy = record.copy()
if isinstance(record_copy.get('date'), datetime):
record_copy['date'] = record_copy['date'].isoformat()
serializable_data.append(record_copy)
await self.redis_client.setex(
key,
self.ttl,
json.dumps(serializable_data)
)
logger.debug("Weather data cached", city_id=city_id, records=len(data))
except Exception as e:
logger.error("Error caching weather data", error=str(e))
# ============= TRAFFIC CACHE =============
def _traffic_cache_key(
self,
city_id: str,
start_date: datetime,
end_date: datetime
) -> str:
"""Generate cache key for traffic data"""
return f"traffic:{city_id}:{start_date.date()}:{end_date.date()}"
async def get_cached_traffic(
self,
city_id: str,
start_date: datetime,
end_date: datetime
) -> Optional[List[Dict[str, Any]]]:
"""Get cached traffic data"""
try:
key = self._traffic_cache_key(city_id, start_date, end_date)
cached = await self.redis_client.get(key)
if cached:
logger.debug("Traffic cache hit", city_id=city_id, key=key)
return json.loads(cached)
logger.debug("Traffic cache miss", city_id=city_id, key=key)
return None
except Exception as e:
logger.error("Error reading traffic cache", error=str(e))
return None
async def set_cached_traffic(
self,
city_id: str,
start_date: datetime,
end_date: datetime,
data: List[Dict[str, Any]]
):
"""Set cached traffic data"""
try:
key = self._traffic_cache_key(city_id, start_date, end_date)
# Serialize datetime objects
serializable_data = []
for record in data:
record_copy = record.copy()
if isinstance(record_copy.get('date'), datetime):
record_copy['date'] = record_copy['date'].isoformat()
serializable_data.append(record_copy)
await self.redis_client.setex(
key,
self.ttl,
json.dumps(serializable_data)
)
logger.debug("Traffic data cached", city_id=city_id, records=len(data))
except Exception as e:
logger.error("Error caching traffic data", error=str(e))
async def invalidate_city_cache(self, city_id: str):
"""Invalidate all cache entries for a city"""
try:
pattern = f"*:{city_id}:*"
async for key in self.redis_client.scan_iter(match=pattern):
await self.redis_client.delete(key)
logger.info("City cache invalidated", city_id=city_id)
except Exception as e:
logger.error("Error invalidating cache", error=str(e))
```
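The initialization flow in Part 1.2 ends with "Warm Redis cache", but neither the ingestion manager nor the job scripts in Part 3.5 perform that step. A minimal warming sketch, assuming the repository and cache classes defined in this part, could be called at the end of the init job:

```python
# Hypothetical cache-warming helper, e.g. invoked at the end of the init job
from datetime import datetime, timedelta

from app.registry.city_registry import CityRegistry
from app.repositories.city_data_repository import CityDataRepository
from app.cache.redis_cache import ExternalDataCache
from app.core.database import database_manager


async def warm_city_caches(months: int = 24) -> None:
    """Pre-populate Redis with the full historical window for each enabled city."""
    cache = ExternalDataCache()
    end_date = datetime.now()
    start_date = end_date - timedelta(days=months * 30)

    for city in CityRegistry.get_enabled_cities():
        async with database_manager.get_session() as session:
            repo = CityDataRepository(session)
            weather = await repo.get_weather_by_city_and_range(city.city_id, start_date, end_date)
            traffic = await repo.get_traffic_by_city_and_range(city.city_id, start_date, end_date)

        # The cache layer serializes datetimes itself; a full implementation would
        # include every column that the API response schemas expose.
        await cache.set_cached_weather(
            city.city_id, start_date, end_date,
            [{"date": w.date, "temperature": w.temperature} for w in weather]
        )
        await cache.set_cached_traffic(
            city.city_id, start_date, end_date,
            [{"date": t.date, "traffic_volume": t.traffic_volume} for t in traffic]
        )
```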
---
## Part 3: Kubernetes Manifests
### 3.1 Init Job - Initial Data Load
**File:** `infrastructure/kubernetes/external/init-job.yaml`
```yaml
# infrastructure/kubernetes/external/init-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: external-data-init
namespace: bakery-ia
labels:
app: external-service
component: data-initialization
spec:
ttlSecondsAfterFinished: 86400 # Clean up after 1 day
backoffLimit: 3
template:
metadata:
labels:
app: external-service
job: data-init
spec:
restartPolicy: OnFailure
initContainers:
# Wait for database to be ready
- name: wait-for-db
image: postgres:15-alpine
command:
- sh
- -c
- |
until pg_isready -h external-db -p 5432 -U external_user; do
echo "Waiting for database..."
sleep 2
done
echo "Database is ready"
env:
- name: PGPASSWORD
valueFrom:
secretKeyRef:
name: external-db-secret
key: password
containers:
- name: data-loader
image: bakery-ia/external-service:latest
imagePullPolicy: Always
command:
- python
- -m
- app.jobs.initialize_data
args:
- "--months=24"
- "--log-level=INFO"
env:
# Database
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: external-db-secret
key: url
# Redis
- name: REDIS_URL
valueFrom:
configMapKeyRef:
name: external-config
key: redis-url
# API Keys
- name: AEMET_API_KEY
valueFrom:
secretKeyRef:
name: external-api-keys
key: aemet-key
- name: MADRID_OPENDATA_API_KEY
valueFrom:
secretKeyRef:
name: external-api-keys
key: madrid-key
# Job configuration
- name: JOB_MODE
value: "initialize"
- name: LOG_LEVEL
value: "INFO"
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1000m"
volumeMounts:
- name: config
mountPath: /app/config
volumes:
- name: config
configMap:
name: external-config
```
### 3.2 Monthly CronJob - Data Rotation
**File:** `infrastructure/kubernetes/external/cronjob.yaml`
```yaml
# infrastructure/kubernetes/external/cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: external-data-rotation
namespace: bakery-ia
labels:
app: external-service
component: data-rotation
spec:
# Run on 1st of each month at 2:00 AM UTC
schedule: "0 2 1 * *"
# Keep last 3 successful jobs for debugging
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 3
# Don't start new job if previous is still running
concurrencyPolicy: Forbid
jobTemplate:
metadata:
labels:
app: external-service
job: data-rotation
spec:
ttlSecondsAfterFinished: 172800 # 2 days
backoffLimit: 2
template:
metadata:
labels:
app: external-service
cronjob: data-rotation
spec:
restartPolicy: OnFailure
containers:
- name: data-rotator
image: bakery-ia/external-service:latest
imagePullPolicy: Always
command:
- python
- -m
- app.jobs.rotate_data
args:
- "--log-level=INFO"
- "--notify-slack=true"
env:
# Database
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: external-db-secret
key: url
# Redis
- name: REDIS_URL
valueFrom:
configMapKeyRef:
name: external-config
key: redis-url
# API Keys
- name: AEMET_API_KEY
valueFrom:
secretKeyRef:
name: external-api-keys
key: aemet-key
- name: MADRID_OPENDATA_API_KEY
valueFrom:
secretKeyRef:
name: external-api-keys
key: madrid-key
# Slack notification
- name: SLACK_WEBHOOK_URL
valueFrom:
secretKeyRef:
name: slack-secrets
key: webhook-url
optional: true
# Job configuration
- name: JOB_MODE
value: "rotate"
- name: LOG_LEVEL
value: "INFO"
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
volumeMounts:
- name: config
mountPath: /app/config
volumes:
- name: config
configMap:
name: external-config
```
### 3.3 Main Service Deployment
**File:** `infrastructure/kubernetes/external/deployment.yaml`
```yaml
# infrastructure/kubernetes/external/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: external-service
namespace: bakery-ia
labels:
app: external-service
version: "2.0"
spec:
replicas: 2
selector:
matchLabels:
app: external-service
template:
metadata:
labels:
app: external-service
version: "2.0"
spec:
# Wait for init job to complete before deploying
initContainers:
- name: check-data-initialized
image: postgres:15-alpine
command:
- sh
- -c
- |
echo "Checking if data initialization is complete..."
until psql "$DATABASE_URL" -c "SELECT COUNT(*) FROM city_weather_data LIMIT 1;" > /dev/null 2>&1; do
echo "Waiting for initial data load..."
sleep 10
done
echo "Data is initialized"
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: external-db-secret
key: url
containers:
- name: external-api
image: bakery-ia/external-service:latest
imagePullPolicy: Always
ports:
- name: http
containerPort: 8000
protocol: TCP
env:
# Database
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: external-db-secret
key: url
# Redis
- name: REDIS_URL
valueFrom:
configMapKeyRef:
name: external-config
key: redis-url
# API Keys
- name: AEMET_API_KEY
valueFrom:
secretKeyRef:
name: external-api-keys
key: aemet-key
- name: MADRID_OPENDATA_API_KEY
valueFrom:
secretKeyRef:
name: external-api-keys
key: madrid-key
# Service config
- name: LOG_LEVEL
value: "INFO"
- name: CORS_ORIGINS
value: "*"
# Readiness probe - checks if data is available
readinessProbe:
httpGet:
path: /health/ready
port: http
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 3
# Liveness probe
livenessProbe:
httpGet:
path: /health/live
port: http
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 3
failureThreshold: 3
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
volumeMounts:
- name: config
mountPath: /app/config
volumes:
- name: config
configMap:
name: external-config
```
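The probes above assume `/health/live` and `/health/ready` endpoints, which are not shown elsewhere in this document. A minimal sketch (router wiring and response shapes are assumptions) in which readiness is tied to the initial data load:

```python
# Hypothetical health endpoints backing the probes above
from fastapi import APIRouter, Depends, Response, status
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db

health_router = APIRouter()


@health_router.get("/health/live")
async def liveness():
    """Liveness: the process is up; no dependencies are checked."""
    return {"status": "alive"}


@health_router.get("/health/ready")
async def readiness(response: Response, db: AsyncSession = Depends(get_db)):
    """Readiness: only pass once the initial city data load has populated the tables."""
    result = await db.execute(text("SELECT COUNT(*) FROM city_weather_data"))
    count = result.scalar() or 0
    if count == 0:
        response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
        return {"status": "not_ready", "reason": "no city data loaded"}
    return {"status": "ready", "weather_records": count}
```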
### 3.4 ConfigMap and Secrets
**File:** `infrastructure/kubernetes/external/configmap.yaml`
```yaml
# infrastructure/kubernetes/external/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: external-config
namespace: bakery-ia
data:
redis-url: "redis://external-redis:6379/0"
# City configuration (can be overridden)
enabled-cities: "madrid"
# Data retention
retention-months: "24"
# Cache TTL
cache-ttl-days: "7"
```
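The ConfigMap declares `enabled-cities`, `retention-months`, and `cache-ttl-days` as overridable values, but nothing in Part 2 reads them yet. One way to honour them, assuming they are projected into the pods as environment variables (the variable names below are assumptions, not existing settings):

```python
# Hypothetical settings helper for the ConfigMap overrides
import os
from typing import List


def enabled_city_ids() -> List[str]:
    """Parse ENABLED_CITIES (e.g. 'madrid,valencia') into a list of city IDs."""
    raw = os.getenv("ENABLED_CITIES", "madrid")
    return [c.strip() for c in raw.split(",") if c.strip()]


def retention_months() -> int:
    return int(os.getenv("RETENTION_MONTHS", "24"))


def cache_ttl_seconds() -> int:
    return int(os.getenv("CACHE_TTL_DAYS", "7")) * 86400


# CityRegistry.get_enabled_cities() could then intersect its static list
# with enabled_city_ids() instead of relying solely on the `enabled` flag.
```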
**File:** `infrastructure/kubernetes/external/secrets.yaml` (template)
```yaml
# infrastructure/kubernetes/external/secrets.yaml
# NOTE: In production, use sealed-secrets or external secrets operator
apiVersion: v1
kind: Secret
metadata:
name: external-api-keys
namespace: bakery-ia
type: Opaque
stringData:
aemet-key: "YOUR_AEMET_API_KEY_HERE"
madrid-key: "YOUR_MADRID_OPENDATA_KEY_HERE"
---
apiVersion: v1
kind: Secret
metadata:
name: external-db-secret
namespace: bakery-ia
type: Opaque
stringData:
url: "postgresql+asyncpg://external_user:password@external-db:5432/external_db"
password: "YOUR_DB_PASSWORD_HERE"
```
### 3.5 Job Scripts
**File:** `services/external/app/jobs/initialize_data.py`
```python
# services/external/app/jobs/initialize_data.py
"""
Kubernetes Init Job - Initialize 24-month historical data
"""
import asyncio
import argparse
import sys
import structlog
from app.ingestion.ingestion_manager import DataIngestionManager
from app.core.database import database_manager
logger = structlog.get_logger()
async def main(months: int = 24):
"""Initialize historical data for all enabled cities"""
logger.info("Starting data initialization job", months=months)
try:
# Initialize database
await database_manager.initialize()
# Run ingestion
manager = DataIngestionManager()
success = await manager.initialize_all_cities(months=months)
if success:
logger.info("✅ Data initialization completed successfully")
sys.exit(0)
else:
logger.error("❌ Data initialization failed")
sys.exit(1)
except Exception as e:
logger.error("❌ Fatal error during initialization", error=str(e))
sys.exit(1)
finally:
await database_manager.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Initialize historical data")
parser.add_argument("--months", type=int, default=24, help="Number of months to load")
parser.add_argument("--log-level", default="INFO", help="Log level")
args = parser.parse_args()
    # Configure logging (make_filtering_bound_logger expects a numeric level)
    import logging
    structlog.configure(
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, args.log_level.upper(), logging.INFO)
        )
    )
asyncio.run(main(months=args.months))
```
**File:** `services/external/app/jobs/rotate_data.py`
```python
# services/external/app/jobs/rotate_data.py
"""
Kubernetes CronJob - Monthly data rotation (24-month window)
"""
import asyncio
import argparse
import sys
import structlog
from app.ingestion.ingestion_manager import DataIngestionManager
from app.core.database import database_manager
logger = structlog.get_logger()
async def main():
"""Rotate 24-month data window"""
logger.info("Starting monthly data rotation job")
try:
# Initialize database
await database_manager.initialize()
# Run rotation
manager = DataIngestionManager()
await manager.rotate_monthly_data()
logger.info("✅ Data rotation completed successfully")
sys.exit(0)
except Exception as e:
logger.error("❌ Fatal error during rotation", error=str(e))
sys.exit(1)
finally:
await database_manager.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Rotate historical data")
parser.add_argument("--log-level", default="INFO", help="Log level")
parser.add_argument("--notify-slack", type=bool, default=False, help="Send Slack notification")
args = parser.parse_args()
    # Configure logging (make_filtering_bound_logger expects a numeric level)
    import logging
    structlog.configure(
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, args.log_level.upper(), logging.INFO)
        )
    )
asyncio.run(main())
```
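The CronJob passes `--notify-slack=true` and mounts `SLACK_WEBHOOK_URL`, but the script above never sends a notification. A minimal helper, assuming `httpx` is available in the service image:

```python
# Hypothetical Slack notification helper for rotate_data.py
import os
import httpx


async def notify_slack(message: str) -> None:
    """Post a completion message to Slack when a webhook URL is configured."""
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")
    if not webhook_url:
        return  # The secret is optional; skip silently when absent
    async with httpx.AsyncClient(timeout=10.0) as client:
        await client.post(webhook_url, json={"text": message})


# In main(), after the rotation finishes (with the parsed flag passed through):
#     if notify:
#         await notify_slack("External data rotation completed successfully")
```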
---
## Part 4: Updated API Endpoints
### 4.1 New City-Based Endpoints
**File:** `services/external/app/api/city_operations.py`
```python
# services/external/app/api/city_operations.py
"""
City Operations API - New endpoints for city-based data access
"""
from fastapi import APIRouter, Depends, HTTPException, Query, Path
from typing import List
from datetime import datetime
from uuid import UUID
import structlog
from app.schemas.city_data import CityInfoResponse, DataAvailabilityResponse
from app.schemas.weather import WeatherDataResponse
from app.schemas.traffic import TrafficDataResponse
from app.registry.city_registry import CityRegistry
from app.registry.geolocation_mapper import GeolocationMapper
from app.repositories.city_data_repository import CityDataRepository
from app.cache.redis_cache import ExternalDataCache
from shared.routing.route_builder import RouteBuilder
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
route_builder = RouteBuilder('external')
router = APIRouter(tags=["city-operations"])
logger = structlog.get_logger()
@router.get(
route_builder.build_base_route("cities"),
response_model=List[CityInfoResponse]
)
async def list_supported_cities():
"""List all enabled cities with data availability"""
registry = CityRegistry()
cities = registry.get_enabled_cities()
return [
CityInfoResponse(
city_id=city.city_id,
name=city.name,
country=city.country.value,
latitude=city.latitude,
longitude=city.longitude,
radius_km=city.radius_km,
weather_provider=city.weather_provider.value,
traffic_provider=city.traffic_provider.value,
enabled=city.enabled
)
for city in cities
]
@router.get(
route_builder.build_operations_route("cities/{city_id}/availability"),
response_model=DataAvailabilityResponse
)
async def get_city_data_availability(
city_id: str = Path(..., description="City ID"),
db: AsyncSession = Depends(get_db)
):
"""Get data availability for a specific city"""
registry = CityRegistry()
city = registry.get_city(city_id)
if not city:
raise HTTPException(status_code=404, detail="City not found")
repo = CityDataRepository(db)
    # Query min/max dates and record counts (raw SQL must be wrapped in text())
    weather_stmt = await db.execute(
        text("SELECT MIN(date), MAX(date), COUNT(*) FROM city_weather_data WHERE city_id = :city_id"),
        {"city_id": city_id}
    )
    weather_min, weather_max, weather_count = weather_stmt.fetchone()
    traffic_stmt = await db.execute(
        text("SELECT MIN(date), MAX(date), COUNT(*) FROM city_traffic_data WHERE city_id = :city_id"),
        {"city_id": city_id}
    )
    traffic_min, traffic_max, traffic_count = traffic_stmt.fetchone()
return DataAvailabilityResponse(
city_id=city_id,
city_name=city.name,
weather_available=weather_count > 0,
weather_start_date=weather_min.isoformat() if weather_min else None,
weather_end_date=weather_max.isoformat() if weather_max else None,
weather_record_count=weather_count,
traffic_available=traffic_count > 0,
traffic_start_date=traffic_min.isoformat() if traffic_min else None,
traffic_end_date=traffic_max.isoformat() if traffic_max else None,
traffic_record_count=traffic_count
)
@router.get(
route_builder.build_operations_route("historical-weather-optimized"),
response_model=List[WeatherDataResponse]
)
async def get_historical_weather_optimized(
tenant_id: UUID = Path(..., description="Tenant ID"),
latitude: float = Query(..., description="Latitude"),
longitude: float = Query(..., description="Longitude"),
start_date: datetime = Query(..., description="Start date"),
end_date: datetime = Query(..., description="End date"),
db: AsyncSession = Depends(get_db)
):
"""
Get historical weather data using city-based cached data
    This is the fast path used by the training service
"""
try:
# Map tenant location to city
mapper = GeolocationMapper()
mapping = mapper.map_tenant_to_city(latitude, longitude)
if not mapping:
raise HTTPException(
status_code=404,
detail="No supported city found for this location"
)
city, distance = mapping
logger.info(
"Fetching historical weather from cache",
tenant_id=tenant_id,
city=city.name,
distance_km=round(distance, 2)
)
# Try cache first
cache = ExternalDataCache()
cached_data = await cache.get_cached_weather(
city.city_id, start_date, end_date
)
if cached_data:
logger.info("Weather cache hit", records=len(cached_data))
return cached_data
# Cache miss - query database
repo = CityDataRepository(db)
db_records = await repo.get_weather_by_city_and_range(
city.city_id, start_date, end_date
)
# Convert to response format
response_data = [
WeatherDataResponse(
id=str(record.id),
location_id=f"{city.city_id}_{record.date.date()}",
date=record.date.isoformat(),
temperature=record.temperature,
precipitation=record.precipitation,
humidity=record.humidity,
wind_speed=record.wind_speed,
pressure=record.pressure,
description=record.description,
source=record.source,
created_at=record.created_at.isoformat(),
updated_at=record.updated_at.isoformat()
)
for record in db_records
]
        # Store in cache for next time (the cache layer serializes plain dicts)
        await cache.set_cached_weather(
            city.city_id, start_date, end_date,
            [item.dict() for item in response_data]
        )
logger.info(
"Historical weather data retrieved",
records=len(response_data),
source="database"
)
return response_data
except HTTPException:
raise
except Exception as e:
logger.error("Error fetching historical weather", error=str(e))
raise HTTPException(status_code=500, detail="Internal server error")
@router.get(
route_builder.build_operations_route("historical-traffic-optimized"),
response_model=List[TrafficDataResponse]
)
async def get_historical_traffic_optimized(
tenant_id: UUID = Path(..., description="Tenant ID"),
latitude: float = Query(..., description="Latitude"),
longitude: float = Query(..., description="Longitude"),
start_date: datetime = Query(..., description="Start date"),
end_date: datetime = Query(..., description="End date"),
db: AsyncSession = Depends(get_db)
):
"""
Get historical traffic data using city-based cached data
    This is the fast path used by the training service
"""
try:
# Map tenant location to city
mapper = GeolocationMapper()
mapping = mapper.map_tenant_to_city(latitude, longitude)
if not mapping:
raise HTTPException(
status_code=404,
detail="No supported city found for this location"
)
city, distance = mapping
logger.info(
"Fetching historical traffic from cache",
tenant_id=tenant_id,
city=city.name,
distance_km=round(distance, 2)
)
# Try cache first
cache = ExternalDataCache()
cached_data = await cache.get_cached_traffic(
city.city_id, start_date, end_date
)
if cached_data:
logger.info("Traffic cache hit", records=len(cached_data))
return cached_data
# Cache miss - query database
repo = CityDataRepository(db)
db_records = await repo.get_traffic_by_city_and_range(
city.city_id, start_date, end_date
)
# Convert to response format
response_data = [
TrafficDataResponse(
date=record.date.isoformat(),
traffic_volume=record.traffic_volume,
pedestrian_count=record.pedestrian_count,
congestion_level=record.congestion_level,
average_speed=record.average_speed,
source=record.source
)
for record in db_records
]
        # Store in cache for next time (the cache layer serializes plain dicts)
        await cache.set_cached_traffic(
            city.city_id, start_date, end_date,
            [item.dict() for item in response_data]
        )
logger.info(
"Historical traffic data retrieved",
records=len(response_data),
source="database"
)
return response_data
except HTTPException:
raise
except Exception as e:
logger.error("Error fetching historical traffic", error=str(e))
raise HTTPException(status_code=500, detail="Internal server error")
```
### 4.2 Schema Definitions
**File:** `services/external/app/schemas/city_data.py`
```python
# services/external/app/schemas/city_data.py
"""
City Data Schemas - New response types for city-based operations
"""
from pydantic import BaseModel, Field
from typing import Optional
class CityInfoResponse(BaseModel):
"""Information about a supported city"""
city_id: str
name: str
country: str
latitude: float
longitude: float
radius_km: float
weather_provider: str
traffic_provider: str
enabled: bool
class DataAvailabilityResponse(BaseModel):
"""Data availability for a city"""
city_id: str
city_name: str
# Weather availability
weather_available: bool
weather_start_date: Optional[str] = None
weather_end_date: Optional[str] = None
weather_record_count: int = 0
# Traffic availability
traffic_available: bool
traffic_start_date: Optional[str] = None
traffic_end_date: Optional[str] = None
traffic_record_count: int = 0
```
---
## Part 5: Frontend Integration
### 5.1 Updated TypeScript Types
**File:** `frontend/src/api/types/external.ts` (additions)
```typescript
// frontend/src/api/types/external.ts
// ADD TO EXISTING FILE
// ================================================================
// CITY-BASED DATA TYPES (NEW)
// ================================================================
/**
* City information response
* Backend: services/external/app/schemas/city_data.py:CityInfoResponse
*/
export interface CityInfoResponse {
city_id: string;
name: string;
country: string;
latitude: number;
longitude: number;
radius_km: number;
weather_provider: string;
traffic_provider: string;
enabled: boolean;
}
/**
* Data availability response
* Backend: services/external/app/schemas/city_data.py:DataAvailabilityResponse
*/
export interface DataAvailabilityResponse {
city_id: string;
city_name: string;
// Weather availability
weather_available: boolean;
weather_start_date: string | null;
weather_end_date: string | null;
weather_record_count: number;
// Traffic availability
traffic_available: boolean;
traffic_start_date: string | null;
traffic_end_date: string | null;
traffic_record_count: number;
}
```
### 5.2 API Service Methods
**File:** `frontend/src/api/services/external.ts` (new file)
```typescript
// frontend/src/api/services/external.ts
/**
* External Data API Service
* Handles weather and traffic data operations
*/
import { apiClient } from '../client';
import type {
CityInfoResponse,
DataAvailabilityResponse,
WeatherDataResponse,
TrafficDataResponse,
HistoricalWeatherRequest,
HistoricalTrafficRequest,
} from '../types/external';
class ExternalDataService {
/**
* List all supported cities
*/
async listCities(): Promise<CityInfoResponse[]> {
const response = await apiClient.get<CityInfoResponse[]>(
'/api/v1/external/cities'
);
return response.data;
}
/**
* Get data availability for a specific city
*/
async getCityAvailability(cityId: string): Promise<DataAvailabilityResponse> {
const response = await apiClient.get<DataAvailabilityResponse>(
`/api/v1/external/operations/cities/${cityId}/availability`
);
return response.data;
}
/**
* Get historical weather data (optimized city-based endpoint)
*/
async getHistoricalWeatherOptimized(
tenantId: string,
params: {
latitude: number;
longitude: number;
start_date: string;
end_date: string;
}
): Promise<WeatherDataResponse[]> {
const response = await apiClient.get<WeatherDataResponse[]>(
`/api/v1/tenants/${tenantId}/external/operations/historical-weather-optimized`,
{ params }
);
return response.data;
}
/**
* Get historical traffic data (optimized city-based endpoint)
*/
async getHistoricalTrafficOptimized(
tenantId: string,
params: {
latitude: number;
longitude: number;
start_date: string;
end_date: string;
}
): Promise<TrafficDataResponse[]> {
const response = await apiClient.get<TrafficDataResponse[]>(
`/api/v1/tenants/${tenantId}/external/operations/historical-traffic-optimized`,
{ params }
);
return response.data;
}
/**
* Legacy: Get historical weather (non-optimized)
* @deprecated Use getHistoricalWeatherOptimized instead
*/
async getHistoricalWeather(
tenantId: string,
request: HistoricalWeatherRequest
): Promise<WeatherDataResponse[]> {
const response = await apiClient.post<WeatherDataResponse[]>(
`/api/v1/tenants/${tenantId}/external/operations/weather/historical`,
request
);
return response.data;
}
/**
* Legacy: Get historical traffic (non-optimized)
* @deprecated Use getHistoricalTrafficOptimized instead
*/
async getHistoricalTraffic(
tenantId: string,
request: HistoricalTrafficRequest
): Promise<TrafficDataResponse[]> {
const response = await apiClient.post<TrafficDataResponse[]>(
`/api/v1/tenants/${tenantId}/external/operations/traffic/historical`,
request
);
return response.data;
}
}
export const externalDataService = new ExternalDataService();
export default externalDataService;
```
### 5.3 Contract Synchronization Process
**Document:** Frontend API contract sync workflow
````markdown
# Frontend-Backend Contract Synchronization
## When to Update
Trigger frontend updates when ANY of these occur:
1. New API endpoint added
2. Request/response schema changed
3. Enum values modified
4. Required/optional fields changed
## Process
### Step 1: Detect Backend Changes
```bash
# Monitor these files for changes:
services/external/app/schemas/*.py
services/external/app/api/*.py
```
### Step 2: Update TypeScript Types
```bash
# Location: frontend/src/api/types/external.ts
# 1. Compare backend Pydantic models with TS interfaces
# 2. Add/update interfaces to match
# 3. Add JSDoc comments with backend file references
```
### Step 3: Update API Service Methods
```bash
# Location: frontend/src/api/services/external.ts
# 1. Add new methods for new endpoints
# 2. Update method signatures for schema changes
# 3. Update endpoint URLs to match route_builder output
```
### Step 4: Validate
```bash
# Run type check
npm run type-check
# Test compilation
npm run build
```
### Step 5: Integration Test
```bash
# Test actual API calls
npm run test:integration
```
## Example: Adding New Endpoint
**Backend (Python):**
```python
@router.get("/cities/{city_id}/stats", response_model=CityStatsResponse)
async def get_city_stats(city_id: str):
...
```
**Frontend Steps:**
1. Add type: `frontend/src/api/types/external.ts`
```typescript
export interface CityStatsResponse {
city_id: string;
total_records: number;
last_updated: string;
}
```
2. Add method: `frontend/src/api/services/external.ts`
```typescript
async getCityStats(cityId: string): Promise<CityStatsResponse> {
const response = await apiClient.get<CityStatsResponse>(
`/api/v1/external/cities/${cityId}/stats`
);
return response.data;
}
```
3. Verify type safety:
```typescript
const stats = await externalDataService.getCityStats('madrid');
console.log(stats.total_records); // TypeScript autocomplete works!
```
## Automation (Future)
Consider implementing:
- OpenAPI spec generation from FastAPI
- TypeScript type generation from OpenAPI
- Contract testing (Pact, etc.)
````
---
## Part 6: Migration Plan
### 6.1 Migration Phases
#### Phase 1: Infrastructure Setup (Week 1)
- ✅ Create new database tables (`city_weather_data`, `city_traffic_data`)
- ✅ Deploy Redis for caching
- ✅ Create Kubernetes secrets and configmaps
- ✅ Deploy init job (without running)
#### Phase 2: Code Implementation (Week 2-3)
- ✅ Implement city registry and geolocation mapper
- ✅ Implement Madrid adapter (reuse existing clients)
- ✅ Implement ingestion manager
- ✅ Implement city data repository
- ✅ Implement Redis cache layer
- ✅ Create init and rotation job scripts
#### Phase 3: Initial Data Load (Week 4)
- ✅ Test init job in staging
- ✅ Run init job in production (24-month load)
- ✅ Validate data integrity
- ✅ Warm Redis cache
#### Phase 4: API Migration (Week 5)
- ✅ Deploy new city-based endpoints
- ✅ Update training service to use optimized endpoints
- ✅ Update frontend types and services
- ✅ Run parallel (old + new endpoints)
#### Phase 5: Cutover (Week 6)
- ✅ Switch training service to new endpoints
- ✅ Monitor performance (should be <100ms)
- ✅ Verify cache hit rates
- ✅ Deprecate old endpoints
#### Phase 6: Cleanup (Week 7)
- ✅ Remove old per-tenant data fetching code
- ✅ Schedule first monthly CronJob
- ✅ Document new architecture
- ✅ Remove backward compatibility code
### 6.2 Rollback Plan
If issues occur during cutover:
```yaml
# Rollback steps
1. Update training service config:
USE_OPTIMIZED_EXTERNAL_ENDPOINTS: false
2. Traffic routes back to old endpoints
3. New infrastructure remains running (no data loss)
4. Investigate issues, fix, retry cutover
```
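A sketch of how the training service could honour that flag when choosing an endpoint family (the flag name comes from the rollback steps above; the HTTP client and function shown are assumptions, since the training service code is not part of this document):

```python
# Hypothetical endpoint selection inside the training service
import os

USE_OPTIMIZED = os.getenv("USE_OPTIMIZED_EXTERNAL_ENDPOINTS", "true").lower() == "true"


async def fetch_historical_weather(client, tenant_id, lat, lon, start_date, end_date):
    """Route to the new city-based endpoint, or fall back to the legacy one."""
    if USE_OPTIMIZED:
        return await client.get(
            f"/api/v1/tenants/{tenant_id}/external/operations/historical-weather-optimized",
            params={
                "latitude": lat,
                "longitude": lon,
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat(),
            },
        )
    # Legacy per-tenant fetch path (kept during the parallel-run phase)
    return await client.post(
        f"/api/v1/tenants/{tenant_id}/external/operations/weather/historical",
        json={"start_date": start_date.isoformat(), "end_date": end_date.isoformat()},
    )
```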
### 6.3 Testing Strategy
**Unit Tests:**
```python
# tests/unit/test_geolocation_mapper.py
def test_map_tenant_to_madrid():
mapper = GeolocationMapper()
city, distance = mapper.map_tenant_to_city(40.42, -3.70)
assert city.city_id == "madrid"
assert distance < 5.0
```
**Integration Tests:**
```python
# tests/integration/test_ingestion.py
async def test_initialize_city_data():
manager = DataIngestionManager()
success = await manager.initialize_city(
"madrid",
datetime(2023, 1, 1),
datetime(2023, 1, 31)
)
assert success
```
**Performance Tests:**
```python
# tests/performance/test_cache_performance.py
async def test_historical_weather_response_time():
start = time.time()
data = await get_historical_weather_optimized(...)
duration = time.time() - start
assert duration < 0.1 # <100ms
assert len(data) > 0
```
---
## Part 7: Observability & Monitoring
### 7.1 Metrics to Track
```python
# services/external/app/metrics/city_metrics.py
from prometheus_client import Counter, Histogram, Gauge
# Data ingestion metrics
ingestion_records_total = Counter(
'external_ingestion_records_total',
'Total records ingested',
['city_id', 'data_type']
)
ingestion_duration_seconds = Histogram(
'external_ingestion_duration_seconds',
'Ingestion duration',
['city_id', 'data_type']
)
# Cache metrics
cache_hit_total = Counter(
'external_cache_hit_total',
'Cache hits',
['data_type']
)
cache_miss_total = Counter(
'external_cache_miss_total',
'Cache misses',
['data_type']
)
# Data availability
city_data_records_gauge = Gauge(
'external_city_data_records',
'Current record count per city',
['city_id', 'data_type']
)
# API performance
api_request_duration_seconds = Histogram(
'external_api_request_duration_seconds',
'API request duration',
['endpoint', 'city_id']
)
```
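The alert rules in 7.3 also reference `external_city_data_last_update_timestamp` and `external_ingestion_failures_total`, which are not declared above. A sketch of how they could be exported (metric names taken from the alert expressions):

```python
# Additional metrics referenced by the alert rules in section 7.3
from prometheus_client import Counter, Gauge

city_data_last_update_timestamp = Gauge(
    'external_city_data_last_update_timestamp',
    'Unix timestamp of the last successful ingestion per city',
    ['city_id', 'data_type']
)

ingestion_failures_total = Counter(
    'external_ingestion_failures_total',
    'Total ingestion job failures',
    ['city_id', 'data_type']
)

# Example: update after a successful rotation
# city_data_last_update_timestamp.labels(city_id="madrid", data_type="weather").set_to_current_time()
```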
### 7.2 Logging Strategy
```python
# Structured logging examples
# Ingestion
logger.info(
"City data initialization started",
city=city.name,
start_date=start_date.isoformat(),
end_date=end_date.isoformat(),
expected_records=estimated_count
)
# Cache
logger.info(
"Cache hit",
cache_key=key,
city_id=city_id,
hit_rate=hit_rate,
response_time_ms=duration * 1000
)
# API
logger.info(
"Historical data request",
tenant_id=tenant_id,
city=city.name,
distance_km=distance,
date_range_days=(end_date - start_date).days,
records_returned=len(data),
source="cache" if cached else "database"
)
```
### 7.3 Alerts
```yaml
# Prometheus alert rules
groups:
- name: external_data_service
interval: 30s
rules:
# Data freshness
- alert: ExternalDataStale
expr: |
(time() - external_city_data_last_update_timestamp) > 86400 * 7
for: 1h
labels:
severity: warning
annotations:
summary: "City data not updated in 7 days"
# Cache health
- alert: ExternalCacheHitRateLow
expr: |
rate(external_cache_hit_total[5m]) /
(rate(external_cache_hit_total[5m]) + rate(external_cache_miss_total[5m])) < 0.7
for: 15m
labels:
severity: warning
annotations:
summary: "Cache hit rate below 70%"
# Ingestion failures
- alert: ExternalIngestionFailed
expr: |
external_ingestion_failures_total > 0
for: 5m
labels:
severity: critical
annotations:
summary: "Data ingestion job failed"
```
---
## Conclusion
This architecture redesign delivers:
1. **✅ Centralized data management** - No more per-tenant redundant fetching
2. **✅ Multi-city scalability** - Easy to add Valencia, Barcelona, etc.
3. **✅ Sub-100ms training data access** - Redis + PostgreSQL cache
4. **✅ Automated 24-month windows** - Kubernetes CronJobs handle rotation
5. **✅ Zero downtime deployment** - Init job ensures data before service start
6. **✅ Observable & maintainable** - Metrics, logs, alerts built-in
7. **✅ Type-safe frontend integration** - Strict contract sync process
**Next Steps:**
1. Review and approve architecture
2. Begin Phase 1 (Infrastructure)
3. Implement in phases with rollback capability
4. Monitor performance improvements
5. Plan Valencia/Barcelona adapter implementations
---
**Document Version:** 1.0
**Last Updated:** 2025-10-07
**Approved By:** [Pending Review]