189 lines
6.3 KiB
Python
189 lines
6.3 KiB
Python
"""
Demo Cleanup Service

Handles automatic cleanup of expired demo sessions, of sessions stuck in
PENDING, and purging of old destroyed session records.
"""
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, update
|
|
from datetime import datetime, timezone
|
|
from typing import List
|
|
import structlog
|
|
|
|
from app.models import DemoSession, DemoSessionStatus
|
|
from app.services.data_cloner import DemoDataCloner
|
|
from app.core.redis_wrapper import DemoRedisWrapper
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class DemoCleanupService:
    """Handles cleanup of expired demo sessions.

    The service marks expired (or stuck) sessions as EXPIRED, asks the
    ``DemoDataCloner`` to delete their data, purges old DESTROYED records and
    reports per-status counts.
    """

    # Minutes a session may stay in PENDING before it is treated as stuck.
    STUCK_PENDING_MINUTES = 5

    def __init__(self, db: AsyncSession, redis: DemoRedisWrapper):
        """Store the collaborators and build the data cloner used for deletion.

        Args:
            db: Async database session used for all queries and commits.
            redis: Redis wrapper; passed through to ``DemoDataCloner``.
        """
        self.db = db
        self.redis = redis
        self.data_cloner = DemoDataCloner(db, redis)

    @staticmethod
    def _live_statuses() -> tuple:
        """Return every status except EXPIRED and DESTROYED."""
        return (
            DemoSessionStatus.PENDING,
            DemoSessionStatus.READY,
            DemoSessionStatus.PARTIAL,
            DemoSessionStatus.FAILED,
            DemoSessionStatus.ACTIVE,  # Legacy status, kept for compatibility
        )

    @staticmethod
    def _age_minutes(now, created_at):
        """Return ``now - created_at`` in minutes, or None if not computable."""
        try:
            return (now - created_at).total_seconds() / 60
        except TypeError:
            # created_at is missing or cannot be subtracted from an aware
            # datetime; the age is only used for logging, so do not fail.
            return None

    async def _rollback_quietly(self) -> None:
        """Roll back the DB session, logging (not raising) if that fails too."""
        try:
            await self.db.rollback()
        except Exception as rollback_error:
            logger.error(
                "Rollback after failed cleanup also failed",
                error=str(rollback_error)
            )

    async def cleanup_expired_sessions(self) -> dict:
        """
        Find and cleanup all expired sessions

        Also cleans up sessions stuck in PENDING for too long
        (more than ``STUCK_PENDING_MINUTES`` minutes). Each session is marked
        EXPIRED and committed, then its data is deleted through the data
        cloner. A failure on one session is recorded and does not stop the
        remaining sessions from being processed.

        Returns:
            Cleanup statistics with the keys ``total_expired`` and
            ``total_stuck`` (raw query counts, which may overlap),
            ``total_to_cleanup`` (distinct sessions processed),
            ``cleaned_up``, ``failed`` and ``errors`` (a list of
            ``{"session_id", "error"}`` dicts).
        """
        from datetime import timedelta

        logger.info("Starting demo session cleanup")

        now = datetime.now(timezone.utc)
        stuck_threshold = now - timedelta(minutes=self.STUCK_PENDING_MINUTES)

        # Find expired sessions (any status except EXPIRED and DESTROYED)
        result = await self.db.execute(
            select(DemoSession).where(
                DemoSession.status.in_(self._live_statuses()),
                DemoSession.expires_at < now
            )
        )
        expired_sessions = result.scalars().all()

        # Also find sessions stuck in PENDING
        stuck_result = await self.db.execute(
            select(DemoSession).where(
                DemoSession.status == DemoSessionStatus.PENDING,
                DemoSession.created_at < stuck_threshold
            )
        )
        stuck_sessions = stuck_result.scalars().all()

        # A PENDING session can match both queries (expired AND stuck). The
        # same DB session hands back the same instance for such a row, so
        # de-duplicating by identity stops it being cleaned and counted twice.
        # Identifiers are snapshotted here, before any commit/rollback, so
        # they stay readable even if the ORM expires instances afterwards.
        seen = set()
        targets = []
        for session in [*expired_sessions, *stuck_sessions]:
            if id(session) in seen:
                continue
            seen.add(id(session))
            targets.append((
                session,
                session.session_id,
                str(session.virtual_tenant_id),
                session.created_at,
            ))

        stats = {
            "total_expired": len(expired_sessions),
            "total_stuck": len(stuck_sessions),
            "total_to_cleanup": len(targets),
            "cleaned_up": 0,
            "failed": 0,
            "errors": []
        }

        for session, session_id, tenant_id, created_at in targets:
            try:
                # Mark as expired
                session.status = DemoSessionStatus.EXPIRED
                await self.db.commit()

                # Delete session data
                await self.data_cloner.delete_session_data(
                    tenant_id,
                    session_id
                )
            except Exception as e:
                # Reset the transaction so the next session starts clean.
                await self._rollback_quietly()
                stats["failed"] += 1
                stats["errors"].append({
                    "session_id": session_id,
                    "error": str(e)
                })
                logger.error(
                    "Failed to cleanup session",
                    session_id=session_id,
                    error=str(e)
                )
                continue

            stats["cleaned_up"] += 1
            logger.info(
                "Session cleaned up",
                session_id=session_id,
                age_minutes=self._age_minutes(now, created_at)
            )

        logger.info("Demo session cleanup completed", stats=stats)
        return stats

    async def cleanup_old_destroyed_sessions(self, days: int = 7) -> int:
        """
        Delete destroyed session records older than specified days

        Args:
            days: Number of days to keep destroyed sessions

        Returns:
            Number of deleted records

        Raises:
            ValueError: If ``days`` is negative. A negative value would move
                the cutoff into the future and delete every destroyed record.
        """
        from datetime import timedelta

        if days < 0:
            raise ValueError("days must be non-negative")

        cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)

        result = await self.db.execute(
            select(DemoSession).where(
                DemoSession.status == DemoSessionStatus.DESTROYED,
                DemoSession.destroyed_at < cutoff_date
            )
        )
        old_sessions = result.scalars().all()
        deleted_count = len(old_sessions)

        try:
            for session in old_sessions:
                await self.db.delete(session)
            await self.db.commit()
        except Exception:
            # Leave the shared DB session usable for the caller, then surface
            # the original failure unchanged.
            await self.db.rollback()
            raise

        logger.info(
            "Old destroyed sessions deleted",
            count=deleted_count,
            older_than_days=days
        )

        return deleted_count

    async def get_cleanup_stats(self) -> dict:
        """Get cleanup statistics

        Loads every session record and tallies it in a single pass.

        Returns:
            A dict with ``total_sessions``, ``by_status`` (a count per known
            status) and ``pending_cleanup`` (sessions in a non-terminal status
            whose ``expires_at`` is already in the past).
        """
        result = await self.db.execute(select(DemoSession))
        all_sessions = result.scalars().all()

        now = datetime.now(timezone.utc)
        live_statuses = self._live_statuses()

        # Output label -> status, in the order the labels are reported.
        tracked = {
            "pending": DemoSessionStatus.PENDING,
            "ready": DemoSessionStatus.READY,
            "partial": DemoSessionStatus.PARTIAL,
            "failed": DemoSessionStatus.FAILED,
            "active": DemoSessionStatus.ACTIVE,  # Legacy
            "expired": DemoSessionStatus.EXPIRED,
            "destroyed": DemoSessionStatus.DESTROYED,
        }
        by_status = dict.fromkeys(tracked, 0)
        should_be_expired = 0

        for session in all_sessions:
            # Count by status
            for label, status in tracked.items():
                if session.status == status:
                    by_status[label] += 1
                    break

            # Find sessions that should be expired but aren't marked yet.
            # A missing expires_at is skipped rather than raising TypeError.
            if (
                session.status in live_statuses
                and session.expires_at is not None
                and session.expires_at < now
            ):
                should_be_expired += 1

        return {
            "total_sessions": len(all_sessions),
            "by_status": by_status,
            "pending_cleanup": should_be_expired
        }
|