# shared/clients/sales_client.py
"""
Sales Service Client
Handles all API calls to the sales service
"""

import httpx
import structlog
from typing import Dict, Any, Optional, List, Union
from .base_service_client import BaseServiceClient
from shared.config.base import BaseServiceSettings

logger = structlog.get_logger()


class SalesServiceClient(BaseServiceClient):
    """Client for communicating with the sales service.

    Every method forwards to the request helpers inherited from
    ``BaseServiceClient`` (``get``, ``post``, ``put``, ``get_paginated``)
    using endpoint paths under ``sales/``.
    """

    def __init__(self, config: BaseServiceSettings, calling_service_name: str = "unknown"):
        """Initialise the client.

        Args:
            config: Service settings; ``config.SALES_SERVICE_URL`` is stored
                as ``self.service_url``.
            calling_service_name: Name of the calling service, passed to the
                base class constructor.
        """
        super().__init__(calling_service_name, config)
        self.service_url = config.SALES_SERVICE_URL

    def get_service_base_path(self) -> str:
        """Return the base API path used by this client."""
        return "/api/v1"

    # ================================================================
    # SALES DATA (with advanced pagination support)
    # ================================================================

    async def get_sales_data(
        self,
        tenant_id: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        product_id: Optional[str] = None,
        aggregation: str = "daily"
    ) -> Optional[List[Dict[str, Any]]]:
        """Get sales data for a date range.

        Args:
            tenant_id: Tenant the request is made for.
            start_date: Optional filter, sent only when truthy.
            end_date: Optional filter, sent only when truthy.
            product_id: Optional filter, sent only when truthy.
            aggregation: Always sent as the ``aggregation`` query parameter.

        Returns:
            The response itself when it is a list, the value under the
            ``"sales"`` key (default ``[]``) when it is a dict, and ``None``
            when the response is ``None`` or of any other type.
        """
        params = {"aggregation": aggregation}
        if start_date:
            params["start_date"] = start_date
        if end_date:
            params["end_date"] = end_date
        if product_id:
            params["product_id"] = product_id

        result = await self.get("sales/sales", tenant_id=tenant_id, params=params)

        # Handle both list and dict responses
        if isinstance(result, list):
            return result
        if isinstance(result, dict):
            return result.get("sales", [])
        return None

    async def get_all_sales_data(
        self,
        tenant_id: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        product_id: Optional[str] = None,
        aggregation: str = "daily",
        page_size: int = 1000,
        max_pages: int = 100
    ) -> List[Dict[str, Any]]:
        """Get sales data across pages (equivalent to original fetch_sales_data).

        Builds the same query parameters as ``get_sales_data`` and delegates
        to the inherited ``get_paginated`` helper, forwarding ``page_size``
        and ``max_pages`` unchanged.

        Returns:
            Whatever ``get_paginated`` returns on success. Any exception is
            logged and an empty list is returned instead (best-effort), so
            callers cannot tell "no data" apart from "request failed".
        """
        params = {"aggregation": aggregation}
        if start_date:
            params["start_date"] = start_date
        if end_date:
            params["end_date"] = end_date
        if product_id:
            params["product_id"] = product_id

        # Use the inherited paginated request method
        try:
            all_records = await self.get_paginated(
                "sales/sales",
                tenant_id=tenant_id,
                params=params,
                page_size=page_size,
                max_pages=max_pages,
                # NOTE(review): presumably seconds, deliberately large for
                # bulk fetches - confirm against get_paginated.
                timeout=2000.0
            )

            logger.info(f"Successfully fetched {len(all_records)} total sales records via sales service",
                        tenant_id=tenant_id)
            return all_records

        except Exception as e:
            # Deliberate best-effort; tenant_id is attached so the failure can
            # be correlated with the matching success log entries.
            logger.error(f"Failed to fetch paginated sales data: {e}", tenant_id=tenant_id)
            return []

    async def upload_sales_data(
        self,
        tenant_id: str,
        sales_data: List[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Upload sales data.

        Posts ``{"sales": sales_data}`` to ``sales/sales`` and returns the
        result of the inherited ``post`` helper.
        """
        data = {"sales": sales_data}
        return await self.post("sales/sales", data=data, tenant_id=tenant_id)

    # ================================================================
    # PRODUCTS
    # ================================================================

    async def get_products(self, tenant_id: str) -> Optional[List[Dict[str, Any]]]:
        """Get all products for a tenant.

        Returns:
            ``None`` when the response is falsy (``None`` or empty), the
            response itself when it is a list, the value under the
            ``"products"`` key (default ``[]``) when it is a dict, and
            ``None`` for any other type.
        """
        result = await self.get("sales/products", tenant_id=tenant_id)
        if not result:
            return None
        # Accept both list and dict payloads, mirroring get_sales_data; calling
        # .get() on a bare list would raise AttributeError.
        if isinstance(result, list):
            return result
        if isinstance(result, dict):
            return result.get("products", [])
        return None

    async def get_product(self, tenant_id: str, product_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific product via ``sales/products/{product_id}``."""
        return await self.get(f"sales/products/{product_id}", tenant_id=tenant_id)

    async def create_product(
        self,
        tenant_id: str,
        name: str,
        category: str,
        price: float,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Create a new product.

        The payload contains ``name``, ``category`` and ``price``; entries in
        ``kwargs`` are merged in last, so they override those three keys when
        the names collide.
        """
        data = {
            "name": name,
            "category": category,
            "price": price,
            **kwargs
        }
        return await self.post("sales/products", data=data, tenant_id=tenant_id)

    async def update_product(
        self,
        tenant_id: str,
        product_id: str,
        **updates
    ) -> Optional[Dict[str, Any]]:
        """Update a product by sending ``updates`` as the PUT payload."""
        return await self.put(f"sales/products/{product_id}", data=updates, tenant_id=tenant_id)

    # ================================================================
    # DATA IMPORT
    # ================================================================

    async def import_sales_data(
        self,
        tenant_id: str,
        file_content: str,
        file_format: str,
        filename: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Import sales data from CSV/Excel/JSON.

        Posts ``content``, ``format`` and ``filename`` to
        ``sales/operations/import``; ``filename`` is sent even when ``None``.
        """
        data = {
            "content": file_content,
            "format": file_format,
            "filename": filename
        }
        return await self.post("sales/operations/import", data=data, tenant_id=tenant_id)