diff --git a/services/auth/app/main.py b/services/auth/app/main.py index c0de34e7..38086335 100644 --- a/services/auth/app/main.py +++ b/services/auth/app/main.py @@ -117,7 +117,7 @@ async def metrics_middleware(request: Request, call_next): # Record metrics duration = time.time() - start_time - metrics.record_histogram("auth_request_duration_seconds", duration) + metrics.observe_histogram("auth_request_duration_seconds", duration) metrics.increment_counter("auth_requests_total") return response diff --git a/services/auth/app/services/messaging.py b/services/auth/app/services/messaging.py index c89a5074..a1db6c46 100644 --- a/services/auth/app/services/messaging.py +++ b/services/auth/app/services/messaging.py @@ -1,9 +1,25 @@ +# app/services/messaging.py """ Messaging service for auth service """ from shared.messaging.rabbitmq import RabbitMQClient from app.core.config import settings +import logging + +logger = logging.getLogger(__name__) # Global message publisher -message_publisher = RabbitMQClient(settings.RABBITMQ_URL) \ No newline at end of file +message_publisher = RabbitMQClient(settings.RABBITMQ_URL) + +async def setup_messaging(): + """Establishes connection to RabbitMQ for the message publisher.""" + logger.info("Attempting to connect to RabbitMQ...") + await message_publisher.connect() + logger.info("RabbitMQ connection established.") + +async def cleanup_messaging(): + """Closes the connection to RabbitMQ for the message publisher.""" + logger.info("Attempting to disconnect from RabbitMQ...") + await message_publisher.disconnect() + logger.info("RabbitMQ connection closed.") \ No newline at end of file diff --git a/services/auth/docker-compose.yml b/services/auth/docker-compose.yml index 92bcfc29..7a9bcc2c 100644 --- a/services/auth/docker-compose.yml +++ b/services/auth/docker-compose.yml @@ -1,7 +1,6 @@ # ================================================================ # services/auth/docker-compose.yml (For standalone testing) # ================================================================ -version: '3.8' services: auth-db: diff --git a/services/auth/migrations/alembic.ini b/services/auth/migrations/alembic.ini index e69de29b..40677ebd 100644 --- a/services/auth/migrations/alembic.ini +++ b/services/auth/migrations/alembic.ini @@ -0,0 +1,84 @@ +# ================================================================ +# services/auth/migrations/alembic.ini +# ================================================================ +[alembic] +# path to migration scripts +script_location = migrations + +# template used to generate migration file names +file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d_%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +timezone = Europe/Madrid + +# max length of characters to apply to the +# "slug" field +truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +sourceless = false + +# version of a migration file's filename format +version_num_format = %s + +# version path separator +version_path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +output_encoding = utf-8 + +sqlalchemy.url = postgresql+asyncpg://auth_user:auth_pass123@auth-db:5432/auth_db + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S \ No newline at end of file diff --git a/services/auth/requirements.txt b/services/auth/requirements.txt index eaeb32be..7032271d 100644 --- a/services/auth/requirements.txt +++ b/services/auth/requirements.txt @@ -15,4 +15,4 @@ email-validator==2.0.0 prometheus-client==0.17.1 python-json-logger==2.0.4 pytz==2023.3 -python-logstash==0.4.8 \ No newline at end of file +python-logstash==0.4.8 diff --git a/shared/monitoring/metrics.py b/shared/monitoring/metrics.py index a5e35223..4bfd37b1 100644 --- a/shared/monitoring/metrics.py +++ b/shared/monitoring/metrics.py @@ -1,12 +1,14 @@ +# shared/monitoring/metrics.py """ Metrics collection for microservices """ import time import logging -from typing import Dict, Any +from typing import Dict, Any, List # Added List import from prometheus_client import Counter, Histogram, Gauge, start_http_server from functools import wraps +from prometheus_client import generate_latest # Moved this import here for consistency logger = logging.getLogger(__name__) @@ -43,11 +45,14 @@ FORECASTS_GENERATED = Counter( class MetricsCollector: """Metrics collector for microservices""" - + def __init__(self, service_name: str): self.service_name = service_name self.start_time = time.time() - + # Initialize dictionaries to hold custom counters and histograms + self._counters: Dict[str, Counter] = {} + self._histograms: Dict[str, Histogram] = {} + def start_metrics_server(self, port: int = 8080): """Start Prometheus metrics server""" try: @@ -55,7 +60,7 @@ class MetricsCollector: logger.info(f"Metrics server started on port {port}") except Exception as e: logger.error(f"Failed to start metrics server: {e}") - + def record_request(self, method: str, endpoint: str, status_code: int, duration: float): """Record HTTP request metrics""" REQUEST_COUNT.labels( @@ -64,49 +69,110 @@ class MetricsCollector: status_code=status_code, service=self.service_name ).inc() - + REQUEST_DURATION.labels( method=method, endpoint=endpoint, service=self.service_name ).observe(duration) - + def record_training_job(self, status: str): """Record training job metrics""" TRAINING_JOBS.labels( status=status, service=self.service_name ).inc() - + def record_forecast_generated(self): """Record forecast generation metrics""" FORECASTS_GENERATED.labels( service=self.service_name ).inc() - + def set_active_connections(self, count: int): """Set active database connections""" ACTIVE_CONNECTIONS.labels( service=self.service_name ).set(count) + def register_counter(self, name: str, documentation: str, labels: List[str] = None): + """Register a custom Counter metric.""" + if name not in self._counters: + if labels is None: + labels = ['service'] + elif 'service' not in labels: + labels.append('service') + # Pass labelnames as a keyword argument + self._counters[name] = Counter(name, documentation, labelnames=labels) + logger.info(f"Registered counter: {name}") + else: + logger.warning(f"Counter '{name}' already registered.") + return self._counters[name] # Return the counter for direct use if needed + + def increment_counter(self, name: str, value: int = 1, labels: Dict[str, str] = None): + """Increment a custom Counter metric.""" + if name not in self._counters: + logger.error(f"Counter '{name}' not registered. Cannot increment.") + return + + # Ensure the 'service' label is always present + if labels is None: + labels = {'service': self.service_name} + elif 'service' not in labels: + labels['service'] = self.service_name + + self._counters[name].labels(**labels).inc(value) + + def register_histogram(self, name: str, documentation: str, labels: List[str] = None, buckets: tuple = Histogram.DEFAULT_BUCKETS): + """Register a custom Histogram metric.""" + if name not in self._histograms: + if labels is None: + labels = ['service'] + elif 'service' not in labels: + labels.append('service') + # Pass labelnames and buckets as keyword arguments + self._histograms[name] = Histogram(name, documentation, labelnames=labels, buckets=buckets) + logger.info(f"Registered histogram: {name}") + else: + logger.warning(f"Histogram '{name}' already registered.") + return self._histograms[name] # Return the histogram for direct use if needed + + def observe_histogram(self, name: str, value: float, labels: Dict[str, str] = None): + """Observe a custom Histogram metric.""" + if name not in self._histograms: + logger.error(f"Histogram '{name}' not registered. Cannot observe.") + return + + if labels is None: + labels = {'service': self.service_name} + elif 'service' not in labels: + labels['service'] = self.service_name + + self._histograms[name].labels(**labels).observe(value) + + def get_metrics(self) -> str: + """Return Prometheus metrics in exposition format.""" + return generate_latest().decode('utf-8') + + def metrics_middleware(metrics_collector: MetricsCollector): """Middleware to collect metrics""" - - def middleware(request, call_next): + + async def middleware(request, call_next): start_time = time.time() - - response = call_next(request) - + + response = await call_next(request) + duration = time.time() - start_time - + + # Use the specific record_request for HTTP requests metrics_collector.record_request( method=request.method, endpoint=request.url.path, status_code=response.status_code, duration=duration ) - + return response - + return middleware \ No newline at end of file