Files
bakery-ia/services/demo_session/app/jobs/cleanup_worker.py
Urtzi Alfaro 9f3b39bd28 Add comprehensive documentation and final improvements
Documentation Added:
- AI_INSIGHTS_DEMO_SETUP_GUIDE.md: Complete setup guide for demo sessions
- AI_INSIGHTS_DATA_FLOW.md: Architecture and data flow diagrams
- AI_INSIGHTS_QUICK_START.md: Quick reference guide
- DEMO_SESSION_ANALYSIS_REPORT.md: Detailed analysis of demo session d67eaae4
- ROOT_CAUSE_ANALYSIS_AND_FIXES.md: Complete analysis of 8 issues (6 fixed, 2 analyzed)
- COMPLETE_FIX_SUMMARY.md: Executive summary of all fixes
- FIX_MISSING_INSIGHTS.md: Forecasting and procurement fix guide
- FINAL_STATUS_SUMMARY.md: Status overview
- verify_fixes.sh: Automated verification script
- enhance_procurement_data.py: Procurement data enhancement script

Service Improvements:
- Demo session cleanup worker: use the service settings for Redis configuration with TLS/auth (see the sketch after this list)
- Procurement service: Add Redis initialization with proper error handling and cleanup
- Production fixture: Remove duplicate worker assignments (cleaned 56 duplicates)
- Orchestrator fixture: Add purchase order metadata for better tracking
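
A minimal sketch of the settings shape this change assumes, using pydantic-settings and only the fields the worker actually reads (REDIS_URL, REDIS_DB, REDIS_MAX_CONNECTIONS); names and defaults here are illustrative, not the service's real config:

# Hypothetical Settings sketch; field names match the attributes used in run_cleanup_worker().
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # A rediss:// URL enables TLS; credentials travel in the URL (user:password@host).
    REDIS_URL: str = "rediss://default:changeme@redis:6380/0"
    REDIS_DB: int = 0
    REDIS_MAX_CONNECTIONS: int = 10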

Impact:
- Complete documentation for troubleshooting and setup
- Improved Redis connection handling across services
- Clean production data without duplicates
- Better error handling and logging

🤖 Generated with Claude Code (https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-16 11:32:45 +01:00


"""
Background Cleanup Worker
Processes demo session cleanup jobs from Redis queue
"""
import asyncio
import structlog
from datetime import datetime, timezone, timedelta
from typing import Dict, Any
import json
import uuid
from contextlib import asynccontextmanager
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import DatabaseManager
from app.core.redis_wrapper import DemoRedisWrapper
from app.services.data_cloner import DemoDataCloner
from app.models.demo_session import DemoSession, DemoSessionStatus
logger = structlog.get_logger()


@asynccontextmanager
async def get_db_session():
    """Get database session context manager"""
    db_manager = DatabaseManager()
    db_manager.initialize()
    async with db_manager.session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()


class CleanupWorker:
    """Background worker for processing cleanup jobs"""

    def __init__(self, redis: DemoRedisWrapper):
        self.redis = redis
        self.queue_key = "cleanup:queue"
        self.processing_key = "cleanup:processing"
        self.running = False

    async def start(self):
        """Start the worker (runs indefinitely)"""
        self.running = True
        logger.info("Cleanup worker started")
        while self.running:
            try:
                await self._process_next_job()
            except Exception as e:
                logger.error("Worker error", error=str(e), exc_info=True)
                await asyncio.sleep(5)  # Back off on error

    async def stop(self):
        """Stop the worker gracefully"""
        self.running = False
        logger.info("Cleanup worker stopped")

    async def _process_next_job(self):
        """Process next job from queue"""
        client = await self.redis.get_client()

        # Blocking pop from queue (5 second timeout)
        result = await client.brpoplpush(
            self.queue_key,
            self.processing_key,
            timeout=5
        )
        if not result:
            return  # No job available

        job_data = json.loads(result)
        job_id = job_data["job_id"]
        session_ids = job_data["session_ids"]
        logger.info(
            "Processing cleanup job",
            job_id=job_id,
            session_count=len(session_ids)
        )

        try:
            # Process cleanup
            stats = await self._cleanup_sessions(session_ids)

            # Mark job as complete
            await self._mark_job_complete(job_id, stats)

            # Remove from processing queue
            await client.lrem(self.processing_key, 1, result)
            logger.info("Job completed", job_id=job_id, stats=stats)
        except Exception as e:
            logger.error("Job failed", job_id=job_id, error=str(e), exc_info=True)

            # Check retry count
            retry_count = job_data.get("retry_count", 0)
            if retry_count < 3:
                # Retry - put back in queue
                job_data["retry_count"] = retry_count + 1
                await client.lpush(self.queue_key, json.dumps(job_data))
                logger.info("Job requeued for retry", job_id=job_id, retry_count=retry_count + 1)
            else:
                # Max retries reached - mark as failed
                await self._mark_job_failed(job_id, str(e))
                logger.error("Job failed after max retries", job_id=job_id)

            # Remove from processing queue
            await client.lrem(self.processing_key, 1, result)

    async def _cleanup_sessions(self, session_ids: list) -> Dict[str, Any]:
        """Execute cleanup for list of sessions with parallelization"""
        async with get_db_session() as db:
            redis = DemoRedisWrapper()
            data_cloner = DemoDataCloner(db, redis)
            try:
                # Get sessions to cleanup
                result = await db.execute(
                    select(DemoSession).where(
                        DemoSession.session_id.in_(session_ids)
                    )
                )
                sessions = result.scalars().all()

                stats = {
                    "cleaned_up": 0,
                    "failed": 0,
                    "errors": []
                }

                # Process each session
                for session in sessions:
                    try:
                        # Mark session as expired
                        session.status = DemoSessionStatus.EXPIRED
                        await db.commit()

                        # Check if this is an enterprise demo with children
                        child_tenant_ids = []
                        if session.demo_account_type == "enterprise" and session.session_metadata:
                            child_tenant_ids = session.session_metadata.get("child_tenant_ids", [])

                        # Delete child tenants in parallel (for enterprise demos)
                        if child_tenant_ids:
                            logger.info(
                                "Cleaning up enterprise demo children",
                                session_id=session.session_id,
                                child_count=len(child_tenant_ids)
                            )
                            child_tasks = [
                                data_cloner.delete_session_data(
                                    str(child_id),
                                    session.session_id
                                )
                                for child_id in child_tenant_ids
                            ]
                            child_results = await asyncio.gather(*child_tasks, return_exceptions=True)

                            # Log any child deletion failures
                            for child_id, child_result in zip(child_tenant_ids, child_results):
                                if isinstance(child_result, Exception):
                                    logger.error(
                                        "Failed to delete child tenant",
                                        child_id=child_id,
                                        error=str(child_result)
                                    )

                        # Delete parent/main session data
                        await data_cloner.delete_session_data(
                            str(session.virtual_tenant_id),
                            session.session_id
                        )
                        stats["cleaned_up"] += 1
                        logger.info(
                            "Session cleaned up",
                            session_id=session.session_id,
                            is_enterprise=(session.demo_account_type == "enterprise"),
                            children_deleted=len(child_tenant_ids)
                        )
                    except Exception as e:
                        stats["failed"] += 1
                        stats["errors"].append({
                            "session_id": session.session_id,
                            "error": str(e)
                        })
                        logger.error(
                            "Failed to cleanup session",
                            session_id=session.session_id,
                            error=str(e),
                            exc_info=True
                        )

                return stats
            finally:
                # Always close HTTP client
                await data_cloner.close()

    async def _mark_job_complete(self, job_id: str, stats: Dict[str, Any]):
        """Mark job as complete in Redis"""
        client = await self.redis.get_client()
        status_key = f"cleanup:job:{job_id}:status"
        await client.setex(
            status_key,
            3600,  # Keep status for 1 hour
            json.dumps({
                "status": "completed",
                "stats": stats,
                "completed_at": datetime.now(timezone.utc).isoformat()
            })
        )

    async def _mark_job_failed(self, job_id: str, error: str):
        """Mark job as failed in Redis"""
        client = await self.redis.get_client()
        status_key = f"cleanup:job:{job_id}:status"
        await client.setex(
            status_key,
            3600,
            json.dumps({
                "status": "failed",
                "error": error,
                "failed_at": datetime.now(timezone.utc).isoformat()
            })
        )


async def run_cleanup_worker():
    """Entry point for worker process"""
    # Initialize Redis client
    from shared.redis_utils import initialize_redis
    from app.core.config import Settings

    settings = Settings()
    redis_url = settings.REDIS_URL  # Use proper configuration with TLS and auth

    try:
        # Initialize Redis with connection pool settings
        await initialize_redis(redis_url, db=settings.REDIS_DB, max_connections=settings.REDIS_MAX_CONNECTIONS)
        logger.info("Redis initialized successfully", redis_url=redis_url.split('@')[-1], db=settings.REDIS_DB)
    except Exception as e:
        logger.error("Failed to initialize Redis", error=str(e), redis_url=redis_url.split('@')[-1])
        raise

    redis = DemoRedisWrapper()
    worker = CleanupWorker(redis)
    try:
        await worker.start()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal")
        await worker.stop()
    except Exception as e:
        logger.error("Worker crashed", error=str(e), exc_info=True)
        raise


if __name__ == "__main__":
    asyncio.run(run_cleanup_worker())
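
For reference, a minimal producer-side sketch of how a cleanup job could be enqueued and its result polled, assuming redis.asyncio and the key names and payload fields this worker reads (cleanup:queue, job_id, session_ids, cleanup:job:<job_id>:status); the real enqueueing code lives elsewhere in the demo_session service, not in this file:

# Hypothetical producer sketch matching CleanupWorker's queue protocol.
import asyncio
import json
import uuid

import redis.asyncio as aioredis


async def enqueue_cleanup(redis_url: str, session_ids: list[str]) -> str:
    """Push a cleanup job onto the queue the worker consumes."""
    client = aioredis.from_url(redis_url, decode_responses=True)
    job_id = str(uuid.uuid4())
    # Payload mirrors what _process_next_job() reads; retry_count is added by the worker on failure.
    await client.lpush("cleanup:queue", json.dumps({"job_id": job_id, "session_ids": session_ids}))
    return job_id


async def wait_for_result(redis_url: str, job_id: str, poll_seconds: float = 1.0) -> dict:
    """Poll the status key the worker writes (kept for 1 hour) until a result appears."""
    client = aioredis.from_url(redis_url, decode_responses=True)
    while True:
        raw = await client.get(f"cleanup:job:{job_id}:status")
        if raw:
            return json.loads(raw)
        await asyncio.sleep(poll_seconds)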