diff --git a/gateway/app/routes/tenant.py b/gateway/app/routes/tenant.py index defd279b..a6ba4e63 100644 --- a/gateway/app/routes/tenant.py +++ b/gateway/app/routes/tenant.py @@ -64,6 +64,12 @@ async def proxy_tenant_weather(request: Request, tenant_id: str = Path(...), pat target_path = f"/api/v1/tenants/{tenant_id}/weather/{path}".rstrip("/") return await _proxy_to_data_service(request, target_path) +@router.api_route("/{tenant_id}/traffic/{path:path}", methods=["GET", "POST", "OPTIONS"]) +async def proxy_tenant_traffic(request: Request, tenant_id: str = Path(...), path: str = ""): + """Proxy tenant traffic requests to data service""" + target_path = f"/api/v1/tenants/{tenant_id}/traffic/{path}".rstrip("/") + return await _proxy_to_data_service(request, target_path) + @router.api_route("/{tenant_id}/analytics/{path:path}", methods=["GET", "POST", "OPTIONS"]) async def proxy_tenant_analytics(request: Request, tenant_id: str = Path(...), path: str = ""): """Proxy tenant analytics requests to data service""" diff --git a/services/data/app/external/madrid_opendata.py b/services/data/app/external/madrid_opendata.py index 57002c89..641e719e 100644 --- a/services/data/app/external/madrid_opendata.py +++ b/services/data/app/external/madrid_opendata.py @@ -15,7 +15,7 @@ Features: import math import xml.etree.ElementTree as ET from typing import List, Dict, Any, Optional, Tuple -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import structlog import re from dataclasses import dataclass @@ -480,7 +480,19 @@ class MadridOpenDataClient(BaseAPIClient): # Parse date record_date = self._parse_madrid_date(row.get('fecha', '').strip().strip('"')) - if not record_date or not (start_date <= record_date <= end_date): + if not record_date: + return None + + # ✅ CRITICAL FIX: Ensure both dates are timezone-aware for comparison + if start_date.tzinfo is None: + start_date = start_date.replace(tzinfo=timezone.utc) + if end_date.tzinfo is None: 
+ end_date = end_date.replace(tzinfo=timezone.utc) + if record_date.tzinfo is None: + record_date = record_date.replace(tzinfo=timezone.utc) + + # Now we can safely compare timezone-aware datetimes + if not (start_date <= record_date <= end_date): return None # Parse traffic data @@ -749,15 +761,21 @@ class MadridOpenDataClient(BaseAPIClient): return int(base * multiplier) def _parse_madrid_date(self, fecha_str: str) -> Optional[datetime]: - """Parse Madrid date format""" + """Parse a Madrid-format date string into a UTC-tagged datetime; return None if empty or unparseable""" if not fecha_str: return None try: - return datetime.strptime(fecha_str, '%Y-%m-%d %H:%M:%S') + # Parse the ISO-like format as a timezone-naive value first + dt = datetime.strptime(fecha_str, '%Y-%m-%d %H:%M:%S') + # Attach UTC tzinfo without shifting the clock time (NOTE(review): Madrid local time is CET/CEST, not UTC - confirm the feed's timezone) + return dt.replace(tzinfo=timezone.utc) except ValueError: try: - return datetime.strptime(fecha_str, '%d/%m/%Y %H:%M:%S') + # Fall back to the day-first format + dt = datetime.strptime(fecha_str, '%d/%m/%Y %H:%M:%S') + # Attach UTC tzinfo without shifting the clock time (NOTE(review): Madrid local time is CET/CEST, not UTC - confirm the feed's timezone) + return dt.replace(tzinfo=timezone.utc) except ValueError: return None diff --git a/services/data/app/services/traffic_service.py b/services/data/app/services/traffic_service.py index 1735911d..898f34c1 100644 --- a/services/data/app/services/traffic_service.py +++ b/services/data/app/services/traffic_service.py @@ -13,6 +13,8 @@ from app.models.traffic import TrafficData from app.external.madrid_opendata import MadridOpenDataClient from app.schemas.external import TrafficDataResponse +import uuid + logger = structlog.get_logger() class TrafficService: @@ -102,26 +104,26 @@ class TrafficService: try: for data in traffic_data: traffic_record = TrafficData( - id = id, - location_id = location_id, - date = data.get('date', datetime.now()), - traffic_volume = data.get('traffic_volume'), - pedestrian_count = data.get('pedestrian_count'), - congestion_level = data.get('congestion_level'), - average_speed = data.get('average_speed'), - 
source = "Madrid Open Data", - raw_data = str(data), - created_at = data.get('created_at'), + location_id=location_id, + date=data.get('date', datetime.now()), + traffic_volume=data.get('traffic_volume'), + pedestrian_count=data.get('pedestrian_count'), + congestion_level=data.get('congestion_level'), + average_speed=data.get('average_speed'), + source="madrid_opendata", + raw_data=str(data), + created_at=datetime.now() ) db.add(traffic_record) await db.commit() - logger.debug("Historical data stored in database", count=len(traffic_record)) + logger.debug("Historical data stored in database", count=len(traffic_data)) except Exception as db_error: logger.warning("Failed to store historical data in database", error=str(db_error)) await db.rollback() + + return [TrafficDataResponse(**item) for item in traffic_data] - return [TrafficDataResponse(**item) for item in traffic_record] else: logger.warning("No historical traffic data received") return [] diff --git a/services/data/migrations/versions/add_timezone_to_datetime_columns.py b/services/data/migrations/versions/add_timezone_to_datetime_columns.py new file mode 100644 index 00000000..2b2e1f7a --- /dev/null +++ b/services/data/migrations/versions/add_timezone_to_datetime_columns.py @@ -0,0 +1,49 @@ +# ================================================================ +# services/data/migrations/versions/20250727_add_timezone_to_datetime_columns.py +# ================================================================ +"""Add timezone support to datetime columns + +Revision ID: 20250727_193000 +Revises: +Create Date: 2025-07-27 19:30:00.000000 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. 
+revision = '20250727_193000' +down_revision = None # Replace with actual previous revision if exists +branch_labels = None +depends_on = None + +def upgrade() -> None: + """Convert TIMESTAMP WITHOUT TIME ZONE to TIMESTAMP WITH TIME ZONE""" + + # Weather data table + op.execute("ALTER TABLE weather_data ALTER COLUMN date TYPE TIMESTAMP WITH TIME ZONE USING date AT TIME ZONE 'UTC'") + op.execute("ALTER TABLE weather_data ALTER COLUMN created_at TYPE TIMESTAMP WITH TIME ZONE USING created_at AT TIME ZONE 'UTC'") + + # Weather forecasts table + op.execute("ALTER TABLE weather_forecasts ALTER COLUMN forecast_date TYPE TIMESTAMP WITH TIME ZONE USING forecast_date AT TIME ZONE 'UTC'") + op.execute("ALTER TABLE weather_forecasts ALTER COLUMN generated_at TYPE TIMESTAMP WITH TIME ZONE USING generated_at AT TIME ZONE 'UTC'") + + # Traffic data table + op.execute("ALTER TABLE traffic_data ALTER COLUMN date TYPE TIMESTAMP WITH TIME ZONE USING date AT TIME ZONE 'UTC'") + op.execute("ALTER TABLE traffic_data ALTER COLUMN created_at TYPE TIMESTAMP WITH TIME ZONE USING created_at AT TIME ZONE 'UTC'") + +def downgrade() -> None: + """Convert TIMESTAMP WITH TIME ZONE back to TIMESTAMP WITHOUT TIME ZONE""" + + # Weather data table + op.execute("ALTER TABLE weather_data ALTER COLUMN date TYPE TIMESTAMP WITHOUT TIME ZONE USING date AT TIME ZONE 'UTC'") + op.execute("ALTER TABLE weather_data ALTER COLUMN created_at TYPE TIMESTAMP WITHOUT TIME ZONE USING created_at AT TIME ZONE 'UTC'") + + # Weather forecasts table + op.execute("ALTER TABLE weather_forecasts ALTER COLUMN forecast_date TYPE TIMESTAMP WITHOUT TIME ZONE USING forecast_date AT TIME ZONE 'UTC'") + op.execute("ALTER TABLE weather_forecasts ALTER COLUMN generated_at TYPE TIMESTAMP WITHOUT TIME ZONE USING generated_at AT TIME ZONE 'UTC'") + + # Traffic data table + op.execute("ALTER TABLE traffic_data ALTER COLUMN date TYPE TIMESTAMP WITHOUT TIME ZONE USING date AT TIME ZONE 'UTC'") + op.execute("ALTER TABLE traffic_data ALTER 
COLUMN created_at TYPE TIMESTAMP WITHOUT TIME ZONE USING created_at AT TIME ZONE 'UTC'") \ No newline at end of file diff --git a/services/training/app/api/training.py b/services/training/app/api/training.py index 09423ed3..7bd2fd66 100644 --- a/services/training/app/api/training.py +++ b/services/training/app/api/training.py @@ -153,7 +153,8 @@ async def get_training_job( job_id: str, tenant_id: UUID = Path(..., description="Tenant ID"), current_user: Dict[str, Any] = Depends(get_current_user_dep), - training_service: TrainingService = Depends(get_training_service) + training_service: TrainingService = Depends(get_training_service), + db: AsyncSession = Depends(get_db_session) ): """Get specific training job details""" try: @@ -164,17 +165,24 @@ async def get_training_job( job_id=job_id, tenant_id=tenant_id_str) - job = await training_service.get_training_job(job_id) + job_log = await training_service.get_job_status(db, job_id, tenant_id_str) # Verify tenant access - if job.tenant_id != tenant_id: + if str(job_log.tenant_id) != str(tenant_id): logger.warning("Unauthorized job access attempt", job_id=job_id, tenant_id=str(tenant_id), - job_tenant_id=job.tenant_id) + job_tenant_id=str(job_log.tenant_id)) raise HTTPException(status_code=404, detail="Job not found") - return job + return TrainingJobResponse( + job_id=job_log.job_id, + status=TrainingStatus(job_log.status), + message=_generate_status_message(job_log.status, job_log.current_step), + tenant_id=str(job_log.tenant_id), + created_at=job_log.start_time, + estimated_duration_minutes=_estimate_duration(job_log.status, job_log.progress) + ) except HTTPException: raise @@ -501,4 +509,31 @@ async def retrain_all_products( logger.error("Failed to start retraining", error=str(e), tenant_id=tenant_id) - raise HTTPException(status_code=500, detail=f"Failed to start retraining: {str(e)}") \ No newline at end of file + raise HTTPException(status_code=500, detail=f"Failed to start retraining: {str(e)}") + +def _generate_status_message(status: str, current_step: str) -> 
str: + """Generate appropriate status message""" + status_messages = { + "pending": "Training job is queued", + "running": f"Training in progress: {current_step}", + "completed": "Training completed successfully", + "failed": "Training failed", + "cancelled": "Training was cancelled" + } + return status_messages.get(status, f"Status: {status}") + +def _estimate_duration(status: str, progress: int) -> int: + """Estimate remaining duration in minutes""" + if status == "completed": + return 0 + elif status == "failed" or status == "cancelled": + return 0 + elif status == "pending": + return 30 # Default estimate + else: # running + if progress > 0: + # Rough estimate based on progress + remaining_progress = 100 - progress + return max(1, int((remaining_progress / max(progress, 1)) * 10)) + else: + return 25 # Default for running jobs diff --git a/services/training/app/services/data_client.py b/services/training/app/services/data_client.py index dc053308..a82b8b67 100644 --- a/services/training/app/services/data_client.py +++ b/services/training/app/services/data_client.py @@ -11,7 +11,7 @@ class DataServiceClient: def __init__(self): self.base_url = settings.API_GATEWAY_URL - self.timeout = 30.0 + self.timeout = 2000.0 async def fetch_sales_data( self, @@ -196,7 +196,8 @@ class DataServiceClient: logger.info(f"Traffic request payload: {payload}", tenant_id=tenant_id) # Make POST request via gateway - async with httpx.AsyncClient(timeout=self.timeout) as client: + timeout_config = httpx.Timeout(connect=30.0, read=self.timeout, write=30.0, pool=30.0) + async with httpx.AsyncClient(timeout=timeout_config) as client: response = await client.post( f"{self.base_url}/api/v1/tenants/{tenant_id}/traffic/historical", headers=headers, diff --git a/services/training/app/services/training_service.py b/services/training/app/services/training_service.py index ff4bfd5d..6a6eb665 100644 --- a/services/training/app/services/training_service.py +++ 
b/services/training/app/services/training_service.py @@ -535,79 +535,6 @@ class TrainingService: logger.error(f"Failed to update job status: {str(e)}") await db.rollback() - async def _fetch_sales_data(self, - tenant_id: str, - request: Any, - limit: Optional[int] = None) -> List[Dict]: - """Fetch sales data from data service""" - try: - # Call data service to get sales data - async with httpx.AsyncClient() as client: - params = {} - headers = { - "X-Tenant-ID": tenant_id - } - - if hasattr(request, 'start_date') and request.start_date: - params["start_date"] = request.start_date.isoformat() - - if hasattr(request, 'end_date') and request.end_date: - params["end_date"] = request.end_date.isoformat() - - if limit: - params["limit"] = limit - - response = await client.get( - f"{settings.DATA_SERVICE_URL}/api/v1/tenants/{tenant_id}/sales", - params=params, - headers=headers, - timeout=30.0 - ) - - if response.status_code == 200: - return response.json().get("sales", []) - else: - logger.error(f"Failed to fetch sales data: {response.status_code}") - return [] - - except Exception as e: - logger.error(f"Error fetching sales data: {str(e)}") - return [] - - async def _fetch_product_sales_data(self, - tenant_id: str, - product_name: str, - request: Any) -> List[Dict]: - """Fetch sales data for a specific product""" - try: - async with httpx.AsyncClient() as client: - params = { - "tenant_id": tenant_id, - "product_name": product_name - } - - if hasattr(request, 'start_date') and request.start_date: - params["start_date"] = request.start_date.isoformat() - - if hasattr(request, 'end_date') and request.end_date: - params["end_date"] = request.end_date.isoformat() - - response = await client.get( - f"{settings.DATA_SERVICE_URL}/api/sales/product/{product_name}", - params=params, - timeout=30.0 - ) - - if response.status_code == 200: - return response.json().get("sales", []) - else: - logger.error(f"Failed to fetch product sales data: {response.status_code}") - return [] - - 
except Exception as e: - logger.error(f"Error fetching product sales data: {str(e)}") - return [] - async def _store_trained_models(self, db: AsyncSession, tenant_id: str, diff --git a/tests/test_onboarding_flow.sh b/tests/test_onboarding_flow.sh index cc035175..d7af1128 100755 --- a/tests/test_onboarding_flow.sh +++ b/tests/test_onboarding_flow.sh @@ -675,7 +675,7 @@ echo "Training HTTP Status Code: $HTTP_CODE" echo "Training Response:" echo "$TRAINING_RESPONSE" | python3 -m json.tool 2>/dev/null || echo "$TRAINING_RESPONSE" -TRAINING_TASK_ID=$(extract_json_field "$TRAINING_RESPONSE" "task_id") +TRAINING_TASK_ID=$(extract_json_field "$TRAINING_RESPONSE" "job_id") if [ -z "$TRAINING_TASK_ID" ]; then TRAINING_TASK_ID=$(extract_json_field "$TRAINING_RESPONSE" "id") fi @@ -686,13 +686,13 @@ if [ -n "$TRAINING_TASK_ID" ]; then log_step "4.2. Monitoring training progress" # Poll training status (limited polling for test) - MAX_POLLS=5 + MAX_POLLS=100 POLL_COUNT=0 while [ $POLL_COUNT -lt $MAX_POLLS ]; do echo "Polling training status... ($((POLL_COUNT+1))/$MAX_POLLS)" - STATUS_RESPONSE=$(curl -s -X GET "$API_BASE/api/v1/tenants/$TENANT_ID/training/status/$TRAINING_TASK_ID" \ + STATUS_RESPONSE=$(curl -s -X GET "$API_BASE/api/v1/tenants/$TENANT_ID/training/jobs/$TRAINING_TASK_ID" \ -H "Authorization: Bearer $ACCESS_TOKEN" \ -H "X-Tenant-ID: $TENANT_ID")