diff --git a/gateway/app/routes/training.py b/gateway/app/routes/training.py
index fbad9318..0bea905f 100644
--- a/gateway/app/routes/training.py
+++ b/gateway/app/routes/training.py
@@ -13,38 +13,6 @@ from app.core.config import settings
 logger = logging.getLogger(__name__)
 router = APIRouter()
 
-@router.post("/train")
-async def start_training(request: Request):
-    """Proxy training request to training service"""
-    try:
-        body = await request.body()
-        auth_header = request.headers.get("Authorization")
-
-        async with httpx.AsyncClient(timeout=30.0) as client:
-            response = await client.post(
-                f"{settings.TRAINING_SERVICE_URL}/train",
-                content=body,
-                headers={
-                    "Content-Type": "application/json",
-                    "Authorization": auth_header
-                }
-            )
-
-            return JSONResponse(
-                status_code=response.status_code,
-                content=response.json()
-            )
-
-    except httpx.RequestError as e:
-        logger.error(f"Training service unavailable: {e}")
-        raise HTTPException(
-            status_code=503,
-            detail="Training service unavailable"
-        )
-    except Exception as e:
-        logger.error(f"Training error: {e}")
-        raise HTTPException(status_code=500, detail="Internal server error")
-
 @router.get("/status/{training_job_id}")
 async def get_training_status(training_job_id: str, request: Request):
     """Get training job status"""
@@ -130,3 +98,35 @@ async def get_training_jobs(
     except Exception as e:
         logger.error(f"Get training jobs error: {e}")
         raise HTTPException(status_code=500, detail="Internal server error")
+
+@router.post("/jobs")
+async def start_training_job(request: Request):
+    """Start a new training job by proxying to the training service"""
+    try:
+        body = await request.body()
+        auth_header = request.headers.get("Authorization")
+
+        async with httpx.AsyncClient(timeout=30.0) as client:
+            response = await client.post(
+                f"{settings.TRAINING_SERVICE_URL}/training/jobs",  # Correct path
+                content=body,
+                headers={
+                    "Content-Type": "application/json",
+                    **({"Authorization": auth_header} if auth_header else {})  # httpx rejects None header values
+                }
+            )
+
+            return JSONResponse(
+                status_code=response.status_code,
+                content=response.json()
+            )
+
+    except httpx.RequestError as e:
+        logger.error(f"Training service unavailable: {e}")
+        raise HTTPException(
+            status_code=503,
+            detail="Training service unavailable"
+        )
+    except Exception as e:
+        logger.error(f"Start training job error: {e}")
+        raise HTTPException(status_code=500, detail="Internal server error")
\ No newline at end of file
diff --git a/services/data/app/external/aemet.py b/services/data/app/external/aemet.py
index 66c4ffe3..bf1242eb 100644
--- a/services/data/app/external/aemet.py
+++ b/services/data/app/external/aemet.py
@@ -136,9 +136,9 @@ class AEMETClient(BaseAPIClient):
         try:
             # Madrid area stations (simplified)
             madrid_stations = {
-                "3195": {"lat": 40.4168, "lon": -3.7038, "name": "Madrid Centro"},
-                "3196": {"lat": 40.4518, "lon": -3.7246, "name": "Madrid Norte"},
-                "3197": {"lat": 40.3833, "lon": -3.7167, "name": "Madrid Sur"}
+                "3195": {"lat": 40.4117, "lon": -3.6780, "name": "Madrid Centro"},
+                "3129": {"lat": 40.4677, "lon": -3.5552, "name": "Madrid Norte"},
+                "3197": {"lat": 40.2987, "lon": -3.7216, "name": "Madrid Sur"}
             }
 
             closest_station = None
diff --git a/services/data/app/external/madrid_opendata.py b/services/data/app/external/madrid_opendata.py
index d0091ab7..2bd655c0 100644
--- a/services/data/app/external/madrid_opendata.py
+++ b/services/data/app/external/madrid_opendata.py
@@ -630,10 +630,386 @@ class MadridOpenDataClient(BaseAPIClient):
             "source": "synthetic"
         }
 
-    # Placeholder methods for completeness
     async def 
get_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: - """Get historical traffic data""" - return [] + """Get historical traffic data from Madrid Open Data + + Args: + latitude: Location latitude + longitude: Location longitude + start_date: Start date for historical data + end_date: End date for historical data + + Returns: + List of historical traffic data dictionaries + """ + try: + logger.debug("Fetching Madrid historical traffic data", + lat=latitude, lon=longitude, + start=start_date, end=end_date) + + historical_data = [] + + # Generate historical data using synthetic generation for periods before API availability + # or when real data is not available + if (end_date - start_date).days <= 90: # Reasonable range for synthetic data + historical_data = await self._generate_historical_traffic(latitude, longitude, start_date, end_date) + logger.info("Generated synthetic historical traffic data", + records=len(historical_data)) + else: + logger.warning("Date range too large for historical traffic data", + days=(end_date - start_date).days) + return [] + + # Try to fetch real data if API key is available and for recent dates + if hasattr(self, 'api_key') and self.api_key: + try: + real_data = await self._fetch_real_historical_traffic(latitude, longitude, start_date, end_date) + if real_data: + # Merge real data with synthetic data or replace synthetic data + historical_data = real_data + logger.info("Fetched real historical traffic data", + records=len(real_data)) + except Exception as e: + logger.warning("Failed to fetch real historical data, using synthetic", error=str(e)) + + return historical_data + + except Exception as e: + logger.error("Error getting historical traffic data", error=str(e)) + return [] + + async def _fetch_real_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: + """Fetch real historical traffic data from Madrid Open Data portal + + Madrid provides historical CSV files by month at: + https://datos.madrid.es/egob/catalogo/[ID]-[YEAR]-[MONTH]-trafico-historico.csv + """ + try: + historical_data = [] + current_date = start_date.replace(day=1) # Start from beginning of month + + while current_date <= end_date: + try: + # Madrid historical traffic CSV URL pattern + year = current_date.year + month = current_date.month + + # Try different URL patterns based on Madrid Open Data structure + historical_urls = [ + f"https://datos.madrid.es/egob/catalogo/300217-{year}-{month:02d}-trafico-historico.csv", + f"https://datos.madrid.es/egob/catalogo/trafico-historico-{year}-{month:02d}.csv", + f"https://datos.madrid.es/egob/catalogo/{year}{month:02d}-trafico-historico.csv" + ] + + for url in historical_urls: + csv_data = await self._fetch_historical_csv(url) + if csv_data: + # Parse CSV and filter by location + month_data = await self._parse_historical_csv(csv_data, latitude, longitude, start_date, end_date) + historical_data.extend(month_data) + logger.debug("Fetched historical data for month", + year=year, month=month, records=len(month_data)) + break + + # Move to next month + if current_date.month == 12: + current_date = current_date.replace(year=current_date.year + 1, month=1) + else: + current_date = current_date.replace(month=current_date.month + 1) + + except Exception as e: + logger.warning("Error fetching data for month", + year=current_date.year, month=current_date.month, error=str(e)) + # Move to next month even on 
error + if current_date.month == 12: + current_date = current_date.replace(year=current_date.year + 1, month=1) + else: + current_date = current_date.replace(month=current_date.month + 1) + + return historical_data + + except Exception as e: + logger.error("Error fetching real historical traffic data", error=str(e)) + return [] + + async def _fetch_historical_csv(self, url: str) -> Optional[str]: + """Fetch historical CSV data from Madrid Open Data""" + try: + import httpx + + headers = { + 'User-Agent': 'Mozilla/5.0 (compatible; Madrid-Traffic-Client/1.0)', + 'Accept': 'text/csv,application/csv,text/plain,*/*', + 'Accept-Language': 'es-ES,es;q=0.9,en;q=0.8', + } + + async with httpx.AsyncClient(timeout=60.0, headers=headers) as client: + logger.debug("Fetching historical CSV", url=url) + response = await client.get(url) + + if response.status_code == 200: + content = response.text + if content and len(content) > 100: # Ensure we got actual data + logger.debug("Successfully fetched CSV", + url=url, size=len(content)) + return content + else: + logger.debug("CSV not found", url=url, status=response.status_code) + + except Exception as e: + logger.debug("Error fetching CSV", url=url, error=str(e)) + + return None + + async def _parse_historical_csv(self, csv_content: str, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: + """Parse Madrid historical traffic CSV and filter by location and date range""" + try: + import csv + from io import StringIO + + historical_records = [] + csv_reader = csv.DictReader(StringIO(csv_content), delimiter=';') + + # Get the nearest measurement points to our coordinates + measurement_points = await self._get_measurement_points_near_location(latitude, longitude) + target_point_ids = [point['id'] for point in measurement_points[:3]] # Use 3 nearest points + + for row in csv_reader: + try: + # Parse Madrid CSV format + # Expected columns: fecha, hora, idelem, intensidad, ocupacion, carga, nivelServicio, etc. 
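+                    # Illustrative row (values hypothetical; exact field order assumed from the column list above):
+                    #   01/02/2024;08:00;3840;450;12;35;1;URB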
+ + # Extract date and time + if 'fecha' in row and 'hora' in row: + date_str = row.get('fecha', '').strip() + time_str = row.get('hora', '').strip() + + # Parse Madrid date format (usually DD/MM/YYYY) + if date_str and time_str: + try: + # Try different date formats + for date_format in ['%d/%m/%Y', '%Y-%m-%d', '%d-%m-%Y']: + try: + record_date = datetime.strptime(f"{date_str} {time_str}", f"{date_format} %H:%M") + break + except ValueError: + continue + else: + continue # Skip if no date format worked + + # Check if record is in our date range + if not (start_date <= record_date <= end_date): + continue + + except ValueError: + continue + else: + continue + + # Check if this record is from a measurement point near our location + point_id = row.get('idelem', '').strip() + if point_id not in target_point_ids: + continue + + # Parse traffic data + traffic_record = { + "date": record_date, + "traffic_volume": self._safe_int(row.get('intensidad', '0')), + "occupation_percentage": self._safe_int(row.get('ocupacion', '0')), + "load_percentage": self._safe_int(row.get('carga', '0')), + "service_level": self._safe_int(row.get('nivelServicio', '0')), + "measurement_point_id": point_id, + "measurement_point_name": row.get('descripcion', f'Point {point_id}'), + "road_type": row.get('tipo_elem', 'URB'), + "source": "madrid_opendata_historical" + } + + # Calculate derived metrics + service_level = traffic_record['service_level'] + if service_level == 0: # Fluid + congestion_level = "low" + avg_speed = 45 + pedestrian_multiplier = 1.0 + elif service_level == 1: # Dense + congestion_level = "medium" + avg_speed = 25 + pedestrian_multiplier = 1.5 + elif service_level == 2: # Congested + congestion_level = "high" + avg_speed = 15 + pedestrian_multiplier = 2.0 + else: # Cut/Blocked + congestion_level = "blocked" + avg_speed = 5 + pedestrian_multiplier = 0.5 + + traffic_record.update({ + "congestion_level": congestion_level, + "average_speed": avg_speed, + "pedestrian_count": int(100 * pedestrian_multiplier) + }) + + historical_records.append(traffic_record) + + except Exception as e: + logger.debug("Error parsing CSV row", error=str(e)) + continue + + return historical_records + + except Exception as e: + logger.error("Error parsing historical CSV", error=str(e)) + return [] + + async def _get_measurement_points_near_location(self, latitude: float, longitude: float) -> List[Dict[str, Any]]: + """Get measurement points near the specified location""" + try: + # Try to fetch current traffic data to get measurement points + current_traffic = await self._fetch_traffic_xml_data(self.traffic_endpoints[0]) + + if current_traffic: + # Calculate distances and sort by proximity + points_with_distance = [] + for point in current_traffic: + if point.get('latitude') and point.get('longitude'): + distance = self._calculate_distance( + latitude, longitude, + point['latitude'], point['longitude'] + ) + points_with_distance.append({ + 'id': point.get('idelem'), + 'distance': distance, + 'latitude': point['latitude'], + 'longitude': point['longitude'], + 'name': point.get('descripcion', '') + }) + + # Sort by distance and return closest points + points_with_distance.sort(key=lambda x: x['distance']) + return points_with_distance[:5] # Return 5 closest points + + # Fallback: return synthetic point IDs based on Madrid geography + return [ + {'id': 'madrid_centro_01', 'distance': 1.0}, + {'id': 'madrid_centro_02', 'distance': 2.0}, + {'id': 'madrid_centro_03', 'distance': 3.0} + ] + + except Exception as e: + 
logger.warning("Error getting measurement points", error=str(e)) + return [{'id': 'madrid_default', 'distance': 0.0}] + + async def _generate_historical_traffic(self, latitude: float, longitude: float, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: + """Generate synthetic historical traffic data for the specified period + + This method creates realistic historical traffic patterns based on: + - Time of day patterns + - Day of week patterns + - Seasonal variations + - Random variations for realism + """ + try: + import random + from datetime import timedelta + + historical_data = [] + current_date = start_date + + # Seed random for consistent but varied data + random.seed(hash(f"{latitude}{longitude}")) + + while current_date <= end_date: + # Generate 24 hourly records for each day + for hour in range(24): + record_time = current_date.replace(hour=hour, minute=0, second=0, microsecond=0) + + # Base traffic calculation + base_traffic = 100 + hour_of_day = record_time.hour + day_of_week = record_time.weekday() # 0=Monday, 6=Sunday + month = record_time.month + + # Time of day patterns + if 7 <= hour_of_day <= 9: # Morning rush + traffic_multiplier = 2.2 + random.uniform(-0.3, 0.3) + congestion = "high" + avg_speed = 15 + random.randint(-5, 5) + elif 18 <= hour_of_day <= 20: # Evening rush + traffic_multiplier = 2.5 + random.uniform(-0.4, 0.4) + congestion = "high" + avg_speed = 12 + random.randint(-3, 8) + elif 12 <= hour_of_day <= 14: # Lunch time + traffic_multiplier = 1.6 + random.uniform(-0.2, 0.2) + congestion = "medium" + avg_speed = 25 + random.randint(-5, 10) + elif 22 <= hour_of_day or hour_of_day <= 6: # Night + traffic_multiplier = 0.3 + random.uniform(-0.1, 0.2) + congestion = "low" + avg_speed = 50 + random.randint(-10, 15) + else: # Regular hours + traffic_multiplier = 1.0 + random.uniform(-0.2, 0.2) + congestion = "medium" + avg_speed = 35 + random.randint(-10, 10) + + # Weekend adjustments + if day_of_week >= 5: # Weekend + if hour_of_day in [11, 12, 13, 14, 15]: # Weekend afternoon peak + traffic_multiplier *= 1.4 + congestion = "medium" + else: + traffic_multiplier *= 0.7 + if congestion == "high": + congestion = "medium" + + # Seasonal adjustments + if month in [7, 8]: # Summer - less traffic due to vacations + traffic_multiplier *= 0.8 + elif month in [11, 12]: # Holiday season - more traffic + traffic_multiplier *= 1.1 + + # Calculate final values + traffic_volume = max(10, int(base_traffic * traffic_multiplier)) + avg_speed = max(10, min(60, avg_speed)) + + # Pedestrian calculation + pedestrian_base = 150 + if 13 <= hour_of_day <= 15: # Lunch time + pedestrian_count = int(pedestrian_base * 2.5 * random.uniform(0.8, 1.2)) + elif 8 <= hour_of_day <= 9 or 18 <= hour_of_day <= 20: # Rush hours + pedestrian_count = int(pedestrian_base * 2.0 * random.uniform(0.8, 1.2)) + else: + pedestrian_count = int(pedestrian_base * 1.0 * random.uniform(0.5, 1.5)) + + # Create traffic record + traffic_record = { + "date": record_time, + "traffic_volume": traffic_volume, + "pedestrian_count": pedestrian_count, + "congestion_level": congestion, + "average_speed": avg_speed, + "occupation_percentage": min(100, traffic_volume // 2), + "load_percentage": min(100, traffic_volume // 3), + "measurement_point_id": f"madrid_historical_{hash(f'{latitude}{longitude}') % 1000}", + "measurement_point_name": f"Madrid Historical Point ({latitude:.4f}, {longitude:.4f})", + "road_type": "URB", + "source": "synthetic_historical" + } + + historical_data.append(traffic_record) + + # Move to 
next day
+                current_date += timedelta(days=1)
+
+            logger.info("Generated historical traffic data",
+                        records=len(historical_data),
+                        start=start_date,
+                        end=end_date)
+
+            return historical_data
+
+        except Exception as e:
+            logger.error("Error generating historical traffic data", error=str(e))
+            return []
 
     async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]:
         """Get traffic incidents and events"""
diff --git a/services/data/app/services/traffic_service.py b/services/data/app/services/traffic_service.py
index 3bf815dd..1735911d 100644
--- a/services/data/app/services/traffic_service.py
+++ b/services/data/app/services/traffic_service.py
@@ -90,8 +90,40 @@ class TrafficService:
                     average_speed=record.average_speed,
                     source=record.source
                 ) for record in db_records]
+
+            # If not in database, fetch from API and store
+            logger.debug("Fetching historical data from Madrid Open Data")
+            traffic_data = await self.madrid_client.get_historical_traffic(
+                latitude, longitude, start_date, end_date
+            )
+
+            if traffic_data:
+                # Store in database for future use
+                try:
+                    for data in traffic_data:
+                        traffic_record = TrafficData(
+                            id=str(uuid4()),  # fresh primary key; assumes `from uuid import uuid4` at module level
+                            location_id=location_id,
+                            date=data.get('date', datetime.now()),
+                            traffic_volume=data.get('traffic_volume'),
+                            pedestrian_count=data.get('pedestrian_count'),
+                            congestion_level=data.get('congestion_level'),
+                            average_speed=data.get('average_speed'),
+                            source="Madrid Open Data",
+                            raw_data=str(data),
+                            created_at=data.get('created_at', datetime.now()),
+                        )
+                        db.add(traffic_record)
+
+                    await db.commit()
+                    logger.debug("Historical data stored in database", count=len(traffic_data))
+                except Exception as db_error:
+                    logger.warning("Failed to store historical data in database", error=str(db_error))
+                    await db.rollback()
+
+                return [TrafficDataResponse(**item) for item in traffic_data]
             else:
-                logger.debug("No historical traffic data found in database")
+                logger.warning("No historical traffic data received")
                 return []
 
         except Exception as e:
diff --git a/services/training/app/main.py b/services/training/app/main.py
index edc1e377..d83f526d 100644
--- a/services/training/app/main.py
+++ b/services/training/app/main.py
@@ -177,7 +177,6 @@ async def global_exception_handler(request: Request, exc: Exception):
 
 # Authentication is handled by API Gateway
 app.include_router(
     training.router,
-    prefix="/training",
     tags=["training"]
 )
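A quick way to exercise the new code path end to end is to call `get_historical_traffic` directly. The sketch below is illustrative only: the class name, the method signature, and the <=90-day synthetic fallback come from the diff above, while the no-argument constructor and the import path are assumptions.

```python
# Minimal smoke-test sketch: fetch ~30 days of historical traffic for central Madrid.
import asyncio
from datetime import datetime, timedelta

from app.external.madrid_opendata import MadridOpenDataClient  # import path assumed


async def main() -> None:
    client = MadridOpenDataClient()  # constructor arguments, if any, are assumed away
    end = datetime.now()
    start = end - timedelta(days=30)  # <= 90 days, so the synthetic generator can serve it
    records = await client.get_historical_traffic(40.4168, -3.7038, start, end)
    if records:
        first = records[0]
        print(f"{len(records)} records; first: {first['date']} ({first['congestion_level']})")


asyncio.run(main())
```

Without an API key configured, this returns the deterministic synthetic series; with one, the client attempts the monthly CSV downloads first and falls back to synthetic data on failure.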