# Extracted from repository path: bakery-ia/services/external/app/ingestion/ingestion_manager.py
# (file-viewer chrome — "Files", line/size counts, "Raw Normal View History" — removed)

# services/external/app/ingestion/ingestion_manager.py
"""
Data Ingestion Manager - Coordinates multi-city data collection
"""
from typing import List, Dict, Any
from datetime import datetime, timedelta
import structlog
import asyncio
from app.registry.city_registry import CityRegistry
from .adapters import get_adapter
from app.repositories.city_data_repository import CityDataRepository
from app.core.database import database_manager
logger = structlog.get_logger()
class DataIngestionManager:
    """Orchestrates data ingestion across all cities"""

    def __init__(self):
        # Lookup of city metadata/config (enabled flag, per-city API configs).
        self.registry = CityRegistry()
        # Shared async DB manager; yields sessions handed to repositories.
        self.database_manager = database_manager
async def initialize_all_cities(self, months: int = 24):
"""
Initialize historical data for all enabled cities
Called by Kubernetes Init Job
"""
enabled_cities = self.registry.get_enabled_cities()
logger.info(
"Starting full data initialization",
cities=len(enabled_cities),
months=months
)
end_date = datetime.now()
start_date = end_date - timedelta(days=months * 30)
tasks = [
self.initialize_city(city.city_id, start_date, end_date)
for city in enabled_cities
]
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = sum(1 for r in results if r is True)
failures = len(results) - successes
logger.info(
"Data initialization complete",
total=len(results),
successes=successes,
failures=failures
)
return successes == len(results)
async def initialize_city(
self,
city_id: str,
start_date: datetime,
end_date: datetime
) -> bool:
"""Initialize historical data for a single city (idempotent)"""
try:
city = self.registry.get_city(city_id)
if not city:
logger.error("City not found", city_id=city_id)
return False
logger.info(
"Initializing city data",
city=city.name,
start=start_date.date(),
end=end_date.date()
)
# Check if data already exists (idempotency)
async with self.database_manager.get_session() as session:
repo = CityDataRepository(session)
coverage = await repo.get_data_coverage(city_id, start_date, end_date)
days_in_range = (end_date - start_date).days
expected_records = days_in_range # One record per day minimum
# If we have >= 90% coverage, skip initialization
threshold = expected_records * 0.9
weather_sufficient = coverage['weather'] >= threshold
traffic_sufficient = coverage['traffic'] >= threshold
if weather_sufficient and traffic_sufficient:
logger.info(
"City data already initialized, skipping",
city=city.name,
weather_records=coverage['weather'],
traffic_records=coverage['traffic'],
threshold=int(threshold)
)
return True
logger.info(
"Insufficient data coverage, proceeding with initialization",
city=city.name,
existing_weather=coverage['weather'],
existing_traffic=coverage['traffic'],
expected=expected_records
)
adapter = get_adapter(
city_id,
{
"weather_config": city.weather_config,
"traffic_config": city.traffic_config
}
)
if not await adapter.validate_connection():
logger.error("Adapter validation failed", city=city.name)
return False
weather_data = await adapter.fetch_historical_weather(
start_date, end_date
)
traffic_data = await adapter.fetch_historical_traffic(
start_date, end_date
)
async with self.database_manager.get_session() as session:
repo = CityDataRepository(session)
weather_stored = await repo.bulk_store_weather(
city_id, weather_data
)
traffic_stored = await repo.bulk_store_traffic(
city_id, traffic_data
)
logger.info(
"City initialization complete",
city=city.name,
weather_records=weather_stored,
traffic_records=traffic_stored
)
return True
except Exception as e:
logger.error(
"City initialization failed",
city_id=city_id,
error=str(e)
)
return False
async def rotate_monthly_data(self):
"""
Rotate 24-month window: delete old, ingest new
Called by Kubernetes CronJob monthly
"""
enabled_cities = self.registry.get_enabled_cities()
logger.info("Starting monthly data rotation", cities=len(enabled_cities))
now = datetime.now()
cutoff_date = now - timedelta(days=24 * 30)
last_month_end = now.replace(day=1) - timedelta(days=1)
last_month_start = last_month_end.replace(day=1)
tasks = []
for city in enabled_cities:
tasks.append(
self._rotate_city_data(
city.city_id,
cutoff_date,
last_month_start,
last_month_end
)
)
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = sum(1 for r in results if r is True)
logger.info(
"Monthly rotation complete",
total=len(results),
successes=successes
)
async def _rotate_city_data(
self,
city_id: str,
cutoff_date: datetime,
new_start: datetime,
new_end: datetime
) -> bool:
"""Rotate data for a single city"""
try:
city = self.registry.get_city(city_id)
if not city:
return False
logger.info(
"Rotating city data",
city=city.name,
cutoff=cutoff_date.date(),
new_month=new_start.strftime("%Y-%m")
)
async with self.database_manager.get_session() as session:
repo = CityDataRepository(session)
deleted_weather = await repo.delete_weather_before(
city_id, cutoff_date
)
deleted_traffic = await repo.delete_traffic_before(
city_id, cutoff_date
)
logger.info(
"Old data deleted",
city=city.name,
weather_deleted=deleted_weather,
traffic_deleted=deleted_traffic
)
adapter = get_adapter(city_id, {
"weather_config": city.weather_config,
"traffic_config": city.traffic_config
})
new_weather = await adapter.fetch_historical_weather(
new_start, new_end
)
new_traffic = await adapter.fetch_historical_traffic(
new_start, new_end
)
async with self.database_manager.get_session() as session:
repo = CityDataRepository(session)
weather_stored = await repo.bulk_store_weather(
city_id, new_weather
)
traffic_stored = await repo.bulk_store_traffic(
city_id, new_traffic
)
logger.info(
"New data ingested",
city=city.name,
weather_added=weather_stored,
traffic_added=traffic_stored
)
return True
except Exception as e:
logger.error(
"City rotation failed",
city_id=city_id,
error=str(e)
)
return False