Files
bakery-ia/services/auth/app/services/auth_service.py
Urtzi Alfaro a9e6cdae09 Fix datetime
2025-07-17 15:02:25 +02:00

284 lines
9.5 KiB
Python

"""
Authentication service business logic
"""
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from fastapi import HTTPException, status
from app.models.users import User, UserSession
from app.schemas.auth import UserRegistration, UserLogin, TokenResponse, UserResponse
from app.core.security import security_manager
from app.services.messaging import message_publisher
from shared.messaging.events import UserRegisteredEvent, UserLoginEvent
logger = logging.getLogger(__name__)
class AuthService:
"""Authentication service business logic"""
@staticmethod
async def register_user(user_data: UserRegistration, db: AsyncSession) -> UserResponse:
"""Register a new user"""
# Check if user already exists
result = await db.execute(
select(User).where(User.email == user_data.email)
)
existing_user = result.scalar_one_or_none()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Hash password
hashed_password = security_manager.hash_password(user_data.password)
# Create user
user = User(
email=user_data.email,
hashed_password=hashed_password,
full_name=user_data.full_name,
phone=user_data.phone,
language=user_data.language,
is_active=True,
is_verified=False # Email verification required
)
db.add(user)
await db.commit()
await db.refresh(user)
# Publish user registered event
await message_publisher.publish_event(
"user_events",
"user.registered",
UserRegisteredEvent(
event_id="",
service_name="auth-service",
timestamp= datetime.now(datetime.timezone.utc),
data={
"user_id": str(user.id),
"email": user.email,
"full_name": user.full_name,
"language": user.language
}
).__dict__
)
logger.info(f"User registered: {user.email}")
return UserResponse(**user.to_dict())
@staticmethod
async def login_user(login_data: UserLogin, db: AsyncSession, ip_address: str, user_agent: str) -> TokenResponse:
"""Authenticate user and create tokens"""
# Check login attempts
if not await security_manager.check_login_attempts(login_data.email):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Too many login attempts. Please try again later."
)
# Get user
result = await db.execute(
select(User).where(User.email == login_data.email)
)
user = result.scalar_one_or_none()
if not user or not security_manager.verify_password(login_data.password, user.hashed_password):
await security_manager.increment_login_attempts(login_data.email)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password"
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Account is inactive"
)
# Clear login attempts
await security_manager.clear_login_attempts(login_data.email)
# Update last login
await db.execute(
update(User)
.where(User.id == user.id)
.values(last_login= datetime.now(datetime.timezone.utc))
)
await db.commit()
# Create tokens
token_data = {
"user_id": str(user.id),
"email": user.email,
"tenant_id": str(user.tenant_id) if user.tenant_id else None,
"role": user.role
}
access_token = security_manager.create_access_token(token_data)
refresh_token = security_manager.create_refresh_token(token_data)
# Store refresh token
await security_manager.store_refresh_token(str(user.id), refresh_token)
# Create session record
session = UserSession(
user_id=user.id,
refresh_token_hash=security_manager.hash_password(refresh_token),
expires_at= datetime.now(datetime.timezone.utc) + timedelta(days=7),
ip_address=ip_address,
user_agent=user_agent
)
db.add(session)
await db.commit()
# Publish login event
await message_publisher.publish_event(
"user_events",
"user.login",
UserLoginEvent(
event_id="",
service_name="auth-service",
timestamp= datetime.now(datetime.timezone.utc),
data={
"user_id": str(user.id),
"email": user.email,
"ip_address": ip_address,
"user_agent": user_agent
}
).__dict__
)
logger.info(f"User logged in: {user.email}")
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
expires_in=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES * 60
)
@staticmethod
async def refresh_token(refresh_token: str, db: AsyncSession) -> TokenResponse:
"""Refresh access token"""
# Verify refresh token
payload = security_manager.verify_token(refresh_token)
if not payload or payload.get("type") != "refresh":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token"
)
user_id = payload.get("user_id")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token"
)
# Verify refresh token is stored
if not await security_manager.verify_refresh_token(user_id, refresh_token):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token"
)
# Get user
result = await db.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive"
)
# Create new tokens
token_data = {
"user_id": str(user.id),
"email": user.email,
"tenant_id": str(user.tenant_id) if user.tenant_id else None,
"role": user.role
}
new_access_token = security_manager.create_access_token(token_data)
new_refresh_token = security_manager.create_refresh_token(token_data)
# Update stored refresh token
await security_manager.store_refresh_token(str(user.id), new_refresh_token)
logger.info(f"Token refreshed for user: {user.email}")
return TokenResponse(
access_token=new_access_token,
refresh_token=new_refresh_token,
expires_in=settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES * 60
)
@staticmethod
async def verify_token(token: str, db: AsyncSession) -> Dict[str, Any]:
"""Verify access token"""
# Verify token
payload = security_manager.verify_token(token)
if not payload or payload.get("type") != "access":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token"
)
user_id = payload.get("user_id")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
# Get user
result = await db.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive"
)
return {
"user_id": str(user.id),
"email": user.email,
"tenant_id": str(user.tenant_id) if user.tenant_id else None,
"role": user.role,
"full_name": user.full_name,
"is_verified": user.is_verified
}
@staticmethod
async def logout_user(user_id: str, db: AsyncSession):
"""Logout user and revoke tokens"""
# Revoke refresh token
await security_manager.revoke_refresh_token(user_id)
# Deactivate user sessions
await db.execute(
update(UserSession)
.where(UserSession.user_id == user_id)
.values(is_active=False)
)
await db.commit()
logger.info(f"User logged out: {user_id}")