104 lines
3.0 KiB
Python
104 lines
3.0 KiB
Python
"""
|
|
Background job to process subscription downgrades at period end
|
|
|
|
Runs periodically to check for subscriptions with:
|
|
- status = 'pending_cancellation'
|
|
- cancellation_effective_date <= now()
|
|
|
|
Converts them to 'inactive' status
|
|
"""
|
|
|
|
import structlog
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
from sqlalchemy import select, update
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.database import get_async_session_factory
|
|
from app.models.tenants import Subscription
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
async def process_pending_cancellations():
|
|
"""
|
|
Process all subscriptions that have reached their cancellation_effective_date
|
|
"""
|
|
async_session_factory = get_async_session_factory()
|
|
|
|
async with async_session_factory() as session:
|
|
try:
|
|
query = select(Subscription).where(
|
|
Subscription.status == 'pending_cancellation',
|
|
Subscription.cancellation_effective_date <= datetime.now(timezone.utc)
|
|
)
|
|
|
|
result = await session.execute(query)
|
|
subscriptions_to_downgrade = result.scalars().all()
|
|
|
|
downgraded_count = 0
|
|
|
|
for subscription in subscriptions_to_downgrade:
|
|
subscription.status = 'inactive'
|
|
subscription.plan = 'free'
|
|
subscription.monthly_price = 0.0
|
|
|
|
logger.info(
|
|
"subscription_downgraded_to_inactive",
|
|
tenant_id=str(subscription.tenant_id),
|
|
previous_plan=subscription.plan,
|
|
cancellation_effective_date=subscription.cancellation_effective_date.isoformat()
|
|
)
|
|
|
|
downgraded_count += 1
|
|
|
|
if downgraded_count > 0:
|
|
await session.commit()
|
|
logger.info(
|
|
"subscriptions_downgraded",
|
|
count=downgraded_count
|
|
)
|
|
else:
|
|
logger.debug("no_subscriptions_to_downgrade")
|
|
|
|
return downgraded_count
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"subscription_downgrade_job_failed",
|
|
error=str(e)
|
|
)
|
|
await session.rollback()
|
|
raise
|
|
|
|
|
|
async def run_subscription_downgrade_job():
|
|
"""
|
|
Main entry point for the subscription downgrade job
|
|
Runs in a loop with configurable interval
|
|
"""
|
|
interval_seconds = 3600 # Run every hour
|
|
|
|
logger.info("subscription_downgrade_job_started", interval_seconds=interval_seconds)
|
|
|
|
while True:
|
|
try:
|
|
downgraded_count = await process_pending_cancellations()
|
|
|
|
logger.info(
|
|
"subscription_downgrade_job_completed",
|
|
downgraded_count=downgraded_count
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"subscription_downgrade_job_error",
|
|
error=str(e)
|
|
)
|
|
|
|
await asyncio.sleep(interval_seconds)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(run_subscription_downgrade_job())
|