From befcc126b0cb1de7c540fc1069d7c8ae68ad2c25 Mon Sep 17 00:00:00 2001 From: Urtzi Alfaro Date: Mon, 29 Sep 2025 13:13:12 +0200 Subject: [PATCH] Refactor all main.py --- HEALTH_CHECKS.md | 281 ++++++++++++ gateway/requirements.txt | 4 +- .../base/components/auth/auth-service.yaml | 10 +- .../components/external/external-service.yaml | 10 +- .../forecasting/forecasting-service.yaml | 10 +- .../inventory/inventory-service.yaml | 10 +- .../components/microservice-template.yaml | 10 +- .../notification/notification-service.yaml | 10 +- .../components/orders/orders-service.yaml | 10 +- .../base/components/pos/pos-service.yaml | 10 +- .../production/production-service.yaml | 10 +- .../components/recipes/recipes-service.yaml | 10 +- .../base/components/sales/sales-service.yaml | 10 +- .../suppliers/suppliers-service.yaml | 10 +- .../components/tenant/tenant-service.yaml | 10 +- .../components/training/training-service.yaml | 10 +- services/auth/app/main.py | 272 +++++------ services/external/app/main.py | 320 ++++++------- services/forecasting/app/core/database.py | 89 ++-- services/forecasting/app/main.py | 241 +++++----- services/inventory/app/main.py | 230 +++------- services/notification/app/main.py | 410 ++++++++--------- services/orders/app/core/database.py | 14 +- services/orders/app/main.py | 137 ++---- services/pos/app/main.py | 239 +++++----- services/production/app/main.py | 198 ++++---- services/recipes/app/main.py | 187 +++----- services/recipes/requirements.txt | 4 + services/sales/app/main.py | 255 +++++------ services/suppliers/app/main.py | 191 +++----- services/tenant/app/main.py | 140 +++--- services/training/app/main.py | 365 +++++---------- shared/monitoring/__init__.py | 14 +- shared/monitoring/health_checks.py | 370 +++++++++++++++ shared/service_base.py | 429 ++++++++++++++++++ 35 files changed, 2537 insertions(+), 1993 deletions(-) create mode 100644 HEALTH_CHECKS.md create mode 100644 shared/monitoring/health_checks.py create mode 100644 shared/service_base.py diff --git a/HEALTH_CHECKS.md b/HEALTH_CHECKS.md new file mode 100644 index 00000000..60f9a946 --- /dev/null +++ b/HEALTH_CHECKS.md @@ -0,0 +1,281 @@ +# Unified Health Check System + +This document describes the unified health check system implemented across all microservices in the bakery-ia platform. + +## Overview + +The unified health check system provides standardized health monitoring endpoints across all services, with comprehensive database verification, Kubernetes integration, and detailed health reporting. + +## Key Features + +- **Standardized Endpoints**: All services now provide the same health check endpoints +- **Database Verification**: Comprehensive database health checks including table existence verification +- **Kubernetes Integration**: Proper separation of liveness and readiness probes +- **Detailed Reporting**: Rich health status information for debugging and monitoring +- **App State Integration**: Health checks automatically detect service ready state + +## Health Check Endpoints + +### `/health` - Basic Health Check +- **Purpose**: Basic service health status +- **Use Case**: General health monitoring, API gateways +- **Response**: Service name, version, status, and timestamp +- **Status Codes**: 200 (healthy/starting) + +### `/health/ready` - Kubernetes Readiness Probe +- **Purpose**: Indicates if service is ready to receive traffic +- **Use Case**: Kubernetes readiness probe, load balancer health checks +- **Checks**: Application state, database connectivity, table verification, custom checks +- **Status Codes**: 200 (ready), 503 (not ready) + +### `/health/live` - Kubernetes Liveness Probe +- **Purpose**: Indicates if service is alive and should not be restarted +- **Use Case**: Kubernetes liveness probe +- **Response**: Simple alive status +- **Status Codes**: 200 (alive) + +### `/health/database` - Detailed Database Health +- **Purpose**: Comprehensive database health information for debugging +- **Use Case**: Database monitoring, troubleshooting +- **Checks**: Connectivity, table existence, connection pool status, response times +- **Status Codes**: 200 (healthy), 503 (unhealthy) + +## Implementation + +### Services Updated + +The following services have been updated to use the unified health check system: + +1. **Training Service** (`training-service`) + - Full implementation with database manager integration + - Table verification for ML training tables + - Expected tables: `model_training_logs`, `trained_models`, `model_performance_metrics`, `training_job_queue`, `model_artifacts` + +2. **Orders Service** (`orders-service`) + - Legacy database integration with custom health checks + - Expected tables: `customers`, `customer_contacts`, `customer_orders`, `order_items`, `order_status_history`, `procurement_plans`, `procurement_requirements` + +3. **Inventory Service** (`inventory-service`) + - Full database manager integration + - Food safety and inventory table verification + - Expected tables: `ingredients`, `stock`, `stock_movements`, `product_transformations`, `stock_alerts`, `food_safety_compliance`, `temperature_logs`, `food_safety_alerts` + +### Code Integration + +#### Basic Setup +```python +from shared.monitoring.health_checks import setup_fastapi_health_checks + +# Setup unified health checks +health_manager = setup_fastapi_health_checks( + app=app, + service_name="my-service", + version="1.0.0", + database_manager=database_manager, + expected_tables=['table1', 'table2'], + custom_checks={"custom_check": custom_check_function} +) +``` + +#### With Custom Checks +```python +async def custom_health_check(): + """Custom health check function""" + return await some_service_check() + +health_manager = setup_fastapi_health_checks( + app=app, + service_name="my-service", + version="1.0.0", + database_manager=database_manager, + expected_tables=['table1', 'table2'], + custom_checks={"external_service": custom_health_check} +) +``` + +#### Service Ready State +```python +# In your lifespan function +async def lifespan(app: FastAPI): + # Startup logic + await initialize_service() + + # Mark service as ready + app.state.ready = True + + yield + + # Shutdown logic +``` + +## Kubernetes Configuration + +### Updated Probe Configuration + +The microservice template and specific service configurations have been updated to use the new endpoints: + +```yaml +livenessProbe: + httpGet: + path: /health/live + port: 8000 + initialDelaySeconds: 30 + timeoutSeconds: 5 + periodSeconds: 10 + failureThreshold: 3 + +readinessProbe: + httpGet: + path: /health/ready + port: 8000 + initialDelaySeconds: 15 + timeoutSeconds: 3 + periodSeconds: 5 + failureThreshold: 5 +``` + +### Key Changes from Previous Configuration + +1. **Liveness Probe**: Now uses `/health/live` instead of `/health` +2. **Readiness Probe**: Now uses `/health/ready` instead of `/health` +3. **Improved Timing**: Adjusted timeouts and failure thresholds for better reliability +4. **Separate Concerns**: Liveness and readiness are now properly separated + +## Health Check Response Examples + +### Basic Health Check Response +```json +{ + "status": "healthy", + "service": "training-service", + "version": "1.0.0", + "timestamp": "2025-01-27T10:30:00Z" +} +``` + +### Readiness Check Response (Ready) +```json +{ + "status": "ready", + "checks": { + "application": true, + "database_connectivity": true, + "database_tables": true + }, + "database": { + "status": "healthy", + "tables_verified": ["model_training_logs", "trained_models"], + "missing_tables": [], + "errors": [] + } +} +``` + +### Database Health Response +```json +{ + "status": "healthy", + "connectivity": true, + "tables_exist": true, + "tables_verified": ["model_training_logs", "trained_models"], + "missing_tables": [], + "errors": [], + "connection_info": { + "service_name": "training-service", + "database_type": "postgresql", + "pool_size": 20, + "current_checked_out": 2 + }, + "response_time_ms": 15.23 +} +``` + +## Testing + +### Manual Testing +```bash +# Test all endpoints for a running service +curl http://localhost:8000/health +curl http://localhost:8000/health/ready +curl http://localhost:8000/health/live +curl http://localhost:8000/health/database +``` + +### Automated Testing +Use the provided test script: +```bash +python test_unified_health_checks.py +``` + +## Migration Guide + +### For Existing Services + +1. **Add Health Check Import**: + ```python + from shared.monitoring.health_checks import setup_fastapi_health_checks + ``` + +2. **Add Database Manager Import** (if using shared database): + ```python + from app.core.database import database_manager + ``` + +3. **Setup Health Checks** (after app creation, before router inclusion): + ```python + health_manager = setup_fastapi_health_checks( + app=app, + service_name="your-service-name", + version=settings.VERSION, + database_manager=database_manager, + expected_tables=["table1", "table2"] + ) + ``` + +4. **Remove Old Health Endpoints**: + Remove any existing `@app.get("/health")` endpoints + +5. **Add Ready State Management**: + ```python + # In lifespan function after successful startup + app.state.ready = True + ``` + +6. **Update Kubernetes Configuration**: + Update deployment YAML to use new probe endpoints + +### For Services Using Legacy Database + +If your service doesn't use the shared database manager: + +```python +async def legacy_database_check(): + """Custom health check for legacy database""" + return await your_db_health_check() + +health_manager = setup_fastapi_health_checks( + app=app, + service_name="your-service", + version=settings.VERSION, + database_manager=None, + expected_tables=None, + custom_checks={"legacy_database": legacy_database_check} +) +``` + +## Benefits + +1. **Consistency**: All services now provide the same health check interface +2. **Better Kubernetes Integration**: Proper separation of liveness and readiness concerns +3. **Enhanced Debugging**: Detailed health information for troubleshooting +4. **Database Verification**: Comprehensive database health checks including table verification +5. **Monitoring Ready**: Rich health status information for monitoring systems +6. **Maintainability**: Centralized health check logic reduces code duplication + +## Future Enhancements + +1. **Metrics Integration**: Add Prometheus metrics for health check performance +2. **Circuit Breaker**: Implement circuit breaker pattern for external service checks +3. **Health Check Dependencies**: Add dependency health checks between services +4. **Performance Thresholds**: Add configurable performance thresholds for health checks +5. **Health Check Scheduling**: Add scheduled background health checks \ No newline at end of file diff --git a/gateway/requirements.txt b/gateway/requirements.txt index 44b64e7b..9bc15706 100644 --- a/gateway/requirements.txt +++ b/gateway/requirements.txt @@ -14,4 +14,6 @@ aio-pika==9.3.0 pytz==2023.3 python-logstash==0.4.8 structlog==23.2.0 -websockets==12.0 \ No newline at end of file +websockets==12.0 +sqlalchemy==2.0.23 +asyncpg==0.29.0 \ No newline at end of file diff --git a/infrastructure/kubernetes/base/components/auth/auth-service.yaml b/infrastructure/kubernetes/base/components/auth/auth-service.yaml index 255eecff..ade8d6d1 100644 --- a/infrastructure/kubernetes/base/components/auth/auth-service.yaml +++ b/infrastructure/kubernetes/base/components/auth/auth-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/external/external-service.yaml b/infrastructure/kubernetes/base/components/external/external-service.yaml index 701a6101..2b443cde 100644 --- a/infrastructure/kubernetes/base/components/external/external-service.yaml +++ b/infrastructure/kubernetes/base/components/external/external-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/forecasting/forecasting-service.yaml b/infrastructure/kubernetes/base/components/forecasting/forecasting-service.yaml index 97b4ce46..25e88abc 100644 --- a/infrastructure/kubernetes/base/components/forecasting/forecasting-service.yaml +++ b/infrastructure/kubernetes/base/components/forecasting/forecasting-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/inventory/inventory-service.yaml b/infrastructure/kubernetes/base/components/inventory/inventory-service.yaml index e86bd84d..15e8c597 100644 --- a/infrastructure/kubernetes/base/components/inventory/inventory-service.yaml +++ b/infrastructure/kubernetes/base/components/inventory/inventory-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/microservice-template.yaml b/infrastructure/kubernetes/base/components/microservice-template.yaml index e02df7c8..078e2a59 100644 --- a/infrastructure/kubernetes/base/components/microservice-template.yaml +++ b/infrastructure/kubernetes/base/components/microservice-template.yaml @@ -115,7 +115,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -123,12 +123,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/notification/notification-service.yaml b/infrastructure/kubernetes/base/components/notification/notification-service.yaml index 80a82e2f..55b856c7 100644 --- a/infrastructure/kubernetes/base/components/notification/notification-service.yaml +++ b/infrastructure/kubernetes/base/components/notification/notification-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/orders/orders-service.yaml b/infrastructure/kubernetes/base/components/orders/orders-service.yaml index 8d081078..f11d19fa 100644 --- a/infrastructure/kubernetes/base/components/orders/orders-service.yaml +++ b/infrastructure/kubernetes/base/components/orders/orders-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/pos/pos-service.yaml b/infrastructure/kubernetes/base/components/pos/pos-service.yaml index 8bf9f7be..53c438f0 100644 --- a/infrastructure/kubernetes/base/components/pos/pos-service.yaml +++ b/infrastructure/kubernetes/base/components/pos/pos-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/production/production-service.yaml b/infrastructure/kubernetes/base/components/production/production-service.yaml index 0bd253c1..6ba5136c 100644 --- a/infrastructure/kubernetes/base/components/production/production-service.yaml +++ b/infrastructure/kubernetes/base/components/production/production-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/recipes/recipes-service.yaml b/infrastructure/kubernetes/base/components/recipes/recipes-service.yaml index e2c6b976..4bc03b95 100644 --- a/infrastructure/kubernetes/base/components/recipes/recipes-service.yaml +++ b/infrastructure/kubernetes/base/components/recipes/recipes-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/sales/sales-service.yaml b/infrastructure/kubernetes/base/components/sales/sales-service.yaml index 1070d933..94d52431 100644 --- a/infrastructure/kubernetes/base/components/sales/sales-service.yaml +++ b/infrastructure/kubernetes/base/components/sales/sales-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/suppliers/suppliers-service.yaml b/infrastructure/kubernetes/base/components/suppliers/suppliers-service.yaml index 52a3de90..28edcfff 100644 --- a/infrastructure/kubernetes/base/components/suppliers/suppliers-service.yaml +++ b/infrastructure/kubernetes/base/components/suppliers/suppliers-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/tenant/tenant-service.yaml b/infrastructure/kubernetes/base/components/tenant/tenant-service.yaml index 6d0ac2c5..acec09b1 100644 --- a/infrastructure/kubernetes/base/components/tenant/tenant-service.yaml +++ b/infrastructure/kubernetes/base/components/tenant/tenant-service.yaml @@ -57,7 +57,7 @@ spec: cpu: "500m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 30 timeoutSeconds: 5 @@ -65,12 +65,12 @@ spec: failureThreshold: 3 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 5 - timeoutSeconds: 1 + initialDelaySeconds: 15 + timeoutSeconds: 3 periodSeconds: 5 - failureThreshold: 3 + failureThreshold: 5 --- apiVersion: v1 diff --git a/infrastructure/kubernetes/base/components/training/training-service.yaml b/infrastructure/kubernetes/base/components/training/training-service.yaml index b5ef169a..a757c2b9 100644 --- a/infrastructure/kubernetes/base/components/training/training-service.yaml +++ b/infrastructure/kubernetes/base/components/training/training-service.yaml @@ -65,7 +65,7 @@ spec: cpu: "2000m" livenessProbe: httpGet: - path: /health + path: /health/live port: 8000 initialDelaySeconds: 60 timeoutSeconds: 30 @@ -73,12 +73,12 @@ spec: failureThreshold: 5 readinessProbe: httpGet: - path: /health + path: /health/ready port: 8000 - initialDelaySeconds: 15 - timeoutSeconds: 10 + initialDelaySeconds: 30 + timeoutSeconds: 15 periodSeconds: 15 - failureThreshold: 3 + failureThreshold: 5 volumes: - name: training-state persistentVolumeClaim: diff --git a/services/auth/app/main.py b/services/auth/app/main.py index 8184d3eb..72c3295d 100644 --- a/services/auth/app/main.py +++ b/services/auth/app/main.py @@ -1,180 +1,120 @@ """ -Authentication Service Main Application - Fixed middleware issue +Authentication Service Main Application """ -import structlog -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse -from contextlib import asynccontextmanager - +from fastapi import FastAPI from app.core.config import settings -from app.core.database import engine, create_tables +from app.core.database import database_manager from app.api import auth, users, onboarding from app.services.messaging import setup_messaging, cleanup_messaging -from shared.monitoring import setup_logging, HealthChecker -from shared.monitoring.metrics import setup_metrics_early +from shared.service_base import StandardFastAPIService -# Setup logging first -setup_logging("auth-service", settings.LOG_LEVEL) -logger = structlog.get_logger() -# Global variables for lifespan access -metrics_collector = None -health_checker = None +class AuthService(StandardFastAPIService): + """Authentication Service with standardized setup""" -# Create FastAPI app FIRST -app = FastAPI( - title="Authentication Service", - description="Handles user authentication and authorization for bakery forecasting platform", - version="1.0.0", + def __init__(self): + # Define expected database tables for health checks + auth_expected_tables = [ + 'users', 'refresh_tokens', 'user_onboarding_progress', + 'user_onboarding_summary', 'login_attempts' + ] + + # Define custom metrics for auth service + auth_custom_metrics = { + "registration_total": { + "type": "counter", + "description": "Total user registrations by status", + "labels": ["status"] + }, + "login_success_total": { + "type": "counter", + "description": "Total successful user logins" + }, + "login_failure_total": { + "type": "counter", + "description": "Total failed user logins by reason", + "labels": ["reason"] + }, + "token_refresh_total": { + "type": "counter", + "description": "Total token refreshes by status", + "labels": ["status"] + }, + "token_verify_total": { + "type": "counter", + "description": "Total token verifications by status", + "labels": ["status"] + }, + "logout_total": { + "type": "counter", + "description": "Total user logouts by status", + "labels": ["status"] + }, + "registration_duration_seconds": { + "type": "histogram", + "description": "Registration request duration" + }, + "login_duration_seconds": { + "type": "histogram", + "description": "Login request duration" + }, + "token_refresh_duration_seconds": { + "type": "histogram", + "description": "Token refresh duration" + } + } + + super().__init__( + service_name="auth-service", + app_name="Authentication Service", + description="Handles user authentication and authorization for bakery forecasting platform", + version="1.0.0", + log_level=settings.LOG_LEVEL, + api_prefix="/api/v1", + database_manager=database_manager, + expected_tables=auth_expected_tables, + enable_messaging=True, + custom_metrics=auth_custom_metrics + ) + + async def _setup_messaging(self): + """Setup messaging for auth service""" + await setup_messaging() + self.logger.info("Messaging setup complete") + + async def _cleanup_messaging(self): + """Cleanup messaging for auth service""" + await cleanup_messaging() + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for auth service""" + self.logger.info("Authentication Service shutdown complete") + + def get_service_features(self): + """Return auth-specific features""" + return [ + "user_authentication", + "token_management", + "user_onboarding", + "role_based_access", + "messaging_integration" + ] + + +# Create service instance +service = AuthService() + +# Create FastAPI app with standardized setup +app = service.create_app( docs_url="/docs", redoc_url="/redoc" ) -# Setup metrics BEFORE any middleware and BEFORE lifespan -# This must happen before the app starts -metrics_collector = setup_metrics_early(app, "auth-service") +# Setup standard endpoints +service.setup_standard_endpoints() -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan events - NO MIDDLEWARE ADDED HERE""" - global health_checker - - # Startup - logger.info("Starting Authentication Service...") - - try: - # Create database tables - await create_tables() - logger.info("Database tables created") - - # Setup messaging - await setup_messaging() - logger.info("Messaging setup complete") - - # Register custom metrics (metrics_collector already exists) - metrics_collector.register_counter( - "registration_total", - "Total user registrations by status", - labels=["status"] # Add this line - ) - metrics_collector.register_counter( - "login_success_total", - "Total successful user logins" - ) - metrics_collector.register_counter( - "login_failure_total", - "Total failed user logins by reason", - labels=["reason"] # Add this line, based on auth.py usage - ) - metrics_collector.register_counter( - "token_refresh_total", - "Total token refreshes by status", - labels=["status"] # Add this line - ) - metrics_collector.register_counter( - "token_verify_total", - "Total token verifications by status", - labels=["status"] # Add this line - ) - metrics_collector.register_counter( - "logout_total", - "Total user logouts by status", - labels=["status"] # Add this line - ) - metrics_collector.register_counter("errors_total", "Total errors", labels=["type"]) # Add this line - metrics_collector.register_histogram("registration_duration_seconds", "Registration request duration") - metrics_collector.register_histogram("login_duration_seconds", "Login request duration") - metrics_collector.register_histogram("token_refresh_duration_seconds", "Token refresh duration") - - # Setup health checker - health_checker = HealthChecker("auth-service") - - # Add database health check - async def check_database(): - try: - from app.core.database import get_db - from sqlalchemy import text - async for db in get_db(): - await db.execute(text("SELECT 1")) - return True - except Exception as e: - return f"Database error: {e}" - - health_checker.add_check("database", check_database, timeout=5.0, critical=True) - - # Add messaging health check - def check_messaging(): - try: - # Add your messaging health check logic here - return True - except Exception as e: - return f"Messaging error: {e}" - - health_checker.add_check("messaging", check_messaging, timeout=3.0, critical=False) - - # Store health checker in app state - app.state.health_checker = health_checker - - logger.info("Authentication Service started successfully") - - except Exception as e: - logger.error(f"Failed to start Authentication Service: {e}") - raise - - yield - - # Shutdown - logger.info("Shutting down Authentication Service...") - try: - await cleanup_messaging() - await engine.dispose() - logger.info("Authentication Service shutdown complete") - except Exception as e: - logger.error(f"Error during shutdown: {e}") - -# Set lifespan AFTER metrics setup -app.router.lifespan_context = lifespan - -# CORS middleware (added after metrics setup) -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], # Configure properly for production - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -# Include routers -app.include_router(auth.router, prefix="/api/v1/auth", tags=["authentication"]) -app.include_router(users.router, prefix="/api/v1/users", tags=["users"]) -app.include_router(onboarding.router, prefix="/api/v1/users", tags=["onboarding"]) - -# Health check endpoint with comprehensive checks -@app.get("/health") -async def health_check(): - """Comprehensive health check endpoint""" - if health_checker: - return await health_checker.check_health() - else: - return { - "service": "auth-service", - "status": "healthy", - "version": "1.0.0" - } - -# Exception handlers -@app.exception_handler(Exception) -async def global_exception_handler(request: Request, exc: Exception): - """Global exception handler with metrics""" - logger.error(f"Unhandled exception: {exc}", exc_info=True) - - # Record error metric if available - if metrics_collector: - metrics_collector.increment_counter("errors_total", labels={"type": "unhandled"}) - - return JSONResponse( - status_code=500, - content={"detail": "Internal server error"} - ) \ No newline at end of file +# Include routers with specific configurations +service.add_router(auth.router, prefix="/api/v1/auth", tags=["authentication"]) +service.add_router(users.router, prefix="/api/v1/users", tags=["users"]) +service.add_router(onboarding.router, prefix="/api/v1/users", tags=["onboarding"]) \ No newline at end of file diff --git a/services/external/app/main.py b/services/external/app/main.py index 47e7fc51..5e9b332f 100644 --- a/services/external/app/main.py +++ b/services/external/app/main.py @@ -3,184 +3,158 @@ External Service Main Application """ -import structlog -from contextlib import asynccontextmanager -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse - +from fastapi import FastAPI from app.core.config import settings -from app.core.database import init_db, close_db -from shared.monitoring import setup_logging, HealthChecker -from shared.monitoring.metrics import setup_metrics_early - -# Setup logging first -setup_logging("external-service", settings.LOG_LEVEL) -logger = structlog.get_logger() - -# Global variables for lifespan access -metrics_collector = None -health_checker = None - -# Create FastAPI app FIRST -app = FastAPI( - title="Bakery External Data Service", - description="External data collection service for weather, traffic, and events data", - version="1.0.0" -) - -# Setup metrics BEFORE any middleware and BEFORE lifespan -metrics_collector = setup_metrics_early(app, "external-service") - -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan events""" - global health_checker - - # Startup - logger.info("Starting External Service...") - - try: - # Initialize database - await init_db() - logger.info("Database initialized") - - # Register custom metrics - metrics_collector.register_counter("weather_api_calls_total", "Total weather API calls") - metrics_collector.register_counter("weather_api_success_total", "Successful weather API calls") - metrics_collector.register_counter("weather_api_failures_total", "Failed weather API calls") - - metrics_collector.register_counter("traffic_api_calls_total", "Total traffic API calls") - metrics_collector.register_counter("traffic_api_success_total", "Successful traffic API calls") - metrics_collector.register_counter("traffic_api_failures_total", "Failed traffic API calls") - - metrics_collector.register_counter("data_collection_jobs_total", "Data collection jobs") - metrics_collector.register_counter("data_records_stored_total", "Data records stored") - metrics_collector.register_counter("data_quality_issues_total", "Data quality issues detected") - - metrics_collector.register_histogram("weather_api_duration_seconds", "Weather API call duration") - metrics_collector.register_histogram("traffic_api_duration_seconds", "Traffic API call duration") - metrics_collector.register_histogram("data_collection_duration_seconds", "Data collection job duration") - metrics_collector.register_histogram("data_processing_duration_seconds", "Data processing duration") - - # Setup health checker - health_checker = HealthChecker("external-service") - - # Add database health check - async def check_database(): - try: - from app.core.database import get_db - from sqlalchemy import text - async for db in get_db(): - await db.execute(text("SELECT 1")) - return True - except Exception as e: - return f"Database error: {e}" - - # Add external API health checks - async def check_weather_api(): - try: - # Simple connectivity check - if settings.AEMET_API_KEY: - return True - else: - return "AEMET API key not configured" - except Exception as e: - return f"Weather API error: {e}" - - async def check_traffic_api(): - try: - # Simple connectivity check - if settings.MADRID_OPENDATA_API_KEY: - return True - else: - return "Madrid Open Data API key not configured" - except Exception as e: - return f"Traffic API error: {e}" - - health_checker.add_check("database", check_database, timeout=5.0, critical=True) - health_checker.add_check("weather_api", check_weather_api, timeout=10.0, critical=False) - health_checker.add_check("traffic_api", check_traffic_api, timeout=10.0, critical=False) - - # Store health checker in app state - app.state.health_checker = health_checker - - logger.info("External Service started successfully") - - except Exception as e: - logger.error(f"Failed to start External Service: {e}") - raise - - yield - - # Shutdown - logger.info("Shutting down External Service...") - await close_db() - -# Set lifespan AFTER metrics setup -app.router.lifespan_context = lifespan - -# CORS middleware (added after metrics setup) -app.add_middleware( - CORSMiddleware, - allow_origins=settings.CORS_ORIGINS, - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - +from app.core.database import database_manager +from app.services.messaging import setup_messaging, cleanup_messaging +from shared.service_base import StandardFastAPIService # Include routers from app.api.weather import router as weather_router from app.api.traffic import router as traffic_router -app.include_router(weather_router, prefix="/api/v1", tags=["weather"]) -app.include_router(traffic_router, prefix="/api/v1", tags=["traffic"]) -# Health check endpoint -@app.get("/health") -async def health_check(): - """Comprehensive health check endpoint""" - if health_checker: - return await health_checker.check_health() - else: - return { - "service": "external-service", - "status": "healthy", - "version": "1.0.0" + +class ExternalService(StandardFastAPIService): + """External Data Service with standardized setup""" + + def __init__(self): + # Define expected database tables for health checks + external_expected_tables = [ + 'weather_data', 'weather_forecasts', 'traffic_data', + 'traffic_measurement_points', 'traffic_background_jobs' + ] + + # Define custom API checks + async def check_weather_api(): + """Check weather API configuration""" + try: + return bool(settings.AEMET_API_KEY) + except Exception as e: + self.logger.error("Weather API check failed", error=str(e)) + return False + + async def check_traffic_api(): + """Check traffic API configuration""" + try: + return bool(settings.MADRID_OPENDATA_API_KEY) + except Exception as e: + self.logger.error("Traffic API check failed", error=str(e)) + return False + + # Define custom metrics for external service + external_custom_metrics = { + "weather_api_calls_total": { + "type": "counter", + "description": "Total weather API calls" + }, + "weather_api_success_total": { + "type": "counter", + "description": "Successful weather API calls" + }, + "weather_api_failures_total": { + "type": "counter", + "description": "Failed weather API calls" + }, + "traffic_api_calls_total": { + "type": "counter", + "description": "Total traffic API calls" + }, + "traffic_api_success_total": { + "type": "counter", + "description": "Successful traffic API calls" + }, + "traffic_api_failures_total": { + "type": "counter", + "description": "Failed traffic API calls" + }, + "data_collection_jobs_total": { + "type": "counter", + "description": "Data collection jobs" + }, + "data_records_stored_total": { + "type": "counter", + "description": "Data records stored" + }, + "data_quality_issues_total": { + "type": "counter", + "description": "Data quality issues detected" + }, + "weather_api_duration_seconds": { + "type": "histogram", + "description": "Weather API call duration" + }, + "traffic_api_duration_seconds": { + "type": "histogram", + "description": "Traffic API call duration" + }, + "data_collection_duration_seconds": { + "type": "histogram", + "description": "Data collection job duration" + }, + "data_processing_duration_seconds": { + "type": "histogram", + "description": "Data processing duration" + } } -# Root endpoint -@app.get("/") -async def root(): - """Root endpoint""" - return { - "service": "External Data Service", - "version": "1.0.0", - "status": "running", - "endpoints": { - "health": "/health", - "docs": "/docs", - "weather": "/api/v1/weather", - "traffic": "/api/v1/traffic", - "jobs": "/api/v1/jobs" - }, - "data_sources": { - "weather": "AEMET (Spanish Weather Service)", - "traffic": "Madrid Open Data Portal", - "coverage": "Madrid, Spain" - } - } + super().__init__( + service_name="external-service", + app_name="Bakery External Data Service", + description="External data collection service for weather, traffic, and events data", + version="1.0.0", + log_level=settings.LOG_LEVEL, + cors_origins=settings.CORS_ORIGINS, + api_prefix="/api/v1", + database_manager=database_manager, + expected_tables=external_expected_tables, + custom_health_checks={ + "weather_api": check_weather_api, + "traffic_api": check_traffic_api + }, + custom_metrics=external_custom_metrics, + enable_messaging=True + ) -# Exception handlers -@app.exception_handler(Exception) -async def global_exception_handler(request: Request, exc: Exception): - """Global exception handler with metrics""" - logger.error(f"Unhandled exception: {exc}", exc_info=True) - - # Record error metric if available - if metrics_collector: - metrics_collector.increment_counter("errors_total", labels={"type": "unhandled"}) - - return JSONResponse( - status_code=500, - content={"detail": "Internal server error"} - ) \ No newline at end of file + async def _setup_messaging(self): + """Setup messaging for external service""" + await setup_messaging() + self.logger.info("External service messaging initialized") + + async def _cleanup_messaging(self): + """Cleanup messaging for external service""" + await cleanup_messaging() + + async def on_startup(self, app: FastAPI): + """Custom startup logic for external service""" + pass + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for external service""" + # Database cleanup is handled by the base class + pass + + def get_service_features(self): + """Return external-specific features""" + return [ + "weather_data_collection", + "traffic_data_collection", + "aemet_integration", + "madrid_opendata_integration", + "data_quality_monitoring", + "scheduled_collection_jobs", + "external_api_monitoring" + ] + + + +# Create service instance +service = ExternalService() + +# Create FastAPI app with standardized setup +app = service.create_app() + +# Setup standard endpoints +service.setup_standard_endpoints() + +# Include routers +service.add_router(weather_router, tags=["weather"]) +service.add_router(traffic_router, tags=["traffic"]) \ No newline at end of file diff --git a/services/forecasting/app/core/database.py b/services/forecasting/app/core/database.py index ce3a97c0..c85a64f6 100644 --- a/services/forecasting/app/core/database.py +++ b/services/forecasting/app/core/database.py @@ -7,67 +7,74 @@ Database configuration for forecasting service import structlog from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker -from sqlalchemy.pool import NullPool from sqlalchemy import text +from typing import AsyncGenerator from app.core.config import settings -from shared.database.base import Base +from shared.database.base import Base, DatabaseManager logger = structlog.get_logger() # Create async engine -engine = create_async_engine( +async_engine = create_async_engine( settings.DATABASE_URL, - poolclass=NullPool, echo=settings.DEBUG, - future=True + pool_size=10, + max_overflow=20, + pool_pre_ping=True, + pool_recycle=3600 ) -# Create session factory +# Create async session factory AsyncSessionLocal = async_sessionmaker( - engine, + bind=async_engine, class_=AsyncSession, - expire_on_commit=False, - autoflush=False, - autocommit=False + expire_on_commit=False ) -class DatabaseManager: - """Database management operations""" - - async def create_tables(self): - """Create database tables""" - async with engine.begin() as conn: +async def get_db() -> AsyncGenerator[AsyncSession, None]: + """Get database session""" + async with AsyncSessionLocal() as session: + try: + yield session + except Exception as e: + await session.rollback() + logger.error("Database session error", error=str(e)) + raise + finally: + await session.close() + +async def init_database(): + """Initialize database tables""" + try: + async with async_engine.begin() as conn: + # Import all models to ensure they are registered + from app.models.forecast import ForecastBatch, Forecast + from app.models.prediction import PredictionBatch, Prediction + + # Create all tables await conn.run_sync(Base.metadata.create_all) - logger.info("Forecasting database tables created successfully") - - async def get_session(self) -> AsyncSession: - """Get database session""" - async with AsyncSessionLocal() as session: - try: - yield session - await session.commit() - except Exception as e: - await session.rollback() - logger.error(f"Database session error: {e}") - raise - finally: - await session.close() -# Global database manager instance -database_manager = DatabaseManager() - -async def get_db() -> AsyncSession: - """Database dependency""" - async for session in database_manager.get_session(): - yield session + logger.info("Forecasting database initialized successfully") + except Exception as e: + logger.error("Failed to initialize forecasting database", error=str(e)) + raise async def get_db_health() -> bool: """Check database health""" try: - async with AsyncSessionLocal() as session: - await session.execute(text("SELECT 1")) - return True + async with async_engine.begin() as conn: + await conn.execute(text("SELECT 1")) + return True except Exception as e: - logger.error(f"Database health check failed: {e}") + logger.error("Database health check failed", error=str(e)) return False + +# Database manager instance for service_base compatibility +database_manager = DatabaseManager( + database_url=settings.DATABASE_URL, + service_name="forecasting-service", + pool_size=10, + max_overflow=20, + echo=settings.DEBUG +) diff --git a/services/forecasting/app/main.py b/services/forecasting/app/main.py index 7800ba35..0d8fbdd3 100644 --- a/services/forecasting/app/main.py +++ b/services/forecasting/app/main.py @@ -6,144 +6,141 @@ Forecasting Service Main Application Demand prediction and forecasting service for bakery operations """ -import structlog -from contextlib import asynccontextmanager -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse - +from fastapi import FastAPI from app.core.config import settings -from app.core.database import database_manager, get_db_health +from app.core.database import database_manager from app.api import forecasts, predictions - - from app.services.messaging import setup_messaging, cleanup_messaging from app.services.forecasting_alert_service import ForecastingAlertService -from shared.monitoring.logging import setup_logging -from shared.monitoring.metrics import MetricsCollector +from shared.service_base import StandardFastAPIService -# Setup structured logging -setup_logging("forecasting-service", settings.LOG_LEVEL) -logger = structlog.get_logger() -# Initialize metrics collector -metrics_collector = MetricsCollector("forecasting-service") +class ForecastingService(StandardFastAPIService): + """Forecasting Service with standardized setup""" -# Initialize alert service -alert_service = None + def __init__(self): + # Define expected database tables for health checks + forecasting_expected_tables = [ + 'forecasts', 'prediction_batches', 'model_performance_metrics', 'prediction_cache' + ] -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan manager for startup and shutdown events""" - # Startup - logger.info("Starting Forecasting Service", version="1.0.0") - - try: - # Initialize database - logger.info("Initializing database connection") - await database_manager.create_tables() - logger.info("Database initialized successfully") - - # Initialize messaging - logger.info("Setting up messaging") + self.alert_service = None + + # Create custom checks for alert service + async def alert_service_check(): + """Custom health check for forecasting alert service""" + return await self.alert_service.health_check() if self.alert_service else False + + # Define custom metrics for forecasting service + forecasting_custom_metrics = { + "forecasts_generated_total": { + "type": "counter", + "description": "Total forecasts generated" + }, + "predictions_served_total": { + "type": "counter", + "description": "Total predictions served" + }, + "prediction_errors_total": { + "type": "counter", + "description": "Total prediction errors" + }, + "forecast_processing_time_seconds": { + "type": "histogram", + "description": "Time to process forecast request" + }, + "prediction_processing_time_seconds": { + "type": "histogram", + "description": "Time to process prediction request" + }, + "model_cache_hits_total": { + "type": "counter", + "description": "Total model cache hits" + }, + "model_cache_misses_total": { + "type": "counter", + "description": "Total model cache misses" + } + } + + super().__init__( + service_name="forecasting-service", + app_name="Bakery Forecasting Service", + description="AI-powered demand prediction and forecasting service for bakery operations", + version="1.0.0", + log_level=settings.LOG_LEVEL, + cors_origins=settings.CORS_ORIGINS_LIST, + api_prefix="/api/v1", + database_manager=database_manager, + expected_tables=forecasting_expected_tables, + custom_health_checks={"alert_service": alert_service_check}, + enable_messaging=True, + custom_metrics=forecasting_custom_metrics + ) + + async def _setup_messaging(self): + """Setup messaging for forecasting service""" await setup_messaging() - logger.info("Messaging initialized") - + self.logger.info("Messaging initialized") + + async def _cleanup_messaging(self): + """Cleanup messaging for forecasting service""" + await cleanup_messaging() + + async def on_startup(self, app: FastAPI): + """Custom startup logic for forecasting service""" # Initialize forecasting alert service - logger.info("Setting up forecasting alert service") - global alert_service - alert_service = ForecastingAlertService(settings) - await alert_service.start() - logger.info("Forecasting alert service initialized") - - # Register custom metrics - metrics_collector.register_counter("forecasts_generated_total", "Total forecasts generated") - metrics_collector.register_counter("predictions_served_total", "Total predictions served") - metrics_collector.register_counter("prediction_errors_total", "Total prediction errors") # ← MISSING REGISTRATION! - metrics_collector.register_histogram("forecast_processing_time_seconds", "Time to process forecast request") - metrics_collector.register_histogram("prediction_processing_time_seconds", "Time to process prediction request") # ← ADD MISSING METRIC! - metrics_collector.register_gauge("active_models_count", "Number of active models") - metrics_collector.register_counter("model_cache_hits_total", "Total model cache hits") # ← ADD USEFUL METRIC! - metrics_collector.register_counter("model_cache_misses_total", "Total model cache misses") # ← ADD USEFUL METRIC! - - # Start metrics server - metrics_collector.start_metrics_server(8080) - - logger.info("Forecasting Service started successfully") - - yield - - except Exception as e: - logger.error("Failed to start Forecasting Service", error=str(e)) - raise - finally: - # Shutdown - logger.info("Shutting down Forecasting Service") - - try: - # Cleanup alert service - if alert_service: - await alert_service.stop() - logger.info("Alert service cleanup completed") - - await cleanup_messaging() - logger.info("Messaging cleanup completed") - except Exception as e: - logger.error("Error during messaging cleanup", error=str(e)) + self.alert_service = ForecastingAlertService(settings) + await self.alert_service.start() + self.logger.info("Forecasting alert service initialized") -# Create FastAPI app with lifespan -app = FastAPI( - title="Bakery Forecasting Service", - description="AI-powered demand prediction and forecasting service for bakery operations", - version="1.0.0", + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for forecasting service""" + # Cleanup alert service + if self.alert_service: + await self.alert_service.stop() + self.logger.info("Alert service cleanup completed") + + def get_service_features(self): + """Return forecasting-specific features""" + return [ + "demand_prediction", + "ai_forecasting", + "model_performance_tracking", + "prediction_caching", + "alert_notifications", + "messaging_integration" + ] + + def setup_custom_endpoints(self): + """Setup custom endpoints for forecasting service""" + @self.app.get("/alert-metrics") + async def get_alert_metrics(): + """Alert service metrics endpoint""" + if self.alert_service: + return self.alert_service.get_metrics() + return {"error": "Alert service not initialized"} + + +# Create service instance +service = ForecastingService() + +# Create FastAPI app with standardized setup +app = service.create_app( docs_url="/docs", - redoc_url="/redoc", - lifespan=lifespan + redoc_url="/redoc" ) -# CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=settings.CORS_ORIGINS_LIST, - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) +# Setup standard endpoints +service.setup_standard_endpoints() + +# Setup custom endpoints +service.setup_custom_endpoints() # Include API routers -app.include_router(forecasts.router, prefix="/api/v1", tags=["forecasts"]) - -app.include_router(predictions.router, prefix="/api/v1", tags=["predictions"]) - - -@app.get("/health") -async def health_check(): - """Health check endpoint""" - db_health = await get_db_health() - alert_health = await alert_service.health_check() if alert_service else {"status": "not_initialized"} - - overall_health = db_health and alert_health.get("status") == "healthy" - - return { - "status": "healthy" if overall_health else "unhealthy", - "service": "forecasting-service", - "version": "1.0.0", - "database": "connected" if db_health else "disconnected", - "alert_service": alert_health, - "timestamp": structlog.get_logger().info("Health check requested") - } - -@app.get("/metrics") -async def get_metrics(): - """Metrics endpoint for Prometheus""" - return metrics_collector.get_metrics() - -@app.get("/alert-metrics") -async def get_alert_metrics(): - """Alert service metrics endpoint""" - if alert_service: - return alert_service.get_metrics() - return {"error": "Alert service not initialized"} +service.add_router(forecasts.router, tags=["forecasts"]) +service.add_router(predictions.router, tags=["predictions"]) if __name__ == "__main__": import uvicorn diff --git a/services/inventory/app/main.py b/services/inventory/app/main.py index 890bf777..178ed7c7 100644 --- a/services/inventory/app/main.py +++ b/services/inventory/app/main.py @@ -4,191 +4,62 @@ Inventory Service FastAPI Application """ import os -from contextlib import asynccontextmanager -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse -import structlog +from fastapi import FastAPI # Import core modules from app.core.config import settings -from app.core.database import init_db, close_db, health_check as db_health_check +from app.core.database import database_manager from app.api import ingredients, stock, classification, transformations from app.services.inventory_alert_service import InventoryAlertService -from shared.monitoring.metrics import setup_metrics_early -# Auth decorators are used in endpoints, no global setup needed +from shared.service_base import StandardFastAPIService -logger = structlog.get_logger() - - -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan management""" - # Startup - logger.info("Starting Inventory Service", version=settings.VERSION) - - try: - # Initialize database - await init_db() - logger.info("Database initialized successfully") - - # Initialize alert service - alert_service = InventoryAlertService(settings) - await alert_service.start() - logger.info("Inventory alert service started") - - # Store alert service in app state - app.state.alert_service = alert_service - - # Setup metrics is already done early - no need to do it here - logger.info("Metrics setup completed") - - yield - - except Exception as e: - logger.error("Startup failed", error=str(e)) - raise - finally: - # Shutdown - logger.info("Shutting down Inventory Service") - try: - # Stop alert service - if hasattr(app.state, 'alert_service'): - await app.state.alert_service.stop() - logger.info("Alert service stopped") - - await close_db() - logger.info("Database connections closed") - except Exception as e: - logger.error("Shutdown error", error=str(e)) - - -# Create FastAPI application -app = FastAPI( - title=settings.APP_NAME, - description=settings.DESCRIPTION, - version=settings.VERSION, - openapi_url=f"{settings.API_V1_STR}/openapi.json", - docs_url=f"{settings.API_V1_STR}/docs", - redoc_url=f"{settings.API_V1_STR}/redoc", - lifespan=lifespan -) - -# Setup metrics BEFORE any middleware and BEFORE lifespan -metrics_collector = setup_metrics_early(app, "inventory-service") - - -# CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=settings.CORS_ORIGINS, - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - - -# Auth is handled via decorators in individual endpoints - - -# Exception handlers -@app.exception_handler(ValueError) -async def value_error_handler(request: Request, exc: ValueError): - """Handle validation errors""" - return JSONResponse( - status_code=400, - content={ - "error": "Validation Error", - "detail": str(exc), - "type": "value_error" - } - ) - - -@app.exception_handler(Exception) -async def general_exception_handler(request: Request, exc: Exception): - """Handle general exceptions""" - logger.error( - "Unhandled exception", - error=str(exc), - path=request.url.path, - method=request.method - ) - - return JSONResponse( - status_code=500, - content={ - "error": "Internal Server Error", - "detail": "An unexpected error occurred", - "type": "internal_error" - } - ) - - -# Include routers -app.include_router(ingredients.router, prefix=settings.API_V1_STR) -app.include_router(stock.router, prefix=settings.API_V1_STR) -app.include_router(transformations.router, prefix=settings.API_V1_STR) -app.include_router(classification.router, prefix=settings.API_V1_STR) - -# Include enhanced routers +# Import enhanced routers from app.api.dashboard import router as dashboard_router from app.api.food_safety import router as food_safety_router -app.include_router(dashboard_router, prefix=settings.API_V1_STR) -app.include_router(food_safety_router, prefix=settings.API_V1_STR) +class InventoryService(StandardFastAPIService): + """Inventory Service with standardized setup""" -# Root endpoint -@app.get("/") -async def root(): - """Root endpoint with service information""" - return { - "service": settings.SERVICE_NAME, - "version": settings.VERSION, - "description": settings.DESCRIPTION, - "status": "running", - "docs_url": f"{settings.API_V1_STR}/docs", - "health_url": "/health" - } + def __init__(self): + # Define expected database tables for health checks + inventory_expected_tables = [ + 'ingredients', 'stock', 'stock_movements', 'product_transformations', + 'stock_alerts', 'food_safety_compliance', 'temperature_logs', 'food_safety_alerts' + ] + super().__init__( + service_name="inventory-service", + app_name=settings.APP_NAME, + description=settings.DESCRIPTION, + version=settings.VERSION, + log_level=settings.LOG_LEVEL, + cors_origins=settings.CORS_ORIGINS, + api_prefix=settings.API_V1_STR, + database_manager=database_manager, + expected_tables=inventory_expected_tables + ) -@app.get("/health") -async def health_check(): - """Comprehensive health check endpoint""" - try: - - return { - "status": "healthy", - "service": settings.SERVICE_NAME, - "version": settings.VERSION, - "timestamp": structlog.get_logger().info("Health check requested") - } - - except Exception as e: - logger.error("Health check failed", error=str(e)) - return { - "status": "unhealthy", - "service": settings.SERVICE_NAME, - "version": settings.VERSION, - "database": "unknown", - "alert_service": {"status": "unknown"}, - "error": str(e), - "timestamp": structlog.get_logger().info("Health check failed") - } + async def on_startup(self, app: FastAPI): + """Custom startup logic for inventory service""" + # Initialize alert service + alert_service = InventoryAlertService(settings) + await alert_service.start() + self.logger.info("Inventory alert service started") + # Store alert service in app state + app.state.alert_service = alert_service -# Service info endpoint -@app.get(f"{settings.API_V1_STR}/info") -async def service_info(): - """Service information endpoint""" - return { - "service": settings.SERVICE_NAME, - "version": settings.VERSION, - "description": settings.DESCRIPTION, - "api_version": "v1", - "environment": settings.ENVIRONMENT, - "features": [ + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for inventory service""" + # Stop alert service + if hasattr(app.state, 'alert_service'): + await app.state.alert_service.stop() + self.logger.info("Alert service stopped") + + def get_service_features(self): + """Return inventory-specific features""" + return [ "ingredient_management", "stock_tracking", "expiration_alerts", @@ -203,7 +74,24 @@ async def service_info(): "real_time_alerts", "regulatory_reporting" ] - } + + +# Create service instance +service = InventoryService() + +# Create FastAPI app with standardized setup +app = service.create_app() + +# Setup standard endpoints +service.setup_standard_endpoints() + +# Include routers using the service helper +service.add_router(ingredients.router) +service.add_router(stock.router) +service.add_router(transformations.router) +service.add_router(classification.router) +service.add_router(dashboard_router) +service.add_router(food_safety_router) if __name__ == "__main__": diff --git a/services/notification/app/main.py b/services/notification/app/main.py index 71365d04..1295e7aa 100644 --- a/services/notification/app/main.py +++ b/services/notification/app/main.py @@ -6,14 +6,9 @@ Notification Service Main Application Handles email, WhatsApp notifications and SSE for real-time alerts/recommendations """ -import structlog -from contextlib import asynccontextmanager -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse - +from fastapi import FastAPI from app.core.config import settings -from app.core.database import init_db +from app.core.database import database_manager from app.api.notifications import router as notification_router from app.api.sse_routes import router as sse_router from app.services.messaging import setup_messaging, cleanup_messaging @@ -21,226 +16,207 @@ from app.services.sse_service import SSEService from app.services.notification_orchestrator import NotificationOrchestrator from app.services.email_service import EmailService from app.services.whatsapp_service import WhatsAppService -from shared.monitoring import setup_logging, HealthChecker -from shared.monitoring.metrics import setup_metrics_early +from shared.service_base import StandardFastAPIService -# Setup logging first -setup_logging("notification-service", settings.LOG_LEVEL) -logger = structlog.get_logger() -# Global variables for lifespan access -metrics_collector = None -health_checker = None +class NotificationService(StandardFastAPIService): + """Notification Service with standardized setup""" -# Create FastAPI app FIRST -app = FastAPI( - title="Bakery Notification Service", - description="Email, WhatsApp and SSE notification service for bakery alerts and recommendations", - version="2.0.0", + def __init__(self): + # Define expected database tables for health checks + notification_expected_tables = [ + 'notifications', 'notification_templates', 'notification_preferences', + 'notification_logs', 'email_templates', 'whatsapp_templates' + ] + + self.sse_service = None + self.orchestrator = None + self.email_service = None + self.whatsapp_service = None + + # Define custom metrics for notification service + notification_custom_metrics = { + "notifications_sent_total": { + "type": "counter", + "description": "Total notifications sent", + "labels": ["type", "status", "channel"] + }, + "emails_sent_total": { + "type": "counter", + "description": "Total emails sent", + "labels": ["status"] + }, + "whatsapp_sent_total": { + "type": "counter", + "description": "Total WhatsApp messages sent", + "labels": ["status"] + }, + "sse_events_sent_total": { + "type": "counter", + "description": "Total SSE events sent", + "labels": ["tenant", "event_type"] + }, + "notification_processing_duration_seconds": { + "type": "histogram", + "description": "Time spent processing notifications" + } + } + + # Define custom health checks for notification service components + async def check_email_service(): + """Check email service health""" + try: + return await self.email_service.health_check() if self.email_service else False + except Exception as e: + self.logger.error("Email service health check failed", error=str(e)) + return False + + async def check_whatsapp_service(): + """Check WhatsApp service health""" + try: + return await self.whatsapp_service.health_check() if self.whatsapp_service else False + except Exception as e: + self.logger.error("WhatsApp service health check failed", error=str(e)) + return False + + async def check_sse_service(): + """Check SSE service health""" + try: + if self.sse_service: + metrics = self.sse_service.get_metrics() + return bool(metrics.get("redis_connected", False)) + return False + except Exception as e: + self.logger.error("SSE service health check failed", error=str(e)) + return False + + async def check_messaging(): + """Check messaging service health""" + try: + from app.services.messaging import notification_publisher + return bool(notification_publisher and notification_publisher.connected) + except Exception as e: + self.logger.error("Messaging health check failed", error=str(e)) + return False + + super().__init__( + service_name="notification-service", + app_name="Bakery Notification Service", + description="Email, WhatsApp and SSE notification service for bakery alerts and recommendations", + version="2.0.0", + log_level=settings.LOG_LEVEL, + cors_origins=getattr(settings, 'CORS_ORIGINS', ["*"]), + api_prefix="/api/v1", + database_manager=database_manager, + expected_tables=notification_expected_tables, + custom_health_checks={ + "email_service": check_email_service, + "whatsapp_service": check_whatsapp_service, + "sse_service": check_sse_service, + "messaging": check_messaging + }, + enable_messaging=True, + custom_metrics=notification_custom_metrics + ) + + async def _setup_messaging(self): + """Setup messaging for notification service""" + await setup_messaging() + self.logger.info("Messaging initialized") + + async def _cleanup_messaging(self): + """Cleanup messaging for notification service""" + await cleanup_messaging() + + async def on_startup(self, app: FastAPI): + """Custom startup logic for notification service""" + # Initialize services + self.email_service = EmailService() + self.whatsapp_service = WhatsAppService() + + # Initialize SSE service + self.sse_service = SSEService(settings.REDIS_URL) + await self.sse_service.initialize() + self.logger.info("SSE service initialized") + + # Create orchestrator + self.orchestrator = NotificationOrchestrator( + email_service=self.email_service, + whatsapp_service=self.whatsapp_service, + sse_service=self.sse_service + ) + + # Store services in app state + app.state.orchestrator = self.orchestrator + app.state.sse_service = self.sse_service + app.state.email_service = self.email_service + app.state.whatsapp_service = self.whatsapp_service + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for notification service""" + # Shutdown SSE service + if self.sse_service: + await self.sse_service.shutdown() + self.logger.info("SSE service shutdown completed") + + def get_service_features(self): + """Return notification-specific features""" + return [ + "email_notifications", + "whatsapp_notifications", + "sse_real_time_updates", + "notification_templates", + "notification_orchestration", + "messaging_integration", + "multi_channel_support" + ] + + def setup_custom_endpoints(self): + """Setup custom endpoints for notification service""" + # SSE metrics endpoint + @self.app.get("/sse-metrics") + async def sse_metrics(): + """Get SSE service metrics""" + if self.sse_service: + try: + sse_metrics = self.sse_service.get_metrics() + return { + 'active_tenants': sse_metrics.get('active_tenants', 0), + 'total_connections': sse_metrics.get('total_connections', 0), + 'active_listeners': sse_metrics.get('active_listeners', 0), + 'redis_connected': bool(sse_metrics.get('redis_connected', False)) + } + except Exception as e: + return {"error": str(e)} + return {"error": "SSE service not available"} + + # Metrics endpoint + @self.app.get("/metrics") + async def metrics(): + """Prometheus metrics endpoint""" + if self.metrics_collector: + return self.metrics_collector.get_metrics() + return {"metrics": "not_available"} + + +# Create service instance +service = NotificationService() + +# Create FastAPI app with standardized setup +app = service.create_app( docs_url="/docs", redoc_url="/redoc" ) -# Setup metrics BEFORE any middleware and BEFORE lifespan -metrics_collector = setup_metrics_early(app, "notification-service") +# Setup standard endpoints +service.setup_standard_endpoints() -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan events - NO MIDDLEWARE ADDED HERE""" - global health_checker - - # Startup - logger.info("Starting Notification Service...") - - try: - # Initialize database - await init_db() - logger.info("Database initialized") - - # Setup messaging - await setup_messaging() - logger.info("Messaging initialized") - - # Initialize services - email_service = EmailService() - whatsapp_service = WhatsAppService() - - # Initialize SSE service - sse_service = SSEService(settings.REDIS_URL) - await sse_service.initialize() - logger.info("SSE service initialized") - - # Create orchestrator - orchestrator = NotificationOrchestrator( - email_service=email_service, - whatsapp_service=whatsapp_service, - sse_service=sse_service - ) - - # Store services in app state - app.state.orchestrator = orchestrator - app.state.sse_service = sse_service - app.state.email_service = email_service - app.state.whatsapp_service = whatsapp_service - - # Register custom metrics (metrics_collector already exists) - metrics_collector.register_counter("notifications_sent_total", "Total notifications sent", labels=["type", "status", "channel"]) - metrics_collector.register_counter("emails_sent_total", "Total emails sent", labels=["status"]) - metrics_collector.register_counter("whatsapp_sent_total", "Total WhatsApp messages sent", labels=["status"]) - metrics_collector.register_counter("sse_events_sent_total", "Total SSE events sent", labels=["tenant", "event_type"]) - metrics_collector.register_histogram("notification_processing_duration_seconds", "Time spent processing notifications") - metrics_collector.register_gauge("notification_queue_size", "Current notification queue size") - metrics_collector.register_gauge("sse_active_connections", "Number of active SSE connections") - - # Setup health checker - health_checker = HealthChecker("notification-service") - - # Add database health check - async def check_database(): - try: - from app.core.database import get_db - from sqlalchemy import text - async for db in get_db(): - await db.execute(text("SELECT 1")) - return True - except Exception as e: - return f"Database error: {e}" - - health_checker.add_check("database", check_database, timeout=5.0, critical=True) - - # Add email service health check - async def check_email_service(): - try: - from app.services.email_service import EmailService - email_service = EmailService() - return await email_service.health_check() - except Exception as e: - return f"Email service error: {e}" - - health_checker.add_check("email_service", check_email_service, timeout=10.0, critical=True) - - # Add WhatsApp service health check - async def check_whatsapp_service(): - try: - return await whatsapp_service.health_check() - except Exception as e: - return f"WhatsApp service error: {e}" - - health_checker.add_check("whatsapp_service", check_whatsapp_service, timeout=10.0, critical=False) - - # Add SSE service health check - async def check_sse_service(): - try: - metrics = sse_service.get_metrics() - return "healthy" if metrics["redis_connected"] else "Redis connection failed" - except Exception as e: - return f"SSE service error: {e}" - - health_checker.add_check("sse_service", check_sse_service, timeout=5.0, critical=True) - - # Add messaging health check - def check_messaging(): - try: - # Check if messaging is properly initialized - from app.services.messaging import notification_publisher - return notification_publisher.connected if notification_publisher else False - except Exception as e: - return f"Messaging error: {e}" - - health_checker.add_check("messaging", check_messaging, timeout=3.0, critical=False) - - # Store health checker in app state - app.state.health_checker = health_checker - - logger.info("Notification Service with SSE support started successfully") - - except Exception as e: - logger.error(f"Failed to start Notification Service: {e}") - raise - - yield - - # Shutdown - logger.info("Shutting down Notification Service...") - try: - # Shutdown SSE service - if hasattr(app.state, 'sse_service'): - await app.state.sse_service.shutdown() - logger.info("SSE service shutdown completed") - - await cleanup_messaging() - logger.info("Messaging cleanup completed") - except Exception as e: - logger.error(f"Error during shutdown: {e}") - -# Set lifespan AFTER metrics setup -app.router.lifespan_context = lifespan - -# CORS middleware (added after metrics setup) -app.add_middleware( - CORSMiddleware, - allow_origins=getattr(settings, 'CORS_ORIGINS', ["*"]), - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) +# Setup custom endpoints +service.setup_custom_endpoints() # Include routers -app.include_router(notification_router, prefix="/api/v1", tags=["notifications"]) -app.include_router(sse_router, prefix="/api/v1", tags=["sse"]) - -# Health check endpoint -@app.get("/health") -async def health_check(): - """Comprehensive health check endpoint including SSE""" - if health_checker: - health_result = await health_checker.check_health() - - # Add SSE metrics to health check - if hasattr(app.state, 'sse_service'): - try: - sse_metrics = app.state.sse_service.get_metrics() - # Convert metrics to JSON-serializable format - health_result['sse_metrics'] = { - 'active_tenants': sse_metrics.get('active_tenants', 0), - 'total_connections': sse_metrics.get('total_connections', 0), - 'active_listeners': sse_metrics.get('active_listeners', 0), - 'redis_connected': bool(sse_metrics.get('redis_connected', False)) - } - except Exception as e: - health_result['sse_error'] = str(e) - - return health_result - else: - return { - "service": "notification-service", - "status": "healthy", - "version": "2.0.0", - "features": ["email", "whatsapp", "sse", "alerts", "recommendations"] - } - -# Metrics endpoint -@app.get("/metrics") -async def metrics(): - """Prometheus metrics endpoint""" - if metrics_collector: - return metrics_collector.get_metrics() - return {"metrics": "not_available"} - -# Exception handlers -@app.exception_handler(Exception) -async def global_exception_handler(request: Request, exc: Exception): - """Global exception handler with metrics""" - logger.error(f"Unhandled exception: {exc}", exc_info=True) - - # Record error metric if available - if metrics_collector: - metrics_collector.increment_counter("errors_total", labels={"type": "unhandled"}) - - return JSONResponse( - status_code=500, - content={"detail": "Internal server error"} - ) +service.add_router(notification_router, tags=["notifications"]) +service.add_router(sse_router, tags=["sse"]) if __name__ == "__main__": import uvicorn diff --git a/services/orders/app/core/database.py b/services/orders/app/core/database.py index ca51b27d..a114f3a0 100644 --- a/services/orders/app/core/database.py +++ b/services/orders/app/core/database.py @@ -76,4 +76,16 @@ async def get_db_health() -> bool: return True except Exception as e: logger.error("Database health check failed", error=str(e)) - return False \ No newline at end of file + return False + + +# Database manager instance for service_base compatibility +from shared.database.base import DatabaseManager + +database_manager = DatabaseManager( + database_url=settings.DATABASE_URL, + service_name="orders-service", + pool_size=10, + max_overflow=20, + echo=settings.DEBUG +) \ No newline at end of file diff --git a/services/orders/app/main.py b/services/orders/app/main.py index 99a9399a..9e090c65 100644 --- a/services/orders/app/main.py +++ b/services/orders/app/main.py @@ -7,113 +7,74 @@ Customer orders and procurement planning service """ from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from contextlib import asynccontextmanager -import structlog - from app.core.config import settings -from app.core.database import init_database, get_db_health +from app.core.database import database_manager from app.api.orders import router as orders_router from app.api.procurement import router as procurement_router from app.services.procurement_scheduler_service import ProcurementSchedulerService - -# Configure logging -logger = structlog.get_logger() +from shared.service_base import StandardFastAPIService -@asynccontextmanager -async def lifespan(app: FastAPI): - """Manage application lifespan events""" - # Startup - try: - await init_database() - logger.info("Database initialized successfully") - +class OrdersService(StandardFastAPIService): + """Orders Service with standardized setup""" + + def __init__(self): + # Define expected database tables for health checks + orders_expected_tables = [ + 'customers', 'customer_contacts', 'customer_orders', 'order_items', + 'order_status_history', 'procurement_plans', 'procurement_requirements' + ] + + super().__init__( + service_name="orders-service", + app_name=settings.APP_NAME, + description=settings.DESCRIPTION, + version=settings.VERSION, + api_prefix="/api/v1", + database_manager=database_manager, + expected_tables=orders_expected_tables + ) + + async def on_startup(self, app: FastAPI): + """Custom startup logic for orders service""" # Initialize procurement scheduler service scheduler_service = ProcurementSchedulerService(settings) await scheduler_service.start() - logger.info("Procurement scheduler service started") - + self.logger.info("Procurement scheduler service started") + # Store scheduler service in app state app.state.scheduler_service = scheduler_service - - logger.info("Orders service started successfully") - except Exception as e: - logger.error("Failed to initialize orders service", error=str(e)) - raise - - yield - - # Shutdown - logger.info("Orders service shutting down") - try: + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for orders service""" # Stop scheduler service if hasattr(app.state, 'scheduler_service'): await app.state.scheduler_service.stop() - logger.info("Scheduler service stopped") - except Exception as e: - logger.error("Error stopping scheduler service", error=str(e)) + self.logger.info("Scheduler service stopped") + + def get_service_features(self): + """Return orders-specific features""" + return [ + "customer_management", + "order_processing", + "procurement_planning", + "order_tracking", + "automated_scheduling" + ] -# Create FastAPI application -app = FastAPI( - title=settings.APP_NAME, - description=settings.DESCRIPTION, - version=settings.VERSION, - lifespan=lifespan -) +# Create service instance +service = OrdersService() -# Add CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], # Configure based on environment - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) +# Create FastAPI app with standardized setup +app = service.create_app() + +# Setup standard endpoints +service.setup_standard_endpoints() # Include routers -app.include_router(orders_router, prefix="/api/v1") -app.include_router(procurement_router, prefix="/api/v1") - - -@app.get("/health") -async def health_check(): - """Health check endpoint""" - try: - db_healthy = await get_db_health() - - health_status = { - "status": "healthy" if db_healthy else "unhealthy", - "service": settings.SERVICE_NAME, - "version": settings.VERSION, - "database": "connected" if db_healthy else "disconnected" - } - - if not db_healthy: - health_status["status"] = "unhealthy" - - return health_status - - except Exception as e: - logger.error("Health check failed", error=str(e)) - return { - "status": "unhealthy", - "service": settings.SERVICE_NAME, - "version": settings.VERSION, - "error": str(e) - } - - -@app.get("/") -async def root(): - """Root endpoint""" - return { - "service": settings.APP_NAME, - "version": settings.VERSION, - "description": settings.DESCRIPTION, - "status": "running" - } +service.add_router(orders_router) +service.add_router(procurement_router) @app.post("/test/procurement-scheduler") diff --git a/services/pos/app/main.py b/services/pos/app/main.py index 45ced5a0..1c2372ad 100644 --- a/services/pos/app/main.py +++ b/services/pos/app/main.py @@ -3,131 +3,150 @@ POS Integration Service Handles integration with external POS systems (Square, Toast, Lightspeed) """ -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse -import structlog import time -from contextlib import asynccontextmanager - +from fastapi import FastAPI, Request from app.core.config import settings from app.api import pos_config, webhooks, sync -from app.core.database import init_db, close_db -from shared.monitoring.health import router as health_router -from shared.monitoring.logging import setup_logging +from app.core.database import database_manager +from shared.service_base import StandardFastAPIService -# Setup logging -setup_logging(service_name="pos-service") -logger = structlog.get_logger() +class POSService(StandardFastAPIService): + """POS Integration Service with standardized setup""" + + def __init__(self): + # Define expected database tables for health checks + pos_expected_tables = [ + 'pos_configurations', 'pos_transactions', 'pos_transaction_items', + 'pos_webhook_logs', 'pos_sync_logs' + ] + + # Define custom metrics for POS service + pos_custom_metrics = { + "pos_webhooks_received_total": { + "type": "counter", + "description": "Total POS webhooks received", + "labels": ["provider", "event_type"] + }, + "pos_sync_jobs_total": { + "type": "counter", + "description": "Total POS sync jobs", + "labels": ["provider", "status"] + }, + "pos_transactions_synced_total": { + "type": "counter", + "description": "Total transactions synced", + "labels": ["provider"] + }, + "pos_webhook_processing_duration_seconds": { + "type": "histogram", + "description": "Time spent processing webhooks" + }, + "pos_sync_duration_seconds": { + "type": "histogram", + "description": "Time spent syncing data" + } + } + + super().__init__( + service_name="pos-service", + app_name="POS Integration Service", + description="Handles integration with external POS systems", + version="1.0.0", + cors_origins=settings.CORS_ORIGINS, + api_prefix="/api/v1", + database_manager=database_manager, + expected_tables=pos_expected_tables, + custom_metrics=pos_custom_metrics + ) + + async def on_startup(self, app: FastAPI): + """Custom startup logic for POS service""" + # Custom startup completed + self.logger.info("POS Integration Service started successfully") + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for POS service""" + # Database cleanup is handled by the base class + pass + + def get_service_features(self): + """Return POS-specific features""" + return [ + "pos_integration", + "square_support", + "toast_support", + "lightspeed_support", + "webhook_handling", + "transaction_sync", + "real_time_updates" + ] + + def setup_custom_middleware(self): + """Setup custom middleware for POS service""" + # Middleware for request logging and timing + @self.app.middleware("http") + async def log_requests(request: Request, call_next): + start_time = time.time() + + # Log request + self.logger.info( + "Incoming request", + method=request.method, + url=str(request.url), + client_ip=request.client.host if request.client else None + ) + + response = await call_next(request) + + # Log response + process_time = time.time() - start_time + self.logger.info( + "Request completed", + method=request.method, + url=str(request.url), + status_code=response.status_code, + process_time=f"{process_time:.4f}s" + ) + + response.headers["X-Process-Time"] = str(process_time) + return response + + def setup_custom_endpoints(self): + """Setup custom endpoints for POS service""" + @self.app.get("/") + async def root(): + """Root endpoint""" + return { + "service": "POS Integration Service", + "version": "1.0.0", + "status": "running", + "supported_pos_systems": ["square", "toast", "lightspeed"] + } -@asynccontextmanager -async def lifespan(app: FastAPI): - """Lifecycle management for FastAPI app""" - logger.info("Starting POS Integration Service") - - # Startup - try: - # Initialize database connection - logger.info("Initializing database connection") - await init_db() - - # Add any startup logic here - logger.info("POS Integration Service started successfully") - - yield - - except Exception as e: - logger.error("Failed to start POS Integration Service", error=str(e)) - raise - finally: - # Shutdown - logger.info("Shutting down POS Integration Service") - await close_db() +# Create service instance +service = POSService() - -# Create FastAPI app -app = FastAPI( - title="POS Integration Service", - description="Handles integration with external POS systems", - version="1.0.0", +# Create FastAPI app with standardized setup +app = service.create_app( docs_url="/docs" if settings.ENVIRONMENT != "production" else None, - redoc_url="/redoc" if settings.ENVIRONMENT != "production" else None, - lifespan=lifespan + redoc_url="/redoc" if settings.ENVIRONMENT != "production" else None ) -# Add CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=settings.CORS_ORIGINS, - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) +# Setup standard endpoints +service.setup_standard_endpoints() +# Setup custom middleware +service.setup_custom_middleware() -# Middleware for request logging and timing -@app.middleware("http") -async def log_requests(request: Request, call_next): - start_time = time.time() - - # Log request - logger.info( - "Incoming request", - method=request.method, - url=str(request.url), - client_ip=request.client.host if request.client else None - ) - - response = await call_next(request) - - # Log response - process_time = time.time() - start_time - logger.info( - "Request completed", - method=request.method, - url=str(request.url), - status_code=response.status_code, - process_time=f"{process_time:.4f}s" - ) - - response.headers["X-Process-Time"] = str(process_time) - return response - - -# Global exception handler -@app.exception_handler(Exception) -async def global_exception_handler(request: Request, exc: Exception): - logger.error( - "Unhandled exception", - error=str(exc), - method=request.method, - url=str(request.url) - ) - - return JSONResponse( - status_code=500, - content={"detail": "Internal server error"} - ) - +# Setup custom endpoints +service.setup_custom_endpoints() # Include routers -app.include_router(health_router, prefix="/health", tags=["health"]) -app.include_router(pos_config.router, prefix="/api/v1", tags=["pos-config"]) -app.include_router(webhooks.router, prefix="/api/v1", tags=["webhooks"]) -app.include_router(sync.router, prefix="/api/v1", tags=["sync"]) - - -@app.get("/") -async def root(): - """Root endpoint""" - return { - "service": "POS Integration Service", - "version": "1.0.0", - "status": "running", - "supported_pos_systems": ["square", "toast", "lightspeed"] - } +service.add_router(pos_config.router, tags=["pos-config"]) +service.add_router(webhooks.router, tags=["webhooks"]) +service.add_router(sync.router, tags=["sync"]) if __name__ == "__main__": diff --git a/services/production/app/main.py b/services/production/app/main.py index 53537791..8adfd758 100644 --- a/services/production/app/main.py +++ b/services/production/app/main.py @@ -6,130 +6,108 @@ Production Service - FastAPI Application Production planning and batch management service """ +import time from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from contextlib import asynccontextmanager -import structlog - from app.core.config import settings -from app.core.database import init_database, get_db_health +from app.core.database import database_manager from app.api.production import router as production_router from app.services.production_alert_service import ProductionAlertService - -# Configure logging -logger = structlog.get_logger() +from shared.service_base import StandardFastAPIService -@asynccontextmanager -async def lifespan(app: FastAPI): - """Manage application lifespan events""" - # Startup - try: - await init_database() - logger.info("Database initialized") - +class ProductionService(StandardFastAPIService): + """Production Service with standardized setup""" + + def __init__(self): + # Define expected database tables for health checks + production_expected_tables = [ + 'production_batches', 'production_schedules', 'production_capacity', + 'quality_check_templates', 'quality_checks', 'equipment' + ] + + self.alert_service = None + + # Create custom checks for alert service + async def check_alert_service(): + """Check production alert service health""" + try: + return bool(self.alert_service) if self.alert_service else False + except Exception as e: + self.logger.error("Alert service health check failed", error=str(e)) + return False + + super().__init__( + service_name=settings.SERVICE_NAME, + app_name=settings.APP_NAME, + description=settings.DESCRIPTION, + version=settings.VERSION, + api_prefix="/api/v1", + database_manager=database_manager, + expected_tables=production_expected_tables, + custom_health_checks={"alert_service": check_alert_service} + ) + + async def on_startup(self, app: FastAPI): + """Custom startup logic for production service""" # Initialize alert service - alert_service = ProductionAlertService(settings) - await alert_service.start() - logger.info("Production alert service started") - + self.alert_service = ProductionAlertService(settings) + await self.alert_service.start() + self.logger.info("Production alert service started") + # Store alert service in app state - app.state.alert_service = alert_service - - logger.info("Production service started successfully") - except Exception as e: - logger.error("Failed to initialize production service", error=str(e)) - raise - - yield - - # Shutdown - logger.info("Production service shutting down") - try: + app.state.alert_service = self.alert_service + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for production service""" # Stop alert service - if hasattr(app.state, 'alert_service'): - await app.state.alert_service.stop() - logger.info("Alert service stopped") - except Exception as e: - logger.error("Error during shutdown", error=str(e)) + if self.alert_service: + await self.alert_service.stop() + self.logger.info("Alert service stopped") + + def get_service_features(self): + """Return production-specific features""" + return [ + "production_planning", + "batch_management", + "production_scheduling", + "quality_control", + "equipment_management", + "capacity_planning", + "alert_notifications" + ] + + def setup_custom_middleware(self): + """Setup custom middleware for production service""" + @self.app.middleware("http") + async def logging_middleware(request: Request, call_next): + """Add request logging middleware""" + start_time = time.time() + response = await call_next(request) + process_time = time.time() - start_time + + self.logger.info("HTTP request processed", + method=request.method, + url=str(request.url), + status_code=response.status_code, + process_time=round(process_time, 4)) + + return response -# Create FastAPI application -app = FastAPI( - title=settings.APP_NAME, - description=settings.DESCRIPTION, - version=settings.VERSION, - lifespan=lifespan -) +# Create service instance +service = ProductionService() -# Add CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], # Configure based on environment - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) +# Create FastAPI app with standardized setup +app = service.create_app() + +# Setup standard endpoints +service.setup_standard_endpoints() + +# Setup custom middleware +service.setup_custom_middleware() # Include routers -app.include_router(production_router, prefix="/api/v1") - - -@app.get("/health") -async def health_check(): - """Health check endpoint""" - try: - db_healthy = await get_db_health() - - health_status = { - "status": "healthy" if db_healthy else "unhealthy", - "service": settings.SERVICE_NAME, - "version": settings.VERSION, - "database": "connected" if db_healthy else "disconnected" - } - - if not db_healthy: - health_status["status"] = "unhealthy" - - return health_status - - except Exception as e: - logger.error("Health check failed", error=str(e)) - return { - "status": "unhealthy", - "service": settings.SERVICE_NAME, - "version": settings.VERSION, - "error": str(e) - } - - -@app.get("/") -async def root(): - """Root endpoint""" - return { - "service": settings.APP_NAME, - "version": settings.VERSION, - "description": settings.DESCRIPTION, - "status": "running" - } - - -@app.middleware("http") -async def logging_middleware(request: Request, call_next): - """Add request logging middleware""" - import time - - start_time = time.time() - response = await call_next(request) - process_time = time.time() - start_time - - logger.info("HTTP request processed", - method=request.method, - url=str(request.url), - status_code=response.status_code, - process_time=round(process_time, 4)) - - return response +service.add_router(production_router) if __name__ == "__main__": diff --git a/services/recipes/app/main.py b/services/recipes/app/main.py index 9a604622..ee3e1f1c 100644 --- a/services/recipes/app/main.py +++ b/services/recipes/app/main.py @@ -4,122 +4,90 @@ Recipe Service - FastAPI application Handles recipe management, production planning, and inventory consumption tracking """ -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.middleware.gzip import GZipMiddleware -from fastapi.responses import JSONResponse import time -import logging -from contextlib import asynccontextmanager +from fastapi import FastAPI, Request +from fastapi.middleware.gzip import GZipMiddleware from .core.config import settings from .core.database import db_manager from .api import recipes +from shared.service_base import StandardFastAPIService # Import models to register them with SQLAlchemy metadata from .models import recipes as recipe_models -# Configure logging -logging.basicConfig( - level=getattr(logging, settings.LOG_LEVEL.upper()), - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) +class RecipesService(StandardFastAPIService): + """Recipes Service with standardized setup""" + def __init__(self): + # Define expected database tables for health checks + recipes_expected_tables = [ + 'recipes', 'recipe_ingredients', 'production_batches', + 'production_ingredient_consumption', 'production_schedules' + ] -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan events""" - # Startup - logger.info(f"Starting {settings.SERVICE_NAME} service v{settings.SERVICE_VERSION}") - - # Create database tables - try: - await db_manager.create_tables() - logger.info("Database tables created successfully") - except Exception as e: - logger.error(f"Failed to create database tables: {e}") - - yield - - # Shutdown - logger.info(f"Shutting down {settings.SERVICE_NAME} service") - - -# Create FastAPI application -app = FastAPI( - title="Recipe Management Service", - description="Comprehensive recipe management, production planning, and inventory consumption tracking for bakery operations", - version=settings.SERVICE_VERSION, - lifespan=lifespan, - docs_url="/docs" if settings.DEBUG else None, - redoc_url="/redoc" if settings.DEBUG else None, -) - -# Add middleware -app.add_middleware( - CORSMiddleware, - allow_origins=settings.ALLOWED_ORIGINS, - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -app.add_middleware(GZipMiddleware, minimum_size=1000) - - -# Request timing middleware -@app.middleware("http") -async def add_process_time_header(request: Request, call_next): - """Add processing time header to responses""" - start_time = time.time() - response = await call_next(request) - process_time = time.time() - start_time - response.headers["X-Process-Time"] = str(process_time) - return response - - -# Global exception handler -@app.exception_handler(Exception) -async def global_exception_handler(request: Request, exc: Exception): - """Global exception handler""" - logger.error(f"Global exception on {request.url}: {exc}", exc_info=True) - return JSONResponse( - status_code=500, - content={ - "detail": "Internal server error", - "error": str(exc) if settings.DEBUG else "An unexpected error occurred" - } - ) - - -# Health check endpoint -@app.get("/health") -async def health_check(): - """Health check endpoint""" - try: - # Test database connection - health_result = await db_manager.health_check() - - return { - "status": "healthy", - "service": settings.SERVICE_NAME, - "version": settings.SERVICE_VERSION, - "environment": settings.ENVIRONMENT, - "database": health_result - } - except Exception as e: - logger.error(f"Health check failed: {e}") - return JSONResponse( - status_code=503, - content={ - "status": "unhealthy", - "service": settings.SERVICE_NAME, - "version": settings.SERVICE_VERSION, - "error": str(e) - } + super().__init__( + service_name="recipes-service", + app_name="Recipe Management Service", + description="Comprehensive recipe management, production planning, and inventory consumption tracking for bakery operations", + version=settings.SERVICE_VERSION, + log_level=settings.LOG_LEVEL, + cors_origins=settings.ALLOWED_ORIGINS, + api_prefix=settings.API_V1_PREFIX, + database_manager=db_manager, + expected_tables=recipes_expected_tables ) + async def on_startup(self, app: FastAPI): + """Custom startup logic for recipes service""" + # Custom startup completed + pass + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for recipes service""" + # Database cleanup is handled by the base class + pass + + def get_service_features(self): + """Return recipes-specific features""" + return [ + "recipe_management", + "production_planning", + "inventory_consumption_tracking", + "batch_production", + "tenant_scoped_operations" + ] + + def setup_custom_middleware(self): + """Setup custom middleware for recipes service""" + # Add GZip middleware + self.app.add_middleware(GZipMiddleware, minimum_size=1000) + + # Request timing middleware + @self.app.middleware("http") + async def add_process_time_header(request: Request, call_next): + """Add processing time header to responses""" + start_time = time.time() + response = await call_next(request) + process_time = time.time() - start_time + response.headers["X-Process-Time"] = str(process_time) + return response + + +# Create service instance +service = RecipesService() + +# Create FastAPI app with standardized setup +app = service.create_app( + docs_url="/docs" if settings.DEBUG else None, + redoc_url="/redoc" if settings.DEBUG else None +) + +# Setup standard endpoints +service.setup_standard_endpoints() + +# Setup custom middleware +service.setup_custom_middleware() # Include API routers with tenant-scoped paths app.include_router( @@ -129,19 +97,6 @@ app.include_router( ) - - -@app.get("/") -async def root(): - """Root endpoint""" - return { - "service": settings.SERVICE_NAME, - "version": settings.SERVICE_VERSION, - "status": "running", - "docs_url": "/docs" if settings.DEBUG else None - } - - if __name__ == "__main__": import uvicorn uvicorn.run( diff --git a/services/recipes/requirements.txt b/services/recipes/requirements.txt index be840453..e0b8126a 100644 --- a/services/recipes/requirements.txt +++ b/services/recipes/requirements.txt @@ -24,6 +24,9 @@ requests==2.31.0 asyncio-mqtt==0.16.1 aiofiles==23.2.1 +# Messaging +aio-pika==9.3.1 + # Caching (optional) redis==5.0.1 python-redis-cache==0.1.0 @@ -31,6 +34,7 @@ python-redis-cache==0.1.0 # Monitoring and logging structlog==23.2.0 python-json-logger==2.0.4 +prometheus-client==0.19.0 # Date/time handling python-dateutil==2.8.2 diff --git a/services/sales/app/main.py b/services/sales/app/main.py index 59549e0e..2b6129fd 100644 --- a/services/sales/app/main.py +++ b/services/sales/app/main.py @@ -3,150 +3,125 @@ Sales Service Main Application """ -import structlog -from contextlib import asynccontextmanager -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse - +from fastapi import FastAPI from app.core.config import settings -from app.core.database import init_db, close_db -from shared.monitoring import setup_logging, HealthChecker -from shared.monitoring.metrics import setup_metrics_early - -# Setup logging first -setup_logging("sales-service", settings.LOG_LEVEL) -logger = structlog.get_logger() - -# Global variables for lifespan access -metrics_collector = None -health_checker = None - -# Create FastAPI app FIRST -app = FastAPI( - title="Bakery Sales Service", - description="Sales data management service for bakery operations", - version="1.0.0" -) - -# Setup metrics BEFORE any middleware and BEFORE lifespan -metrics_collector = setup_metrics_early(app, "sales-service") - -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan events""" - global health_checker - - # Startup - logger.info("Starting Sales Service...") - - try: - # Initialize database - await init_db() - logger.info("Database initialized") - - # Register custom metrics - metrics_collector.register_counter("sales_records_created_total", "Total sales records created") - metrics_collector.register_counter("sales_records_updated_total", "Total sales records updated") - metrics_collector.register_counter("sales_queries_total", "Sales record queries") - metrics_collector.register_counter("product_queries_total", "Product catalog queries") - metrics_collector.register_counter("import_jobs_total", "Data import jobs") - metrics_collector.register_counter("export_jobs_total", "Data export jobs") - - metrics_collector.register_histogram("sales_create_duration_seconds", "Sales record creation duration") - metrics_collector.register_histogram("sales_query_duration_seconds", "Sales query duration") - metrics_collector.register_histogram("import_processing_duration_seconds", "Import processing duration") - metrics_collector.register_histogram("export_generation_duration_seconds", "Export generation duration") - - # Setup health checker - health_checker = HealthChecker("sales-service") - - # Add database health check - async def check_database(): - try: - from app.core.database import get_db - from sqlalchemy import text - async for db in get_db(): - await db.execute(text("SELECT 1")) - return True - except Exception as e: - return f"Database error: {e}" - - health_checker.add_check("database", check_database, timeout=5.0, critical=True) - - # Store health checker in app state - app.state.health_checker = health_checker - - logger.info("Sales Service started successfully") - - except Exception as e: - logger.error(f"Failed to start Sales Service: {e}") - raise - - yield - - # Shutdown - logger.info("Shutting down Sales Service...") - await close_db() - -# Set lifespan AFTER metrics setup -app.router.lifespan_context = lifespan - -# CORS middleware (added after metrics setup) -app.add_middleware( - CORSMiddleware, - allow_origins=settings.CORS_ORIGINS, - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - +from app.core.database import database_manager +from shared.service_base import StandardFastAPIService # Include routers - import router BEFORE sales router to avoid conflicts from app.api.sales import router as sales_router from app.api.import_data import router as import_router -app.include_router(import_router, prefix="/api/v1", tags=["import"]) -app.include_router(sales_router, prefix="/api/v1", tags=["sales"]) -# Health check endpoint -@app.get("/health") -async def health_check(): - """Comprehensive health check endpoint""" - if health_checker: - return await health_checker.check_health() - else: - return { - "service": "sales-service", - "status": "healthy", - "version": "1.0.0" - } -# Root endpoint -@app.get("/") -async def root(): - """Root endpoint""" - return { - "service": "Sales Service", - "version": "1.0.0", - "status": "running", - "endpoints": { - "health": "/health", - "docs": "/docs", - "sales": "/api/v1/sales", - "products": "/api/v1/products" - } - } +class SalesService(StandardFastAPIService): + """Sales Service with standardized setup""" -# Exception handlers -@app.exception_handler(Exception) -async def global_exception_handler(request: Request, exc: Exception): - """Global exception handler with metrics""" - logger.error(f"Unhandled exception: {exc}", exc_info=True) - - # Record error metric if available - if metrics_collector: - metrics_collector.increment_counter("errors_total", labels={"type": "unhandled"}) - - return JSONResponse( - status_code=500, - content={"detail": "Internal server error"} - ) \ No newline at end of file + def __init__(self): + # Define expected database tables for health checks + sales_expected_tables = ['sales_data', 'sales_import_jobs'] + + super().__init__( + service_name="sales-service", + app_name="Bakery Sales Service", + description="Sales data management service for bakery operations", + version="1.0.0", + log_level=settings.LOG_LEVEL, + cors_origins=settings.CORS_ORIGINS, + api_prefix="/api/v1", + database_manager=database_manager, + expected_tables=sales_expected_tables + ) + + async def on_startup(self, app: FastAPI): + """Custom startup logic for sales service""" + # Register custom metrics + self.register_custom_metrics({ + "sales_records_created_total": { + "type": "counter", + "description": "Total sales records created" + }, + "sales_records_updated_total": { + "type": "counter", + "description": "Total sales records updated" + }, + "sales_queries_total": { + "type": "counter", + "description": "Sales record queries" + }, + "product_queries_total": { + "type": "counter", + "description": "Product catalog queries" + }, + "import_jobs_total": { + "type": "counter", + "description": "Data import jobs" + }, + "export_jobs_total": { + "type": "counter", + "description": "Data export jobs" + }, + "sales_create_duration_seconds": { + "type": "histogram", + "description": "Sales record creation duration" + }, + "sales_query_duration_seconds": { + "type": "histogram", + "description": "Sales query duration" + }, + "import_processing_duration_seconds": { + "type": "histogram", + "description": "Import processing duration" + }, + "export_generation_duration_seconds": { + "type": "histogram", + "description": "Export generation duration" + } + }) + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for sales service""" + # Database cleanup is handled by the base class + pass + + def get_service_features(self): + """Return sales-specific features""" + return [ + "sales_data_management", + "product_catalog", + "data_import_export", + "sales_analytics", + "performance_tracking" + ] + + def setup_custom_endpoints(self): + """Setup custom endpoints for sales service""" + @self.app.get("/") + async def root(): + """Root endpoint""" + return { + "service": "Sales Service", + "version": "1.0.0", + "status": "running", + "endpoints": { + "health": "/health", + "docs": "/docs", + "sales": "/api/v1/sales", + "products": "/api/v1/products" + } + } + + +# Create service instance +service = SalesService() + +# Create FastAPI app with standardized setup +app = service.create_app() + +# Setup standard endpoints +service.setup_standard_endpoints() + +# Setup custom endpoints +service.setup_custom_endpoints() + +# Include routers +service.add_router(import_router, tags=["import"]) +service.add_router(sales_router, tags=["sales"]) \ No newline at end of file diff --git a/services/suppliers/app/main.py b/services/suppliers/app/main.py index d7bca04e..2fc2ea07 100644 --- a/services/suppliers/app/main.py +++ b/services/suppliers/app/main.py @@ -4,151 +4,51 @@ Supplier & Procurement Service FastAPI Application """ import os -from contextlib import asynccontextmanager -from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse -import structlog - -# Import core modules +from fastapi import FastAPI from app.core.config import settings -from app.core.database import init_db, close_db +from app.core.database import database_manager from app.api import suppliers, purchase_orders, deliveries -from shared.monitoring.health import router as health_router -from shared.monitoring.metrics import setup_metrics_early -# from shared.auth.decorators import setup_auth_middleware - -logger = structlog.get_logger() - - -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan management""" - # Startup - logger.info("Starting Supplier Service", version=settings.VERSION) - - try: - # Initialize database - await init_db() - logger.info("Database initialized successfully") - - yield - - except Exception as e: - logger.error("Startup failed", error=str(e)) - raise - finally: - # Shutdown - logger.info("Shutting down Supplier Service") - try: - await close_db() - logger.info("Database connections closed") - except Exception as e: - logger.error("Shutdown error", error=str(e)) - - -# Create FastAPI application -app = FastAPI( - title=settings.APP_NAME, - description=settings.DESCRIPTION, - version=settings.VERSION, - openapi_url=f"{settings.API_V1_STR}/openapi.json", - docs_url=f"{settings.API_V1_STR}/docs", - redoc_url=f"{settings.API_V1_STR}/redoc", - lifespan=lifespan -) - - -# CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=settings.CORS_ORIGINS, - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -# Setup metrics -try: - setup_metrics_early(app, "suppliers-service") - logger.info("Metrics setup completed") -except Exception as e: - logger.error("Metrics setup failed", error=str(e)) - -# Setup authentication middleware (commented out - not implemented) -# setup_auth_middleware(app) - - -# Exception handlers -@app.exception_handler(ValueError) -async def value_error_handler(request: Request, exc: ValueError): - """Handle validation errors""" - return JSONResponse( - status_code=400, - content={ - "error": "Validation Error", - "detail": str(exc), - "type": "value_error" - } - ) - - -@app.exception_handler(Exception) -async def general_exception_handler(request: Request, exc: Exception): - """Handle general exceptions""" - logger.error( - "Unhandled exception", - error=str(exc), - path=request.url.path, - method=request.method - ) - - return JSONResponse( - status_code=500, - content={ - "error": "Internal Server Error", - "detail": "An unexpected error occurred", - "type": "internal_error" - } - ) - - -# Include routers -app.include_router(health_router, prefix="/health", tags=["health"]) -app.include_router(suppliers.router, prefix=settings.API_V1_STR) -app.include_router(purchase_orders.router, prefix=settings.API_V1_STR) -app.include_router(deliveries.router, prefix=settings.API_V1_STR) - +from shared.service_base import StandardFastAPIService # Include enhanced performance tracking router from app.api.performance import router as performance_router -app.include_router(performance_router, prefix=settings.API_V1_STR) -# Root endpoint -@app.get("/") -async def root(): - """Root endpoint with service information""" - return { - "service": settings.SERVICE_NAME, - "version": settings.VERSION, - "description": settings.DESCRIPTION, - "status": "running", - "docs_url": f"{settings.API_V1_STR}/docs", - "health_url": "/health" - } +class SuppliersService(StandardFastAPIService): + """Suppliers Service with standardized setup""" + def __init__(self): + # Define expected database tables for health checks + suppliers_expected_tables = [ + 'suppliers', 'supplier_price_lists', 'purchase_orders', 'purchase_order_items', + 'deliveries', 'delivery_items', 'supplier_quality_reviews', 'supplier_invoices', + 'supplier_performance_metrics', 'supplier_alerts', 'supplier_scorecards', + 'supplier_benchmarks', 'alert_rules' + ] -# Service info endpoint -@app.get(f"{settings.API_V1_STR}/info") -async def service_info(): - """Service information endpoint""" - return { - "service": settings.SERVICE_NAME, - "version": settings.VERSION, - "description": settings.DESCRIPTION, - "api_version": "v1", - "environment": settings.ENVIRONMENT, - "features": [ + super().__init__( + service_name="suppliers-service", + app_name=settings.APP_NAME, + description=settings.DESCRIPTION, + version=settings.VERSION, + cors_origins=settings.CORS_ORIGINS, + api_prefix=settings.API_V1_STR, + database_manager=database_manager, + expected_tables=suppliers_expected_tables + ) + + async def on_startup(self, app: FastAPI): + """Custom startup logic for suppliers service""" + # Custom startup completed + pass + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for suppliers service""" + # Database cleanup is handled by the base class + pass + + def get_service_features(self): + """Return suppliers-specific features""" + return [ "supplier_management", "vendor_onboarding", "purchase_orders", @@ -168,7 +68,22 @@ async def service_info(): "risk_assessment", "benchmarking" ] - } + + +# Create service instance +service = SuppliersService() + +# Create FastAPI app with standardized setup +app = service.create_app() + +# Setup standard endpoints +service.setup_standard_endpoints() + +# Include routers +service.add_router(suppliers.router) +service.add_router(purchase_orders.router) +service.add_router(deliveries.router) +service.add_router(performance_router) if __name__ == "__main__": diff --git a/services/tenant/app/main.py b/services/tenant/app/main.py index 07032433..b446a7e5 100644 --- a/services/tenant/app/main.py +++ b/services/tenant/app/main.py @@ -1,97 +1,83 @@ # services/tenant/app/main.py """ -Tenant Service FastAPI application - FIXED VERSION +Tenant Service FastAPI application """ -import structlog from fastapi import FastAPI -from fastapi.middleware.cors import CORSMiddleware - from app.core.config import settings from app.core.database import database_manager from app.api import tenants, subscriptions, webhooks -from shared.monitoring.logging import setup_logging -from shared.monitoring.metrics import MetricsCollector +from shared.service_base import StandardFastAPIService -# Setup logging -setup_logging("tenant-service", settings.LOG_LEVEL) -logger = structlog.get_logger() -# Create FastAPI app -app = FastAPI( - title="Tenant Management Service", - description="Multi-tenant bakery management service", - version="1.0.0", +class TenantService(StandardFastAPIService): + """Tenant Service with standardized setup""" + + def __init__(self): + # Define expected database tables for health checks + tenant_expected_tables = ['tenants', 'tenant_members', 'subscriptions'] + + super().__init__( + service_name="tenant-service", + app_name="Tenant Management Service", + description="Multi-tenant bakery management service", + version="1.0.0", + log_level=settings.LOG_LEVEL, + api_prefix="/api/v1", + database_manager=database_manager, + expected_tables=tenant_expected_tables + ) + + async def on_startup(self, app: FastAPI): + """Custom startup logic for tenant service""" + # Import models to ensure they're registered with SQLAlchemy + from app.models.tenants import Tenant, TenantMember, Subscription + self.logger.info("Tenant models imported successfully") + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for tenant service""" + # Database cleanup is handled by the base class + pass + + def get_service_features(self): + """Return tenant-specific features""" + return [ + "multi_tenant_management", + "subscription_management", + "tenant_isolation", + "webhook_notifications", + "member_management" + ] + + def setup_custom_endpoints(self): + """Setup custom endpoints for tenant service""" + @self.app.get("/metrics") + async def metrics(): + """Prometheus metrics endpoint""" + if self.metrics_collector: + return self.metrics_collector.get_metrics() + return {"metrics": "not_available"} + + +# Create service instance +service = TenantService() + +# Create FastAPI app with standardized setup +app = service.create_app( docs_url="/docs", redoc_url="/redoc" ) -# Initialize metrics -metrics_collector = MetricsCollector("tenant_service") -app.state.metrics_collector = metrics_collector +# Setup standard endpoints +service.setup_standard_endpoints() -# CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) +# Setup custom endpoints +service.setup_custom_endpoints() # Include routers -app.include_router(tenants.router, prefix="/api/v1", tags=["tenants"]) -app.include_router(subscriptions.router, prefix="/api/v1", tags=["subscriptions"]) -app.include_router(webhooks.router, prefix="/api/v1", tags=["webhooks"]) - -@app.on_event("startup") -async def startup_event(): - """Initialize service on startup""" - logger.info("Starting Tenant Service...") - - try: - # ✅ FIX: Import models to ensure they're registered with SQLAlchemy - from app.models.tenants import Tenant, TenantMember, Subscription - logger.info("Tenant models imported successfully") - - # ✅ FIX: Create database tables on startup - await database_manager.create_tables() - logger.info("Tenant database tables created successfully") - - except Exception as e: - logger.error(f"Failed to initialize tenant service: {e}") - raise - - logger.info("Tenant Service startup completed successfully") - -@app.on_event("shutdown") -async def shutdown_event(): - """Cleanup on shutdown""" - logger.info("Shutting down Tenant Service...") - - try: - # Close database connections properly - if hasattr(database_manager, 'engine') and database_manager.engine: - await database_manager.engine.dispose() - logger.info("Database connections closed") - except Exception as e: - logger.error(f"Error during shutdown: {e}") - - logger.info("Tenant Service shutdown completed") - -@app.get("/health") -async def health_check(): - """Health check endpoint""" - return { - "status": "healthy", - "service": "tenant-service", - "version": "1.0.0" - } - -@app.get("/metrics") -async def metrics(): - """Prometheus metrics endpoint""" - return metrics_collector.get_metrics() +service.add_router(tenants.router, tags=["tenants"]) +service.add_router(subscriptions.router, tags=["subscriptions"]) +service.add_router(webhooks.router, tags=["webhooks"]) if __name__ == "__main__": import uvicorn diff --git a/services/training/app/main.py b/services/training/app/main.py index 32bab02d..7dbbc922 100644 --- a/services/training/app/main.py +++ b/services/training/app/main.py @@ -1,280 +1,147 @@ # ================================================================ -# services/training/app/main.py - FIXED VERSION +# services/training/app/main.py # ================================================================ """ Training Service Main Application -Enhanced with proper error handling, monitoring, and lifecycle management +ML training service for bakery demand forecasting """ -import structlog import asyncio -from contextlib import asynccontextmanager from fastapi import FastAPI, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.middleware.trustedhost import TrustedHostMiddleware -from fastapi.responses import JSONResponse -import uvicorn - from app.core.config import settings -from app.core.database import initialize_training_database, cleanup_training_database, get_db_health, get_comprehensive_db_health +from app.core.database import initialize_training_database, cleanup_training_database, database_manager from app.api import training, models - from app.api.websocket import websocket_router from app.services.messaging import setup_messaging, cleanup_messaging -from shared.monitoring.logging import setup_logging -from shared.monitoring.metrics import MetricsCollector -# REMOVED: from shared.auth.decorators import require_auth +from shared.service_base import StandardFastAPIService -# Setup structured logging -setup_logging("training-service", settings.LOG_LEVEL) -logger = structlog.get_logger() -# Initialize metrics collector -metrics_collector = MetricsCollector("training-service") +class TrainingService(StandardFastAPIService): + """Training Service with standardized setup""" -@asynccontextmanager -async def lifespan(app: FastAPI): - """ - Application lifespan manager for startup and shutdown events - """ - # Startup - logger.info("Starting Training Service", version="1.0.0") - - try: - # Initialize database - logger.info("Initializing database connection") - await initialize_training_database() - logger.info("Database initialized successfully") - - # Initialize messaging - logger.info("Setting up messaging") + def __init__(self): + # Define expected database tables for health checks + training_expected_tables = [ + 'model_training_logs', 'trained_models', 'model_performance_metrics', + 'training_job_queue', 'model_artifacts' + ] + + super().__init__( + service_name="training-service", + app_name="Bakery Training Service", + description="ML training service for bakery demand forecasting", + version="1.0.0", + log_level=settings.LOG_LEVEL, + cors_origins=settings.CORS_ORIGINS_LIST, + api_prefix="/api/v1", + database_manager=database_manager, + expected_tables=training_expected_tables, + enable_messaging=True + ) + + async def _setup_messaging(self): + """Setup messaging for training service""" await setup_messaging() - logger.info("Messaging setup completed") - - # Start metrics server - logger.info("Starting metrics server") - metrics_collector.start_metrics_server(8080) - logger.info("Metrics server started on port 8080") - - # Store metrics collector in app state - app.state.metrics_collector = metrics_collector - - # Mark service as ready - app.state.ready = True - logger.info("Training Service startup completed successfully") - - yield - - except Exception as e: - logger.error("Failed to start Training Service", error=str(e)) - app.state.ready = False - raise - - # Shutdown - logger.info("Shutting down Training Service") - - try: - # Stop metrics server - if hasattr(app.state, 'metrics_collector'): - await app.state.metrics_collector.shutdown() - - # Cleanup messaging + self.logger.info("Messaging setup completed") + + async def _cleanup_messaging(self): + """Cleanup messaging for training service""" await cleanup_messaging() - logger.info("Messaging cleanup completed") - - # Close database connections + + async def on_startup(self, app: FastAPI): + """Custom startup logic for training service""" + pass + + async def on_shutdown(self, app: FastAPI): + """Custom shutdown logic for training service""" + # Note: Database cleanup is handled by the base class + # but training service has custom cleanup function await cleanup_training_database() - logger.info("Database connections closed") - - except Exception as e: - logger.error("Error during shutdown", error=str(e)) - - logger.info("Training Service shutdown completed") + self.logger.info("Training database cleanup completed") -# Create FastAPI application with lifespan -app = FastAPI( - title="Bakery Training Service", - description="ML training service for bakery demand forecasting", - version="1.0.0", + def get_service_features(self): + """Return training-specific features""" + return [ + "ml_model_training", + "demand_forecasting", + "model_performance_tracking", + "training_job_queue", + "model_artifacts_management", + "websocket_support", + "messaging_integration" + ] + + def setup_custom_middleware(self): + """Setup custom middleware for training service""" + # Request middleware for logging and metrics + @self.app.middleware("http") + async def process_request(request: Request, call_next): + """Process requests with logging and metrics""" + start_time = asyncio.get_event_loop().time() + + try: + response = await call_next(request) + duration = asyncio.get_event_loop().time() - start_time + + self.logger.info( + "Request completed", + method=request.method, + path=request.url.path, + status_code=response.status_code, + duration_ms=round(duration * 1000, 2) + ) + + return response + + except Exception as e: + duration = asyncio.get_event_loop().time() - start_time + + self.logger.error( + "Request failed", + method=request.method, + path=request.url.path, + error=str(e), + duration_ms=round(duration * 1000, 2) + ) + raise + + def setup_custom_endpoints(self): + """Setup custom endpoints for training service""" + @self.app.get("/metrics") + async def get_metrics(): + """Prometheus metrics endpoint""" + if self.metrics_collector: + return self.metrics_collector.get_metrics() + return {"status": "metrics not available"} + + @self.app.get("/") + async def root(): + return {"service": "training-service", "version": "1.0.0"} + + +# Create service instance +service = TrainingService() + +# Create FastAPI app with standardized setup +app = service.create_app( docs_url="/docs", - redoc_url="/redoc", - lifespan=lifespan + redoc_url="/redoc" ) -# Add middleware -app.add_middleware( - CORSMiddleware, - allow_origins=settings.CORS_ORIGINS_LIST, - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) +# Setup standard endpoints +service.setup_standard_endpoints() +# Setup custom middleware +service.setup_custom_middleware() -# Request middleware for logging and metrics -@app.middleware("http") -async def process_request(request: Request, call_next): - """Process requests with logging and metrics""" - start_time = asyncio.get_event_loop().time() - - try: - response = await call_next(request) - duration = asyncio.get_event_loop().time() - start_time - - logger.info( - "Request completed", - method=request.method, - path=request.url.path, - status_code=response.status_code, - duration_ms=round(duration * 1000, 2) - ) - - # Update metrics - metrics_collector.record_request( - method=request.method, - endpoint=request.url.path, - status_code=response.status_code, - duration=duration - ) - - return response - - except Exception as e: - duration = asyncio.get_event_loop().time() - start_time - - logger.error( - "Request failed", - method=request.method, - path=request.url.path, - error=str(e), - duration_ms=round(duration * 1000, 2) - ) - - metrics_collector.increment_counter("http_requests_failed_total") - raise - -# Exception handlers -@app.exception_handler(Exception) -async def global_exception_handler(request: Request, exc: Exception): - """Global exception handler for unhandled errors""" - logger.error( - "Unhandled exception", - path=request.url.path, - method=request.method, - error=str(exc), - exc_info=True - ) - - metrics_collector.increment_counter("unhandled_exceptions_total") - - return JSONResponse( - status_code=500, - content={ - "detail": "Internal server error", - "error_id": structlog.get_logger().new().info("Error logged", error=str(exc)) - } - ) +# Setup custom endpoints +service.setup_custom_endpoints() # Include API routers -app.include_router(training.router, prefix="/api/v1", tags=["training"]) - -app.include_router(models.router, prefix="/api/v1", tags=["models"]) +service.add_router(training.router, tags=["training"]) +service.add_router(models.router, tags=["models"]) app.include_router(websocket_router, prefix="/api/v1/ws", tags=["websocket"]) - -# Health check endpoints -@app.get("/health") -async def health_check(): - """Basic health check endpoint""" - return { - "status": "healthy" if app.state.ready else "starting", - "service": "training-service", - "version": "1.0.0", - "timestamp": structlog.get_logger().new().info("Health check") - } - -@app.get("/health/ready") -async def readiness_check(): - """Kubernetes readiness probe endpoint with comprehensive database checks""" - try: - # Get comprehensive database health including table verification - db_health = await get_comprehensive_db_health() - - checks = { - "database_connectivity": db_health["connectivity"], - "database_tables": db_health["tables_exist"], - "application": getattr(app.state, 'ready', False) - } - - # Include detailed database info for debugging - database_details = { - "status": db_health["status"], - "tables_verified": db_health["tables_verified"], - "missing_tables": db_health["missing_tables"], - "errors": db_health["errors"] - } - - # Service is ready only if all checks pass - all_ready = all(checks.values()) and db_health["status"] == "healthy" - - if all_ready: - return { - "status": "ready", - "checks": checks, - "database": database_details - } - else: - return JSONResponse( - status_code=503, - content={ - "status": "not ready", - "checks": checks, - "database": database_details - } - ) - - except Exception as e: - logger.error("Readiness check failed", error=str(e)) - return JSONResponse( - status_code=503, - content={ - "status": "not ready", - "error": f"Health check failed: {str(e)}" - } - ) - -@app.get("/health/database") -async def database_health_check(): - """Detailed database health endpoint for debugging""" - try: - db_health = await get_comprehensive_db_health() - status_code = 200 if db_health["status"] == "healthy" else 503 - return JSONResponse(status_code=status_code, content=db_health) - except Exception as e: - logger.error("Database health check failed", error=str(e)) - return JSONResponse( - status_code=503, - content={ - "status": "unhealthy", - "error": f"Health check failed: {str(e)}" - } - ) - -@app.get("/metrics") -async def get_metrics(): - """Prometheus metrics endpoint""" - if hasattr(app.state, 'metrics_collector'): - return app.state.metrics_collector.get_metrics() - return {"status": "metrics not available"} - -@app.get("/health/live") -async def liveness_check(): - return {"status": "alive"} - -@app.get("/") -async def root(): - return {"service": "training-service", "version": "1.0.0"} - if __name__ == "__main__": uvicorn.run( "app.main:app", diff --git a/shared/monitoring/__init__.py b/shared/monitoring/__init__.py index 7128a9f0..bc13aeab 100644 --- a/shared/monitoring/__init__.py +++ b/shared/monitoring/__init__.py @@ -4,12 +4,20 @@ Shared monitoring package for microservices from .logging import setup_logging from .metrics import setup_metrics_early, get_metrics_collector, MetricsCollector -from .health import HealthChecker +from .health_checks import ( + HealthCheckManager, + FastAPIHealthChecker, + create_health_manager, + setup_fastapi_health_checks +) __all__ = [ 'setup_logging', - 'setup_metrics_early', + 'setup_metrics_early', 'get_metrics_collector', 'MetricsCollector', - 'HealthChecker' + 'HealthCheckManager', + 'FastAPIHealthChecker', + 'create_health_manager', + 'setup_fastapi_health_checks' ] \ No newline at end of file diff --git a/shared/monitoring/health_checks.py b/shared/monitoring/health_checks.py new file mode 100644 index 00000000..5a8a7966 --- /dev/null +++ b/shared/monitoring/health_checks.py @@ -0,0 +1,370 @@ +""" +Enhanced Health Check System for Microservices + +Provides unified health check endpoints and database verification based on +the comprehensive implementation from the training service. +""" + +from typing import Dict, Any, List, Optional, Callable +from contextlib import asynccontextmanager +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import text, inspect +from fastapi import HTTPException +from fastapi.responses import JSONResponse +import structlog +import time +import datetime + +from ..database.base import DatabaseManager +from ..database.exceptions import DatabaseError, HealthCheckError + +logger = structlog.get_logger() + + +class HealthCheckManager: + """ + Unified health check manager for microservices + + Provides standardized health check endpoints: + - /health - Basic service health + - /health/ready - Kubernetes readiness probe with comprehensive checks + - /health/live - Kubernetes liveness probe + - /health/database - Detailed database health information + """ + + def __init__( + self, + service_name: str, + version: str = "1.0.0", + database_manager: Optional[DatabaseManager] = None, + expected_tables: Optional[List[str]] = None, + custom_checks: Optional[Dict[str, Callable]] = None + ): + self.service_name = service_name + self.version = version + self.database_manager = database_manager + self.expected_tables = expected_tables or [] + self.custom_checks = custom_checks or {} + self.ready_state = False + + def set_ready(self, ready: bool = True): + """Set service ready state""" + self.ready_state = ready + logger.info(f"Service ready state changed", + service=self.service_name, ready=ready) + + async def basic_health_check(self, app_state=None) -> Dict[str, Any]: + """Basic health check endpoint (/health)""" + # Check app state for ready status if available + ready = self.ready_state + if app_state and hasattr(app_state, 'ready'): + ready = app_state.ready + + return { + "status": "healthy" if ready else "starting", + "service": self.service_name, + "version": self.version, + "timestamp": datetime.datetime.utcnow().isoformat() + } + + async def readiness_check(self, app_state=None) -> Dict[str, Any]: + """ + Kubernetes readiness probe endpoint (/health/ready) + + Returns 200 if ready, 503 if not ready + """ + try: + # Check app state for ready status if available + ready = self.ready_state + if app_state and hasattr(app_state, 'ready'): + ready = app_state.ready + + checks = { + "application": ready + } + + database_details = {} + + # Database connectivity and table verification + if self.database_manager: + db_health = await self._get_comprehensive_db_health() + checks["database_connectivity"] = db_health["connectivity"] + checks["database_tables"] = db_health["tables_exist"] + + database_details = { + "status": db_health["status"], + "tables_verified": db_health["tables_verified"], + "missing_tables": db_health["missing_tables"], + "errors": db_health["errors"] + } + + # Execute custom checks + for check_name, check_func in self.custom_checks.items(): + try: + checks[check_name] = await check_func() + except Exception as e: + checks[check_name] = False + logger.error(f"Custom check '{check_name}' failed", error=str(e)) + + # Service is ready only if all checks pass + all_ready = all(checks.values()) + if self.database_manager: + all_ready = all_ready and database_details.get("status") == "healthy" + + response_data = { + "status": "ready" if all_ready else "not ready", + "checks": checks + } + + if database_details: + response_data["database"] = database_details + + if all_ready: + return response_data + else: + raise HTTPException(status_code=503, detail=response_data) + + except HTTPException: + raise + except Exception as e: + logger.error("Readiness check failed", error=str(e)) + raise HTTPException( + status_code=503, + detail={ + "status": "not ready", + "error": f"Health check failed: {str(e)}" + } + ) + + async def liveness_check(self) -> Dict[str, Any]: + """Kubernetes liveness probe endpoint (/health/live)""" + return {"status": "alive"} + + async def database_health_check(self) -> Dict[str, Any]: + """ + Detailed database health endpoint (/health/database) + + Returns 200 if healthy, 503 if unhealthy + """ + if not self.database_manager: + raise HTTPException( + status_code=404, + detail={"error": "Database health check not available"} + ) + + try: + db_health = await self._get_comprehensive_db_health() + status_code = 200 if db_health["status"] == "healthy" else 503 + + if status_code == 503: + raise HTTPException(status_code=503, detail=db_health) + return db_health + + except HTTPException: + raise + except Exception as e: + logger.error("Database health check failed", error=str(e)) + raise HTTPException( + status_code=503, + detail={ + "status": "unhealthy", + "error": f"Health check failed: {str(e)}" + } + ) + + async def _get_comprehensive_db_health(self) -> Dict[str, Any]: + """ + Comprehensive database health check with table verification + Based on training service implementation + """ + health_status = { + "status": "healthy", + "connectivity": False, + "tables_exist": False, + "tables_verified": [], + "missing_tables": [], + "errors": [], + "connection_info": {}, + "response_time_ms": 0 + } + + if not self.database_manager: + health_status["status"] = "unhealthy" + health_status["errors"].append("Database manager not configured") + return health_status + + try: + # Test basic connectivity with timing + start_time = time.time() + health_status["connectivity"] = await self.database_manager.test_connection() + response_time = (time.time() - start_time) * 1000 + health_status["response_time_ms"] = round(response_time, 2) + + if not health_status["connectivity"]: + health_status["status"] = "unhealthy" + health_status["errors"].append("Database connectivity failed") + return health_status + + # Get connection pool information + health_status["connection_info"] = await self.database_manager.get_connection_info() + + # Test table existence if expected tables are configured + if self.expected_tables: + tables_verified = await self._verify_tables_exist() + health_status["tables_exist"] = tables_verified + + if tables_verified: + health_status["tables_verified"] = self.expected_tables.copy() + else: + health_status["status"] = "unhealthy" + health_status["errors"].append("Required tables missing or inaccessible") + + # Identify which specific tables are missing + await self._identify_missing_tables(health_status) + else: + # If no expected tables configured, just mark as verified + health_status["tables_exist"] = True + + logger.debug("Comprehensive database health check completed", + service=self.service_name, + status=health_status["status"], + connectivity=health_status["connectivity"], + tables_exist=health_status["tables_exist"]) + + except Exception as e: + health_status["status"] = "unhealthy" + health_status["errors"].append(f"Health check failed: {str(e)}") + logger.error("Comprehensive database health check failed", + service=self.service_name, error=str(e)) + + return health_status + + async def _verify_tables_exist(self) -> bool: + """Verify that all expected tables exist and are accessible""" + try: + async with self.database_manager.get_session() as session: + for table_name in self.expected_tables: + try: + await session.execute(text(f"SELECT 1 FROM {table_name} LIMIT 1")) + except Exception: + return False + return True + except Exception as e: + logger.error("Table verification failed", error=str(e)) + return False + + async def _identify_missing_tables(self, health_status: Dict[str, Any]): + """Identify which specific tables are missing""" + try: + async with self.database_manager.get_session() as session: + for table_name in self.expected_tables: + try: + await session.execute(text(f"SELECT 1 FROM {table_name} LIMIT 1")) + health_status["tables_verified"].append(table_name) + except Exception: + health_status["missing_tables"].append(table_name) + except Exception as e: + health_status["errors"].append(f"Error checking individual tables: {str(e)}") + + +class FastAPIHealthChecker: + """ + FastAPI integration for health checks + + Provides router setup and endpoint registration + """ + + def __init__(self, health_manager: HealthCheckManager): + self.health_manager = health_manager + + def setup_health_routes(self, app): + """Setup health check routes on FastAPI app""" + + @app.get("/health") + async def health_check(): + """Basic health check endpoint""" + return await self.health_manager.basic_health_check(app.state) + + @app.get("/health/ready") + async def readiness_check(): + """Kubernetes readiness probe endpoint""" + try: + return await self.health_manager.readiness_check(app.state) + except HTTPException as e: + return JSONResponse( + status_code=e.status_code, + content=e.detail + ) + + @app.get("/health/live") + async def liveness_check(): + """Kubernetes liveness probe endpoint""" + return await self.health_manager.liveness_check() + + @app.get("/health/database") + async def database_health_check(): + """Detailed database health endpoint""" + try: + return await self.health_manager.database_health_check() + except HTTPException as e: + return JSONResponse( + status_code=e.status_code, + content=e.detail + ) + + +# Convenience functions for easy integration + +def create_health_manager( + service_name: str, + version: str = "1.0.0", + database_manager: Optional[DatabaseManager] = None, + expected_tables: Optional[List[str]] = None, + custom_checks: Optional[Dict[str, Callable]] = None +) -> HealthCheckManager: + """Factory function to create a HealthCheckManager""" + return HealthCheckManager( + service_name=service_name, + version=version, + database_manager=database_manager, + expected_tables=expected_tables, + custom_checks=custom_checks + ) + + +def setup_fastapi_health_checks( + app, + service_name: str, + version: str = "1.0.0", + database_manager: Optional[DatabaseManager] = None, + expected_tables: Optional[List[str]] = None, + custom_checks: Optional[Dict[str, Callable]] = None +) -> HealthCheckManager: + """ + Convenience function to setup health checks on a FastAPI app + + Args: + app: FastAPI application instance + service_name: Name of the service + version: Service version + database_manager: Database manager instance + expected_tables: List of tables that should exist + custom_checks: Dict of custom check functions + + Returns: + HealthCheckManager instance for further configuration + """ + health_manager = create_health_manager( + service_name=service_name, + version=version, + database_manager=database_manager, + expected_tables=expected_tables, + custom_checks=custom_checks + ) + + fastapi_checker = FastAPIHealthChecker(health_manager) + fastapi_checker.setup_health_routes(app) + + return health_manager + + diff --git a/shared/service_base.py b/shared/service_base.py new file mode 100644 index 00000000..46fef5e9 --- /dev/null +++ b/shared/service_base.py @@ -0,0 +1,429 @@ +""" +Standardized FastAPI Service Base + +Provides a unified approach for creating FastAPI microservices with common patterns: +- Logging setup +- Metrics initialization +- Health checks +- Database initialization +- CORS middleware +- Exception handlers +- Lifespan management +""" + +import os +import structlog +from typing import Optional, List, Dict, Callable, Any, TYPE_CHECKING +from contextlib import asynccontextmanager +from fastapi import FastAPI, Request +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse +from fastapi.routing import APIRouter + +from shared.monitoring import setup_logging +from shared.monitoring.metrics import setup_metrics_early +from shared.monitoring.health_checks import setup_fastapi_health_checks +from shared.database.base import DatabaseManager + +if TYPE_CHECKING: + import uvicorn + + +class BaseFastAPIService: + """ + Base class for FastAPI microservices with standardized patterns + """ + + def __init__( + self, + service_name: str, + app_name: str, + description: str, + version: str = "1.0.0", + log_level: str = "INFO", + cors_origins: Optional[List[str]] = None, + api_prefix: str = "/api/v1", + database_manager: Optional[DatabaseManager] = None, + expected_tables: Optional[List[str]] = None, + custom_health_checks: Optional[Dict[str, Callable[[], Any]]] = None, + enable_metrics: bool = True, + enable_health_checks: bool = True, + enable_cors: bool = True, + enable_exception_handlers: bool = True, + enable_messaging: bool = False, + custom_metrics: Optional[Dict[str, Dict[str, Any]]] = None, + alert_service_class: Optional[type] = None + ): + self.service_name = service_name + self.app_name = app_name + self.description = description + self.version = version + self.log_level = log_level + self.cors_origins = cors_origins or ["*"] + self.api_prefix = api_prefix + self.database_manager = database_manager + self.expected_tables = expected_tables + self.custom_health_checks = custom_health_checks or {} + self.enable_metrics = enable_metrics + self.enable_health_checks = enable_health_checks + self.enable_cors = enable_cors + self.enable_exception_handlers = enable_exception_handlers + self.enable_messaging = enable_messaging + self.custom_metrics = custom_metrics or {} + self.alert_service_class = alert_service_class + + # Initialize logging + setup_logging(service_name, log_level) + self.logger = structlog.get_logger() + + # Will be set during app creation + self.app: Optional[FastAPI] = None + self.metrics_collector = None + self.health_manager = None + self.alert_service = None + + def create_app(self, **fastapi_kwargs) -> FastAPI: + """ + Create and configure FastAPI application with standardized setup + """ + # Default FastAPI configuration + default_config = { + "title": self.app_name, + "description": self.description, + "version": self.version, + "openapi_url": f"{self.api_prefix}/openapi.json", + "docs_url": f"{self.api_prefix}/docs", + "redoc_url": f"{self.api_prefix}/redoc", + } + + # Merge with user-provided config + config = {**default_config, **fastapi_kwargs} + + # Create FastAPI app + self.app = FastAPI(**config) + + # Setup metrics BEFORE middleware and lifespan + if self.enable_metrics: + self.metrics_collector = setup_metrics_early(self.app, self.service_name) + + # Setup lifespan + self.app.router.lifespan_context = self._create_lifespan() + + # Setup middleware + if self.enable_cors: + self._setup_cors() + + # Setup exception handlers + if self.enable_exception_handlers: + self._setup_exception_handlers() + + # Setup health checks + if self.enable_health_checks: + self._setup_health_checks() + + # Setup root endpoint + self._setup_root_endpoint() + + return self.app + + def _create_lifespan(self): + """Create lifespan context manager""" + @asynccontextmanager + async def lifespan(app: FastAPI): + # Startup + self.logger.info(f"Starting {self.service_name}", version=self.version) + + try: + # Initialize database if provided + if self.database_manager: + await self._initialize_database() + + # Setup messaging if enabled + if self.enable_messaging: + await self._setup_messaging() + + # Initialize alert service if provided + if self.alert_service_class: + await self._initialize_alert_service(app) + + # Register custom metrics if provided + if self.custom_metrics: + self.register_custom_metrics(self.custom_metrics) + + # Custom startup logic + await self.on_startup(app) + + # Mark service as ready + app.state.ready = True + + self.logger.info(f"{self.service_name} started successfully") + + yield + + except Exception as e: + self.logger.error(f"Startup failed for {self.service_name}", error=str(e)) + raise + finally: + # Shutdown + self.logger.info(f"Shutting down {self.service_name}") + try: + await self.on_shutdown(app) + + # Cleanup alert service if it exists + if self.alert_service: + await self._cleanup_alert_service() + + # Cleanup messaging if enabled + if self.enable_messaging: + await self._cleanup_messaging() + + if self.database_manager: + await self._cleanup_database() + + except Exception as e: + self.logger.error(f"Shutdown error for {self.service_name}", error=str(e)) + + return lifespan + + async def on_startup(self, app: FastAPI): + """ + Override this method for custom startup logic + Called after database initialization but before marking as ready + """ + pass + + async def on_shutdown(self, app: FastAPI): + """ + Override this method for custom shutdown logic + Called before database cleanup + """ + pass + + async def _initialize_database(self): + """Initialize database connection""" + try: + # Test connection + if await self.database_manager.test_connection(): + self.logger.info("Database initialized successfully") + else: + raise Exception("Database connection test failed") + except Exception as e: + self.logger.error("Database initialization failed", error=str(e)) + raise + + async def _cleanup_database(self): + """Cleanup database connections""" + try: + await self.database_manager.close_connections() + self.logger.info("Database connections closed") + except Exception as e: + self.logger.error("Database cleanup error", error=str(e)) + + async def _setup_messaging(self): + """Setup messaging service - to be overridden by services that need it""" + pass + + async def _cleanup_messaging(self): + """Cleanup messaging service - to be overridden by services that need it""" + pass + + async def _initialize_alert_service(self, app: FastAPI): + """Initialize alert service - to be overridden by services that need it""" + pass + + async def _cleanup_alert_service(self): + """Cleanup alert service - to be overridden by services that need it""" + pass + + def _setup_cors(self): + """Setup CORS middleware""" + if self.app is not None: + self.app.add_middleware( + CORSMiddleware, + allow_origins=self.cors_origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + def _setup_exception_handlers(self): + """Setup standard exception handlers""" + if self.app is not None: + @self.app.exception_handler(ValueError) + async def value_error_handler(request: Request, exc: ValueError): + """Handle validation errors""" + return JSONResponse( + status_code=400, + content={ + "error": "Validation Error", + "detail": str(exc), + "type": "value_error" + } + ) + + @self.app.exception_handler(Exception) + async def general_exception_handler(request: Request, exc: Exception): + """Handle general exceptions""" + self.logger.error( + "Unhandled exception", + error=str(exc), + path=request.url.path, + method=request.method + ) + + # Record error metric if available + if self.metrics_collector: + self.metrics_collector.increment_counter("errors_total", labels={"type": "unhandled"}) + + return JSONResponse( + status_code=500, + content={ + "error": "Internal Server Error", + "detail": "An unexpected error occurred", + "type": "internal_error" + } + ) + + def _setup_health_checks(self): + """Setup health check endpoints""" + self.health_manager = setup_fastapi_health_checks( + app=self.app, + service_name=self.service_name, + version=self.version, + database_manager=self.database_manager, + expected_tables=self.expected_tables, + custom_checks=self.custom_health_checks + ) + + def _setup_root_endpoint(self): + """Setup root endpoint with service information""" + if self.app is not None: + @self.app.get("/") + async def root(): + """Root endpoint with service information""" + return { + "service": self.service_name, + "version": self.version, + "description": self.description, + "status": "running", + "docs_url": f"{self.api_prefix}/docs", + "health_url": "/health" + } + + def add_router(self, router: APIRouter, **kwargs: Any): + """Convenience method to add routers with default prefix""" + if self.app is not None: + prefix = kwargs.get('prefix', self.api_prefix) + kwargs['prefix'] = prefix + self.app.include_router(router, **kwargs) + + def register_custom_metrics(self, metrics_config: Dict[str, Dict[str, Any]]): + """ + Register custom metrics for the service + + Args: + metrics_config: Dict with metric name as key and config as value + Example: { + "user_registrations": { + "type": "counter", + "description": "Total user registrations", + "labels": ["status"] + } + } + """ + if not self.metrics_collector: + self.logger.warning("Metrics collector not available") + return + + for metric_name, config in metrics_config.items(): + metric_type = config.get("type", "counter") + description = config.get("description", f"{metric_name} metric") + labels = config.get("labels", []) + + if metric_type == "counter": + self.metrics_collector.register_counter(metric_name, description, labels=labels) + elif metric_type == "histogram": + self.metrics_collector.register_histogram(metric_name, description, labels=labels) + else: + self.logger.warning(f"Unsupported metric type: {metric_type}") + + def run_development_server(self, host: str = "0.0.0.0", port: int = 8000, reload: Optional[bool] = None): + """ + Run development server with uvicorn + """ + import uvicorn + + if reload is None: + reload = os.getenv("RELOAD", "false").lower() == "true" + + uvicorn.run( + f"{self.__module__}:app", + host=host, + port=port, + reload=reload, + log_level="info" + ) + + +class StandardFastAPIService(BaseFastAPIService): + """ + Standard service implementation for most microservices + + Provides additional common patterns for services with database and standard endpoints + """ + + def __init__(self, **kwargs: Any): + super().__init__(**kwargs) + + def setup_standard_endpoints(self): + """Setup standard service endpoints""" + if self.app is not None: + @self.app.get(f"{self.api_prefix}/info") + async def service_info(): + """Service information endpoint""" + return { + "service": self.service_name, + "version": self.version, + "description": self.description, + "api_version": "v1", + "environment": os.getenv("ENVIRONMENT", "development"), + "features": self.get_service_features() + } + + def get_service_features(self) -> List[str]: + """ + Override this method to return service-specific features + """ + return ["health_checks", "metrics", "standardized_api"] + + +class MessageProcessorService(BaseFastAPIService): + """ + Service implementation for message processing services + + Provides patterns for background processing services + """ + + def __init__(self, **kwargs: Any): + # Message processors typically don't need CORS or full API setup + kwargs.setdefault('enable_cors', False) + kwargs.setdefault('api_prefix', '/api/v1') + super().__init__(**kwargs) + + async def on_startup(self, app: FastAPI): + """Initialize message processing components""" + await super().on_startup(app) + await self.setup_message_processing() + + async def on_shutdown(self, app: FastAPI): + """Cleanup message processing components""" + await self.cleanup_message_processing() + await super().on_shutdown(app) + + async def setup_message_processing(self): + """Override this method to setup message processing""" + pass + + async def cleanup_message_processing(self): + """Override this method to cleanup message processing""" + pass \ No newline at end of file