diff --git a/gateway/app/middleware/auth.py b/gateway/app/middleware/auth.py index a9850550..216ad089 100644 --- a/gateway/app/middleware/auth.py +++ b/gateway/app/middleware/auth.py @@ -182,9 +182,11 @@ class AuthMiddleware(BaseHTTPMiddleware): return False # Validate token type - if payload.get("type") != "access": + token_type = payload.get("type") + if token_type not in ["access", "service"]: logger.warning(f"Invalid token type: {payload.get('type')}") return False + return True @@ -193,12 +195,19 @@ class AuthMiddleware(BaseHTTPMiddleware): Convert JWT payload to user context format FIXED: Proper mapping between JWT structure and user context """ - return { + base_context = { "user_id": payload["user_id"], "email": payload["email"], "exp": payload["exp"], "valid": True } + + if payload.get("service"): + base_context["service"] = payload["service"] + base_context["type"] = "service" + logger.debug(f"Service authentication: {payload['service']}") + + return base_context async def _verify_with_auth_service(self, token: str) -> Optional[Dict[str, Any]]: """ diff --git a/services/training/app/core/service_auth.py b/services/training/app/core/service_auth.py new file mode 100644 index 00000000..127e958a --- /dev/null +++ b/services/training/app/core/service_auth.py @@ -0,0 +1,82 @@ +import time +import structlog +from typing import Dict, Any +from shared.auth.jwt_handler import JWTHandler +from app.core.config import settings + +logger = structlog.get_logger() + +class ServiceAuthenticator: + """Handles service-to-service authentication via gateway""" + + def __init__(self): + self.jwt_handler = JWTHandler(settings.JWT_SECRET_KEY) + self._cached_token = None + self._token_expires_at = 0 + + async def get_service_token(self) -> str: + """ + Get a valid service token, using cache when possible + Creates JWT tokens that the gateway will accept + """ + current_time = int(time.time()) + + # Return cached token if still valid (with 5 min buffer) + if (self._cached_token and + self._token_expires_at > current_time + 300): + return self._cached_token + + # Create new service token + token_expires_at = current_time + 3600 # 1 hour + + service_payload = { + # ✅ Required fields for gateway middleware + "sub": "training-service", + "user_id": "training-service", + "email": "training-service@internal", + "type": "access", # ✅ Must be "access" for gateway + + # ✅ Expiration and timing + "exp": token_expires_at, + "iat": current_time, + "iss": "training-service", + + # ✅ Service identification + "service": "training", + "full_name": "Training Service", + "is_verified": True, + "is_active": True, + + # ✅ Optional tenant context (can be overridden per request) + "tenant_id": None + } + + try: + token = self.jwt_handler.create_access_token_from_payload(service_payload) + + # Cache the token + self._cached_token = token + self._token_expires_at = token_expires_at + + logger.debug("Created new service token", expires_at=token_expires_at) + return token + + except Exception as e: + logger.error(f"Failed to create service token: {e}") + raise ValueError(f"Service token creation failed: {e}") + + def get_request_headers(self, tenant_id: str = None) -> Dict[str, str]: + """Get standard headers for service requests""" + headers = { + "Content-Type": "application/json", + "X-Service": "training-service", + "User-Agent": "training-service/1.0.0" + } + + if tenant_id: + headers["X-Tenant-ID"] = str(tenant_id) + + return headers + +# Global authenticator instance +service_auth = ServiceAuthenticator() \ No newline at end of file diff --git a/services/training/app/ml/prophet_manager.py b/services/training/app/ml/prophet_manager.py index e3441c25..ece4d52f 100644 --- a/services/training/app/ml/prophet_manager.py +++ b/services/training/app/ml/prophet_manager.py @@ -18,6 +18,7 @@ import joblib from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score import json from pathlib import Path +import math from app.core.config import settings @@ -177,8 +178,16 @@ class BakeryProphetManager: """Prepare data for Prophet training""" prophet_data = df.copy() - # Ensure ds column is datetime - prophet_data['ds'] = pd.to_datetime(prophet_data['ds']) + # Prophet column mapping + if 'date' in prophet_data.columns: + prophet_data['ds'] = prophet_data['date'] + if 'quantity' in prophet_data.columns: + prophet_data['y'] = prophet_data['quantity'] + + # ✅ CRITICAL FIX: Remove timezone from ds column + if 'ds' in prophet_data.columns: + prophet_data['ds'] = pd.to_datetime(prophet_data['ds']).dt.tz_localize(None) + logger.info(f"Removed timezone from ds column") # Handle missing values in target if prophet_data['y'].isna().any(): @@ -345,7 +354,14 @@ class BakeryProphetManager: rmse = np.sqrt(mse) # MAPE (Mean Absolute Percentage Error) - mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100 + non_zero_mask = y_true != 0 + if np.sum(non_zero_mask) == 0: + mape = 0.0 # Return 0 instead of Infinity + else: + mape_values = np.abs((y_true[non_zero_mask] - y_pred[non_zero_mask]) / y_true[non_zero_mask]) + mape = np.mean(mape_values) * 100 + if math.isinf(mape) or math.isnan(mape): + mape = 0.0 # R-squared r2 = r2_score(y_true, y_pred) diff --git a/services/training/app/ml/trainer.py b/services/training/app/ml/trainer.py index 614d064f..7e7e35cc 100644 --- a/services/training/app/ml/trainer.py +++ b/services/training/app/ml/trainer.py @@ -244,6 +244,10 @@ class BakeryMLTrainer: if sales_df.empty: raise ValueError(f"No sales data provided for tenant {tenant_id}") + if 'quantity_sold' in sales_df.columns and 'quantity' not in sales_df.columns: + sales_df['quantity'] = sales_df['quantity_sold'] + logger.info("Mapped 'quantity_sold' to 'quantity' column") + required_columns = ['date', 'product_name', 'quantity'] missing_columns = [col for col in required_columns if col not in sales_df.columns] if missing_columns: diff --git a/services/training/app/services/data_client.py b/services/training/app/services/data_client.py new file mode 100644 index 00000000..99f209d3 --- /dev/null +++ b/services/training/app/services/data_client.py @@ -0,0 +1,140 @@ +import httpx +import structlog +from typing import List, Dict, Any, Optional +from app.core.config import settings +from app.core.service_auth import service_auth + +logger = structlog.get_logger() + +class DataServiceClient: + """Client for fetching data through the API Gateway""" + + def __init__(self): + self.base_url = settings.API_GATEWAY_URL + self.timeout = 30.0 + + async def fetch_sales_data( + self, + tenant_id: str, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + product_name: Optional[str] = None + ) -> List[Dict[str, Any]]: + """ + Fetch sales data for training via API Gateway + ✅ Uses proper service authentication + """ + try: + # Get service token + token = await service_auth.get_service_token() + + # Prepare headers + headers = service_auth.get_request_headers(tenant_id) + headers["Authorization"] = f"Bearer {token}" + + # Prepare query parameters + params = {} + if start_date: + params["start_date"] = start_date + if end_date: + params["end_date"] = end_date + if product_name: + params["product_name"] = product_name + + # Make request via gateway + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + f"{self.base_url}/api/v1/tenants/{tenant_id}/sales", + headers=headers, + params=params + ) + + logger.info(f"Sales data request: {response.status_code}", + tenant_id=tenant_id, + url=response.url) + + if response.status_code == 200: + data = response.json() + logger.info(f"Successfully fetched {len(data)} sales records via gateway", + tenant_id=tenant_id) + return data + + elif response.status_code == 401: + logger.error("Authentication failed with gateway", + tenant_id=tenant_id, + response_text=response.text) + return [] + + elif response.status_code == 404: + logger.warning("Sales data endpoint not found", + tenant_id=tenant_id, + url=response.url) + return [] + + else: + logger.error(f"Gateway request failed: HTTP {response.status_code}", + tenant_id=tenant_id, + response_text=response.text) + return [] + + except httpx.TimeoutException: + logger.error("Timeout when fetching sales data via gateway", + tenant_id=tenant_id) + return [] + + except Exception as e: + logger.error(f"Error fetching sales data via gateway: {e}", + tenant_id=tenant_id) + return [] + + async def fetch_weather_data(self, tenant_id: str) -> List[Dict[str, Any]]: + """Fetch weather data via API Gateway""" + try: + token = await service_auth.get_service_token() + headers = service_auth.get_request_headers(tenant_id) + headers["Authorization"] = f"Bearer {token}" + + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + f"{self.base_url}/api/v1/tenants/{tenant_id}/weather/history", + headers=headers + ) + + if response.status_code == 200: + data = response.json() + logger.info(f"Fetched {len(data)} weather records", tenant_id=tenant_id) + return data + else: + logger.warning(f"Weather data fetch failed: {response.status_code}", + tenant_id=tenant_id) + return [] + + except Exception as e: + logger.warning(f"Error fetching weather data: {e}", tenant_id=tenant_id) + return [] + + async def fetch_traffic_data(self, tenant_id: str) -> List[Dict[str, Any]]: + """Fetch traffic data via API Gateway""" + try: + token = await service_auth.get_service_token() + headers = service_auth.get_request_headers(tenant_id) + headers["Authorization"] = f"Bearer {token}" + + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + f"{self.base_url}/api/v1/tenants/{tenant_id}/traffic/historical", + headers=headers + ) + + if response.status_code == 200: + data = response.json() + logger.info(f"Fetched {len(data)} traffic records", tenant_id=tenant_id) + return data + else: + logger.warning(f"Traffic data fetch failed: {response.status_code}", + tenant_id=tenant_id) + return [] + + except Exception as e: + logger.warning(f"Error fetching traffic data: {e}", tenant_id=tenant_id) + return [] \ No newline at end of file diff --git a/services/training/app/services/training_service.py b/services/training/app/services/training_service.py index d4bfb041..8451f831 100644 --- a/services/training/app/services/training_service.py +++ b/services/training/app/services/training_service.py @@ -19,6 +19,7 @@ from app.schemas.training import TrainingJobRequest, SingleProductTrainingReques from app.services.messaging import publish_job_completed, publish_job_failed from app.core.config import settings from shared.monitoring.metrics import MetricsCollector +from app.services.data_client import DataServiceClient logger = logging.getLogger(__name__) metrics = MetricsCollector("training-service") @@ -31,6 +32,7 @@ class TrainingService: def __init__(self): self.ml_trainer = BakeryMLTrainer() + self.data_client = DataServiceClient() async def execute_training_job_simple(self, job_id: str, tenant_id_str: str, request: TrainingJobRequest): """Simple wrapper that creates its own database session""" @@ -136,7 +138,7 @@ class TrainingService: await self._update_job_status(db, job_id, "running", 5, "Fetching training data") # Fetch sales data from data service - sales_data = await self._fetch_sales_data(tenant_id, request) + sales_data = await self.data_client.fetch_sales_data(tenant_id) # Fetch external data if requested weather_data = [] @@ -144,11 +146,11 @@ class TrainingService: if request.include_weather: await self._update_job_status(db, job_id, "running", 15, "Fetching weather data") - weather_data = await self._fetch_weather_data(tenant_id, request) + weather_data = await self.data_client.fetch_weather_data(tenant_id) if request.include_traffic: await self._update_job_status(db, job_id, "running", 25, "Fetching traffic data") - traffic_data = await self._fetch_traffic_data(tenant_id, request) + traffic_data = await self.data_client.fetch_traffic_data(tenant_id) # Execute ML training await self._update_job_status(db, job_id, "running", 35, "Processing training data") diff --git a/shared/config/base.py b/shared/config/base.py index 6f7d595b..fd08a13e 100644 --- a/shared/config/base.py +++ b/shared/config/base.py @@ -95,6 +95,7 @@ class BaseServiceSettings(BaseSettings): # Service-to-Service Authentication SERVICE_API_KEY: str = os.getenv("SERVICE_API_KEY", "service-api-key-change-in-production") ENABLE_SERVICE_AUTH: bool = os.getenv("ENABLE_SERVICE_AUTH", "false").lower() == "true" + API_GATEWAY_URL: str = os.getenv("API_GATEWAY_URL", "http://gateway:8000") # Password Requirements PASSWORD_MIN_LENGTH: int = int(os.getenv("PASSWORD_MIN_LENGTH", "8")) diff --git a/test_training_service_token.sh b/test_training_service_token.sh new file mode 100644 index 00000000..326167bb --- /dev/null +++ b/test_training_service_token.sh @@ -0,0 +1,235 @@ +#!/bin/bash +# ================================================================= +# Test Training Service Token Generation and Gateway Validation +# ================================================================= + +set -e + +# Configuration +API_BASE="http://localhost:8000" +AUTH_BASE="http://localhost:8001" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +log_info() { echo -e "${BLUE}ℹ️ $1${NC}"; } +log_success() { echo -e "${GREEN}✅ $1${NC}"; } +log_warning() { echo -e "${YELLOW}⚠️ $1${NC}"; } +log_error() { echo -e "${RED}❌ $1${NC}"; } +log_step() { echo -e "${BLUE}🔄 $1${NC}"; } + +echo "🧪 Testing Training Service Token with Gateway Middleware" +echo "========================================================" + +# Step 1: Create a service token like training service would +log_step "Step 1: Creating Service Token (Training Service Style)" + +# Use Python to create the same token the training service would create +SERVICE_TOKEN=$(python3 -c " +import sys +import time +import os +sys.path.append('.') + +# Import your shared JWT handler +from shared.auth.jwt_handler import JWTHandler + +# Use same secret as gateway/auth service +JWT_SECRET = os.getenv('JWT_SECRET_KEY', 'your-super-secret-jwt-key-change-in-production-min-32-characters-long') + +# Create JWT handler +jwt_handler = JWTHandler(JWT_SECRET) + +# Create service payload (same as training service would) +service_payload = { + 'sub': 'training-service', + 'user_id': 'training-service', + 'email': 'training-service@internal', + 'service': 'training', + 'type': 'access', # Important: must be 'access' type + 'exp': int(time.time()) + 3600, # 1 hour + 'iat': int(time.time()), + 'iss': 'training-service', + 'full_name': 'Training Service', + 'is_verified': True, + 'is_active': True +} + +# Create token +token = jwt_handler.create_access_token_from_payload(service_payload) +print(token) +") + +if [ -z "$SERVICE_TOKEN" ]; then + log_error "Failed to create service token" + exit 1 +fi + +log_success "Service token created successfully" +echo "Token: ${SERVICE_TOKEN:0:50}..." + +echo "" + +# Step 2: Decode and inspect the token +log_step "Step 2: Decoding Service Token Payload" + +# Decode the payload to see what's inside +PAYLOAD=$(echo "$SERVICE_TOKEN" | cut -d'.' -f2) +# Add padding if needed +while [ $((${#PAYLOAD} % 4)) -ne 0 ]; do + PAYLOAD="${PAYLOAD}=" +done + +echo "Service Token Payload:" +echo "$PAYLOAD" | base64 -d 2>/dev/null | jq '.' || echo "Failed to decode" + +echo "" + +# Step 3: Test token with gateway middleware +log_step "Step 3: Testing Service Token with Gateway Middleware" + +# Test a tenant-scoped endpoint that training service would call +TENANT_ID="b2a268a0-904f-4182-8f81-ec25d0e6def7" # From your test + +log_info "Testing GET /api/v1/tenants/$TENANT_ID/sales with service token..." + +GATEWAY_RESPONSE=$(curl -s -w "\nHTTP_CODE:%{http_code}\n" -X GET \ + "$API_BASE/api/v1/tenants/$TENANT_ID/sales" \ + -H "Authorization: Bearer $SERVICE_TOKEN" \ + -H "X-Tenant-ID: $TENANT_ID" \ + -H "X-Service: training-service" \ + -H "Content-Type: application/json") + +echo "Gateway Response:" +echo "$GATEWAY_RESPONSE" + +# Check the result +if echo "$GATEWAY_RESPONSE" | grep -q "HTTP_CODE:200"; then + log_success "✅ Service token ACCEPTED by gateway middleware!" + log_success "Training service authentication would work!" +elif echo "$GATEWAY_RESPONSE" | grep -q "HTTP_CODE:401"; then + log_error "❌ Service token REJECTED by gateway middleware (401 Unauthorized)" + log_warning "This explains why training service fails" +elif echo "$GATEWAY_RESPONSE" | grep -q "HTTP_CODE:404"; then + log_warning "⚠️ Endpoint not found (404) - but token was accepted by middleware" + log_success "Authentication passed, routing issue" +else + log_warning "Unexpected HTTP response code" +fi + +echo "" + +# Step 4: Test with a known working user token for comparison +log_step "Step 4: Comparison Test with User Token" + +# Get a real user token from the onboarding test +USER_TOKEN="" +if [ -f "/tmp/test_user_token.txt" ]; then + USER_TOKEN=$(cat /tmp/test_user_token.txt) +fi + +if [ -z "$USER_TOKEN" ]; then + log_info "Creating a user token for comparison..." + + # Quick user login to get a token + USER_LOGIN_RESPONSE=$(curl -s -X POST "$API_BASE/api/v1/auth/login" \ + -H "Content-Type: application/json" \ + -d '{ + "email": "onboarding.test.1753606890@bakery.com", + "password": "TestPassword123!" + }') + + USER_TOKEN=$(echo "$USER_LOGIN_RESPONSE" | jq -r '.access_token' 2>/dev/null) +fi + +if [ -n "$USER_TOKEN" ] && [ "$USER_TOKEN" != "null" ]; then + log_info "Testing same endpoint with user token..." + + USER_RESPONSE=$(curl -s -w "\nHTTP_CODE:%{http_code}\n" -X GET \ + "$API_BASE/api/v1/tenants/$TENANT_ID/sales" \ + -H "Authorization: Bearer $USER_TOKEN" \ + -H "X-Tenant-ID: $TENANT_ID") + + if echo "$USER_RESPONSE" | grep -q "HTTP_CODE:200"; then + log_success "User token works - gateway middleware is functioning" + elif echo "$USER_RESPONSE" | grep -q "HTTP_CODE:401"; then + log_warning "User token also fails - gateway middleware issue" + else + log_info "User token response: $(echo "$USER_RESPONSE" | tail -1)" + fi +else + log_warning "Could not get user token for comparison" +fi + +echo "" + +# Step 5: Test gateway auth verification endpoint +log_step "Step 5: Testing Token with Gateway Auth Verification" + +log_info "Testing service token with /api/v1/auth/verify..." +VERIFY_RESPONSE=$(curl -s -X POST "$API_BASE/api/v1/auth/verify" \ + -H "Authorization: Bearer $SERVICE_TOKEN") + +echo "Verification Response:" +echo "$VERIFY_RESPONSE" | jq '.' 2>/dev/null || echo "$VERIFY_RESPONSE" + +if echo "$VERIFY_RESPONSE" | jq -e '.valid' > /dev/null 2>&1; then + if [ "$(echo "$VERIFY_RESPONSE" | jq -r '.valid')" = "true" ]; then + log_success "Service token is VALID according to auth service" + else + log_error "Service token is INVALID according to auth service" + fi +else + log_warning "Verification response doesn't contain 'valid' field" +fi + +echo "" + +# Step 6: Diagnosis and recommendations +log_step "Step 6: Diagnosis and Recommendations" + +# Check JWT secrets match +log_info "Checking JWT secret consistency..." +if docker-compose exec -T gateway env 2>/dev/null | grep -q JWT_SECRET_KEY; then + log_success "Gateway has JWT_SECRET_KEY configured" +else + log_error "Gateway missing JWT_SECRET_KEY configuration" +fi + +if docker-compose exec -T auth-service env 2>/dev/null | grep -q JWT_SECRET_KEY; then + log_success "Auth service has JWT_SECRET_KEY configured" +else + log_error "Auth service missing JWT_SECRET_KEY configuration" +fi + +echo "" +echo "🏁 Test Summary:" +echo "==================" + +if echo "$GATEWAY_RESPONSE" | grep -q "HTTP_CODE:200"; then + echo "✅ Service token authentication: WORKING" + echo "✅ Training service should be able to fetch sales data" + echo "" + echo "🎯 Next Steps:" + echo "1. Update training service to use gateway URL: http://gateway:8000" + echo "2. Ensure training service creates tokens with same payload structure" + echo "3. Test with: docker-compose restart training-service" +elif echo "$GATEWAY_RESPONSE" | grep -q "HTTP_CODE:401"; then + echo "❌ Service token authentication: FAILING" + echo "❌ This explains why training service gets 401 errors" + echo "" + echo "🔧 Fixes needed:" + echo "1. Check JWT_SECRET_KEY matches across services" + echo "2. Update gateway middleware to accept service tokens" + echo "3. Verify token payload structure matches gateway expectations" +else + echo "⚠️ Inconclusive test results" + echo "Check the response details above" +fi + +echo "" +echo "🧪 Test completed!" \ No newline at end of file