From 592a81076233c2223c7129d60f23e87fae3680f0 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Fri, 18 Jul 2025 11:51:43 +0200 Subject: [PATCH] Fix issues --- docker-compose.yml | 2 +- gateway/app/routes/data.py | 111 +-- services/auth/app/services/auth_service.py | 1 - services/data/Dockerfile | 2 +- services/data/app/api/sales.py | 529 ++++++++++++++ services/data/app/api/traffic.py | 83 +++ services/data/app/api/weather.py | 100 +++ services/data/app/core/auth.py | 36 + services/data/app/core/config.py | 41 +- services/data/app/core/database.py | 50 +- services/data/app/external/__init__.py | 0 services/data/app/external/aemet.py | 254 +++++++ services/data/app/external/base_client.py | 67 ++ services/data/app/external/madrid_opendata.py | 235 +++++++ services/data/app/main.py | 75 +- services/data/app/models/sales.py | 31 + services/data/app/models/traffic.py | 29 + services/data/app/models/weather.py | 50 ++ services/data/app/schemas/external.py | 45 ++ services/data/app/schemas/sales.py | 46 ++ .../data/app/services/data_import_service.py | 648 ++++++++++++++++++ services/data/app/services/messaging.py | 168 +++++ services/data/app/services/sales_service.py | 108 +++ services/data/app/services/traffic_service.py | 90 +++ services/data/app/services/weather_service.py | 103 +++ services/data/migrations/alembic.ini | 117 ++++ services/data/migrations/env.py | 68 ++ services/data/migrations/script.py.mako | 29 + services/data/requirements.txt | 45 +- services/data/tests/__init__.py | 0 services/data/tests/conftest.py | 82 +++ services/data/tests/test_data.py | 94 +++ services/data/tests/test_external.py | 87 +++ shared/monitoring/metrics.py | 117 +++- test_data.py | 385 +++++++++++ 35 files changed, 3806 insertions(+), 122 deletions(-) create mode 100644 services/data/app/api/sales.py create mode 100644 services/data/app/api/traffic.py create mode 100644 services/data/app/api/weather.py create mode 100644 services/data/app/core/auth.py create mode 100644 services/data/app/external/__init__.py create mode 100644 services/data/app/external/aemet.py create mode 100644 services/data/app/external/base_client.py create mode 100644 services/data/app/external/madrid_opendata.py create mode 100644 services/data/app/models/sales.py create mode 100644 services/data/app/models/traffic.py create mode 100644 services/data/app/models/weather.py create mode 100644 services/data/app/schemas/external.py create mode 100644 services/data/app/schemas/sales.py create mode 100644 services/data/app/services/data_import_service.py create mode 100644 services/data/app/services/messaging.py create mode 100644 services/data/app/services/sales_service.py create mode 100644 services/data/app/services/traffic_service.py create mode 100644 services/data/app/services/weather_service.py create mode 100644 services/data/migrations/alembic.ini create mode 100644 services/data/migrations/env.py create mode 100644 services/data/migrations/script.py.mako create mode 100644 services/data/tests/__init__.py create mode 100644 services/data/tests/conftest.py create mode 100644 services/data/tests/test_data.py create mode 100644 services/data/tests/test_external.py create mode 100644 test_data.py diff --git a/docker-compose.yml b/docker-compose.yml index 88657da3..ac4c3a18 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -279,7 +279,7 @@ services: - REDIS_URL=redis://redis:6379/3 - RABBITMQ_URL=amqp://bakery:forecast123@rabbitmq:5672/ - AUTH_SERVICE_URL=http://auth-service:8000 - - AEMET_API_KEY=your-aemet-api-key-here 
+ - AEMET_API_KEY=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ1YWxmYXJvQGdtYWlsLmNvbSIsImp0aSI6ImRjZWVmNTEwLTdmYzEtNGMxNy1hODZkLWQ4NzdlZDc5ZDllNyIsImlzcyI6IkFFTUVUIiwiaWF0IjoxNzUyODMwMDg3LCJ1c2VySWQiOiJkY2VlZjUxMC03ZmMxLTRjMTctYTg2ZC1kODc3ZWQ3OWQ5ZTciLCJyb2xlIjoiIn0.C047gaiEhWhH4ItDgkHSwg8HzKTzw87TOPRTRf8j-2w - MADRID_OPENDATA_API_KEY=your-madrid-opendata-key-here - SERVICE_NAME=data-service - SERVICE_VERSION=1.0.0 diff --git a/gateway/app/routes/data.py b/gateway/app/routes/data.py index c0df3cc1..c23ea161 100644 --- a/gateway/app/routes/data.py +++ b/gateway/app/routes/data.py @@ -1,66 +1,71 @@ -""" -Data routes for gateway -""" +# ================================================================ +# gateway/app/routes/data.py +# ================================================================ +"""Data service routes for API Gateway""" -from fastapi import APIRouter, Request, HTTPException -from fastapi.responses import JSONResponse +from fastapi import APIRouter, Request, Depends, HTTPException +from fastapi.responses import StreamingResponse import httpx -import logging +import structlog from app.core.config import settings +from app.core.auth import verify_token -logger = logging.getLogger(__name__) +logger = structlog.get_logger() router = APIRouter() -@router.post("/upload") -async def upload_sales_data(request: Request): - """Proxy data upload to data service""" +@router.api_route("/sales/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) +async def proxy_sales(request: Request, path: str, current_user: dict = Depends(verify_token)): + """Proxy sales data requests to data service""" + return await _proxy_request(request, f"/api/v1/sales/{path}") + +@router.api_route("/weather/{path:path}", methods=["GET", "POST"]) +async def proxy_weather(request: Request, path: str, current_user: dict = Depends(verify_token)): + """Proxy weather requests to data service""" + return await _proxy_request(request, f"/api/v1/weather/{path}") + +@router.api_route("/traffic/{path:path}", methods=["GET", "POST"]) +async def proxy_traffic(request: Request, path: str, current_user: dict = Depends(verify_token)): + """Proxy traffic requests to data service""" + return await _proxy_request(request, f"/api/v1/traffic/{path}") + +async def _proxy_request(request: Request, target_path: str): + """Proxy request to data service""" try: - body = await request.body() - auth_header = request.headers.get("Authorization") + url = f"{settings.DATA_SERVICE_URL}{target_path}" + + # Forward headers (including auth) + headers = dict(request.headers) + headers.pop("host", None) # Remove host header + + # Get request body if present + body = None + if request.method in ["POST", "PUT", "PATCH"]: + body = await request.body() async with httpx.AsyncClient(timeout=30.0) as client: - response = await client.post( - f"{settings.DATA_SERVICE_URL}/upload", - content=body, - headers={ - "Content-Type": "application/json", - "Authorization": auth_header - } - ) - - return JSONResponse( - status_code=response.status_code, - content=response.json() + response = await client.request( + method=request.method, + url=url, + params=request.query_params, + headers=headers, + content=body ) + # Return streaming response for large payloads + if int(response.headers.get("content-length", 0)) > 1024: + return StreamingResponse( + iter([response.content]), + status_code=response.status_code, + headers=dict(response.headers), + media_type=response.headers.get("content-type") + ) + else: + return response.json() if response.headers.get("content-type", 
"").startswith("application/json") else response.content + except httpx.RequestError as e: - logger.error(f"Data service unavailable: {e}") - raise HTTPException( - status_code=503, - detail="Data service unavailable" - ) - -@router.get("/sales") -async def get_sales_data(request: Request): - """Get sales data""" - try: - auth_header = request.headers.get("Authorization") - - async with httpx.AsyncClient(timeout=10.0) as client: - response = await client.get( - f"{settings.DATA_SERVICE_URL}/sales", - headers={"Authorization": auth_header} - ) - - return JSONResponse( - status_code=response.status_code, - content=response.json() - ) - - except httpx.RequestError as e: - logger.error(f"Data service unavailable: {e}") - raise HTTPException( - status_code=503, - detail="Data service unavailable" - ) + logger.error("Data service request failed", error=str(e)) + raise HTTPException(status_code=503, detail="Data service unavailable") + except Exception as e: + logger.error("Unexpected error in data proxy", error=str(e)) + raise HTTPException(status_code=500, detail="Internal server error") \ No newline at end of file diff --git a/services/auth/app/services/auth_service.py b/services/auth/app/services/auth_service.py index 28b34493..c597d2ab 100644 --- a/services/auth/app/services/auth_service.py +++ b/services/auth/app/services/auth_service.py @@ -62,7 +62,6 @@ class AuthService: "user_events", "user.registered", UserRegisteredEvent( - event_id=str(user.id), service_name="auth-service", timestamp=datetime.now(timezone.utc), data={ diff --git a/services/data/Dockerfile b/services/data/Dockerfile index 6f6cab04..2f7bf19c 100644 --- a/services/data/Dockerfile +++ b/services/data/Dockerfile @@ -37,4 +37,4 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD curl -f http://localhost:8000/health || exit 1 # Run application -CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/services/data/app/api/sales.py b/services/data/app/api/sales.py new file mode 100644 index 00000000..c48170cc --- /dev/null +++ b/services/data/app/api/sales.py @@ -0,0 +1,529 @@ +# ================================================================ +# services/data/app/api/sales.py +# ================================================================ +"""Sales data API endpoints""" + +from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query +from sqlalchemy.ext.asyncio import AsyncSession +from typing import List +import uuid +from datetime import datetime + +from app.core.database import get_db +from app.core.auth import verify_token +from app.services.sales_service import SalesService +from app.services.data_import_service import DataImportService +from app.services.messaging import data_publisher +from app.schemas.sales import ( + SalesDataCreate, + SalesDataResponse, + SalesDataQuery, + SalesDataImport +) + +router = APIRouter() + +@router.post("/", response_model=SalesDataResponse) +async def create_sales_record( + sales_data: SalesDataCreate, + db: AsyncSession = Depends(get_db), + current_user: dict = Depends(verify_token) +): + """Create a new sales record""" + try: + record = await SalesService.create_sales_record(sales_data, db) + + # Publish event + await data_publisher.publish_data_imported({ + "tenant_id": str(sales_data.tenant_id), + "type": "sales_record", + "source": sales_data.source, + "timestamp": datetime.utcnow().isoformat() + }) + 
+ return record + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/query", response_model=List[SalesDataResponse]) +async def get_sales_data( + query: SalesDataQuery, + db: AsyncSession = Depends(get_db), + current_user: dict = Depends(verify_token) +): + """Get sales data by query parameters""" + try: + records = await SalesService.get_sales_data(query, db) + return records + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/import") +async def import_sales_data( + tenant_id: str = Form(...), + file_format: str = Form(...), + file: UploadFile = File(...), + db: AsyncSession = Depends(get_db), + current_user: dict = Depends(verify_token) +): + """Import sales data from file""" + try: + # Read file content + content = await file.read() + file_content = content.decode('utf-8') + + # Process import + result = await DataImportService.process_upload( + tenant_id, file_content, file_format, db + ) + + if result["success"]: + # Publish event + await data_publisher.publish_data_imported({ + "tenant_id": tenant_id, + "type": "bulk_import", + "format": file_format, + "records_created": result["records_created"], + "timestamp": datetime.utcnow().isoformat() + }) + + return result + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/import/json") +async def import_sales_json( + import_data: SalesDataImport, + db: AsyncSession = Depends(get_db), + current_user: dict = Depends(verify_token) +): + """Import sales data from JSON""" + try: + result = await DataImportService.process_upload( + str(import_data.tenant_id), + import_data.data, + import_data.data_format, + db + ) + + if result["success"]: + await data_publisher.publish_data_imported({ + "tenant_id": str(import_data.tenant_id), + "type": "json_import", + "records_created": result["records_created"], + "timestamp": datetime.utcnow().isoformat() + }) + + return result + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/import/validate") +async def validate_import_data( + import_data: SalesDataImport, + current_user: dict = Depends(verify_token) +): + """Validate import data before processing""" + try: + validation = await DataImportService.validate_import_data( + import_data.model_dump() + ) + return validation + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/import/template/{format_type}") +async def get_import_template( + format_type: str, + current_user: dict = Depends(verify_token) +): + """Get import template for specified format""" + try: + template = await DataImportService.get_import_template(format_type) + + if "error" in template: + raise HTTPException(status_code=400, detail=template["error"]) + + if format_type.lower() == "csv": + return Response( + content=template["template"], + media_type="text/csv", + headers={"Content-Disposition": f"attachment; filename={template['filename']}"} + ) + elif format_type.lower() == "json": + return Response( + content=template["template"], + media_type="application/json", + headers={"Content-Disposition": f"attachment; filename={template['filename']}"} + ) + elif format_type.lower() in ["excel", "xlsx"]: + return Response( + content=base64.b64decode(template["template"]), + media_type=template["content_type"], + headers={"Content-Disposition": f"attachment; filename={template['filename']}"} + ) + else: + return template + + except Exception as e: + raise 
HTTPException(status_code=500, detail=str(e)) + +@router.post("/import/advanced") +async def import_sales_data_advanced( + tenant_id: str = Form(...), + file_format: str = Form(...), + file: UploadFile = File(...), + validate_only: bool = Form(False), + db: AsyncSession = Depends(get_db), + current_user: dict = Depends(verify_token) +): + """Advanced import with validation and preview options""" + try: + # Read file content + content = await file.read() + + # Determine encoding + try: + file_content = content.decode('utf-8') + except UnicodeDecodeError: + try: + file_content = content.decode('latin-1') + except UnicodeDecodeError: + file_content = content.decode('cp1252') + + # Validate first if requested + if validate_only: + validation = await DataImportService.validate_import_data({ + "tenant_id": tenant_id, + "data": file_content, + "data_format": file_format + }) + + # Add file preview for validation + if validation["valid"]: + # Get first few lines for preview + lines = file_content.split('\n')[:5] + validation["preview"] = lines + validation["total_lines"] = len(file_content.split('\n')) + + return validation + + # Process import + result = await DataImportService.process_upload( + tenant_id, file_content, file_format, db, file.filename + ) + + if result["success"]: + # Publish event + await data_publisher.publish_data_imported({ + "tenant_id": tenant_id, + "type": "advanced_import", + "format": file_format, + "filename": file.filename, + "records_created": result["records_created"], + "success_rate": result.get("success_rate", 0), + "timestamp": datetime.utcnow().isoformat() + }) + + return result + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/import/history/{tenant_id}") +async def get_import_history( + tenant_id: str, + limit: int = Query(10, ge=1, le=100), + offset: int = Query(0, ge=0), + db: AsyncSession = Depends(get_db), + current_user: dict = Depends(verify_token) +): + """Get import history for tenant""" + try: + # Get recent imports by source and creation date + stmt = select(SalesData).where( + and_( + SalesData.tenant_id == tenant_id, + SalesData.source.in_(['csv', 'excel', 'json', 'pos', 'migrated']) + ) + ).order_by(SalesData.created_at.desc()).offset(offset).limit(limit) + + result = await db.execute(stmt) + records = result.scalars().all() + + # Group by import session (same created_at) + import_sessions = {} + for record in records: + session_key = f"{record.source}_{record.created_at.date()}" + if session_key not in import_sessions: + import_sessions[session_key] = { + "date": record.created_at, + "source": record.source, + "records": [], + "total_records": 0, + "total_revenue": 0.0 + } + + import_sessions[session_key]["records"].append({ + "id": str(record.id), + "product_name": record.product_name, + "quantity_sold": record.quantity_sold, + "revenue": record.revenue or 0.0 + }) + import_sessions[session_key]["total_records"] += 1 + import_sessions[session_key]["total_revenue"] += record.revenue or 0.0 + + return { + "import_sessions": list(import_sessions.values()), + "total_sessions": len(import_sessions), + "offset": offset, + "limit": limit + } + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.delete("/import/{tenant_id}/{import_date}") +async def delete_import_batch( + tenant_id: str, + import_date: str, # Format: YYYY-MM-DD + source: str = Query(..., description="Import source (csv, excel, json, pos)"), + db: AsyncSession = Depends(get_db), + current_user: dict = 
Depends(verify_token) +): + """Delete an entire import batch""" + try: + # Parse date + import_date_obj = datetime.strptime(import_date, "%Y-%m-%d").date() + start_datetime = datetime.combine(import_date_obj, datetime.min.time()) + end_datetime = datetime.combine(import_date_obj, datetime.max.time()) + + # Find records to delete + stmt = select(SalesData).where( + and_( + SalesData.tenant_id == tenant_id, + SalesData.source == source, + SalesData.created_at >= start_datetime, + SalesData.created_at <= end_datetime + ) + ) + + result = await db.execute(stmt) + records_to_delete = result.scalars().all() + + if not records_to_delete: + raise HTTPException( + status_code=404, + detail="No import batch found for the specified date and source" + ) + + # Delete records + for record in records_to_delete: + await db.delete(record) + + await db.commit() + + # Publish event + await data_publisher.publish_data_imported({ + "tenant_id": tenant_id, + "type": "import_deleted", + "source": source, + "import_date": import_date, + "records_deleted": len(records_to_delete), + "timestamp": datetime.utcnow().isoformat() + }) + + return { + "success": True, + "records_deleted": len(records_to_delete), + "import_date": import_date, + "source": source + } + + except ValueError: + raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DD") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/stats/{tenant_id}") +async def get_sales_statistics( + tenant_id: str, + start_date: datetime = Query(None, description="Start date for statistics"), + end_date: datetime = Query(None, description="End date for statistics"), + db: AsyncSession = Depends(get_db), + current_user: dict = Depends(verify_token) +): + """Get sales statistics for tenant""" + try: + # Default to last 30 days if no dates provided + if not start_date: + start_date = datetime.now() - timedelta(days=30) + if not end_date: + end_date = datetime.now() + + # Get sales data + stmt = select(SalesData).where( + and_( + SalesData.tenant_id == tenant_id, + SalesData.date >= start_date, + SalesData.date <= end_date + ) + ) + + result = await db.execute(stmt) + records = result.scalars().all() + + if not records: + return { + "total_records": 0, + "total_revenue": 0.0, + "total_quantity": 0, + "top_products": [], + "daily_sales": [], + "data_sources": {} + } + + # Calculate statistics + total_revenue = sum(record.revenue or 0 for record in records) + total_quantity = sum(record.quantity_sold for record in records) + + # Top products + product_stats = {} + for record in records: + if record.product_name not in product_stats: + product_stats[record.product_name] = { + "quantity": 0, + "revenue": 0.0, + "occurrences": 0 + } + product_stats[record.product_name]["quantity"] += record.quantity_sold + product_stats[record.product_name]["revenue"] += record.revenue or 0 + product_stats[record.product_name]["occurrences"] += 1 + + top_products = sorted( + [{"product": k, **v} for k, v in product_stats.items()], + key=lambda x: x["quantity"], + reverse=True + )[:10] + + # Daily sales + daily_stats = {} + for record in records: + date_key = record.date.date().isoformat() + if date_key not in daily_stats: + daily_stats[date_key] = {"quantity": 0, "revenue": 0.0, "products": 0} + daily_stats[date_key]["quantity"] += record.quantity_sold + daily_stats[date_key]["revenue"] += record.revenue or 0 + daily_stats[date_key]["products"] += 1 + + daily_sales = [{"date": k, **v} for k, v in sorted(daily_stats.items())] + + 
# Data sources + source_stats = {} + for record in records: + source = record.source + if source not in source_stats: + source_stats[source] = 0 + source_stats[source] += 1 + + return { + "total_records": len(records), + "total_revenue": round(total_revenue, 2), + "total_quantity": total_quantity, + "average_revenue_per_sale": round(total_revenue / len(records), 2) if records else 0, + "date_range": { + "start": start_date.isoformat(), + "end": end_date.isoformat() + }, + "top_products": top_products, + "daily_sales": daily_sales, + "data_sources": source_stats + } + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/export/{tenant_id}") +async def export_sales_data( + tenant_id: str, + export_format: str = Query("csv", description="Export format: csv, excel, json"), + start_date: datetime = Query(None, description="Start date"), + end_date: datetime = Query(None, description="End date"), + products: List[str] = Query(None, description="Filter by products"), + db: AsyncSession = Depends(get_db), + current_user: dict = Depends(verify_token) +): + """Export sales data in specified format""" + try: + # Build query + query_conditions = [SalesData.tenant_id == tenant_id] + + if start_date: + query_conditions.append(SalesData.date >= start_date) + if end_date: + query_conditions.append(SalesData.date <= end_date) + if products: + query_conditions.append(SalesData.product_name.in_(products)) + + stmt = select(SalesData).where(and_(*query_conditions)).order_by(SalesData.date.desc()) + + result = await db.execute(stmt) + records = result.scalars().all() + + if not records: + raise HTTPException(status_code=404, detail="No data found for export") + + # Convert to export format + export_data = [] + for record in records: + export_data.append({ + "fecha": record.date.strftime("%d/%m/%Y"), + "producto": record.product_name, + "cantidad": record.quantity_sold, + "ingresos": record.revenue or 0.0, + "ubicacion": record.location_id or "", + "fuente": record.source + }) + + if export_format.lower() == "csv": + # Generate CSV + output = io.StringIO() + df = pd.DataFrame(export_data) + df.to_csv(output, index=False) + + return Response( + content=output.getvalue(), + media_type="text/csv", + headers={"Content-Disposition": "attachment; filename=ventas_export.csv"} + ) + + elif export_format.lower() == "excel": + # Generate Excel + output = io.BytesIO() + df = pd.DataFrame(export_data) + df.to_excel(output, index=False, sheet_name="Ventas") + + return Response( + content=output.getvalue(), + media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + headers={"Content-Disposition": "attachment; filename=ventas_export.xlsx"} + ) + + elif export_format.lower() == "json": + return { + "data": export_data, + "total_records": len(export_data), + "export_date": datetime.now().isoformat() + } + + else: + raise HTTPException( + status_code=400, + detail="Formato no soportado. 
Use: csv, excel, json" + ) + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/services/data/app/api/traffic.py b/services/data/app/api/traffic.py new file mode 100644 index 00000000..e901e235 --- /dev/null +++ b/services/data/app/api/traffic.py @@ -0,0 +1,83 @@ +# ================================================================ +# services/data/app/api/traffic.py +# ================================================================ +"""Traffic data API endpoints""" + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.ext.asyncio import AsyncSession +from typing import List, Optional +from datetime import datetime, timedelta + +from app.core.database import get_db +from app.core.auth import verify_token +from app.services.traffic_service import TrafficService +from app.services.messaging import data_publisher +from app.schemas.external import ( + TrafficDataResponse, + LocationRequest, + DateRangeRequest +) + +router = APIRouter() +traffic_service = TrafficService() + +@router.get("/current", response_model=TrafficDataResponse) +async def get_current_traffic( + latitude: float = Query(..., description="Latitude"), + longitude: float = Query(..., description="Longitude"), + current_user: dict = Depends(verify_token) +): + """Get current traffic data for location""" + try: + traffic = await traffic_service.get_current_traffic(latitude, longitude) + if not traffic: + raise HTTPException(status_code=404, detail="Traffic data not available") + + # Publish event + await data_publisher.publish_traffic_updated({ + "type": "current_requested", + "latitude": latitude, + "longitude": longitude, + "timestamp": datetime.utcnow().isoformat() + }) + + return traffic + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/historical", response_model=List[TrafficDataResponse]) +async def get_historical_traffic( + latitude: float = Query(..., description="Latitude"), + longitude: float = Query(..., description="Longitude"), + start_date: datetime = Query(..., description="Start date"), + end_date: datetime = Query(..., description="End date"), + db: AsyncSession = Depends(get_db), + current_user: dict = Depends(verify_token) +): + """Get historical traffic data""" + try: + # Validate date range + if end_date <= start_date: + raise HTTPException(status_code=400, detail="End date must be after start date") + + if (end_date - start_date).days > 90: + raise HTTPException(status_code=400, detail="Date range cannot exceed 90 days") + + historical_data = await traffic_service.get_historical_traffic( + latitude, longitude, start_date, end_date, db + ) + + # Publish event + await data_publisher.publish_traffic_updated({ + "type": "historical_requested", + "latitude": latitude, + "longitude": longitude, + "start_date": start_date.isoformat(), + "end_date": end_date.isoformat(), + "records_count": len(historical_data), + "timestamp": datetime.utcnow().isoformat() + }) + + return historical_data + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/services/data/app/api/weather.py b/services/data/app/api/weather.py new file mode 100644 index 00000000..9249f4f0 --- /dev/null +++ b/services/data/app/api/weather.py @@ -0,0 +1,100 @@ +# ================================================================ +# services/data/app/api/weather.py +# ================================================================ +"""Weather data API 
endpoints""" + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.ext.asyncio import AsyncSession +from typing import List, Optional +from datetime import datetime, timedelta + +from app.core.database import get_db +from app.core.auth import verify_token +from app.services.weather_service import WeatherService +from app.services.messaging import data_publisher +from app.schemas.external import ( + WeatherDataResponse, + WeatherForecastResponse, + LocationRequest, + DateRangeRequest +) + +router = APIRouter() +weather_service = WeatherService() + +@router.get("/current", response_model=WeatherDataResponse) +async def get_current_weather( + latitude: float = Query(..., description="Latitude"), + longitude: float = Query(..., description="Longitude"), + current_user: dict = Depends(verify_token) +): + """Get current weather for location""" + try: + weather = await weather_service.get_current_weather(latitude, longitude) + if not weather: + raise HTTPException(status_code=404, detail="Weather data not available") + + return weather + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/forecast", response_model=List[WeatherForecastResponse]) +async def get_weather_forecast( + latitude: float = Query(..., description="Latitude"), + longitude: float = Query(..., description="Longitude"), + days: int = Query(7, description="Number of forecast days", ge=1, le=14), + current_user: dict = Depends(verify_token) +): + """Get weather forecast for location""" + try: + forecast = await weather_service.get_weather_forecast(latitude, longitude, days) + + # Publish event + await data_publisher.publish_weather_updated({ + "type": "forecast_requested", + "latitude": latitude, + "longitude": longitude, + "days": days, + "timestamp": datetime.utcnow().isoformat() + }) + + return forecast + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/historical", response_model=List[WeatherDataResponse]) +async def get_historical_weather( + latitude: float = Query(..., description="Latitude"), + longitude: float = Query(..., description="Longitude"), + start_date: datetime = Query(..., description="Start date"), + end_date: datetime = Query(..., description="End date"), + db: AsyncSession = Depends(get_db), + current_user: dict = Depends(verify_token) +): + """Get historical weather data""" + try: + # Validate date range + if end_date <= start_date: + raise HTTPException(status_code=400, detail="End date must be after start date") + + if (end_date - start_date).days > 365: + raise HTTPException(status_code=400, detail="Date range cannot exceed 365 days") + + historical_data = await weather_service.get_historical_weather( + latitude, longitude, start_date, end_date, db + ) + + # Publish event + await data_publisher.publish_weather_updated({ + "type": "historical_requested", + "latitude": latitude, + "longitude": longitude, + "start_date": start_date.isoformat(), + "end_date": end_date.isoformat(), + "records_count": len(historical_data), + "timestamp": datetime.utcnow().isoformat() + }) + + return historical_data + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/services/data/app/core/auth.py b/services/data/app/core/auth.py new file mode 100644 index 00000000..b660d1ce --- /dev/null +++ b/services/data/app/core/auth.py @@ -0,0 +1,36 @@ +# ================================================================ +# services/data/app/core/auth.py +# 
================================================================ +"""Authentication utilities for data service""" + +from fastapi import HTTPException, Depends, status +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials +import httpx +import structlog + +from app.core.config import settings + +logger = structlog.get_logger() +security = HTTPBearer() + +async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict: + """Verify JWT token with auth service""" + try: + async with httpx.AsyncClient() as client: + response = await client.post( + f"{settings.AUTH_SERVICE_URL}/api/v1/auth/verify", + headers={"Authorization": f"Bearer {credentials.credentials}"} + ) + + if response.status_code == 200: + return response.json() + else: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid authentication credentials" + ) + except httpx.RequestError: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Authentication service unavailable" + ) diff --git a/services/data/app/core/config.py b/services/data/app/core/config.py index ef22dfed..b36ce3df 100644 --- a/services/data/app/core/config.py +++ b/services/data/app/core/config.py @@ -1,30 +1,35 @@ -""" -uLudata service configuration -""" +"""Data service configuration""" -import os from pydantic_settings import BaseSettings +from typing import List class Settings(BaseSettings): - """Application settings""" + # Database + DATABASE_URL: str = "postgresql+asyncpg://data_user:data_pass123@data-db:5432/data_db" - # Basic settings - APP_NAME: str = "uLudata Service" - VERSION: str = "1.0.0" - DEBUG: bool = os.getenv("DEBUG", "False").lower() == "true" - LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO") + # Redis + REDIS_URL: str = "redis://redis:6379/3" - # Database settings - DATABASE_URL: str = os.getenv("DATABASE_URL", "postgresql+asyncpg://data_user:data_pass123@data-db:5432/data_db") + # RabbitMQ + RABBITMQ_URL: str = "amqp://bakery:forecast123@rabbitmq:5672/" - # Redis settings - REDIS_URL: str = os.getenv("REDIS_URL", "redis://redis:6379/0") + # External APIs + AEMET_API_KEY: str = "your-aemet-api-key-here" + MADRID_OPENDATA_API_KEY: str = "your-madrid-opendata-key-here" - # RabbitMQ settings - RABBITMQ_URL: str = os.getenv("RABBITMQ_URL", "amqp://bakery:forecast123@rabbitmq:5672/") + # Service settings + SERVICE_NAME: str = "data-service" + SERVICE_VERSION: str = "1.0.0" - # Service URLs - AUTH_SERVICE_URL: str = os.getenv("AUTH_SERVICE_URL", "http://auth-service:8000") + # Auth + AUTH_SERVICE_URL: str = "http://auth-service:8000" + + # CORS + CORS_ORIGINS: List[str] = ["http://localhost:3000", "http://localhost:3001"] + + # Monitoring + LOG_LEVEL: str = "INFO" + ENABLE_METRICS: bool = True class Config: env_file = ".env" diff --git a/services/data/app/core/database.py b/services/data/app/core/database.py index 07002bf6..07c5ee28 100644 --- a/services/data/app/core/database.py +++ b/services/data/app/core/database.py @@ -1,12 +1,46 @@ -""" -Database configuration for data service -""" +"""Database configuration for data service""" + +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.orm import declarative_base +import structlog -from shared.database.base import DatabaseManager from app.core.config import settings -# Initialize database manager -database_manager = DatabaseManager(settings.DATABASE_URL) +logger = structlog.get_logger() -# Alias for convenience -get_db = 
database_manager.get_db +# Create async engine +engine = create_async_engine( + settings.DATABASE_URL, + echo=False, + pool_pre_ping=True, + pool_size=10, + max_overflow=20 +) + +# Create async session factory +AsyncSessionLocal = async_sessionmaker( + engine, + class_=AsyncSession, + expire_on_commit=False +) + +# Base class for models +Base = declarative_base() + +async def get_db() -> AsyncSession: + """Get database session""" + async with AsyncSessionLocal() as session: + try: + yield session + finally: + await session.close() + +async def init_db(): + """Initialize database tables""" + try: + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + logger.info("Database initialized successfully") + except Exception as e: + logger.error("Failed to initialize database", error=str(e)) + raise \ No newline at end of file diff --git a/services/data/app/external/__init__.py b/services/data/app/external/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/services/data/app/external/aemet.py b/services/data/app/external/aemet.py new file mode 100644 index 00000000..417c5b89 --- /dev/null +++ b/services/data/app/external/aemet.py @@ -0,0 +1,254 @@ +# ================================================================ +# services/data/app/external/aemet.py +# ================================================================ +"""AEMET (Spanish Weather Service) API client""" + +import math +from typing import List, Dict, Any, Optional +from datetime import datetime, timedelta +import structlog + +from app.external.base_client import BaseAPIClient +from app.core.config import settings + +logger = structlog.get_logger() + +class AEMETClient(BaseAPIClient): + + def __init__(self): + super().__init__( + base_url="https://opendata.aemet.es/opendata/api", + api_key=settings.AEMET_API_KEY + ) + + async def get_current_weather(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]: + """Get current weather for coordinates""" + try: + # Find nearest station + station_id = await self._get_nearest_station(latitude, longitude) + if not station_id: + logger.warning("No weather station found", lat=latitude, lon=longitude) + return await self._generate_synthetic_weather() + + # Get current weather from station + endpoint = f"/observacion/convencional/datos/estacion/{station_id}" + response = await self._get(endpoint) + + if response and response.get("datos"): + # Parse AEMET response + weather_data = response["datos"][0] if response["datos"] else {} + return self._parse_weather_data(weather_data) + + # Fallback to synthetic data + return await self._generate_synthetic_weather() + + except Exception as e: + logger.error("Failed to get current weather", error=str(e)) + return await self._generate_synthetic_weather() + + async def get_forecast(self, latitude: float, longitude: float, days: int = 7) -> List[Dict[str, Any]]: + """Get weather forecast for coordinates""" + try: + # Get municipality code for location + municipality_code = await self._get_municipality_code(latitude, longitude) + if not municipality_code: + return await self._generate_synthetic_forecast(days) + + # Get forecast + endpoint = f"/prediccion/especifica/municipio/diaria/{municipality_code}" + response = await self._get(endpoint) + + if response and response.get("datos"): + return self._parse_forecast_data(response["datos"], days) + + # Fallback to synthetic data + return await self._generate_synthetic_forecast(days) + + except Exception as e: + logger.error("Failed to get weather 
forecast", error=str(e)) + return await self._generate_synthetic_forecast(days) + + async def get_historical_weather(self, + latitude: float, + longitude: float, + start_date: datetime, + end_date: datetime) -> List[Dict[str, Any]]: + """Get historical weather data""" + try: + # For now, generate synthetic historical data + # In production, this would use AEMET historical data API + return await self._generate_synthetic_historical(start_date, end_date) + + except Exception as e: + logger.error("Failed to get historical weather", error=str(e)) + return [] + + async def _get_nearest_station(self, latitude: float, longitude: float) -> Optional[str]: + """Find nearest weather station""" + try: + # Madrid area stations (simplified) + madrid_stations = { + "3195": {"lat": 40.4168, "lon": -3.7038, "name": "Madrid Centro"}, + "3196": {"lat": 40.4518, "lon": -3.7246, "name": "Madrid Norte"}, + "3197": {"lat": 40.3833, "lon": -3.7167, "name": "Madrid Sur"} + } + + closest_station = None + min_distance = float('inf') + + for station_id, station_data in madrid_stations.items(): + distance = self._calculate_distance( + latitude, longitude, + station_data["lat"], station_data["lon"] + ) + if distance < min_distance: + min_distance = distance + closest_station = station_id + + return closest_station + + except Exception as e: + logger.error("Failed to find nearest station", error=str(e)) + return None + + async def _get_municipality_code(self, latitude: float, longitude: float) -> Optional[str]: + """Get municipality code for coordinates""" + # Madrid municipality code + if self._is_in_madrid_area(latitude, longitude): + return "28079" # Madrid municipality code + return None + + def _is_in_madrid_area(self, latitude: float, longitude: float) -> bool: + """Check if coordinates are in Madrid area""" + # Madrid approximate bounds + return (40.3 <= latitude <= 40.6) and (-3.9 <= longitude <= -3.5) + + def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float: + """Calculate distance between two coordinates in km""" + R = 6371 # Earth's radius in km + + dlat = math.radians(lat2 - lat1) + dlon = math.radians(lon2 - lon1) + + a = (math.sin(dlat/2) * math.sin(dlat/2) + + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * + math.sin(dlon/2) * math.sin(dlon/2)) + + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) + distance = R * c + + return distance + + def _parse_weather_data(self, data: Dict) -> Dict[str, Any]: + """Parse AEMET weather data format""" + return { + "date": datetime.now(), + "temperature": data.get("ta", 15.0), # Temperature + "precipitation": data.get("prec", 0.0), # Precipitation + "humidity": data.get("hr", 50.0), # Humidity + "wind_speed": data.get("vv", 10.0), # Wind speed + "pressure": data.get("pres", 1013.0), # Pressure + "description": "Partly cloudy", + "source": "aemet" + } + + def _parse_forecast_data(self, data: List, days: int) -> List[Dict[str, Any]]: + """Parse AEMET forecast data""" + forecast = [] + base_date = datetime.now().date() + + for i in range(min(days, len(data))): + forecast_date = base_date + timedelta(days=i) + day_data = data[i] if i < len(data) else {} + + forecast.append({ + "forecast_date": datetime.combine(forecast_date, datetime.min.time()), + "generated_at": datetime.now(), + "temperature": day_data.get("temperatura", 15.0), + "precipitation": day_data.get("precipitacion", 0.0), + "humidity": day_data.get("humedad", 50.0), + "wind_speed": day_data.get("viento", 10.0), + "description": day_data.get("descripcion", 
"Partly cloudy"), + "source": "aemet" + }) + + return forecast + + async def _generate_synthetic_weather(self) -> Dict[str, Any]: + """Generate realistic synthetic weather for Madrid""" + now = datetime.now() + month = now.month + hour = now.hour + + # Madrid climate simulation + base_temp = 5 + (month - 1) * 2.5 # Seasonal variation + temp_variation = math.sin((hour - 6) * math.pi / 12) * 8 # Daily variation + temperature = base_temp + temp_variation + + # Rain probability (higher in winter) + rain_prob = 0.3 if month in [11, 12, 1, 2, 3] else 0.1 + precipitation = 2.5 if hash(now.date()) % 100 < rain_prob * 100 else 0.0 + + return { + "date": now, + "temperature": round(temperature, 1), + "precipitation": precipitation, + "humidity": 45 + (month % 6) * 5, + "wind_speed": 8 + (hour % 12), + "pressure": 1013 + math.sin(now.day * 0.2) * 15, + "description": "Lluvioso" if precipitation > 0 else "Soleado", + "source": "synthetic" + } + + async def _generate_synthetic_forecast(self, days: int) -> List[Dict[str, Any]]: + """Generate synthetic forecast data""" + forecast = [] + base_date = datetime.now().date() + + for i in range(days): + forecast_date = base_date + timedelta(days=i) + + # Seasonal temperature + month = forecast_date.month + base_temp = 5 + (month - 1) * 2.5 + temp_variation = (i % 7 - 3) * 2 # Weekly variation + + forecast.append({ + "forecast_date": datetime.combine(forecast_date, datetime.min.time()), + "generated_at": datetime.now(), + "temperature": round(base_temp + temp_variation, 1), + "precipitation": 2.0 if i % 5 == 0 else 0.0, + "humidity": 50 + (i % 30), + "wind_speed": 10 + (i % 15), + "description": "Lluvioso" if i % 5 == 0 else "Soleado", + "source": "synthetic" + }) + + return forecast + + async def _generate_synthetic_historical(self, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: + """Generate synthetic historical weather data""" + historical_data = [] + current_date = start_date + + while current_date <= end_date: + month = current_date.month + base_temp = 5 + (month - 1) * 2.5 + + # Add some randomness based on date + temp_variation = math.sin(current_date.day * 0.3) * 5 + + historical_data.append({ + "date": current_date, + "temperature": round(base_temp + temp_variation, 1), + "precipitation": 1.5 if current_date.day % 7 == 0 else 0.0, + "humidity": 45 + (current_date.day % 40), + "wind_speed": 8 + (current_date.day % 20), + "pressure": 1013 + math.sin(current_date.day * 0.2) * 20, + "description": "Variable", + "source": "synthetic" + }) + + current_date += timedelta(days=1) + + return historical_data diff --git a/services/data/app/external/base_client.py b/services/data/app/external/base_client.py new file mode 100644 index 00000000..1d1bd5ab --- /dev/null +++ b/services/data/app/external/base_client.py @@ -0,0 +1,67 @@ +# ================================================================ +# services/data/app/external/base_client.py +# ================================================================ +"""Base HTTP client for external APIs""" + +import httpx +from typing import Dict, Any, Optional +import structlog +from datetime import datetime + +logger = structlog.get_logger() + +class BaseAPIClient: + + def __init__(self, base_url: str, api_key: Optional[str] = None): + self.base_url = base_url + self.api_key = api_key + self.timeout = httpx.Timeout(30.0) + + async def _get(self, endpoint: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Optional[Dict[str, Any]]: + """Make GET request""" + try: + url = 
f"{self.base_url}{endpoint}" + + # Add API key to headers if available + request_headers = headers or {} + if self.api_key: + request_headers["Authorization"] = f"Bearer {self.api_key}" + + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get(url, params=params, headers=request_headers) + response.raise_for_status() + return response.json() + + except httpx.HTTPStatusError as e: + logger.error("HTTP error", status_code=e.response.status_code, url=url) + return None + except httpx.RequestError as e: + logger.error("Request error", error=str(e), url=url) + return None + except Exception as e: + logger.error("Unexpected error", error=str(e), url=url) + return None + + async def _post(self, endpoint: str, data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Optional[Dict[str, Any]]: + """Make POST request""" + try: + url = f"{self.base_url}{endpoint}" + + request_headers = headers or {} + if self.api_key: + request_headers["Authorization"] = f"Bearer {self.api_key}" + + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.post(url, json=data, headers=request_headers) + response.raise_for_status() + return response.json() + + except httpx.HTTPStatusError as e: + logger.error("HTTP error", status_code=e.response.status_code, url=url) + return None + except httpx.RequestError as e: + logger.error("Request error", error=str(e), url=url) + return None + except Exception as e: + logger.error("Unexpected error", error=str(e), url=url) + return None diff --git a/services/data/app/external/madrid_opendata.py b/services/data/app/external/madrid_opendata.py new file mode 100644 index 00000000..bec7bdef --- /dev/null +++ b/services/data/app/external/madrid_opendata.py @@ -0,0 +1,235 @@ +# ================================================================ +# services/data/app/external/madrid_opendata.py +# ================================================================ +"""Madrid Open Data API client for traffic and events""" + +import math +from typing import List, Dict, Any, Optional +from datetime import datetime, timedelta +import structlog + +from app.external.base_client import BaseAPIClient +from app.core.config import settings + +logger = structlog.get_logger() + +class MadridOpenDataClient(BaseAPIClient): + + def __init__(self): + super().__init__( + base_url="https://datos.madrid.es/egob/catalogo", + api_key=settings.MADRID_OPENDATA_API_KEY + ) + + async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[Dict[str, Any]]: + """Get current traffic data for location""" + try: + # In production, this would call real Madrid Open Data API + # For now, generate realistic synthetic data + return await self._generate_synthetic_traffic(latitude, longitude) + + except Exception as e: + logger.error("Failed to get current traffic", error=str(e)) + return None + + async def get_historical_traffic(self, + latitude: float, + longitude: float, + start_date: datetime, + end_date: datetime) -> List[Dict[str, Any]]: + """Get historical traffic data""" + try: + # Generate synthetic historical traffic data + return await self._generate_historical_traffic(latitude, longitude, start_date, end_date) + + except Exception as e: + logger.error("Failed to get historical traffic", error=str(e)) + return [] + + async def get_events(self, latitude: float, longitude: float, radius_km: float = 5.0) -> List[Dict[str, Any]]: + """Get events near location""" + try: + # In production, would fetch real events from Madrid Open 
Data + return await self._generate_synthetic_events(latitude, longitude) + + except Exception as e: + logger.error("Failed to get events", error=str(e)) + return [] + + async def _generate_synthetic_traffic(self, latitude: float, longitude: float) -> Dict[str, Any]: + """Generate realistic Madrid traffic data""" + now = datetime.now() + hour = now.hour + is_weekend = now.weekday() >= 5 + + # Base traffic volume + base_traffic = 100 + + # Madrid traffic patterns + if not is_weekend: # Weekdays + if 7 <= hour <= 9: # Morning rush + traffic_multiplier = 2.2 + congestion = "high" + elif 18 <= hour <= 20: # Evening rush + traffic_multiplier = 2.5 + congestion = "high" + elif 12 <= hour <= 14: # Lunch time + traffic_multiplier = 1.6 + congestion = "medium" + elif 6 <= hour <= 22: # Daytime + traffic_multiplier = 1.2 + congestion = "medium" + else: # Night + traffic_multiplier = 0.4 + congestion = "low" + else: # Weekends + if 11 <= hour <= 14: # Weekend shopping + traffic_multiplier = 1.4 + congestion = "medium" + elif 19 <= hour <= 22: # Weekend evening + traffic_multiplier = 1.6 + congestion = "medium" + else: + traffic_multiplier = 0.8 + congestion = "low" + + # Calculate pedestrian traffic (higher during meal times and school hours) + pedestrian_base = 150 + if 13 <= hour <= 15: # Lunch time + pedestrian_multiplier = 2.8 + elif hour == 14: # School pickup time + pedestrian_multiplier = 3.5 + elif 20 <= hour <= 22: # Dinner time + pedestrian_multiplier = 2.2 + elif 8 <= hour <= 9: # Morning commute + pedestrian_multiplier = 2.0 + else: + pedestrian_multiplier = 1.0 + + traffic_volume = int(base_traffic * traffic_multiplier) + pedestrian_count = int(pedestrian_base * pedestrian_multiplier) + + # Average speed based on congestion + speed_map = {"low": 45, "medium": 25, "high": 15} + average_speed = speed_map[congestion] + (hash(f"{latitude}{longitude}") % 10 - 5) + + return { + "date": now, + "traffic_volume": traffic_volume, + "pedestrian_count": pedestrian_count, + "congestion_level": congestion, + "average_speed": max(10, average_speed), # Minimum 10 km/h + "source": "madrid_opendata" + } + + async def _generate_historical_traffic(self, + latitude: float, + longitude: float, + start_date: datetime, + end_date: datetime) -> List[Dict[str, Any]]: + """Generate synthetic historical traffic data""" + historical_data = [] + current_date = start_date + + while current_date <= end_date: + hour = current_date.hour + is_weekend = current_date.weekday() >= 5 + + # Base patterns similar to current traffic + base_traffic = 100 + + if not is_weekend: + if 7 <= hour <= 9 or 18 <= hour <= 20: + traffic_multiplier = 2.0 + (current_date.day % 5) * 0.1 + elif 12 <= hour <= 14: + traffic_multiplier = 1.5 + else: + traffic_multiplier = 1.0 + else: + traffic_multiplier = 0.7 + (current_date.day % 3) * 0.2 + + # Add seasonal variations + month = current_date.month + seasonal_factor = 1.0 + if month in [12, 1]: # Holiday season + seasonal_factor = 0.8 + elif month in [7, 8]: # Summer vacation + seasonal_factor = 0.9 + + traffic_volume = int(base_traffic * traffic_multiplier * seasonal_factor) + + # Determine congestion level + if traffic_volume > 160: + congestion_level = "high" + avg_speed = 15 + elif traffic_volume > 120: + congestion_level = "medium" + avg_speed = 25 + else: + congestion_level = "low" + avg_speed = 40 + + # Pedestrian count + pedestrian_base = 150 + if 13 <= hour <= 15: + pedestrian_multiplier = 2.5 + elif hour == 14: + pedestrian_multiplier = 3.0 + else: + pedestrian_multiplier = 1.0 + + 
historical_data.append({ + "date": current_date, + "traffic_volume": traffic_volume, + "pedestrian_count": int(pedestrian_base * pedestrian_multiplier), + "congestion_level": congestion_level, + "average_speed": avg_speed + (current_date.day % 10 - 5), + "source": "madrid_opendata" + }) + + current_date += timedelta(hours=1) + + return historical_data + + async def _generate_synthetic_events(self, latitude: float, longitude: float) -> List[Dict[str, Any]]: + """Generate synthetic Madrid events""" + events = [] + base_date = datetime.now().date() + + # Generate some sample events + sample_events = [ + { + "name": "Mercado de San Miguel", + "type": "market", + "impact_level": "medium", + "distance_km": 1.2 + }, + { + "name": "Concierto en el Retiro", + "type": "concert", + "impact_level": "high", + "distance_km": 2.5 + }, + { + "name": "Partido Real Madrid", + "type": "sports", + "impact_level": "high", + "distance_km": 8.0 + } + ] + + for i, event in enumerate(sample_events): + event_date = base_date + timedelta(days=i + 1) + events.append({ + "id": f"event_{i+1}", + "name": event["name"], + "date": datetime.combine(event_date, datetime.min.time()), + "type": event["type"], + "impact_level": event["impact_level"], + "distance_km": event["distance_km"], + "latitude": latitude + (hash(event["name"]) % 100 - 50) / 1000, + "longitude": longitude + (hash(event["name"]) % 100 - 50) / 1000, + "source": "madrid_opendata" + }) + + return events diff --git a/services/data/app/main.py b/services/data/app/main.py index 7187a0f7..73dbea8a 100644 --- a/services/data/app/main.py +++ b/services/data/app/main.py @@ -1,61 +1,72 @@ """ -uLudata Service +Data Service Main Application +Handles external API integrations (weather, traffic, events) """ import logging -from fastapi import FastAPI +from contextlib import asynccontextmanager +from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse +import structlog from app.core.config import settings -from app.core.database import database_manager +from app.core.database import init_db +from app.api.sales import router as sales_router +from app.api.weather import router as weather_router +from app.api.traffic import router as traffic_router +from shared.monitoring.metrics import setup_metrics from shared.monitoring.logging import setup_logging -from shared.monitoring.metrics import MetricsCollector # Setup logging -setup_logging("data-service", "INFO") -logger = logging.getLogger(__name__) +setup_logging("data-service", settings.LOG_LEVEL) +logger = structlog.get_logger() + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan events""" + # Startup + logger.info("Starting Data Service") + await init_db() + yield + # Shutdown + logger.info("Shutting down Data Service") # Create FastAPI app app = FastAPI( - title="uLudata Service", - description="uLudata service for bakery forecasting", - version="1.0.0" + title="Bakery Data Service", + description="External data integration service for weather, traffic, and sales data", + version="1.0.0", + lifespan=lifespan ) -# Initialize metrics collector -metrics_collector = MetricsCollector("data-service") +# Setup metrics +setup_metrics(app) # CORS middleware app.add_middleware( CORSMiddleware, - allow_origins=["*"], + allow_origins=settings.CORS_ORIGINS, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) -@app.on_event("startup") -async def startup_event(): - """Application startup""" - 
logger.info("Starting uLudata Service") - - # Create database tables - await database_manager.create_tables() - - # Start metrics server - metrics_collector.start_metrics_server(8080) - - logger.info("uLudata Service started successfully") +# Include routers +app.include_router(sales_router, prefix="/api/v1/sales", tags=["sales"]) +app.include_router(weather_router, prefix="/api/v1/weather", tags=["weather"]) +app.include_router(traffic_router, prefix="/api/v1/traffic", tags=["traffic"]) @app.get("/health") async def health_check(): """Health check endpoint""" - return { - "status": "healthy", - "service": "data-service", - "version": "1.0.0" - } + return {"status": "healthy", "service": "data-service"} -if __name__ == "__main__": - import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000) +@app.exception_handler(Exception) +async def global_exception_handler(request: Request, exc: Exception): + """Global exception handler""" + logger.error("Unhandled exception", exc_info=exc, path=request.url.path) + return JSONResponse( + status_code=500, + content={"detail": "Internal server error"} + ) diff --git a/services/data/app/models/sales.py b/services/data/app/models/sales.py new file mode 100644 index 00000000..a416563c --- /dev/null +++ b/services/data/app/models/sales.py @@ -0,0 +1,31 @@ +# ================================================================ +# services/data/app/models/sales.py +# ================================================================ +"""Sales data models""" + +from sqlalchemy import Column, String, DateTime, Float, Integer, Text, Index +from sqlalchemy.dialects.postgresql import UUID +import uuid +from datetime import datetime + +from app.core.database import Base + +class SalesData(Base): + __tablename__ = "sales_data" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True) + date = Column(DateTime, nullable=False, index=True) + product_name = Column(String(200), nullable=False) + quantity_sold = Column(Integer, nullable=False) + revenue = Column(Float, nullable=True) + location_id = Column(String(100), nullable=True) + source = Column(String(50), nullable=False, default="manual") # manual, pos, csv + raw_data = Column(Text, nullable=True) # Store original data for debugging + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + __table_args__ = ( + Index('idx_sales_tenant_date', 'tenant_id', 'date'), + Index('idx_sales_tenant_product', 'tenant_id', 'product_name'), + ) diff --git a/services/data/app/models/traffic.py b/services/data/app/models/traffic.py new file mode 100644 index 00000000..fd25208c --- /dev/null +++ b/services/data/app/models/traffic.py @@ -0,0 +1,29 @@ +# ================================================================ +# services/data/app/models/traffic.py +# ================================================================ +"""Traffic data models""" + +from sqlalchemy import Column, String, DateTime, Float, Integer, Text, Index +from sqlalchemy.dialects.postgresql import UUID +import uuid +from datetime import datetime + +from app.core.database import Base + +class TrafficData(Base): + __tablename__ = "traffic_data" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + location_id = Column(String(100), nullable=False, index=True) + date = Column(DateTime, nullable=False, index=True) + traffic_volume = Column(Integer, nullable=True) # vehicles per 
hour + pedestrian_count = Column(Integer, nullable=True) # pedestrians per hour + congestion_level = Column(String(20), nullable=True) # low/medium/high + average_speed = Column(Float, nullable=True) # km/h + source = Column(String(50), nullable=False, default="madrid_opendata") + raw_data = Column(Text, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) + + __table_args__ = ( + Index('idx_traffic_location_date', 'location_id', 'date'), + ) diff --git a/services/data/app/models/weather.py b/services/data/app/models/weather.py new file mode 100644 index 00000000..e4ac5d74 --- /dev/null +++ b/services/data/app/models/weather.py @@ -0,0 +1,50 @@ +# ================================================================ +# services/data/app/models/weather.py +# ================================================================ +"""Weather data models""" + +from sqlalchemy import Column, String, DateTime, Float, Integer, Text, Index +from sqlalchemy.dialects.postgresql import UUID +import uuid +from datetime import datetime + +from app.core.database import Base + +class WeatherData(Base): + __tablename__ = "weather_data" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + location_id = Column(String(100), nullable=False, index=True) + date = Column(DateTime, nullable=False, index=True) + temperature = Column(Float, nullable=True) # Celsius + precipitation = Column(Float, nullable=True) # mm + humidity = Column(Float, nullable=True) # percentage + wind_speed = Column(Float, nullable=True) # km/h + pressure = Column(Float, nullable=True) # hPa + description = Column(String(200), nullable=True) + source = Column(String(50), nullable=False, default="aemet") + raw_data = Column(Text, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) + + __table_args__ = ( + Index('idx_weather_location_date', 'location_id', 'date'), + ) + +class WeatherForecast(Base): + __tablename__ = "weather_forecasts" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + location_id = Column(String(100), nullable=False, index=True) + forecast_date = Column(DateTime, nullable=False) + generated_at = Column(DateTime, nullable=False, default=datetime.utcnow) + temperature = Column(Float, nullable=True) + precipitation = Column(Float, nullable=True) + humidity = Column(Float, nullable=True) + wind_speed = Column(Float, nullable=True) + description = Column(String(200), nullable=True) + source = Column(String(50), nullable=False, default="aemet") + raw_data = Column(Text, nullable=True) + + __table_args__ = ( + Index('idx_forecast_location_date', 'location_id', 'forecast_date'), + ) \ No newline at end of file diff --git a/services/data/app/schemas/external.py b/services/data/app/schemas/external.py new file mode 100644 index 00000000..72bd6488 --- /dev/null +++ b/services/data/app/schemas/external.py @@ -0,0 +1,45 @@ +# ================================================================ +# services/data/app/schemas/external.py +# ================================================================ +"""External API response schemas""" + +from pydantic import BaseModel +from datetime import datetime +from typing import Optional, List + +class WeatherDataResponse(BaseModel): + date: datetime + temperature: Optional[float] + precipitation: Optional[float] + humidity: Optional[float] + wind_speed: Optional[float] + pressure: Optional[float] + description: Optional[str] + source: str + +class WeatherForecastResponse(BaseModel): + forecast_date: datetime + 
generated_at: datetime + temperature: Optional[float] + precipitation: Optional[float] + humidity: Optional[float] + wind_speed: Optional[float] + description: Optional[str] + source: str + +class TrafficDataResponse(BaseModel): + date: datetime + traffic_volume: Optional[int] + pedestrian_count: Optional[int] + congestion_level: Optional[str] + average_speed: Optional[float] + source: str + +class LocationRequest(BaseModel): + latitude: float + longitude: float + address: Optional[str] = None + +class DateRangeRequest(BaseModel): + start_date: datetime + end_date: datetime diff --git a/services/data/app/schemas/sales.py b/services/data/app/schemas/sales.py new file mode 100644 index 00000000..d2068087 --- /dev/null +++ b/services/data/app/schemas/sales.py @@ -0,0 +1,46 @@ +# ================================================================ +# services/data/app/schemas/sales.py +# ================================================================ +"""Sales data schemas""" + +from pydantic import BaseModel, validator +from datetime import datetime +from typing import Optional, List +import uuid + +class SalesDataCreate(BaseModel): + tenant_id: uuid.UUID + date: datetime + product_name: str + quantity_sold: int + revenue: Optional[float] = None + location_id: Optional[str] = None + source: str = "manual" + raw_data: Optional[str] = None + +class SalesDataResponse(BaseModel): + id: uuid.UUID + tenant_id: uuid.UUID + date: datetime + product_name: str + quantity_sold: int + revenue: Optional[float] + location_id: Optional[str] + source: str + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True + +class SalesDataImport(BaseModel): + tenant_id: uuid.UUID + data_format: str # csv, excel, pos + data: str # Base64 encoded or JSON string + +class SalesDataQuery(BaseModel): + tenant_id: uuid.UUID + start_date: datetime + end_date: datetime + product_name: Optional[str] = None + location_id: Optional[str] = None \ No newline at end of file diff --git a/services/data/app/services/data_import_service.py b/services/data/app/services/data_import_service.py new file mode 100644 index 00000000..17e3c69f --- /dev/null +++ b/services/data/app/services/data_import_service.py @@ -0,0 +1,648 @@ +# ================================================================ +# services/data/app/services/data_import_service.py +# ================================================================ +"""Data import service for various formats""" + +import csv +import io +import json +import base64 +import openpyxl +import pandas as pd +from typing import Dict, Any, List, Optional, Union +from datetime import datetime, timedelta +from sqlalchemy.ext.asyncio import AsyncSession +import structlog +import re +from pathlib import Path + +from app.services.sales_service import SalesService +from app.schemas.sales import SalesDataCreate + +logger = structlog.get_logger() + +class DataImportService: + """ + Service for importing sales data from various formats. + Supports CSV, Excel, JSON, and direct data entry. 
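+
+    Illustrative usage (a sketch; the tenant id, CSV text, and session below are placeholders):
+
+        result = await DataImportService.process_upload(
+            tenant_id=str(tenant_id),
+            file_content=csv_text,
+            file_format="csv",
+            db=session,
+            filename="ventas.csv"
+        )
+        if result["success"]:
+            print(f"{result['records_created']} of {result['total_rows']} rows imported")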
+ """ + + # Common column mappings for different languages/formats + COLUMN_MAPPINGS = { + # Date columns + 'date': ['date', 'fecha', 'datum', 'data', 'dia'], + 'datetime': ['datetime', 'fecha_hora', 'timestamp'], + + # Product columns + 'product': ['product', 'producto', 'item', 'articulo', 'nombre', 'name'], + 'product_name': ['product_name', 'nombre_producto', 'item_name'], + + # Quantity columns + 'quantity': ['quantity', 'cantidad', 'qty', 'units', 'unidades'], + 'quantity_sold': ['quantity_sold', 'cantidad_vendida', 'sold'], + + # Revenue columns + 'revenue': ['revenue', 'ingresos', 'sales', 'ventas', 'total', 'importe'], + 'price': ['price', 'precio', 'cost', 'coste'], + + # Location columns + 'location': ['location', 'ubicacion', 'tienda', 'store', 'punto_venta'], + 'location_id': ['location_id', 'store_id', 'tienda_id'], + } + + # Date formats to try + DATE_FORMATS = [ + '%Y-%m-%d', # 2024-01-15 + '%d/%m/%Y', # 15/01/2024 + '%m/%d/%Y', # 01/15/2024 + '%d-%m-%Y', # 15-01-2024 + '%m-%d-%Y', # 01-15-2024 + '%d.%m.%Y', # 15.01.2024 + '%Y/%m/%d', # 2024/01/15 + '%d/%m/%y', # 15/01/24 + '%m/%d/%y', # 01/15/24 + '%Y-%m-%d %H:%M:%S', # 2024-01-15 14:30:00 + '%d/%m/%Y %H:%M', # 15/01/2024 14:30 + ] + + @staticmethod + async def process_upload(tenant_id: str, + file_content: str, + file_format: str, + db: AsyncSession, + filename: Optional[str] = None) -> Dict[str, Any]: + """Process uploaded data file""" + try: + logger.info("Starting data import", + tenant_id=tenant_id, + format=file_format, + filename=filename) + + if file_format.lower() == 'csv': + return await DataImportService._process_csv(tenant_id, file_content, db, filename) + elif file_format.lower() in ['xlsx', 'excel', 'xls']: + return await DataImportService._process_excel(tenant_id, file_content, db, filename) + elif file_format.lower() == 'json': + return await DataImportService._process_json(tenant_id, file_content, db, filename) + elif file_format.lower() == 'pos': + return await DataImportService._process_pos_data(tenant_id, file_content, db, filename) + else: + return { + "success": False, + "error": f"Formato no soportado: {file_format}. Formatos válidos: CSV, Excel, JSON, POS" + } + except Exception as e: + logger.error("Data import failed", error=str(e), tenant_id=tenant_id) + return { + "success": False, + "error": f"Error en la importación: {str(e)}" + } + + @staticmethod + async def _process_csv(tenant_id: str, csv_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]: + """Process CSV file with intelligent column mapping""" + try: + # Handle base64 encoded content + if csv_content.startswith('data:'): + csv_content = base64.b64decode(csv_content.split(',')[1]).decode('utf-8') + + # Try different encodings if UTF-8 fails + encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1'] + df = None + + for encoding in encodings: + try: + csv_buffer = io.StringIO(csv_content) + df = pd.read_csv(csv_buffer, encoding=encoding) + break + except UnicodeDecodeError: + continue + + if df is None: + return {"success": False, "error": "No se pudo leer el archivo CSV con ninguna codificación"} + + # Clean column names + df.columns = df.columns.str.strip().str.lower() + + # Map columns to standard names + column_mapping = DataImportService._detect_columns(df.columns.tolist()) + + if not column_mapping.get('date') or not column_mapping.get('product'): + return { + "success": False, + "error": f"Columnas requeridas no encontradas. Detectadas: {list(df.columns)}. 
Se requieren: fecha y producto" + } + + # Process records + return await DataImportService._process_dataframe( + tenant_id, df, column_mapping, db, "csv", filename + ) + + except Exception as e: + logger.error("CSV processing failed", error=str(e)) + return {"success": False, "error": f"Error procesando CSV: {str(e)}"} + + @staticmethod + async def _process_excel(tenant_id: str, excel_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]: + """Process Excel file""" + try: + # Decode base64 content + if excel_content.startswith('data:'): + excel_bytes = base64.b64decode(excel_content.split(',')[1]) + else: + excel_bytes = base64.b64decode(excel_content) + + # Read Excel file - try first sheet + try: + df = pd.read_excel(io.BytesIO(excel_bytes), sheet_name=0) + except Exception as e: + # If pandas fails, try openpyxl directly + workbook = openpyxl.load_workbook(io.BytesIO(excel_bytes)) + sheet = workbook.active + + # Convert to DataFrame + data = [] + headers = None + for row in sheet.iter_rows(values_only=True): + if headers is None: + headers = [str(cell).strip().lower() if cell else f"col_{i}" for i, cell in enumerate(row)] + else: + data.append(row) + + df = pd.DataFrame(data, columns=headers) + + # Clean column names + df.columns = df.columns.str.strip().str.lower() + + # Remove empty rows + df = df.dropna(how='all') + + # Map columns + column_mapping = DataImportService._detect_columns(df.columns.tolist()) + + if not column_mapping.get('date') or not column_mapping.get('product'): + return { + "success": False, + "error": f"Columnas requeridas no encontradas en Excel. Detectadas: {list(df.columns)}" + } + + return await DataImportService._process_dataframe( + tenant_id, df, column_mapping, db, "excel", filename + ) + + except Exception as e: + logger.error("Excel processing failed", error=str(e)) + return {"success": False, "error": f"Error procesando Excel: {str(e)}"} + + @staticmethod + async def _process_json(tenant_id: str, json_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]: + """Process JSON file""" + try: + # Parse JSON + if json_content.startswith('data:'): + json_content = base64.b64decode(json_content.split(',')[1]).decode('utf-8') + + data = json.loads(json_content) + + # Handle different JSON structures + if isinstance(data, dict): + if 'data' in data: + records = data['data'] + elif 'records' in data: + records = data['records'] + elif 'sales' in data: + records = data['sales'] + else: + records = [data] # Single record + elif isinstance(data, list): + records = data + else: + return {"success": False, "error": "Formato JSON no válido"} + + # Convert to DataFrame for consistent processing + df = pd.DataFrame(records) + df.columns = df.columns.str.strip().str.lower() + + # Map columns + column_mapping = DataImportService._detect_columns(df.columns.tolist()) + + if not column_mapping.get('date') or not column_mapping.get('product'): + return { + "success": False, + "error": f"Columnas requeridas no encontradas en JSON. 
Detectadas: {list(df.columns)}" + } + + return await DataImportService._process_dataframe( + tenant_id, df, column_mapping, db, "json", filename + ) + + except json.JSONDecodeError as e: + return {"success": False, "error": f"JSON inválido: {str(e)}"} + except Exception as e: + logger.error("JSON processing failed", error=str(e)) + return {"success": False, "error": f"Error procesando JSON: {str(e)}"} + + @staticmethod + async def _process_pos_data(tenant_id: str, pos_content: str, db: AsyncSession, filename: Optional[str] = None) -> Dict[str, Any]: + """Process POS (Point of Sale) system data""" + try: + # POS data often comes in specific formats + # This is a generic parser that can be customized for specific POS systems + + if pos_content.startswith('data:'): + pos_content = base64.b64decode(pos_content.split(',')[1]).decode('utf-8') + + lines = pos_content.strip().split('\n') + records = [] + + for line_num, line in enumerate(lines, 1): + try: + # Skip empty lines and headers + if not line.strip() or line.startswith('#') or 'TOTAL' in line.upper(): + continue + + # Try different delimiters + for delimiter in ['\t', ';', '|', ',']: + if delimiter in line: + parts = line.split(delimiter) + if len(parts) >= 3: # At least date, product, quantity + records.append({ + 'date': parts[0].strip(), + 'product': parts[1].strip(), + 'quantity': parts[2].strip(), + 'revenue': parts[3].strip() if len(parts) > 3 else None, + 'line_number': line_num + }) + break + + except Exception as e: + logger.warning(f"Skipping POS line {line_num}: {e}") + continue + + if not records: + return {"success": False, "error": "No se encontraron datos válidos en el archivo POS"} + + # Convert to DataFrame + df = pd.DataFrame(records) + + # Standard column mapping for POS + column_mapping = { + 'date': 'date', + 'product': 'product', + 'quantity': 'quantity', + 'revenue': 'revenue' + } + + return await DataImportService._process_dataframe( + tenant_id, df, column_mapping, db, "pos", filename + ) + + except Exception as e: + logger.error("POS processing failed", error=str(e)) + return {"success": False, "error": f"Error procesando datos POS: {str(e)}"} + + @staticmethod + async def _process_dataframe(tenant_id: str, + df: pd.DataFrame, + column_mapping: Dict[str, str], + db: AsyncSession, + source: str, + filename: Optional[str] = None) -> Dict[str, Any]: + """Process DataFrame with mapped columns""" + try: + records_created = 0 + errors = [] + warnings = [] + skipped = 0 + + logger.info(f"Processing {len(df)} records from {source}") + + for index, row in df.iterrows(): + try: + # Extract and validate date + date_str = str(row.get(column_mapping['date'], '')).strip() + if not date_str or date_str.lower() in ['nan', 'null', 'none', '']: + errors.append(f"Fila {index + 1}: Fecha faltante") + skipped += 1 + continue + + date = DataImportService._parse_date(date_str) + if not date: + errors.append(f"Fila {index + 1}: Formato de fecha inválido: {date_str}") + skipped += 1 + continue + + # Extract and validate product name + product_name = str(row.get(column_mapping['product'], '')).strip() + if not product_name or product_name.lower() in ['nan', 'null', 'none', '']: + errors.append(f"Fila {index + 1}: Nombre de producto faltante") + skipped += 1 + continue + + # Clean product name + product_name = DataImportService._clean_product_name(product_name) + + # Extract and validate quantity + quantity_raw = row.get(column_mapping.get('quantity', 'quantity'), 0) + try: + quantity = int(float(str(quantity_raw).replace(',', '.'))) + if 
quantity <= 0: + warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity}), usando 1") + quantity = 1 + except (ValueError, TypeError): + warnings.append(f"Fila {index + 1}: Cantidad inválida ({quantity_raw}), usando 1") + quantity = 1 + + # Extract revenue (optional) + revenue = None + if 'revenue' in column_mapping and column_mapping['revenue'] in row: + revenue_raw = row.get(column_mapping['revenue']) + if revenue_raw and str(revenue_raw).lower() not in ['nan', 'null', 'none', '']: + try: + revenue = float(str(revenue_raw).replace(',', '.').replace('€', '').replace('$', '').strip()) + if revenue < 0: + revenue = None + warnings.append(f"Fila {index + 1}: Ingreso negativo ignorado") + except (ValueError, TypeError): + warnings.append(f"Fila {index + 1}: Formato de ingreso inválido: {revenue_raw}") + + # Extract location (optional) + location_id = None + if 'location' in column_mapping and column_mapping['location'] in row: + location_raw = row.get(column_mapping['location']) + if location_raw and str(location_raw).lower() not in ['nan', 'null', 'none', '']: + location_id = str(location_raw).strip() + + # Create sales record + sales_data = SalesDataCreate( + tenant_id=tenant_id, + date=date, + product_name=product_name, + quantity_sold=quantity, + revenue=revenue, + location_id=location_id, + source=source, + raw_data=json.dumps({ + **row.to_dict(), + "original_row": index + 1, + "filename": filename + }) + ) + + await SalesService.create_sales_record(sales_data, db) + records_created += 1 + + # Log progress for large imports + if records_created % 100 == 0: + logger.info(f"Processed {records_created} records...") + + except Exception as e: + error_msg = f"Fila {index + 1}: {str(e)}" + errors.append(error_msg) + logger.warning("Record processing failed", error=error_msg) + continue + + # Calculate success rate + total_processed = records_created + skipped + success_rate = (records_created / len(df)) * 100 if len(df) > 0 else 0 + + result = { + "success": True, + "records_created": records_created, + "total_rows": len(df), + "skipped": skipped, + "success_rate": round(success_rate, 1), + "errors": errors[:10], # Limit to first 10 errors + "warnings": warnings[:10], # Limit to first 10 warnings + "source": source, + "filename": filename + } + + if errors: + result["error_count"] = len(errors) + if len(errors) > 10: + result["errors"].append(f"... y {len(errors) - 10} errores más") + + if warnings: + result["warning_count"] = len(warnings) + if len(warnings) > 10: + result["warnings"].append(f"... 
y {len(warnings) - 10} advertencias más") + + logger.info("Data processing completed", + records_created=records_created, + total_rows=len(df), + success_rate=success_rate) + + return result + + except Exception as e: + logger.error("DataFrame processing failed", error=str(e)) + return { + "success": False, + "error": f"Error procesando datos: {str(e)}", + "records_created": 0 + } + + @staticmethod + def _detect_columns(columns: List[str]) -> Dict[str, str]: + """Detect column mappings using fuzzy matching""" + mapping = {} + columns_lower = [col.lower() for col in columns] + + for standard_name, possible_names in DataImportService.COLUMN_MAPPINGS.items(): + for col in columns_lower: + for possible in possible_names: + if possible in col or col in possible: + mapping[standard_name] = columns[columns_lower.index(col)] + break + if standard_name in mapping: + break + + # Map common aliases + if 'product' not in mapping and 'product_name' in mapping: + mapping['product'] = mapping['product_name'] + if 'quantity' not in mapping and 'quantity_sold' in mapping: + mapping['quantity'] = mapping['quantity_sold'] + if 'location' not in mapping and 'location_id' in mapping: + mapping['location'] = mapping['location_id'] + + return mapping + + @staticmethod + def _parse_date(date_str: str) -> Optional[datetime]: + """Parse date string with multiple format attempts""" + if not date_str or str(date_str).lower() in ['nan', 'null', 'none']: + return None + + # Clean date string + date_str = str(date_str).strip() + + # Try pandas first (handles most formats automatically) + try: + return pd.to_datetime(date_str, dayfirst=True) + except: + pass + + # Try specific formats + for fmt in DataImportService.DATE_FORMATS: + try: + return datetime.strptime(date_str, fmt) + except ValueError: + continue + + # Try extracting numbers and common patterns + try: + # Look for patterns like dd/mm/yyyy or dd-mm-yyyy + date_pattern = re.search(r'(\d{1,2})[/\-.](\d{1,2})[/\-.](\d{2,4})', date_str) + if date_pattern: + day, month, year = date_pattern.groups() + + # Convert 2-digit year to 4-digit + year = int(year) + if year < 50: + year += 2000 + elif year < 100: + year += 1900 + + return datetime(year, int(month), int(day)) + except: + pass + + logger.warning(f"Could not parse date: {date_str}") + return None + + @staticmethod + def _clean_product_name(product_name: str) -> str: + """Clean and standardize product names""" + if not product_name: + return "Producto sin nombre" + + # Remove extra whitespace + cleaned = re.sub(r'\s+', ' ', str(product_name).strip()) + + # Remove special characters but keep Spanish characters + cleaned = re.sub(r'[^\w\s\-áéíóúñçüÁÉÍÓÚÑÇÜ]', '', cleaned) + + # Capitalize first letter of each word + cleaned = cleaned.title() + + # Common product name corrections for Spanish bakeries + replacements = { + 'Pan De': 'Pan de', + 'Café Con': 'Café con', + 'Te ': 'Té ', + 'Bocadillo De': 'Bocadillo de', + } + + for old, new in replacements.items(): + cleaned = cleaned.replace(old, new) + + return cleaned if cleaned else "Producto sin nombre" + + @staticmethod + async def validate_import_data(data: Dict[str, Any]) -> Dict[str, Any]: + """Validate import data before processing""" + validation_result = { + "valid": True, + "errors": [], + "warnings": [], + "suggestions": [] + } + + # Check required fields + if not data.get("tenant_id"): + validation_result["errors"].append("tenant_id es requerido") + validation_result["valid"] = False + + if not data.get("data"): + validation_result["errors"].append("Datos 
faltantes") + validation_result["valid"] = False + + # Check file format + format_type = data.get("data_format", "").lower() + if format_type not in ["csv", "excel", "xlsx", "xls", "json", "pos"]: + validation_result["errors"].append(f"Formato no soportado: {format_type}") + validation_result["valid"] = False + + # Check data size (prevent very large uploads) + data_content = data.get("data", "") + if len(data_content) > 10 * 1024 * 1024: # 10MB limit + validation_result["errors"].append("Archivo demasiado grande (máximo 10MB)") + validation_result["valid"] = False + + # Suggestions for better imports + if len(data_content) > 1024 * 1024: # 1MB + validation_result["suggestions"].append("Archivo grande detectado. Considere dividir en archivos más pequeños para mejor rendimiento.") + + return validation_result + + @staticmethod + async def get_import_template(format_type: str = "csv") -> Dict[str, Any]: + """Generate import template for specified format""" + try: + # Sample data for template + sample_data = [ + { + "fecha": "15/01/2024", + "producto": "Pan Integral", + "cantidad": 25, + "ingresos": 37.50, + "ubicacion": "madrid_centro" + }, + { + "fecha": "15/01/2024", + "producto": "Croissant", + "cantidad": 15, + "ingresos": 22.50, + "ubicacion": "madrid_centro" + }, + { + "fecha": "15/01/2024", + "producto": "Café con Leche", + "cantidad": 42, + "ingresos": 84.00, + "ubicacion": "madrid_centro" + } + ] + + if format_type.lower() == "csv": + # Generate CSV template + output = io.StringIO() + df = pd.DataFrame(sample_data) + df.to_csv(output, index=False) + + return { + "template": output.getvalue(), + "content_type": "text/csv", + "filename": "plantilla_ventas.csv" + } + + elif format_type.lower() == "json": + return { + "template": json.dumps(sample_data, indent=2, ensure_ascii=False), + "content_type": "application/json", + "filename": "plantilla_ventas.json" + } + + elif format_type.lower() in ["excel", "xlsx"]: + # Generate Excel template + output = io.BytesIO() + df = pd.DataFrame(sample_data) + df.to_excel(output, index=False, sheet_name="Ventas") + + return { + "template": base64.b64encode(output.getvalue()).decode(), + "content_type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "filename": "plantilla_ventas.xlsx" + } + + else: + return { + "error": f"Formato de plantilla no soportado: {format_type}" + } + + except Exception as e: + logger.error("Template generation failed", error=str(e)) + return { + "error": f"Error generando plantilla: {str(e)}" + } \ No newline at end of file diff --git a/services/data/app/services/messaging.py b/services/data/app/services/messaging.py new file mode 100644 index 00000000..f9e30896 --- /dev/null +++ b/services/data/app/services/messaging.py @@ -0,0 +1,168 @@ +# ================================================================ +# services/data/app/services/messaging.py +# ================================================================ +"""Message queue service for data events""" + +import json +import asyncio +from typing import Dict, Any, Optional +import structlog + +try: + from aio_pika import connect_robust, Message, ExchangeType + AIO_PIKA_AVAILABLE = True +except ImportError: + AIO_PIKA_AVAILABLE = False + +from app.core.config import settings + +logger = structlog.get_logger() + +class DataEventPublisher: + """ + Event publisher for data service events. + Falls back gracefully if RabbitMQ is not available. 
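+
+    Illustrative usage (a sketch; the payload keys are placeholders, and the module-level
+    data_publisher instance may be the mock implementation when RabbitMQ is unavailable):
+
+        await data_publisher.connect()
+        await data_publisher.publish_sales_created({"tenant_id": "...", "records": 25})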
+ """ + + def __init__(self): + self.connection = None + self.channel = None + self.exchange = None + self.connected = False + + async def connect(self): + """Connect to RabbitMQ""" + if not AIO_PIKA_AVAILABLE: + logger.warning("aio-pika not available, messaging disabled") + return + + try: + self.connection = await connect_robust(settings.RABBITMQ_URL) + self.channel = await self.connection.channel() + + # Declare exchange for data events + self.exchange = await self.channel.declare_exchange( + "data.events", + ExchangeType.TOPIC, + durable=True + ) + + self.connected = True + logger.info("Connected to RabbitMQ for data events") + + except Exception as e: + logger.warning("Failed to connect to RabbitMQ", error=str(e)) + self.connected = False + + async def disconnect(self): + """Disconnect from RabbitMQ""" + if self.connection and not self.connection.is_closed: + await self.connection.close() + self.connected = False + logger.info("Disconnected from RabbitMQ") + + async def publish_data_imported(self, event_data: Dict[str, Any]): + """Publish data imported event""" + await self._publish_event("data.imported", event_data) + + async def publish_weather_updated(self, event_data: Dict[str, Any]): + """Publish weather data updated event""" + await self._publish_event("weather.updated", event_data) + + async def publish_traffic_updated(self, event_data: Dict[str, Any]): + """Publish traffic data updated event""" + await self._publish_event("traffic.updated", event_data) + + async def publish_sales_created(self, event_data: Dict[str, Any]): + """Publish sales record created event""" + await self._publish_event("sales.created", event_data) + + async def publish_import_completed(self, event_data: Dict[str, Any]): + """Publish import process completed event""" + await self._publish_event("import.completed", event_data) + + async def _publish_event(self, routing_key: str, data: Dict[str, Any]): + """Publish event to exchange""" + try: + # If not connected, try to connect + if not self.connected: + await self.connect() + + # If still not connected, log and return + if not self.connected: + logger.debug("Message not sent - RabbitMQ unavailable", routing_key=routing_key) + return + + # Prepare message + message_body = json.dumps(data, default=str) + message = Message( + message_body.encode(), + content_type="application/json", + delivery_mode=2 # Persistent + ) + + # Publish to exchange + await self.exchange.publish( + message, + routing_key=routing_key + ) + + logger.debug("Event published", routing_key=routing_key, data_size=len(message_body)) + + except Exception as e: + logger.error("Failed to publish event", + routing_key=routing_key, + error=str(e)) + # Reset connection on error + self.connected = False + +class MockDataEventPublisher: + """ + Mock publisher for development/testing when RabbitMQ is not available + """ + + async def connect(self): + logger.info("Mock publisher - connect called") + + async def disconnect(self): + logger.info("Mock publisher - disconnect called") + + async def publish_data_imported(self, event_data: Dict[str, Any]): + logger.debug("Mock publish - data imported", event_data=event_data) + + async def publish_weather_updated(self, event_data: Dict[str, Any]): + logger.debug("Mock publish - weather updated", event_data=event_data) + + async def publish_traffic_updated(self, event_data: Dict[str, Any]): + logger.debug("Mock publish - traffic updated", event_data=event_data) + + async def publish_sales_created(self, event_data: Dict[str, Any]): + logger.debug("Mock 
publish - sales created", event_data=event_data) + + async def publish_import_completed(self, event_data: Dict[str, Any]): + logger.debug("Mock publish - import completed", event_data=event_data) + +# Global publisher instance +# Use mock if RabbitMQ is not available or in development mode +if AIO_PIKA_AVAILABLE and hasattr(settings, 'RABBITMQ_URL') and settings.RABBITMQ_URL: + data_publisher = DataEventPublisher() +else: + logger.info("Using mock data publisher") + data_publisher = MockDataEventPublisher() + +# Ensure connection is established +async def init_messaging(): + """Initialize messaging connection""" + try: + await data_publisher.connect() + except Exception as e: + logger.warning("Failed to initialize messaging", error=str(e)) + +# Cleanup function +async def cleanup_messaging(): + """Cleanup messaging connection""" + try: + if hasattr(data_publisher, 'disconnect'): + await data_publisher.disconnect() + except Exception as e: + logger.warning("Failed to cleanup messaging", error=str(e)) \ No newline at end of file diff --git a/services/data/app/services/sales_service.py b/services/data/app/services/sales_service.py new file mode 100644 index 00000000..87f29839 --- /dev/null +++ b/services/data/app/services/sales_service.py @@ -0,0 +1,108 @@ +# ================================================================ +# services/data/app/services/sales_service.py +# ================================================================ +"""Sales data service""" + +import csv +import io +import json +from typing import List, Dict, Any, Optional +from datetime import datetime +import pandas as pd +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, and_ +import structlog + +from app.models.sales import SalesData +from app.schemas.sales import SalesDataCreate, SalesDataResponse, SalesDataQuery + +logger = structlog.get_logger() + +class SalesService: + + @staticmethod + async def create_sales_record(data: SalesDataCreate, db: AsyncSession) -> SalesData: + """Create a new sales record""" + sales_record = SalesData(**data.model_dump()) + db.add(sales_record) + await db.commit() + await db.refresh(sales_record) + return sales_record + + @staticmethod + async def get_sales_data(query: SalesDataQuery, db: AsyncSession) -> List[SalesData]: + """Get sales data by query parameters""" + stmt = select(SalesData).where( + and_( + SalesData.tenant_id == query.tenant_id, + SalesData.date >= query.start_date, + SalesData.date <= query.end_date + ) + ) + + if query.product_name: + stmt = stmt.where(SalesData.product_name.ilike(f"%{query.product_name}%")) + + if query.location_id: + stmt = stmt.where(SalesData.location_id == query.location_id) + + result = await db.execute(stmt) + return result.scalars().all() + + @staticmethod + async def import_csv_data(tenant_id: str, csv_content: str, db: AsyncSession) -> Dict[str, Any]: + """Import sales data from CSV""" + try: + # Parse CSV + csv_file = io.StringIO(csv_content) + df = pd.read_csv(csv_file) + + # Validate and clean data + records_created = 0 + errors = [] + + for index, row in df.iterrows(): + try: + # Parse date (handle multiple formats) + date_str = str(row.get('date', row.get('fecha', ''))) + date = pd.to_datetime(date_str, dayfirst=True) + + # Clean product name + product_name = str(row.get('product', row.get('producto', ''))).strip() + + # Parse quantity + quantity = int(row.get('quantity', row.get('cantidad', 0))) + + # Parse revenue (optional) + revenue = None + revenue_col = row.get('revenue', row.get('ingresos', None)) 
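+                    # Revenue is optional: the cell may be absent or NaN, so only cast to float when a usable value exists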
+                    if revenue_col and pd.notna(revenue_col):
+                        revenue = float(revenue_col)
+
+                    # Create sales record
+                    sales_data = SalesDataCreate(
+                        tenant_id=tenant_id,
+                        date=date,
+                        product_name=product_name,
+                        quantity_sold=quantity,
+                        revenue=revenue,
+                        source="csv",
+                        raw_data=json.dumps(row.to_dict())
+                    )
+
+                    await SalesService.create_sales_record(sales_data, db)
+                    records_created += 1
+
+                except Exception as e:
+                    errors.append(f"Row {index + 1}: {str(e)}")
+                    continue
+
+            return {
+                "success": True,
+                "records_created": records_created,
+                "errors": errors,
+                "total_records": len(df)
+            }
+
+        except Exception as e:
+            return {"success": False, "error": f"CSV processing failed: {str(e)}"}
diff --git a/services/data/app/services/traffic_service.py b/services/data/app/services/traffic_service.py
new file mode 100644
index 00000000..1c5e1a01
--- /dev/null
+++ b/services/data/app/services/traffic_service.py
@@ -0,0 +1,90 @@
+# ================================================================
+# services/data/app/services/traffic_service.py
+# ================================================================
+"""Traffic data service"""
+
+from typing import List, Dict, Any, Optional
+from datetime import datetime, timedelta
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select, and_
+import structlog
+
+from app.models.traffic import TrafficData
+from app.external.madrid_opendata import MadridOpenDataClient
+from app.schemas.external import TrafficDataResponse
+
+logger = structlog.get_logger()
+
+class TrafficService:
+
+    def __init__(self):
+        self.madrid_client = MadridOpenDataClient()
+
+    async def get_current_traffic(self, latitude: float, longitude: float) -> Optional[TrafficDataResponse]:
+        """Get current traffic data for location"""
+        try:
+            traffic_data = await self.madrid_client.get_current_traffic(latitude, longitude)
+            if traffic_data:
+                return TrafficDataResponse(**traffic_data)
+            return None
+        except Exception as e:
+            logger.error("Failed to get current traffic", error=str(e))
+            return None
+
+    async def get_historical_traffic(self,
+                                     latitude: float,
+                                     longitude: float,
+                                     start_date: datetime,
+                                     end_date: datetime,
+                                     db: AsyncSession) -> List[TrafficDataResponse]:
+        """Get historical traffic data"""
+        try:
+            # Check database first
+            location_id = f"{latitude:.4f},{longitude:.4f}"
+            stmt = select(TrafficData).where(
+                and_(
+                    TrafficData.location_id == location_id,
+                    TrafficData.date >= start_date,
+                    TrafficData.date <= end_date
+                )
+            ).order_by(TrafficData.date)
+
+            result = await db.execute(stmt)
+            db_records = result.scalars().all()
+
+            if db_records:
+                return [TrafficDataResponse(
+                    date=record.date,
+                    traffic_volume=record.traffic_volume,
+                    pedestrian_count=record.pedestrian_count,
+                    congestion_level=record.congestion_level,
+                    average_speed=record.average_speed,
+                    source=record.source
+                ) for record in db_records]
+
+            # Fetch from API if not in database
+            traffic_data = await self.madrid_client.get_historical_traffic(
+                latitude, longitude, start_date, end_date
+            )
+
+            # Store in database
+            for data in traffic_data:
+                traffic_record = TrafficData(
+                    location_id=location_id,
+                    date=data['date'],
+                    traffic_volume=data.get('traffic_volume'),
+                    pedestrian_count=data.get('pedestrian_count'),
+                    congestion_level=data.get('congestion_level'),
+                    average_speed=data.get('average_speed'),
+                    source="madrid_opendata",
+                    raw_data=str(data)
+                )
+                db.add(traffic_record)
+
+            await db.commit()
+
+            return [TrafficDataResponse(**item) for item in traffic_data]
+
+        except Exception 
as e: + logger.error("Failed to get historical traffic", error=str(e)) + return [] diff --git a/services/data/app/services/weather_service.py b/services/data/app/services/weather_service.py new file mode 100644 index 00000000..8486f4c8 --- /dev/null +++ b/services/data/app/services/weather_service.py @@ -0,0 +1,103 @@ +# ================================================================ +# services/data/app/services/weather_service.py +# ================================================================ +"""Weather data service""" + +from typing import List, Dict, Any, Optional +from datetime import datetime, timedelta +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, and_ +import structlog + +from app.models.weather import WeatherData, WeatherForecast +from app.external.aemet import AEMETClient +from app.schemas.external import WeatherDataResponse, WeatherForecastResponse + +logger = structlog.get_logger() + +class WeatherService: + + def __init__(self): + self.aemet_client = AEMETClient() + + async def get_current_weather(self, latitude: float, longitude: float) -> Optional[WeatherDataResponse]: + """Get current weather for location""" + try: + weather_data = await self.aemet_client.get_current_weather(latitude, longitude) + if weather_data: + return WeatherDataResponse(**weather_data) + return None + except Exception as e: + logger.error("Failed to get current weather", error=str(e)) + return None + + async def get_weather_forecast(self, latitude: float, longitude: float, days: int = 7) -> List[WeatherForecastResponse]: + """Get weather forecast for location""" + try: + forecast_data = await self.aemet_client.get_forecast(latitude, longitude, days) + return [WeatherForecastResponse(**item) for item in forecast_data] + except Exception as e: + logger.error("Failed to get weather forecast", error=str(e)) + return [] + + async def get_historical_weather(self, + latitude: float, + longitude: float, + start_date: datetime, + end_date: datetime, + db: AsyncSession) -> List[WeatherDataResponse]: + """Get historical weather data""" + try: + # First check database + location_id = f"{latitude:.4f},{longitude:.4f}" + stmt = select(WeatherData).where( + and_( + WeatherData.location_id == location_id, + WeatherData.date >= start_date, + WeatherData.date <= end_date + ) + ).order_by(WeatherData.date) + + result = await db.execute(stmt) + db_records = result.scalars().all() + + if db_records: + return [WeatherDataResponse( + date=record.date, + temperature=record.temperature, + precipitation=record.precipitation, + humidity=record.humidity, + wind_speed=record.wind_speed, + pressure=record.pressure, + description=record.description, + source=record.source + ) for record in db_records] + + # If not in database, fetch from API and store + weather_data = await self.aemet_client.get_historical_weather( + latitude, longitude, start_date, end_date + ) + + # Store in database for future use + for data in weather_data: + weather_record = WeatherData( + location_id=location_id, + date=data['date'], + temperature=data.get('temperature'), + precipitation=data.get('precipitation'), + humidity=data.get('humidity'), + wind_speed=data.get('wind_speed'), + pressure=data.get('pressure'), + description=data.get('description'), + source="aemet", + raw_data=str(data) + ) + db.add(weather_record) + + await db.commit() + + return [WeatherDataResponse(**item) for item in weather_data] + + except Exception as e: + logger.error("Failed to get historical weather", error=str(e)) + return [] diff --git 
a/services/data/migrations/alembic.ini b/services/data/migrations/alembic.ini new file mode 100644 index 00000000..64ed2c84 --- /dev/null +++ b/services/data/migrations/alembic.ini @@ -0,0 +1,117 @@ +# ================================================================ +# services/data/alembic.ini +# ================================================================ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = migrations + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python-dateutil library that can be +# installed by adding `alembic[tz]` to the pip requirements +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the +# "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version number format. This value is passed to the Python +# datetime.datetime.strftime() method for formatting the creation date. +# For UTC time zone add 'utc' prefix (ex: utc%Y_%m_%d_%H%M ) +version_num_format = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d + +# version path separator; As mentioned above, this is the character used to split +# version_locations. The default within new alembic.ini files is "os", which uses +# os.pathsep. If this key is omitted entirely, it falls back to the legacy +# behavior of splitting on spaces and/or commas. +# valid values for version_path_separator are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +version_path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +sqlalchemy.url = driver://user:pass@localhost/dbname + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. 
See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# lint with attempts to fix using "ruff" - use the exec runner, execute a binary +# hooks = ruff +# ruff.type = exec +# ruff.executable = %(here)s/.venv/bin/ruff +# ruff.options = --fix REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/services/data/migrations/env.py b/services/data/migrations/env.py new file mode 100644 index 00000000..f1bf8b2d --- /dev/null +++ b/services/data/migrations/env.py @@ -0,0 +1,68 @@ +# ================================================================ +# services/data/migrations/env.py +# ================================================================ +"""Alembic environment configuration""" + +import asyncio +from logging.config import fileConfig +from sqlalchemy import pool +from sqlalchemy.engine import Connection +from sqlalchemy.ext.asyncio import async_engine_from_config +from alembic import context + +from app.core.config import settings +from app.core.database import Base +from app.models import sales, weather, traffic + +# this is the Alembic Config object +config = context.config + +# Interpret the config file for Python logging +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# Set database URL +config.set_main_option("sqlalchemy.url", settings.DATABASE_URL.replace('+asyncpg', '')) + +target_metadata = Base.metadata + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode.""" + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + +def do_run_migrations(connection: Connection) -> None: + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + +async def run_async_migrations() -> None: + """Run migrations in 'online' mode with async engine.""" + connectable = async_engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + async with connectable.connect() as connection: + await connection.run_sync(do_run_migrations) + + await connectable.dispose() + +def run_migrations_online() -> None: + """Run migrations in 'online' mode.""" + asyncio.run(run_async_migrations()) + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() \ No newline at end of file diff --git a/services/data/migrations/script.py.mako b/services/data/migrations/script.py.mako new file mode 100644 index 00000000..9b24ed68 --- /dev/null +++ b/services/data/migrations/script.py.mako @@ -0,0 +1,29 @@ +# ================================================================ 
+# services/data/migrations/script.py.mako +# ================================================================ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + ${downgrades if downgrades else "pass"} diff --git a/services/data/requirements.txt b/services/data/requirements.txt index f8f5f85c..215dff86 100644 --- a/services/data/requirements.txt +++ b/services/data/requirements.txt @@ -1,14 +1,45 @@ +# ================================================================ +# services/data/requirements.txt - UPDATED +# ================================================================ + +# FastAPI and web framework fastapi==0.104.1 uvicorn[standard]==0.24.0 -sqlalchemy==2.0.23 + +# Database +sqlalchemy[asyncio]==2.0.23 asyncpg==0.29.0 alembic==1.12.1 + +# Data validation pydantic==2.5.0 pydantic-settings==2.1.0 -httpx==0.25.2 + +# Cache and messaging redis==5.0.1 -aio-pika==9.3.0 -prometheus-client==0.17.1 -python-json-logger==2.0.7 -pytz==2023.3 -python-logstash==0.4.8 \ No newline at end of file +aio-pika==9.3.1 + +# HTTP client +httpx==0.25.2 + +# Data processing (UPDATED - Added openpyxl and xlrd) +pandas==2.1.3 +numpy==1.25.2 +openpyxl==3.1.2 # For Excel (.xlsx) files +xlrd==2.0.1 # For Excel (.xls) files +python-multipart==0.0.6 + +# Monitoring and logging +prometheus-client==0.19.0 +structlog==23.2.0 +python-logstash==0.4.8 +python-json-logger==2.0.4 + +# Security +python-jose[cryptography]==3.3.0 +bcrypt==4.1.2 + +# Testing +pytest==7.4.3 +pytest-asyncio==0.21.1 +pytest-cov==4.1.0 \ No newline at end of file diff --git a/services/data/tests/__init__.py b/services/data/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/services/data/tests/conftest.py b/services/data/tests/conftest.py new file mode 100644 index 00000000..578ac2c5 --- /dev/null +++ b/services/data/tests/conftest.py @@ -0,0 +1,82 @@ +# ================================================================ +# services/data/tests/conftest.py +# ================================================================ +"""Test configuration for data service""" + +import pytest +import pytest_asyncio +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.pool import StaticPool +from fastapi.testclient import TestClient +import uuid +from datetime import datetime + +from app.main import app +from app.core.database import Base, get_db +from app.models.sales import SalesData +from app.models.weather import WeatherData, WeatherForecast +from app.models.traffic import TrafficData + +# Test database URL +TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:" + +# Create test engine +test_engine = create_async_engine( + TEST_DATABASE_URL, + connect_args={"check_same_thread": False}, + poolclass=StaticPool, +) + +TestingSessionLocal = async_sessionmaker( + test_engine, + class_=AsyncSession, + expire_on_commit=False +) + +@pytest_asyncio.fixture +async def db(): + """Create test database session""" + async with 
test_engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with TestingSessionLocal() as session: + yield session + + async with test_engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + +@pytest.fixture +def client(): + """Create test client""" + async def override_get_db(): + async with TestingSessionLocal() as session: + yield session + + app.dependency_overrides[get_db] = override_get_db + + with TestClient(app) as test_client: + yield test_client + + app.dependency_overrides.clear() + +@pytest.fixture +def test_tenant_id(): + """Test tenant ID""" + return uuid.uuid4() + +@pytest.fixture +def test_sales_data(): + """Sample sales data for testing""" + return { + "date": datetime.now(), + "product_name": "Pan Integral", + "quantity_sold": 25, + "revenue": 37.50, + "location_id": "madrid_centro", + "source": "test" + } + +@pytest.fixture +def mock_auth_token(): + """Mock authentication token""" + return "Bearer test-token-123" \ No newline at end of file diff --git a/services/data/tests/test_data.py b/services/data/tests/test_data.py new file mode 100644 index 00000000..df88a72c --- /dev/null +++ b/services/data/tests/test_data.py @@ -0,0 +1,94 @@ +# ================================================================ +# services/data/tests/test_data.py +# ================================================================ +"""Integration tests for data service""" + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.ext.asyncio import AsyncSession +from datetime import datetime, timedelta +import uuid + +from app.services.sales_service import SalesService +from app.services.weather_service import WeatherService +from app.services.traffic_service import TrafficService +from app.schemas.sales import SalesDataCreate, SalesDataQuery + +@pytest.mark.asyncio +async def test_create_sales_record(db: AsyncSession, test_tenant_id, test_sales_data): + """Test creating a sales record""" + sales_data = SalesDataCreate( + tenant_id=test_tenant_id, + **test_sales_data + ) + + record = await SalesService.create_sales_record(sales_data, db) + + assert record.id is not None + assert record.tenant_id == test_tenant_id + assert record.product_name == test_sales_data["product_name"] + assert record.quantity_sold == test_sales_data["quantity_sold"] + +@pytest.mark.asyncio +async def test_query_sales_data(db: AsyncSession, test_tenant_id, test_sales_data): + """Test querying sales data""" + # Create test record + sales_data = SalesDataCreate( + tenant_id=test_tenant_id, + **test_sales_data + ) + await SalesService.create_sales_record(sales_data, db) + + # Query data + query = SalesDataQuery( + tenant_id=test_tenant_id, + start_date=datetime.now() - timedelta(days=1), + end_date=datetime.now() + timedelta(days=1) + ) + + records = await SalesService.get_sales_data(query, db) + + assert len(records) == 1 + assert records[0].product_name == test_sales_data["product_name"] + +@pytest.mark.asyncio +async def test_csv_import(): + """Test CSV data import""" + csv_content = """date,product,quantity,revenue +2024-01-15,Pan Integral,25,37.50 +2024-01-15,Croissant,15,22.50""" + + # This would normally use the database, but we're testing the parsing logic + import io + import pandas as pd + + csv_file = io.StringIO(csv_content) + df = pd.read_csv(csv_file) + + assert len(df) == 2 + assert df.iloc[0]['product'] == 'Pan Integral' + assert df.iloc[0]['quantity'] == 25 + +@pytest.mark.asyncio +async def test_weather_service(): + """Test weather service""" + 
weather_service = WeatherService() + + # Test current weather (should return synthetic data) + current = await weather_service.get_current_weather(40.4168, -3.7038) + + assert current is not None + assert current.temperature is not None + assert current.source in ['aemet', 'synthetic'] + +@pytest.mark.asyncio +async def test_traffic_service(): + """Test traffic service""" + traffic_service = TrafficService() + + # Test current traffic (should return synthetic data) + current = await traffic_service.get_current_traffic(40.4168, -3.7038) + + assert current is not None + assert current.traffic_volume is not None + assert current.congestion_level in ['low', 'medium', 'high'] diff --git a/services/data/tests/test_external.py b/services/data/tests/test_external.py new file mode 100644 index 00000000..79c48355 --- /dev/null +++ b/services/data/tests/test_external.py @@ -0,0 +1,87 @@ +# ================================================================ +# services/data/tests/test_external.py +# ================================================================ +"""Tests for external API clients""" + +import pytest +from unittest.mock import AsyncMock, patch +from datetime import datetime, timedelta + +from app.external.aemet import AEMETClient +from app.external.madrid_opendata import MadridOpenDataClient + +@pytest.mark.asyncio +async def test_aemet_client_synthetic_weather(): + """Test AEMET client synthetic weather generation""" + client = AEMETClient() + + weather = await client._generate_synthetic_weather() + + assert weather is not None + assert 'temperature' in weather + assert 'precipitation' in weather + assert 'humidity' in weather + assert weather['source'] == 'synthetic' + +@pytest.mark.asyncio +async def test_aemet_client_synthetic_forecast(): + """Test AEMET client synthetic forecast""" + client = AEMETClient() + + forecast = await client._generate_synthetic_forecast(7) + + assert len(forecast) == 7 + assert all('forecast_date' in item for item in forecast) + assert all('temperature' in item for item in forecast) + assert all(item['source'] == 'synthetic' for item in forecast) + +@pytest.mark.asyncio +async def test_madrid_client_synthetic_traffic(): + """Test Madrid Open Data client synthetic traffic""" + client = MadridOpenDataClient() + + traffic = await client._generate_synthetic_traffic(40.4168, -3.7038) + + assert traffic is not None + assert 'traffic_volume' in traffic + assert 'pedestrian_count' in traffic + assert 'congestion_level' in traffic + assert traffic['congestion_level'] in ['low', 'medium', 'high'] + +@pytest.mark.asyncio +async def test_madrid_client_historical_traffic(): + """Test Madrid Open Data client historical traffic""" + client = MadridOpenDataClient() + + start_date = datetime.now() - timedelta(days=7) + end_date = datetime.now() + + historical = await client._generate_historical_traffic( + 40.4168, -3.7038, start_date, end_date + ) + + assert len(historical) > 0 + assert all('traffic_volume' in item for item in historical) + assert all('date' in item for item in historical) + +@pytest.mark.asyncio +async def test_distance_calculation(): + """Test distance calculation in AEMET client""" + client = AEMETClient() + + # Distance between Madrid Centro and Madrid Norte (approximately) + distance = client._calculate_distance(40.4168, -3.7038, 40.4518, -3.7246) + + # Should be roughly 4-5 km + assert 3 < distance < 6 + +@pytest.mark.asyncio +async def test_madrid_area_detection(): + """Test Madrid area detection""" + client = AEMETClient() + + # Madrid coordinates 
should be detected as Madrid area
+    assert client._is_in_madrid_area(40.4168, -3.7038) is True
+
+    # Barcelona coordinates should not
+    assert client._is_in_madrid_area(41.3851, 2.1734) is False
\ No newline at end of file
diff --git a/shared/monitoring/metrics.py b/shared/monitoring/metrics.py
index 4bfd37b1..9da19f70 100644
--- a/shared/monitoring/metrics.py
+++ b/shared/monitoring/metrics.py
@@ -9,6 +9,7 @@ from typing import Dict, Any, List # Added List import
 from prometheus_client import Counter, Histogram, Gauge, start_http_server
 from functools import wraps
 from prometheus_client import generate_latest # Moved this import here for consistency
+from fastapi import Request, Response  # Response is used by the /metrics endpoint below
 
 logger = logging.getLogger(__name__)
 
@@ -175,4 +176,118 @@ def metrics_middleware(metrics_collector: MetricsCollector):
 
         return response
 
-    return middleware
\ No newline at end of file
+    return middleware
+
+
+def setup_metrics(app):
+    """
+    Setup metrics collection for FastAPI app
+
+    Args:
+        app: FastAPI application instance
+
+    Returns:
+        MetricsCollector: Configured metrics collector
+    """
+
+    # Get service name from app title or default
+    service_name = getattr(app, 'title', 'unknown-service').lower().replace(' ', '-')
+
+    # Create metrics collector for this service
+    metrics_collector = MetricsCollector(service_name)
+
+    # Add metrics middleware to collect HTTP request metrics
+    @app.middleware("http")
+    async def collect_metrics_middleware(request: Request, call_next):
+        start_time = time.time()
+
+        # Process the request
+        response = await call_next(request)
+
+        # Calculate duration
+        duration = time.time() - start_time
+
+        # Record metrics
+        metrics_collector.record_request(
+            method=request.method,
+            endpoint=request.url.path,
+            status_code=response.status_code,
+            duration=duration
+        )
+
+        return response
+
+    # Expose Prometheus metrics endpoint
+    @app.get("/metrics")
+    async def prometheus_metrics():
+        """Prometheus metrics endpoint"""
+        from prometheus_client import generate_latest
+        return Response(
+            content=generate_latest(),
+            media_type="text/plain; version=0.0.4; charset=utf-8"
+        )
+
+    # Store metrics collector in app state for later access
+    app.state.metrics_collector = metrics_collector
+
+    logger.info(f"Metrics collection setup completed for service: {service_name}")
+
+    return metrics_collector
+
+
+# Alternative simplified setup function for services that don't need complex metrics
+def setup_basic_metrics(app, service_name: str = None):
+    """
+    Setup basic metrics collection without complex dependencies
+
+    Args:
+        app: FastAPI application instance
+        service_name: Optional service name override
+
+    Returns:
+        Simple metrics dict
+    """
+    if service_name is None:
+        service_name = getattr(app, 'title', 'unknown-service').lower().replace(' ', '-')
+
+    # Simple in-memory metrics
+    metrics_data = {
+        "requests_total": 0,
+        "requests_by_method": {},
+        "requests_by_status": {},
+        "service_name": service_name,
+        "start_time": time.time()
+    }
+
+    @app.middleware("http")
+    async def simple_metrics_middleware(request: Request, call_next):
+        # Increment total requests
+        metrics_data["requests_total"] += 1
+
+        # Track by method
+        method = request.method
+        metrics_data["requests_by_method"][method] = metrics_data["requests_by_method"].get(method, 0) + 1
+
+        # Process request
+        response = await call_next(request)
+
+        # Track by status code
+        status = str(response.status_code)
+        metrics_data["requests_by_status"][status] = metrics_data["requests_by_status"].get(status, 0) + 1
+
+        return response
+
+    @app.get("/metrics")
+    async def simple_metrics():
+        """Simple metrics endpoint"""
+        uptime = time.time() - metrics_data["start_time"]
+        return {
+            **metrics_data,
+            "uptime_seconds": round(uptime, 2)
+        }
+
+    app.state.simple_metrics = metrics_data
+
+    logger.info(f"Basic metrics setup completed for service: {service_name}")
+
+    return metrics_data
\ No newline at end of file
diff --git a/test_data.py b/test_data.py
new file mode 100644
index 00000000..70f600bc
--- /dev/null
+++ b/test_data.py
@@ -0,0 +1,385 @@
+# ================================================================
+# validate_local.py - Complete validation script with all imports
+# ================================================================
+
+import asyncio
+import httpx
+import json
+import sys
+import traceback
+from datetime import datetime
+from typing import Optional, Dict, Any
+
+# Configuration
+AUTH_URL = "http://localhost:8001"
+DATA_URL = "http://localhost:8004"
+GATEWAY_URL = "http://localhost:8000"  # Only used when going through the gateway
+
+class DataServiceValidator:
+    """End-to-end validator for the Data Service"""
+
+    def __init__(self, use_gateway: bool = False):
+        self.auth_token: Optional[str] = None
+        self.use_gateway = use_gateway
+        self.base_url = GATEWAY_URL if use_gateway else DATA_URL
+        self.auth_base_url = GATEWAY_URL if use_gateway else AUTH_URL
+
+    async def test_service_health(self) -> bool:
+        """Check that the services are up and running"""
+        print("🔍 Checking service health...")
+
+        try:
+            async with httpx.AsyncClient(timeout=10.0) as client:
+                # Test auth service
+                auth_response = await client.get(f"{AUTH_URL}/health")
+                if auth_response.status_code == 200:
+                    print("✅ Auth service is healthy")
+                else:
+                    print(f"❌ Auth service unhealthy: {auth_response.status_code}")
+                    return False
+
+                # Test data service
+                data_response = await client.get(f"{DATA_URL}/health")
+                if data_response.status_code == 200:
+                    print("✅ Data service is healthy")
+                else:
+                    print(f"❌ Data service unhealthy: {data_response.status_code}")
+                    return False
+
+                return True
+
+        except httpx.ConnectError as e:
+            print(f"❌ Connection error: {e}")
+            print("💡 Make sure services are running with: docker-compose up -d")
+            return False
+        except Exception as e:
+            print(f"❌ Health check failed: {e}")
+            return False
+
+    async def authenticate(self) -> bool:
+        """Authenticate and obtain a token"""
+        print("🔐 Authenticating...")
+
+        try:
+            async with httpx.AsyncClient(timeout=15.0) as client:
+                # Test user data
+                user_data = {
+                    "email": "test@bakery.es",
+                    "password": "TestPass123",
+                    "full_name": "Test User",
+                    "language": "es"
+                }
+
+                # Try to register the user (may fail if it already exists)
+                register_endpoint = f"{self.auth_base_url}/api/v1/auth/register"
+                register_response = await client.post(register_endpoint, json=user_data)
+
+                if register_response.status_code == 200:
+                    print("✅ User registered successfully")
+                elif register_response.status_code == 409:
+                    print("ℹ️ User already exists, proceeding with login")
+                else:
+                    print(f"⚠️ Registration response: {register_response.status_code}")
+
+                # Login
+                login_data = {
+                    "email": user_data["email"],
+                    "password": user_data["password"]
+                }
+
+                login_endpoint = f"{self.auth_base_url}/api/v1/auth/login"
+                login_response = await client.post(login_endpoint, json=login_data)
+
+                if login_response.status_code == 200:
+                    response_data = login_response.json()
+                    self.auth_token = response_data["access_token"]
+                    print("✅ Authentication successful")
+                    return True
+                else:
+                    print(f"❌ Login failed: {login_response.status_code}")
+                    print(f"Response: {login_response.text}")
+                    return False
+
+        except Exception as e:
+            print(f"❌ Authentication failed: {e}")
+            return False
+
+    def get_headers(self) -> Dict[str, str]:
+        """Build request headers with the authentication token"""
+        if not self.auth_token:
+            raise ValueError("No authentication token available")
+        return {"Authorization": f"Bearer {self.auth_token}"}
+
+    async def test_weather_endpoints(self) -> bool:
+        """Test the weather endpoints"""
+        print("🌤️ Testing weather endpoints...")
+
+        try:
+            headers = self.get_headers()
+            madrid_coords = {"latitude": 40.4168, "longitude": -3.7038}
+
+            async with httpx.AsyncClient(timeout=20.0) as client:
+                # Current weather
+                current_endpoint = f"{self.base_url}/api/v1/weather/current" if not self.use_gateway else f"{self.base_url}/api/v1/data/weather/current"
+                current_response = await client.get(
+                    current_endpoint,
+                    params=madrid_coords,
+                    headers=headers
+                )
+
+                if current_response.status_code == 200:
+                    weather = current_response.json()
+                    print(f"✅ Current weather: {weather.get('temperature')}°C, {weather.get('description')}")
+                else:
+                    print(f"❌ Current weather failed: {current_response.status_code}")
+                    print(f"Response: {current_response.text}")
+                    return False
+
+                # Weather forecast
+                forecast_endpoint = f"{self.base_url}/api/v1/weather/forecast" if not self.use_gateway else f"{self.base_url}/api/v1/data/weather/forecast"
+                forecast_response = await client.get(
+                    forecast_endpoint,
+                    params={**madrid_coords, "days": 3},
+                    headers=headers
+                )
+
+                if forecast_response.status_code == 200:
+                    forecast = forecast_response.json()
+                    print(f"✅ Weather forecast: {len(forecast)} days")
+                else:
+                    print(f"❌ Weather forecast failed: {forecast_response.status_code}")
+                    return False
+
+                return True
+
+        except Exception as e:
+            print(f"❌ Weather tests failed: {e}")
+            return False
+
+    async def test_traffic_endpoints(self) -> bool:
+        """Test the traffic endpoints"""
+        print("🚦 Testing traffic endpoints...")
+
+        try:
+            headers = self.get_headers()
+            madrid_coords = {"latitude": 40.4168, "longitude": -3.7038}
+
+            async with httpx.AsyncClient(timeout=20.0) as client:
+                # Current traffic
+                current_endpoint = f"{self.base_url}/api/v1/traffic/current" if not self.use_gateway else f"{self.base_url}/api/v1/data/traffic/current"
+                current_response = await client.get(
+                    current_endpoint,
+                    params=madrid_coords,
+                    headers=headers
+                )
+
+                if current_response.status_code == 200:
+                    traffic = current_response.json()
+                    print(f"✅ Current traffic: {traffic.get('traffic_volume')} vehicles, {traffic.get('congestion_level')} congestion")
+                    return True
+                else:
+                    print(f"❌ Current traffic failed: {current_response.status_code}")
+                    print(f"Response: {current_response.text}")
+                    return False
+
+        except Exception as e:
+            print(f"❌ Traffic tests failed: {e}")
+            return False
+
+    async def test_sales_endpoints(self) -> bool:
+        """Test the sales endpoints"""
+        print("📊 Testing sales endpoints...")
+
+        try:
+            headers = self.get_headers()
+
+            # Test data
+            sales_data = {
+                "tenant_id": "123e4567-e89b-12d3-a456-426614174000",
+                "date": datetime.now().isoformat(),
+                "product_name": "Pan Integral Test",
+                "quantity_sold": 25,
+                "revenue": 37.50,
+                "location_id": "madrid_centro",
+                "source": "test"
+            }
+
+            async with httpx.AsyncClient(timeout=20.0) as client:
+                # Create sales record
+                create_endpoint = f"{self.base_url}/api/v1/sales/" if not self.use_gateway else f"{self.base_url}/api/v1/data/sales/"
+                create_response = await client.post(
+                    create_endpoint,
+                    json=sales_data,
+                    headers=headers
+                )
+
+                if create_response.status_code == 200:
+                    record = create_response.json()
+                    print(f"✅ Sales record created: ID {record.get('id')}")
+                else:
+                    print(f"❌ Sales creation failed: {create_response.status_code}")
+                    print(f"Response: {create_response.text}")
+                    return False
+
+                # Test import template
+                template_endpoint = f"{self.base_url}/api/v1/sales/import/template/csv" if not self.use_gateway else f"{self.base_url}/api/v1/data/sales/import/template/csv"
+                template_response = await client.get(
+                    template_endpoint,
+                    headers=headers
+                )
+
+                if template_response.status_code == 200:
+                    print("✅ Import template generated successfully")
+                else:
+                    print(f"❌ Template generation failed: {template_response.status_code}")
+                    return False
+
+                return True
+
+        except Exception as e:
+            print(f"❌ Sales tests failed: {e}")
+            return False
+
+    async def test_import_functionality(self) -> bool:
+        """Test the import functionality"""
+        print("📄 Testing import functionality...")
+
+        try:
+            headers = self.get_headers()
+
+            # Build a sample CSV (uses Spanish column headers)
+            csv_content = """fecha,producto,cantidad,ingresos
+15/01/2024,Pan Integral,25,37.50
+15/01/2024,Croissant,15,22.50
+15/01/2024,Café con Leche,10,20.00"""
+
+            # Test CSV import
+            import_data = {
+                "tenant_id": "123e4567-e89b-12d3-a456-426614174000",
+                "data_format": "csv",
+                "data": csv_content
+            }
+
+            async with httpx.AsyncClient(timeout=30.0) as client:
+                import_endpoint = f"{self.base_url}/api/v1/sales/import/json" if not self.use_gateway else f"{self.base_url}/api/v1/data/sales/import/json"
+                import_response = await client.post(
+                    import_endpoint,
+                    json=import_data,
+                    headers=headers
+                )
+
+                if import_response.status_code == 200:
+                    result = import_response.json()
+                    if result.get("success"):
+                        print(f"✅ CSV import successful: {result.get('records_created')} records created")
+                        return True
+                    else:
+                        print(f"❌ CSV import failed: {result.get('error')}")
+                        return False
+                else:
+                    print(f"❌ Import request failed: {import_response.status_code}")
+                    print(f"Response: {import_response.text}")
+                    return False
+
+        except Exception as e:
+            print(f"❌ Import tests failed: {e}")
+            return False
+
+async def main():
+    """Main validation routine"""
+    print("🚀 Starting Data Service Validation")
+    print("=" * 50)
+
+    # Ask whether to go through the API Gateway
+    use_gateway_input = input("Use API Gateway? (y/N): ").lower()
+    use_gateway = use_gateway_input == 'y'
+
+    if use_gateway:
+        print("📡 Testing via API Gateway")
+    else:
+        print("🔗 Testing direct service connections")
+
+    validator = DataServiceValidator(use_gateway=use_gateway)
+
+    try:
+        # 1. Health checks
+        if not await validator.test_service_health():
+            print("\n❌ Health checks failed. Ensure services are running.")
+            return False
+
+        # 2. Authentication
+        if not await validator.authenticate():
+            print("\n❌ Authentication failed.")
+            return False
+
+        # 3. Weather tests
+        if not await validator.test_weather_endpoints():
+            print("\n❌ Weather endpoint tests failed.")
+            return False
+
+        # 4. Traffic tests
+        if not await validator.test_traffic_endpoints():
+            print("\n❌ Traffic endpoint tests failed.")
+            return False
+
+        # 5. Sales tests
+        if not await validator.test_sales_endpoints():
+            print("\n❌ Sales endpoint tests failed.")
+            return False
+
+        # 6. Import tests
+        if not await validator.test_import_functionality():
+            print("\n❌ Import functionality tests failed.")
+            return False
+
+        print("\n" + "=" * 50)
+        print("🎉 ALL TESTS PASSED! Data Service is working correctly!")
+        print("=" * 50)
+        return True
+
+    except KeyboardInterrupt:
+        print("\n⚠️ Tests interrupted by user")
+        return False
+    except Exception as e:
+        print(f"\n❌ Unexpected error: {e}")
+        print("Traceback:")
+        traceback.print_exc()
+        return False
+
+def check_dependencies():
+    """Check that the required dependencies are installed"""
+    try:
+        import httpx
+        print("✅ httpx is available")
+    except ImportError:
+        print("❌ httpx not found. Install with: pip install httpx")
+        return False
+
+    return True
+
+if __name__ == "__main__":
+    print("🔧 Checking dependencies...")
+
+    if not check_dependencies():
+        print("\n💡 Install dependencies with:")
+        print("pip install httpx")
+        sys.exit(1)
+
+    print("✅ All dependencies available")
+    print()
+
+    # Run the validation
+    success = asyncio.run(main())
+
+    if success:
+        print("\n🎯 Next steps:")
+        print("1. Check logs: docker-compose logs -f data-service")
+        print("2. Connect to DB: docker-compose exec data-db psql -U data_user -d data_db")
+        print("3. Test with real data imports")
+        sys.exit(0)
+    else:
+        print("\n🔧 Troubleshooting:")
+        print("1. Check services: docker-compose ps")
+        print("2. Restart services: docker-compose restart")
+        print("3. Check logs: docker-compose logs data-service")
+        sys.exit(1)
\ No newline at end of file