Files
bakery-ia/services/pos/app/services/pos_webhook_service.py

410 lines
14 KiB
Python

"""
POS Webhook Service - Business Logic Layer
Handles webhook processing, signature verification, and logging
"""
from typing import Optional, Dict, Any, Tuple
from uuid import UUID
import structlog
import hashlib
import hmac
import base64
import json
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.pos_webhook import POSWebhookLog
from app.repositories.pos_config_repository import POSConfigurationRepository
from app.core.database import get_db_transaction
logger = structlog.get_logger()
class POSWebhookService:
"""Service layer for POS webhook operations"""
def __init__(self, db: Optional[AsyncSession] = None):
self.db = db
async def verify_webhook_signature(
self,
pos_system: str,
payload: str,
signature: str,
webhook_secret: str
) -> bool:
"""
Verify webhook signature based on POS system
Args:
pos_system: POS system name (square, toast, lightspeed)
payload: Raw webhook payload
signature: Signature from webhook headers
webhook_secret: Secret key from POS configuration
Returns:
True if signature is valid, False otherwise
"""
try:
if pos_system.lower() == "square":
return self._verify_square_signature(payload, signature, webhook_secret)
elif pos_system.lower() == "toast":
return self._verify_toast_signature(payload, signature, webhook_secret)
elif pos_system.lower() == "lightspeed":
return self._verify_lightspeed_signature(payload, signature, webhook_secret)
else:
logger.warning("Unknown POS system for signature verification", pos_system=pos_system)
return False
except Exception as e:
logger.error("Signature verification failed", error=str(e), pos_system=pos_system)
return False
def _verify_square_signature(self, payload: str, signature: str, secret: str) -> bool:
"""Verify Square webhook signature using HMAC-SHA256"""
try:
# Square combines URL + body for signature
# Format: <notification_url> + <request_body>
# For simplicity, we'll just verify the body
expected_signature = hmac.new(
secret.encode('utf-8'),
payload.encode('utf-8'),
hashlib.sha256
).digest()
# Square sends base64-encoded signature
expected_b64 = base64.b64encode(expected_signature).decode('utf-8')
return hmac.compare_digest(signature, expected_b64)
except Exception as e:
logger.error("Square signature verification error", error=str(e))
return False
def _verify_toast_signature(self, payload: str, signature: str, secret: str) -> bool:
"""Verify Toast webhook signature using HMAC-SHA256"""
try:
expected_signature = hmac.new(
secret.encode('utf-8'),
payload.encode('utf-8'),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected_signature)
except Exception as e:
logger.error("Toast signature verification error", error=str(e))
return False
def _verify_lightspeed_signature(self, payload: str, signature: str, secret: str) -> bool:
"""Verify Lightspeed webhook signature using HMAC-SHA256"""
try:
expected_signature = hmac.new(
secret.encode('utf-8'),
payload.encode('utf-8'),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature.lower(), expected_signature.lower())
except Exception as e:
logger.error("Lightspeed signature verification error", error=str(e))
return False
async def extract_tenant_id_from_payload(
self,
pos_system: str,
parsed_payload: Dict[str, Any]
) -> Optional[UUID]:
"""
Extract tenant_id from webhook payload by matching POS system identifiers
Args:
pos_system: POS system name
parsed_payload: Parsed JSON payload
Returns:
tenant_id if found, None otherwise
"""
try:
# Extract POS-specific identifiers
pos_identifier = None
if pos_system.lower() == "square":
# Square uses merchant_id or location_id
pos_identifier = (
parsed_payload.get("merchant_id") or
parsed_payload.get("data", {}).get("object", {}).get("merchant_id") or
parsed_payload.get("location_id")
)
elif pos_system.lower() == "toast":
# Toast uses restaurantGuid
pos_identifier = (
parsed_payload.get("restaurantGuid") or
parsed_payload.get("restaurant", {}).get("guid")
)
elif pos_system.lower() == "lightspeed":
# Lightspeed uses accountID
pos_identifier = (
parsed_payload.get("accountID") or
parsed_payload.get("account", {}).get("id")
)
if not pos_identifier:
logger.warning("Could not extract POS identifier from payload", pos_system=pos_system)
return None
# Query database to find tenant_id by POS identifier
async with get_db_transaction() as db:
repository = POSConfigurationRepository(db)
config = await repository.get_by_pos_identifier(pos_system, pos_identifier)
if config:
return config.tenant_id
else:
logger.warning("No tenant found for POS identifier",
pos_system=pos_system,
identifier=pos_identifier)
return None
except Exception as e:
logger.error("Failed to extract tenant_id", error=str(e), pos_system=pos_system)
return None
async def log_webhook(
self,
pos_system: str,
webhook_type: str,
method: str,
url_path: str,
query_params: Dict[str, Any],
headers: Dict[str, str],
raw_payload: str,
payload_size: int,
content_type: Optional[str],
signature: Optional[str],
is_signature_valid: Optional[bool],
source_ip: Optional[str],
event_id: Optional[str] = None,
tenant_id: Optional[UUID] = None,
transaction_id: Optional[str] = None,
order_id: Optional[str] = None
) -> POSWebhookLog:
"""
Create a webhook log entry in the database
Returns:
Created POSWebhookLog instance
"""
try:
async with get_db_transaction() as db:
webhook_log = POSWebhookLog(
tenant_id=tenant_id,
pos_system=pos_system,
webhook_type=webhook_type,
method=method,
url_path=url_path,
query_params=query_params,
headers=headers,
raw_payload=raw_payload,
payload_size=payload_size,
content_type=content_type,
signature=signature,
is_signature_valid=is_signature_valid,
source_ip=source_ip,
status="received",
event_id=event_id,
transaction_id=transaction_id,
order_id=order_id,
received_at=datetime.utcnow(),
user_agent=headers.get("user-agent"),
forwarded_for=headers.get("x-forwarded-for"),
request_id=headers.get("x-request-id")
)
db.add(webhook_log)
await db.commit()
await db.refresh(webhook_log)
logger.info("Webhook logged to database",
webhook_log_id=str(webhook_log.id),
pos_system=pos_system,
webhook_type=webhook_type,
tenant_id=str(tenant_id) if tenant_id else None)
return webhook_log
except Exception as e:
logger.error("Failed to log webhook", error=str(e), pos_system=pos_system)
raise
async def get_webhook_secret(
self,
pos_system: str,
tenant_id: Optional[UUID] = None
) -> Optional[str]:
"""
Get webhook secret for signature verification
Args:
pos_system: POS system name
tenant_id: Optional tenant_id if known
Returns:
Webhook secret if found
"""
try:
async with get_db_transaction() as db:
repository = POSConfigurationRepository(db)
if tenant_id:
# Get active config for tenant and POS system
configs = await repository.get_configurations_by_tenant(
tenant_id=tenant_id,
pos_system=pos_system,
is_active=True,
skip=0,
limit=1
)
if configs:
return configs[0].webhook_secret
return None
except Exception as e:
logger.error("Failed to get webhook secret", error=str(e))
return None
async def update_webhook_status(
self,
webhook_log_id: UUID,
status: str,
error_message: Optional[str] = None,
processing_duration_ms: Optional[int] = None
) -> None:
"""Update webhook processing status"""
try:
async with get_db_transaction() as db:
webhook_log = await db.get(POSWebhookLog, webhook_log_id)
if webhook_log:
webhook_log.status = status
webhook_log.processing_completed_at = datetime.utcnow()
if error_message:
webhook_log.error_message = error_message
webhook_log.retry_count += 1
if processing_duration_ms:
webhook_log.processing_duration_ms = processing_duration_ms
await db.commit()
logger.info("Webhook status updated",
webhook_log_id=str(webhook_log_id),
status=status)
except Exception as e:
logger.error("Failed to update webhook status", error=str(e))
raise
async def check_duplicate_webhook(
self,
pos_system: str,
event_id: str,
tenant_id: Optional[UUID] = None
) -> Tuple[bool, Optional[UUID]]:
"""
Check if webhook has already been processed
Returns:
Tuple of (is_duplicate, original_webhook_id)
"""
try:
async with get_db_transaction() as db:
from sqlalchemy import select
query = select(POSWebhookLog).where(
POSWebhookLog.pos_system == pos_system,
POSWebhookLog.event_id == event_id,
POSWebhookLog.status == "processed"
)
if tenant_id:
query = query.where(POSWebhookLog.tenant_id == tenant_id)
result = await db.execute(query)
existing = result.scalar_one_or_none()
if existing:
logger.info("Duplicate webhook detected",
pos_system=pos_system,
event_id=event_id,
original_id=str(existing.id))
return True, existing.id
return False, None
except Exception as e:
logger.error("Failed to check duplicate webhook", error=str(e))
return False, None
def parse_webhook_event_details(
self,
pos_system: str,
parsed_payload: Dict[str, Any]
) -> Dict[str, Any]:
"""
Extract standardized event details from POS-specific payload
Returns:
Dict with event_id, webhook_type, transaction_id, order_id, etc.
"""
details = {
"event_id": None,
"webhook_type": None,
"transaction_id": None,
"order_id": None,
"customer_id": None,
"event_timestamp": None
}
try:
if pos_system.lower() == "square":
details["event_id"] = parsed_payload.get("event_id")
details["webhook_type"] = parsed_payload.get("type")
data = parsed_payload.get("data", {}).get("object", {})
details["transaction_id"] = data.get("id")
details["order_id"] = data.get("order_id")
details["customer_id"] = data.get("customer_id")
created_at = parsed_payload.get("created_at")
if created_at:
details["event_timestamp"] = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
elif pos_system.lower() == "toast":
details["event_id"] = parsed_payload.get("guid")
details["webhook_type"] = parsed_payload.get("eventType")
details["order_id"] = parsed_payload.get("entityId")
created_at = parsed_payload.get("eventTime")
if created_at:
try:
details["event_timestamp"] = datetime.fromtimestamp(created_at / 1000)
except:
pass
elif pos_system.lower() == "lightspeed":
details["event_id"] = parsed_payload.get("id")
details["webhook_type"] = parsed_payload.get("action")
details["transaction_id"] = parsed_payload.get("objectID")
created_at = parsed_payload.get("createdAt")
if created_at:
details["event_timestamp"] = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
return details
except Exception as e:
logger.error("Failed to parse webhook event details", error=str(e))
return details