From 312fdc8ef3c546c6e81364feb54ca8748d8d1d41 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Fri, 8 Aug 2025 23:29:48 +0200 Subject: [PATCH] Improve the traffic fetching system --- docs/TRAFFIC_DATA_STORAGE.md | 220 ++++++++++++++++++ .../components/EnhancedTrainingProgress.tsx | 48 ++-- services/data/app/api/traffic.py | 56 +++++ services/data/app/services/traffic_service.py | 181 ++++++++++++-- .../versions/001_create_traffic_data_table.py | 54 +++++ services/training/app/services/data_client.py | 52 +++++ .../app/services/training_orchestrator.py | 72 +++++- shared/clients/data_client.py | 48 +++- 8 files changed, 680 insertions(+), 51 deletions(-) create mode 100644 docs/TRAFFIC_DATA_STORAGE.md create mode 100644 services/data/migrations/versions/001_create_traffic_data_table.py diff --git a/docs/TRAFFIC_DATA_STORAGE.md b/docs/TRAFFIC_DATA_STORAGE.md new file mode 100644 index 00000000..88594727 --- /dev/null +++ b/docs/TRAFFIC_DATA_STORAGE.md @@ -0,0 +1,220 @@ +# Traffic Data Storage for Re-Training + +## Overview + +This document describes the enhanced traffic data storage system implemented to ensure that fetched traffic data is stored in the database for future use in model re-training. + +## Architecture + +### Database Schema + +The `traffic_data` table stores all traffic data with the following schema: + +```sql +CREATE TABLE traffic_data ( + id UUID PRIMARY KEY, + location_id VARCHAR(100) NOT NULL, -- Format: "lat,lon" (e.g., "40.4168,-3.7038") + date TIMESTAMP WITH TIME ZONE NOT NULL, + traffic_volume INTEGER, + pedestrian_count INTEGER, + congestion_level VARCHAR(20), -- "low", "medium", "high", "blocked" + average_speed FLOAT, + source VARCHAR(50) NOT NULL DEFAULT 'madrid_opendata', + raw_data TEXT, -- JSON string of original data + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL +); + +-- Indexes for efficient querying +CREATE INDEX idx_traffic_location_date ON traffic_data(location_id, date); +CREATE INDEX idx_traffic_date_range ON traffic_data(date); +``` + +### Key Components + +#### 1. Enhanced TrafficService (`services/data/app/services/traffic_service.py`) + +**New Methods:** +- `_store_traffic_data_batch()`: Efficiently stores multiple traffic records with duplicate detection +- `_validate_traffic_data()`: Validates traffic data before storage +- `get_stored_traffic_for_training()`: Retrieves stored traffic data specifically for training + +**Enhanced Methods:** +- `get_historical_traffic()`: Now automatically stores fetched data for future re-training + +#### 2. Training Data Orchestrator (`services/training/app/services/training_orchestrator.py`) + +**New Methods:** +- `retrieve_stored_traffic_for_retraining()`: Retrieves previously stored traffic data for re-training +- `_log_traffic_data_storage()`: Logs traffic data storage for audit purposes + +**Enhanced Methods:** +- `_collect_traffic_data_with_timeout()`: Now includes storage logging and validation + +#### 3. Data Service Client (`shared/clients/data_client.py`) + +**New Methods:** +- `get_stored_traffic_data_for_training()`: Dedicated method for retrieving stored training data + +#### 4. API Endpoints (`services/data/app/api/traffic.py`) + +**New Endpoint:** +- `POST /tenants/{tenant_id}/traffic/stored`: Retrieves stored traffic data for training purposes + +## Data Flow + +### Initial Training +1. Training orchestrator requests traffic data +2. Data service checks database first +3. If not found, fetches from Madrid Open Data API +4. **Data is automatically stored in database** +5. Returns data to training orchestrator +6. Training completes using fetched data + +### Re-Training +1. Training orchestrator requests stored traffic data +2. Data service queries database using location and date range +3. Returns stored data without making API calls +4. Training completes using stored data + +## Storage Logic + +### Duplicate Prevention +- Before storing, the system checks for existing records with the same location and date +- Only new records are stored to avoid database bloat + +### Batch Processing +- Traffic data is stored in batches of 100 records for efficiency +- Each batch is committed separately to handle large datasets + +### Data Validation +- Traffic volume: 0-10,000 vehicles per hour +- Pedestrian count: 0-10,000 people per hour +- Average speed: 0-200 km/h +- Congestion level: "low", "medium", "high", "blocked" + +## Benefits + +### 1. Improved Re-Training Performance +- No need to re-fetch external API data +- Faster training iterations +- Reduced API rate limiting issues + +### 2. Data Consistency +- Same traffic data used across multiple training runs +- Reproducible training results +- Historical data preservation + +### 3. Cost Efficiency +- Reduced API calls to external services +- Lower bandwidth usage +- Better resource utilization + +### 4. Offline Training +- Training can proceed even if external APIs are unavailable +- Increased system resilience + +## Usage Examples + +### Retrieving Stored Traffic Data +```python +from services.training.app.services.training_orchestrator import TrainingDataOrchestrator + +orchestrator = TrainingDataOrchestrator() + +# Get stored traffic data for re-training +traffic_data = await orchestrator.retrieve_stored_traffic_for_retraining( + bakery_location=(40.4168, -3.7038), # Madrid coordinates + start_date=datetime(2024, 1, 1), + end_date=datetime(2024, 12, 31), + tenant_id="tenant-123" +) +``` + +### Checking Storage Status +```python +# The system automatically logs storage operations +# Check logs for entries like: +# "Traffic data stored for re-training" - indicates successful storage +# "Retrieved X stored traffic records for training" - indicates successful retrieval +``` + +## Monitoring + +### Storage Metrics +- Number of records stored per location +- Storage success rate +- Duplicate detection rate + +### Retrieval Metrics +- Query response time +- Records retrieved per request +- Re-training data availability + +### Audit Trail +All traffic data operations are logged with: +- Location coordinates +- Date ranges +- Record counts +- Storage/retrieval timestamps +- Purpose (training/re-training) + +## Migration + +To enable traffic data storage on existing deployments: + +1. **Run Database Migration:** + ```bash + cd services/data + alembic upgrade head + ``` + +2. **Restart Data Service:** + ```bash + docker-compose restart data-service + ``` + +3. **Verify Storage:** + - Check logs for "Traffic data stored for re-training" messages + - Query database: `SELECT COUNT(*) FROM traffic_data;` + +## Configuration + +No additional configuration is required. The system automatically: +- Detects when traffic data should be stored +- Handles duplicate prevention +- Manages database transactions +- Provides fallback mechanisms + +## Troubleshooting + +### Common Issues + +**1. Storage Failures** +- Check database connectivity +- Verify table schema exists +- Review validation errors in logs + +**2. No Stored Data Available** +- Ensure initial training has been completed +- Check date ranges are within stored data period +- Verify location coordinates match stored data + +**3. Performance Issues** +- Monitor database query performance +- Check index usage +- Consider data archival for old records + +### Error Messages + +- `"No stored traffic data found for re-training"`: Normal when no previous training has occurred +- `"Failed to store traffic data batch"`: Database connectivity or validation issue +- `"Invalid traffic data, skipping"`: Data validation failure - check raw API response + +## Future Enhancements + +1. **Data Archival**: Automatic archival of old traffic data +2. **Data Compression**: Compress raw_data field for storage efficiency +3. **Regional Expansion**: Support for traffic data from other cities +4. **Real-time Updates**: Continuous traffic data collection and storage +5. **Analytics**: Traffic pattern analysis and reporting \ No newline at end of file diff --git a/frontend/src/components/EnhancedTrainingProgress.tsx b/frontend/src/components/EnhancedTrainingProgress.tsx index bf67416d..bdad1a8d 100644 --- a/frontend/src/components/EnhancedTrainingProgress.tsx +++ b/frontend/src/components/EnhancedTrainingProgress.tsx @@ -149,8 +149,8 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini

