# ================================================================
# services/auth/app/models/tokens.py
# ================================================================
"""
Token models for authentication service
"""

import hashlib
import uuid
from datetime import datetime, timezone

from sqlalchemy import Column, String, Boolean, DateTime, Text, Index
from sqlalchemy.dialects.postgresql import UUID

from shared.database.base import Base


class RefreshToken(Base):
    """
    Refresh token model.

    The raw token is stored in an unbounded ``Text`` column, and uniqueness
    is enforced on a SHA-256 hex digest of the token (``token_hash``) rather
    than on the token value itself.
    """

    __tablename__ = "refresh_tokens"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), nullable=False, index=True)

    # Text rather than a bounded VARCHAR so longer tokens are not truncated
    # or rejected.
    token = Column(Text, nullable=False)

    # Uniqueness is enforced on the digest instead of the full token.
    # Nullable, so rows without a hash are permitted by the schema.
    token_hash = Column(String(255), nullable=True, unique=True)

    expires_at = Column(DateTime(timezone=True), nullable=False)
    is_revoked = Column(Boolean, default=False, nullable=False)
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )
    revoked_at = Column(DateTime(timezone=True), nullable=True)

    # Secondary indexes for lookups by user/revocation state, expiry and hash.
    # NOTE(review): token_hash is already declared unique=True, which
    # presumably yields its own unique index on PostgreSQL; the explicit
    # index below may be redundant - confirm before dropping it in a
    # migration.
    __table_args__ = (
        Index('ix_refresh_tokens_user_id_active', 'user_id', 'is_revoked'),
        Index('ix_refresh_tokens_expires_at', 'expires_at'),
        Index('ix_refresh_tokens_token_hash', 'token_hash'),
    )

    def __init__(self, **kwargs):
        """Initialize refresh token with automatic hash generation.

        All keyword arguments are forwarded to the base constructor. If a
        ``token`` was supplied without a ``token_hash``, the hash is derived
        from the token.
        """
        super().__init__(**kwargs)
        if self.token and not self.token_hash:
            self.token_hash = self._generate_token_hash(self.token)

    @staticmethod
    def _generate_token_hash(token: str) -> str:
        """Return the SHA-256 hex digest of ``token`` (UTF-8 encoded)."""
        return hashlib.sha256(token.encode()).hexdigest()

    def update_token(self, new_token: str) -> None:
        """Replace the stored token and regenerate its hash to match."""
        self.token = new_token
        self.token_hash = self._generate_token_hash(new_token)

    @classmethod
    async def create_refresh_token(
        cls, user_id: uuid.UUID, token: str, expires_at: datetime
    ) -> "RefreshToken":
        """
        Create a new refresh token with proper hash generation.

        Builds an unsaved, non-revoked instance with a fresh UUID, the
        SHA-256 hash of ``token`` and a UTC ``created_at`` timestamp. The
        instance is only constructed here; nothing in this method adds it
        to a session.

        NOTE(review): this coroutine awaits nothing; it is kept ``async``
        so existing ``await`` call sites keep working.
        """
        return cls(
            id=uuid.uuid4(),
            user_id=user_id,
            token=token,
            token_hash=cls._generate_token_hash(token),
            expires_at=expires_at,
            is_revoked=False,
            created_at=datetime.now(timezone.utc),
        )

    def __repr__(self) -> str:
        # The raw token and its hash are deliberately left out so that
        # credentials do not end up in logs or tracebacks.
        return (
            f"<RefreshToken(id={self.id}, user_id={self.user_id}, "
            f"expires_at={self.expires_at}, is_revoked={self.is_revoked})>"
        )


class LoginAttempt(Base):
    """Login attempt tracking model"""

    __tablename__ = "login_attempts"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    email = Column(String(255), nullable=False, index=True)
    # 45 characters is enough for a textual IPv6 address.
    ip_address = Column(String(45), nullable=False)
    user_agent = Column(Text)
    success = Column(Boolean, default=False)
    failure_reason = Column(String(255))
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )

    def __repr__(self) -> str:
        return (
            f"<LoginAttempt(id={self.id}, email={self.email}, "
            f"success={self.success})>"
        )