Add onboardin steps improvements
This commit is contained in:
418
services/auth/app/api/onboarding.py
Normal file
418
services/auth/app/api/onboarding.py
Normal file
@@ -0,0 +1,418 @@
|
||||
"""
|
||||
User onboarding progress API routes
|
||||
"""
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from typing import Dict, Any, List, Optional
|
||||
import structlog
|
||||
from datetime import datetime, timezone
|
||||
from pydantic import BaseModel
|
||||
|
||||
from app.core.database import get_db
|
||||
from app.services.user_service import UserService
|
||||
from app.repositories.onboarding_repository import OnboardingRepository
|
||||
from shared.auth.decorators import get_current_user_dep
|
||||
|
||||
logger = structlog.get_logger()
|
||||
router = APIRouter(tags=["onboarding"])
|
||||
|
||||
# Request/Response Models
|
||||
class OnboardingStepStatus(BaseModel):
|
||||
step_name: str
|
||||
completed: bool
|
||||
completed_at: Optional[datetime] = None
|
||||
data: Optional[Dict[str, Any]] = None
|
||||
|
||||
class UserProgress(BaseModel):
|
||||
user_id: str
|
||||
steps: List[OnboardingStepStatus]
|
||||
current_step: str
|
||||
next_step: Optional[str] = None
|
||||
completion_percentage: float
|
||||
fully_completed: bool
|
||||
last_updated: datetime
|
||||
|
||||
class UpdateStepRequest(BaseModel):
|
||||
step_name: str
|
||||
completed: bool
|
||||
data: Optional[Dict[str, Any]] = None
|
||||
|
||||
# Define the onboarding steps and their order
|
||||
ONBOARDING_STEPS = [
|
||||
"user_registered", # Step 1: User account created
|
||||
"bakery_registered", # Step 2: Bakery/tenant created
|
||||
"sales_data_uploaded", # Step 3: Historical sales data uploaded
|
||||
"training_completed", # Step 4: AI model training completed
|
||||
"dashboard_accessible" # Step 5: Ready to use dashboard
|
||||
]
|
||||
|
||||
STEP_DEPENDENCIES = {
|
||||
"bakery_registered": ["user_registered"],
|
||||
"sales_data_uploaded": ["user_registered", "bakery_registered"],
|
||||
"training_completed": ["user_registered", "bakery_registered", "sales_data_uploaded"],
|
||||
"dashboard_accessible": ["user_registered", "bakery_registered", "sales_data_uploaded", "training_completed"]
|
||||
}
|
||||
|
||||
class OnboardingService:
|
||||
"""Service for managing user onboarding progress"""
|
||||
|
||||
def __init__(self, db: AsyncSession):
|
||||
self.db = db
|
||||
self.user_service = UserService(db)
|
||||
self.onboarding_repo = OnboardingRepository(db)
|
||||
|
||||
async def get_user_progress(self, user_id: str) -> UserProgress:
|
||||
"""Get current onboarding progress for user"""
|
||||
|
||||
# Get user's onboarding data from user preferences or separate table
|
||||
user_progress_data = await self._get_user_onboarding_data(user_id)
|
||||
|
||||
# Calculate current status for each step
|
||||
steps = []
|
||||
completed_steps = []
|
||||
|
||||
for step_name in ONBOARDING_STEPS:
|
||||
step_data = user_progress_data.get(step_name, {})
|
||||
is_completed = step_data.get("completed", False)
|
||||
|
||||
if is_completed:
|
||||
completed_steps.append(step_name)
|
||||
|
||||
steps.append(OnboardingStepStatus(
|
||||
step_name=step_name,
|
||||
completed=is_completed,
|
||||
completed_at=step_data.get("completed_at"),
|
||||
data=step_data.get("data", {})
|
||||
))
|
||||
|
||||
# Determine current and next step
|
||||
current_step = self._get_current_step(completed_steps)
|
||||
next_step = self._get_next_step(completed_steps)
|
||||
|
||||
# Calculate completion percentage
|
||||
completion_percentage = (len(completed_steps) / len(ONBOARDING_STEPS)) * 100
|
||||
|
||||
# Check if fully completed
|
||||
fully_completed = len(completed_steps) == len(ONBOARDING_STEPS)
|
||||
|
||||
return UserProgress(
|
||||
user_id=user_id,
|
||||
steps=steps,
|
||||
current_step=current_step,
|
||||
next_step=next_step,
|
||||
completion_percentage=completion_percentage,
|
||||
fully_completed=fully_completed,
|
||||
last_updated=datetime.now(timezone.utc)
|
||||
)
|
||||
|
||||
async def update_step(self, user_id: str, update_request: UpdateStepRequest) -> UserProgress:
|
||||
"""Update a specific onboarding step"""
|
||||
|
||||
step_name = update_request.step_name
|
||||
|
||||
# Validate step name
|
||||
if step_name not in ONBOARDING_STEPS:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"Invalid step name: {step_name}"
|
||||
)
|
||||
|
||||
# Check dependencies if marking as completed
|
||||
if update_request.completed:
|
||||
can_complete = await self._can_complete_step(user_id, step_name)
|
||||
if not can_complete:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"Cannot complete step {step_name}: dependencies not met"
|
||||
)
|
||||
|
||||
# Update the step
|
||||
await self._update_user_onboarding_data(
|
||||
user_id,
|
||||
step_name,
|
||||
{
|
||||
"completed": update_request.completed,
|
||||
"completed_at": datetime.now(timezone.utc).isoformat() if update_request.completed else None,
|
||||
"data": update_request.data or {}
|
||||
}
|
||||
)
|
||||
|
||||
# Return updated progress
|
||||
return await self.get_user_progress(user_id)
|
||||
|
||||
async def get_next_step(self, user_id: str) -> Dict[str, Any]:
|
||||
"""Get the next required step for user"""
|
||||
progress = await self.get_user_progress(user_id)
|
||||
|
||||
if progress.fully_completed:
|
||||
return {"step": "dashboard_accessible", "completed": True}
|
||||
|
||||
return {"step": progress.next_step or progress.current_step}
|
||||
|
||||
async def can_access_step(self, user_id: str, step_name: str) -> Dict[str, Any]:
|
||||
"""Check if user can access a specific step"""
|
||||
|
||||
if step_name not in ONBOARDING_STEPS:
|
||||
return {"can_access": False, "reason": "Invalid step name"}
|
||||
|
||||
can_access = await self._can_complete_step(user_id, step_name)
|
||||
return {"can_access": can_access}
|
||||
|
||||
async def complete_onboarding(self, user_id: str) -> Dict[str, Any]:
|
||||
"""Mark entire onboarding as complete"""
|
||||
|
||||
# Ensure all steps are completed
|
||||
progress = await self.get_user_progress(user_id)
|
||||
|
||||
if not progress.fully_completed:
|
||||
incomplete_steps = [
|
||||
step.step_name for step in progress.steps if not step.completed
|
||||
]
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"Cannot complete onboarding: incomplete steps: {incomplete_steps}"
|
||||
)
|
||||
|
||||
# Update user's isOnboardingComplete flag
|
||||
await self.user_service.update_user_field(
|
||||
user_id,
|
||||
"is_onboarding_complete",
|
||||
True
|
||||
)
|
||||
|
||||
return {"success": True, "message": "Onboarding completed successfully"}
|
||||
|
||||
def _get_current_step(self, completed_steps: List[str]) -> str:
|
||||
"""Determine current step based on completed steps"""
|
||||
for step in ONBOARDING_STEPS:
|
||||
if step not in completed_steps:
|
||||
return step
|
||||
return ONBOARDING_STEPS[-1] # All completed
|
||||
|
||||
def _get_next_step(self, completed_steps: List[str]) -> Optional[str]:
|
||||
"""Determine next step based on completed steps"""
|
||||
current_step = self._get_current_step(completed_steps)
|
||||
current_index = ONBOARDING_STEPS.index(current_step)
|
||||
|
||||
if current_index < len(ONBOARDING_STEPS) - 1:
|
||||
return ONBOARDING_STEPS[current_index + 1]
|
||||
|
||||
return None # No next step
|
||||
|
||||
async def _can_complete_step(self, user_id: str, step_name: str) -> bool:
|
||||
"""Check if user can complete a specific step"""
|
||||
|
||||
# Get required dependencies for this step
|
||||
required_steps = STEP_DEPENDENCIES.get(step_name, [])
|
||||
|
||||
if not required_steps:
|
||||
return True # No dependencies
|
||||
|
||||
# Check if all required steps are completed
|
||||
user_progress_data = await self._get_user_onboarding_data(user_id)
|
||||
|
||||
for required_step in required_steps:
|
||||
if not user_progress_data.get(required_step, {}).get("completed", False):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
async def _get_user_onboarding_data(self, user_id: str) -> Dict[str, Any]:
|
||||
"""Get user's onboarding progress data from storage"""
|
||||
try:
|
||||
# Get all onboarding steps for the user from database
|
||||
steps = await self.onboarding_repo.get_user_progress_steps(user_id)
|
||||
|
||||
# Convert to the expected dictionary format
|
||||
progress_data = {}
|
||||
for step in steps:
|
||||
progress_data[step.step_name] = {
|
||||
"completed": step.completed,
|
||||
"completed_at": step.completed_at,
|
||||
"data": step.step_data or {}
|
||||
}
|
||||
|
||||
return progress_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting onboarding data for user {user_id}: {e}")
|
||||
return {}
|
||||
|
||||
async def _update_user_onboarding_data(
|
||||
self,
|
||||
user_id: str,
|
||||
step_name: str,
|
||||
step_data: Dict[str, Any]
|
||||
):
|
||||
"""Update user's onboarding step data"""
|
||||
try:
|
||||
# Extract the completion status and other data
|
||||
completed = step_data.get("completed", False)
|
||||
data_payload = step_data.get("data", {})
|
||||
|
||||
# Update the step in database
|
||||
updated_step = await self.onboarding_repo.upsert_user_step(
|
||||
user_id=user_id,
|
||||
step_name=step_name,
|
||||
completed=completed,
|
||||
step_data=data_payload
|
||||
)
|
||||
|
||||
# Update the user's onboarding summary
|
||||
await self._update_user_summary(user_id)
|
||||
|
||||
logger.info(f"Successfully updated onboarding step for user {user_id}: {step_name} = {step_data}")
|
||||
return updated_step
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating onboarding data for user {user_id}, step {step_name}: {e}")
|
||||
raise
|
||||
|
||||
async def _update_user_summary(self, user_id: str):
|
||||
"""Update user's onboarding summary after step changes"""
|
||||
try:
|
||||
# Get updated progress
|
||||
user_progress_data = await self._get_user_onboarding_data(user_id)
|
||||
|
||||
# Calculate current status
|
||||
completed_steps = []
|
||||
for step_name in ONBOARDING_STEPS:
|
||||
if user_progress_data.get(step_name, {}).get("completed", False):
|
||||
completed_steps.append(step_name)
|
||||
|
||||
# Determine current and next step
|
||||
current_step = self._get_current_step(completed_steps)
|
||||
next_step = self._get_next_step(completed_steps)
|
||||
|
||||
# Calculate completion percentage
|
||||
completion_percentage = (len(completed_steps) / len(ONBOARDING_STEPS)) * 100
|
||||
|
||||
# Check if fully completed
|
||||
fully_completed = len(completed_steps) == len(ONBOARDING_STEPS)
|
||||
|
||||
# Format steps count
|
||||
steps_completed_count = f"{len(completed_steps)}/{len(ONBOARDING_STEPS)}"
|
||||
|
||||
# Update summary in database
|
||||
await self.onboarding_repo.upsert_user_summary(
|
||||
user_id=user_id,
|
||||
current_step=current_step,
|
||||
next_step=next_step,
|
||||
completion_percentage=completion_percentage,
|
||||
fully_completed=fully_completed,
|
||||
steps_completed_count=steps_completed_count
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating onboarding summary for user {user_id}: {e}")
|
||||
# Don't raise here - summary update failure shouldn't break step updates
|
||||
|
||||
# API Routes
|
||||
|
||||
@router.get("/me/onboarding/progress", response_model=UserProgress)
|
||||
async def get_user_progress(
|
||||
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
||||
db: AsyncSession = Depends(get_db)
|
||||
):
|
||||
"""Get current user's onboarding progress"""
|
||||
|
||||
try:
|
||||
onboarding_service = OnboardingService(db)
|
||||
progress = await onboarding_service.get_user_progress(current_user["user_id"])
|
||||
return progress
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Get onboarding progress error: {e}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="Failed to get onboarding progress"
|
||||
)
|
||||
|
||||
@router.put("/me/onboarding/step", response_model=UserProgress)
|
||||
async def update_onboarding_step(
|
||||
update_request: UpdateStepRequest,
|
||||
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
||||
db: AsyncSession = Depends(get_db)
|
||||
):
|
||||
"""Update a specific onboarding step"""
|
||||
|
||||
try:
|
||||
onboarding_service = OnboardingService(db)
|
||||
progress = await onboarding_service.update_step(
|
||||
current_user["user_id"],
|
||||
update_request
|
||||
)
|
||||
return progress
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Update onboarding step error: {e}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="Failed to update onboarding step"
|
||||
)
|
||||
|
||||
@router.get("/me/onboarding/next-step")
|
||||
async def get_next_step(
|
||||
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
||||
db: AsyncSession = Depends(get_db)
|
||||
):
|
||||
"""Get next required step for user"""
|
||||
|
||||
try:
|
||||
onboarding_service = OnboardingService(db)
|
||||
result = await onboarding_service.get_next_step(current_user["user_id"])
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Get next step error: {e}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="Failed to get next step"
|
||||
)
|
||||
|
||||
@router.get("/me/onboarding/can-access/{step_name}")
|
||||
async def can_access_step(
|
||||
step_name: str,
|
||||
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
||||
db: AsyncSession = Depends(get_db)
|
||||
):
|
||||
"""Check if user can access a specific step"""
|
||||
|
||||
try:
|
||||
onboarding_service = OnboardingService(db)
|
||||
result = await onboarding_service.can_access_step(
|
||||
current_user["user_id"],
|
||||
step_name
|
||||
)
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Can access step error: {e}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="Failed to check step access"
|
||||
)
|
||||
|
||||
@router.post("/me/onboarding/complete")
|
||||
async def complete_onboarding(
|
||||
current_user: Dict[str, Any] = Depends(get_current_user_dep),
|
||||
db: AsyncSession = Depends(get_db)
|
||||
):
|
||||
"""Complete entire onboarding process"""
|
||||
|
||||
try:
|
||||
onboarding_service = OnboardingService(db)
|
||||
result = await onboarding_service.complete_onboarding(current_user["user_id"])
|
||||
return result
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Complete onboarding error: {e}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="Failed to complete onboarding"
|
||||
)
|
||||
@@ -10,7 +10,7 @@ from contextlib import asynccontextmanager
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.database import engine, create_tables
|
||||
from app.api import auth, users
|
||||
from app.api import auth, users, onboarding
|
||||
from app.services.messaging import setup_messaging, cleanup_messaging
|
||||
from shared.monitoring import setup_logging, HealthChecker
|
||||
from shared.monitoring.metrics import setup_metrics_early
|
||||
@@ -149,6 +149,7 @@ app.add_middleware(
|
||||
# Include routers
|
||||
app.include_router(auth.router, prefix="/api/v1/auth", tags=["authentication"])
|
||||
app.include_router(users.router, prefix="/api/v1/users", tags=["users"])
|
||||
app.include_router(onboarding.router, prefix="/api/v1/users", tags=["onboarding"])
|
||||
|
||||
# Health check endpoint with comprehensive checks
|
||||
@app.get("/health")
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
# services/auth/app/models/__init__.py
|
||||
"""
|
||||
Models export for auth service
|
||||
"""
|
||||
|
||||
from .users import User, RefreshToken
|
||||
from .onboarding import UserOnboardingProgress, UserOnboardingSummary
|
||||
|
||||
__all__ = [
|
||||
'User',
|
||||
'RefreshToken',
|
||||
'UserOnboardingProgress',
|
||||
'UserOnboardingSummary',
|
||||
]
|
||||
91
services/auth/app/models/onboarding.py
Normal file
91
services/auth/app/models/onboarding.py
Normal file
@@ -0,0 +1,91 @@
|
||||
# services/auth/app/models/onboarding.py
|
||||
"""
|
||||
User onboarding progress models
|
||||
"""
|
||||
|
||||
from sqlalchemy import Column, String, Boolean, DateTime, Text, ForeignKey, JSON, UniqueConstraint
|
||||
from sqlalchemy.dialects.postgresql import UUID
|
||||
from datetime import datetime, timezone
|
||||
import uuid
|
||||
|
||||
from shared.database.base import Base
|
||||
|
||||
class UserOnboardingProgress(Base):
|
||||
"""User onboarding progress tracking model"""
|
||||
__tablename__ = "user_onboarding_progress"
|
||||
|
||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||||
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True)
|
||||
|
||||
# Step tracking
|
||||
step_name = Column(String(50), nullable=False)
|
||||
completed = Column(Boolean, default=False, nullable=False)
|
||||
completed_at = Column(DateTime(timezone=True))
|
||||
|
||||
# Additional step data (JSON field for flexibility)
|
||||
step_data = Column(JSON, default=dict)
|
||||
|
||||
# Timestamps
|
||||
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
||||
updated_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
|
||||
|
||||
# Unique constraint to prevent duplicate step entries per user
|
||||
__table_args__ = (
|
||||
UniqueConstraint('user_id', 'step_name', name='uq_user_step'),
|
||||
{'extend_existing': True}
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<UserOnboardingProgress(id={self.id}, user_id={self.user_id}, step={self.step_name}, completed={self.completed})>"
|
||||
|
||||
def to_dict(self):
|
||||
"""Convert to dictionary"""
|
||||
return {
|
||||
"id": str(self.id),
|
||||
"user_id": str(self.user_id),
|
||||
"step_name": self.step_name,
|
||||
"completed": self.completed,
|
||||
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
|
||||
"step_data": self.step_data or {},
|
||||
"created_at": self.created_at.isoformat() if self.created_at else None,
|
||||
"updated_at": self.updated_at.isoformat() if self.updated_at else None
|
||||
}
|
||||
|
||||
class UserOnboardingSummary(Base):
|
||||
"""User onboarding summary for quick lookups"""
|
||||
__tablename__ = "user_onboarding_summary"
|
||||
|
||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||||
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, unique=True, index=True)
|
||||
|
||||
# Summary fields
|
||||
current_step = Column(String(50), nullable=False, default="user_registered")
|
||||
next_step = Column(String(50))
|
||||
completion_percentage = Column(String(10), default="0.0") # Store as string for precision
|
||||
fully_completed = Column(Boolean, default=False)
|
||||
|
||||
# Progress tracking
|
||||
steps_completed_count = Column(String(10), default="0") # Store as string: "3/5"
|
||||
|
||||
# Timestamps
|
||||
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
||||
updated_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
|
||||
last_activity_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
||||
|
||||
def __repr__(self):
|
||||
return f"<UserOnboardingSummary(user_id={self.user_id}, current_step={self.current_step}, completion={self.completion_percentage}%)>"
|
||||
|
||||
def to_dict(self):
|
||||
"""Convert to dictionary"""
|
||||
return {
|
||||
"id": str(self.id),
|
||||
"user_id": str(self.user_id),
|
||||
"current_step": self.current_step,
|
||||
"next_step": self.next_step,
|
||||
"completion_percentage": float(self.completion_percentage) if self.completion_percentage else 0.0,
|
||||
"fully_completed": self.fully_completed,
|
||||
"steps_completed_count": self.steps_completed_count,
|
||||
"created_at": self.created_at.isoformat() if self.created_at else None,
|
||||
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
|
||||
"last_activity_at": self.last_activity_at.isoformat() if self.last_activity_at else None
|
||||
}
|
||||
@@ -6,9 +6,11 @@ Repository implementations for authentication service
|
||||
from .base import AuthBaseRepository
|
||||
from .user_repository import UserRepository
|
||||
from .token_repository import TokenRepository
|
||||
from .onboarding_repository import OnboardingRepository
|
||||
|
||||
__all__ = [
|
||||
"AuthBaseRepository",
|
||||
"UserRepository",
|
||||
"TokenRepository"
|
||||
"TokenRepository",
|
||||
"OnboardingRepository"
|
||||
]
|
||||
211
services/auth/app/repositories/onboarding_repository.py
Normal file
211
services/auth/app/repositories/onboarding_repository.py
Normal file
@@ -0,0 +1,211 @@
|
||||
# services/auth/app/repositories/onboarding_repository.py
|
||||
"""
|
||||
Onboarding Repository for database operations
|
||||
"""
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import select, update, delete, and_
|
||||
from sqlalchemy.dialects.postgresql import insert
|
||||
from typing import List, Dict, Any, Optional
|
||||
from datetime import datetime, timezone
|
||||
import structlog
|
||||
|
||||
from app.models.onboarding import UserOnboardingProgress, UserOnboardingSummary
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
class OnboardingRepository:
|
||||
"""Repository for onboarding progress operations"""
|
||||
|
||||
def __init__(self, db: AsyncSession):
|
||||
self.db = db
|
||||
|
||||
async def get_user_progress_steps(self, user_id: str) -> List[UserOnboardingProgress]:
|
||||
"""Get all onboarding steps for a user"""
|
||||
try:
|
||||
result = await self.db.execute(
|
||||
select(UserOnboardingProgress)
|
||||
.where(UserOnboardingProgress.user_id == user_id)
|
||||
.order_by(UserOnboardingProgress.created_at)
|
||||
)
|
||||
return result.scalars().all()
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting user progress steps for {user_id}: {e}")
|
||||
return []
|
||||
|
||||
async def get_user_step(self, user_id: str, step_name: str) -> Optional[UserOnboardingProgress]:
|
||||
"""Get a specific step for a user"""
|
||||
try:
|
||||
result = await self.db.execute(
|
||||
select(UserOnboardingProgress)
|
||||
.where(
|
||||
and_(
|
||||
UserOnboardingProgress.user_id == user_id,
|
||||
UserOnboardingProgress.step_name == step_name
|
||||
)
|
||||
)
|
||||
)
|
||||
return result.scalars().first()
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting step {step_name} for user {user_id}: {e}")
|
||||
return None
|
||||
|
||||
async def upsert_user_step(
|
||||
self,
|
||||
user_id: str,
|
||||
step_name: str,
|
||||
completed: bool,
|
||||
step_data: Dict[str, Any] = None
|
||||
) -> UserOnboardingProgress:
|
||||
"""Insert or update a user's onboarding step"""
|
||||
try:
|
||||
completed_at = datetime.now(timezone.utc) if completed else None
|
||||
step_data = step_data or {}
|
||||
|
||||
# Use PostgreSQL UPSERT (INSERT ... ON CONFLICT ... DO UPDATE)
|
||||
stmt = insert(UserOnboardingProgress).values(
|
||||
user_id=user_id,
|
||||
step_name=step_name,
|
||||
completed=completed,
|
||||
completed_at=completed_at,
|
||||
step_data=step_data,
|
||||
updated_at=datetime.now(timezone.utc)
|
||||
)
|
||||
|
||||
# On conflict, update the existing record
|
||||
stmt = stmt.on_conflict_do_update(
|
||||
index_elements=['user_id', 'step_name'],
|
||||
set_=dict(
|
||||
completed=stmt.excluded.completed,
|
||||
completed_at=stmt.excluded.completed_at,
|
||||
step_data=stmt.excluded.step_data,
|
||||
updated_at=stmt.excluded.updated_at
|
||||
)
|
||||
)
|
||||
|
||||
# Return the updated record
|
||||
stmt = stmt.returning(UserOnboardingProgress)
|
||||
result = await self.db.execute(stmt)
|
||||
await self.db.commit()
|
||||
|
||||
return result.scalars().first()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error upserting step {step_name} for user {user_id}: {e}")
|
||||
await self.db.rollback()
|
||||
raise
|
||||
|
||||
async def get_user_summary(self, user_id: str) -> Optional[UserOnboardingSummary]:
|
||||
"""Get user's onboarding summary"""
|
||||
try:
|
||||
result = await self.db.execute(
|
||||
select(UserOnboardingSummary)
|
||||
.where(UserOnboardingSummary.user_id == user_id)
|
||||
)
|
||||
return result.scalars().first()
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting onboarding summary for user {user_id}: {e}")
|
||||
return None
|
||||
|
||||
async def upsert_user_summary(
|
||||
self,
|
||||
user_id: str,
|
||||
current_step: str,
|
||||
next_step: Optional[str],
|
||||
completion_percentage: float,
|
||||
fully_completed: bool,
|
||||
steps_completed_count: str
|
||||
) -> UserOnboardingSummary:
|
||||
"""Insert or update user's onboarding summary"""
|
||||
try:
|
||||
# Use PostgreSQL UPSERT
|
||||
stmt = insert(UserOnboardingSummary).values(
|
||||
user_id=user_id,
|
||||
current_step=current_step,
|
||||
next_step=next_step,
|
||||
completion_percentage=str(completion_percentage),
|
||||
fully_completed=fully_completed,
|
||||
steps_completed_count=steps_completed_count,
|
||||
updated_at=datetime.now(timezone.utc),
|
||||
last_activity_at=datetime.now(timezone.utc)
|
||||
)
|
||||
|
||||
# On conflict, update the existing record
|
||||
stmt = stmt.on_conflict_do_update(
|
||||
index_elements=['user_id'],
|
||||
set_=dict(
|
||||
current_step=stmt.excluded.current_step,
|
||||
next_step=stmt.excluded.next_step,
|
||||
completion_percentage=stmt.excluded.completion_percentage,
|
||||
fully_completed=stmt.excluded.fully_completed,
|
||||
steps_completed_count=stmt.excluded.steps_completed_count,
|
||||
updated_at=stmt.excluded.updated_at,
|
||||
last_activity_at=stmt.excluded.last_activity_at
|
||||
)
|
||||
)
|
||||
|
||||
# Return the updated record
|
||||
stmt = stmt.returning(UserOnboardingSummary)
|
||||
result = await self.db.execute(stmt)
|
||||
await self.db.commit()
|
||||
|
||||
return result.scalars().first()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error upserting summary for user {user_id}: {e}")
|
||||
await self.db.rollback()
|
||||
raise
|
||||
|
||||
async def delete_user_progress(self, user_id: str) -> bool:
|
||||
"""Delete all onboarding progress for a user"""
|
||||
try:
|
||||
# Delete steps
|
||||
await self.db.execute(
|
||||
delete(UserOnboardingProgress)
|
||||
.where(UserOnboardingProgress.user_id == user_id)
|
||||
)
|
||||
|
||||
# Delete summary
|
||||
await self.db.execute(
|
||||
delete(UserOnboardingSummary)
|
||||
.where(UserOnboardingSummary.user_id == user_id)
|
||||
)
|
||||
|
||||
await self.db.commit()
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting progress for user {user_id}: {e}")
|
||||
await self.db.rollback()
|
||||
return False
|
||||
|
||||
async def get_completion_stats(self) -> Dict[str, Any]:
|
||||
"""Get completion statistics across all users"""
|
||||
try:
|
||||
# Get total users with onboarding data
|
||||
total_result = await self.db.execute(
|
||||
select(UserOnboardingSummary).count()
|
||||
)
|
||||
total_users = total_result.scalar()
|
||||
|
||||
# Get completed users
|
||||
completed_result = await self.db.execute(
|
||||
select(UserOnboardingSummary)
|
||||
.where(UserOnboardingSummary.fully_completed == True)
|
||||
.count()
|
||||
)
|
||||
completed_users = completed_result.scalar()
|
||||
|
||||
return {
|
||||
"total_users_in_onboarding": total_users,
|
||||
"fully_completed_users": completed_users,
|
||||
"completion_rate": (completed_users / total_users * 100) if total_users > 0 else 0
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting completion stats: {e}")
|
||||
return {
|
||||
"total_users_in_onboarding": 0,
|
||||
"fully_completed_users": 0,
|
||||
"completion_rate": 0
|
||||
}
|
||||
Reference in New Issue
Block a user