Files
bakery-ia/services/pos/app/services/pos_integration_service.py
2025-08-16 15:00:36 +02:00

473 lines
20 KiB
Python

# services/pos/app/services/pos_integration_service.py
"""
POS Integration Service
Handles real-time sync and webhook processing for POS systems
"""
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from uuid import UUID

import httpx
import structlog
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.core.database import get_db_transaction
from app.integrations.base_pos_client import (
    POSCredentials,
    BasePOSClient,
    POSTransaction as ClientPOSTransaction,
    SyncResult
)
from app.integrations.square_client import SquarePOSClient
from app.models.pos_config import POSConfiguration
from app.models.pos_sync import POSSyncLog
from app.models.pos_transaction import POSTransaction, POSTransactionItem
from app.models.pos_webhook import POSWebhookLog

logger = structlog.get_logger()
class POSIntegrationService:
    """
    Main service for POS integrations.

    Handles webhook processing, real-time sync, and data transformation:
    an incoming webhook is logged, matched to a ``POSConfiguration``,
    signature-checked (when the configuration has a webhook secret), parsed
    into a transaction by the provider client, persisted, and finally pushed
    to the sales service.
    """

    # Webhook statuses after which no further processing happens; only these
    # get a ``processing_completed_at`` timestamp.
    _TERMINAL_WEBHOOK_STATUSES = frozenset({"processed", "failed"})

    def __init__(self):
        # Maps the ``pos_system`` identifier stored on a configuration to the
        # client class that talks to that provider.
        self.supported_clients = {
            "square": SquarePOSClient,
            # "toast": ToastPOSClient,  # To be implemented
            # "lightspeed": LightspeedPOSClient,  # To be implemented
        }

    def _create_pos_client(self, config: POSConfiguration) -> BasePOSClient:
        """Create POS client from configuration.

        Args:
            config: Configuration whose ``pos_system`` selects the client
                class and whose credential fields are passed to it.

        Returns:
            An instance of the client class registered for
            ``config.pos_system``.

        Raises:
            ValueError: If ``config.pos_system`` is not in
                ``self.supported_clients``. (``json.JSONDecodeError``, a
                ``ValueError`` subclass, is raised when the stored
                credentials are not valid JSON.)
        """
        if config.pos_system not in self.supported_clients:
            raise ValueError(f"Unsupported POS system: {config.pos_system}")

        # Decrypt credentials (simplified - in production use proper encryption).
        # Today the column is read as plain JSON; a missing value means "{}".
        credentials_data = json.loads(config.encrypted_credentials or "{}")
        credentials = POSCredentials(
            pos_system=config.pos_system,
            environment=config.environment,
            api_key=credentials_data.get("api_key"),
            api_secret=credentials_data.get("api_secret"),
            access_token=credentials_data.get("access_token"),
            application_id=credentials_data.get("application_id"),
            merchant_id=config.merchant_id,
            location_id=config.location_id,
            webhook_secret=config.webhook_secret,
        )

        client_class = self.supported_clients[config.pos_system]
        return client_class(credentials)

    async def _record_health_status(
        self,
        config: POSConfiguration,
        success: bool,
        message: str
    ) -> None:
        """Persist the outcome of a connection test on ``config``.

        Best-effort: a database failure is logged and swallowed so that it
        can neither hide the real connection-test result nor escape from
        ``test_connection``'s error path.
        """
        try:
            async with get_db_transaction() as session:
                config.health_status = "healthy" if success else "unhealthy"
                config.health_message = message
                config.last_health_check_at = datetime.utcnow()
                config.is_connected = success
                session.add(config)
                await session.commit()
        except Exception as e:
            logger.error(
                "Failed to persist POS health status",
                error=str(e),
                config_id=config.id,
            )

    async def test_connection(self, config: POSConfiguration) -> Dict[str, Any]:
        """Test connection to POS system.

        Builds the provider client, runs its connection test and records the
        result on ``config`` (health status, message, timestamp, connected
        flag).

        Returns:
            A dict with ``success`` (bool), ``message`` (str) and
            ``tested_at`` (ISO-8601 string from ``datetime.utcnow()``). Client
            creation or test errors are reported in the dict, not raised.
        """
        try:
            client = self._create_pos_client(config)
            success, message = await client.test_connection()
        except Exception as e:
            logger.error("Connection test failed", error=str(e), config_id=config.id)
            await self._record_health_status(config, False, f"Test failed: {str(e)}")
            return {
                "success": False,
                "message": f"Connection test failed: {str(e)}",
                "tested_at": datetime.utcnow().isoformat(),
            }

        # Update health status in database
        await self._record_health_status(config, success, message)
        return {
            "success": success,
            "message": message,
            "tested_at": datetime.utcnow().isoformat(),
        }

    async def process_webhook(
        self,
        pos_system: str,
        payload: bytes,
        headers: Dict[str, str],
        query_params: Dict[str, str],
        method: str,
        url_path: str,
        source_ip: str
    ) -> Dict[str, Any]:
        """Process incoming webhook from POS system.

        Steps: decode and log the request, find the matching configuration,
        verify the signature when the configuration has a webhook secret,
        then hand the parsed payload to ``_process_webhook_payload``.

        Args:
            pos_system: Provider identifier (e.g. ``"square"``).
            payload: Raw request body; decoded as UTF-8 JSON.
            headers: Request headers. Looked up with lower-case keys, so the
                caller is presumably expected to normalise them.
            query_params: Request query parameters (stored on the log only).
            method: HTTP method (stored on the log only).
            url_path: Request path (stored on the log only).
            source_ip: Remote address (stored on the log only).

        Returns:
            ``{"status": "success" | "error", "message": ...}``. Exceptions
            are caught and reported as an error status.
        """
        webhook_log = None
        try:
            # Parse payload
            raw_payload = payload.decode('utf-8')
            payload_data = json.loads(raw_payload) if raw_payload else {}

            # Extract webhook type and event info
            webhook_type = self._extract_webhook_type(pos_system, payload_data)
            event_id = self._extract_event_id(pos_system, payload_data)

            # Create webhook log
            async with get_db_transaction() as session:
                webhook_log = POSWebhookLog(
                    pos_system=pos_system,
                    webhook_type=webhook_type or "unknown",
                    method=method,
                    url_path=url_path,
                    query_params=query_params,
                    headers=headers,
                    raw_payload=raw_payload,
                    payload_size=len(payload),
                    content_type=headers.get("content-type"),
                    signature=headers.get("x-square-signature") or headers.get("x-toast-signature"),
                    source_ip=source_ip,
                    status="received",
                    event_id=event_id,
                    priority="normal",
                )
                session.add(webhook_log)
                await session.commit()
                await session.refresh(webhook_log)

            # Find relevant POS configuration
            config = await self._find_pos_config_for_webhook(pos_system, payload_data)
            if not config:
                logger.warning("No POS configuration found for webhook", pos_system=pos_system)
                await self._update_webhook_status(webhook_log.id, "failed", "No configuration found")
                return {"status": "error", "message": "No configuration found"}

            # Update webhook log with tenant info
            async with get_db_transaction() as session:
                webhook_log.tenant_id = config.tenant_id
                session.add(webhook_log)
                await session.commit()

            # Verify webhook signature (skipped when no secret is configured)
            if config.webhook_secret:
                client = self._create_pos_client(config)
                signature = webhook_log.signature or ""
                is_valid = client.verify_webhook_signature(payload, signature)

                async with get_db_transaction() as session:
                    webhook_log.is_signature_valid = is_valid
                    session.add(webhook_log)
                    await session.commit()

                if not is_valid:
                    logger.warning("Invalid webhook signature", config_id=config.id)
                    await self._update_webhook_status(webhook_log.id, "failed", "Invalid signature")
                    return {"status": "error", "message": "Invalid signature"}

            # Process webhook payload
            await self._update_webhook_status(webhook_log.id, "processing")
            result = await self._process_webhook_payload(config, payload_data, webhook_log)

            if result["success"]:
                await self._update_webhook_status(webhook_log.id, "processed", result.get("message"))
                return {"status": "success", "message": result.get("message", "Processed successfully")}

            await self._update_webhook_status(webhook_log.id, "failed", result.get("error"))
            return {"status": "error", "message": result.get("error", "Processing failed")}
        except Exception as e:
            logger.error("Webhook processing failed", error=str(e), pos_system=pos_system)
            if webhook_log:
                await self._update_webhook_status(webhook_log.id, "failed", f"Processing error: {str(e)}")
            return {"status": "error", "message": "Processing failed"}

    async def _process_webhook_payload(
        self,
        config: POSConfiguration,
        payload_data: Dict[str, Any],
        webhook_log: POSWebhookLog
    ) -> Dict[str, Any]:
        """Process webhook payload and extract transaction data.

        Returns:
            ``{"success": True, "message": ..., ...}`` when the transaction
            was stored (or had already been stored by an earlier delivery),
            otherwise ``{"success": False, "error": ...}``.
        """
        try:
            client = self._create_pos_client(config)

            # Parse webhook into transaction
            client_transaction = client.parse_webhook_payload(payload_data)
            if not client_transaction:
                return {"success": False, "error": "No transaction data in webhook"}

            # Convert to database model and save
            transaction = await self._save_pos_transaction(
                config,
                client_transaction,
                webhook_log.id,
            )
            if transaction:
                # Queue for sync to sales service
                await self._queue_sales_sync(transaction)
                return {
                    "success": True,
                    "message": f"Transaction {transaction.external_transaction_id} processed",
                    "transaction_id": str(transaction.id),
                }

            # ``None`` means either "insert failed" or "duplicate". A repeated
            # delivery of an already-stored transaction is not an error, so
            # acknowledge it instead of marking the webhook as failed.
            async with get_db_transaction() as session:
                already_stored = await self._transaction_exists(
                    session, config.id, client_transaction.external_id
                )
            if already_stored:
                return {
                    "success": True,
                    "message": f"Transaction {client_transaction.external_id} already processed",
                }
            return {"success": False, "error": "Failed to save transaction"}
        except Exception as e:
            logger.error("Webhook payload processing failed", error=str(e), config_id=config.id)
            return {"success": False, "error": str(e)}

    async def _transaction_exists(self, session, pos_config_id, external_id) -> bool:
        """Return True if a transaction with this external id is already stored
        for the given POS configuration."""
        result = await session.execute(
            select(POSTransaction.id)
            .where(
                POSTransaction.external_transaction_id == external_id,
                POSTransaction.pos_config_id == pos_config_id,
            )
            .limit(1)
        )
        return result.first() is not None

    async def _save_pos_transaction(
        self,
        config: POSConfiguration,
        client_transaction: ClientPOSTransaction,
        webhook_log_id: Optional[UUID] = None
    ) -> Optional[POSTransaction]:
        """Save POS transaction to database.

        Args:
            config: Configuration the transaction belongs to.
            client_transaction: Transaction as parsed by the provider client.
            webhook_log_id: Accepted for callers that have a webhook log;
                currently not stored on the transaction.

        Returns:
            The persisted ``POSTransaction``, or ``None`` when the
            transaction is a duplicate or saving failed (errors are logged,
            not raised).
        """
        try:
            async with get_db_transaction() as session:
                # Check for duplicate
                if await self._transaction_exists(
                    session, config.id, client_transaction.external_id
                ):
                    logger.info("Duplicate transaction detected",
                                external_id=client_transaction.external_id)
                    return None

                # Create transaction record
                transaction = POSTransaction(
                    tenant_id=config.tenant_id,
                    pos_config_id=config.id,
                    pos_system=config.pos_system,
                    external_transaction_id=client_transaction.external_id,
                    external_order_id=client_transaction.external_order_id,
                    transaction_type=client_transaction.transaction_type,
                    status=client_transaction.status,
                    subtotal=client_transaction.subtotal,
                    tax_amount=client_transaction.tax_amount,
                    tip_amount=client_transaction.tip_amount,
                    discount_amount=client_transaction.discount_amount,
                    total_amount=client_transaction.total_amount,
                    currency=client_transaction.currency,
                    payment_method=client_transaction.payment_method,
                    payment_status=client_transaction.payment_status,
                    transaction_date=client_transaction.transaction_date,
                    pos_created_at=client_transaction.transaction_date,
                    location_id=client_transaction.location_id,
                    location_name=client_transaction.location_name,
                    staff_id=client_transaction.staff_id,
                    staff_name=client_transaction.staff_name,
                    customer_id=client_transaction.customer_id,
                    customer_email=client_transaction.customer_email,
                    order_type=client_transaction.order_type,
                    table_number=client_transaction.table_number,
                    receipt_number=client_transaction.receipt_number,
                    raw_data=client_transaction.raw_data,
                    is_processed=True,
                )
                session.add(transaction)
                await session.flush()  # Get the ID

                # Create transaction items
                for client_item in client_transaction.items:
                    session.add(
                        POSTransactionItem(
                            transaction_id=transaction.id,
                            tenant_id=config.tenant_id,
                            external_item_id=client_item.external_id,
                            sku=client_item.sku,
                            product_name=client_item.name,
                            product_category=client_item.category,
                            quantity=client_item.quantity,
                            unit_price=client_item.unit_price,
                            total_price=client_item.total_price,
                            discount_amount=client_item.discount_amount,
                            tax_amount=client_item.tax_amount,
                            modifiers=client_item.modifiers,
                            raw_data=client_item.raw_data,
                        )
                    )

                await session.commit()
                await session.refresh(transaction)

                logger.info("Transaction saved",
                            transaction_id=transaction.id,
                            external_id=client_transaction.external_id)
                return transaction
        except Exception as e:
            logger.error("Failed to save transaction", error=str(e))
            return None

    async def _queue_sales_sync(self, transaction: POSTransaction):
        """Queue transaction for sync to sales service.

        Posts a single aggregated sales record for the transaction to the
        sales service and, on a 2xx response, flags the transaction as
        synced. Failures are logged and never raised.
        """
        try:
            # Send transaction data to sales service
            sales_data = {
                "product_name": f"POS Transaction {transaction.external_transaction_id}",
                "quantity_sold": 1,
                "unit_price": float(transaction.total_amount),
                "total_revenue": float(transaction.total_amount),
                "sale_date": transaction.transaction_date.isoformat(),
                "sales_channel": f"{transaction.pos_system}_pos",
                "location_id": transaction.location_id,
                "source": "pos_integration",
                "external_transaction_id": transaction.external_transaction_id,
                "payment_method": transaction.payment_method,
                "raw_pos_data": transaction.raw_data,
            }

            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{settings.SALES_SERVICE_URL}/api/v1/tenants/{transaction.tenant_id}/sales",
                    json=sales_data,
                    timeout=30.0,
                )

            # Any 2xx counts as success: a create endpoint may answer
            # 201/204 rather than exactly 200.
            if 200 <= response.status_code < 300:
                # Update transaction as synced
                async with get_db_transaction() as session:
                    transaction.is_synced_to_sales = True
                    transaction.sync_completed_at = datetime.utcnow()
                    session.add(transaction)
                    await session.commit()
                logger.info("Transaction synced to sales service",
                            transaction_id=transaction.id)
            else:
                logger.error("Failed to sync to sales service",
                             status_code=response.status_code,
                             transaction_id=transaction.id)
        except Exception as e:
            logger.error("Sales sync failed", error=str(e), transaction_id=transaction.id)

    def _extract_webhook_type(self, pos_system: str, payload: Dict[str, Any]) -> Optional[str]:
        """Extract webhook type from payload.

        Reads ``type`` (square), ``eventType`` (toast) or ``action``
        (lightspeed); returns ``None`` for any other system or a missing key.
        """
        if pos_system == "square":
            return payload.get("type")
        elif pos_system == "toast":
            return payload.get("eventType")
        elif pos_system == "lightspeed":
            return payload.get("action")
        return None

    def _extract_event_id(self, pos_system: str, payload: Dict[str, Any]) -> Optional[str]:
        """Extract event ID from payload.

        Reads ``event_id`` (square), ``guid`` (toast) or ``id`` (lightspeed);
        returns ``None`` for any other system or a missing key.
        """
        if pos_system == "square":
            return payload.get("event_id")
        elif pos_system == "toast":
            return payload.get("guid")
        elif pos_system == "lightspeed":
            return payload.get("id")
        return None

    async def _find_pos_config_for_webhook(
        self,
        pos_system: str,
        payload: Dict[str, Any]
    ) -> Optional[POSConfiguration]:
        """Find POS configuration that matches the webhook.

        Active configurations for ``pos_system`` are filtered by the
        payload's location id when present, otherwise by its merchant id.
        When the payload carries neither, the first active configuration of
        that system is used (and a warning is logged, since the match is
        then ambiguous if several configurations exist).

        Returns:
            The matching configuration, detached from the session that
            loaded it, or ``None`` when nothing matches.
        """
        # Extract location ID or merchant ID from payload
        location_id = self._extract_location_id(pos_system, payload)
        merchant_id = self._extract_merchant_id(pos_system, payload)

        query = select(POSConfiguration).where(
            POSConfiguration.pos_system == pos_system,
            POSConfiguration.is_active.is_(True),
        )
        if location_id:
            query = query.where(POSConfiguration.location_id == location_id)
        elif merchant_id:
            query = query.where(POSConfiguration.merchant_id == merchant_id)
        else:
            logger.warning(
                "Webhook has no location or merchant identifier; "
                "falling back to first active configuration",
                pos_system=pos_system,
            )

        async with get_db_transaction() as session:
            result = await session.execute(query.limit(1))
            config = result.scalars().first()
            if config is not None:
                # Detach with its attributes loaded so the caller can keep
                # reading them after this session is committed/closed.
                session.expunge(config)
            return config

    def _extract_location_id(self, pos_system: str, payload: Dict[str, Any]) -> Optional[str]:
        """Extract location ID from webhook payload.

        Only square is handled: the value is read from
        ``data.object.order.location_id``. Returns ``None`` otherwise.
        """
        if pos_system == "square":
            # Square includes location_id in various places; only the order
            # object is inspected here.
            return (payload.get("data", {})
                    .get("object", {})
                    .get("order", {})
                    .get("location_id"))
        return None

    def _extract_merchant_id(self, pos_system: str, payload: Dict[str, Any]) -> Optional[str]:
        """Extract merchant ID from webhook payload.

        Only toast is handled (``restaurantGuid``). Returns ``None`` otherwise.
        """
        if pos_system == "toast":
            return payload.get("restaurantGuid")
        return None

    async def _update_webhook_status(
        self,
        webhook_id: UUID,
        status: str,
        message: Optional[str] = None
    ):
        """Update webhook log status.

        Args:
            webhook_id: Primary key of the ``POSWebhookLog`` row.
            status: New status value.
            message: Optional text stored in ``error_message``. Note that
                callers also pass the success message here.

        Database errors are logged and swallowed; a missing row is ignored.
        """
        try:
            async with get_db_transaction() as session:
                webhook_log = await session.get(POSWebhookLog, webhook_id)
                if webhook_log:
                    webhook_log.status = status
                    # Only stamp completion once processing has actually
                    # finished, not on the intermediate "processing" state.
                    if status in self._TERMINAL_WEBHOOK_STATUSES:
                        webhook_log.processing_completed_at = datetime.utcnow()
                    if message:
                        webhook_log.error_message = message
                    session.add(webhook_log)
                    await session.commit()
        except Exception as e:
            logger.error("Failed to update webhook status", error=str(e), webhook_id=webhook_id)