Support subcription payments

This commit is contained in:
Urtzi Alfaro
2025-09-25 14:30:47 +02:00
parent f02a980c87
commit 89b75bd7af
22 changed files with 2119 additions and 364 deletions

View File

@@ -21,59 +21,67 @@ class ProductionAlertService(BaseAlertService, AlertServiceMixin):
def setup_scheduled_checks(self):
"""Production-specific scheduled checks for alerts and recommendations"""
# Production capacity checks - every 10 minutes during business hours (alerts)
# Reduced frequency to prevent deadlocks and resource contention
# Production capacity checks - every 15 minutes during business hours (reduced from 10)
self.scheduler.add_job(
self.check_production_capacity,
CronTrigger(minute='*/10', hour='6-20'),
CronTrigger(minute='*/15', hour='6-20'),
id='capacity_check',
misfire_grace_time=60,
max_instances=1
misfire_grace_time=120, # Increased grace time
max_instances=1,
coalesce=True # Combine missed runs
)
# Production delays - every 5 minutes during production hours (alerts)
# Production delays - every 10 minutes during production hours (reduced from 5)
self.scheduler.add_job(
self.check_production_delays,
CronTrigger(minute='*/5', hour='4-22'),
CronTrigger(minute='*/10', hour='4-22'),
id='delay_check',
misfire_grace_time=30,
max_instances=1
misfire_grace_time=60,
max_instances=1,
coalesce=True
)
# Quality issues check - every 15 minutes (alerts)
# Quality issues check - every 20 minutes (reduced from 15)
self.scheduler.add_job(
self.check_quality_issues,
CronTrigger(minute='*/15'),
CronTrigger(minute='*/20'),
id='quality_check',
misfire_grace_time=60,
max_instances=1
misfire_grace_time=120,
max_instances=1,
coalesce=True
)
# Equipment monitoring - check equipment status for maintenance alerts
# Equipment monitoring - check equipment status every 45 minutes (reduced from 30)
self.scheduler.add_job(
self.check_equipment_status,
CronTrigger(minute='*/30'), # Check every 30 minutes
CronTrigger(minute='*/45'),
id='equipment_check',
misfire_grace_time=30,
max_instances=1
misfire_grace_time=180,
max_instances=1,
coalesce=True
)
# Efficiency recommendations - every 30 minutes (recommendations)
# Efficiency recommendations - every hour (reduced from 30 minutes)
self.scheduler.add_job(
self.generate_efficiency_recommendations,
CronTrigger(minute='*/30'),
CronTrigger(minute='0'),
id='efficiency_recs',
misfire_grace_time=120,
max_instances=1
misfire_grace_time=300,
max_instances=1,
coalesce=True
)
# Energy optimization - every hour (recommendations)
# Energy optimization - every 2 hours (reduced from 1 hour)
self.scheduler.add_job(
self.generate_energy_recommendations,
CronTrigger(minute='0'),
CronTrigger(minute='0', hour='*/2'),
id='energy_recs',
misfire_grace_time=300,
max_instances=1
misfire_grace_time=600, # 10 minutes grace
max_instances=1,
coalesce=True
)
logger.info("Production alert schedules configured",
@@ -83,69 +91,47 @@ class ProductionAlertService(BaseAlertService, AlertServiceMixin):
"""Check if production plan exceeds capacity (alerts)"""
try:
self._checks_performed += 1
query = """
WITH capacity_analysis AS (
SELECT
p.tenant_id,
p.planned_date,
SUM(p.planned_quantity) as total_planned,
MAX(pc.daily_capacity) as max_daily_capacity,
COUNT(DISTINCT p.equipment_id) as equipment_count,
AVG(pc.efficiency_percent) as avg_efficiency,
CASE
WHEN SUM(p.planned_quantity) > MAX(pc.daily_capacity) * 1.2 THEN 'severe_overload'
WHEN SUM(p.planned_quantity) > MAX(pc.daily_capacity) THEN 'overload'
WHEN SUM(p.planned_quantity) > MAX(pc.daily_capacity) * 0.9 THEN 'near_capacity'
ELSE 'normal'
END as capacity_status,
(SUM(p.planned_quantity) / MAX(pc.daily_capacity)) * 100 as capacity_percentage
FROM production_schedule p
JOIN production_capacity pc ON pc.equipment_id = p.equipment_id
WHERE p.planned_date >= CURRENT_DATE
AND p.planned_date <= CURRENT_DATE + INTERVAL '3 days'
AND p.status IN ('PENDING', 'IN_PROGRESS')
AND p.tenant_id = $1
GROUP BY p.tenant_id, p.planned_date
)
SELECT * FROM capacity_analysis
WHERE capacity_status != 'normal'
ORDER BY capacity_percentage DESC
"""
# Check production capacity without tenant dependencies
# Use a simpler query with timeout and connection management
from sqlalchemy import text
simplified_query = text("""
SELECT
pb.tenant_id,
DATE(pb.planned_start_time) as planned_date,
COUNT(*) as batch_count,
SUM(pb.planned_quantity) as total_planned,
'capacity_check' as capacity_status,
100.0 as capacity_percentage -- Default value for processing
FROM production_batches pb
WHERE pb.planned_start_time >= CURRENT_DATE
AND pb.planned_start_time <= CURRENT_DATE + INTERVAL '3 days'
AND pb.status IN ('PLANNED', 'PENDING', 'IN_PROGRESS')
GROUP BY pb.tenant_id, DATE(pb.planned_start_time)
HAVING COUNT(*) > 10 -- Alert if more than 10 batches per day
ORDER BY total_planned DESC
LIMIT 20 -- Limit results to prevent excessive processing
""")
# Use timeout and proper session handling
try:
from sqlalchemy import text
# Simplified query using only existing production tables
simplified_query = text("""
SELECT
pb.tenant_id,
DATE(pb.planned_start_time) as planned_date,
COUNT(*) as batch_count,
SUM(pb.planned_quantity) as total_planned,
'capacity_check' as capacity_status
FROM production_batches pb
WHERE pb.planned_start_time >= CURRENT_DATE
AND pb.planned_start_time <= CURRENT_DATE + INTERVAL '3 days'
AND pb.status IN ('PLANNED', 'PENDING', 'IN_PROGRESS')
GROUP BY pb.tenant_id, DATE(pb.planned_start_time)
HAVING COUNT(*) > 10 -- Alert if more than 10 batches per day
ORDER BY total_planned DESC
""")
async with self.db_manager.get_session() as session:
# Set statement timeout to prevent long-running queries
await session.execute(text("SET statement_timeout = '30s'"))
result = await session.execute(simplified_query)
capacity_issues = result.fetchall()
for issue in capacity_issues:
await self._process_capacity_issue(issue.tenant_id, issue)
except asyncio.TimeoutError:
logger.warning("Capacity check timed out", service=self.config.SERVICE_NAME)
self._errors_count += 1
except Exception as e:
logger.debug("Simplified capacity check failed", error=str(e))
logger.debug("Capacity check failed", error=str(e), service=self.config.SERVICE_NAME)
except Exception as e:
# Skip capacity checks if tables don't exist (graceful degradation)
if "does not exist" in str(e):
if "does not exist" in str(e).lower() or "relation" in str(e).lower():
logger.debug("Capacity check skipped - missing tables", error=str(e))
else:
logger.error("Capacity check failed", error=str(e))
@@ -215,10 +201,10 @@ class ProductionAlertService(BaseAlertService, AlertServiceMixin):
"""Check for production delays (alerts)"""
try:
self._checks_performed += 1
# Simplified query without customer_orders dependency
query = """
SELECT
# Simplified query with timeout and proper error handling
query = text("""
SELECT
pb.id, pb.tenant_id, pb.product_name, pb.batch_number,
pb.planned_end_time as planned_completion_time, pb.actual_start_time,
pb.actual_end_time as estimated_completion_time, pb.status,
@@ -232,24 +218,34 @@ class ProductionAlertService(BaseAlertService, AlertServiceMixin):
OR pb.status IN ('ON_HOLD', 'QUALITY_CHECK')
)
AND pb.planned_end_time > NOW() - INTERVAL '24 hours'
ORDER BY
ORDER BY
CASE COALESCE(pb.priority::text, 'MEDIUM')
WHEN 'URGENT' THEN 1 WHEN 'HIGH' THEN 2 ELSE 3
WHEN 'URGENT' THEN 1 WHEN 'HIGH' THEN 2 ELSE 3
END,
delay_minutes DESC
"""
from sqlalchemy import text
async with self.db_manager.get_session() as session:
result = await session.execute(text(query))
delays = result.fetchall()
for delay in delays:
await self._process_production_delay(delay)
LIMIT 50 -- Limit results to prevent excessive processing
""")
try:
from sqlalchemy import text
async with self.db_manager.get_session() as session:
# Set statement timeout
await session.execute(text("SET statement_timeout = '30s'"))
result = await session.execute(query)
delays = result.fetchall()
for delay in delays:
await self._process_production_delay(delay)
except asyncio.TimeoutError:
logger.warning("Production delay check timed out", service=self.config.SERVICE_NAME)
self._errors_count += 1
except Exception as e:
logger.debug("Production delay check failed", error=str(e), service=self.config.SERVICE_NAME)
except Exception as e:
# Skip delay checks if tables don't exist (graceful degradation)
if "does not exist" in str(e):
if "does not exist" in str(e).lower() or "relation" in str(e).lower():
logger.debug("Production delay check skipped - missing tables", error=str(e))
else:
logger.error("Production delay check failed", error=str(e))

