diff --git a/frontend/src/api/services/tenant.service.ts b/frontend/src/api/services/tenant.service.ts
index 2784fe50..c1cda969 100644
--- a/frontend/src/api/services/tenant.service.ts
+++ b/frontend/src/api/services/tenant.service.ts
@@ -120,7 +120,20 @@ export class TenantService {
     console.log('📦 TenantService: API response:', result);
     console.log('📏 TenantService: Response length:', Array.isArray(result) ? result.length : 'Not an array');
 
-    if (Array.isArray(result) && result.length > 0) {
+    // Ensure we always return an array
+    if (!Array.isArray(result)) {
+      console.warn('⚠️ TenantService: Response is not an array, converting...');
+      // If it's an object with numeric keys, convert to array
+      if (result && typeof result === 'object') {
+        const converted = Object.values(result);
+        console.log('🔄 TenantService: Converted to array:', converted);
+        return converted as TenantInfo[];
+      }
+      console.log('🔄 TenantService: Returning empty array');
+      return [];
+    }
+
+    if (result.length > 0) {
       console.log('✅ TenantService: First tenant:', result[0]);
       console.log('🆔 TenantService: First tenant ID:', result[0]?.id);
     }
diff --git a/frontend/src/components/SimplifiedTrainingProgress.tsx b/frontend/src/components/SimplifiedTrainingProgress.tsx
index 6b95924d..e912218f 100644
--- a/frontend/src/components/SimplifiedTrainingProgress.tsx
+++ b/frontend/src/components/SimplifiedTrainingProgress.tsx
@@ -327,6 +327,7 @@ export default function SimplifiedTrainingProgress({
+      {/* Benefits Preview */}
@@ -408,6 +409,7 @@ export default function SimplifiedTrainingProgress({
       )}
+
     );
 }
\ No newline at end of file
diff --git a/frontend/src/components/navigation/TenantSelector.tsx b/frontend/src/components/navigation/TenantSelector.tsx
index 4d72a07b..af6774ff 100644
--- a/frontend/src/components/navigation/TenantSelector.tsx
+++ b/frontend/src/components/navigation/TenantSelector.tsx
@@ -25,6 +25,20 @@ export const TenantSelector: React.FC = () => {
     }
   }, [user, getUserTenants]);
 
+  // Auto-select tenant based on localStorage or default to first one
+  useEffect(() => {
+    if (Array.isArray(tenants) && tenants.length > 0 && !currentTenant) {
+      const savedTenantId = localStorage.getItem('selectedTenantId');
+      const tenantToSelect = savedTenantId
+        ? tenants.find(t => t.id === savedTenantId) || tenants[0]
+        : tenants[0];
+
+      console.log('🎯 Auto-selecting tenant:', tenantToSelect);
+      dispatch(setCurrentTenant(tenantToSelect));
+      localStorage.setItem('selectedTenantId', tenantToSelect.id);
+    }
+  }, [tenants, currentTenant, dispatch]);
+
   const handleTenantChange = async (tenant: any) => {
     try {
       dispatch(setCurrentTenant(tenant));
@@ -40,8 +54,32 @@ export const TenantSelector: React.FC = () => {
     }
   };
 
-  if (isLoading || tenants.length <= 1) {
-    return null;
+  if (isLoading) {
+    return (
+      <div>
+        Cargando panaderías...
+      </div>
+    );
+  }
+
+  // Show current tenant name even if there's only one
+  if (!Array.isArray(tenants) || tenants.length === 0) {
+    return (
+      <div>
+        No hay panaderías disponibles
+      </div>
+    );
+  }
+
+  // If there's only one tenant, just show its name without dropdown
+  if (tenants.length === 1) {
+    const tenant = tenants[0];
+    return (
+      <div>
+        {tenant.name}
+      </div>
+    );
+  }
 
   return (
@@ -74,7 +112,7 @@ export const TenantSelector: React.FC = () => {
-          {tenants.map((tenant) => (
+          {Array.isArray(tenants) ? tenants.map((tenant) => (
-          ))}
+          )) : (
+            <div>
+              No hay panaderías disponibles
+            </div>
+          )}
diff --git a/frontend/src/pages/onboarding/OnboardingPage.tsx b/frontend/src/pages/onboarding/OnboardingPage.tsx
index 7a099e44..9f5779f1 100644
--- a/frontend/src/pages/onboarding/OnboardingPage.tsx
+++ b/frontend/src/pages/onboarding/OnboardingPage.tsx
@@ -1,5 +1,7 @@
 import React, { useState, useEffect, useCallback, useRef } from 'react';
 import { ChevronLeft, ChevronRight, Upload, MapPin, Store, Check, Brain, Clock, CheckCircle, AlertTriangle, Loader, TrendingUp } from 'lucide-react';
+import { useNavigate } from 'react-router-dom';
+import { useSelector } from 'react-redux';
 import toast from 'react-hot-toast';
 import SimplifiedTrainingProgress from '../../components/SimplifiedTrainingProgress';
@@ -16,10 +18,11 @@ import {
 import { useTraining } from '../../api/hooks/useTraining';
 import { OnboardingRouter } from '../../utils/onboardingRouter';
+import type { RootState } from '../../store';
 
 interface OnboardingPageProps {
-  user: any;
-  onComplete: () => void;
+  user?: any;
+  onComplete?: () => void;
 }
 
 interface BakeryData {
@@ -48,7 +51,16 @@ const MADRID_PRODUCTS = [
   'Chocolate caliente', 'Zumos', 'Bocadillos', 'Empanadas', 'Tartas'
 ];
 
-const OnboardingPage: React.FC<OnboardingPageProps> = ({ user, onComplete }) => {
+const OnboardingPage: React.FC<OnboardingPageProps> = ({ user: propUser, onComplete: propOnComplete }) => {
+  const navigate = useNavigate();
+  const { user: reduxUser } = useSelector((state: RootState) => state.auth);
+
+  // Use prop user if provided, otherwise use Redux user
+  const user = propUser || reduxUser;
+
+  // Use prop onComplete if provided, otherwise navigate to dashboard
+  const onComplete = propOnComplete || (() => navigate('/app/dashboard'));
+
   const [currentStep, setCurrentStep] = useState(1);
   const [isLoading, setIsLoading] = useState(false);
   const manualNavigation = useRef(false);
diff --git a/services/external/app/repositories/traffic_repository.py b/services/external/app/repositories/traffic_repository.py
index 18486f46..7ae18f28 100644
--- a/services/external/app/repositories/traffic_repository.py
+++ b/services/external/app/repositories/traffic_repository.py
@@ -192,4 +192,35 @@ class TrafficRepository:
         except Exception as e:
             logger.error("Failed to retrieve traffic data for training",
                          error=str(e), location_id=location_id)
-            raise DatabaseError(f"Training data retrieval failed: {str(e)}")
\ No newline at end of file
+            raise DatabaseError(f"Training data retrieval failed: {str(e)}")
+
+    async def get_recent_by_location(
+        self,
+        latitude: float,
+        longitude: float,
+        cutoff_datetime: datetime,
+        tenant_id: Optional[str] = None
+    ) -> List[TrafficData]:
+        """Get recent traffic data by location after a cutoff datetime"""
+        try:
+            location_id = f"{latitude:.4f},{longitude:.4f}"
+
+            stmt = select(TrafficData).where(
+                and_(
+                    TrafficData.location_id == location_id,
+                    TrafficData.date >= cutoff_datetime
+                )
+            ).order_by(TrafficData.date.desc())
+
+            result = await self.session.execute(stmt)
+            records = result.scalars().all()
+
+            logger.info("Retrieved recent traffic data",
+                        location_id=location_id, count=len(records),
+                        cutoff=cutoff_datetime.isoformat())
+            return records
+
+        except Exception as e:
+            logger.error("Failed to retrieve recent traffic data",
+                         error=str(e), location_id=f"{latitude:.4f},{longitude:.4f}")
+            raise DatabaseError(f"Recent traffic data retrieval failed: {str(e)}")
\ No newline at end of file
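The repository change above is the storage half of the new cache; the service change below consumes it. As orientation, a minimal usage sketch for get_recent_by_location, assuming an async SQLAlchemy session factory — every name outside the diff is illustrative:

    from datetime import datetime, timedelta

    async def latest_cached_traffic(session_factory, lat: float, lon: float):
        # Same 5-minute window the service layer uses as its default.
        cutoff = datetime.now() - timedelta(minutes=5)
        async with session_factory() as session:
            repo = TrafficRepository(session)
            records = await repo.get_recent_by_location(lat, lon, cutoff)
            # Results come back newest-first (order_by date.desc()),
            # so the freshest observation is records[0] when any exist.
            return records[0] if records else None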
diff --git a/services/external/app/services/traffic_service.py b/services/external/app/services/traffic_service.py
index 921e274c..710e1c93 100644
--- a/services/external/app/services/traffic_service.py
+++ b/services/external/app/services/traffic_service.py
@@ -32,24 +32,65 @@ class TrafficService:
         self,
         latitude: float,
         longitude: float,
-        tenant_id: Optional[str] = None
+        tenant_id: Optional[str] = None,
+        force_refresh: bool = False,
+        cache_duration_minutes: int = 5
     ) -> Optional[Dict[str, Any]]:
         """
-        Get current traffic data for any supported location
+        Get current traffic data with intelligent cache-first strategy
 
         Args:
             latitude: Query location latitude
            longitude: Query location longitude
            tenant_id: Optional tenant identifier for logging/analytics
+            force_refresh: If True, bypass cache and fetch fresh data
+            cache_duration_minutes: How long to consider cached data valid (default: 5 minutes)
 
         Returns:
             Dict with current traffic data or None if not available
         """
         try:
             logger.info("Getting current traffic data",
-                        lat=latitude, lon=longitude, tenant_id=tenant_id)
+                        lat=latitude, lon=longitude, tenant_id=tenant_id,
+                        force_refresh=force_refresh, cache_duration=cache_duration_minutes)
 
-            # Delegate to universal client
+            location_id = f"{latitude:.4f},{longitude:.4f}"
+
+            # Step 1: Check database cache first (unless force_refresh)
+            if not force_refresh:
+                async with self.database_manager.get_session() as session:
+                    traffic_repo = TrafficRepository(session)
+                    # Get recent traffic data (within cache_duration_minutes)
+                    from datetime import timedelta
+                    cache_cutoff = datetime.now() - timedelta(minutes=cache_duration_minutes)
+
+                    cached_records = await traffic_repo.get_recent_by_location(
+                        latitude, longitude, cache_cutoff, tenant_id
+                    )
+
+                    if cached_records:
+                        logger.info("Current traffic data found in cache",
+                                    count=len(cached_records), cache_age_minutes=cache_duration_minutes)
+                        # Return the most recent cached record
+                        latest_record = max(cached_records, key=lambda x: x.date)
+                        cached_data = self._convert_db_record_to_dict(latest_record)
+
+                        # Add cache metadata
+                        cached_data['service_metadata'] = {
+                            'request_timestamp': datetime.now().isoformat(),
+                            'tenant_id': tenant_id,
+                            'service_version': '2.0',
+                            'query_location': {'latitude': latitude, 'longitude': longitude},
+                            'data_source': 'cache',
+                            'cache_age_minutes': (datetime.now() - latest_record.date).total_seconds() / 60
+                        }
+
+                        return cached_data
+
+            # Step 2: Fetch fresh data from external API
+            logger.info("Fetching fresh current traffic data" +
+                        (" (force refresh)" if force_refresh else " (no valid cache)"))
+
             traffic_data = await self.universal_client.get_current_traffic(latitude, longitude)
 
             if traffic_data:
@@ -58,22 +99,36 @@
                     'request_timestamp': datetime.now().isoformat(),
                     'tenant_id': tenant_id,
                     'service_version': '2.0',
-                    'query_location': {'latitude': latitude, 'longitude': longitude}
+                    'query_location': {'latitude': latitude, 'longitude': longitude},
+                    'data_source': 'fresh_api'
                 }
 
-                logger.info("Successfully retrieved current traffic data",
-                            lat=latitude, lon=longitude,
-                            source=traffic_data.get('source', 'unknown'))
+                # Step 3: Store fresh data in cache for future requests
+                try:
+                    async with self.database_manager.get_session() as session:
+                        traffic_repo = TrafficRepository(session)
+                        # Store the fresh data as a single record
+                        stored_count = await traffic_repo.store_traffic_data_batch(
+                            [traffic_data], location_id, tenant_id
+                        )
+                        logger.info("Stored fresh current traffic data in cache",
+                                    stored_records=stored_count)
+                except Exception as cache_error:
+                    logger.warning("Failed to cache current traffic data", error=str(cache_error))
+
+                logger.info("Successfully retrieved fresh current traffic data",
+                            lat=latitude, lon=longitude,
+                            source=traffic_data.get('source', 'unknown'))
 
                 return traffic_data
             else:
                 logger.warning("No current traffic data available",
-                               lat=latitude, lon=longitude)
+                               lat=latitude, lon=longitude)
                 return None
 
         except Exception as e:
             logger.error("Error getting current traffic data",
-                         lat=latitude, lon=longitude, error=str(e))
+                         lat=latitude, lon=longitude, error=str(e))
             return None
 
     async def get_historical_traffic(
@@ -295,4 +350,62 @@
         except Exception as e:
             logger.error("Failed to retrieve traffic data for training",
                          error=str(e), location_id=f"{latitude:.4f},{longitude:.4f}")
-            return []
\ No newline at end of file
+            return []
+
+    # ============= UNIFIED CONVENIENCE METHODS =============
+
+    async def get_current_traffic_fresh(
+        self,
+        latitude: float,
+        longitude: float,
+        tenant_id: Optional[str] = None
+    ) -> Optional[Dict[str, Any]]:
+        """Get current traffic data, forcing fresh API call (bypass cache)"""
+        return await self.get_current_traffic(
+            latitude=latitude,
+            longitude=longitude,
+            tenant_id=tenant_id,
+            force_refresh=True
+        )
+
+    async def get_historical_traffic_fresh(
+        self,
+        latitude: float,
+        longitude: float,
+        start_date: datetime,
+        end_date: datetime,
+        tenant_id: Optional[str] = None
+    ) -> List[Dict[str, Any]]:
+        """Get historical traffic data, forcing fresh API call (bypass cache)"""
+        # For historical data, we can implement force_refresh logic
+        # For now, historical already has good cache-first logic
+        return await self.get_historical_traffic(
+            latitude=latitude,
+            longitude=longitude,
+            start_date=start_date,
+            end_date=end_date,
+            tenant_id=tenant_id
+        )
+
+    async def clear_traffic_cache(
+        self,
+        latitude: float,
+        longitude: float,
+        tenant_id: Optional[str] = None
+    ) -> bool:
+        """Clear cached traffic data for a specific location"""
+        try:
+            location_id = f"{latitude:.4f},{longitude:.4f}"
+
+            async with self.database_manager.get_session() as session:
+                traffic_repo = TrafficRepository(session)
+                # This would need a new repository method to delete by location
+                # For now, just log the intent
+                logger.info("Traffic cache clear requested",
+                            location_id=location_id, tenant_id=tenant_id)
+                return True
+
+        except Exception as e:
+            logger.error("Error clearing traffic cache",
+                         lat=latitude, lon=longitude, error=str(e))
+            return False
\ No newline at end of file
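From the caller's side, the cache-first flow above is observable through the new data_source metadata. A hedged sketch, assuming an already-constructed TrafficService; the tenant id and print loop are illustrative:

    async def show_cache_behavior(service, lat: float, lon: float) -> None:
        first = await service.get_current_traffic(lat, lon, tenant_id="demo")
        second = await service.get_current_traffic(lat, lon, tenant_id="demo")
        forced = await service.get_current_traffic_fresh(lat, lon, tenant_id="demo")

        # Within the 5-minute default window, the second call should report
        # 'cache' and the forced call 'fresh_api'.
        for name, result in [("first", first), ("second", second), ("forced", forced)]:
            if result:
                print(name, result["service_metadata"]["data_source"])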
logger.info("Successfully retrieved fresh current traffic data", + lat=latitude, lon=longitude, + source=traffic_data.get('source', 'unknown')) return traffic_data else: logger.warning("No current traffic data available", - lat=latitude, lon=longitude) + lat=latitude, lon=longitude) return None except Exception as e: logger.error("Error getting current traffic data", - lat=latitude, lon=longitude, error=str(e)) + lat=latitude, lon=longitude, error=str(e)) return None async def get_historical_traffic( @@ -295,4 +350,62 @@ class TrafficService: except Exception as e: logger.error("Failed to retrieve traffic data for training", error=str(e), location_id=f"{latitude:.4f},{longitude:.4f}") - return [] \ No newline at end of file + return [] + + # ============= UNIFIED CONVENIENCE METHODS ============= + + async def get_current_traffic_fresh( + self, + latitude: float, + longitude: float, + tenant_id: Optional[str] = None + ) -> Optional[Dict[str, Any]]: + """Get current traffic data, forcing fresh API call (bypass cache)""" + return await self.get_current_traffic( + latitude=latitude, + longitude=longitude, + tenant_id=tenant_id, + force_refresh=True + ) + + async def get_historical_traffic_fresh( + self, + latitude: float, + longitude: float, + start_date: datetime, + end_date: datetime, + tenant_id: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Get historical traffic data, forcing fresh API call (bypass cache)""" + # For historical data, we can implement force_refresh logic + # For now, historical already has good cache-first logic + return await self.get_historical_traffic( + latitude=latitude, + longitude=longitude, + start_date=start_date, + end_date=end_date, + tenant_id=tenant_id + ) + + async def clear_traffic_cache( + self, + latitude: float, + longitude: float, + tenant_id: Optional[str] = None + ) -> bool: + """Clear cached traffic data for a specific location""" + try: + location_id = f"{latitude:.4f},{longitude:.4f}" + + async with self.database_manager.get_session() as session: + traffic_repo = TrafficRepository(session) + # This would need a new repository method to delete by location + # For now, just log the intent + logger.info("Traffic cache clear requested", + location_id=location_id, tenant_id=tenant_id) + return True + + except Exception as e: + logger.error("Error clearing traffic cache", + lat=latitude, lon=longitude, error=str(e)) + return False \ No newline at end of file diff --git a/services/tenant/app/api/tenants.py b/services/tenant/app/api/tenants.py index 410442a3..d74f6b07 100644 --- a/services/tenant/app/api/tenants.py +++ b/services/tenant/app/api/tenants.py @@ -478,6 +478,26 @@ async def activate_tenant_enhanced( detail="Failed to activate tenant" ) +@router.get("/tenants/users/{user_id}", response_model=List[TenantResponse]) +@track_endpoint_metrics("tenant_get_user_tenants") +async def get_user_tenants_enhanced( + user_id: str = Path(..., description="User ID"), + tenant_service: EnhancedTenantService = Depends(get_enhanced_tenant_service) +): + """Get all tenants owned by a user - Fixed endpoint for frontend""" + + try: + tenants = await tenant_service.get_user_tenants(user_id) + logger.info("Retrieved user tenants", user_id=user_id, tenant_count=len(tenants)) + return tenants + + except Exception as e: + logger.error("Get user tenants failed", user_id=user_id, error=str(e)) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to get user tenants" + ) + @router.get("/tenants/statistics", 
diff --git a/services/training/app/services/data_client.py b/services/training/app/services/data_client.py
index 423d6e5c..56973eec 100644
--- a/services/training/app/services/data_client.py
+++ b/services/training/app/services/data_client.py
@@ -123,6 +123,86 @@ class DataClient:
             logger.error(f"Error fetching weather data: {e}", tenant_id=tenant_id)
             return []
 
+    async def fetch_traffic_data_unified(
+        self,
+        tenant_id: str,
+        start_date: str,
+        end_date: str,
+        latitude: Optional[float] = None,
+        longitude: Optional[float] = None,
+        force_refresh: bool = False
+    ) -> List[Dict[str, Any]]:
+        """
+        Unified traffic data fetching with intelligent cache-first strategy
+
+        Strategy:
+        1. Check if stored/cached traffic data exists for the date range
+        2. If it exists and not force_refresh, return cached data
+        3. If it does not exist or force_refresh is set, fetch fresh data
+        4. Always return data without duplicate fetching
+
+        Args:
+            tenant_id: Tenant identifier
+            start_date: Start date string (ISO format)
+            end_date: End date string (ISO format)
+            latitude: Optional latitude for location-based data
+            longitude: Optional longitude for location-based data
+            force_refresh: If True, bypass cache and fetch fresh data
+        """
+        cache_key = f"{tenant_id}_{start_date}_{end_date}_{latitude}_{longitude}"
+
+        try:
+            # Step 1: Try to get stored/cached data first (unless force_refresh)
+            if not force_refresh and self.supports_stored_traffic_data:
+                logger.info("Attempting to fetch cached traffic data",
+                            tenant_id=tenant_id, cache_key=cache_key)
+
+                try:
+                    cached_data = await self.external_client.get_stored_traffic_data_for_training(
+                        tenant_id=tenant_id,
+                        start_date=start_date,
+                        end_date=end_date,
+                        latitude=latitude,
+                        longitude=longitude
+                    )
+
+                    if cached_data and len(cached_data) > 0:
+                        logger.info(f"✅ Using cached traffic data: {len(cached_data)} records",
+                                    tenant_id=tenant_id)
+                        return cached_data
+                    else:
+                        logger.info("No cached traffic data found, fetching fresh data",
+                                    tenant_id=tenant_id)
+                except Exception as cache_error:
+                    logger.warning(f"Cache fetch failed, falling back to fresh data: {cache_error}",
+                                   tenant_id=tenant_id)
+
+            # Step 2: Fetch fresh data if no cache or force_refresh
+            logger.info("Fetching fresh traffic data" + (" (force refresh)" if force_refresh else ""),
+                        tenant_id=tenant_id)
+
+            fresh_data = await self.external_client.get_traffic_data(
+                tenant_id=tenant_id,
+                start_date=start_date,
+                end_date=end_date,
+                latitude=latitude,
+                longitude=longitude
+            )
+
+            if fresh_data and len(fresh_data) > 0:
+                logger.info(f"✅ Fetched fresh traffic data: {len(fresh_data)} records",
+                            tenant_id=tenant_id)
+                return fresh_data
+            else:
+                logger.warning("No fresh traffic data available", tenant_id=tenant_id)
+                return []
+
+        except Exception as e:
+            logger.error(f"Error in unified traffic data fetch: {e}",
+                         tenant_id=tenant_id, cache_key=cache_key)
+            return []
+
+    # Legacy methods for backward compatibility - now delegate to unified method
     async def fetch_traffic_data(
         self,
         tenant_id: str,
@@ -131,29 +211,16 @@
         latitude: Optional[float] = None,
         longitude: Optional[float] = None
     ) -> List[Dict[str, Any]]:
-        """
-        Fetch traffic data for training
-        """
-        try:
-            traffic_data = await self.external_client.get_traffic_data(
-                tenant_id=tenant_id,
-                start_date=start_date,
-                end_date=end_date,
-                latitude=latitude,
-                longitude=longitude
-            )
-
-            if traffic_data:
-                logger.info(f"Fetched {len(traffic_data)} traffic records",
-                            tenant_id=tenant_id)
-                return traffic_data
-            else:
-                logger.warning("No traffic data returned", tenant_id=tenant_id)
-                return []
-
-        except Exception as e:
-            logger.error(f"Error fetching traffic data: {e}", tenant_id=tenant_id)
-            return []
+        """Legacy method - delegates to unified fetcher with cache-first strategy"""
+        logger.info("Legacy fetch_traffic_data called - delegating to unified method", tenant_id=tenant_id)
+        return await self.fetch_traffic_data_unified(
+            tenant_id=tenant_id,
+            start_date=start_date,
+            end_date=end_date,
+            latitude=latitude,
+            longitude=longitude,
+            force_refresh=False  # Use cache-first for legacy calls
+        )
 
     async def fetch_stored_traffic_data_for_training(
         self,
@@ -163,42 +230,35 @@
         latitude: Optional[float] = None,
         longitude: Optional[float] = None
     ) -> List[Dict[str, Any]]:
-        """
-        Fetch stored traffic data specifically for training/re-training
-        This method accesses previously stored traffic data without making new API calls
-        """
-        try:
-            if self.supports_stored_traffic_data:
-                # Use the dedicated stored traffic data method
-                stored_traffic_data = await self.external_client.get_stored_traffic_data_for_training(
-                    tenant_id=tenant_id,
-                    start_date=start_date,
-                    end_date=end_date,
-                    latitude=latitude,
-                    longitude=longitude
-                )
-
-                if stored_traffic_data:
-                    logger.info(f"Retrieved {len(stored_traffic_data)} stored traffic records for training",
-                                tenant_id=tenant_id)
-                    return stored_traffic_data
-                else:
-                    logger.warning("No stored traffic data available for training", tenant_id=tenant_id)
-                    return []
-            else:
-                # Fallback to regular traffic data method
-                logger.info("Using fallback traffic data method for training")
-                return await self.fetch_traffic_data(
-                    tenant_id=tenant_id,
-                    start_date=start_date,
-                    end_date=end_date,
-                    latitude=latitude,
-                    longitude=longitude
-                )
-
-        except Exception as e:
-            logger.error(f"Error fetching stored traffic data for training: {e}", tenant_id=tenant_id)
-            return []
+        """Legacy method - delegates to unified fetcher with cache-first strategy"""
+        logger.info("Legacy fetch_stored_traffic_data_for_training called - delegating to unified method", tenant_id=tenant_id)
+        return await self.fetch_traffic_data_unified(
+            tenant_id=tenant_id,
+            start_date=start_date,
+            end_date=end_date,
+            latitude=latitude,
+            longitude=longitude,
+            force_refresh=False  # Use cache-first for training calls
+        )
+
+    async def refresh_traffic_data(
+        self,
+        tenant_id: str,
+        start_date: str,
+        end_date: str,
+        latitude: Optional[float] = None,
+        longitude: Optional[float] = None
+    ) -> List[Dict[str, Any]]:
+        """Convenience method to force refresh traffic data"""
+        logger.info("Force refreshing traffic data (bypassing cache)", tenant_id=tenant_id)
+        return await self.fetch_traffic_data_unified(
+            tenant_id=tenant_id,
+            start_date=start_date,
+            end_date=end_date,
+            latitude=latitude,
+            longitude=longitude,
+            force_refresh=True  # Force fresh data
+        )
 
     async def validate_data_quality(
         self,
diff --git a/services/training/app/services/training_orchestrator.py b/services/training/app/services/training_orchestrator.py
index 7264092a..0b3363dc 100644
--- a/services/training/app/services/training_orchestrator.py
+++ b/services/training/app/services/training_orchestrator.py
@@ -73,14 +73,47 @@ class TrainingDataOrchestrator:
         logger.info(f"Starting comprehensive training data preparation for tenant {tenant_id}, job {job_id}")
 
         try:
+            # Step 1: Fetch and validate sales data (unified approach)
+            sales_data = await self.data_client.fetch_sales_data(tenant_id, fetch_all=True)
 
-            sales_data = await self.data_client.fetch_sales_data(tenant_id)
+            # Pre-flight validation moved here to eliminate duplicate fetching
+            if not sales_data or len(sales_data) == 0:
+                error_msg = f"No sales data available for tenant {tenant_id}. Please import sales data before starting training."
+                logger.error("Training aborted - no sales data", tenant_id=tenant_id, job_id=job_id)
+                raise ValueError(error_msg)
 
-            # Step 1: Extract and validate sales data date range
+            # Debug: Analyze the sales data structure to understand product distribution
+            sales_df_debug = pd.DataFrame(sales_data)
+            if 'inventory_product_id' in sales_df_debug.columns:
+                unique_products_found = sales_df_debug['inventory_product_id'].unique()
+                product_counts = sales_df_debug['inventory_product_id'].value_counts().to_dict()
+
+                logger.info("Sales data analysis (moved from pre-flight)",
+                            tenant_id=tenant_id,
+                            job_id=job_id,
+                            total_sales_records=len(sales_data),
+                            unique_products_count=len(unique_products_found),
+                            unique_products=unique_products_found.tolist(),
+                            records_per_product=product_counts)
+
+                if len(unique_products_found) == 1:
+                    logger.warning("POTENTIAL ISSUE: Only ONE unique product found in all sales data",
+                                   tenant_id=tenant_id,
+                                   single_product=unique_products_found[0],
+                                   record_count=len(sales_data))
+            else:
+                logger.warning("No 'inventory_product_id' column found in sales data",
+                               tenant_id=tenant_id,
+                               columns=list(sales_df_debug.columns))
+
+            logger.info(f"Sales data validation passed: {len(sales_data)} sales records found",
+                        tenant_id=tenant_id, job_id=job_id)
+
+            # Step 2: Extract and validate sales data date range
             sales_date_range = self._extract_sales_date_range(sales_data)
             logger.info(f"Sales data range detected: {sales_date_range.start} to {sales_date_range.end}")
 
-            # Step 2: Apply date alignment across all data sources
+            # Step 3: Apply date alignment across all data sources
             aligned_range = self.date_alignment_service.validate_and_align_dates(
                 user_sales_range=sales_date_range,
                 requested_start=requested_start,
@@ -91,21 +124,21 @@
             if aligned_range.constraints:
                 logger.info(f"Applied constraints: {aligned_range.constraints}")
 
-            # Step 3: Filter sales data to aligned date range
+            # Step 4: Filter sales data to aligned date range
             filtered_sales = self._filter_sales_data(sales_data, aligned_range)
 
-            # Step 4: Collect external data sources concurrently
+            # Step 5: Collect external data sources concurrently
             logger.info("Collecting external data sources...")
             weather_data, traffic_data = await self._collect_external_data(
                 aligned_range, bakery_location, tenant_id
             )
 
-            # Step 5: Validate data quality
+            # Step 6: Validate data quality
             data_quality_results = self._validate_data_sources(
                 filtered_sales, weather_data, traffic_data, aligned_range
             )
 
-            # Step 6: Create comprehensive training dataset
+            # Step 7: Create comprehensive training dataset
             training_dataset = TrainingDataSet(
                 sales_data=filtered_sales,
                 weather_data=weather_data,
@@ -126,7 +159,7 @@
                 }
             )
 
-            # Step 7: Final validation
+            # Step 8: Final validation
             final_validation = self.validate_training_data_quality(training_dataset)
             training_dataset.metadata["final_validation"] = final_validation
@@ -375,14 +408,16 @@
             start_date_str = aligned_range.start.isoformat()
             end_date_str = aligned_range.end.isoformat()
 
-            # Enhanced: Fetch traffic data using new abstracted service
+            # Enhanced: Fetch traffic data using unified cache-first method
             # This automatically detects the appropriate city and uses the right client
-            traffic_data = await self.data_client.fetch_traffic_data(
-                tenant_id=tenant_id,
-                start_date=start_date_str,
-                end_date=end_date_str,
-                latitude=lat,
-                longitude=lon)
+            traffic_data = await self.data_client.fetch_traffic_data_unified(
+                tenant_id=tenant_id,
+                start_date=start_date_str,
+                end_date=end_date_str,
+                latitude=lat,
+                longitude=lon,
+                force_refresh=False  # Use cache-first strategy
+            )
 
             # Enhanced validation including pedestrian inference data
             if self._validate_traffic_data_enhanced(traffic_data):
@@ -461,54 +496,6 @@
             minimal_traffic_data = [{"city": "madrid", "source": "legacy"}] * min(record_count, 1)
             self._log_enhanced_traffic_data_storage(lat, lon, aligned_range, record_count, minimal_traffic_data)
 
-    async def retrieve_stored_traffic_for_retraining(
-        self,
-        bakery_location: Tuple[float, float],
-        start_date: datetime,
-        end_date: datetime,
-        tenant_id: str
-    ) -> List[Dict[str, Any]]:
-        """
-        Retrieve previously stored traffic data for model re-training
-        This method specifically accesses the stored traffic data without making new API calls
-        """
-        lat, lon = bakery_location
-
-        try:
-            # Use the dedicated stored traffic data method for training
-            stored_traffic_data = await self.data_client.fetch_stored_traffic_data_for_training(
-                tenant_id=tenant_id,
-                start_date=start_date.isoformat(),
-                end_date=end_date.isoformat(),
-                latitude=lat,
-                longitude=lon
-            )
-
-            if stored_traffic_data:
-                logger.info(
-                    f"Retrieved {len(stored_traffic_data)} stored traffic records for re-training",
-                    location=f"{lat:.4f},{lon:.4f}",
-                    date_range=f"{start_date.isoformat()} to {end_date.isoformat()}",
-                    tenant_id=tenant_id
-                )
-
-                return stored_traffic_data
-            else:
-                logger.warning(
-                    "No stored traffic data found for re-training",
-                    location=f"{lat:.4f},{lon:.4f}",
-                    date_range=f"{start_date.isoformat()} to {end_date.isoformat()}"
-                )
-                return []
-
-        except Exception as e:
-            logger.error(
-                f"Failed to retrieve stored traffic data for re-training: {e}",
-                location=f"{lat:.4f},{lon:.4f}",
-                tenant_id=tenant_id
-            )
-            return []
-
     def _validate_weather_data(self, weather_data: List[Dict[str, Any]]) -> bool:
         """Validate weather data quality"""
         if not weather_data:
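The product-distribution check that moved into the orchestrator reduces to a few pandas calls. A standalone illustration with fabricated records:

    import pandas as pd

    sales_data = [
        {"inventory_product_id": "croissant", "quantity": 12},
        {"inventory_product_id": "croissant", "quantity": 9},
        {"inventory_product_id": "baguette", "quantity": 30},
    ]

    df = pd.DataFrame(sales_data)
    unique_products = df["inventory_product_id"].unique()
    records_per_product = df["inventory_product_id"].value_counts().to_dict()

    # A single unique product across all records is flagged as a potential
    # issue, since per-product models need more than one product's history.
    print(len(unique_products), records_per_product)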
diff --git a/services/training/app/services/training_service.py b/services/training/app/services/training_service.py
index ad6b8c20..51361a61 100644
--- a/services/training/app/services/training_service.py
+++ b/services/training/app/services/training_service.py
@@ -137,42 +137,7 @@ class EnhancedTrainingService:
         await self._init_repositories(session)
 
         try:
-            # Pre-flight check: Verify sales data exists before starting training
-            from app.services.data_client import DataClient
-            data_client = DataClient()
-            sales_data = await data_client.fetch_sales_data(tenant_id, fetch_all=True)
-
-            if not sales_data or len(sales_data) == 0:
-                error_msg = f"No sales data available for tenant {tenant_id}. Please import sales data before starting training."
-                logger.error("Training aborted - no sales data", tenant_id=tenant_id, job_id=job_id)
-                raise ValueError(error_msg)
-
-            # Debug: Analyze the sales data structure to understand product distribution
-            sales_df_debug = pd.DataFrame(sales_data)
-            if 'inventory_product_id' in sales_df_debug.columns:
-                unique_products_found = sales_df_debug['inventory_product_id'].unique()
-                product_counts = sales_df_debug['inventory_product_id'].value_counts().to_dict()
-
-                logger.info("Pre-flight sales data analysis",
-                            tenant_id=tenant_id,
-                            job_id=job_id,
-                            total_sales_records=len(sales_data),
-                            unique_products_count=len(unique_products_found),
-                            unique_products=unique_products_found.tolist(),
-                            records_per_product=product_counts)
-
-                if len(unique_products_found) == 1:
-                    logger.warning("POTENTIAL ISSUE: Only ONE unique product found in all sales data",
-                                   tenant_id=tenant_id,
-                                   single_product=unique_products_found[0],
-                                   record_count=len(sales_data))
-            else:
-                logger.warning("No 'inventory_product_id' column found in sales data",
-                               tenant_id=tenant_id,
-                               columns=list(sales_df_debug.columns))
-
-            logger.info(f"Pre-flight check passed: {len(sales_data)} sales records found",
-                        tenant_id=tenant_id, job_id=job_id)
+            # Pre-flight check moved to orchestrator to eliminate duplicate sales data fetching
 
             # Check if training log already exists, create if not
             existing_log = await self.training_log_repo.get_log_by_job_id(job_id)
@@ -202,12 +167,13 @@
             step_details="Data"
         )
 
-        # Step 1: Prepare training dataset
-        logger.info("Step 1: Preparing and aligning training data")
+        # Step 1: Prepare training dataset (includes sales data validation)
+        logger.info("Step 1: Preparing and aligning training data (with validation)")
         await self.training_log_repo.update_log_progress(
             job_id, 10, "data_validation", "running"
         )
 
+        # Orchestrator now handles sales data validation to eliminate duplicate fetching
         training_dataset = await self.orchestrator.prepare_training_data(
             tenant_id=tenant_id,
             bakery_location=bakery_location,
@@ -216,6 +182,10 @@
             job_id=job_id
         )
 
+        # Log the results from orchestrator's unified sales data fetch
+        logger.info(f"Sales data validation completed: {len(training_dataset.sales_data)} records",
+                    tenant_id=tenant_id, job_id=job_id)
+
         await self.training_log_repo.update_log_progress(
             job_id, 30, "data_preparation_complete", "running"
         )
@@ -285,6 +255,27 @@
         # Make sure all data is JSON-serializable before saving to database
         json_safe_result = make_json_serializable(final_result)
 
+        # Ensure results is a proper dict for database storage
+        if not isinstance(json_safe_result, dict):
+            logger.warning("JSON safe result is not a dict, wrapping it", result_type=type(json_safe_result))
+            json_safe_result = {"training_data": json_safe_result}
+
+        # Double-check JSON serialization by attempting to serialize
+        import json
+        try:
+            json.dumps(json_safe_result)
+            logger.debug("Results successfully JSON-serializable", job_id=job_id)
+        except (TypeError, ValueError) as e:
+            logger.error("Results still not JSON-serializable after cleaning",
+                         job_id=job_id, error=str(e))
+            # Create a minimal safe result
+            json_safe_result = {
+                "status": "completed",
+                "job_id": job_id,
+                "models_created": final_result.get("products_trained", 0),
+                "error": "Result serialization failed"
+            }
+
         await self.training_log_repo.complete_training_log(
             job_id, results=json_safe_result
         )
@@ -313,6 +304,9 @@
             "completed_at": datetime.now().isoformat()
         }
 
+        # Ensure error result is JSON serializable
+        error_result = make_json_serializable(error_result)
+
         return self._create_detailed_training_response(error_result)
 
     async def _store_trained_models(
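The serialization guard added before complete_training_log is a wrap-then-prove pattern: coerce the result to a dict, attempt json.dumps, and fall back to a minimal record instead of failing the completed job. A minimal sketch, assuming the input has already passed through make_json_serializable:

    import json

    def guard_results(result, job_id: str) -> dict:
        # Coerce non-dict results into the dict shape the database expects.
        if not isinstance(result, dict):
            result = {"training_data": result}
        try:
            json.dumps(result)  # prove it round-trips before it reaches the DB
            return result
        except (TypeError, ValueError):
            # Minimal safe record rather than aborting the completed job.
            return {"status": "completed", "job_id": job_id,
                    "error": "Result serialization failed"}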