Files
bakery-ia/services/pos/app/api/pos_operations.py
2025-10-31 11:54:19 +01:00

498 lines
17 KiB
Python

"""
POS Operations API Endpoints
BUSINESS layer - Sync operations, webhooks, reconciliation, and test connection
"""
from fastapi import APIRouter, Depends, HTTPException, Path, Query, Body, Request, Header
from typing import Optional, Dict, Any
from uuid import UUID
from datetime import datetime
import structlog
import json
from app.core.database import get_db
from shared.auth.decorators import get_current_user_dep
from shared.auth.access_control import require_user_role, admin_role_required, service_only_access
from shared.routing import RouteBuilder
from app.services.pos_transaction_service import POSTransactionService
from app.services.pos_config_service import POSConfigurationService
from app.services.tenant_deletion_service import POSTenantDeletionService
router = APIRouter()
logger = structlog.get_logger()
route_builder = RouteBuilder('pos')
# ============================================================================
# Sync Operations
# ============================================================================
@router.post(
route_builder.build_operations_route("sync"),
response_model=dict
)
@require_user_role(['member', 'admin', 'owner'])
async def trigger_sync(
sync_request: Dict[str, Any],
tenant_id: UUID = Path(...),
current_user: dict = Depends(get_current_user_dep),
db=Depends(get_db)
):
"""Trigger manual synchronization with POS system (Member+)"""
try:
sync_type = sync_request.get("sync_type", "incremental")
data_types = sync_request.get("data_types", ["transactions"])
config_id = sync_request.get("config_id")
logger.info("Manual sync triggered",
tenant_id=tenant_id,
config_id=config_id,
sync_type=sync_type,
user_id=current_user.get("user_id"))
return {
"message": "Sync triggered successfully",
"sync_id": "placeholder-sync-id",
"status": "queued",
"sync_type": sync_type,
"data_types": data_types,
"estimated_duration": "5-10 minutes"
}
except Exception as e:
logger.error("Failed to trigger sync", error=str(e), tenant_id=tenant_id)
raise HTTPException(status_code=500, detail=f"Failed to trigger sync: {str(e)}")
@router.get(
route_builder.build_operations_route("sync-status"),
response_model=dict
)
@require_user_role(['viewer', 'member', 'admin', 'owner'])
async def get_sync_status(
tenant_id: UUID = Path(...),
config_id: Optional[UUID] = Query(None),
limit: int = Query(10, ge=1, le=100),
current_user: dict = Depends(get_current_user_dep),
db=Depends(get_db)
):
"""Get synchronization status and recent sync history"""
try:
transaction_service = POSTransactionService()
# Get sync metrics from transaction service
sync_metrics = await transaction_service.get_sync_metrics(tenant_id)
# Get last successful sync time
sync_status = sync_metrics["sync_status"]
last_successful_sync = sync_status.get("last_sync_at")
# Calculate sync success rate
total = sync_metrics["total_transactions"]
synced = sync_status.get("synced", 0)
success_rate = (synced / total * 100) if total > 0 else 100.0
return {
"current_sync": None,
"last_successful_sync": last_successful_sync.isoformat() if last_successful_sync else None,
"recent_syncs": [], # Could be enhanced with actual sync history
"sync_health": {
"status": "healthy" if success_rate > 90 else "degraded" if success_rate > 70 else "unhealthy",
"success_rate": round(success_rate, 2),
"average_duration_minutes": 3.2, # Placeholder - could calculate from actual data
"last_error": None,
"total_transactions": total,
"synced_count": synced,
"pending_count": sync_status.get("pending", 0),
"failed_count": sync_status.get("failed", 0)
}
}
except Exception as e:
logger.error("Failed to get sync status", error=str(e), tenant_id=tenant_id)
raise HTTPException(status_code=500, detail=f"Failed to get sync status: {str(e)}")
@router.get(
route_builder.build_operations_route("sync-logs"),
response_model=dict
)
@require_user_role(['viewer', 'member', 'admin', 'owner'])
async def get_sync_logs(
tenant_id: UUID = Path(...),
config_id: Optional[UUID] = Query(None),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
status: Optional[str] = Query(None),
sync_type: Optional[str] = Query(None),
current_user: dict = Depends(get_current_user_dep),
db=Depends(get_db)
):
"""Get detailed sync logs"""
try:
return {
"logs": [],
"total": 0,
"has_more": False
}
except Exception as e:
logger.error("Failed to get sync logs", error=str(e), tenant_id=tenant_id)
raise HTTPException(status_code=500, detail=f"Failed to get sync logs: {str(e)}")
@router.post(
route_builder.build_operations_route("resync-failed"),
response_model=dict
)
@admin_role_required
async def resync_failed_transactions(
tenant_id: UUID = Path(...),
days_back: int = Query(7, ge=1, le=90),
current_user: dict = Depends(get_current_user_dep),
db=Depends(get_db)
):
"""Resync failed transactions from the specified time period (Admin/Owner only)"""
try:
logger.info("Resync failed transactions requested",
tenant_id=tenant_id,
days_back=days_back,
user_id=current_user.get("user_id"))
return {
"message": "Resync job queued successfully",
"job_id": "placeholder-resync-job-id",
"scope": f"Failed transactions from last {days_back} days",
"estimated_transactions": 0
}
except Exception as e:
logger.error("Failed to queue resync job", error=str(e), tenant_id=tenant_id)
raise HTTPException(status_code=500, detail=f"Failed to queue resync job: {str(e)}")
@router.post(
route_builder.build_operations_route("test-connection"),
response_model=dict
)
@admin_role_required
async def test_pos_connection(
tenant_id: UUID = Path(...),
config_id: UUID = Query(...),
current_user: dict = Depends(get_current_user_dep),
db=Depends(get_db)
):
"""Test connection to POS system (Admin/Owner only)"""
try:
config_service = POSConfigurationService()
# Get the configuration to verify it exists
configurations = await config_service.get_configurations_by_tenant(
tenant_id=tenant_id,
skip=0,
limit=100
)
config = next((c for c in configurations if str(c.id) == str(config_id)), None)
if not config:
raise HTTPException(status_code=404, detail="Configuration not found")
# For demo purposes, we assume connection is successful if config exists
# In production, this would actually test the POS API connection
is_connected = config.is_connected and config.is_active
return {
"success": is_connected,
"status": "success" if is_connected else "failed",
"message": f"Connection test {'successful' if is_connected else 'failed'} for {config.pos_system}",
"tested_at": datetime.utcnow().isoformat(),
"config_id": str(config_id),
"pos_system": config.pos_system,
"health_status": config.health_status
}
except HTTPException:
raise
except Exception as e:
logger.error("Failed to test POS connection", error=str(e),
tenant_id=tenant_id, config_id=config_id)
raise HTTPException(status_code=500, detail=f"Failed to test connection: {str(e)}")
# ============================================================================
# Webhook Operations
# ============================================================================
@router.post(
route_builder.build_webhook_route("{pos_system}"),
response_model=dict
)
async def receive_webhook(
request: Request,
pos_system: str = Path(..., description="POS system name"),
content_type: Optional[str] = Header(None),
x_signature: Optional[str] = Header(None),
x_webhook_signature: Optional[str] = Header(None),
authorization: Optional[str] = Header(None)
):
"""
Receive webhooks from POS systems
Supports Square, Toast, and Lightspeed webhook formats
"""
try:
# Validate POS system
supported_systems = ["square", "toast", "lightspeed"]
if pos_system.lower() not in supported_systems:
raise HTTPException(status_code=400, detail=f"Unsupported POS system: {pos_system}")
# Get request details
method = request.method
url_path = str(request.url.path)
query_params = dict(request.query_params)
headers = dict(request.headers)
# Get client IP
client_ip = None
if hasattr(request, 'client') and request.client:
client_ip = request.client.host
# Read payload
try:
body = await request.body()
raw_payload = body.decode('utf-8') if body else ""
payload_size = len(body) if body else 0
# Parse JSON if possible
parsed_payload = None
if raw_payload:
try:
parsed_payload = json.loads(raw_payload)
except json.JSONDecodeError:
logger.warning("Failed to parse webhook payload as JSON",
pos_system=pos_system, payload_size=payload_size)
except Exception as e:
logger.error("Failed to read webhook payload", error=str(e))
raise HTTPException(status_code=400, detail="Failed to read request payload")
# Determine signature from various header formats
signature = x_signature or x_webhook_signature or authorization
# Log webhook receipt
logger.info("Webhook received",
pos_system=pos_system,
method=method,
url_path=url_path,
payload_size=payload_size,
client_ip=client_ip,
has_signature=bool(signature),
content_type=content_type)
# TODO: Store webhook log in database
# TODO: Verify webhook signature
# TODO: Extract tenant_id from payload
# TODO: Process webhook based on POS system type
# TODO: Queue for async processing if needed
# Parse webhook type based on POS system
webhook_type = None
event_id = None
if parsed_payload:
if pos_system.lower() == "square":
webhook_type = parsed_payload.get("type")
event_id = parsed_payload.get("event_id")
elif pos_system.lower() == "toast":
webhook_type = parsed_payload.get("eventType")
event_id = parsed_payload.get("guid")
elif pos_system.lower() == "lightspeed":
webhook_type = parsed_payload.get("action")
event_id = parsed_payload.get("id")
logger.info("Webhook processed successfully",
pos_system=pos_system,
webhook_type=webhook_type,
event_id=event_id)
# Return appropriate response based on POS system requirements
if pos_system.lower() == "square":
return {"status": "success"}
elif pos_system.lower() == "toast":
return {"success": True}
elif pos_system.lower() == "lightspeed":
return {"received": True}
else:
return {"status": "received"}
except HTTPException:
raise
except Exception as e:
logger.error("Webhook processing failed",
error=str(e),
pos_system=pos_system)
# Return 500 to trigger POS system retry
raise HTTPException(status_code=500, detail="Webhook processing failed")
@router.get(
route_builder.build_webhook_route("{pos_system}/status"),
response_model=dict
)
async def get_webhook_status(pos_system: str = Path(..., description="POS system name")):
"""Get webhook endpoint status for a POS system"""
try:
supported_systems = ["square", "toast", "lightspeed"]
if pos_system.lower() not in supported_systems:
raise HTTPException(status_code=400, detail=f"Unsupported POS system: {pos_system}")
return {
"pos_system": pos_system,
"status": "active",
"endpoint": f"/api/v1/webhooks/{pos_system}",
"supported_events": _get_supported_events(pos_system),
"last_received": None,
"total_received": 0
}
except Exception as e:
logger.error("Failed to get webhook status", error=str(e), pos_system=pos_system)
raise HTTPException(status_code=500, detail=f"Failed to get webhook status: {str(e)}")
def _get_supported_events(pos_system: str) -> Dict[str, Any]:
"""Get supported webhook events for each POS system"""
events = {
"square": [
"payment.created",
"payment.updated",
"order.created",
"order.updated",
"order.fulfilled",
"inventory.count.updated"
],
"toast": [
"OrderCreated",
"OrderUpdated",
"OrderPaid",
"OrderCanceled",
"OrderVoided"
],
"lightspeed": [
"order.created",
"order.updated",
"order.paid",
"sale.created",
"sale.updated"
]
}
return {
"events": events.get(pos_system.lower(), []),
"format": "JSON",
"authentication": "signature_verification"
}
# ============================================================================
# Tenant Data Deletion Operations (Internal Service Only)
# ============================================================================
@router.delete(
route_builder.build_base_route("tenant/{tenant_id}", include_tenant_prefix=False),
response_model=dict
)
@service_only_access
async def delete_tenant_data(
tenant_id: str = Path(..., description="Tenant ID to delete data for"),
current_user: dict = Depends(get_current_user_dep),
db=Depends(get_db)
):
"""
Delete all POS data for a tenant (Internal service only)
This endpoint is called by the orchestrator during tenant deletion.
It permanently deletes all POS-related data including:
- POS configurations
- POS transactions and items
- Webhook logs
- Sync logs
- Audit logs
**WARNING**: This operation is irreversible!
Returns:
Deletion summary with counts of deleted records
"""
try:
logger.info("pos.tenant_deletion.api_called", tenant_id=tenant_id)
deletion_service = POSTenantDeletionService(db)
result = await deletion_service.safe_delete_tenant_data(tenant_id)
if not result.success:
raise HTTPException(
status_code=500,
detail=f"Tenant data deletion failed: {', '.join(result.errors)}"
)
return {
"message": "Tenant data deletion completed successfully",
"summary": result.to_dict()
}
except HTTPException:
raise
except Exception as e:
logger.error("pos.tenant_deletion.api_error",
tenant_id=tenant_id,
error=str(e),
exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to delete tenant data: {str(e)}"
)
@router.get(
route_builder.build_base_route("tenant/{tenant_id}/deletion-preview", include_tenant_prefix=False),
response_model=dict
)
@service_only_access
async def preview_tenant_data_deletion(
tenant_id: str = Path(..., description="Tenant ID to preview deletion for"),
current_user: dict = Depends(get_current_user_dep),
db=Depends(get_db)
):
"""
Preview what data would be deleted for a tenant (dry-run)
This endpoint shows counts of all data that would be deleted
without actually deleting anything. Useful for:
- Confirming deletion scope before execution
- Auditing and compliance
- Troubleshooting
Returns:
Dictionary with entity names and their counts
"""
try:
logger.info("pos.tenant_deletion.preview_called", tenant_id=tenant_id)
deletion_service = POSTenantDeletionService(db)
preview = await deletion_service.get_tenant_data_preview(tenant_id)
total_records = sum(preview.values())
return {
"tenant_id": tenant_id,
"service": "pos",
"preview": preview,
"total_records": total_records,
"warning": "These records will be permanently deleted and cannot be recovered"
}
except Exception as e:
logger.error("pos.tenant_deletion.preview_error",
tenant_id=tenant_id,
error=str(e),
exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to preview tenant data deletion: {str(e)}"
)