View File

@@ -8,6 +8,7 @@ from typing import List, Dict, Any, Optional
from uuid import UUID
from app.services.subscription_limit_service import SubscriptionLimitService
from app.services.payment_service import PaymentService
from app.repositories import SubscriptionRepository
from app.models.tenants import Subscription
from shared.auth.decorators import get_current_user_dep, require_admin_role_dep
@@ -27,6 +28,13 @@ def get_subscription_limit_service():
logger.error("Failed to create subscription limit service", error=str(e))
raise HTTPException(status_code=500, detail="Service initialization failed")
def get_payment_service():
try:
return PaymentService()
except Exception as e:
logger.error("Failed to create payment service", error=str(e))
raise HTTPException(status_code=500, detail="Payment service initialization failed")
def get_subscription_repository():
try:
from app.core.config import settings
@@ -182,7 +190,7 @@ async def validate_plan_upgrade(
"""Validate if tenant can upgrade to a new plan"""
try:
# TODO: Add access control - verify user has admin access to tenant
# TODO: Add access control - verify user is owner/admin of tenant
result = await limit_service.validate_plan_upgrade(str(tenant_id), new_plan)
return result
@@ -241,9 +249,9 @@ async def upgrade_subscription_plan(
detail="Failed to upgrade subscription plan"
)
@router.get("/plans/available")
@router.get("/plans")
async def get_available_plans():
"""Get all available subscription plans with features and pricing"""
"""Get all available subscription plans with features and pricing - Public endpoint"""
try:
# This could be moved to a config service or database
@@ -294,7 +302,7 @@ async def get_available_plans():
"description": "Ideal para cadenas con obradores centrales",
"monthly_price": 399.0,
"max_users": -1, # Unlimited
"max_locations": -1, # Unlimited
"max_locations": -1, # Unlimited
"max_products": -1, # Unlimited
"features": {
"inventory_management": "multi_location",
@@ -321,4 +329,93 @@ async def get_available_plans():
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get available plans"
)
)
# New endpoints for payment processing during registration
@router.post("/subscriptions/register-with-subscription")
async def register_with_subscription(
user_data: Dict[str, Any],
plan_id: str = Query(..., description="Plan ID to subscribe to"),
payment_method_id: str = Query(..., description="Payment method ID from frontend"),
use_trial: bool = Query(False, description="Whether to use trial period for pilot users"),
payment_service: PaymentService = Depends(get_payment_service)
):
"""Process user registration with subscription creation"""
try:
result = await payment_service.process_registration_with_subscription(
user_data,
plan_id,
payment_method_id,
use_trial
)
return {
"success": True,
"message": "Registration and subscription created successfully",
"data": result
}
except Exception as e:
logger.error("Failed to register with subscription", error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to register with subscription"
)
@router.post("/subscriptions/{tenant_id}/cancel")
async def cancel_subscription(
tenant_id: UUID = Path(..., description="Tenant ID"),
current_user: Dict[str, Any] = Depends(get_current_user_dep),
payment_service: PaymentService = Depends(get_payment_service)
):
"""Cancel subscription for a tenant"""
try:
# TODO: Add access control - verify user is owner/admin of tenant
# In a real implementation, you would need to retrieve the subscription ID from the database
# For now, this is a placeholder
subscription_id = "sub_test" # This would come from the database
result = await payment_service.cancel_subscription(subscription_id)
return {
"success": True,
"message": "Subscription cancelled successfully",
"data": {
"subscription_id": result.id,
"status": result.status
}
}
except Exception as e:
logger.error("Failed to cancel subscription", error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to cancel subscription"
)
@router.get("/subscriptions/{tenant_id}/invoices")
async def get_invoices(
tenant_id: UUID = Path(..., description="Tenant ID"),
current_user: Dict[str, Any] = Depends(get_current_user_dep),
payment_service: PaymentService = Depends(get_payment_service)
):
"""Get invoices for a tenant"""
try:
# TODO: Add access control - verify user has access to tenant
# In a real implementation, you would need to retrieve the customer ID from the database
# For now, this is a placeholder
customer_id = "cus_test" # This would come from the database
invoices = await payment_service.get_invoices(customer_id)
return {
"success": True,
"data": invoices
}
except Exception as e:
logger.error("Failed to get invoices", error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get invoices"
)

