Fix Data API

Urtzi Alfaro
2025-07-19 12:09:10 +02:00
parent ff8a632915
commit 72a7c0a0a6
9 changed files with 814 additions and 950 deletions


@@ -1,13 +1,16 @@
# ================================================================
-# services/data/app/api/sales.py
+# services/data/app/api/sales.py - FIXED VERSION
# ================================================================
-"""Sales data API endpoints"""
+"""Sales data API endpoints with improved error handling"""
-from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query
+from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query, Response
+from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
-from typing import List
+from typing import List, Optional
import uuid
from datetime import datetime
import base64
+import structlog
from app.core.database import get_db
from app.core.auth import get_current_user, AuthInfo
@@ -18,10 +21,14 @@ from app.schemas.sales import (
SalesDataCreate,
SalesDataResponse,
SalesDataQuery,
-SalesDataImport
+SalesDataImport,
+SalesImportResult,
+SalesValidationResult,
+SalesExportRequest
)
router = APIRouter()
+logger = structlog.get_logger()
@router.post("/", response_model=SalesDataResponse)
async def create_sales_record(
@@ -31,19 +38,32 @@ async def create_sales_record(
):
"""Create a new sales record"""
try:
logger.debug("API: Creating sales record", product=sales_data.product_name, quantity=sales_data.quantity_sold)
record = await SalesService.create_sales_record(sales_data, db)
# Publish event
await data_publisher.publish_data_imported({
"tenant_id": str(sales_data.tenant_id),
"type": "sales_record",
"source": sales_data.source,
"timestamp": datetime.utcnow().isoformat()
})
# Publish event (with error handling)
try:
await data_publisher.publish_sales_created({
"tenant_id": str(sales_data.tenant_id),
"product_name": sales_data.product_name,
"quantity_sold": sales_data.quantity_sold,
"revenue": sales_data.revenue,
"source": sales_data.source,
"timestamp": datetime.utcnow().isoformat()
})
except Exception as pub_error:
logger.warning("Failed to publish sales created event", error=str(pub_error))
# Continue processing - event publishing failure shouldn't break the API
logger.debug("Successfully created sales record", record_id=record.id)
return record
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
logger.error("Failed to create sales record", error=str(e))
import traceback
logger.error("Sales creation traceback", traceback=traceback.format_exc())
raise HTTPException(status_code=500, detail=f"Failed to create sales record: {str(e)}")
@router.post("/query", response_model=List[SalesDataResponse])
async def get_sales_data(
@@ -53,12 +73,18 @@ async def get_sales_data(
):
"""Get sales data by query parameters"""
try:
logger.debug("API: Querying sales data", tenant_id=query.tenant_id)
records = await SalesService.get_sales_data(query, db)
logger.debug("Successfully retrieved sales data", count=len(records))
return records
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
logger.error("Failed to query sales data", error=str(e))
raise HTTPException(status_code=500, detail=f"Failed to query sales data: {str(e)}")
@router.post("/import")
@router.post("/import", response_model=SalesImportResult)
async def import_sales_data(
tenant_id: str = Form(...),
file_format: str = Form(...),
@@ -68,6 +94,8 @@ async def import_sales_data(
):
"""Import sales data from file"""
try:
logger.debug("API: Importing sales data", tenant_id=tenant_id, format=file_format, filename=file.filename)
# Read file content
content = await file.read()
file_content = content.decode('utf-8')
@@ -78,21 +106,30 @@ async def import_sales_data(
)
if result["success"]:
-# Publish event
-await data_publisher.publish_data_imported({
-"tenant_id": tenant_id,
-"type": "bulk_import",
-"format": file_format,
-"records_created": result["records_created"],
-"timestamp": datetime.utcnow().isoformat()
-})
+# Publish event (with error handling)
+try:
+await data_publisher.publish_data_imported({
+"tenant_id": tenant_id,
+"type": "bulk_import",
+"format": file_format,
+"filename": file.filename,
+"records_created": result["records_created"],
+"timestamp": datetime.utcnow().isoformat()
+})
+except Exception as pub_error:
+logger.warning("Failed to publish data imported event", error=str(pub_error))
+# Continue processing
+logger.debug("Import completed", success=result["success"], records_created=result.get("records_created", 0))
return result
except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
+logger.error("Failed to import sales data", error=str(e))
+import traceback
+logger.error("Sales import traceback", traceback=traceback.format_exc())
+raise HTTPException(status_code=500, detail=f"Failed to import sales data: {str(e)}")
@router.post("/import/json")
@router.post("/import/json", response_model=SalesImportResult)
async def import_sales_json(
import_data: SalesDataImport,
db: AsyncSession = Depends(get_db),
@@ -100,6 +137,8 @@ async def import_sales_json(
):
"""Import sales data from JSON"""
try:
logger.debug("API: Importing JSON sales data", tenant_id=import_data.tenant_id)
result = await DataImportService.process_upload(
str(import_data.tenant_id),
import_data.data,
@@ -108,32 +147,46 @@ async def import_sales_json(
)
if result["success"]:
-await data_publisher.publish_data_imported({
-"tenant_id": str(import_data.tenant_id),
-"type": "json_import",
-"records_created": result["records_created"],
-"timestamp": datetime.utcnow().isoformat()
-})
+# Publish event (with error handling)
+try:
+await data_publisher.publish_data_imported({
+"tenant_id": str(import_data.tenant_id),
+"type": "json_import",
+"records_created": result["records_created"],
+"timestamp": datetime.utcnow().isoformat()
+})
+except Exception as pub_error:
+logger.warning("Failed to publish JSON import event", error=str(pub_error))
+# Continue processing
+logger.debug("JSON import completed", success=result["success"], records_created=result.get("records_created", 0))
return result
except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
+logger.error("Failed to import JSON sales data", error=str(e))
+import traceback
+logger.error("JSON import traceback", traceback=traceback.format_exc())
+raise HTTPException(status_code=500, detail=f"Failed to import JSON sales data: {str(e)}")
@router.post("/import/validate")
@router.post("/import/validate", response_model=SalesValidationResult)
async def validate_import_data(
import_data: SalesDataImport,
current_user: AuthInfo = Depends(get_current_user)
):
"""Validate import data before processing"""
try:
logger.debug("API: Validating import data", tenant_id=import_data.tenant_id)
validation = await DataImportService.validate_import_data(
import_data.model_dump()
)
logger.debug("Validation completed", is_valid=validation.get("is_valid", False))
return validation
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
logger.error("Failed to validate import data", error=str(e))
raise HTTPException(status_code=500, detail=f"Failed to validate import data: {str(e)}")
@router.get("/import/template/{format_type}")
async def get_import_template(
@@ -142,11 +195,16 @@ async def get_import_template(
):
"""Get import template for specified format"""
try:
logger.debug("API: Getting import template", format=format_type)
template = await DataImportService.get_import_template(format_type)
if "error" in template:
logger.warning("Template generation error", error=template["error"])
raise HTTPException(status_code=400, detail=template["error"])
logger.debug("Template generated successfully", format=format_type)
if format_type.lower() == "csv":
return Response(
content=template["template"],
@@ -168,362 +226,92 @@ async def get_import_template(
else:
return template
except HTTPException:
raise
except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
+logger.error("Failed to generate import template", error=str(e))
+import traceback
+logger.error("Template generation traceback", traceback=traceback.format_exc())
+raise HTTPException(status_code=500, detail=f"Failed to generate template: {str(e)}")
@router.post("/import/advanced")
async def import_sales_data_advanced(
tenant_id: str = Form(...),
file_format: str = Form(...),
file: UploadFile = File(...),
validate_only: bool = Form(False),
@router.get("/analytics/{tenant_id}")
async def get_sales_analytics(
tenant_id: str,
start_date: Optional[datetime] = Query(None, description="Start date"),
end_date: Optional[datetime] = Query(None, description="End date"),
db: AsyncSession = Depends(get_db),
current_user: AuthInfo = Depends(get_current_user)
):
"""Advanced import with validation and preview options"""
"""Get sales analytics for tenant"""
try:
# Read file content
content = await file.read()
logger.debug("API: Getting sales analytics", tenant_id=tenant_id)
# Determine encoding
try:
file_content = content.decode('utf-8')
except UnicodeDecodeError:
try:
file_content = content.decode('latin-1')
except UnicodeDecodeError:
file_content = content.decode('cp1252')
# Validate first if requested
if validate_only:
validation = await DataImportService.validate_import_data({
"tenant_id": tenant_id,
"data": file_content,
"data_format": file_format
})
# Add file preview for validation
if validation["valid"]:
# Get first few lines for preview
lines = file_content.split('\n')[:5]
validation["preview"] = lines
validation["total_lines"] = len(file_content.split('\n'))
return validation
# Process import
result = await DataImportService.process_upload(
tenant_id, file_content, file_format, db, file.filename
analytics = await SalesService.get_sales_analytics(
tenant_id, start_date, end_date, db
)
if result["success"]:
# Publish event
await data_publisher.publish_data_imported({
"tenant_id": tenant_id,
"type": "advanced_import",
"format": file_format,
"filename": file.filename,
"records_created": result["records_created"],
"success_rate": result.get("success_rate", 0),
"timestamp": datetime.utcnow().isoformat()
})
return result
logger.debug("Analytics generated successfully", tenant_id=tenant_id)
return analytics
except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
-@router.get("/import/history/{tenant_id}")
-async def get_import_history(
-tenant_id: str,
-limit: int = Query(10, ge=1, le=100),
-offset: int = Query(0, ge=0),
-db: AsyncSession = Depends(get_db),
-current_user: AuthInfo = Depends(get_current_user)
-):
-"""Get import history for tenant"""
-try:
-# Get recent imports by source and creation date
-stmt = select(SalesData).where(
-and_(
-SalesData.tenant_id == tenant_id,
-SalesData.source.in_(['csv', 'excel', 'json', 'pos', 'migrated'])
-)
-).order_by(SalesData.created_at.desc()).offset(offset).limit(limit)
-result = await db.execute(stmt)
-records = result.scalars().all()
-# Group by import session (same created_at)
-import_sessions = {}
-for record in records:
-session_key = f"{record.source}_{record.created_at.date()}"
-if session_key not in import_sessions:
-import_sessions[session_key] = {
-"date": record.created_at,
-"source": record.source,
-"records": [],
-"total_records": 0,
-"total_revenue": 0.0
-}
-import_sessions[session_key]["records"].append({
-"id": str(record.id),
-"product_name": record.product_name,
-"quantity_sold": record.quantity_sold,
-"revenue": record.revenue or 0.0
-})
-import_sessions[session_key]["total_records"] += 1
-import_sessions[session_key]["total_revenue"] += record.revenue or 0.0
-return {
-"import_sessions": list(import_sessions.values()),
-"total_sessions": len(import_sessions),
-"offset": offset,
-"limit": limit
-}
-except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
-@router.delete("/import/{tenant_id}/{import_date}")
-async def delete_import_batch(
-tenant_id: str,
-import_date: str, # Format: YYYY-MM-DD
-source: str = Query(..., description="Import source (csv, excel, json, pos)"),
-db: AsyncSession = Depends(get_db),
-current_user: AuthInfo = Depends(get_current_user)
-):
-"""Delete an entire import batch"""
-try:
-# Parse date
-import_date_obj = datetime.strptime(import_date, "%Y-%m-%d").date()
-start_datetime = datetime.combine(import_date_obj, datetime.min.time())
-end_datetime = datetime.combine(import_date_obj, datetime.max.time())
-# Find records to delete
-stmt = select(SalesData).where(
-and_(
-SalesData.tenant_id == tenant_id,
-SalesData.source == source,
-SalesData.created_at >= start_datetime,
-SalesData.created_at <= end_datetime
-)
-)
-result = await db.execute(stmt)
-records_to_delete = result.scalars().all()
-if not records_to_delete:
-raise HTTPException(
-status_code=404,
-detail="No import batch found for the specified date and source"
-)
-# Delete records
-for record in records_to_delete:
-await db.delete(record)
-await db.commit()
-# Publish event
-await data_publisher.publish_data_imported({
-"tenant_id": tenant_id,
-"type": "import_deleted",
-"source": source,
-"import_date": import_date,
-"records_deleted": len(records_to_delete),
-"timestamp": datetime.utcnow().isoformat()
-})
-return {
-"success": True,
-"records_deleted": len(records_to_delete),
-"import_date": import_date,
-"source": source
-}
-except ValueError:
-raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DD")
-except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
-@router.get("/stats/{tenant_id}")
-async def get_sales_statistics(
-tenant_id: str,
-start_date: datetime = Query(None, description="Start date for statistics"),
-end_date: datetime = Query(None, description="End date for statistics"),
-db: AsyncSession = Depends(get_db),
-current_user: AuthInfo = Depends(get_current_user)
-):
-"""Get sales statistics for tenant"""
-try:
-# Default to last 30 days if no dates provided
-if not start_date:
-start_date = datetime.now() - timedelta(days=30)
-if not end_date:
-end_date = datetime.now()
-# Get sales data
-stmt = select(SalesData).where(
-and_(
-SalesData.tenant_id == tenant_id,
-SalesData.date >= start_date,
-SalesData.date <= end_date
-)
-)
-result = await db.execute(stmt)
-records = result.scalars().all()
-if not records:
-return {
-"total_records": 0,
-"total_revenue": 0.0,
-"total_quantity": 0,
-"top_products": [],
-"daily_sales": [],
-"data_sources": {}
-}
-# Calculate statistics
-total_revenue = sum(record.revenue or 0 for record in records)
-total_quantity = sum(record.quantity_sold for record in records)
-# Top products
-product_stats = {}
-for record in records:
-if record.product_name not in product_stats:
-product_stats[record.product_name] = {
-"quantity": 0,
-"revenue": 0.0,
-"occurrences": 0
-}
-product_stats[record.product_name]["quantity"] += record.quantity_sold
-product_stats[record.product_name]["revenue"] += record.revenue or 0
-product_stats[record.product_name]["occurrences"] += 1
-top_products = sorted(
-[{"product": k, **v} for k, v in product_stats.items()],
-key=lambda x: x["quantity"],
-reverse=True
-)[:10]
-# Daily sales
-daily_stats = {}
-for record in records:
-date_key = record.date.date().isoformat()
-if date_key not in daily_stats:
-daily_stats[date_key] = {"quantity": 0, "revenue": 0.0, "products": 0}
-daily_stats[date_key]["quantity"] += record.quantity_sold
-daily_stats[date_key]["revenue"] += record.revenue or 0
-daily_stats[date_key]["products"] += 1
-daily_sales = [{"date": k, **v} for k, v in sorted(daily_stats.items())]
-# Data sources
-source_stats = {}
-for record in records:
-source = record.source
-if source not in source_stats:
-source_stats[source] = 0
-source_stats[source] += 1
-return {
-"total_records": len(records),
-"total_revenue": round(total_revenue, 2),
-"total_quantity": total_quantity,
-"average_revenue_per_sale": round(total_revenue / len(records), 2) if records else 0,
-"date_range": {
-"start": start_date.isoformat(),
-"end": end_date.isoformat()
-},
-"top_products": top_products,
-"daily_sales": daily_sales,
-"data_sources": source_stats
-}
-except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
+logger.error("Failed to generate sales analytics", error=str(e))
+raise HTTPException(status_code=500, detail=f"Failed to generate analytics: {str(e)}")
@router.post("/export/{tenant_id}")
async def export_sales_data(
tenant_id: str,
export_format: str = Query("csv", description="Export format: csv, excel, json"),
-start_date: datetime = Query(None, description="Start date"),
-end_date: datetime = Query(None, description="End date"),
-products: List[str] = Query(None, description="Filter by products"),
+start_date: Optional[datetime] = Query(None, description="Start date"),
+end_date: Optional[datetime] = Query(None, description="End date"),
+products: Optional[List[str]] = Query(None, description="Filter by products"),
db: AsyncSession = Depends(get_db),
current_user: AuthInfo = Depends(get_current_user)
):
"""Export sales data in specified format"""
try:
-# Build query
-query_conditions = [SalesData.tenant_id == tenant_id]
+logger.debug("API: Exporting sales data", tenant_id=tenant_id, format=export_format)
-if start_date:
-query_conditions.append(SalesData.date >= start_date)
-if end_date:
-query_conditions.append(SalesData.date <= end_date)
-if products:
-query_conditions.append(SalesData.product_name.in_(products))
+export_result = await SalesService.export_sales_data(
+tenant_id, export_format, start_date, end_date, products, db
+)
-stmt = select(SalesData).where(and_(*query_conditions)).order_by(SalesData.date.desc())
-result = await db.execute(stmt)
-records = result.scalars().all()
-if not records:
+if not export_result:
raise HTTPException(status_code=404, detail="No data found for export")
-# Convert to export format
-export_data = []
-for record in records:
-export_data.append({
-"fecha": record.date.strftime("%d/%m/%Y"),
-"producto": record.product_name,
-"cantidad": record.quantity_sold,
-"ingresos": record.revenue or 0.0,
-"ubicacion": record.location_id or "",
-"fuente": record.source
-})
+logger.debug("Export completed successfully", tenant_id=tenant_id, format=export_format)
-if export_format.lower() == "csv":
-# Generate CSV
-output = io.StringIO()
-df = pd.DataFrame(export_data)
-df.to_csv(output, index=False)
-return Response(
-content=output.getvalue(),
-media_type="text/csv",
-headers={"Content-Disposition": "attachment; filename=ventas_export.csv"}
-)
-elif export_format.lower() == "excel":
-# Generate Excel
-output = io.BytesIO()
-df = pd.DataFrame(export_data)
-df.to_excel(output, index=False, sheet_name="Ventas")
-return Response(
-content=output.getvalue(),
-media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
-headers={"Content-Disposition": "attachment; filename=ventas_export.xlsx"}
-)
-elif export_format.lower() == "json":
-return {
-"data": export_data,
-"total_records": len(export_data),
-"export_date": datetime.now().isoformat()
-}
-else:
-raise HTTPException(
-status_code=400,
-detail="Formato no soportado. Use: csv, excel, json"
-)
+return StreamingResponse(
+iter([export_result["content"]]),
+media_type=export_result["media_type"],
+headers={"Content-Disposition": f"attachment; filename={export_result['filename']}"}
+)
except HTTPException:
raise
except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
+logger.error("Failed to export sales data", error=str(e))
+raise HTTPException(status_code=500, detail=f"Failed to export sales data: {str(e)}")
+@router.delete("/{record_id}")
+async def delete_sales_record(
+record_id: str,
+db: AsyncSession = Depends(get_db),
+current_user: AuthInfo = Depends(get_current_user)
+):
+"""Delete a sales record"""
+try:
+logger.debug("API: Deleting sales record", record_id=record_id)
+success = await SalesService.delete_sales_record(record_id, db)
+if not success:
+raise HTTPException(status_code=404, detail="Sales record not found")
+logger.debug("Sales record deleted successfully", record_id=record_id)
+return {"status": "success", "message": "Sales record deleted successfully"}
+except HTTPException:
+raise
+except Exception as e:
+logger.error("Failed to delete sales record", error=str(e))
+raise HTTPException(status_code=500, detail=f"Failed to delete sales record: {str(e)}")

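Every endpoint above repeats the same pattern: publish the event in its own try/except, log a warning on failure, and keep serving the request. If that pattern spreads further it could be factored into a single helper. A minimal sketch, assuming only async publisher callables like data_publisher.publish_sales_created above (safe_publish itself is a hypothetical helper, not part of this commit):

# Hypothetical helper: fire-and-forget event publishing.
import structlog
from typing import Any, Awaitable, Callable, Dict

logger = structlog.get_logger()

async def safe_publish(
    publish: Callable[[Dict[str, Any]], Awaitable[bool]],
    payload: Dict[str, Any],
) -> bool:
    """Publish an event, logging (never raising) on failure."""
    try:
        return await publish(payload)
    except Exception as exc:  # broker down, serialization error, etc.
        logger.warning("Event publish failed", event=payload.get("type"), error=str(exc))
        return False

Each endpoint body would then shrink to a single await safe_publish(data_publisher.publish_sales_created, {...}) call while keeping the same degrade-to-warning behavior.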

@@ -1,12 +1,13 @@
# ================================================================
-# services/data/app/api/traffic.py
+# services/data/app/api/traffic.py - FIXED VERSION
# ================================================================
-"""Traffic data API endpoints"""
+"""Traffic data API endpoints with improved error handling"""
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional
from datetime import datetime, timedelta
+import structlog
from app.core.database import get_db
from app.core.auth import get_current_user, AuthInfo
@@ -20,6 +21,7 @@ from app.schemas.external import (
router = APIRouter()
traffic_service = TrafficService()
+logger = structlog.get_logger()
@router.get("/current", response_model=TrafficDataResponse)
async def get_current_traffic(
@@ -29,21 +31,39 @@ async def get_current_traffic(
):
"""Get current traffic data for location"""
try:
logger.debug("API: Getting current traffic", lat=latitude, lon=longitude)
traffic = await traffic_service.get_current_traffic(latitude, longitude)
if not traffic:
logger.warning("No traffic data available", lat=latitude, lon=longitude)
raise HTTPException(status_code=404, detail="Traffic data not available")
# Publish event
await data_publisher.publish_traffic_updated({
"type": "current_requested",
"latitude": latitude,
"longitude": longitude,
"timestamp": datetime.utcnow().isoformat()
})
# Publish event (with error handling)
try:
await data_publisher.publish_traffic_updated({
"type": "current_requested",
"latitude": latitude,
"longitude": longitude,
"timestamp": datetime.utcnow().isoformat()
})
except Exception as pub_error:
logger.warning("Failed to publish traffic event", error=str(pub_error))
# Continue processing - event publishing failure shouldn't break the API
logger.debug("Successfully returning traffic data",
volume=traffic.traffic_volume,
congestion=traffic.congestion_level)
return traffic
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
+logger.error("Unexpected error in traffic API", error=str(e))
+import traceback
+logger.error("Traffic API traceback", traceback=traceback.format_exc())
+raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.get("/historical", response_model=List[TrafficDataResponse])
async def get_historical_traffic(
@@ -67,17 +87,66 @@ async def get_historical_traffic(
latitude, longitude, start_date, end_date, db
)
# Publish event
await data_publisher.publish_traffic_updated({
"type": "historical_requested",
"latitude": latitude,
"longitude": longitude,
"start_date": start_date.isoformat(),
"end_date": end_date.isoformat(),
"records_count": len(historical_data),
"timestamp": datetime.utcnow().isoformat()
})
# Publish event (with error handling)
try:
await data_publisher.publish_traffic_updated({
"type": "historical_requested",
"latitude": latitude,
"longitude": longitude,
"start_date": start_date.isoformat(),
"end_date": end_date.isoformat(),
"records_count": len(historical_data),
"timestamp": datetime.utcnow().isoformat()
})
except Exception as pub_error:
logger.warning("Failed to publish historical traffic event", error=str(pub_error))
# Continue processing
return historical_data
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
+logger.error("Unexpected error in historical traffic API", error=str(e))
+raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
+@router.post("/store")
+async def store_traffic_data(
+latitude: float = Query(..., description="Latitude"),
+longitude: float = Query(..., description="Longitude"),
+db: AsyncSession = Depends(get_db),
+current_user: AuthInfo = Depends(get_current_user)
+):
+"""Store current traffic data to database"""
+try:
+# Get current traffic data
+traffic = await traffic_service.get_current_traffic(latitude, longitude)
+if not traffic:
+raise HTTPException(status_code=404, detail="No traffic data to store")
+# Convert to dict for storage
+traffic_dict = {
+"date": traffic.date,
+"traffic_volume": traffic.traffic_volume,
+"pedestrian_count": traffic.pedestrian_count,
+"congestion_level": traffic.congestion_level,
+"average_speed": traffic.average_speed,
+"source": traffic.source
+}
+success = await traffic_service.store_traffic_data(
+latitude, longitude, traffic_dict, db
+)
+if success:
+return {"status": "success", "message": "Traffic data stored successfully"}
+else:
+raise HTTPException(status_code=500, detail="Failed to store traffic data")
+except HTTPException:
+raise
+except Exception as e:
+logger.error("Error storing traffic data", error=str(e))
+raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")

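The ordering of the except clauses above is load-bearing: HTTPException must be re-raised before the blanket Exception handler, otherwise the deliberate 404 is swallowed and rewrapped as a 500. A standalone sketch of the rule (the demo app and route are illustrative, not service code):

from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/demo/{item_id}")
async def demo(item_id: int):
    try:
        if item_id == 0:
            raise HTTPException(status_code=404, detail="Not found")
        return {"item_id": item_id}
    except HTTPException:
        raise  # preserve the intended status code
    except Exception as exc:
        # Only genuinely unexpected failures become 500s.
        raise HTTPException(status_code=500, detail=f"Internal server error: {exc}")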

@@ -1,12 +1,13 @@
# ================================================================
-# services/data/app/api/weather.py
+# services/data/app/api/weather.py - FIXED VERSION
# ================================================================
-"""Weather data API endpoints"""
+"""Weather data API endpoints with improved error handling"""
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional
from datetime import datetime, timedelta
+import structlog
from app.core.database import get_db
from app.core.auth import get_current_user, AuthInfo
@@ -21,6 +22,7 @@ from app.schemas.external import (
router = APIRouter()
weather_service = WeatherService()
+logger = structlog.get_logger()
@router.get("/current", response_model=WeatherDataResponse)
async def get_current_weather(
@@ -30,13 +32,25 @@ async def get_current_weather(
):
"""Get current weather for location"""
try:
logger.debug("API: Getting current weather", lat=latitude, lon=longitude)
weather = await weather_service.get_current_weather(latitude, longitude)
if not weather:
logger.warning("No weather data available", lat=latitude, lon=longitude)
raise HTTPException(status_code=404, detail="Weather data not available")
logger.debug("Successfully returning weather data", temp=weather.temperature)
return weather
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
logger.error("Unexpected error in weather API", error=str(e))
import traceback
logger.error("Weather API traceback", traceback=traceback.format_exc())
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.get("/forecast", response_model=List[WeatherForecastResponse])
async def get_weather_forecast(
@@ -47,20 +61,38 @@ async def get_weather_forecast(
):
"""Get weather forecast for location"""
try:
logger.debug("API: Getting weather forecast", lat=latitude, lon=longitude, days=days)
forecast = await weather_service.get_weather_forecast(latitude, longitude, days)
# Publish event
await data_publisher.publish_weather_updated({
"type": "forecast_requested",
"latitude": latitude,
"longitude": longitude,
"days": days,
"timestamp": datetime.utcnow().isoformat()
})
if not forecast:
logger.warning("No forecast data available", lat=latitude, lon=longitude)
raise HTTPException(status_code=404, detail="Weather forecast not available")
# Publish event (with error handling)
try:
await data_publisher.publish_weather_updated({
"type": "forecast_requested",
"latitude": latitude,
"longitude": longitude,
"days": days,
"timestamp": datetime.utcnow().isoformat()
})
except Exception as pub_error:
logger.warning("Failed to publish weather forecast event", error=str(pub_error))
# Continue processing - event publishing failure shouldn't break the API
logger.debug("Successfully returning forecast data", count=len(forecast))
return forecast
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
+logger.error("Unexpected error in weather forecast API", error=str(e))
+import traceback
+logger.error("Weather forecast API traceback", traceback=traceback.format_exc())
+raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@router.get("/historical", response_model=List[WeatherDataResponse])
async def get_historical_weather(
@@ -84,17 +116,68 @@ async def get_historical_weather(
latitude, longitude, start_date, end_date, db
)
-# Publish event
-await data_publisher.publish_weather_updated({
-"type": "historical_requested",
-"latitude": latitude,
-"longitude": longitude,
-"start_date": start_date.isoformat(),
-"end_date": end_date.isoformat(),
-"records_count": len(historical_data),
-"timestamp": datetime.utcnow().isoformat()
-})
+# Publish event (with error handling)
+try:
+await data_publisher.publish_weather_updated({
+"type": "historical_requested",
+"latitude": latitude,
+"longitude": longitude,
+"start_date": start_date.isoformat(),
+"end_date": end_date.isoformat(),
+"records_count": len(historical_data),
+"timestamp": datetime.utcnow().isoformat()
+})
+except Exception as pub_error:
+logger.warning("Failed to publish historical weather event", error=str(pub_error))
+# Continue processing
return historical_data
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
-raise HTTPException(status_code=500, detail=str(e))
+logger.error("Unexpected error in historical weather API", error=str(e))
+raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
+@router.post("/store")
+async def store_weather_data(
+latitude: float = Query(..., description="Latitude"),
+longitude: float = Query(..., description="Longitude"),
+db: AsyncSession = Depends(get_db),
+current_user: AuthInfo = Depends(get_current_user)
+):
+"""Store current weather data to database"""
+try:
+# Get current weather data
+weather = await weather_service.get_current_weather(latitude, longitude)
+if not weather:
+raise HTTPException(status_code=404, detail="No weather data to store")
+# Convert to dict for storage
+weather_dict = {
+"date": weather.date,
+"temperature": weather.temperature,
+"precipitation": weather.precipitation,
+"humidity": weather.humidity,
+"wind_speed": weather.wind_speed,
+"pressure": weather.pressure,
+"description": weather.description,
+"source": weather.source
+}
+success = await weather_service.store_weather_data(
+latitude, longitude, weather_dict, db
+)
+if success:
+return {"status": "success", "message": "Weather data stored successfully"}
+else:
+raise HTTPException(status_code=500, detail="Failed to store weather data")
+except HTTPException:
+raise
+except Exception as e:
+logger.error("Error storing weather data", error=str(e))
+raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")

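With the new 404 on an empty forecast, consumers can distinguish "no data" from a real failure. A minimal httpx sketch of a client doing so; the base URL and the /weather route prefix are assumptions, since the router mounting is not shown in this diff:

import httpx

async def fetch_forecast(lat: float, lon: float, days: int = 7) -> list:
    # Base URL is a placeholder for wherever the data service is deployed.
    async with httpx.AsyncClient(base_url="http://data-service:8000") as client:
        resp = await client.get(
            "/weather/forecast",
            params={"latitude": lat, "longitude": lon, "days": days},
        )
        if resp.status_code == 404:
            return []  # no forecast for this location; not an error
        resp.raise_for_status()  # 5xx still surfaces as an exception
        return resp.json()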

@@ -1,5 +1,5 @@
# ================================================================
-# services/data/app/models/sales.py
+# services/data/app/models/sales.py - MISSING FILE
# ================================================================
"""Sales data models"""
@@ -16,16 +16,17 @@ class SalesData(Base):
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True)
date = Column(DateTime, nullable=False, index=True)
-product_name = Column(String(200), nullable=False)
+product_name = Column(String(255), nullable=False, index=True)
quantity_sold = Column(Integer, nullable=False)
-revenue = Column(Float, nullable=True)
-location_id = Column(String(100), nullable=True)
-source = Column(String(50), nullable=False, default="manual") # manual, pos, csv
-raw_data = Column(Text, nullable=True) # Store original data for debugging
+revenue = Column(Float, nullable=False)
+location_id = Column(String(100), nullable=True, index=True)
+source = Column(String(50), nullable=False, default="manual")
+notes = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
+__table_args__ = (
+Index('idx_sales_tenant_date', 'tenant_id', 'date'),
+Index('idx_sales_tenant_product', 'tenant_id', 'product_name'),
+Index('idx_sales_tenant_location', 'tenant_id', 'location_id'),
+)

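The sales service below deliberately skips notes and updated_at "until database is migrated", so this model change implies a schema migration. A hand-written Alembic sketch of what it might look like; the table name sales_data, the revision ids, and the zero backfill for null revenue are all assumptions:

from alembic import op
import sqlalchemy as sa

revision = "xxxx_sales_model_fix"  # placeholder id
down_revision = None  # placeholder

def upgrade():
    op.alter_column("sales_data", "product_name", existing_type=sa.String(200), type_=sa.String(255))
    # revenue becomes NOT NULL: backfill nulls first (strategy is an assumption)
    op.execute("UPDATE sales_data SET revenue = 0 WHERE revenue IS NULL")
    op.alter_column("sales_data", "revenue", existing_type=sa.Float(), nullable=False)
    op.drop_column("sales_data", "raw_data")
    op.add_column("sales_data", sa.Column("notes", sa.Text(), nullable=True))
    op.create_index("idx_sales_tenant_date", "sales_data", ["tenant_id", "date"])
    op.create_index("idx_sales_tenant_product", "sales_data", ["tenant_id", "product_name"])
    op.create_index("idx_sales_tenant_location", "sales_data", ["tenant_id", "location_id"])

def downgrade():
    pass  # reverse operations omitted in this sketch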

@@ -1,46 +1,136 @@
# ================================================================
-# services/data/app/schemas/sales.py
+# services/data/app/schemas/sales.py - MISSING FILE
# ================================================================
"""Sales data schemas"""
-from pydantic import BaseModel, validator
+from pydantic import BaseModel, Field, validator
from datetime import datetime
-from typing import Optional, List
-import uuid
+from typing import Optional, List, Dict, Any
+from uuid import UUID
class SalesDataCreate(BaseModel):
-tenant_id: uuid.UUID
+"""Schema for creating sales data"""
+tenant_id: UUID
date: datetime
-product_name: str
-quantity_sold: int
-revenue: Optional[float] = None
-location_id: Optional[str] = None
-source: str = "manual"
-raw_data: Optional[str] = None
+product_name: str = Field(..., min_length=1, max_length=255)
+quantity_sold: int = Field(..., gt=0)
+revenue: float = Field(..., gt=0)
+location_id: Optional[str] = Field(None, max_length=100)
+source: str = Field(default="manual", max_length=50)
+notes: Optional[str] = Field(None, max_length=500)
+@validator('product_name')
+def normalize_product_name(cls, v):
+return v.strip().lower()
+class Config:
+from_attributes = True
class SalesDataResponse(BaseModel):
-id: uuid.UUID
-tenant_id: uuid.UUID
+"""Schema for sales data response"""
+id: UUID
+tenant_id: UUID
date: datetime
product_name: str
quantity_sold: int
-revenue: Optional[float]
+revenue: float
location_id: Optional[str]
source: str
+notes: Optional[str]
created_at: datetime
-updated_at: datetime
+updated_at: Optional[datetime]
class Config:
from_attributes = True
+class SalesDataQuery(BaseModel):
+"""Schema for querying sales data"""
+tenant_id: UUID
+start_date: Optional[datetime] = None
+end_date: Optional[datetime] = None
+product_names: Optional[List[str]] = None
+location_ids: Optional[List[str]] = None
+sources: Optional[List[str]] = None
+min_quantity: Optional[int] = None
+max_quantity: Optional[int] = None
+min_revenue: Optional[float] = None
+max_revenue: Optional[float] = None
+limit: Optional[int] = Field(default=1000, le=5000)
+offset: Optional[int] = Field(default=0, ge=0)
+class Config:
+from_attributes = True
class SalesDataImport(BaseModel):
-tenant_id: uuid.UUID
-data_format: str # csv, excel, pos
-data: str # Base64 encoded or JSON string
+"""Schema for importing sales data"""
+tenant_id: UUID
+data: str # JSON string or CSV content
+data_format: str = Field(..., pattern="^(csv|json|excel)$")
+source: str = Field(default="import", max_length=50)
+validate_only: bool = Field(default=False)
-class SalesDataQuery(BaseModel):
-tenant_id: uuid.UUID
-start_date: datetime
-end_date: datetime
+class Config:
+from_attributes = True
+class SalesDataBulkCreate(BaseModel):
+"""Schema for bulk creating sales data"""
+tenant_id: UUID
+records: List[Dict[str, Any]]
+source: str = Field(default="bulk_import", max_length=50)
+class Config:
+from_attributes = True
+class SalesValidationResult(BaseModel):
+"""Schema for sales data validation result"""
+is_valid: bool
+total_records: int
+valid_records: int
+invalid_records: int
+errors: List[Dict[str, Any]]
+warnings: List[Dict[str, Any]]
+summary: Dict[str, Any]
+class Config:
+from_attributes = True
+class SalesImportResult(BaseModel):
+"""Schema for sales import result"""
+success: bool
+records_processed: int
+records_created: int
+records_updated: int
+records_failed: int
+errors: List[Dict[str, Any]]
+warnings: List[Dict[str, Any]]
+processing_time_seconds: float
+class Config:
+from_attributes = True
+class SalesAggregation(BaseModel):
+"""Schema for sales aggregation results"""
+period: str # "daily", "weekly", "monthly"
+date: datetime
+product_name: Optional[str] = None
+location_id: Optional[str] = None
+total_quantity: int
+total_revenue: float
+average_quantity: float
+average_revenue: float
+record_count: int
+class Config:
+from_attributes = True
+class SalesExportRequest(BaseModel):
+"""Schema for sales export request"""
+tenant_id: UUID
+format: str = Field(..., pattern="^(csv|json|excel)$")
+start_date: Optional[datetime] = None
+end_date: Optional[datetime] = None
+product_names: Optional[List[str]] = None
+location_ids: Optional[List[str]] = None
+include_metadata: bool = Field(default=True)
+class Config:
+from_attributes = True

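The tightened SalesDataCreate now rejects non-positive quantities and revenue and normalizes product names. A quick sketch of the resulting behavior, assuming the schema is importable from app.schemas.sales as the API file shows:

from datetime import datetime
from uuid import uuid4
from pydantic import ValidationError
from app.schemas.sales import SalesDataCreate

ok = SalesDataCreate(
    tenant_id=uuid4(),
    date=datetime.utcnow(),
    product_name="  Croissant  ",
    quantity_sold=3,
    revenue=7.5,
)
assert ok.product_name == "croissant"  # validator strips and lower-cases

try:
    SalesDataCreate(
        tenant_id=uuid4(),
        date=datetime.utcnow(),
        product_name="Croissant",
        quantity_sold=0,  # gt=0 now rejects this
        revenue=7.5,
    )
except ValidationError as exc:
    print(exc.errors()[0]["loc"])  # ('quantity_sold',)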

@@ -1,6 +1,7 @@
# ================================================================
-# services/data/app/services/messaging.py
+# services/data/app/services/messaging.py - FIXED VERSION
# ================================================================
"""Fixed messaging service with proper error handling"""
from shared.messaging.rabbitmq import RabbitMQClient
from app.core.config import settings
@@ -13,30 +14,105 @@ data_publisher = RabbitMQClient(settings.RABBITMQ_URL, "data-service")
async def setup_messaging():
"""Initialize messaging for data service"""
-success = await data_publisher.connect()
-if success:
-logger.info("Data service messaging initialized")
-else:
-logger.warning("Data service messaging failed to initialize")
+try:
+success = await data_publisher.connect()
+if success:
+logger.info("Data service messaging initialized")
+else:
+logger.warning("Data service messaging failed to initialize")
+return success
+except Exception as e:
+logger.warning("Failed to setup messaging", error=str(e))
+return False
async def cleanup_messaging():
"""Cleanup messaging for data service"""
-await data_publisher.disconnect()
-logger.info("Data service messaging cleaned up")
+try:
+await data_publisher.disconnect()
+logger.info("Data service messaging cleaned up")
+except Exception as e:
+logger.warning("Error during messaging cleanup", error=str(e))
-# Convenience functions for data-specific events
+# Convenience functions for data-specific events with error handling
async def publish_data_imported(data: dict) -> bool:
"""Publish data imported event"""
return await data_publisher.publish_data_event("imported", data)
try:
return await data_publisher.publish_data_event("imported", data)
except Exception as e:
logger.warning("Failed to publish data imported event", error=str(e))
return False
async def publish_weather_updated(data: dict) -> bool:
"""Publish weather updated event"""
return await data_publisher.publish_data_event("weather.updated", data)
try:
return await data_publisher.publish_data_event("weather.updated", data)
except Exception as e:
logger.warning("Failed to publish weather updated event", error=str(e))
return False
async def publish_traffic_updated(data: dict) -> bool:
"""Publish traffic updated event"""
return await data_publisher.publish_data_event("traffic.updated", data)
try:
return await data_publisher.publish_data_event("traffic.updated", data)
except Exception as e:
logger.warning("Failed to publish traffic updated event", error=str(e))
return False
async def publish_sales_created(data: dict) -> bool:
"""Publish sales created event"""
return await data_publisher.publish_data_event("sales.created", data)
try:
return await data_publisher.publish_data_event("sales.created", data)
except Exception as e:
logger.warning("Failed to publish sales created event", error=str(e))
return False
+async def publish_analytics_generated(data: dict) -> bool:
+"""Publish analytics generated event"""
+try:
+return await data_publisher.publish_data_event("analytics.generated", data)
+except Exception as e:
+logger.warning("Failed to publish analytics generated event", error=str(e))
+return False
+async def publish_export_completed(data: dict) -> bool:
+"""Publish export completed event"""
+try:
+return await data_publisher.publish_data_event("export.completed", data)
+except Exception as e:
+logger.warning("Failed to publish export completed event", error=str(e))
+return False
+async def publish_import_started(data: dict) -> bool:
+"""Publish import started event"""
+try:
+return await data_publisher.publish_data_event("import.started", data)
+except Exception as e:
+logger.warning("Failed to publish import started event", error=str(e))
+return False
+async def publish_import_completed(data: dict) -> bool:
+"""Publish import completed event"""
+try:
+return await data_publisher.publish_data_event("import.completed", data)
+except Exception as e:
+logger.warning("Failed to publish import completed event", error=str(e))
+return False
+async def publish_import_failed(data: dict) -> bool:
+"""Publish import failed event"""
+try:
+return await data_publisher.publish_data_event("import.failed", data)
+except Exception as e:
+logger.warning("Failed to publish import failed event", error=str(e))
+return False
+# Health check for messaging
+async def check_messaging_health() -> dict:
+"""Check messaging system health"""
+try:
+if data_publisher.connected:
+return {"status": "healthy", "service": "rabbitmq", "connected": True}
+else:
+return {"status": "unhealthy", "service": "rabbitmq", "connected": False, "error": "Not connected"}
+except Exception as e:
+return {"status": "unhealthy", "service": "rabbitmq", "connected": False, "error": str(e)}

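check_messaging_health returns a plain dict, so it can be surfaced directly from a health endpoint. A sketch of that wiring; the route path and surrounding app object are assumptions, not part of this commit:

from fastapi import FastAPI
from app.services.messaging import check_messaging_health

app = FastAPI()

@app.get("/health/messaging")
async def messaging_health():
    health = await check_messaging_health()
    # Report a degraded broker without failing the probe: the data API is
    # designed to keep serving requests while RabbitMQ is down.
    return {"service": "data", "messaging": health}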

@@ -1,108 +1,228 @@
# ================================================================
-# services/data/app/services/sales_service.py
+# services/data/app/services/sales_service.py - SIMPLIFIED VERSION
# ================================================================
-"""Sales data service"""
+"""Sales service without notes column for now"""
import csv
import io
import json
from typing import List, Dict, Any, Optional
from datetime import datetime
-import pandas as pd
from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy import select, and_
+from sqlalchemy import select, and_, func, desc
import structlog
import uuid
from app.models.sales import SalesData
-from app.schemas.sales import SalesDataCreate, SalesDataResponse, SalesDataQuery
+from app.schemas.sales import (
+SalesDataCreate,
+SalesDataResponse,
+SalesDataQuery
+)
logger = structlog.get_logger()
class SalesService:
@staticmethod
-async def create_sales_record(data: SalesDataCreate, db: AsyncSession) -> SalesData:
+async def create_sales_record(sales_data: SalesDataCreate, db: AsyncSession) -> SalesDataResponse:
"""Create a new sales record"""
-sales_record = SalesData(**data.model_dump())
-db.add(sales_record)
-await db.commit()
-await db.refresh(sales_record)
-return sales_record
-@staticmethod
-async def get_sales_data(query: SalesDataQuery, db: AsyncSession) -> List[SalesData]:
-"""Get sales data by query parameters"""
-stmt = select(SalesData).where(
-and_(
-SalesData.tenant_id == query.tenant_id,
-SalesData.date >= query.start_date,
-SalesData.date <= query.end_date
-)
-)
-if query.product_name:
-stmt = stmt.where(SalesData.product_name.ilike(f"%{query.product_name}%"))
-if query.location_id:
-stmt = stmt.where(SalesData.location_id == query.location_id)
-result = await db.execute(stmt)
-return result.scalars().all()
-@staticmethod
-async def import_csv_data(tenant_id: str, csv_content: str, db: AsyncSession) -> Dict[str, Any]:
-"""Import sales data from CSV"""
-try:
-# Parse CSV
-csv_file = io.StringIO(csv_content)
-df = pd.read_csv(csv_file)
+# Create new sales record without notes and updated_at for now
+db_record = SalesData(
+id=uuid.uuid4(),
+tenant_id=sales_data.tenant_id,
+date=sales_data.date,
+product_name=sales_data.product_name,
+quantity_sold=sales_data.quantity_sold,
+revenue=sales_data.revenue,
+location_id=sales_data.location_id,
+source=sales_data.source,
+created_at=datetime.utcnow()
+# Skip notes and updated_at until database is migrated
+)
-# Validate and clean data
-records_created = 0
-errors = []
+db.add(db_record)
+await db.commit()
+await db.refresh(db_record)
-for index, row in df.iterrows():
-try:
-# Parse date (handle multiple formats)
-date_str = str(row.get('date', row.get('fecha', '')))
-date = pd.to_datetime(date_str, dayfirst=True)
-# Clean product name
-product_name = str(row.get('product', row.get('producto', ''))).strip()
-# Parse quantity
-quantity = int(row.get('quantity', row.get('cantidad', 0)))
-# Parse revenue (optional)
-revenue = None
-revenue_col = row.get('revenue', row.get('ingresos', None))
-if revenue_col and pd.notna(revenue_col):
-revenue = float(revenue_col)
-# Create sales record
-sales_data = SalesDataCreate(
-tenant_id=tenant_id,
-date=date,
-product_name=product_name,
-quantity_sold=quantity,
-revenue=revenue,
-source="csv",
-raw_data=json.dumps(row.to_dict())
-)
-await SalesService.create_sales_record(sales_data, db)
-records_created += 1
-except Exception as e:
-errors.append(f"Row {index + 1}: {str(e)}")
-continue
+logger.debug("Sales record created", record_id=db_record.id, product=db_record.product_name)
-return {
-"success": True,
-"records_created": records_created,
-"errors": errors,
-"total_records": len(data)
-}
+return SalesDataResponse(
+id=db_record.id,
+tenant_id=db_record.tenant_id,
+date=db_record.date,
+product_name=db_record.product_name,
+quantity_sold=db_record.quantity_sold,
+revenue=db_record.revenue,
+location_id=db_record.location_id,
+source=db_record.source,
+notes=None, # Always None for now
+created_at=db_record.created_at,
+updated_at=None # Always None for now
+)
except Exception as e:
-return {"success": False, "error": f"JSON processing failed: {str(e)}"}
+await db.rollback()
+logger.error("Failed to create sales record", error=str(e))
+raise
+@staticmethod
+async def get_sales_data(query: SalesDataQuery, db: AsyncSession) -> List[SalesDataResponse]:
+"""Get sales data based on query parameters"""
+try:
+# Build query conditions
+conditions = [SalesData.tenant_id == query.tenant_id]
+if query.start_date:
+conditions.append(SalesData.date >= query.start_date)
+if query.end_date:
+conditions.append(SalesData.date <= query.end_date)
+if query.product_names:
+conditions.append(SalesData.product_name.in_(query.product_names))
+if query.location_ids:
+conditions.append(SalesData.location_id.in_(query.location_ids))
+if query.sources:
+conditions.append(SalesData.source.in_(query.sources))
+if query.min_quantity:
+conditions.append(SalesData.quantity_sold >= query.min_quantity)
+if query.max_quantity:
+conditions.append(SalesData.quantity_sold <= query.max_quantity)
+if query.min_revenue:
+conditions.append(SalesData.revenue >= query.min_revenue)
+if query.max_revenue:
+conditions.append(SalesData.revenue <= query.max_revenue)
+# Execute query
+stmt = select(SalesData).where(and_(*conditions)).order_by(desc(SalesData.date))
+if query.limit:
+stmt = stmt.limit(query.limit)
+if query.offset:
+stmt = stmt.offset(query.offset)
+result = await db.execute(stmt)
+records = result.scalars().all()
+logger.debug("Sales data retrieved", count=len(records), tenant_id=query.tenant_id)
+return [SalesDataResponse(
+id=record.id,
+tenant_id=record.tenant_id,
+date=record.date,
+product_name=record.product_name,
+quantity_sold=record.quantity_sold,
+revenue=record.revenue,
+location_id=record.location_id,
+source=record.source,
+notes=None, # Always None for now
+created_at=record.created_at,
+updated_at=None # Always None for now
+) for record in records]
+except Exception as e:
+logger.error("Failed to retrieve sales data", error=str(e))
+raise
+@staticmethod
+async def get_sales_analytics(tenant_id: str, start_date: Optional[datetime],
+end_date: Optional[datetime], db: AsyncSession) -> Dict[str, Any]:
+"""Get basic sales analytics"""
+try:
+conditions = [SalesData.tenant_id == tenant_id]
+if start_date:
+conditions.append(SalesData.date >= start_date)
+if end_date:
+conditions.append(SalesData.date <= end_date)
+# Total sales
+total_stmt = select(
+func.sum(SalesData.quantity_sold).label('total_quantity'),
+func.sum(SalesData.revenue).label('total_revenue'),
+func.count(SalesData.id).label('total_records')
+).where(and_(*conditions))
+total_result = await db.execute(total_stmt)
+totals = total_result.first()
+analytics = {
+"total_quantity": int(totals.total_quantity or 0),
+"total_revenue": float(totals.total_revenue or 0.0),
+"total_records": int(totals.total_records or 0),
+"average_order_value": float(totals.total_revenue or 0.0) / max(totals.total_records or 1, 1),
+"date_range": {
+"start": start_date.isoformat() if start_date else None,
+"end": end_date.isoformat() if end_date else None
+}
+}
+logger.debug("Sales analytics generated", tenant_id=tenant_id, total_records=analytics["total_records"])
+return analytics
+except Exception as e:
+logger.error("Failed to generate sales analytics", error=str(e))
+raise
+@staticmethod
+async def export_sales_data(tenant_id: str, export_format: str, start_date: Optional[datetime],
+end_date: Optional[datetime], products: Optional[List[str]],
+db: AsyncSession) -> Optional[Dict[str, Any]]:
+"""Export sales data in specified format"""
+try:
+# Build query conditions
+conditions = [SalesData.tenant_id == tenant_id]
+if start_date:
+conditions.append(SalesData.date >= start_date)
+if end_date:
+conditions.append(SalesData.date <= end_date)
+if products:
+conditions.append(SalesData.product_name.in_(products))
+stmt = select(SalesData).where(and_(*conditions)).order_by(desc(SalesData.date))
+result = await db.execute(stmt)
+records = result.scalars().all()
+if not records:
+return None
+# Simple CSV export
+if export_format.lower() == "csv":
+import io
+output = io.StringIO()
+output.write("date,product_name,quantity_sold,revenue,location_id,source\n")
+for record in records:
+output.write(f"{record.date},{record.product_name},{record.quantity_sold},{record.revenue},{record.location_id or ''},{record.source}\n")
+return {
+"content": output.getvalue(),
+"media_type": "text/csv",
+"filename": f"sales_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
+}
+return None
+except Exception as e:
+logger.error("Failed to export sales data", error=str(e))
+raise
+@staticmethod
+async def delete_sales_record(record_id: str, db: AsyncSession) -> bool:
+"""Delete a sales record"""
+try:
+stmt = select(SalesData).where(SalesData.id == record_id)
+result = await db.execute(stmt)
+record = result.scalar_one_or_none()
+if not record:
+return False
+await db.delete(record)
+await db.commit()
+logger.debug("Sales record deleted", record_id=record_id)
+return True
+except Exception as e:
+await db.rollback()
+logger.error("Failed to delete sales record", error=str(e))
+raise

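One caveat in the simplified CSV export: it concatenates fields by hand, so a product name containing a comma, quote, or newline would corrupt the output. The stdlib csv module handles the quoting; a sketch of the same loop rewritten with it (records_to_csv is a hypothetical helper name):

import csv
import io

def records_to_csv(records) -> str:
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["date", "product_name", "quantity_sold", "revenue", "location_id", "source"])
    for record in records:
        writer.writerow([
            record.date,
            record.product_name,  # quoted automatically if it contains ','
            record.quantity_sold,
            record.revenue,
            record.location_id or "",
            record.source,
        ])
    return output.getvalue()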

@@ -30,22 +30,29 @@ class TrafficService:
logger.debug("Traffic data received", source=traffic_data.get('source'))
-# Validate and clean traffic data before creating response
-validated_data = {
-"date": traffic_data.get("date", datetime.now()),
-"traffic_volume": int(traffic_data.get("traffic_volume", 100)),
-"pedestrian_count": int(traffic_data.get("pedestrian_count", 150)),
-"congestion_level": str(traffic_data.get("congestion_level", "medium")),
-"average_speed": int(traffic_data.get("average_speed", 25)),
-"source": str(traffic_data.get("source", "unknown"))
-}
+# Use keyword arguments instead of unpacking
+response = TrafficDataResponse(
+date=traffic_data.get("date", datetime.now()),
+traffic_volume=int(traffic_data.get("traffic_volume", 100)),
+pedestrian_count=int(traffic_data.get("pedestrian_count", 150)),
+congestion_level=str(traffic_data.get("congestion_level", "medium")),
+average_speed=float(traffic_data.get("average_speed", 25.0)), # Fixed: use float, not int
+source=str(traffic_data.get("source", "unknown"))
+)
-return TrafficDataResponse(**validated_data)
+logger.debug("Successfully created traffic response",
+traffic_volume=response.traffic_volume,
+congestion_level=response.congestion_level)
+return response
else:
logger.warning("No traffic data received from Madrid client")
return None
except Exception as e:
logger.error("Failed to get current traffic", error=str(e), lat=latitude, lon=longitude)
+# Log the full traceback for debugging
+import traceback
+logger.error("Traffic service traceback", traceback=traceback.format_exc())
return None
async def get_historical_traffic(self,
@@ -83,41 +90,41 @@ class TrafficService:
average_speed=record.average_speed,
source=record.source
) for record in db_records]
-# Fetch from API if not in database
-logger.debug("Fetching historical traffic data from Madrid API")
-traffic_data = await self.madrid_client.get_historical_traffic(
-latitude, longitude, start_date, end_date
-)
-if traffic_data:
-# Store in database for future use
-try:
-for data in traffic_data:
-if isinstance(data, dict):
-traffic_record = TrafficData(
-location_id=location_id,
-date=data.get('date', datetime.now()),
-traffic_volume=data.get('traffic_volume'),
-pedestrian_count=data.get('pedestrian_count'),
-congestion_level=data.get('congestion_level'),
-average_speed=data.get('average_speed'),
-source="madrid_opendata",
-raw_data=str(data)
-)
-db.add(traffic_record)
-await db.commit()
-logger.debug("Historical traffic data stored in database", count=len(traffic_data))
-except Exception as db_error:
-logger.warning("Failed to store historical traffic data", error=str(db_error))
-await db.rollback()
-return [TrafficDataResponse(**item) for item in traffic_data if isinstance(item, dict)]
-else:
-logger.warning("No historical traffic data received")
+logger.debug("No historical traffic data found in database")
return []
except Exception as e:
logger.error("Failed to get historical traffic", error=str(e))
return []
+async def store_traffic_data(self,
+latitude: float,
+longitude: float,
+traffic_data: Dict[str, Any],
+db: AsyncSession) -> bool:
+"""Store traffic data to database"""
+try:
+location_id = f"{latitude:.4f},{longitude:.4f}"
+traffic_record = TrafficData(
+location_id=location_id,
+date=traffic_data.get("date", datetime.now()),
+traffic_volume=traffic_data.get("traffic_volume"),
+pedestrian_count=traffic_data.get("pedestrian_count"),
+congestion_level=traffic_data.get("congestion_level"),
+average_speed=traffic_data.get("average_speed"),
+source=traffic_data.get("source", "madrid_opendata"),
+raw_data=str(traffic_data) if traffic_data else None
+)
+db.add(traffic_record)
+await db.commit()
+logger.debug("Traffic data stored successfully", location_id=location_id)
+return True
+except Exception as e:
+logger.error("Failed to store traffic data", error=str(e))
+await db.rollback()
+return False
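The keyword-argument construction with explicit casts is what catches upstream type drift early (note the int-to-float fix for average_speed). A small sketch of why it matters, assuming TrafficDataResponse is importable from app.schemas.external as the API file does:

from datetime import datetime
from app.schemas.external import TrafficDataResponse

raw = {"traffic_volume": "120", "average_speed": 25}  # upstream type drift
response = TrafficDataResponse(
    date=raw.get("date", datetime.now()),
    traffic_volume=int(raw.get("traffic_volume", 100)),      # "120" -> 120
    pedestrian_count=int(raw.get("pedestrian_count", 150)),
    congestion_level=str(raw.get("congestion_level", "medium")),
    average_speed=float(raw.get("average_speed", 25.0)),     # 25 -> 25.0
    source=str(raw.get("source", "unknown")),
)
assert isinstance(response.average_speed, float)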