Files

92 lines
3.4 KiB
Python
Raw Permalink Normal View History

2025-07-17 21:25:27 +02:00
# ================================================================
# services/auth/app/models/tokens.py
# ================================================================
"""
Token models for authentication service
"""
2025-07-26 23:29:57 +02:00
import hashlib
2025-07-17 21:25:27 +02:00
import uuid
2025-07-26 23:29:57 +02:00
from datetime import datetime, timezone
from sqlalchemy import Column, String, Boolean, DateTime, Text, Index
from sqlalchemy.dialects.postgresql import UUID
2025-07-17 21:25:27 +02:00
from shared.database.base import Base
class RefreshToken(Base):
2025-07-26 23:29:57 +02:00
"""
Refresh token model - FIXED to prevent duplicate constraint violations
"""
2025-07-17 21:25:27 +02:00
__tablename__ = "refresh_tokens"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
2025-07-26 23:29:57 +02:00
user_id = Column(UUID(as_uuid=True), nullable=False, index=True)
2025-07-17 21:25:27 +02:00
2025-07-26 23:29:57 +02:00
# ✅ FIX 1: Use TEXT instead of VARCHAR to handle longer tokens
token = Column(Text, nullable=False)
# ✅ FIX 2: Add token hash for uniqueness instead of full token
token_hash = Column(String(255), nullable=True, unique=True)
expires_at = Column(DateTime(timezone=True), nullable=False)
is_revoked = Column(Boolean, default=False, nullable=False)
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
revoked_at = Column(DateTime(timezone=True), nullable=True)
# ✅ FIX 3: Add indexes for better performance
__table_args__ = (
Index('ix_refresh_tokens_user_id_active', 'user_id', 'is_revoked'),
Index('ix_refresh_tokens_expires_at', 'expires_at'),
Index('ix_refresh_tokens_token_hash', 'token_hash'),
)
def __init__(self, **kwargs):
"""Initialize refresh token with automatic hash generation"""
super().__init__(**kwargs)
if self.token and not self.token_hash:
self.token_hash = self._generate_token_hash(self.token)
@staticmethod
def _generate_token_hash(token: str) -> str:
"""Generate a hash of the token for uniqueness checking"""
return hashlib.sha256(token.encode()).hexdigest()
def update_token(self, new_token: str):
"""Update token and regenerate hash"""
self.token = new_token
self.token_hash = self._generate_token_hash(new_token)
2025-07-17 21:25:27 +02:00
2025-07-26 23:29:57 +02:00
@classmethod
async def create_refresh_token(cls, user_id: uuid.UUID, token: str, expires_at: datetime):
"""
Create a new refresh token with proper hash generation
"""
return cls(
id=uuid.uuid4(),
user_id=user_id,
token=token,
token_hash=cls._generate_token_hash(token),
expires_at=expires_at,
is_revoked=False,
created_at=datetime.now(timezone.utc)
)
2025-07-17 21:25:27 +02:00
def __repr__(self):
2025-07-26 23:29:57 +02:00
return f"<RefreshToken(id={self.id}, user_id={self.user_id}, expires_at={self.expires_at})>"
2025-07-17 21:25:27 +02:00
class LoginAttempt(Base):
"""Login attempt tracking model"""
__tablename__ = "login_attempts"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
email = Column(String(255), nullable=False, index=True)
ip_address = Column(String(45), nullable=False)
user_agent = Column(Text)
success = Column(Boolean, default=False)
failure_reason = Column(String(255))
2025-07-26 23:29:57 +02:00
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
2025-07-17 21:25:27 +02:00
def __repr__(self):
return f"<LoginAttempt(id={self.id}, email={self.email}, success={self.success})>"