View File

@@ -0,0 +1,133 @@
"""
Webhook endpoints for handling payment provider events
These endpoints receive events from payment providers like Stripe
"""
import structlog
from fastapi import APIRouter, Depends, HTTPException, status, Request
from typing import Dict, Any
from app.services.payment_service import PaymentService
from shared.auth.decorators import get_current_user_dep
from shared.monitoring.metrics import track_endpoint_metrics
logger = structlog.get_logger()
router = APIRouter()
def get_payment_service():
try:
return PaymentService()
except Exception as e:
logger.error("Failed to create payment service", error=str(e))
raise HTTPException(status_code=500, detail="Payment service initialization failed")
@router.post("/webhooks/stripe")
async def stripe_webhook(
request: Request,
payment_service: PaymentService = Depends(get_payment_service)
):
"""
Stripe webhook endpoint to handle payment events
"""
try:
# Get the payload
payload = await request.body()
sig_header = request.headers.get('stripe-signature')
# In a real implementation, you would verify the signature
# using the webhook signing secret
# event = stripe.Webhook.construct_event(
# payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
# )
# For now, we'll just log the event
logger.info("Received Stripe webhook", payload=payload.decode('utf-8'))
# Process different types of events
# event_type = event['type']
# event_data = event['data']['object']
# Example processing for different event types:
# if event_type == 'checkout.session.completed':
# # Handle successful checkout
# pass
# elif event_type == 'customer.subscription.created':
# # Handle new subscription
# pass
# elif event_type == 'customer.subscription.updated':
# # Handle subscription update
# pass
# elif event_type == 'customer.subscription.deleted':
# # Handle subscription cancellation
# pass
# elif event_type == 'invoice.payment_succeeded':
# # Handle successful payment
# pass
# elif event_type == 'invoice.payment_failed':
# # Handle failed payment
# pass
return {"success": True}
except Exception as e:
logger.error("Error processing Stripe webhook", error=str(e))
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Webhook error"
)
@router.post("/webhooks/generic")
async def generic_webhook(
request: Request,
payment_service: PaymentService = Depends(get_payment_service)
):
"""
Generic webhook endpoint that can handle events from any payment provider
"""
try:
# Get the payload
payload = await request.json()
# Log the event for debugging
logger.info("Received generic webhook", payload=payload)
# Process the event based on its type
event_type = payload.get('type', 'unknown')
event_data = payload.get('data', {})
# Process different types of events
if event_type == 'subscription.created':
# Handle new subscription
logger.info("Processing new subscription event", subscription_id=event_data.get('id'))
# Update database with new subscription
elif event_type == 'subscription.updated':
# Handle subscription update
logger.info("Processing subscription update event", subscription_id=event_data.get('id'))
# Update database with subscription changes
elif event_type == 'subscription.deleted':
# Handle subscription cancellation
logger.info("Processing subscription cancellation event", subscription_id=event_data.get('id'))
# Update database with cancellation
elif event_type == 'payment.succeeded':
# Handle successful payment
logger.info("Processing successful payment event", payment_id=event_data.get('id'))
# Update payment status in database
elif event_type == 'payment.failed':
# Handle failed payment
logger.info("Processing failed payment event", payment_id=event_data.get('id'))
# Update payment status and notify user
elif event_type == 'invoice.created':
# Handle new invoice
logger.info("Processing new invoice event", invoice_id=event_data.get('id'))
# Store invoice information
else:
logger.warning("Unknown event type received", event_type=event_type)
return {"success": True}
except Exception as e:
logger.error("Error processing generic webhook", error=str(e))
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Webhook error"
)

