diff --git a/services/data/app/api/traffic.py b/services/data/app/api/traffic.py index 512c3037..3ca4a2a8 100644 --- a/services/data/app/api/traffic.py +++ b/services/data/app/api/traffic.py @@ -15,13 +15,11 @@ from app.services.traffic_service import TrafficService from app.services.messaging import data_publisher from app.schemas.external import ( TrafficDataResponse, - LocationRequest, - DateRangeRequest + HistoricalTrafficRequest ) from shared.auth.decorators import ( - get_current_user_dep, - get_current_tenant_id_dep + get_current_user_dep ) router = APIRouter(tags=["traffic"]) @@ -71,37 +69,34 @@ async def get_current_traffic( logger.error("Traffic API traceback", traceback=traceback.format_exc()) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") -@router.get("/tenants/{tenant_id}/traffic/historical", response_model=List[TrafficDataResponse]) +@router.post("/tenants/{tenant_id}/traffic/historical") async def get_historical_traffic( - latitude: float = Query(..., description="Latitude"), - longitude: float = Query(..., description="Longitude"), - start_date: datetime = Query(..., description="Start date"), - end_date: datetime = Query(..., description="End date"), + request: HistoricalTrafficRequest, db: AsyncSession = Depends(get_db), tenant_id: UUID = Path(..., description="Tenant ID"), current_user: Dict[str, Any] = Depends(get_current_user_dep), ): - """Get historical traffic data""" + """Get historical traffic data with date range in payload""" try: # Validate date range - if end_date <= start_date: + if request.end_date <= request.start_date: raise HTTPException(status_code=400, detail="End date must be after start date") - if (end_date - start_date).days > 90: + if (request.end_date - request.start_date).days > 90: raise HTTPException(status_code=400, detail="Date range cannot exceed 90 days") historical_data = await traffic_service.get_historical_traffic( - latitude, longitude, start_date, end_date, db + request.latitude, request.longitude, request.start_date, request.end_date, db ) # Publish event (with error handling) try: await data_publisher.publish_traffic_updated({ "type": "historical_requested", - "latitude": latitude, - "longitude": longitude, - "start_date": start_date.isoformat(), - "end_date": end_date.isoformat(), + "latitude": request.latitude, + "longitude": request.longitude, + "start_date": request.start_date.isoformat(), + "end_date": request.end_date.isoformat(), "records_count": len(historical_data), "timestamp": datetime.utcnow().isoformat() }) @@ -112,49 +107,7 @@ async def get_historical_traffic( return historical_data except HTTPException: - # Re-raise HTTP exceptions raise except Exception as e: logger.error("Unexpected error in historical traffic API", error=str(e)) - raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") - -@router.post("/tenants/{tenant_id}/traffic/store") -async def store_traffic_data( - latitude: float = Query(..., description="Latitude"), - longitude: float = Query(..., description="Longitude"), - db: AsyncSession = Depends(get_db), - tenant_id: UUID = Path(..., description="Tenant ID"), - current_user: Dict[str, Any] = Depends(get_current_user_dep) -): - """Store current traffic data to database""" - try: - # Get current traffic data - traffic = await traffic_service.get_current_traffic(latitude, longitude) - - if not traffic: - raise HTTPException(status_code=404, detail="No traffic data to store") - - # Convert to dict for storage - traffic_dict = { - "date": traffic.date, - "traffic_volume": traffic.traffic_volume, - "pedestrian_count": traffic.pedestrian_count, - "congestion_level": traffic.congestion_level, - "average_speed": traffic.average_speed, - "source": traffic.source - } - - success = await traffic_service.store_traffic_data( - latitude, longitude, traffic_dict, db - ) - - if success: - return {"status": "success", "message": "Traffic data stored successfully"} - else: - raise HTTPException(status_code=500, detail="Failed to store traffic data") - - except HTTPException: - raise - except Exception as e: - logger.error("Error storing traffic data", error=str(e)) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") \ No newline at end of file diff --git a/services/data/app/api/weather.py b/services/data/app/api/weather.py index 07883b01..d9d62ac3 100644 --- a/services/data/app/api/weather.py +++ b/services/data/app/api/weather.py @@ -14,16 +14,24 @@ from app.schemas.external import ( from app.services.weather_service import WeatherService from app.services.messaging import publish_weather_updated +from app.schemas.external import ( + HistoricalWeatherRequest +) + # Import unified authentication from shared library from shared.auth.decorators import ( get_current_user_dep, get_current_tenant_id_dep ) +from sqlalchemy.ext.asyncio import AsyncSession +from app.core.database import get_db + router = APIRouter(tags=["weather"]) logger = structlog.get_logger() +weather_service = WeatherService() -@router.get("/tenants/{tenant_id}/wetaher/current", response_model=WeatherDataResponse) +@router.get("/tenants/{tenant_id}/weather/current", response_model=WeatherDataResponse) async def get_current_weather( latitude: float = Query(..., description="Latitude"), longitude: float = Query(..., description="Longitude"), @@ -38,7 +46,6 @@ async def get_current_weather( tenant_id=tenant_id, user_id=current_user["user_id"]) - weather_service = WeatherService() weather = await weather_service.get_current_weather(latitude, longitude) if not weather: @@ -81,7 +88,6 @@ async def get_weather_forecast( days=days, tenant_id=tenant_id) - weather_service = WeatherService() forecast = await weather_service.get_weather_forecast(latitude, longitude, days) if not forecast: @@ -109,30 +115,47 @@ async def get_weather_forecast( logger.error("Failed to get weather forecast", error=str(e)) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") -@router.get("/tenants/{tenant_id}/weather/history", response_model=List[WeatherDataResponse]) -async def get_weather_history( - start_date: date = Query(..., description="Start date"), - end_date: date = Query(..., description="End date"), - latitude: float = Query(..., description="Latitude"), - longitude: float = Query(..., description="Longitude"), - tenant_id: str = Path(..., description="Tenant ID") +@router.post("/tenants/{tenant_id}/weather/historical") +async def get_historical_weather( + request: HistoricalWeatherRequest, + db: AsyncSession = Depends(get_db), + tenant_id: UUID = Path(..., description="Tenant ID"), + current_user: Dict[str, Any] = Depends(get_current_user_dep), ): - """Get historical weather data""" + """Get historical weather data with date range in payload""" try: - logger.debug("Getting weather history", - start_date=start_date, - end_date=end_date, - tenant_id=tenant_id) + # Validate date range + if request.end_date <= request.start_date: + raise HTTPException(status_code=400, detail="End date must be after start date") - weather_service = WeatherService() - history = await weather_service.get_historical_weather( - latitude, longitude, start_date, end_date + if (request.end_date - request.start_date).days > 90: + raise HTTPException(status_code=400, detail="Date range cannot exceed 90 days") + + historical_data = await weather_service.get_historical_weather( + request.latitude, request.longitude, request.start_date, request.end_date, db ) - return history + # Publish event (with error handling) + try: + await publish_weather_updated({ + "type": "historical_requested", + "latitude": request.latitude, + "longitude": request.longitude, + "start_date": request.start_date.isoformat(), + "end_date": request.end_date.isoformat(), + "records_count": len(historical_data), + "timestamp": datetime.utcnow().isoformat() + }) + except Exception as pub_error: + logger.warning("Failed to publish historical weather event", error=str(pub_error)) + # Continue processing + return historical_data + + except HTTPException: + raise except Exception as e: - logger.error("Failed to get weather history", error=str(e)) + logger.error("Unexpected error in historical weather API", error=str(e)) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.post("/tenants/{tenant_id}weather/sync") @@ -149,8 +172,6 @@ async def sync_weather_data( user_id=current_user["user_id"], force=force) - weather_service = WeatherService() - # Check if user has permission to sync (could be admin only) if current_user.get("role") not in ["admin", "manager"]: raise HTTPException( diff --git a/services/data/app/schemas/external.py b/services/data/app/schemas/external.py index 72bd6488..6a623feb 100644 --- a/services/data/app/schemas/external.py +++ b/services/data/app/schemas/external.py @@ -43,3 +43,15 @@ class LocationRequest(BaseModel): class DateRangeRequest(BaseModel): start_date: datetime end_date: datetime + +class HistoricalTrafficRequest(BaseModel): + latitude: float + longitude: float + start_date: datetime + end_date: datetime + +class HistoricalWeatherRequest(BaseModel): + latitude: float + longitude: float + start_date: datetime + end_date: datetime \ No newline at end of file diff --git a/services/training/app/services/data_client.py b/services/training/app/services/data_client.py index 1d9faccf..2777dc12 100644 --- a/services/training/app/services/data_client.py +++ b/services/training/app/services/data_client.py @@ -43,7 +43,7 @@ class DataServiceClient: # Make request via gateway async with httpx.AsyncClient(timeout=self.timeout) as client: - response = await client.get( + response = await client.post( f"{self.base_url}/api/v1/tenants/{tenant_id}/sales", headers=headers, params=params @@ -87,58 +87,102 @@ class DataServiceClient: tenant_id=tenant_id) return [] - async def fetch_weather_data(self, tenant_id: str) -> List[Dict[str, Any]]: - """Fetch weather data via API Gateway""" + async def fetch_weather_data( + self, + tenant_id: str, + start_date: str, + end_date: str, + latitude: Optional[float] = None, + longitude: Optional[float] = None + ) -> List[Dict[str, Any]]: + """ + Fetch historical weather data for training via API Gateway using POST + """ try: + # Get service token token = await service_auth.get_service_token() + + # Prepare headers headers = service_auth.get_request_headers(tenant_id) headers["Authorization"] = f"Bearer {token}" + headers["Content-Type"] = "application/json" - params = {strta date, end date, lat, long } + # Prepare request payload + payload = { + "start_date": start_date, + "end_date": end_date, + "latitude": latitude or 40.4168, # Default Madrid coordinates + "longitude": longitude or -3.7038 + } + # Make POST request via gateway async with httpx.AsyncClient(timeout=self.timeout) as client: - response = await client.get( - f"{self.base_url}/api/v1/tenants/{tenant_id}/weather/history", + response = await client.post( + f"{self.base_url}/api/v1/tenants/{tenant_id}/weather/historical", headers=headers, - params=params, - timeout=30.0 + json=payload ) + logger.info(f"Weather data request: {response.status_code}", + tenant_id=tenant_id, + url=response.url) + if response.status_code == 200: - data = response.json() - logger.info(f"Fetched {len(data)} weather records", tenant_id=tenant_id) - return data + return response.json() else: - logger.warning(f"Weather data fetch failed: {response.status_code}", - tenant_id=tenant_id) + logger.error(f"Failed to fetch weather data: {response.status_code} - {response.text}") return [] except Exception as e: - logger.warning(f"Error fetching weather data: {e}", tenant_id=tenant_id) + logger.error(f"Error fetching weather data: {str(e)}") return [] - - async def fetch_traffic_data(self, tenant_id: str) -> List[Dict[str, Any]]: - """Fetch traffic data via API Gateway""" + + async def fetch_traffic_data( + self, + tenant_id: str, + start_date: str, + end_date: str, + latitude: Optional[float] = None, + longitude: Optional[float] = None + ) -> List[Dict[str, Any]]: + """ + Fetch historical traffic data for training via API Gateway using POST + """ try: + # Get service token token = await service_auth.get_service_token() + + # Prepare headers headers = service_auth.get_request_headers(tenant_id) headers["Authorization"] = f"Bearer {token}" + headers["Content-Type"] = "application/json" + # Prepare request payload + payload = { + "start_date": start_date, + "end_date": end_date, + "latitude": latitude or 40.4168, # Default Madrid coordinates + "longitude": longitude or -3.7038 + } + + # Make POST request via gateway async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/api/v1/tenants/{tenant_id}/traffic/historical", - headers=headers + headers=headers, + json=payload ) + logger.info(f"Traffic data request: {response.status_code}", + tenant_id=tenant_id, + url=response.url) + if response.status_code == 200: - data = response.json() - logger.info(f"Fetched {len(data)} traffic records", tenant_id=tenant_id) - return data + return response.json() else: - logger.warning(f"Traffic data fetch failed: {response.status_code}", - tenant_id=tenant_id) + logger.error(f"Failed to fetch traffic data: {response.status_code} - {response.text}") return [] except Exception as e: - logger.warning(f"Error fetching traffic data: {e}", tenant_id=tenant_id) + logger.error(f"Error fetching traffic data: {str(e)}") return [] \ No newline at end of file diff --git a/services/training/app/services/training_service.py b/services/training/app/services/training_service.py index ddaec8b9..52e29653 100644 --- a/services/training/app/services/training_service.py +++ b/services/training/app/services/training_service.py @@ -140,17 +140,29 @@ class TrainingService: # Fetch sales data from data service sales_data = await self.data_client.fetch_sales_data(tenant_id) - # Fetch external data if requested + if not sales_data: + raise ValueError("No sales data found for training") + + # Determine date range from sales data + start_date, end_date = await self._determine_sales_date_range(sales_data) + + # Fetch external data if requested using the sales date range weather_data = [] traffic_data = [] - if request.include_weather: - await self._update_job_status(db, job_id, "running", 15, "Fetching weather data") - weather_data = await self.data_client.fetch_weather_data(tenant_id) + await self._update_job_status(db, job_id, "running", 15, "Fetching weather data") + weather_data = await self.data_client.fetch_weather_data( + tenant_id, + start_date=start_date.isoformat(), + end_date=end_date.isoformat() + ) - if request.include_traffic: - await self._update_job_status(db, job_id, "running", 25, "Fetching traffic data") - traffic_data = await self.data_client.fetch_traffic_data(tenant_id) + await self._update_job_status(db, job_id, "running", 25, "Fetching traffic data") + traffic_data = await self.data_client.fetch_traffic_data( + tenant_id, + start_date=start_date.isoformat(), + end_date=end_date.isoformat() + ) # Execute ML training await self._update_job_status(db, job_id, "running", 35, "Processing training data") @@ -668,4 +680,26 @@ class TrainingService: except Exception as e: logger.error(f"Failed to get training logs: {str(e)}") - return None \ No newline at end of file + return None + + async def _determine_sales_date_range(self, sales_data: List[Dict]) -> tuple[datetime, datetime]: + """Determine start and end dates from sales data""" + if not sales_data: + raise ValueError("No sales data available to determine date range") + + dates = [] + for record in sales_data: + if 'date' in record: + if isinstance(record['date'], str): + dates.append(datetime.fromisoformat(record['date'].replace('Z', '+00:00'))) + elif isinstance(record['date'], datetime): + dates.append(record['date']) + + if not dates: + raise ValueError("No valid dates found in sales data") + + start_date = min(dates) + end_date = max(dates) + + logger.info(f"Determined sales date range: {start_date} to {end_date}") + return start_date, end_date \ No newline at end of file diff --git a/tests/test_onboarding_flow.sh b/tests/test_onboarding_flow.sh index b715bb20..cc035175 100755 --- a/tests/test_onboarding_flow.sh +++ b/tests/test_onboarding_flow.sh @@ -327,8 +327,6 @@ BAKERY_DATA="{ \"city\": \"Madrid\", \"postal_code\": \"28001\", \"phone\": \"+34600123456\", - \"include_weather\": \"True\", - \"include_traffic\": \"True\", \"latitude\": $MOCK_LATITUDE, \"longitude\": $MOCK_LONGITUDE }" @@ -651,6 +649,8 @@ fi TRAINING_DATA="{ \"tenant_id\": \"$TENANT_ID\", \"selected_products\": [$REAL_PRODUCTS], + \"include_weather\": \"True\", + \"include_traffic\": \"True\", \"training_parameters\": { \"forecast_horizon\": 7, \"validation_split\": 0.2,