Add POS service
This commit is contained in:
1
services/pos/app/services/__init__.py
Normal file
1
services/pos/app/services/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# POS Services
|
||||
473
services/pos/app/services/pos_integration_service.py
Normal file
473
services/pos/app/services/pos_integration_service.py
Normal file
@@ -0,0 +1,473 @@
|
||||
# services/pos/app/services/pos_integration_service.py
|
||||
"""
|
||||
POS Integration Service
|
||||
Handles real-time sync and webhook processing for POS systems
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from typing import Dict, List, Optional, Any
|
||||
from datetime import datetime, timedelta
|
||||
from uuid import UUID
|
||||
|
||||
import structlog
|
||||
import httpx
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.database import get_db_transaction
|
||||
from app.models.pos_config import POSConfiguration
|
||||
from app.models.pos_transaction import POSTransaction, POSTransactionItem
|
||||
from app.models.pos_webhook import POSWebhookLog
|
||||
from app.models.pos_sync import POSSyncLog
|
||||
from app.integrations.base_pos_client import (
|
||||
POSCredentials,
|
||||
BasePOSClient,
|
||||
POSTransaction as ClientPOSTransaction,
|
||||
SyncResult
|
||||
)
|
||||
from app.integrations.square_client import SquarePOSClient
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
class POSIntegrationService:
|
||||
"""
|
||||
Main service for POS integrations
|
||||
Handles webhook processing, real-time sync, and data transformation
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.supported_clients = {
|
||||
"square": SquarePOSClient,
|
||||
# "toast": ToastPOSClient, # To be implemented
|
||||
# "lightspeed": LightspeedPOSClient, # To be implemented
|
||||
}
|
||||
|
||||
def _create_pos_client(self, config: POSConfiguration) -> BasePOSClient:
|
||||
"""Create POS client from configuration"""
|
||||
|
||||
if config.pos_system not in self.supported_clients:
|
||||
raise ValueError(f"Unsupported POS system: {config.pos_system}")
|
||||
|
||||
# Decrypt credentials (simplified - in production use proper encryption)
|
||||
credentials_data = json.loads(config.encrypted_credentials or "{}")
|
||||
|
||||
credentials = POSCredentials(
|
||||
pos_system=config.pos_system,
|
||||
environment=config.environment,
|
||||
api_key=credentials_data.get("api_key"),
|
||||
api_secret=credentials_data.get("api_secret"),
|
||||
access_token=credentials_data.get("access_token"),
|
||||
application_id=credentials_data.get("application_id"),
|
||||
merchant_id=config.merchant_id,
|
||||
location_id=config.location_id,
|
||||
webhook_secret=config.webhook_secret
|
||||
)
|
||||
|
||||
client_class = self.supported_clients[config.pos_system]
|
||||
return client_class(credentials)
|
||||
|
||||
async def test_connection(self, config: POSConfiguration) -> Dict[str, Any]:
|
||||
"""Test connection to POS system"""
|
||||
try:
|
||||
client = self._create_pos_client(config)
|
||||
success, message = await client.test_connection()
|
||||
|
||||
# Update health status in database
|
||||
async with get_db_transaction() as session:
|
||||
config.health_status = "healthy" if success else "unhealthy"
|
||||
config.health_message = message
|
||||
config.last_health_check_at = datetime.utcnow()
|
||||
config.is_connected = success
|
||||
|
||||
session.add(config)
|
||||
await session.commit()
|
||||
|
||||
return {
|
||||
"success": success,
|
||||
"message": message,
|
||||
"tested_at": datetime.utcnow().isoformat()
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Connection test failed", error=str(e), config_id=config.id)
|
||||
|
||||
# Update health status
|
||||
async with get_db_transaction() as session:
|
||||
config.health_status = "unhealthy"
|
||||
config.health_message = f"Test failed: {str(e)}"
|
||||
config.last_health_check_at = datetime.utcnow()
|
||||
config.is_connected = False
|
||||
|
||||
session.add(config)
|
||||
await session.commit()
|
||||
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"Connection test failed: {str(e)}",
|
||||
"tested_at": datetime.utcnow().isoformat()
|
||||
}
|
||||
|
||||
async def process_webhook(
|
||||
self,
|
||||
pos_system: str,
|
||||
payload: bytes,
|
||||
headers: Dict[str, str],
|
||||
query_params: Dict[str, str],
|
||||
method: str,
|
||||
url_path: str,
|
||||
source_ip: str
|
||||
) -> Dict[str, Any]:
|
||||
"""Process incoming webhook from POS system"""
|
||||
|
||||
webhook_log = None
|
||||
|
||||
try:
|
||||
# Parse payload
|
||||
raw_payload = payload.decode('utf-8')
|
||||
payload_data = json.loads(raw_payload) if raw_payload else {}
|
||||
|
||||
# Extract webhook type and event info
|
||||
webhook_type = self._extract_webhook_type(pos_system, payload_data)
|
||||
event_id = self._extract_event_id(pos_system, payload_data)
|
||||
|
||||
# Create webhook log
|
||||
async with get_db_transaction() as session:
|
||||
webhook_log = POSWebhookLog(
|
||||
pos_system=pos_system,
|
||||
webhook_type=webhook_type or "unknown",
|
||||
method=method,
|
||||
url_path=url_path,
|
||||
query_params=query_params,
|
||||
headers=headers,
|
||||
raw_payload=raw_payload,
|
||||
payload_size=len(payload),
|
||||
content_type=headers.get("content-type"),
|
||||
signature=headers.get("x-square-signature") or headers.get("x-toast-signature"),
|
||||
source_ip=source_ip,
|
||||
status="received",
|
||||
event_id=event_id,
|
||||
priority="normal"
|
||||
)
|
||||
|
||||
session.add(webhook_log)
|
||||
await session.commit()
|
||||
await session.refresh(webhook_log)
|
||||
|
||||
# Find relevant POS configuration
|
||||
config = await self._find_pos_config_for_webhook(pos_system, payload_data)
|
||||
|
||||
if not config:
|
||||
logger.warning("No POS configuration found for webhook", pos_system=pos_system)
|
||||
await self._update_webhook_status(webhook_log.id, "failed", "No configuration found")
|
||||
return {"status": "error", "message": "No configuration found"}
|
||||
|
||||
# Update webhook log with tenant info
|
||||
async with get_db_transaction() as session:
|
||||
webhook_log.tenant_id = config.tenant_id
|
||||
session.add(webhook_log)
|
||||
await session.commit()
|
||||
|
||||
# Verify webhook signature
|
||||
if config.webhook_secret:
|
||||
client = self._create_pos_client(config)
|
||||
signature = webhook_log.signature or ""
|
||||
is_valid = client.verify_webhook_signature(payload, signature)
|
||||
|
||||
async with get_db_transaction() as session:
|
||||
webhook_log.is_signature_valid = is_valid
|
||||
session.add(webhook_log)
|
||||
await session.commit()
|
||||
|
||||
if not is_valid:
|
||||
logger.warning("Invalid webhook signature", config_id=config.id)
|
||||
await self._update_webhook_status(webhook_log.id, "failed", "Invalid signature")
|
||||
return {"status": "error", "message": "Invalid signature"}
|
||||
|
||||
# Process webhook payload
|
||||
await self._update_webhook_status(webhook_log.id, "processing")
|
||||
|
||||
result = await self._process_webhook_payload(config, payload_data, webhook_log)
|
||||
|
||||
if result["success"]:
|
||||
await self._update_webhook_status(webhook_log.id, "processed", result.get("message"))
|
||||
return {"status": "success", "message": result.get("message", "Processed successfully")}
|
||||
else:
|
||||
await self._update_webhook_status(webhook_log.id, "failed", result.get("error"))
|
||||
return {"status": "error", "message": result.get("error", "Processing failed")}
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Webhook processing failed", error=str(e), pos_system=pos_system)
|
||||
|
||||
if webhook_log:
|
||||
await self._update_webhook_status(webhook_log.id, "failed", f"Processing error: {str(e)}")
|
||||
|
||||
return {"status": "error", "message": "Processing failed"}
|
||||
|
||||
async def _process_webhook_payload(
|
||||
self,
|
||||
config: POSConfiguration,
|
||||
payload_data: Dict[str, Any],
|
||||
webhook_log: POSWebhookLog
|
||||
) -> Dict[str, Any]:
|
||||
"""Process webhook payload and extract transaction data"""
|
||||
|
||||
try:
|
||||
client = self._create_pos_client(config)
|
||||
|
||||
# Parse webhook into transaction
|
||||
client_transaction = client.parse_webhook_payload(payload_data)
|
||||
|
||||
if not client_transaction:
|
||||
return {"success": False, "error": "No transaction data in webhook"}
|
||||
|
||||
# Convert to database model and save
|
||||
transaction = await self._save_pos_transaction(
|
||||
config,
|
||||
client_transaction,
|
||||
webhook_log.id
|
||||
)
|
||||
|
||||
if transaction:
|
||||
# Queue for sync to sales service
|
||||
await self._queue_sales_sync(transaction)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Transaction {transaction.external_transaction_id} processed",
|
||||
"transaction_id": str(transaction.id)
|
||||
}
|
||||
else:
|
||||
return {"success": False, "error": "Failed to save transaction"}
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Webhook payload processing failed", error=str(e), config_id=config.id)
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def _save_pos_transaction(
|
||||
self,
|
||||
config: POSConfiguration,
|
||||
client_transaction: ClientPOSTransaction,
|
||||
webhook_log_id: Optional[UUID] = None
|
||||
) -> Optional[POSTransaction]:
|
||||
"""Save POS transaction to database"""
|
||||
|
||||
try:
|
||||
async with get_db_transaction() as session:
|
||||
# Check for duplicate
|
||||
existing = await session.execute(
|
||||
"SELECT id FROM pos_transactions WHERE external_transaction_id = :ext_id AND pos_config_id = :config_id",
|
||||
{
|
||||
"ext_id": client_transaction.external_id,
|
||||
"config_id": config.id
|
||||
}
|
||||
)
|
||||
|
||||
if existing.first():
|
||||
logger.info("Duplicate transaction detected",
|
||||
external_id=client_transaction.external_id)
|
||||
return None
|
||||
|
||||
# Create transaction record
|
||||
transaction = POSTransaction(
|
||||
tenant_id=config.tenant_id,
|
||||
pos_config_id=config.id,
|
||||
pos_system=config.pos_system,
|
||||
external_transaction_id=client_transaction.external_id,
|
||||
external_order_id=client_transaction.external_order_id,
|
||||
transaction_type=client_transaction.transaction_type,
|
||||
status=client_transaction.status,
|
||||
subtotal=client_transaction.subtotal,
|
||||
tax_amount=client_transaction.tax_amount,
|
||||
tip_amount=client_transaction.tip_amount,
|
||||
discount_amount=client_transaction.discount_amount,
|
||||
total_amount=client_transaction.total_amount,
|
||||
currency=client_transaction.currency,
|
||||
payment_method=client_transaction.payment_method,
|
||||
payment_status=client_transaction.payment_status,
|
||||
transaction_date=client_transaction.transaction_date,
|
||||
pos_created_at=client_transaction.transaction_date,
|
||||
location_id=client_transaction.location_id,
|
||||
location_name=client_transaction.location_name,
|
||||
staff_id=client_transaction.staff_id,
|
||||
staff_name=client_transaction.staff_name,
|
||||
customer_id=client_transaction.customer_id,
|
||||
customer_email=client_transaction.customer_email,
|
||||
order_type=client_transaction.order_type,
|
||||
table_number=client_transaction.table_number,
|
||||
receipt_number=client_transaction.receipt_number,
|
||||
raw_data=client_transaction.raw_data,
|
||||
is_processed=True
|
||||
)
|
||||
|
||||
session.add(transaction)
|
||||
await session.flush() # Get the ID
|
||||
|
||||
# Create transaction items
|
||||
for client_item in client_transaction.items:
|
||||
item = POSTransactionItem(
|
||||
transaction_id=transaction.id,
|
||||
tenant_id=config.tenant_id,
|
||||
external_item_id=client_item.external_id,
|
||||
sku=client_item.sku,
|
||||
product_name=client_item.name,
|
||||
product_category=client_item.category,
|
||||
quantity=client_item.quantity,
|
||||
unit_price=client_item.unit_price,
|
||||
total_price=client_item.total_price,
|
||||
discount_amount=client_item.discount_amount,
|
||||
tax_amount=client_item.tax_amount,
|
||||
modifiers=client_item.modifiers,
|
||||
raw_data=client_item.raw_data
|
||||
)
|
||||
session.add(item)
|
||||
|
||||
await session.commit()
|
||||
await session.refresh(transaction)
|
||||
|
||||
logger.info("Transaction saved",
|
||||
transaction_id=transaction.id,
|
||||
external_id=client_transaction.external_id)
|
||||
|
||||
return transaction
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Failed to save transaction", error=str(e))
|
||||
return None
|
||||
|
||||
async def _queue_sales_sync(self, transaction: POSTransaction):
|
||||
"""Queue transaction for sync to sales service"""
|
||||
try:
|
||||
# Send transaction data to sales service
|
||||
sales_data = {
|
||||
"product_name": f"POS Transaction {transaction.external_transaction_id}",
|
||||
"quantity_sold": 1,
|
||||
"unit_price": float(transaction.total_amount),
|
||||
"total_revenue": float(transaction.total_amount),
|
||||
"sale_date": transaction.transaction_date.isoformat(),
|
||||
"sales_channel": f"{transaction.pos_system}_pos",
|
||||
"location_id": transaction.location_id,
|
||||
"source": "pos_integration",
|
||||
"external_transaction_id": transaction.external_transaction_id,
|
||||
"payment_method": transaction.payment_method,
|
||||
"raw_pos_data": transaction.raw_data
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
f"{settings.SALES_SERVICE_URL}/api/v1/tenants/{transaction.tenant_id}/sales",
|
||||
json=sales_data,
|
||||
timeout=30.0
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
# Update transaction as synced
|
||||
async with get_db_transaction() as session:
|
||||
transaction.is_synced_to_sales = True
|
||||
transaction.sync_completed_at = datetime.utcnow()
|
||||
session.add(transaction)
|
||||
await session.commit()
|
||||
|
||||
logger.info("Transaction synced to sales service",
|
||||
transaction_id=transaction.id)
|
||||
else:
|
||||
logger.error("Failed to sync to sales service",
|
||||
status_code=response.status_code,
|
||||
transaction_id=transaction.id)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Sales sync failed", error=str(e), transaction_id=transaction.id)
|
||||
|
||||
def _extract_webhook_type(self, pos_system: str, payload: Dict[str, Any]) -> Optional[str]:
|
||||
"""Extract webhook type from payload"""
|
||||
if pos_system == "square":
|
||||
return payload.get("type")
|
||||
elif pos_system == "toast":
|
||||
return payload.get("eventType")
|
||||
elif pos_system == "lightspeed":
|
||||
return payload.get("action")
|
||||
return None
|
||||
|
||||
def _extract_event_id(self, pos_system: str, payload: Dict[str, Any]) -> Optional[str]:
|
||||
"""Extract event ID from payload"""
|
||||
if pos_system == "square":
|
||||
return payload.get("event_id")
|
||||
elif pos_system == "toast":
|
||||
return payload.get("guid")
|
||||
elif pos_system == "lightspeed":
|
||||
return payload.get("id")
|
||||
return None
|
||||
|
||||
async def _find_pos_config_for_webhook(
|
||||
self,
|
||||
pos_system: str,
|
||||
payload: Dict[str, Any]
|
||||
) -> Optional[POSConfiguration]:
|
||||
"""Find POS configuration that matches the webhook"""
|
||||
|
||||
# Extract location ID or merchant ID from payload
|
||||
location_id = self._extract_location_id(pos_system, payload)
|
||||
merchant_id = self._extract_merchant_id(pos_system, payload)
|
||||
|
||||
async with get_db_transaction() as session:
|
||||
query = """
|
||||
SELECT * FROM pos_configurations
|
||||
WHERE pos_system = :pos_system
|
||||
AND is_active = true
|
||||
"""
|
||||
|
||||
params = {"pos_system": pos_system}
|
||||
|
||||
if location_id:
|
||||
query += " AND location_id = :location_id"
|
||||
params["location_id"] = location_id
|
||||
elif merchant_id:
|
||||
query += " AND merchant_id = :merchant_id"
|
||||
params["merchant_id"] = merchant_id
|
||||
|
||||
query += " LIMIT 1"
|
||||
|
||||
result = await session.execute(query, params)
|
||||
row = result.first()
|
||||
|
||||
if row:
|
||||
return POSConfiguration(**row._asdict())
|
||||
return None
|
||||
|
||||
def _extract_location_id(self, pos_system: str, payload: Dict[str, Any]) -> Optional[str]:
|
||||
"""Extract location ID from webhook payload"""
|
||||
if pos_system == "square":
|
||||
# Square includes location_id in various places
|
||||
return (payload.get("data", {})
|
||||
.get("object", {})
|
||||
.get("order", {})
|
||||
.get("location_id"))
|
||||
return None
|
||||
|
||||
def _extract_merchant_id(self, pos_system: str, payload: Dict[str, Any]) -> Optional[str]:
|
||||
"""Extract merchant ID from webhook payload"""
|
||||
if pos_system == "toast":
|
||||
return payload.get("restaurantGuid")
|
||||
return None
|
||||
|
||||
async def _update_webhook_status(
|
||||
self,
|
||||
webhook_id: UUID,
|
||||
status: str,
|
||||
message: Optional[str] = None
|
||||
):
|
||||
"""Update webhook log status"""
|
||||
try:
|
||||
async with get_db_transaction() as session:
|
||||
webhook_log = await session.get(POSWebhookLog, webhook_id)
|
||||
if webhook_log:
|
||||
webhook_log.status = status
|
||||
webhook_log.processing_completed_at = datetime.utcnow()
|
||||
if message:
|
||||
webhook_log.error_message = message
|
||||
|
||||
session.add(webhook_log)
|
||||
await session.commit()
|
||||
except Exception as e:
|
||||
logger.error("Failed to update webhook status", error=str(e), webhook_id=webhook_id)
|
||||
Reference in New Issue
Block a user