Files
bakery-ia/services/pos/app/integrations/base_pos_client.py
2025-08-16 15:00:36 +02:00

365 lines
10 KiB
Python

# services/pos/app/integrations/base_pos_client.py
"""
Base POS Client
Abstract base class for all POS system integrations
"""
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
from dataclasses import dataclass
import structlog
logger = structlog.get_logger()
@dataclass
class POSCredentials:
"""POS system credentials"""
pos_system: str
environment: str
api_key: Optional[str] = None
api_secret: Optional[str] = None
access_token: Optional[str] = None
application_id: Optional[str] = None
merchant_id: Optional[str] = None
location_id: Optional[str] = None
webhook_secret: Optional[str] = None
additional_params: Optional[Dict[str, Any]] = None
@dataclass
class POSTransaction:
"""Standardized POS transaction"""
external_id: str
transaction_type: str
status: str
total_amount: float
subtotal: float
tax_amount: float
tip_amount: float
discount_amount: float
currency: str
transaction_date: datetime
payment_method: Optional[str] = None
payment_status: Optional[str] = None
location_id: Optional[str] = None
location_name: Optional[str] = None
staff_id: Optional[str] = None
staff_name: Optional[str] = None
customer_id: Optional[str] = None
customer_email: Optional[str] = None
order_type: Optional[str] = None
table_number: Optional[str] = None
receipt_number: Optional[str] = None
external_order_id: Optional[str] = None
items: List['POSTransactionItem']
raw_data: Dict[str, Any]
@dataclass
class POSTransactionItem:
"""Standardized POS transaction item"""
external_id: Optional[str]
sku: Optional[str]
name: str
category: Optional[str]
quantity: float
unit_price: float
total_price: float
discount_amount: float
tax_amount: float
modifiers: Optional[Dict[str, Any]] = None
raw_data: Optional[Dict[str, Any]] = None
@dataclass
class POSProduct:
"""Standardized POS product"""
external_id: str
name: str
sku: Optional[str]
category: Optional[str]
subcategory: Optional[str]
price: float
description: Optional[str]
is_active: bool
raw_data: Dict[str, Any]
@dataclass
class SyncResult:
"""Result of a sync operation"""
success: bool
records_processed: int
records_created: int
records_updated: int
records_skipped: int
records_failed: int
errors: List[str]
warnings: List[str]
duration_seconds: float
api_calls_made: int
class POSClientError(Exception):
"""Base exception for POS client errors"""
pass
class POSAuthenticationError(POSClientError):
"""Authentication failed"""
pass
class POSRateLimitError(POSClientError):
"""Rate limit exceeded"""
pass
class POSConnectionError(POSClientError):
"""Connection to POS system failed"""
pass
class BasePOSClient(ABC):
"""
Abstract base class for POS system integrations
Provides common interface for all POS providers:
- Square, Toast, Lightspeed, etc.
"""
def __init__(self, credentials: POSCredentials):
self.credentials = credentials
self.pos_system = credentials.pos_system
self.logger = logger.bind(pos_system=self.pos_system)
@abstractmethod
async def test_connection(self) -> Tuple[bool, str]:
"""
Test connection to POS system
Returns:
Tuple of (success: bool, message: str)
"""
pass
@abstractmethod
async def get_transactions(
self,
start_date: datetime,
end_date: datetime,
location_id: Optional[str] = None,
limit: int = 100,
cursor: Optional[str] = None
) -> Tuple[List[POSTransaction], Optional[str]]:
"""
Get transactions from POS system
Args:
start_date: Start date for transaction query
end_date: End date for transaction query
location_id: Optional location filter
limit: Maximum number of records to return
cursor: Pagination cursor for next page
Returns:
Tuple of (transactions: List[POSTransaction], next_cursor: Optional[str])
"""
pass
@abstractmethod
async def get_transaction(self, transaction_id: str) -> Optional[POSTransaction]:
"""
Get a specific transaction by ID
Args:
transaction_id: External transaction ID
Returns:
POSTransaction if found, None otherwise
"""
pass
@abstractmethod
async def get_products(
self,
location_id: Optional[str] = None,
limit: int = 100,
cursor: Optional[str] = None
) -> Tuple[List[POSProduct], Optional[str]]:
"""
Get products/menu items from POS system
Args:
location_id: Optional location filter
limit: Maximum number of records to return
cursor: Pagination cursor for next page
Returns:
Tuple of (products: List[POSProduct], next_cursor: Optional[str])
"""
pass
@abstractmethod
def verify_webhook_signature(self, payload: bytes, signature: str) -> bool:
"""
Verify webhook signature
Args:
payload: Raw webhook payload
signature: Signature from webhook headers
Returns:
True if signature is valid
"""
pass
@abstractmethod
def parse_webhook_payload(self, payload: Dict[str, Any]) -> Optional[POSTransaction]:
"""
Parse webhook payload into standardized transaction
Args:
payload: Webhook payload
Returns:
POSTransaction if parseable, None otherwise
"""
pass
@abstractmethod
def get_webhook_events(self) -> List[str]:
"""
Get list of supported webhook events
Returns:
List of supported event types
"""
pass
@abstractmethod
def get_rate_limits(self) -> Dict[str, Any]:
"""
Get rate limit information
Returns:
Dictionary with rate limit details
"""
pass
# Common utility methods
def get_pos_system(self) -> str:
"""Get POS system identifier"""
return self.pos_system
def get_environment(self) -> str:
"""Get environment (sandbox/production)"""
return self.credentials.environment
def is_production(self) -> bool:
"""Check if running in production environment"""
return self.credentials.environment.lower() == "production"
def log_api_call(self, method: str, endpoint: str, status_code: int, duration_ms: int):
"""Log API call for monitoring"""
self.logger.info(
"POS API call",
method=method,
endpoint=endpoint,
status_code=status_code,
duration_ms=duration_ms,
environment=self.get_environment()
)
def log_error(self, error: Exception, context: str):
"""Log error with context"""
self.logger.error(
f"POS client error: {context}",
error=str(error),
error_type=type(error).__name__,
pos_system=self.pos_system
)
async def sync_transactions(
self,
start_date: datetime,
end_date: datetime,
location_id: Optional[str] = None,
batch_size: int = 100
) -> SyncResult:
"""
Sync transactions from POS system with error handling and batching
Args:
start_date: Start date for sync
end_date: End date for sync
location_id: Optional location filter
batch_size: Number of records per batch
Returns:
SyncResult with operation details
"""
start_time = datetime.utcnow()
result = SyncResult(
success=False,
records_processed=0,
records_created=0,
records_updated=0,
records_skipped=0,
records_failed=0,
errors=[],
warnings=[],
duration_seconds=0,
api_calls_made=0
)
try:
cursor = None
while True:
try:
transactions, next_cursor = await self.get_transactions(
start_date=start_date,
end_date=end_date,
location_id=location_id,
limit=batch_size,
cursor=cursor
)
result.api_calls_made += 1
result.records_processed += len(transactions)
if not transactions:
break
# Process transactions would be implemented by the service layer
self.logger.info(
"Synced transaction batch",
batch_size=len(transactions),
total_processed=result.records_processed
)
cursor = next_cursor
if not cursor:
break
except Exception as e:
result.errors.append(f"Batch sync error: {str(e)}")
result.records_failed += batch_size
self.log_error(e, "Transaction sync batch")
break
result.success = len(result.errors) == 0
except Exception as e:
result.errors.append(f"Sync operation failed: {str(e)}")
self.log_error(e, "Transaction sync operation")
finally:
end_time = datetime.utcnow()
result.duration_seconds = (end_time - start_time).total_seconds()
return result