# services/pos/app/integrations/square_client.py """ Square POS Client Integration with Square Point of Sale API """ import hashlib import hmac import json from typing import Dict, List, Optional, Any, Tuple from datetime import datetime import asyncio import httpx import structlog from .base_pos_client import ( BasePOSClient, POSCredentials, POSTransaction, POSTransactionItem, POSProduct, POSClientError, POSAuthenticationError, POSRateLimitError, POSConnectionError ) logger = structlog.get_logger() class SquarePOSClient(BasePOSClient): """Square POS API client implementation""" def __init__(self, credentials: POSCredentials): super().__init__(credentials) self.base_url = self._get_base_url() self.application_id = credentials.application_id self.access_token = credentials.access_token self.webhook_secret = credentials.webhook_secret self.location_id = credentials.location_id if not self.access_token: raise POSAuthenticationError("Square access token is required") def _get_base_url(self) -> str: """Get Square API base URL based on environment""" if self.credentials.environment.lower() == "production": return "https://connect.squareup.com" else: return "https://connect.squareupsandbox.com" def _get_headers(self) -> Dict[str, str]: """Get headers for Square API requests""" headers = { "Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json", "Accept": "application/json", } if self.application_id: headers["Square-Version"] = "2024-01-18" # Use latest API version return headers async def _make_request( self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None ) -> Dict[str, Any]: """Make HTTP request to Square API with error handling""" url = f"{self.base_url}{endpoint}" headers = self._get_headers() start_time = datetime.utcnow() try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.request( method=method, url=url, headers=headers, json=data, params=params ) duration_ms = int((datetime.utcnow() - start_time).total_seconds() * 1000) self.log_api_call(method, endpoint, response.status_code, duration_ms) if response.status_code == 401: raise POSAuthenticationError("Invalid Square access token") elif response.status_code == 429: raise POSRateLimitError("Square API rate limit exceeded") elif response.status_code >= 400: error_text = response.text raise POSClientError(f"Square API error {response.status_code}: {error_text}") return response.json() except httpx.TimeoutException: raise POSConnectionError("Timeout connecting to Square API") except httpx.ConnectError: raise POSConnectionError("Failed to connect to Square API") async def test_connection(self) -> Tuple[bool, str]: """Test connection to Square API""" try: # Try to get location info response = await self._make_request("GET", "/v2/locations") locations = response.get("locations", []) if locations: return True, f"Connected successfully. Found {len(locations)} location(s)." else: return False, "Connected but no locations found" except POSAuthenticationError: return False, "Authentication failed - invalid access token" except POSRateLimitError: return False, "Rate limit exceeded" except POSConnectionError as e: return False, f"Connection failed: {str(e)}" except Exception as e: return False, f"Test failed: {str(e)}" async def get_transactions( self, start_date: datetime, end_date: datetime, location_id: Optional[str] = None, limit: int = 100, cursor: Optional[str] = None ) -> Tuple[List[POSTransaction], Optional[str]]: """Get transactions from Square API""" # Use provided location_id or fall back to configured one target_location = location_id or self.location_id if not target_location: # Get first available location locations_response = await self._make_request("GET", "/v2/locations") locations = locations_response.get("locations", []) if not locations: return [], None target_location = locations[0]["id"] # Build query parameters query = { "location_ids": [target_location], "begin_time": start_date.isoformat() + "Z", "end_time": end_date.isoformat() + "Z", "limit": min(limit, 200), # Square max is 200 } if cursor: query["cursor"] = cursor try: response = await self._make_request("POST", "/v2/orders/search", data={"query": query}) orders = response.get("orders", []) transactions = [] for order in orders: transaction = self._parse_square_order(order) if transaction: transactions.append(transaction) next_cursor = response.get("cursor") return transactions, next_cursor except Exception as e: self.log_error(e, "Getting transactions") raise async def get_transaction(self, transaction_id: str) -> Optional[POSTransaction]: """Get specific transaction by ID""" try: response = await self._make_request("GET", f"/v2/orders/{transaction_id}") order = response.get("order") if order: return self._parse_square_order(order) return None except Exception as e: self.log_error(e, f"Getting transaction {transaction_id}") return None def _parse_square_order(self, order: Dict[str, Any]) -> Optional[POSTransaction]: """Parse Square order into standardized transaction""" try: # Extract basic transaction info external_id = order.get("id", "") state = order.get("state", "") # Map Square states to our standard states status_map = { "COMPLETED": "completed", "CANCELED": "voided", "DRAFT": "pending", "OPEN": "pending" } status = status_map.get(state, "pending") # Parse amounts (Square uses smallest currency unit, e.g., cents) total_money = order.get("total_money", {}) total_amount = float(total_money.get("amount", 0)) / 100.0 base_price_money = order.get("base_price_money", {}) subtotal = float(base_price_money.get("amount", 0)) / 100.0 total_tax_money = order.get("total_tax_money", {}) tax_amount = float(total_tax_money.get("amount", 0)) / 100.0 total_tip_money = order.get("total_tip_money", {}) tip_amount = float(total_tip_money.get("amount", 0)) / 100.0 total_discount_money = order.get("total_discount_money", {}) discount_amount = float(total_discount_money.get("amount", 0)) / 100.0 currency = total_money.get("currency", "USD") # Parse timestamps created_at = order.get("created_at") transaction_date = datetime.fromisoformat(created_at.replace("Z", "+00:00")) if created_at else datetime.utcnow() # Parse location info location_id = order.get("location_id") # Parse line items items = [] line_items = order.get("line_items", []) for line_item in line_items: item = self._parse_square_line_item(line_item) if item: items.append(item) # Parse payments for payment method payment_method = None tenders = order.get("tenders", []) if tenders: payment_method = tenders[0].get("type", "").lower() # Create transaction transaction = POSTransaction( external_id=external_id, transaction_type="sale", # Square orders are typically sales status=status, total_amount=total_amount, subtotal=subtotal, tax_amount=tax_amount, tip_amount=tip_amount, discount_amount=discount_amount, currency=currency, transaction_date=transaction_date, payment_method=payment_method, payment_status="paid" if status == "completed" else "pending", location_id=location_id, items=items, raw_data=order ) return transaction except Exception as e: self.log_error(e, f"Parsing Square order {order.get('id', 'unknown')}") return None def _parse_square_line_item(self, line_item: Dict[str, Any]) -> Optional[POSTransactionItem]: """Parse Square line item into standardized transaction item""" try: name = line_item.get("name", "Unknown Item") quantity = float(line_item.get("quantity", "1")) # Parse pricing item_total_money = line_item.get("item_total_money", {}) total_price = float(item_total_money.get("amount", 0)) / 100.0 unit_price = total_price / quantity if quantity > 0 else 0 # Parse variations for SKU variation = line_item.get("catalog_object_id") sku = variation if variation else None # Parse category from item data item_data = line_item.get("item_data", {}) category = item_data.get("category_name") # Parse modifiers modifiers_data = line_item.get("modifiers", []) modifiers = {} for modifier in modifiers_data: mod_name = modifier.get("name", "") mod_price = float(modifier.get("total_price_money", {}).get("amount", 0)) / 100.0 modifiers[mod_name] = mod_price item = POSTransactionItem( external_id=line_item.get("uid"), sku=sku, name=name, category=category, quantity=quantity, unit_price=unit_price, total_price=total_price, discount_amount=0, # Square handles discounts at order level tax_amount=0, # Square handles taxes at order level modifiers=modifiers if modifiers else None, raw_data=line_item ) return item except Exception as e: self.log_error(e, f"Parsing Square line item {line_item.get('uid', 'unknown')}") return None async def get_products( self, location_id: Optional[str] = None, limit: int = 100, cursor: Optional[str] = None ) -> Tuple[List[POSProduct], Optional[str]]: """Get products from Square Catalog API""" query_params = { "types": "ITEM", "limit": min(limit, 1000) # Square catalog max } if cursor: query_params["cursor"] = cursor try: response = await self._make_request("GET", "/v2/catalog/list", params=query_params) objects = response.get("objects", []) products = [] for obj in objects: product = self._parse_square_catalog_item(obj) if product: products.append(product) next_cursor = response.get("cursor") return products, next_cursor except Exception as e: self.log_error(e, "Getting products") raise def _parse_square_catalog_item(self, catalog_object: Dict[str, Any]) -> Optional[POSProduct]: """Parse Square catalog item into standardized product""" try: item_data = catalog_object.get("item_data", {}) external_id = catalog_object.get("id", "") name = item_data.get("name", "Unknown Product") description = item_data.get("description") category = item_data.get("category_name") is_active = not catalog_object.get("is_deleted", False) # Get price from first variation variations = item_data.get("variations", []) price = 0.0 sku = None if variations: first_variation = variations[0] variation_data = first_variation.get("item_variation_data", {}) price_money = variation_data.get("price_money", {}) price = float(price_money.get("amount", 0)) / 100.0 sku = variation_data.get("sku") product = POSProduct( external_id=external_id, name=name, sku=sku, category=category, subcategory=None, price=price, description=description, is_active=is_active, raw_data=catalog_object ) return product except Exception as e: self.log_error(e, f"Parsing Square catalog item {catalog_object.get('id', 'unknown')}") return None def verify_webhook_signature(self, payload: bytes, signature: str) -> bool: """Verify Square webhook signature""" if not self.webhook_secret: self.logger.warning("No webhook secret configured for signature verification") return True # Allow webhooks without verification if no secret try: # Square uses HMAC-SHA256 expected_signature = hmac.new( self.webhook_secret.encode('utf-8'), payload, hashlib.sha256 ).hexdigest() # Remove any prefix from signature clean_signature = signature.replace("sha256=", "") return hmac.compare_digest(expected_signature, clean_signature) except Exception as e: self.log_error(e, "Webhook signature verification") return False def parse_webhook_payload(self, payload: Dict[str, Any]) -> Optional[POSTransaction]: """Parse Square webhook payload""" try: event_type = payload.get("type") # Handle different Square webhook events if event_type in ["order.created", "order.updated", "order.fulfilled"]: order_data = payload.get("data", {}).get("object", {}).get("order") if order_data: return self._parse_square_order(order_data) elif event_type in ["payment.created", "payment.updated"]: # For payment events, we might need to fetch the full order payment_data = payload.get("data", {}).get("object", {}).get("payment", {}) order_id = payment_data.get("order_id") if order_id: # Note: This would require an async call, so this is a simplified version self.logger.info("Payment webhook received", order_id=order_id, event_type=event_type) return None except Exception as e: self.log_error(e, "Parsing webhook payload") return None def get_webhook_events(self) -> List[str]: """Get list of supported Square webhook events""" return [ "order.created", "order.updated", "order.fulfilled", "payment.created", "payment.updated", "inventory.count.updated" ] def get_rate_limits(self) -> Dict[str, Any]: """Get Square API rate limit information""" return { "requests_per_second": 100, "daily_limit": 50000, "burst_limit": 200, "webhook_limit": 1000 }