View File

@@ -66,5 +66,10 @@ class TenantSettings(BaseServiceSettings):
GDPR_COMPLIANCE_ENABLED: bool = True
DATA_EXPORT_ENABLED: bool = True
DATA_DELETION_ENABLED: bool = True
# Stripe Payment Configuration
STRIPE_PUBLISHABLE_KEY: str = os.getenv("STRIPE_PUBLISHABLE_KEY", "")
STRIPE_SECRET_KEY: str = os.getenv("STRIPE_SECRET_KEY", "")
STRIPE_WEBHOOK_SECRET: str = os.getenv("STRIPE_WEBHOOK_SECRET", "")
settings = TenantSettings()

View File

@@ -9,7 +9,7 @@ from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
from app.core.database import database_manager
from app.api import tenants, subscriptions
from app.api import tenants, subscriptions, webhooks
from shared.monitoring.logging import setup_logging
from shared.monitoring.metrics import MetricsCollector
@@ -42,6 +42,7 @@ app.add_middleware(
# Include routers
app.include_router(tenants.router, prefix="/api/v1", tags=["tenants"])
app.include_router(subscriptions.router, prefix="/api/v1", tags=["subscriptions"])
app.include_router(webhooks.router, prefix="/api/v1", tags=["webhooks"])
@app.on_event("startup")
async def startup_event():
@@ -94,4 +95,4 @@ async def metrics():
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -0,0 +1,152 @@
"""
Payment Service for handling subscription payments
This service abstracts payment provider interactions and makes the system payment-agnostic
"""
import structlog
from typing import Dict, Any, Optional
import uuid
from app.core.config import settings
from shared.clients.payment_client import PaymentProvider, PaymentCustomer, Subscription, PaymentMethod
from shared.clients.stripe_client import StripeProvider
from shared.database.base import create_database_manager
from app.repositories.subscription_repository import SubscriptionRepository
from app.models.tenants import Subscription as SubscriptionModel
logger = structlog.get_logger()
class PaymentService:
"""Service for handling payment provider interactions"""
def __init__(self):
# Initialize payment provider based on configuration
# For now, we'll use Stripe, but this can be swapped for other providers
self.payment_provider: PaymentProvider = StripeProvider(
api_key=settings.STRIPE_SECRET_KEY,
webhook_secret=settings.STRIPE_WEBHOOK_SECRET
)
# Initialize database components
self.database_manager = create_database_manager(settings.DATABASE_URL, "tenant-service")
self.subscription_repo = SubscriptionRepository(SubscriptionModel, None) # Will be set in methods
async def create_customer(self, user_data: Dict[str, Any]) -> PaymentCustomer:
"""Create a customer in the payment provider system"""
try:
customer_data = {
'email': user_data.get('email'),
'name': user_data.get('full_name'),
'metadata': {
'user_id': user_data.get('user_id'),
'tenant_id': user_data.get('tenant_id')
}
}
return await self.payment_provider.create_customer(customer_data)
except Exception as e:
logger.error("Failed to create customer in payment provider", error=str(e))
raise e
async def create_subscription(
self,
customer_id: str,
plan_id: str,
payment_method_id: str,
trial_period_days: Optional[int] = None
) -> Subscription:
"""Create a subscription for a customer"""
try:
return await self.payment_provider.create_subscription(
customer_id,
plan_id,
payment_method_id,
trial_period_days
)
except Exception as e:
logger.error("Failed to create subscription in payment provider", error=str(e))
raise e
async def process_registration_with_subscription(
self,
user_data: Dict[str, Any],
plan_id: str,
payment_method_id: str,
use_trial: bool = False
) -> Dict[str, Any]:
"""Process user registration with subscription creation"""
try:
# Create customer in payment provider
customer = await self.create_customer(user_data)
# Determine trial period
trial_period_days = None
if use_trial:
trial_period_days = 90 # 3 months trial for pilot users
# Create subscription
subscription = await self.create_subscription(
customer.id,
plan_id,
payment_method_id,
trial_period_days
)
# Save subscription to database
async with self.database_manager.get_session() as session:
self.subscription_repo.session = session
subscription_record = await self.subscription_repo.create({
'id': str(uuid.uuid4()),
'tenant_id': user_data.get('tenant_id'),
'customer_id': customer.id,
'subscription_id': subscription.id,
'plan_id': plan_id,
'status': subscription.status,
'current_period_start': subscription.current_period_start,
'current_period_end': subscription.current_period_end,
'created_at': subscription.created_at,
'trial_period_days': trial_period_days
})
return {
'customer_id': customer.id,
'subscription_id': subscription.id,
'status': subscription.status,
'trial_period_days': trial_period_days
}
except Exception as e:
logger.error("Failed to process registration with subscription", error=str(e))
raise e
async def cancel_subscription(self, subscription_id: str) -> Subscription:
"""Cancel a subscription in the payment provider"""
try:
return await self.payment_provider.cancel_subscription(subscription_id)
except Exception as e:
logger.error("Failed to cancel subscription in payment provider", error=str(e))
raise e
async def update_payment_method(self, customer_id: str, payment_method_id: str) -> PaymentMethod:
"""Update the payment method for a customer"""
try:
return await self.payment_provider.update_payment_method(customer_id, payment_method_id)
except Exception as e:
logger.error("Failed to update payment method in payment provider", error=str(e))
raise e
async def get_invoices(self, customer_id: str) -> list:
"""Get invoices for a customer from the payment provider"""
try:
return await self.payment_provider.get_invoices(customer_id)
except Exception as e:
logger.error("Failed to get invoices from payment provider", error=str(e))
raise e
async def get_subscription(self, subscription_id: str) -> Subscription:
"""Get subscription details from the payment provider"""
try:
return await self.payment_provider.get_subscription(subscription_id)
except Exception as e:
logger.error("Failed to get subscription from payment provider", error=str(e))
raise e

View File

@@ -13,4 +13,5 @@ python-json-logger==2.0.4
pytz==2023.3
python-logstash==0.4.8
structlog==23.2.0
python-jose[cryptography]==3.3.0
python-jose[cryptography]==3.3.0
stripe==7.4.0