"""
Subscription Limit Service

Service for validating tenant actions against subscription limits and features.
"""
|
|
|
|
import structlog
|
|
from typing import Dict, Any, Optional
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from fastapi import HTTPException, status
|
|
from datetime import datetime, timezone
|
|
|
|
from app.repositories import SubscriptionRepository, TenantRepository, TenantMemberRepository
|
|
from app.models.tenants import Subscription, Tenant, TenantMember
|
|
from shared.database.exceptions import DatabaseError
|
|
from shared.database.base import create_database_manager
|
|
from shared.subscription.plans import SubscriptionPlanMetadata, get_training_job_quota, get_forecast_quota
|
|
from shared.clients.recipes_client import create_recipes_client
|
|
from shared.clients.suppliers_client import create_suppliers_client
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class SubscriptionLimitService:
    """Service for validating subscription limits and features.

    Each public method opens its own database session through
    ``database_manager.get_session()`` and builds fresh repositories for it.
    The repositories are used through the mapping returned by
    ``_init_repositories`` rather than through instance attributes, so that
    concurrent calls on one service instance cannot pick up each other's
    session-bound repositories between ``await`` points.

    A limit value of ``-1`` on a subscription means "unlimited".
    """

    def __init__(self, database_manager=None, redis_client=None):
        """Store the collaborators used by the service.

        Args:
            database_manager: Object exposing ``get_session()`` as an async
                context manager. When falsy, ``create_database_manager()``
                is called to build one.
            redis_client: Optional Redis client used for quota counters. When
                missing, ``_get_redis_quota`` tries to initialise one lazily.
        """
        self.database_manager = database_manager or create_database_manager()
        self.redis = redis_client

    async def _init_repositories(self, session):
        """Build the repositories bound to ``session``.

        Returns:
            Dict with the keys ``'subscription'``, ``'tenant'`` and
            ``'member'`` mapping to the freshly created repositories.

        The repositories are also stored on the instance
        (``subscription_repo``, ``tenant_repo``, ``member_repo``) for
        backward compatibility. Methods of this class use the returned
        mapping instead, because instance attributes are shared between
        concurrent calls.
        """
        repos = {
            'subscription': SubscriptionRepository(Subscription, session),
            'tenant': TenantRepository(Tenant, session),
            'member': TenantMemberRepository(TenantMember, session),
        }
        self.subscription_repo = repos['subscription']
        self.tenant_repo = repos['tenant']
        self.member_repo = repos['member']
        return repos

    # ------------------------------------------------------------------
    # Small pure helpers shared by the public methods
    # ------------------------------------------------------------------

    @staticmethod
    def _starter_limits(features: Dict[str, Any]) -> Dict[str, Any]:
        """Return the hard-coded starter-plan limits with the given features."""
        return {
            "plan": "starter",
            "max_users": 5,
            "max_locations": 1,
            "max_products": 50,
            "features": features,
        }

    @staticmethod
    def _limit_check_result(current: int, maximum: int, noun: str, plan: str) -> Dict[str, Any]:
        """Build the ``can_add_*`` response for a finite limit.

        ``noun`` is the plural resource name used in the rejection message
        (``"locations"``, ``"products"`` or ``"users"``).
        """
        can_add = current < maximum
        return {
            "can_add": can_add,
            "current_count": current,
            "max_allowed": maximum,
            "reason": "Within limits" if can_add else f"Maximum {maximum} {noun} allowed for {plan} plan",
        }

    def _usage_entry(self, current: float, limit: Optional[float], unlimited: bool) -> Dict[str, Any]:
        """Build one ``usage`` entry: current value, limit, flag and percentage."""
        return {
            "current": current,
            "limit": limit,
            "unlimited": unlimited,
            "usage_percentage": self._calculate_percentage(current, limit),
        }

    def _demo_usage_summary(self) -> Dict[str, Any]:
        """Return the placeholder summary used when a tenant has no subscription."""
        entry = self._usage_entry
        return {
            "plan": "demo",
            "monthly_price": 0,
            "status": "active",
            "billing_cycle": "monthly",
            "usage": {
                "users": entry(1, 5, False),
                "locations": entry(1, 1, False),
                "products": entry(0, 50, False),
                "recipes": entry(0, 100, False),
                "suppliers": entry(0, 20, False),
                "training_jobs_today": entry(0, 2, False),
                "forecasts_today": entry(0, 10, False),
                "api_calls_this_hour": entry(0, 100, False),
                "file_storage_used_gb": entry(0.0, 1.0, False),
            },
            "features": {},
            "next_billing_date": None,
            "trial_ends_at": None,
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def get_tenant_subscription_limits(self, tenant_id: str) -> Dict[str, Any]:
        """Get current subscription limits for a tenant.

        Returns the limits of the active subscription. Without an active
        subscription the starter limits with basic features are returned;
        on any error the starter limits with an empty feature map are
        returned and the error is logged.
        """
        try:
            async with self.database_manager.get_session() as db_session:
                repos = await self._init_repositories(db_session)

                subscription = await repos['subscription'].get_active_subscription(tenant_id)
                if not subscription:
                    # Return basic limits if no subscription
                    return self._starter_limits({
                        "inventory_management": "basic",
                        "demand_prediction": "basic",
                        "production_reports": "basic",
                        "analytics": "basic",
                        "support": "email",
                        "ai_model_configuration": "basic",  # AI model configuration for all tiers
                    })

                return {
                    "plan": subscription.plan,
                    "max_users": subscription.max_users,
                    "max_locations": subscription.max_locations,
                    "max_products": subscription.max_products,
                    "features": subscription.features or {},
                }

        except Exception as e:
            logger.error("Failed to get subscription limits",
                         tenant_id=tenant_id,
                         error=str(e))
            # Return basic limits on error
            return self._starter_limits({})

    async def can_add_location(self, tenant_id: str) -> Dict[str, Any]:
        """Check if tenant can add another location.

        Returns a dict with ``can_add`` and ``reason``; for finite limits it
        also carries ``current_count`` and ``max_allowed``. Any error yields
        ``can_add: False``.
        """
        try:
            async with self.database_manager.get_session() as db_session:
                repos = await self._init_repositories(db_session)

                subscription = await repos['subscription'].get_active_subscription(tenant_id)
                if not subscription:
                    return {"can_add": False, "reason": "No active subscription"}

                # -1 means unlimited locations
                if subscription.max_locations == -1:
                    return {"can_add": True, "reason": "Unlimited locations allowed"}

                # Currently each tenant has exactly one location (the primary
                # bakery location stored in tenant.address / city /
                # postal_code). If multi-location support is added, this
                # should query a locations table instead.
                current_locations = 1

                return self._limit_check_result(
                    current_locations, subscription.max_locations, "locations", subscription.plan
                )

        except Exception as e:
            logger.error("Failed to check location limits",
                         tenant_id=tenant_id,
                         error=str(e))
            return {"can_add": False, "reason": "Error checking limits"}

    async def can_add_product(self, tenant_id: str) -> Dict[str, Any]:
        """Check if tenant can add another product.

        The current product count is the ingredient count reported by the
        inventory client (``_get_ingredient_count``). Any error yields
        ``can_add: False``.
        """
        try:
            async with self.database_manager.get_session() as db_session:
                repos = await self._init_repositories(db_session)

                subscription = await repos['subscription'].get_active_subscription(tenant_id)
                if not subscription:
                    return {"can_add": False, "reason": "No active subscription"}

                # -1 means unlimited products; checked first so the remote
                # count call is skipped when it is not needed.
                if subscription.max_products == -1:
                    return {"can_add": True, "reason": "Unlimited products allowed"}

                current_products = await self._get_ingredient_count(tenant_id)

                return self._limit_check_result(
                    current_products, subscription.max_products, "products", subscription.plan
                )

        except Exception as e:
            logger.error("Failed to check product limits",
                         tenant_id=tenant_id,
                         error=str(e))
            return {"can_add": False, "reason": "Error checking limits"}

    async def can_add_user(self, tenant_id: str) -> Dict[str, Any]:
        """Check if tenant can add another user/member.

        The current user count is the number of active tenant members.
        Any error yields ``can_add: False``.
        """
        try:
            async with self.database_manager.get_session() as db_session:
                repos = await self._init_repositories(db_session)

                subscription = await repos['subscription'].get_active_subscription(tenant_id)
                if not subscription:
                    return {"can_add": False, "reason": "No active subscription"}

                # -1 means unlimited users
                if subscription.max_users == -1:
                    return {"can_add": True, "reason": "Unlimited users allowed"}

                members = await repos['member'].get_tenant_members(tenant_id, active_only=True)
                current_users = len(members)

                return self._limit_check_result(
                    current_users, subscription.max_users, "users", subscription.plan
                )

        except Exception as e:
            logger.error("Failed to check user limits",
                         tenant_id=tenant_id,
                         error=str(e))
            return {"can_add": False, "reason": "Error checking limits"}

    async def has_feature(self, tenant_id: str, feature: str) -> Dict[str, Any]:
        """Check if tenant has access to a specific feature.

        A feature counts as available when its key is present in the
        subscription's ``features`` mapping, whatever its value.
        """
        try:
            async with self.database_manager.get_session() as db_session:
                repos = await self._init_repositories(db_session)

                subscription = await repos['subscription'].get_active_subscription(tenant_id)
                if not subscription:
                    return {"has_feature": False, "reason": "No active subscription"}

                features = subscription.features or {}
                has_feature = feature in features

                return {
                    "has_feature": has_feature,
                    "feature_value": features.get(feature),
                    "plan": subscription.plan,
                    "reason": "Feature available" if has_feature else f"Feature '{feature}' not available in {subscription.plan} plan",
                }

        except Exception as e:
            logger.error("Failed to check feature access",
                         tenant_id=tenant_id,
                         feature=feature,
                         error=str(e))
            return {"has_feature": False, "reason": "Error checking feature access"}

    async def get_feature_level(self, tenant_id: str, feature: str) -> Optional[str]:
        """Get the level/type of a feature for a tenant.

        Returns ``None`` when there is no active subscription, when the
        feature is not present, or when an error occurs.
        """
        try:
            async with self.database_manager.get_session() as db_session:
                repos = await self._init_repositories(db_session)

                subscription = await repos['subscription'].get_active_subscription(tenant_id)
                if not subscription:
                    return None

                return (subscription.features or {}).get(feature)

        except Exception as e:
            logger.error("Failed to get feature level",
                         tenant_id=tenant_id,
                         feature=feature,
                         error=str(e))
            return None

    async def validate_plan_upgrade(self, tenant_id: str, new_plan: str) -> Dict[str, Any]:
        """Validate if a tenant can switch to ``new_plan``.

        The change is rejected when ``new_plan`` is not one of the known
        plans, or when the tenant's active member count exceeds the user
        limit of the target plan. A tenant without a subscription may start
        with any plan. Any error yields ``can_upgrade: False``.
        """
        try:
            async with self.database_manager.get_session() as db_session:
                repos = await self._init_repositories(db_session)

                current_subscription = await repos['subscription'].get_active_subscription(tenant_id)
                if not current_subscription:
                    return {"can_upgrade": True, "reason": "No current subscription, can start with any plan"}

                # Known plans, lowest tier first
                plan_hierarchy = {"starter": 1, "professional": 2, "enterprise": 3}
                if new_plan not in plan_hierarchy:
                    return {"can_upgrade": False, "reason": f"Invalid plan: {new_plan}"}

                # PlanPricing is not imported at module level, so import it here.
                from shared.subscription.plans import PlanPricing
                new_plan_config = SubscriptionPlanMetadata.get_plan_info(new_plan)

                # User limit of the target plan; defaults to 5 if not specified.
                # "Unlimited", None and -1 are all treated as "no limit".
                max_users_limit = new_plan_config.get('limits', {}).get('users', 5)
                unlimited_users = max_users_limit in (None, -1, "Unlimited")

                if not unlimited_users:
                    # Only query members when there is a finite limit to check
                    members = await repos['member'].get_tenant_members(tenant_id, active_only=True)
                    current_users = len(members)
                    if current_users > max_users_limit:
                        return {
                            "can_upgrade": False,
                            "reason": f"Current usage ({current_users} users) exceeds {new_plan} plan limits ({max_users_limit} users)",
                        }

                # Coerce both sides to float: monthly_price may be a Decimal
                # or None, and float - Decimal raises TypeError.
                new_price = float(PlanPricing.get_price(new_plan))
                current_price = float(current_subscription.monthly_price or 0)

                return {
                    "can_upgrade": True,
                    "current_plan": current_subscription.plan,
                    "new_plan": new_plan,
                    "price_change": new_price - current_price,
                    "new_features": new_plan_config.get("features", []),
                    "reason": "Upgrade validation successful",
                }

        except Exception as e:
            logger.error("Failed to validate plan upgrade",
                         tenant_id=tenant_id,
                         new_plan=new_plan,
                         error=str(e))
            return {"can_upgrade": False, "reason": "Error validating upgrade"}

    async def get_usage_summary(self, tenant_id: str) -> Dict[str, Any]:
        """Get a summary of current usage vs limits for a tenant - ALL 9 METRICS.

        Metrics: users, locations, products, recipes, suppliers,
        training_jobs_today, forecasts_today, api_calls_this_hour and
        file_storage_used_gb. Without an active subscription a placeholder
        "demo" summary is returned; on error ``{"error": ...}`` is returned.
        """
        try:
            async with self.database_manager.get_session() as db_session:
                repos = await self._init_repositories(db_session)

                subscription = await repos['subscription'].get_active_subscription(tenant_id)
                if not subscription:
                    logger.info("No subscription found, returning mock data", tenant_id=tenant_id)
                    return self._demo_usage_summary()

                plan = subscription.plan

                # Team & Organization
                members = await repos['member'].get_tenant_members(tenant_id, active_only=True)
                current_users = len(members)
                current_locations = 1  # Each tenant has one primary location

                # Products & Inventory (counts come from the other services)
                current_products = await self._get_ingredient_count(tenant_id)
                current_recipes = await self._get_recipe_count(tenant_id)
                current_suppliers = await self._get_supplier_count(tenant_id)

                # IA & Analytics (Redis-based daily quotas)
                training_jobs_usage = await self._get_training_jobs_today(tenant_id, plan)
                forecasts_usage = await self._get_forecasts_today(tenant_id, plan)

                # API & Storage (Redis-based)
                api_calls_usage = await self._get_api_calls_this_hour(tenant_id, plan)
                storage_usage = await self._get_file_storage_usage_gb(tenant_id, plan)

                # Recipe/supplier limits come from plan metadata (None = unlimited)
                recipes_limit = await self._get_limit_from_plan(plan, 'recipes')
                suppliers_limit = await self._get_limit_from_plan(plan, 'suppliers')

                entry = self._usage_entry
                next_billing = subscription.next_billing_date
                trial_end = subscription.trial_ends_at

                return {
                    "plan": plan,
                    "monthly_price": subscription.monthly_price,
                    "status": subscription.status,
                    "billing_cycle": subscription.billing_cycle or "monthly",
                    "usage": {
                        # Team & Organization (-1 on the subscription = unlimited)
                        "users": entry(current_users, subscription.max_users,
                                       subscription.max_users == -1),
                        "locations": entry(current_locations, subscription.max_locations,
                                           subscription.max_locations == -1),
                        # Products & Inventory
                        "products": entry(current_products, subscription.max_products,
                                          subscription.max_products == -1),
                        "recipes": entry(current_recipes, recipes_limit, recipes_limit is None),
                        "suppliers": entry(current_suppliers, suppliers_limit, suppliers_limit is None),
                        # IA & Analytics (Daily quotas)
                        "training_jobs_today": training_jobs_usage,
                        "forecasts_today": forecasts_usage,
                        # API & Storage
                        "api_calls_this_hour": api_calls_usage,
                        "file_storage_used_gb": storage_usage,
                    },
                    "features": subscription.features or {},
                    "next_billing_date": next_billing.isoformat() if next_billing else None,
                    "trial_ends_at": trial_end.isoformat() if trial_end else None,
                }

        except Exception as e:
            logger.error("Failed to get usage summary",
                         tenant_id=tenant_id,
                         error=str(e))
            return {"error": "Failed to get usage summary"}

    # ------------------------------------------------------------------
    # Counts fetched from other services
    # ------------------------------------------------------------------

    async def _get_ingredient_count(self, tenant_id: str) -> int:
        """Get ingredient count from inventory service using shared client.

        Returns 0 when the call fails, so the subscription display keeps working.
        """
        try:
            from app.core.config import settings
            from shared.clients.inventory_client import create_inventory_client

            inventory_client = create_inventory_client(settings)
            count = await inventory_client.count_ingredients(tenant_id)

            logger.info(
                "Retrieved ingredient count via inventory client",
                tenant_id=tenant_id,
                count=count
            )
            return count

        except Exception as e:
            logger.error(
                "Error getting ingredient count via inventory client",
                tenant_id=tenant_id,
                error=str(e)
            )
            # Return 0 as fallback to avoid breaking subscription display
            return 0

    async def _get_recipe_count(self, tenant_id: str) -> int:
        """Get recipe count from recipes service using shared client.

        Returns 0 when the call fails, so the subscription display keeps working.
        """
        try:
            from app.core.config import settings

            recipes_client = create_recipes_client(settings)
            count = await recipes_client.count_recipes(tenant_id)

            logger.info(
                "Retrieved recipe count via recipes client",
                tenant_id=tenant_id,
                count=count
            )
            return count

        except Exception as e:
            logger.error(
                "Error getting recipe count via recipes client",
                tenant_id=tenant_id,
                error=str(e)
            )
            # Return 0 as fallback to avoid breaking subscription display
            return 0

    async def _get_supplier_count(self, tenant_id: str) -> int:
        """Get supplier count from suppliers service using shared client.

        Returns 0 when the call fails, so the subscription display keeps working.
        """
        try:
            from app.core.config import settings

            suppliers_client = create_suppliers_client(settings)
            count = await suppliers_client.count_suppliers(tenant_id)

            logger.info(
                "Retrieved supplier count via suppliers client",
                tenant_id=tenant_id,
                count=count
            )
            return count

        except Exception as e:
            logger.error(
                "Error getting supplier count via suppliers client",
                tenant_id=tenant_id,
                error=str(e)
            )
            # Return 0 as fallback to avoid breaking subscription display
            return 0

    # ------------------------------------------------------------------
    # Redis-backed counters
    # ------------------------------------------------------------------

    async def _get_redis_quota(self, quota_key: str) -> int:
        """Get current count from Redis quota key.

        Returns 0 when no Redis client is available, the key is missing or
        empty, or any error occurs.
        """
        try:
            if not self.redis:
                # Try to initialize Redis client if not available
                from app.core.config import settings
                import shared.redis_utils
                self.redis = await shared.redis_utils.initialize_redis(settings.REDIS_URL)

            if not self.redis:
                return 0

            current = await self.redis.get(quota_key)
            return int(current) if current else 0

        except Exception as e:
            logger.error("Error getting Redis quota", key=quota_key, error=str(e))
            return 0

    async def _get_training_jobs_today(self, tenant_id: str, plan: str) -> Dict[str, Any]:
        """Get training jobs usage for today (UTC date)."""
        try:
            date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
            quota_key = f"quota:daily:training_jobs:{tenant_id}:{date_str}"
            current_count = await self._get_redis_quota(quota_key)

            limit = get_training_job_quota(plan)
            return self._usage_entry(current_count, limit, limit is None)

        except Exception as e:
            logger.error("Error getting training jobs today", tenant_id=tenant_id, error=str(e))
            return self._usage_entry(0, None, True)

    async def _get_forecasts_today(self, tenant_id: str, plan: str) -> Dict[str, Any]:
        """Get forecast generation usage for today (UTC date)."""
        try:
            date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
            quota_key = f"quota:daily:forecast_generation:{tenant_id}:{date_str}"
            current_count = await self._get_redis_quota(quota_key)

            limit = get_forecast_quota(plan)
            return self._usage_entry(current_count, limit, limit is None)

        except Exception as e:
            logger.error("Error getting forecasts today", tenant_id=tenant_id, error=str(e))
            return self._usage_entry(0, None, True)

    async def _get_api_calls_this_hour(self, tenant_id: str, plan: str) -> Dict[str, Any]:
        """Get API calls usage for current hour (UTC)."""
        try:
            hour_str = datetime.now(timezone.utc).strftime('%Y-%m-%d-%H')
            quota_key = f"quota:hourly:api_calls:{tenant_id}:{hour_str}"
            current_count = await self._get_redis_quota(quota_key)

            plan_metadata = SubscriptionPlanMetadata.PLANS.get(plan, {})
            limit = plan_metadata.get('limits', {}).get('api_calls_per_hour')
            return self._usage_entry(current_count, limit, limit is None)

        except Exception as e:
            logger.error("Error getting API calls this hour", tenant_id=tenant_id, error=str(e))
            return self._usage_entry(0, None, True)

    async def _get_file_storage_usage_gb(self, tenant_id: str, plan: str) -> Dict[str, Any]:
        """Get file storage usage in GB.

        The byte total is read from the ``storage:total_bytes:<tenant>``
        Redis key and converted to GiB-sized units (1024 ** 3), rounded to
        two decimals.
        """
        try:
            storage_key = f"storage:total_bytes:{tenant_id}"
            total_bytes = await self._get_redis_quota(storage_key)
            total_gb = round(total_bytes / (1024 ** 3), 2) if total_bytes > 0 else 0.0

            plan_metadata = SubscriptionPlanMetadata.PLANS.get(plan, {})
            limit = plan_metadata.get('limits', {}).get('file_storage_gb')
            return self._usage_entry(total_gb, limit, limit is None)

        except Exception as e:
            logger.error("Error getting file storage usage", tenant_id=tenant_id, error=str(e))
            return self._usage_entry(0.0, None, True)

    # ------------------------------------------------------------------
    # Misc helpers
    # ------------------------------------------------------------------

    def _calculate_percentage(self, current: float, limit: Optional[float]) -> float:
        """Calculate usage percentage, rounded to one decimal.

        Returns 0.0 when there is no meaningful limit (``None``, ``-1`` for
        unlimited, or ``0`` to avoid division by zero).
        """
        if limit is None or limit == -1 or limit == 0:
            return 0.0
        return round((current / limit) * 100, 1)

    async def _get_limit_from_plan(self, plan: str, limit_key: str) -> Optional[int]:
        """Get limit value from plan metadata.

        Returns ``None`` when the plan or key is unknown, or when the stored
        value is ``-1`` (unlimited).
        """
        plan_metadata = SubscriptionPlanMetadata.PLANS.get(plan, {})
        limit = plan_metadata.get('limits', {}).get(limit_key)
        return limit if limit != -1 else None
|