""" Background Cleanup Worker Processes demo session cleanup jobs from Redis queue """ import asyncio import structlog from datetime import datetime, timezone, timedelta from typing import Dict, Any import json import uuid from contextlib import asynccontextmanager from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import DatabaseManager from app.core.redis_wrapper import DemoRedisWrapper from app.services.data_cloner import DemoDataCloner from app.models.demo_session import DemoSession, DemoSessionStatus logger = structlog.get_logger() @asynccontextmanager async def get_db_session(): """Get database session context manager""" db_manager = DatabaseManager() db_manager.initialize() async with db_manager.session_factory() as session: try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close() class CleanupWorker: """Background worker for processing cleanup jobs""" def __init__(self, redis: DemoRedisWrapper): self.redis = redis self.queue_key = "cleanup:queue" self.processing_key = "cleanup:processing" self.running = False async def start(self): """Start the worker (runs indefinitely)""" self.running = True logger.info("Cleanup worker started") while self.running: try: await self._process_next_job() except Exception as e: logger.error("Worker error", error=str(e), exc_info=True) await asyncio.sleep(5) # Back off on error async def stop(self): """Stop the worker gracefully""" self.running = False logger.info("Cleanup worker stopped") async def _process_next_job(self): """Process next job from queue""" client = await self.redis.get_client() # Blocking pop from queue (5 second timeout) result = await client.brpoplpush( self.queue_key, self.processing_key, timeout=5 ) if not result: return # No job available job_data = json.loads(result) job_id = job_data["job_id"] session_ids = job_data["session_ids"] logger.info( "Processing cleanup job", job_id=job_id, session_count=len(session_ids) ) try: # Process cleanup stats = await self._cleanup_sessions(session_ids) # Mark job as complete await self._mark_job_complete(job_id, stats) # Remove from processing queue await client.lrem(self.processing_key, 1, result) logger.info("Job completed", job_id=job_id, stats=stats) except Exception as e: logger.error("Job failed", job_id=job_id, error=str(e), exc_info=True) # Check retry count retry_count = job_data.get("retry_count", 0) if retry_count < 3: # Retry - put back in queue job_data["retry_count"] = retry_count + 1 await client.lpush(self.queue_key, json.dumps(job_data)) logger.info("Job requeued for retry", job_id=job_id, retry_count=retry_count + 1) else: # Max retries reached - mark as failed await self._mark_job_failed(job_id, str(e)) logger.error("Job failed after max retries", job_id=job_id) # Remove from processing queue await client.lrem(self.processing_key, 1, result) async def _cleanup_sessions(self, session_ids: list) -> Dict[str, Any]: """Execute cleanup for list of sessions with parallelization""" async with get_db_session() as db: redis = DemoRedisWrapper() data_cloner = DemoDataCloner(db, redis) try: # Get sessions to cleanup result = await db.execute( select(DemoSession).where( DemoSession.session_id.in_(session_ids) ) ) sessions = result.scalars().all() stats = { "cleaned_up": 0, "failed": 0, "errors": [] } # Process each session for session in sessions: try: # Mark session as expired session.status = DemoSessionStatus.EXPIRED await db.commit() # Check if this is an enterprise 
demo with children child_tenant_ids = [] if session.demo_account_type == "enterprise" and session.session_metadata: child_tenant_ids = session.session_metadata.get("child_tenant_ids", []) # Delete child tenants in parallel (for enterprise demos) if child_tenant_ids: logger.info( "Cleaning up enterprise demo children", session_id=session.session_id, child_count=len(child_tenant_ids) ) child_tasks = [ data_cloner.delete_session_data( str(child_id), session.session_id ) for child_id in child_tenant_ids ] child_results = await asyncio.gather(*child_tasks, return_exceptions=True) # Log any child deletion failures for child_id, result in zip(child_tenant_ids, child_results): if isinstance(result, Exception): logger.error( "Failed to delete child tenant", child_id=child_id, error=str(result) ) # Delete parent/main session data await data_cloner.delete_session_data( str(session.virtual_tenant_id), session.session_id ) stats["cleaned_up"] += 1 logger.info( "Session cleaned up", session_id=session.session_id, is_enterprise=(session.demo_account_type == "enterprise"), children_deleted=len(child_tenant_ids) ) except Exception as e: stats["failed"] += 1 stats["errors"].append({ "session_id": session.session_id, "error": str(e) }) logger.error( "Failed to cleanup session", session_id=session.session_id, error=str(e), exc_info=True ) return stats finally: # Always close HTTP client await data_cloner.close() async def _mark_job_complete(self, job_id: str, stats: Dict[str, Any]): """Mark job as complete in Redis""" client = await self.redis.get_client() status_key = f"cleanup:job:{job_id}:status" await client.setex( status_key, 3600, # Keep status for 1 hour json.dumps({ "status": "completed", "stats": stats, "completed_at": datetime.now(timezone.utc).isoformat() }) ) async def _mark_job_failed(self, job_id: str, error: str): """Mark job as failed in Redis""" client = await self.redis.get_client() status_key = f"cleanup:job:{job_id}:status" await client.setex( status_key, 3600, json.dumps({ "status": "failed", "error": error, "failed_at": datetime.now(timezone.utc).isoformat() }) ) async def run_cleanup_worker(): """Entry point for worker process""" # Initialize Redis client import os from shared.redis_utils import initialize_redis from app.core.config import Settings settings = Settings() redis_url = settings.REDIS_URL # Use proper configuration with TLS and auth try: # Initialize Redis with connection pool settings await initialize_redis(redis_url, db=settings.REDIS_DB, max_connections=settings.REDIS_MAX_CONNECTIONS) logger.info("Redis initialized successfully", redis_url=redis_url.split('@')[-1], db=settings.REDIS_DB) except Exception as e: logger.error("Failed to initialize Redis", error=str(e), redis_url=redis_url.split('@')[-1]) raise redis = DemoRedisWrapper() worker = CleanupWorker(redis) try: await worker.start() except KeyboardInterrupt: logger.info("Received interrupt signal") await worker.stop() except Exception as e: logger.error("Worker crashed", error=str(e), exc_info=True) raise if __name__ == "__main__": asyncio.run(run_cleanup_worker())
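

# ---------------------------------------------------------------------------
# Illustrative producer-side sketch (not part of the worker itself).
#
# The worker above expects each entry on "cleanup:queue" to be a JSON object
# with "job_id" and "session_ids" (it manages "retry_count" itself), and it
# publishes results under "cleanup:job:{job_id}:status". The helpers below
# show one way a scheduler could enqueue such a job and poll its outcome.
# The function names, and the assumption that producers also obtain a client
# via DemoRedisWrapper.get_client(), are hypothetical examples rather than a
# confirmed API; adapt them to the real scheduling code in this codebase.
# ---------------------------------------------------------------------------
async def enqueue_cleanup_job(redis: DemoRedisWrapper, session_ids: list) -> str:
    """Push a cleanup job in the format CleanupWorker consumes (example only)."""
    import uuid

    client = await redis.get_client()
    job_id = str(uuid.uuid4())

    # LPUSH pairs with the worker's BRPOPLPUSH (which pops from the tail),
    # so jobs are consumed in FIFO order.
    await client.lpush(
        "cleanup:queue",
        json.dumps({
            "job_id": job_id,
            "session_ids": session_ids,
            "retry_count": 0
        })
    )
    return job_id


async def get_cleanup_job_status(redis: DemoRedisWrapper, job_id: str):
    """Read the status record the worker writes, or None if not yet available."""
    client = await redis.get_client()
    raw = await client.get(f"cleanup:job:{job_id}:status")
    return json.loads(raw) if raw else None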