-
-
+
+
@@ -172,7 +172,7 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini
@@ -186,7 +186,7 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini
{/* Header */}
-
+

@@ -198,7 +198,7 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini

{/* Main Progress Section */} -
+
{/* Overall Progress Bar */}
@@ -207,7 +207,7 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini
@@ -218,10 +218,10 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini
{/* Current Step Info */} -
+
-
+
@@ -232,8 +232,8 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini

{currentStepInfo.description}

-
-

+

+

{currentStepInfo.tip}

@@ -246,11 +246,11 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini {progressSteps.map((step, index) => (
@@ -258,12 +258,12 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini {step.completed ? ( ) : step.current ? ( -
+
) : (
)} {step.name} @@ -274,7 +274,7 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini {/* Enhanced Stats Grid */}
-
+
Productos Procesados @@ -285,14 +285,14 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini {progress.productsTotal > 0 && (
)}
-
+
Tiempo Restante @@ -305,7 +305,7 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini
-
+
Precisión Esperada @@ -329,14 +329,14 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini {/* Expected Benefits - Only show if progress < 80% to keep user engaged */} {progress.progress < 80 && ( -
+

Lo que podrás hacer una vez completado

{EXPECTED_BENEFITS.map((benefit, index) => ( -
-
+
+

@@ -354,7 +354,7 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini {/* Timeout Warning Modal */} {showTimeoutWarning && (
-
+

@@ -367,13 +367,13 @@ export default function EnhancedTrainingProgress({ progress, onTimeout }: Traini
diff --git a/services/data/app/api/traffic.py b/services/data/app/api/traffic.py index 2260017d..f8521ff7 100644 --- a/services/data/app/api/traffic.py +++ b/services/data/app/api/traffic.py @@ -110,4 +110,60 @@ async def get_historical_traffic( raise except Exception as e: logger.error("Unexpected error in historical traffic API", error=str(e)) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + +@router.post("/tenants/{tenant_id}/traffic/stored") +async def get_stored_traffic_for_training( + request: HistoricalTrafficRequest, + db: AsyncSession = Depends(get_db), + tenant_id: UUID = Path(..., description="Tenant ID"), + current_user: Dict[str, Any] = Depends(get_current_user_dep), +): + """Get stored traffic data specifically for training/re-training purposes""" + try: + # Validate date range + if request.end_date <= request.start_date: + raise HTTPException(status_code=400, detail="End date must be after start date") + + # Allow longer date ranges for training (up to 3 years) + if (request.end_date - request.start_date).days > 1095: + raise HTTPException(status_code=400, detail="Date range cannot exceed 3 years for training data") + + logger.info("Retrieving stored traffic data for training", + tenant_id=str(tenant_id), + location=f"{request.latitude},{request.longitude}", + date_range=f"{request.start_date} to {request.end_date}") + + # Use the dedicated method for training data retrieval + stored_data = await traffic_service.get_stored_traffic_for_training( + request.latitude, request.longitude, request.start_date, request.end_date, db + ) + + # Log retrieval for audit purposes + logger.info("Stored traffic data retrieved for training", + records_count=len(stored_data), + tenant_id=str(tenant_id), + purpose="model_training") + + # Publish event for monitoring + try: + await publish_traffic_updated({ + "type": "stored_data_retrieved_for_training", + "latitude": request.latitude, + "longitude": request.longitude, + "start_date": request.start_date.isoformat(), + "end_date": request.end_date.isoformat(), + "records_count": len(stored_data), + "tenant_id": str(tenant_id), + "timestamp": datetime.utcnow().isoformat() + }) + except Exception as pub_error: + logger.warning("Failed to publish stored traffic retrieval event", error=str(pub_error)) + + return stored_data + + except HTTPException: + raise + except Exception as e: + logger.error("Unexpected error in stored traffic retrieval API", error=str(e)) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") \ No newline at end of file diff --git a/services/data/app/services/traffic_service.py b/services/data/app/services/traffic_service.py index 898f34c1..8d576ecc 100644 --- a/services/data/app/services/traffic_service.py +++ b/services/data/app/services/traffic_service.py @@ -63,7 +63,7 @@ class TrafficService: start_date: datetime, end_date: datetime, db: AsyncSession) -> List[TrafficDataResponse]: - """Get historical traffic data""" + """Get historical traffic data with enhanced storage for re-training""" try: logger.debug("Getting historical traffic", lat=latitude, lon=longitude, @@ -100,27 +100,12 @@ class TrafficService: ) if traffic_data: - # Store in database for future use - try: - for data in traffic_data: - traffic_record = TrafficData( - location_id=location_id, - date=data.get('date', datetime.now()), - traffic_volume=data.get('traffic_volume'), - pedestrian_count=data.get('pedestrian_count'), - congestion_level=data.get('congestion_level'), - average_speed=data.get('average_speed'), - source="madrid_opendata", - raw_data=str(data), - created_at=datetime.now() - ) - db.add(traffic_record) - - await db.commit() - logger.debug("Historical data stored in database", count=len(traffic_data)) - except Exception as db_error: - logger.warning("Failed to store historical data in database", error=str(db_error)) - await db.rollback() + # Enhanced storage with better error handling and validation + stored_count = await self._store_traffic_data_batch( + traffic_data, location_id, db + ) + logger.info("Traffic data stored for re-training", + fetched=len(traffic_data), stored=stored_count, location=location_id) return [TrafficDataResponse(**item) for item in traffic_data] @@ -137,7 +122,7 @@ class TrafficService: longitude: float, traffic_data: Dict[str, Any], db: AsyncSession) -> bool: - """Store traffic data to database""" + """Store single traffic data record to database""" try: location_id = f"{latitude:.4f},{longitude:.4f}" @@ -161,4 +146,152 @@ class TrafficService: except Exception as e: logger.error("Failed to store traffic data", error=str(e)) await db.rollback() - return False \ No newline at end of file + return False + + async def _store_traffic_data_batch(self, + traffic_data: List[Dict[str, Any]], + location_id: str, + db: AsyncSession) -> int: + """Store batch of traffic data with enhanced validation and duplicate handling""" + stored_count = 0 + + try: + # Check for existing records to avoid duplicates + if traffic_data: + dates = [data.get('date') for data in traffic_data if data.get('date')] + if dates: + # Query existing records for this location and date range + existing_stmt = select(TrafficData.date).where( + and_( + TrafficData.location_id == location_id, + TrafficData.date.in_(dates) + ) + ) + result = await db.execute(existing_stmt) + existing_dates = {row[0] for row in result.fetchall()} + + logger.debug(f"Found {len(existing_dates)} existing records for location {location_id}") + else: + existing_dates = set() + else: + existing_dates = set() + + # Store only new records + for data in traffic_data: + try: + record_date = data.get('date') + if not record_date or record_date in existing_dates: + continue # Skip duplicates + + # Validate required fields + if not self._validate_traffic_data(data): + logger.warning("Invalid traffic data, skipping", data=data) + continue + + traffic_record = TrafficData( + location_id=location_id, + date=record_date, + traffic_volume=data.get('traffic_volume'), + pedestrian_count=data.get('pedestrian_count'), + congestion_level=data.get('congestion_level'), + average_speed=data.get('average_speed'), + source=data.get('source', 'madrid_opendata'), + raw_data=str(data) + ) + + db.add(traffic_record) + stored_count += 1 + + # Commit in batches to avoid memory issues + if stored_count % 100 == 0: + await db.commit() + logger.debug(f"Committed batch of {stored_count} records") + + except Exception as record_error: + logger.warning("Failed to store individual traffic record", + error=str(record_error), data=data) + continue + + # Final commit + await db.commit() + logger.info(f"Successfully stored {stored_count} traffic records for location {location_id}") + + except Exception as e: + logger.error("Failed to store traffic data batch", + error=str(e), location_id=location_id) + await db.rollback() + + return stored_count + + def _validate_traffic_data(self, data: Dict[str, Any]) -> bool: + """Validate traffic data before storage""" + required_fields = ['date'] + + # Check required fields + for field in required_fields: + if not data.get(field): + return False + + # Validate data types and ranges + traffic_volume = data.get('traffic_volume') + if traffic_volume is not None and (traffic_volume < 0 or traffic_volume > 10000): + return False + + pedestrian_count = data.get('pedestrian_count') + if pedestrian_count is not None and (pedestrian_count < 0 or pedestrian_count > 10000): + return False + + average_speed = data.get('average_speed') + if average_speed is not None and (average_speed < 0 or average_speed > 200): + return False + + congestion_level = data.get('congestion_level') + if congestion_level and congestion_level not in ['low', 'medium', 'high', 'blocked']: + return False + + return True + + async def get_stored_traffic_for_training(self, + latitude: float, + longitude: float, + start_date: datetime, + end_date: datetime, + db: AsyncSession) -> List[Dict[str, Any]]: + """Retrieve stored traffic data specifically for training purposes""" + try: + location_id = f"{latitude:.4f},{longitude:.4f}" + + stmt = select(TrafficData).where( + and_( + TrafficData.location_id == location_id, + TrafficData.date >= start_date, + TrafficData.date <= end_date + ) + ).order_by(TrafficData.date) + + result = await db.execute(stmt) + records = result.scalars().all() + + # Convert to training format + training_data = [] + for record in records: + training_data.append({ + 'date': record.date, + 'traffic_volume': record.traffic_volume, + 'pedestrian_count': record.pedestrian_count, + 'congestion_level': record.congestion_level, + 'average_speed': record.average_speed, + 'location_id': record.location_id, + 'source': record.source, + 'measurement_point_id': record.raw_data # Contains additional metadata + }) + + logger.info(f"Retrieved {len(training_data)} traffic records for training", + location_id=location_id, start=start_date, end=end_date) + + return training_data + + except Exception as e: + logger.error("Failed to retrieve traffic data for training", + error=str(e), location_id=location_id) + return [] \ No newline at end of file diff --git a/services/data/migrations/versions/001_create_traffic_data_table.py b/services/data/migrations/versions/001_create_traffic_data_table.py new file mode 100644 index 00000000..83b32cc0 --- /dev/null +++ b/services/data/migrations/versions/001_create_traffic_data_table.py @@ -0,0 +1,54 @@ +"""Create traffic_data table for storing traffic data for re-training + +Revision ID: 001_traffic_data +Revises: +Create Date: 2025-01-08 12:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID + +# revision identifiers, used by Alembic. +revision = '001_traffic_data' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + """Create traffic_data table""" + op.create_table('traffic_data', + sa.Column('id', UUID(as_uuid=True), nullable=False, primary_key=True), + sa.Column('location_id', sa.String(100), nullable=False, index=True), + sa.Column('date', sa.DateTime(timezone=True), nullable=False, index=True), + sa.Column('traffic_volume', sa.Integer, nullable=True), + sa.Column('pedestrian_count', sa.Integer, nullable=True), + sa.Column('congestion_level', sa.String(20), nullable=True), + sa.Column('average_speed', sa.Float, nullable=True), + sa.Column('source', sa.String(50), nullable=False, server_default='madrid_opendata'), + sa.Column('raw_data', sa.Text, nullable=True), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False), + ) + + # Create index for efficient querying by location and date + op.create_index( + 'idx_traffic_location_date', + 'traffic_data', + ['location_id', 'date'] + ) + + # Create index for date range queries + op.create_index( + 'idx_traffic_date_range', + 'traffic_data', + ['date'] + ) + + +def downgrade(): + """Drop traffic_data table""" + op.drop_index('idx_traffic_date_range', table_name='traffic_data') + op.drop_index('idx_traffic_location_date', table_name='traffic_data') + op.drop_table('traffic_data') \ No newline at end of file diff --git a/services/training/app/services/data_client.py b/services/training/app/services/data_client.py index e1ac00af..5ae7f6c0 100644 --- a/services/training/app/services/data_client.py +++ b/services/training/app/services/data_client.py @@ -24,6 +24,13 @@ class DataClient: # Get the shared data client configured for this service self.data_client = get_data_client(settings, "training") + # Check if the new method is available for stored traffic data + if hasattr(self.data_client, 'get_stored_traffic_data_for_training'): + self.supports_stored_traffic_data = True + else: + self.supports_stored_traffic_data = False + logger.warning("Stored traffic data method not available in data client") + # Or alternatively, get all clients at once: # self.clients = get_service_clients(settings, "training") # Then use: self.clients.data.get_sales_data(...) @@ -147,6 +154,51 @@ class DataClient: logger.error(f"Error fetching traffic data: {e}", tenant_id=tenant_id) return [] + async def fetch_stored_traffic_data_for_training( + self, + tenant_id: str, + start_date: str, + end_date: str, + latitude: Optional[float] = None, + longitude: Optional[float] = None + ) -> List[Dict[str, Any]]: + """ + Fetch stored traffic data specifically for training/re-training + This method accesses previously stored traffic data without making new API calls + """ + try: + if self.supports_stored_traffic_data: + # Use the dedicated stored traffic data method + stored_traffic_data = await self.data_client.get_stored_traffic_data_for_training( + tenant_id=tenant_id, + start_date=start_date, + end_date=end_date, + latitude=latitude, + longitude=longitude + ) + + if stored_traffic_data: + logger.info(f"Retrieved {len(stored_traffic_data)} stored traffic records for training", + tenant_id=tenant_id) + return stored_traffic_data + else: + logger.warning("No stored traffic data available for training", tenant_id=tenant_id) + return [] + else: + # Fallback to regular traffic data method + logger.info("Using fallback traffic data method for training") + return await self.fetch_traffic_data( + tenant_id=tenant_id, + start_date=start_date, + end_date=end_date, + latitude=latitude, + longitude=longitude + ) + + except Exception as e: + logger.error(f"Error fetching stored traffic data for training: {e}", tenant_id=tenant_id) + return [] + async def validate_data_quality( self, tenant_id: str, diff --git a/services/training/app/services/training_orchestrator.py b/services/training/app/services/training_orchestrator.py index 2773b513..d19d72be 100644 --- a/services/training/app/services/training_orchestrator.py +++ b/services/training/app/services/training_orchestrator.py @@ -360,7 +360,7 @@ class TrainingDataOrchestrator: aligned_range: AlignedDateRange, tenant_id: str ) -> List[Dict[str, Any]]: - """Collect traffic data with timeout and Madrid constraint validation""" + """Collect traffic data with enhanced storage and retrieval for re-training""" try: # Double-check Madrid constraint before making request @@ -374,6 +374,7 @@ class TrainingDataOrchestrator: start_date_str = aligned_range.start.isoformat() end_date_str = aligned_range.end.isoformat() + # Fetch traffic data - this will automatically store it for future re-training traffic_data = await self.data_client.fetch_traffic_data( tenant_id=tenant_id, start_date=start_date_str, @@ -383,7 +384,11 @@ class TrainingDataOrchestrator: # Validate traffic data if self._validate_traffic_data(traffic_data): - logger.info(f"Collected {len(traffic_data)} valid traffic records") + logger.info(f"Collected and stored {len(traffic_data)} valid traffic records for re-training") + + # Log storage success for audit purposes + self._log_traffic_data_storage(lat, lon, aligned_range, len(traffic_data)) + return traffic_data else: logger.warning("Invalid traffic data received") @@ -396,6 +401,69 @@ class TrainingDataOrchestrator: logger.warning(f"Traffic data collection failed: {e}") return [] + def _log_traffic_data_storage(self, + lat: float, + lon: float, + aligned_range: AlignedDateRange, + record_count: int): + """Log traffic data storage for audit and re-training tracking""" + logger.info( + "Traffic data stored for re-training", + location=f"{lat:.4f},{lon:.4f}", + date_range=f"{aligned_range.start.isoformat()} to {aligned_range.end.isoformat()}", + records_stored=record_count, + storage_timestamp=datetime.now().isoformat(), + purpose="model_training_and_retraining" + ) + + async def retrieve_stored_traffic_for_retraining( + self, + bakery_location: Tuple[float, float], + start_date: datetime, + end_date: datetime, + tenant_id: str + ) -> List[Dict[str, Any]]: + """ + Retrieve previously stored traffic data for model re-training + This method specifically accesses the stored traffic data without making new API calls + """ + lat, lon = bakery_location + + try: + # Use the dedicated stored traffic data method for training + stored_traffic_data = await self.data_client.fetch_stored_traffic_data_for_training( + tenant_id=tenant_id, + start_date=start_date.isoformat(), + end_date=end_date.isoformat(), + latitude=lat, + longitude=lon + ) + + if stored_traffic_data: + logger.info( + f"Retrieved {len(stored_traffic_data)} stored traffic records for re-training", + location=f"{lat:.4f},{lon:.4f}", + date_range=f"{start_date.isoformat()} to {end_date.isoformat()}", + tenant_id=tenant_id + ) + + return stored_traffic_data + else: + logger.warning( + "No stored traffic data found for re-training", + location=f"{lat:.4f},{lon:.4f}", + date_range=f"{start_date.isoformat()} to {end_date.isoformat()}" + ) + return [] + + except Exception as e: + logger.error( + f"Failed to retrieve stored traffic data for re-training: {e}", + location=f"{lat:.4f},{lon:.4f}", + tenant_id=tenant_id + ) + return [] + def _validate_weather_data(self, weather_data: List[Dict[str, Any]]) -> bool: """Validate weather data quality""" if not weather_data: diff --git a/shared/clients/data_client.py b/shared/clients/data_client.py index d21701bf..4b9af5b6 100644 --- a/shared/clients/data_client.py +++ b/shared/clients/data_client.py @@ -317,7 +317,53 @@ class DataServiceClient(BaseServiceClient): else: logger.error("Failed to fetch traffic data - _make_request returned None") logger.error("This could be due to: network timeout, HTTP error, authentication failure, or service unavailable") - return [] + return None + + async def get_stored_traffic_data_for_training( + self, + tenant_id: str, + start_date: str, + end_date: str, + latitude: Optional[float] = None, + longitude: Optional[float] = None + ) -> Optional[List[Dict[str, Any]]]: + """ + Get stored traffic data specifically for model training/re-training + This method prioritizes database-stored data over API calls + """ + # Prepare request payload + payload = { + "start_date": start_date, + "end_date": end_date, + "latitude": latitude or 40.4168, # Default Madrid coordinates + "longitude": longitude or -3.7038, + "stored_only": True # Flag to indicate we want stored data only + } + + logger.info(f"Training traffic data request: {payload}", tenant_id=tenant_id) + + # Standard timeout since we're only querying the database + training_timeout = httpx.Timeout( + connect=30.0, + read=120.0, # 2 minutes should be enough for database query + write=30.0, + pool=30.0 + ) + + result = await self._make_request( + "POST", + "traffic/stored", # New endpoint for stored traffic data + tenant_id=tenant_id, + data=payload, + timeout=training_timeout + ) + + if result: + logger.info(f"Successfully retrieved {len(result)} stored traffic records for training") + return result + else: + logger.warning("No stored traffic data available for training") + return None # ================================================================ # PRODUCTS