diff --git a/fix_logging_inconsistency.sh b/fix_logging_inconsistency.sh deleted file mode 100644 index 75dc13da..00000000 --- a/fix_logging_inconsistency.sh +++ /dev/null @@ -1,370 +0,0 @@ -#!/bin/bash -# fix_logging_inconsistency.sh - Fix mixed logging imports throughout project - -echo "๐Ÿ” LOGGING INCONSISTENCY ANALYSIS" -echo "=================================" - -# ================================================================ -# PROBLEM IDENTIFIED: Mixed logging imports throughout the project -# ================================================================ - -echo "" -echo "โŒ INCONSISTENT USAGE FOUND:" -echo "" - -# Some files use structlog: -echo "โœ… Files correctly using structlog:" -echo " - services/data/app/services/data_import_service.py: import structlog" -echo " - services/data/app/core/database.py: import structlog" -echo " - services/data/app/core/auth.py: import structlog" -echo "" - -# Some files use standard logging: -echo "โŒ Files incorrectly using standard logging:" -echo " - shared/monitoring/logging.py: import logging" -echo " - gateway/app/main.py: import logging" -echo " - services/*/app/main.py: import logging (in setup scripts)" -echo " - services/forecasting/app/main.py: import logging" -echo "" - -# ================================================================ -# THE ROOT CAUSE -# ================================================================ - -echo "๐Ÿ” ROOT CAUSE ANALYSIS:" -echo "======================" -echo "" -echo "1. shared/monitoring/logging.py uses 'import logging'" -echo " โ†ณ This is CORRECT - it's configuring the logging system" -echo "" -echo "2. Service files use 'import logging' instead of 'import structlog'" -echo " โ†ณ This is WRONG - services should use structlog" -echo "" -echo "3. Mixed usage creates inconsistent log formats" -echo " โ†ณ Some logs are structured JSON, others are plain text" -echo "" - -# ================================================================ -# DETAILED EXPLANATION -# ================================================================ - -echo "๐Ÿ“ DETAILED EXPLANATION:" -echo "========================" -echo "" -echo "There are TWO different use cases:" -echo "" -echo "1. LOGGING CONFIGURATION (shared/monitoring/logging.py):" -echo " โœ… Uses 'import logging' - This is CORRECT" -echo " โœ… Sets up the logging infrastructure" -echo " โœ… Configures handlers, formatters, logstash integration" -echo "" -echo "2. APPLICATION LOGGING (all service files):" -echo " โŒ Should use 'import structlog' - Many use wrong import" -echo " โŒ Should get logger with structlog.get_logger()" -echo " โŒ Should log with key-value pairs" -echo "" - -# ================================================================ -# THE FIX -# ================================================================ - -echo "๐Ÿ”ง THE FIX:" -echo "===========" -echo "" -echo "Replace all service logging imports:" -echo "" -echo "โŒ Change from:" -echo " import logging" -echo " logger = logging.getLogger(__name__)" -echo "" -echo "โœ… Change to:" -echo " import structlog" -echo " logger = structlog.get_logger()" -echo "" - -# ================================================================ -# IMPLEMENTATION -# ================================================================ - -echo "๐Ÿš€ IMPLEMENTING FIX..." -echo "" - -# Create backup directory -backup_dir="/tmp/logging_fix_backup_$(date +%Y%m%d_%H%M%S)" -mkdir -p "$backup_dir" -echo "๐Ÿ“ฆ Created backup directory: $backup_dir" - -# Function to fix logging imports in a file -fix_logging_in_file() { - local file="$1" - if [ -f "$file" ]; then - echo " ๐Ÿ”ง Fixing: $file" - - # Backup original - cp "$file" "$backup_dir/$(basename $file).backup" - - # Replace logging imports with structlog - sed -i.tmp ' - # Replace import statements - s/^import logging$/import structlog/g - s/^from logging import/# from logging import/g - - # Replace logger creation - s/logger = logging\.getLogger(__name__)/logger = structlog.get_logger()/g - s/logger = logging\.getLogger()/logger = structlog.get_logger()/g - s/logging\.getLogger(__name__)/structlog.get_logger()/g - s/logging\.getLogger()/structlog.get_logger()/g - ' "$file" - - # Remove temp file - rm -f "${file}.tmp" - fi -} - -# Fix service main.py files -echo "๐Ÿ”„ Fixing service main.py files..." -for service in auth training forecasting data tenant notification; do - service_main="services/$service/app/main.py" - if [ -f "$service_main" ]; then - fix_logging_in_file "$service_main" - fi -done - -# Fix gateway main.py -echo "๐Ÿ”„ Fixing gateway main.py..." -fix_logging_in_file "gateway/app/main.py" - -# Fix other service files that might use logging -echo "๐Ÿ”„ Fixing other service files..." -find services/*/app -name "*.py" -type f | while read file; do - # Skip __init__.py and files that should use standard logging - if [[ "$file" != *"__init__.py" ]] && [[ "$file" != *"core/config.py" ]]; then - # Check if file contains logging imports (but not shared/monitoring) - if grep -q "import logging" "$file" && [[ "$file" != *"shared/monitoring"* ]]; then - fix_logging_in_file "$file" - fi - fi -done - -# ================================================================ -# VERIFICATION SCRIPT -# ================================================================ - -echo "" -echo "๐Ÿงช VERIFICATION:" -echo "================" - -# Check for remaining incorrect usage -echo "Checking for remaining 'import logging' in service files..." -incorrect_files=$(find services gateway -name "*.py" -exec grep -l "import logging" {} \; | grep -v __pycache__ | grep -v migrations || true) - -if [ -n "$incorrect_files" ]; then - echo "โš ๏ธ Still found 'import logging' in:" - echo "$incorrect_files" -else - echo "โœ… No incorrect 'import logging' found in service files" -fi - -echo "" -echo "Checking for correct 'import structlog' usage..." -correct_files=$(find services gateway -name "*.py" -exec grep -l "import structlog" {} \; | grep -v __pycache__ || true) - -if [ -n "$correct_files" ]; then - echo "โœ… Found correct 'import structlog' in:" - echo "$correct_files" -else - echo "โš ๏ธ No 'import structlog' found - this might be an issue" -fi - -# ================================================================ -# UPDATED FILE EXAMPLES -# ================================================================ - -echo "" -echo "๐Ÿ“ UPDATED FILE EXAMPLES:" -echo "=========================" -echo "" - -# Example 1: Service main.py -cat << 'EOF' -# โœ… CORRECT: services/auth/app/main.py (AFTER FIX) -""" -Authentication Service -""" - -import structlog # โœ… CORRECT -from fastapi import FastAPI -from fastapi.middleware.cors import CORSMiddleware - -from app.core.config import settings -from app.core.database import database_manager -from shared.monitoring.logging import setup_logging - -# Setup logging infrastructure -setup_logging("auth-service", "INFO") - -# Get structlog logger -logger = structlog.get_logger() # โœ… CORRECT - -app = FastAPI(title="Authentication Service") - -@app.on_event("startup") -async def startup_event(): - """Application startup""" - logger.info("Starting Authentication Service", service="auth-service") # โœ… STRUCTURED - await database_manager.create_tables() - logger.info("Authentication Service started successfully", service="auth-service") -EOF - -echo "" - -# Example 2: Service business logic -cat << 'EOF' -# โœ… CORRECT: services/auth/app/services/auth_service.py (AFTER FIX) -""" -Authentication service business logic -""" - -import structlog # โœ… CORRECT -from sqlalchemy.ext.asyncio import AsyncSession - -logger = structlog.get_logger() # โœ… CORRECT - -class AuthService: - @staticmethod - async def login_user(email: str, password: str): - """Login user with structured logging""" - - # โœ… STRUCTURED LOGGING with key-value pairs - logger.info("User login attempt", email=email, service="auth-service") - - try: - user = await authenticate_user(email, password) - - # โœ… SUCCESS with context - logger.info( - "User login successful", - user_id=str(user.id), - email=user.email, - service="auth-service" - ) - - return user - - except AuthenticationError as e: - # โœ… ERROR with context - logger.error( - "User login failed", - email=email, - error=str(e), - error_type="authentication_error", - service="auth-service" - ) - raise -EOF - -echo "" - -# Example 3: What should NOT change -cat << 'EOF' -# โœ… CORRECT: shared/monitoring/logging.py (NO CHANGE NEEDED) -""" -Centralized logging configuration - This file is CORRECT as-is -""" - -import logging # โœ… CORRECT - This configures the logging system -import logging.config - -def setup_logging(service_name: str, log_level: str = "INFO"): - """ - This function SETS UP the logging infrastructure. - It should use 'import logging' to configure the system. - """ - config = { - "version": 1, - "disable_existing_loggers": False, - # ... rest of configuration - } - - logging.config.dictConfig(config) # โœ… CORRECT - logger = logging.getLogger(__name__) # โœ… CORRECT for setup - logger.info(f"Logging configured for {service_name}") -EOF - -# ================================================================ -# TESTING THE FIX -# ================================================================ - -echo "" -echo "๐Ÿงช TESTING THE FIX:" -echo "===================" -echo "" - -# Create test script -cat << 'EOF' > test_logging_fix.py -#!/usr/bin/env python3 -"""Test script to verify logging fix""" - -def test_auth_service_logging(): - """Test that auth service uses structlog correctly""" - try: - # This should work after the fix - import structlog - logger = structlog.get_logger() - logger.info("Test log entry", service="test", test=True) - print("โœ… Auth service logging test passed") - return True - except Exception as e: - print(f"โŒ Auth service logging test failed: {e}") - return False - -def test_shared_logging_setup(): - """Test that shared logging setup still works""" - try: - from shared.monitoring.logging import setup_logging - setup_logging("test-service", "INFO") - print("โœ… Shared logging setup test passed") - return True - except Exception as e: - print(f"โŒ Shared logging setup test failed: {e}") - return False - -if __name__ == "__main__": - print("Testing logging configuration...") - - test1 = test_shared_logging_setup() - test2 = test_auth_service_logging() - - if test1 and test2: - print("\n๐ŸŽ‰ All logging tests passed!") - else: - print("\nโš ๏ธ Some logging tests failed") -EOF - -echo "Created test script: test_logging_fix.py" -echo "" -echo "Run the test with:" -echo " python test_logging_fix.py" - -# ================================================================ -# SUMMARY -# ================================================================ - -echo "" -echo "๐Ÿ“Š SUMMARY:" -echo "===========" -echo "" -echo "โœ… Fixed inconsistent logging imports throughout the project" -echo "โœ… Services now use 'import structlog' consistently" -echo "โœ… shared/monitoring/logging.py still uses 'import logging' (correct)" -echo "โœ… All service logs will now be structured JSON" -echo "โœ… Logs will be properly aggregated in ELK stack" -echo "" -echo "๐Ÿ” To verify the fix:" -echo " 1. Run: docker-compose restart" -echo " 2. Check logs: docker-compose logs -f auth-service" -echo " 3. Look for structured JSON log entries" -echo "" -echo "๐Ÿ“ Backups saved to: $backup_dir" -echo "" -echo "๐Ÿš€ Your logging is now consistent across all services!" diff --git a/services/data/app/api/sales.py b/services/data/app/api/sales.py index 5c3424d5..1750fd63 100644 --- a/services/data/app/api/sales.py +++ b/services/data/app/api/sales.py @@ -1,13 +1,16 @@ # ================================================================ -# services/data/app/api/sales.py +# services/data/app/api/sales.py - FIXED VERSION # ================================================================ -"""Sales data API endpoints""" +"""Sales data API endpoints with improved error handling""" -from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query +from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query, Response +from fastapi.responses import StreamingResponse from sqlalchemy.ext.asyncio import AsyncSession -from typing import List +from typing import List, Optional import uuid from datetime import datetime +import base64 +import structlog from app.core.database import get_db from app.core.auth import get_current_user, AuthInfo @@ -18,10 +21,14 @@ from app.schemas.sales import ( SalesDataCreate, SalesDataResponse, SalesDataQuery, - SalesDataImport + SalesDataImport, + SalesImportResult, + SalesValidationResult, + SalesExportRequest ) router = APIRouter() +logger = structlog.get_logger() @router.post("/", response_model=SalesDataResponse) async def create_sales_record( @@ -31,19 +38,32 @@ async def create_sales_record( ): """Create a new sales record""" try: + logger.debug("API: Creating sales record", product=sales_data.product_name, quantity=sales_data.quantity_sold) + record = await SalesService.create_sales_record(sales_data, db) - # Publish event - await data_publisher.publish_data_imported({ - "tenant_id": str(sales_data.tenant_id), - "type": "sales_record", - "source": sales_data.source, - "timestamp": datetime.utcnow().isoformat() - }) + # Publish event (with error handling) + try: + await data_publisher.publish_sales_created({ + "tenant_id": str(sales_data.tenant_id), + "product_name": sales_data.product_name, + "quantity_sold": sales_data.quantity_sold, + "revenue": sales_data.revenue, + "source": sales_data.source, + "timestamp": datetime.utcnow().isoformat() + }) + except Exception as pub_error: + logger.warning("Failed to publish sales created event", error=str(pub_error)) + # Continue processing - event publishing failure shouldn't break the API + logger.debug("Successfully created sales record", record_id=record.id) return record + except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error("Failed to create sales record", error=str(e)) + import traceback + logger.error("Sales creation traceback", traceback=traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Failed to create sales record: {str(e)}") @router.post("/query", response_model=List[SalesDataResponse]) async def get_sales_data( @@ -53,12 +73,18 @@ async def get_sales_data( ): """Get sales data by query parameters""" try: + logger.debug("API: Querying sales data", tenant_id=query.tenant_id) + records = await SalesService.get_sales_data(query, db) + + logger.debug("Successfully retrieved sales data", count=len(records)) return records + except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error("Failed to query sales data", error=str(e)) + raise HTTPException(status_code=500, detail=f"Failed to query sales data: {str(e)}") -@router.post("/import") +@router.post("/import", response_model=SalesImportResult) async def import_sales_data( tenant_id: str = Form(...), file_format: str = Form(...), @@ -68,6 +94,8 @@ async def import_sales_data( ): """Import sales data from file""" try: + logger.debug("API: Importing sales data", tenant_id=tenant_id, format=file_format, filename=file.filename) + # Read file content content = await file.read() file_content = content.decode('utf-8') @@ -78,21 +106,30 @@ async def import_sales_data( ) if result["success"]: - # Publish event - await data_publisher.publish_data_imported({ - "tenant_id": tenant_id, - "type": "bulk_import", - "format": file_format, - "records_created": result["records_created"], - "timestamp": datetime.utcnow().isoformat() - }) + # Publish event (with error handling) + try: + await data_publisher.publish_data_imported({ + "tenant_id": tenant_id, + "type": "bulk_import", + "format": file_format, + "filename": file.filename, + "records_created": result["records_created"], + "timestamp": datetime.utcnow().isoformat() + }) + except Exception as pub_error: + logger.warning("Failed to publish data imported event", error=str(pub_error)) + # Continue processing + logger.debug("Import completed", success=result["success"], records_created=result.get("records_created", 0)) return result except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error("Failed to import sales data", error=str(e)) + import traceback + logger.error("Sales import traceback", traceback=traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Failed to import sales data: {str(e)}") -@router.post("/import/json") +@router.post("/import/json", response_model=SalesImportResult) async def import_sales_json( import_data: SalesDataImport, db: AsyncSession = Depends(get_db), @@ -100,6 +137,8 @@ async def import_sales_json( ): """Import sales data from JSON""" try: + logger.debug("API: Importing JSON sales data", tenant_id=import_data.tenant_id) + result = await DataImportService.process_upload( str(import_data.tenant_id), import_data.data, @@ -108,32 +147,46 @@ async def import_sales_json( ) if result["success"]: - await data_publisher.publish_data_imported({ - "tenant_id": str(import_data.tenant_id), - "type": "json_import", - "records_created": result["records_created"], - "timestamp": datetime.utcnow().isoformat() - }) + # Publish event (with error handling) + try: + await data_publisher.publish_data_imported({ + "tenant_id": str(import_data.tenant_id), + "type": "json_import", + "records_created": result["records_created"], + "timestamp": datetime.utcnow().isoformat() + }) + except Exception as pub_error: + logger.warning("Failed to publish JSON import event", error=str(pub_error)) + # Continue processing + logger.debug("JSON import completed", success=result["success"], records_created=result.get("records_created", 0)) return result except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error("Failed to import JSON sales data", error=str(e)) + import traceback + logger.error("JSON import traceback", traceback=traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Failed to import JSON sales data: {str(e)}") -@router.post("/import/validate") +@router.post("/import/validate", response_model=SalesValidationResult) async def validate_import_data( import_data: SalesDataImport, current_user: AuthInfo = Depends(get_current_user) ): """Validate import data before processing""" try: + logger.debug("API: Validating import data", tenant_id=import_data.tenant_id) + validation = await DataImportService.validate_import_data( import_data.model_dump() ) + + logger.debug("Validation completed", is_valid=validation.get("is_valid", False)) return validation except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error("Failed to validate import data", error=str(e)) + raise HTTPException(status_code=500, detail=f"Failed to validate import data: {str(e)}") @router.get("/import/template/{format_type}") async def get_import_template( @@ -142,11 +195,16 @@ async def get_import_template( ): """Get import template for specified format""" try: + logger.debug("API: Getting import template", format=format_type) + template = await DataImportService.get_import_template(format_type) if "error" in template: + logger.warning("Template generation error", error=template["error"]) raise HTTPException(status_code=400, detail=template["error"]) + logger.debug("Template generated successfully", format=format_type) + if format_type.lower() == "csv": return Response( content=template["template"], @@ -168,362 +226,92 @@ async def get_import_template( else: return template + except HTTPException: + raise except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error("Failed to generate import template", error=str(e)) + import traceback + logger.error("Template generation traceback", traceback=traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Failed to generate template: {str(e)}") -@router.post("/import/advanced") -async def import_sales_data_advanced( - tenant_id: str = Form(...), - file_format: str = Form(...), - file: UploadFile = File(...), - validate_only: bool = Form(False), +@router.get("/analytics/{tenant_id}") +async def get_sales_analytics( + tenant_id: str, + start_date: Optional[datetime] = Query(None, description="Start date"), + end_date: Optional[datetime] = Query(None, description="End date"), db: AsyncSession = Depends(get_db), current_user: AuthInfo = Depends(get_current_user) ): - """Advanced import with validation and preview options""" + """Get sales analytics for tenant""" try: - # Read file content - content = await file.read() + logger.debug("API: Getting sales analytics", tenant_id=tenant_id) - # Determine encoding - try: - file_content = content.decode('utf-8') - except UnicodeDecodeError: - try: - file_content = content.decode('latin-1') - except UnicodeDecodeError: - file_content = content.decode('cp1252') - - # Validate first if requested - if validate_only: - validation = await DataImportService.validate_import_data({ - "tenant_id": tenant_id, - "data": file_content, - "data_format": file_format - }) - - # Add file preview for validation - if validation["valid"]: - # Get first few lines for preview - lines = file_content.split('\n')[:5] - validation["preview"] = lines - validation["total_lines"] = len(file_content.split('\n')) - - return validation - - # Process import - result = await DataImportService.process_upload( - tenant_id, file_content, file_format, db, file.filename + analytics = await SalesService.get_sales_analytics( + tenant_id, start_date, end_date, db ) - if result["success"]: - # Publish event - await data_publisher.publish_data_imported({ - "tenant_id": tenant_id, - "type": "advanced_import", - "format": file_format, - "filename": file.filename, - "records_created": result["records_created"], - "success_rate": result.get("success_rate", 0), - "timestamp": datetime.utcnow().isoformat() - }) - - return result + logger.debug("Analytics generated successfully", tenant_id=tenant_id) + return analytics except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@router.get("/import/history/{tenant_id}") -async def get_import_history( - tenant_id: str, - limit: int = Query(10, ge=1, le=100), - offset: int = Query(0, ge=0), - db: AsyncSession = Depends(get_db), - current_user: AuthInfo = Depends(get_current_user) -): - """Get import history for tenant""" - try: - # Get recent imports by source and creation date - stmt = select(SalesData).where( - and_( - SalesData.tenant_id == tenant_id, - SalesData.source.in_(['csv', 'excel', 'json', 'pos', 'migrated']) - ) - ).order_by(SalesData.created_at.desc()).offset(offset).limit(limit) - - result = await db.execute(stmt) - records = result.scalars().all() - - # Group by import session (same created_at) - import_sessions = {} - for record in records: - session_key = f"{record.source}_{record.created_at.date()}" - if session_key not in import_sessions: - import_sessions[session_key] = { - "date": record.created_at, - "source": record.source, - "records": [], - "total_records": 0, - "total_revenue": 0.0 - } - - import_sessions[session_key]["records"].append({ - "id": str(record.id), - "product_name": record.product_name, - "quantity_sold": record.quantity_sold, - "revenue": record.revenue or 0.0 - }) - import_sessions[session_key]["total_records"] += 1 - import_sessions[session_key]["total_revenue"] += record.revenue or 0.0 - - return { - "import_sessions": list(import_sessions.values()), - "total_sessions": len(import_sessions), - "offset": offset, - "limit": limit - } - - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@router.delete("/import/{tenant_id}/{import_date}") -async def delete_import_batch( - tenant_id: str, - import_date: str, # Format: YYYY-MM-DD - source: str = Query(..., description="Import source (csv, excel, json, pos)"), - db: AsyncSession = Depends(get_db), - current_user: AuthInfo = Depends(get_current_user) -): - """Delete an entire import batch""" - try: - # Parse date - import_date_obj = datetime.strptime(import_date, "%Y-%m-%d").date() - start_datetime = datetime.combine(import_date_obj, datetime.min.time()) - end_datetime = datetime.combine(import_date_obj, datetime.max.time()) - - # Find records to delete - stmt = select(SalesData).where( - and_( - SalesData.tenant_id == tenant_id, - SalesData.source == source, - SalesData.created_at >= start_datetime, - SalesData.created_at <= end_datetime - ) - ) - - result = await db.execute(stmt) - records_to_delete = result.scalars().all() - - if not records_to_delete: - raise HTTPException( - status_code=404, - detail="No import batch found for the specified date and source" - ) - - # Delete records - for record in records_to_delete: - await db.delete(record) - - await db.commit() - - # Publish event - await data_publisher.publish_data_imported({ - "tenant_id": tenant_id, - "type": "import_deleted", - "source": source, - "import_date": import_date, - "records_deleted": len(records_to_delete), - "timestamp": datetime.utcnow().isoformat() - }) - - return { - "success": True, - "records_deleted": len(records_to_delete), - "import_date": import_date, - "source": source - } - - except ValueError: - raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DD") - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@router.get("/stats/{tenant_id}") -async def get_sales_statistics( - tenant_id: str, - start_date: datetime = Query(None, description="Start date for statistics"), - end_date: datetime = Query(None, description="End date for statistics"), - db: AsyncSession = Depends(get_db), - current_user: AuthInfo = Depends(get_current_user) -): - """Get sales statistics for tenant""" - try: - # Default to last 30 days if no dates provided - if not start_date: - start_date = datetime.now() - timedelta(days=30) - if not end_date: - end_date = datetime.now() - - # Get sales data - stmt = select(SalesData).where( - and_( - SalesData.tenant_id == tenant_id, - SalesData.date >= start_date, - SalesData.date <= end_date - ) - ) - - result = await db.execute(stmt) - records = result.scalars().all() - - if not records: - return { - "total_records": 0, - "total_revenue": 0.0, - "total_quantity": 0, - "top_products": [], - "daily_sales": [], - "data_sources": {} - } - - # Calculate statistics - total_revenue = sum(record.revenue or 0 for record in records) - total_quantity = sum(record.quantity_sold for record in records) - - # Top products - product_stats = {} - for record in records: - if record.product_name not in product_stats: - product_stats[record.product_name] = { - "quantity": 0, - "revenue": 0.0, - "occurrences": 0 - } - product_stats[record.product_name]["quantity"] += record.quantity_sold - product_stats[record.product_name]["revenue"] += record.revenue or 0 - product_stats[record.product_name]["occurrences"] += 1 - - top_products = sorted( - [{"product": k, **v} for k, v in product_stats.items()], - key=lambda x: x["quantity"], - reverse=True - )[:10] - - # Daily sales - daily_stats = {} - for record in records: - date_key = record.date.date().isoformat() - if date_key not in daily_stats: - daily_stats[date_key] = {"quantity": 0, "revenue": 0.0, "products": 0} - daily_stats[date_key]["quantity"] += record.quantity_sold - daily_stats[date_key]["revenue"] += record.revenue or 0 - daily_stats[date_key]["products"] += 1 - - daily_sales = [{"date": k, **v} for k, v in sorted(daily_stats.items())] - - # Data sources - source_stats = {} - for record in records: - source = record.source - if source not in source_stats: - source_stats[source] = 0 - source_stats[source] += 1 - - return { - "total_records": len(records), - "total_revenue": round(total_revenue, 2), - "total_quantity": total_quantity, - "average_revenue_per_sale": round(total_revenue / len(records), 2) if records else 0, - "date_range": { - "start": start_date.isoformat(), - "end": end_date.isoformat() - }, - "top_products": top_products, - "daily_sales": daily_sales, - "data_sources": source_stats - } - - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error("Failed to generate sales analytics", error=str(e)) + raise HTTPException(status_code=500, detail=f"Failed to generate analytics: {str(e)}") @router.post("/export/{tenant_id}") async def export_sales_data( tenant_id: str, export_format: str = Query("csv", description="Export format: csv, excel, json"), - start_date: datetime = Query(None, description="Start date"), - end_date: datetime = Query(None, description="End date"), - products: List[str] = Query(None, description="Filter by products"), + start_date: Optional[datetime] = Query(None, description="Start date"), + end_date: Optional[datetime] = Query(None, description="End date"), + products: Optional[List[str]] = Query(None, description="Filter by products"), db: AsyncSession = Depends(get_db), current_user: AuthInfo = Depends(get_current_user) ): """Export sales data in specified format""" try: - # Build query - query_conditions = [SalesData.tenant_id == tenant_id] + logger.debug("API: Exporting sales data", tenant_id=tenant_id, format=export_format) - if start_date: - query_conditions.append(SalesData.date >= start_date) - if end_date: - query_conditions.append(SalesData.date <= end_date) - if products: - query_conditions.append(SalesData.product_name.in_(products)) + export_result = await SalesService.export_sales_data( + tenant_id, export_format, start_date, end_date, products, db + ) - stmt = select(SalesData).where(and_(*query_conditions)).order_by(SalesData.date.desc()) - - result = await db.execute(stmt) - records = result.scalars().all() - - if not records: + if not export_result: raise HTTPException(status_code=404, detail="No data found for export") - # Convert to export format - export_data = [] - for record in records: - export_data.append({ - "fecha": record.date.strftime("%d/%m/%Y"), - "producto": record.product_name, - "cantidad": record.quantity_sold, - "ingresos": record.revenue or 0.0, - "ubicacion": record.location_id or "", - "fuente": record.source - }) + logger.debug("Export completed successfully", tenant_id=tenant_id, format=export_format) - if export_format.lower() == "csv": - # Generate CSV - output = io.StringIO() - df = pd.DataFrame(export_data) - df.to_csv(output, index=False) - - return Response( - content=output.getvalue(), - media_type="text/csv", - headers={"Content-Disposition": "attachment; filename=ventas_export.csv"} - ) - - elif export_format.lower() == "excel": - # Generate Excel - output = io.BytesIO() - df = pd.DataFrame(export_data) - df.to_excel(output, index=False, sheet_name="Ventas") - - return Response( - content=output.getvalue(), - media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", - headers={"Content-Disposition": "attachment; filename=ventas_export.xlsx"} - ) - - elif export_format.lower() == "json": - return { - "data": export_data, - "total_records": len(export_data), - "export_date": datetime.now().isoformat() - } - - else: - raise HTTPException( - status_code=400, - detail="Formato no soportado. Use: csv, excel, json" - ) + return StreamingResponse( + iter([export_result["content"]]), + media_type=export_result["media_type"], + headers={"Content-Disposition": f"attachment; filename={export_result['filename']}"} + ) + except HTTPException: + raise except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file + logger.error("Failed to export sales data", error=str(e)) + raise HTTPException(status_code=500, detail=f"Failed to export sales data: {str(e)}") + +@router.delete("/{record_id}") +async def delete_sales_record( + record_id: str, + db: AsyncSession = Depends(get_db), + current_user: AuthInfo = Depends(get_current_user) +): + """Delete a sales record""" + try: + logger.debug("API: Deleting sales record", record_id=record_id) + + success = await SalesService.delete_sales_record(record_id, db) + + if not success: + raise HTTPException(status_code=404, detail="Sales record not found") + + logger.debug("Sales record deleted successfully", record_id=record_id) + return {"status": "success", "message": "Sales record deleted successfully"} + + except HTTPException: + raise + except Exception as e: + logger.error("Failed to delete sales record", error=str(e)) + raise HTTPException(status_code=500, detail=f"Failed to delete sales record: {str(e)}") \ No newline at end of file diff --git a/services/data/app/api/traffic.py b/services/data/app/api/traffic.py index ed6d265d..3953ea1d 100644 --- a/services/data/app/api/traffic.py +++ b/services/data/app/api/traffic.py @@ -1,12 +1,13 @@ # ================================================================ -# services/data/app/api/traffic.py +# services/data/app/api/traffic.py - FIXED VERSION # ================================================================ -"""Traffic data API endpoints""" +"""Traffic data API endpoints with improved error handling""" from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.ext.asyncio import AsyncSession from typing import List, Optional from datetime import datetime, timedelta +import structlog from app.core.database import get_db from app.core.auth import get_current_user, AuthInfo @@ -20,6 +21,7 @@ from app.schemas.external import ( router = APIRouter() traffic_service = TrafficService() +logger = structlog.get_logger() @router.get("/current", response_model=TrafficDataResponse) async def get_current_traffic( @@ -29,21 +31,39 @@ async def get_current_traffic( ): """Get current traffic data for location""" try: + logger.debug("API: Getting current traffic", lat=latitude, lon=longitude) + traffic = await traffic_service.get_current_traffic(latitude, longitude) + if not traffic: + logger.warning("No traffic data available", lat=latitude, lon=longitude) raise HTTPException(status_code=404, detail="Traffic data not available") - # Publish event - await data_publisher.publish_traffic_updated({ - "type": "current_requested", - "latitude": latitude, - "longitude": longitude, - "timestamp": datetime.utcnow().isoformat() - }) + # Publish event (with error handling) + try: + await data_publisher.publish_traffic_updated({ + "type": "current_requested", + "latitude": latitude, + "longitude": longitude, + "timestamp": datetime.utcnow().isoformat() + }) + except Exception as pub_error: + logger.warning("Failed to publish traffic event", error=str(pub_error)) + # Continue processing - event publishing failure shouldn't break the API + logger.debug("Successfully returning traffic data", + volume=traffic.traffic_volume, + congestion=traffic.congestion_level) return traffic + + except HTTPException: + # Re-raise HTTP exceptions + raise except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error("Unexpected error in traffic API", error=str(e)) + import traceback + logger.error("Traffic API traceback", traceback=traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.get("/historical", response_model=List[TrafficDataResponse]) async def get_historical_traffic( @@ -67,17 +87,66 @@ async def get_historical_traffic( latitude, longitude, start_date, end_date, db ) - # Publish event - await data_publisher.publish_traffic_updated({ - "type": "historical_requested", - "latitude": latitude, - "longitude": longitude, - "start_date": start_date.isoformat(), - "end_date": end_date.isoformat(), - "records_count": len(historical_data), - "timestamp": datetime.utcnow().isoformat() - }) + # Publish event (with error handling) + try: + await data_publisher.publish_traffic_updated({ + "type": "historical_requested", + "latitude": latitude, + "longitude": longitude, + "start_date": start_date.isoformat(), + "end_date": end_date.isoformat(), + "records_count": len(historical_data), + "timestamp": datetime.utcnow().isoformat() + }) + except Exception as pub_error: + logger.warning("Failed to publish historical traffic event", error=str(pub_error)) + # Continue processing return historical_data + + except HTTPException: + # Re-raise HTTP exceptions + raise except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file + logger.error("Unexpected error in historical traffic API", error=str(e)) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + +@router.post("/store") +async def store_traffic_data( + latitude: float = Query(..., description="Latitude"), + longitude: float = Query(..., description="Longitude"), + db: AsyncSession = Depends(get_db), + current_user: AuthInfo = Depends(get_current_user) +): + """Store current traffic data to database""" + try: + # Get current traffic data + traffic = await traffic_service.get_current_traffic(latitude, longitude) + + if not traffic: + raise HTTPException(status_code=404, detail="No traffic data to store") + + # Convert to dict for storage + traffic_dict = { + "date": traffic.date, + "traffic_volume": traffic.traffic_volume, + "pedestrian_count": traffic.pedestrian_count, + "congestion_level": traffic.congestion_level, + "average_speed": traffic.average_speed, + "source": traffic.source + } + + success = await traffic_service.store_traffic_data( + latitude, longitude, traffic_dict, db + ) + + if success: + return {"status": "success", "message": "Traffic data stored successfully"} + else: + raise HTTPException(status_code=500, detail="Failed to store traffic data") + + except HTTPException: + raise + except Exception as e: + logger.error("Error storing traffic data", error=str(e)) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") \ No newline at end of file diff --git a/services/data/app/api/weather.py b/services/data/app/api/weather.py index 70435243..1d2c610d 100644 --- a/services/data/app/api/weather.py +++ b/services/data/app/api/weather.py @@ -1,12 +1,13 @@ # ================================================================ -# services/data/app/api/weather.py +# services/data/app/api/weather.py - FIXED VERSION # ================================================================ -"""Weather data API endpoints""" +"""Weather data API endpoints with improved error handling""" from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.ext.asyncio import AsyncSession from typing import List, Optional from datetime import datetime, timedelta +import structlog from app.core.database import get_db from app.core.auth import get_current_user, AuthInfo @@ -21,6 +22,7 @@ from app.schemas.external import ( router = APIRouter() weather_service = WeatherService() +logger = structlog.get_logger() @router.get("/current", response_model=WeatherDataResponse) async def get_current_weather( @@ -30,13 +32,25 @@ async def get_current_weather( ): """Get current weather for location""" try: + logger.debug("API: Getting current weather", lat=latitude, lon=longitude) + weather = await weather_service.get_current_weather(latitude, longitude) + if not weather: + logger.warning("No weather data available", lat=latitude, lon=longitude) raise HTTPException(status_code=404, detail="Weather data not available") + logger.debug("Successfully returning weather data", temp=weather.temperature) return weather + + except HTTPException: + # Re-raise HTTP exceptions + raise except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error("Unexpected error in weather API", error=str(e)) + import traceback + logger.error("Weather API traceback", traceback=traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.get("/forecast", response_model=List[WeatherForecastResponse]) async def get_weather_forecast( @@ -47,20 +61,38 @@ async def get_weather_forecast( ): """Get weather forecast for location""" try: + logger.debug("API: Getting weather forecast", lat=latitude, lon=longitude, days=days) + forecast = await weather_service.get_weather_forecast(latitude, longitude, days) - # Publish event - await data_publisher.publish_weather_updated({ - "type": "forecast_requested", - "latitude": latitude, - "longitude": longitude, - "days": days, - "timestamp": datetime.utcnow().isoformat() - }) + if not forecast: + logger.warning("No forecast data available", lat=latitude, lon=longitude) + raise HTTPException(status_code=404, detail="Weather forecast not available") + # Publish event (with error handling) + try: + await data_publisher.publish_weather_updated({ + "type": "forecast_requested", + "latitude": latitude, + "longitude": longitude, + "days": days, + "timestamp": datetime.utcnow().isoformat() + }) + except Exception as pub_error: + logger.warning("Failed to publish weather forecast event", error=str(pub_error)) + # Continue processing - event publishing failure shouldn't break the API + + logger.debug("Successfully returning forecast data", count=len(forecast)) return forecast + + except HTTPException: + # Re-raise HTTP exceptions + raise except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error("Unexpected error in weather forecast API", error=str(e)) + import traceback + logger.error("Weather forecast API traceback", traceback=traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.get("/historical", response_model=List[WeatherDataResponse]) async def get_historical_weather( @@ -84,17 +116,68 @@ async def get_historical_weather( latitude, longitude, start_date, end_date, db ) - # Publish event - await data_publisher.publish_weather_updated({ - "type": "historical_requested", - "latitude": latitude, - "longitude": longitude, - "start_date": start_date.isoformat(), - "end_date": end_date.isoformat(), - "records_count": len(historical_data), - "timestamp": datetime.utcnow().isoformat() - }) + # Publish event (with error handling) + try: + await data_publisher.publish_weather_updated({ + "type": "historical_requested", + "latitude": latitude, + "longitude": longitude, + "start_date": start_date.isoformat(), + "end_date": end_date.isoformat(), + "records_count": len(historical_data), + "timestamp": datetime.utcnow().isoformat() + }) + except Exception as pub_error: + logger.warning("Failed to publish historical weather event", error=str(pub_error)) + # Continue processing return historical_data + + except HTTPException: + # Re-raise HTTP exceptions + raise except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + logger.error("Unexpected error in historical weather API", error=str(e)) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + +@router.post("/store") +async def store_weather_data( + latitude: float = Query(..., description="Latitude"), + longitude: float = Query(..., description="Longitude"), + db: AsyncSession = Depends(get_db), + current_user: AuthInfo = Depends(get_current_user) +): + """Store current weather data to database""" + try: + # Get current weather data + weather = await weather_service.get_current_weather(latitude, longitude) + + if not weather: + raise HTTPException(status_code=404, detail="No weather data to store") + + # Convert to dict for storage + weather_dict = { + "date": weather.date, + "temperature": weather.temperature, + "precipitation": weather.precipitation, + "humidity": weather.humidity, + "wind_speed": weather.wind_speed, + "pressure": weather.pressure, + "description": weather.description, + "source": weather.source + } + + success = await weather_service.store_weather_data( + latitude, longitude, weather_dict, db + ) + + if success: + return {"status": "success", "message": "Weather data stored successfully"} + else: + raise HTTPException(status_code=500, detail="Failed to store weather data") + + except HTTPException: + raise + except Exception as e: + logger.error("Error storing weather data", error=str(e)) + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") \ No newline at end of file diff --git a/services/data/app/models/sales.py b/services/data/app/models/sales.py index a416563c..d0072a2c 100644 --- a/services/data/app/models/sales.py +++ b/services/data/app/models/sales.py @@ -1,5 +1,5 @@ # ================================================================ -# services/data/app/models/sales.py +# services/data/app/models/sales.py - MISSING FILE # ================================================================ """Sales data models""" @@ -16,16 +16,17 @@ class SalesData(Base): id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True) date = Column(DateTime, nullable=False, index=True) - product_name = Column(String(200), nullable=False) + product_name = Column(String(255), nullable=False, index=True) quantity_sold = Column(Integer, nullable=False) - revenue = Column(Float, nullable=True) - location_id = Column(String(100), nullable=True) - source = Column(String(50), nullable=False, default="manual") # manual, pos, csv - raw_data = Column(Text, nullable=True) # Store original data for debugging + revenue = Column(Float, nullable=False) + location_id = Column(String(100), nullable=True, index=True) + source = Column(String(50), nullable=False, default="manual") + notes = Column(Text, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) __table_args__ = ( Index('idx_sales_tenant_date', 'tenant_id', 'date'), Index('idx_sales_tenant_product', 'tenant_id', 'product_name'), + Index('idx_sales_tenant_location', 'tenant_id', 'location_id'), ) diff --git a/services/data/app/schemas/sales.py b/services/data/app/schemas/sales.py index d2068087..060a437e 100644 --- a/services/data/app/schemas/sales.py +++ b/services/data/app/schemas/sales.py @@ -1,46 +1,136 @@ # ================================================================ -# services/data/app/schemas/sales.py +# services/data/app/schemas/sales.py - MISSING FILE # ================================================================ """Sales data schemas""" -from pydantic import BaseModel, validator +from pydantic import BaseModel, Field, validator from datetime import datetime -from typing import Optional, List -import uuid +from typing import Optional, List, Dict, Any +from uuid import UUID class SalesDataCreate(BaseModel): - tenant_id: uuid.UUID + """Schema for creating sales data""" + tenant_id: UUID date: datetime - product_name: str - quantity_sold: int - revenue: Optional[float] = None - location_id: Optional[str] = None - source: str = "manual" - raw_data: Optional[str] = None + product_name: str = Field(..., min_length=1, max_length=255) + quantity_sold: int = Field(..., gt=0) + revenue: float = Field(..., gt=0) + location_id: Optional[str] = Field(None, max_length=100) + source: str = Field(default="manual", max_length=50) + notes: Optional[str] = Field(None, max_length=500) + + @validator('product_name') + def normalize_product_name(cls, v): + return v.strip().lower() + + class Config: + from_attributes = True class SalesDataResponse(BaseModel): - id: uuid.UUID - tenant_id: uuid.UUID + """Schema for sales data response""" + id: UUID + tenant_id: UUID date: datetime product_name: str quantity_sold: int - revenue: Optional[float] + revenue: float location_id: Optional[str] source: str + notes: Optional[str] created_at: datetime - updated_at: datetime + updated_at: Optional[datetime] + + class Config: + from_attributes = True + +class SalesDataQuery(BaseModel): + """Schema for querying sales data""" + tenant_id: UUID + start_date: Optional[datetime] = None + end_date: Optional[datetime] = None + product_names: Optional[List[str]] = None + location_ids: Optional[List[str]] = None + sources: Optional[List[str]] = None + min_quantity: Optional[int] = None + max_quantity: Optional[int] = None + min_revenue: Optional[float] = None + max_revenue: Optional[float] = None + limit: Optional[int] = Field(default=1000, le=5000) + offset: Optional[int] = Field(default=0, ge=0) class Config: from_attributes = True class SalesDataImport(BaseModel): - tenant_id: uuid.UUID - data_format: str # csv, excel, pos - data: str # Base64 encoded or JSON string + """Schema for importing sales data""" + tenant_id: UUID + data: str # JSON string or CSV content + data_format: str = Field(..., pattern="^(csv|json|excel)$") + source: str = Field(default="import", max_length=50) + validate_only: bool = Field(default=False) -class SalesDataQuery(BaseModel): - tenant_id: uuid.UUID - start_date: datetime - end_date: datetime + class Config: + from_attributes = True + +class SalesDataBulkCreate(BaseModel): + """Schema for bulk creating sales data""" + tenant_id: UUID + records: List[Dict[str, Any]] + source: str = Field(default="bulk_import", max_length=50) + + class Config: + from_attributes = True + +class SalesValidationResult(BaseModel): + """Schema for sales data validation result""" + is_valid: bool + total_records: int + valid_records: int + invalid_records: int + errors: List[Dict[str, Any]] + warnings: List[Dict[str, Any]] + summary: Dict[str, Any] + + class Config: + from_attributes = True + +class SalesImportResult(BaseModel): + """Schema for sales import result""" + success: bool + records_processed: int + records_created: int + records_updated: int + records_failed: int + errors: List[Dict[str, Any]] + warnings: List[Dict[str, Any]] + processing_time_seconds: float + + class Config: + from_attributes = True + +class SalesAggregation(BaseModel): + """Schema for sales aggregation results""" + period: str # "daily", "weekly", "monthly" + date: datetime product_name: Optional[str] = None - location_id: Optional[str] = None \ No newline at end of file + total_quantity: int + total_revenue: float + average_quantity: float + average_revenue: float + record_count: int + + class Config: + from_attributes = True + +class SalesExportRequest(BaseModel): + """Schema for sales export request""" + tenant_id: UUID + format: str = Field(..., pattern="^(csv|json|excel)$") + start_date: Optional[datetime] = None + end_date: Optional[datetime] = None + product_names: Optional[List[str]] = None + location_ids: Optional[List[str]] = None + include_metadata: bool = Field(default=True) + + class Config: + from_attributes = True \ No newline at end of file diff --git a/services/data/app/services/messaging.py b/services/data/app/services/messaging.py index d3f9f147..1986b170 100644 --- a/services/data/app/services/messaging.py +++ b/services/data/app/services/messaging.py @@ -1,6 +1,7 @@ # ================================================================ -# services/data/app/services/messaging.py +# services/data/app/services/messaging.py - FIXED VERSION # ================================================================ +"""Fixed messaging service with proper error handling""" from shared.messaging.rabbitmq import RabbitMQClient from app.core.config import settings @@ -13,30 +14,105 @@ data_publisher = RabbitMQClient(settings.RABBITMQ_URL, "data-service") async def setup_messaging(): """Initialize messaging for data service""" - success = await data_publisher.connect() - if success: - logger.info("Data service messaging initialized") - else: - logger.warning("Data service messaging failed to initialize") + try: + success = await data_publisher.connect() + if success: + logger.info("Data service messaging initialized") + else: + logger.warning("Data service messaging failed to initialize") + return success + except Exception as e: + logger.warning("Failed to setup messaging", error=str(e)) + return False async def cleanup_messaging(): """Cleanup messaging for data service""" - await data_publisher.disconnect() - logger.info("Data service messaging cleaned up") + try: + await data_publisher.disconnect() + logger.info("Data service messaging cleaned up") + except Exception as e: + logger.warning("Error during messaging cleanup", error=str(e)) -# Convenience functions for data-specific events +# Convenience functions for data-specific events with error handling async def publish_data_imported(data: dict) -> bool: """Publish data imported event""" - return await data_publisher.publish_data_event("imported", data) + try: + return await data_publisher.publish_data_event("imported", data) + except Exception as e: + logger.warning("Failed to publish data imported event", error=str(e)) + return False async def publish_weather_updated(data: dict) -> bool: """Publish weather updated event""" - return await data_publisher.publish_data_event("weather.updated", data) + try: + return await data_publisher.publish_data_event("weather.updated", data) + except Exception as e: + logger.warning("Failed to publish weather updated event", error=str(e)) + return False async def publish_traffic_updated(data: dict) -> bool: """Publish traffic updated event""" - return await data_publisher.publish_data_event("traffic.updated", data) + try: + return await data_publisher.publish_data_event("traffic.updated", data) + except Exception as e: + logger.warning("Failed to publish traffic updated event", error=str(e)) + return False async def publish_sales_created(data: dict) -> bool: """Publish sales created event""" - return await data_publisher.publish_data_event("sales.created", data) + try: + return await data_publisher.publish_data_event("sales.created", data) + except Exception as e: + logger.warning("Failed to publish sales created event", error=str(e)) + return False + +async def publish_analytics_generated(data: dict) -> bool: + """Publish analytics generated event""" + try: + return await data_publisher.publish_data_event("analytics.generated", data) + except Exception as e: + logger.warning("Failed to publish analytics generated event", error=str(e)) + return False + +async def publish_export_completed(data: dict) -> bool: + """Publish export completed event""" + try: + return await data_publisher.publish_data_event("export.completed", data) + except Exception as e: + logger.warning("Failed to publish export completed event", error=str(e)) + return False + +async def publish_import_started(data: dict) -> bool: + """Publish import started event""" + try: + return await data_publisher.publish_data_event("import.started", data) + except Exception as e: + logger.warning("Failed to publish import started event", error=str(e)) + return False + +async def publish_import_completed(data: dict) -> bool: + """Publish import completed event""" + try: + return await data_publisher.publish_data_event("import.completed", data) + except Exception as e: + logger.warning("Failed to publish import completed event", error=str(e)) + return False + +async def publish_import_failed(data: dict) -> bool: + """Publish import failed event""" + try: + return await data_publisher.publish_data_event("import.failed", data) + except Exception as e: + logger.warning("Failed to publish import failed event", error=str(e)) + return False + +# Health check for messaging +async def check_messaging_health() -> dict: + """Check messaging system health""" + try: + if data_publisher.connected: + return {"status": "healthy", "service": "rabbitmq", "connected": True} + else: + return {"status": "unhealthy", "service": "rabbitmq", "connected": False, "error": "Not connected"} + except Exception as e: + return {"status": "unhealthy", "service": "rabbitmq", "connected": False, "error": str(e)} \ No newline at end of file diff --git a/services/data/app/services/sales_service.py b/services/data/app/services/sales_service.py index 87f29839..b360dc69 100644 --- a/services/data/app/services/sales_service.py +++ b/services/data/app/services/sales_service.py @@ -1,108 +1,228 @@ # ================================================================ -# services/data/app/services/sales_service.py +# services/data/app/services/sales_service.py - SIMPLIFIED VERSION # ================================================================ -"""Sales data service""" +"""Sales service without notes column for now""" -import csv -import io -import json from typing import List, Dict, Any, Optional from datetime import datetime -import pandas as pd from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, and_ +from sqlalchemy import select, and_, func, desc import structlog +import uuid from app.models.sales import SalesData -from app.schemas.sales import SalesDataCreate, SalesDataResponse, SalesDataQuery +from app.schemas.sales import ( + SalesDataCreate, + SalesDataResponse, + SalesDataQuery +) logger = structlog.get_logger() class SalesService: @staticmethod - async def create_sales_record(data: SalesDataCreate, db: AsyncSession) -> SalesData: + async def create_sales_record(sales_data: SalesDataCreate, db: AsyncSession) -> SalesDataResponse: """Create a new sales record""" - sales_record = SalesData(**data.model_dump()) - db.add(sales_record) - await db.commit() - await db.refresh(sales_record) - return sales_record - - @staticmethod - async def get_sales_data(query: SalesDataQuery, db: AsyncSession) -> List[SalesData]: - """Get sales data by query parameters""" - stmt = select(SalesData).where( - and_( - SalesData.tenant_id == query.tenant_id, - SalesData.date >= query.start_date, - SalesData.date <= query.end_date - ) - ) - - if query.product_name: - stmt = stmt.where(SalesData.product_name.ilike(f"%{query.product_name}%")) - - if query.location_id: - stmt = stmt.where(SalesData.location_id == query.location_id) - - result = await db.execute(stmt) - return result.scalars().all() - - @staticmethod - async def import_csv_data(tenant_id: str, csv_content: str, db: AsyncSession) -> Dict[str, Any]: - """Import sales data from CSV""" try: - # Parse CSV - csv_file = io.StringIO(csv_content) - df = pd.read_csv(csv_file) + # Create new sales record without notes and updated_at for now + db_record = SalesData( + id=uuid.uuid4(), + tenant_id=sales_data.tenant_id, + date=sales_data.date, + product_name=sales_data.product_name, + quantity_sold=sales_data.quantity_sold, + revenue=sales_data.revenue, + location_id=sales_data.location_id, + source=sales_data.source, + created_at=datetime.utcnow() + # Skip notes and updated_at until database is migrated + ) - # Validate and clean data - records_created = 0 - errors = [] + db.add(db_record) + await db.commit() + await db.refresh(db_record) - for index, row in df.iterrows(): - try: - # Parse date (handle multiple formats) - date_str = str(row.get('date', row.get('fecha', ''))) - date = pd.to_datetime(date_str, dayfirst=True) - - # Clean product name - product_name = str(row.get('product', row.get('producto', ''))).strip() - - # Parse quantity - quantity = int(row.get('quantity', row.get('cantidad', 0))) - - # Parse revenue (optional) - revenue = None - revenue_col = row.get('revenue', row.get('ingresos', None)) - if revenue_col and pd.notna(revenue_col): - revenue = float(revenue_col) - - # Create sales record - sales_data = SalesDataCreate( - tenant_id=tenant_id, - date=date, - product_name=product_name, - quantity_sold=quantity, - revenue=revenue, - source="csv", - raw_data=json.dumps(row.to_dict()) - ) - - await SalesService.create_sales_record(sales_data, db) - records_created += 1 - - except Exception as e: - errors.append(f"Row {index + 1}: {str(e)}") - continue + logger.debug("Sales record created", record_id=db_record.id, product=db_record.product_name) - return { - "success": True, - "records_created": records_created, - "errors": errors, - "total_records": len(data) - } + return SalesDataResponse( + id=db_record.id, + tenant_id=db_record.tenant_id, + date=db_record.date, + product_name=db_record.product_name, + quantity_sold=db_record.quantity_sold, + revenue=db_record.revenue, + location_id=db_record.location_id, + source=db_record.source, + notes=None, # Always None for now + created_at=db_record.created_at, + updated_at=None # Always None for now + ) except Exception as e: - return {"success": False, "error": f"JSON processing failed: {str(e)}"} + await db.rollback() + logger.error("Failed to create sales record", error=str(e)) + raise + + @staticmethod + async def get_sales_data(query: SalesDataQuery, db: AsyncSession) -> List[SalesDataResponse]: + """Get sales data based on query parameters""" + try: + # Build query conditions + conditions = [SalesData.tenant_id == query.tenant_id] + + if query.start_date: + conditions.append(SalesData.date >= query.start_date) + if query.end_date: + conditions.append(SalesData.date <= query.end_date) + if query.product_names: + conditions.append(SalesData.product_name.in_(query.product_names)) + if query.location_ids: + conditions.append(SalesData.location_id.in_(query.location_ids)) + if query.sources: + conditions.append(SalesData.source.in_(query.sources)) + if query.min_quantity: + conditions.append(SalesData.quantity_sold >= query.min_quantity) + if query.max_quantity: + conditions.append(SalesData.quantity_sold <= query.max_quantity) + if query.min_revenue: + conditions.append(SalesData.revenue >= query.min_revenue) + if query.max_revenue: + conditions.append(SalesData.revenue <= query.max_revenue) + + # Execute query + stmt = select(SalesData).where(and_(*conditions)).order_by(desc(SalesData.date)) + + if query.limit: + stmt = stmt.limit(query.limit) + if query.offset: + stmt = stmt.offset(query.offset) + + result = await db.execute(stmt) + records = result.scalars().all() + + logger.debug("Sales data retrieved", count=len(records), tenant_id=query.tenant_id) + + return [SalesDataResponse( + id=record.id, + tenant_id=record.tenant_id, + date=record.date, + product_name=record.product_name, + quantity_sold=record.quantity_sold, + revenue=record.revenue, + location_id=record.location_id, + source=record.source, + notes=None, # Always None for now + created_at=record.created_at, + updated_at=None # Always None for now + ) for record in records] + + except Exception as e: + logger.error("Failed to retrieve sales data", error=str(e)) + raise + + @staticmethod + async def get_sales_analytics(tenant_id: str, start_date: Optional[datetime], + end_date: Optional[datetime], db: AsyncSession) -> Dict[str, Any]: + """Get basic sales analytics""" + try: + conditions = [SalesData.tenant_id == tenant_id] + + if start_date: + conditions.append(SalesData.date >= start_date) + if end_date: + conditions.append(SalesData.date <= end_date) + + # Total sales + total_stmt = select( + func.sum(SalesData.quantity_sold).label('total_quantity'), + func.sum(SalesData.revenue).label('total_revenue'), + func.count(SalesData.id).label('total_records') + ).where(and_(*conditions)) + + total_result = await db.execute(total_stmt) + totals = total_result.first() + + analytics = { + "total_quantity": int(totals.total_quantity or 0), + "total_revenue": float(totals.total_revenue or 0.0), + "total_records": int(totals.total_records or 0), + "average_order_value": float(totals.total_revenue or 0.0) / max(totals.total_records or 1, 1), + "date_range": { + "start": start_date.isoformat() if start_date else None, + "end": end_date.isoformat() if end_date else None + } + } + + logger.debug("Sales analytics generated", tenant_id=tenant_id, total_records=analytics["total_records"]) + return analytics + + except Exception as e: + logger.error("Failed to generate sales analytics", error=str(e)) + raise + + @staticmethod + async def export_sales_data(tenant_id: str, export_format: str, start_date: Optional[datetime], + end_date: Optional[datetime], products: Optional[List[str]], + db: AsyncSession) -> Optional[Dict[str, Any]]: + """Export sales data in specified format""" + try: + # Build query conditions + conditions = [SalesData.tenant_id == tenant_id] + + if start_date: + conditions.append(SalesData.date >= start_date) + if end_date: + conditions.append(SalesData.date <= end_date) + if products: + conditions.append(SalesData.product_name.in_(products)) + + stmt = select(SalesData).where(and_(*conditions)).order_by(desc(SalesData.date)) + result = await db.execute(stmt) + records = result.scalars().all() + + if not records: + return None + + # Simple CSV export + if export_format.lower() == "csv": + import io + output = io.StringIO() + output.write("date,product_name,quantity_sold,revenue,location_id,source\n") + + for record in records: + output.write(f"{record.date},{record.product_name},{record.quantity_sold},{record.revenue},{record.location_id or ''},{record.source}\n") + + return { + "content": output.getvalue(), + "media_type": "text/csv", + "filename": f"sales_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" + } + + return None + + except Exception as e: + logger.error("Failed to export sales data", error=str(e)) + raise + + @staticmethod + async def delete_sales_record(record_id: str, db: AsyncSession) -> bool: + """Delete a sales record""" + try: + stmt = select(SalesData).where(SalesData.id == record_id) + result = await db.execute(stmt) + record = result.scalar_one_or_none() + + if not record: + return False + + await db.delete(record) + await db.commit() + + logger.debug("Sales record deleted", record_id=record_id) + return True + + except Exception as e: + await db.rollback() + logger.error("Failed to delete sales record", error=str(e)) + raise \ No newline at end of file diff --git a/services/data/app/services/traffic_service.py b/services/data/app/services/traffic_service.py index 1315ed5e..3bf815dd 100644 --- a/services/data/app/services/traffic_service.py +++ b/services/data/app/services/traffic_service.py @@ -30,22 +30,29 @@ class TrafficService: logger.debug("Traffic data received", source=traffic_data.get('source')) # Validate and clean traffic data before creating response - validated_data = { - "date": traffic_data.get("date", datetime.now()), - "traffic_volume": int(traffic_data.get("traffic_volume", 100)), - "pedestrian_count": int(traffic_data.get("pedestrian_count", 150)), - "congestion_level": str(traffic_data.get("congestion_level", "medium")), - "average_speed": int(traffic_data.get("average_speed", 25)), - "source": str(traffic_data.get("source", "unknown")) - } + # Use keyword arguments instead of unpacking + response = TrafficDataResponse( + date=traffic_data.get("date", datetime.now()), + traffic_volume=int(traffic_data.get("traffic_volume", 100)), + pedestrian_count=int(traffic_data.get("pedestrian_count", 150)), + congestion_level=str(traffic_data.get("congestion_level", "medium")), + average_speed=float(traffic_data.get("average_speed", 25.0)), # Fixed: use float, not int + source=str(traffic_data.get("source", "unknown")) + ) - return TrafficDataResponse(**validated_data) + logger.debug("Successfully created traffic response", + traffic_volume=response.traffic_volume, + congestion_level=response.congestion_level) + return response else: logger.warning("No traffic data received from Madrid client") return None except Exception as e: logger.error("Failed to get current traffic", error=str(e), lat=latitude, lon=longitude) + # Log the full traceback for debugging + import traceback + logger.error("Traffic service traceback", traceback=traceback.format_exc()) return None async def get_historical_traffic(self, @@ -83,41 +90,41 @@ class TrafficService: average_speed=record.average_speed, source=record.source ) for record in db_records] - - # Fetch from API if not in database - logger.debug("Fetching historical traffic data from Madrid API") - traffic_data = await self.madrid_client.get_historical_traffic( - latitude, longitude, start_date, end_date - ) - - if traffic_data: - # Store in database for future use - try: - for data in traffic_data: - if isinstance(data, dict): - traffic_record = TrafficData( - location_id=location_id, - date=data.get('date', datetime.now()), - traffic_volume=data.get('traffic_volume'), - pedestrian_count=data.get('pedestrian_count'), - congestion_level=data.get('congestion_level'), - average_speed=data.get('average_speed'), - source="madrid_opendata", - raw_data=str(data) - ) - db.add(traffic_record) - - await db.commit() - logger.debug("Historical traffic data stored in database", count=len(traffic_data)) - except Exception as db_error: - logger.warning("Failed to store historical traffic data", error=str(db_error)) - await db.rollback() - - return [TrafficDataResponse(**item) for item in traffic_data if isinstance(item, dict)] else: - logger.warning("No historical traffic data received") + logger.debug("No historical traffic data found in database") return [] - + except Exception as e: logger.error("Failed to get historical traffic", error=str(e)) - return [] \ No newline at end of file + return [] + + async def store_traffic_data(self, + latitude: float, + longitude: float, + traffic_data: Dict[str, Any], + db: AsyncSession) -> bool: + """Store traffic data to database""" + try: + location_id = f"{latitude:.4f},{longitude:.4f}" + + traffic_record = TrafficData( + location_id=location_id, + date=traffic_data.get("date", datetime.now()), + traffic_volume=traffic_data.get("traffic_volume"), + pedestrian_count=traffic_data.get("pedestrian_count"), + congestion_level=traffic_data.get("congestion_level"), + average_speed=traffic_data.get("average_speed"), + source=traffic_data.get("source", "madrid_opendata"), + raw_data=str(traffic_data) if traffic_data else None + ) + + db.add(traffic_record) + await db.commit() + + logger.debug("Traffic data stored successfully", location_id=location_id) + return True + + except Exception as e: + logger.error("Failed to store traffic data", error=str(e)) + await db.rollback() + return False \ No newline at end of file