Files
bakery-ia/services/suppliers/app/services/supplier_service.py
2025-10-29 06:58:05 +01:00

511 lines
18 KiB
Python

# services/suppliers/app/services/supplier_service.py
"""
Supplier service for business logic operations
"""
import structlog
from typing import List, Optional, Dict, Any
from uuid import UUID
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from app.repositories.supplier_repository import SupplierRepository
from app.models.suppliers import Supplier, SupplierStatus, SupplierType
from app.schemas.suppliers import (
SupplierCreate, SupplierUpdate, SupplierResponse,
SupplierSearchParams, SupplierStatistics,
SupplierPriceListCreate, SupplierPriceListUpdate, SupplierPriceListResponse
)
from app.core.config import settings
# Module-level structlog logger used by SupplierService for its structured
# (key=value) log events.
logger = structlog.get_logger()
class SupplierService:
    """Service for supplier management operations.

    Sits between the API layer and :class:`SupplierRepository`: it enforces
    duplicate checks, decides the initial approval status of new suppliers,
    validates raw supplier payloads and logs every operation.
    """

    # Roles that may bypass the approval workflow when the tenant allows it.
    _AUTO_APPROVE_ROLES = frozenset({"admin", "owner"})
    # How many random suffixes to try before giving up on a collision-free code.
    _CODE_GENERATION_ATTEMPTS = 5
    # Patterns are applied with re.fullmatch, so no ^/$ anchors are needed
    # (and a trailing newline can no longer slip past `$`).
    _EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    _PHONE_PATTERN = r'[\d\s\-\(\)\+]+'

    def __init__(self, db: AsyncSession):
        """Bind the service (and its repository) to an async DB session."""
        self.db = db
        self.repository = SupplierRepository(db)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    async def _await_if_needed(result: Any) -> Any:
        """Return ``result``, awaiting it first when it is awaitable.

        Several repository methods are called here without their definition
        being visible from this module. Awaiting only when needed guarantees
        callers never receive a bare coroutine, while still working should the
        repository method turn out to be synchronous.
        """
        import inspect

        if inspect.isawaitable(result):
            return await result
        return result

    async def _fetch_supplier_settings(self, tenant_id: UUID) -> Dict[str, Any]:
        """Fetch the tenant's supplier settings; fall back to ``{}`` on any failure."""
        try:
            # Imported lazily so a missing/broken shared client only degrades
            # to default settings instead of breaking module import.
            from shared.clients.tenant_client import create_tenant_client
            tenant_client = create_tenant_client(settings)
            return await tenant_client.get_supplier_settings(str(tenant_id)) or {}
        except Exception as e:
            # Deliberate best-effort: supplier creation must not fail because
            # the tenant service is unavailable.
            logger.warning("Failed to fetch tenant settings, using defaults", error=str(e))
            return {}

    def _resolve_initial_status(
        self,
        supplier_settings: Dict[str, Any],
        created_by_role: Optional[str]
    ) -> Any:
        """Return ``(initial_status, auto_approved)`` for a new supplier.

        Both settings default to ``True`` when absent: approval is required,
        but admins/owners are auto-approved.
        """
        require_approval = supplier_settings.get('require_supplier_approval', True)
        auto_approve_admin = supplier_settings.get('auto_approve_for_admin_owner', True)
        # Tolerate a missing role instead of crashing on None.lower().
        role = (created_by_role or "").lower()

        if not require_approval:
            # Workflow disabled globally - always auto-approve
            logger.info("Supplier approval workflow disabled - auto-approving")
            return SupplierStatus.active, True
        if auto_approve_admin and role in self._AUTO_APPROVE_ROLES:
            logger.info("Auto-approving supplier created by admin/owner", role=created_by_role)
            return SupplierStatus.active, True

        logger.info("Supplier requires approval", role=created_by_role)
        return SupplierStatus.pending_approval, False

    async def _generate_unique_supplier_code(self, tenant_id: UUID, supplier_name: str) -> str:
        """Generate a supplier code that is not yet used within the tenant.

        The random suffix has only 100 values, so collisions are realistic for
        suppliers with similar names. A few candidates are tried; if every one
        collides the last candidate is returned anyway (the previous,
        unchecked behaviour) and a warning is logged.
        """
        candidate = ""
        for _ in range(self._CODE_GENERATION_ATTEMPTS):
            candidate = self._generate_supplier_code(supplier_name)
            if not await self.repository.get_by_supplier_code(tenant_id, candidate):
                return candidate
        logger.warning(
            "Could not generate a unique supplier code",
            tenant_id=str(tenant_id),
            supplier_code=candidate
        )
        return candidate

    def _generate_supplier_code(self, supplier_name: str) -> str:
        """Generate supplier code from name.

        Concatenates the upper-cased first three characters of up to three
        words, truncates to eight characters and appends two random digits.
        """
        import random
        import string

        words = supplier_name.strip().split()[:3]  # Max 3 words
        # Slicing already copes with words shorter than three characters.
        base_code = "".join(word[:3].upper() for word in words)[:8]  # Max 8 characters
        # Random suffix reduces (but does not eliminate) collisions.
        suffix = ''.join(random.choices(string.digits, k=2))
        return f"{base_code}{suffix}"

    # ------------------------------------------------------------------
    # Supplier CRUD
    # ------------------------------------------------------------------

    async def create_supplier(
        self,
        tenant_id: UUID,
        supplier_data: SupplierCreate,
        created_by: UUID,
        created_by_role: str = "member"
    ) -> Supplier:
        """Create a new supplier with role-based auto-approval.

        Args:
            tenant_id: Tenant that owns the supplier.
            supplier_data: Validated creation payload.
            created_by: User creating the supplier; also recorded as approver
                when the supplier is auto-approved.
            created_by_role: Role of the creating user; ``admin``/``owner``
                (case-insensitive) may be auto-approved.

        Returns:
            The supplier returned by the repository.

        Raises:
            ValueError: If the name or an explicitly provided supplier code
                already exists for the tenant.
        """
        logger.info("Creating supplier", tenant_id=str(tenant_id), name=supplier_data.name, role=created_by_role)

        # Check for duplicate name
        if await self.repository.get_by_name(tenant_id, supplier_data.name):
            raise ValueError(f"Supplier with name '{supplier_data.name}' already exists")

        supplier_code = supplier_data.supplier_code
        if supplier_code:
            # An explicitly provided code must be unique.
            if await self.repository.get_by_supplier_code(tenant_id, supplier_code):
                raise ValueError(
                    f"Supplier with code '{supplier_data.supplier_code}' already exists"
                )
        else:
            supplier_code = await self._generate_unique_supplier_code(
                tenant_id, supplier_data.name
            )

        # Tenant settings drive the approval workflow.
        supplier_settings = await self._fetch_supplier_settings(tenant_id)
        initial_status, auto_approved = self._resolve_initial_status(
            supplier_settings, created_by_role
        )

        create_data = supplier_data.model_dump(exclude_unset=True)
        create_data.update({
            'tenant_id': tenant_id,
            'supplier_code': supplier_code,
            'status': initial_status,
            'created_by': created_by,
            'updated_by': created_by,
            'quality_rating': 0.0,
            'delivery_rating': 0.0,
            'total_orders': 0,
            'total_amount': 0.0
        })
        # Set approval fields if auto-approved
        if auto_approved:
            create_data['approved_by'] = created_by
            create_data['approved_at'] = datetime.utcnow()

        supplier = await self.repository.create(create_data)
        logger.info(
            "Supplier created successfully",
            tenant_id=str(tenant_id),
            supplier_id=str(supplier.id),
            name=supplier.name,
            status=initial_status.value,
            auto_approved=auto_approved
        )
        return supplier

    async def get_supplier(self, supplier_id: UUID) -> Optional[Supplier]:
        """Get supplier by ID (``None`` when the repository finds nothing)."""
        return await self.repository.get_by_id(supplier_id)

    async def update_supplier(
        self,
        supplier_id: UUID,
        supplier_data: SupplierUpdate,
        updated_by: UUID
    ) -> Optional[Supplier]:
        """Update supplier information.

        Only fields explicitly set on ``supplier_data`` are written.

        Returns:
            The updated supplier, or ``None`` if the supplier does not exist.

        Raises:
            ValueError: If the new name or supplier code is already taken
                within the supplier's tenant.
        """
        logger.info("Updating supplier", supplier_id=str(supplier_id))

        # Repository calls are coroutines (they are awaited elsewhere in this
        # class); without `await` the existence and duplicate checks would
        # operate on coroutine objects, which are always truthy.
        supplier = await self.repository.get_by_id(supplier_id)
        if not supplier:
            return None

        # Check for duplicate name if changing
        if supplier_data.name and supplier_data.name != supplier.name:
            if await self.repository.get_by_name(supplier.tenant_id, supplier_data.name):
                raise ValueError(f"Supplier with name '{supplier_data.name}' already exists")

        # Check for duplicate supplier code if changing
        new_code = supplier_data.supplier_code
        if new_code and new_code != supplier.supplier_code:
            if await self.repository.get_by_supplier_code(supplier.tenant_id, new_code):
                raise ValueError(
                    f"Supplier with code '{supplier_data.supplier_code}' already exists"
                )

        update_data = supplier_data.model_dump(exclude_unset=True)
        update_data['updated_by'] = updated_by
        update_data['updated_at'] = datetime.utcnow()

        supplier = await self.repository.update(supplier_id, update_data)
        logger.info("Supplier updated successfully", supplier_id=str(supplier_id))
        return supplier

    async def delete_supplier(self, supplier_id: UUID) -> bool:
        """Delete supplier (soft delete by changing status to inactive).

        Returns:
            ``True`` when the supplier existed and was deactivated, ``False``
            when no supplier with that ID was found.
        """
        logger.info("Deleting supplier", supplier_id=str(supplier_id))

        supplier = await self.repository.get_by_id(supplier_id)
        if not supplier:
            return False

        # TODO: Add check for active purchase orders once PO service is implemented

        # Soft delete by changing status; must be awaited or the update
        # coroutine is never executed.
        await self.repository.update(supplier_id, {
            'status': SupplierStatus.inactive,
            'updated_at': datetime.utcnow()
        })
        logger.info("Supplier deleted successfully", supplier_id=str(supplier_id))
        return True

    async def hard_delete_supplier(self, supplier_id: UUID, tenant_id: UUID) -> Dict[str, Any]:
        """Hard delete supplier and all associated data (permanent deletion).

        Returns:
            The deletion summary produced by the repository, for audit purposes.

        Raises:
            ValueError: If the repository reports nothing was deleted.
        """
        logger.info("Hard deleting supplier", supplier_id=str(supplier_id), tenant_id=str(tenant_id))

        # Delegate to repository layer - all DB access is done there.
        # NOTE(review): tenant_id is only logged, not passed to the repository;
        # confirm tenant scoping is enforced by the caller.
        deletion_summary = await self.repository.hard_delete_supplier(supplier_id)
        if not deletion_summary:
            raise ValueError("Supplier not found")

        logger.info(
            "Supplier hard deleted successfully",
            supplier_id=str(supplier_id),
            **deletion_summary
        )
        return deletion_summary

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    async def search_suppliers(
        self,
        tenant_id: UUID,
        search_params: SupplierSearchParams
    ) -> List[Supplier]:
        """Search suppliers with the filters and paging from ``search_params``."""
        return await self.repository.search_suppliers(
            tenant_id=tenant_id,
            search_term=search_params.search_term,
            supplier_type=search_params.supplier_type,
            status=search_params.status,
            limit=search_params.limit,
            offset=search_params.offset
        )

    async def get_active_suppliers(self, tenant_id: UUID) -> List[Supplier]:
        """Get all active suppliers of a tenant."""
        return await self._await_if_needed(
            self.repository.get_active_suppliers(tenant_id)
        )

    async def get_suppliers_by_type(
        self,
        tenant_id: UUID,
        supplier_type: SupplierType
    ) -> List[Supplier]:
        """Get a tenant's suppliers of the given type."""
        return await self._await_if_needed(
            self.repository.get_suppliers_by_type(tenant_id, supplier_type)
        )

    async def get_top_suppliers(self, tenant_id: UUID, limit: int = 10) -> List[Supplier]:
        """Get up to ``limit`` top performing suppliers of a tenant."""
        return await self._await_if_needed(
            self.repository.get_top_suppliers(tenant_id, limit)
        )

    async def get_supplier_statistics(self, tenant_id: UUID) -> Dict[str, Any]:
        """Get supplier statistics for dashboard."""
        return await self.repository.get_supplier_statistics(tenant_id)

    async def get_suppliers_needing_review(
        self,
        tenant_id: UUID,
        days_since_last_order: int = 30
    ) -> List[Supplier]:
        """Get suppliers that may need performance review."""
        return await self._await_if_needed(
            self.repository.get_suppliers_needing_review(
                tenant_id, days_since_last_order
            )
        )

    # ------------------------------------------------------------------
    # Approval workflow and performance
    # ------------------------------------------------------------------

    async def approve_supplier(
        self,
        supplier_id: UUID,
        approved_by: UUID,
        notes: Optional[str] = None
    ) -> Optional[Supplier]:
        """Approve a pending supplier, optionally appending approval notes.

        Returns:
            The approved supplier, or ``None`` when the repository could not
            approve it (not found or not pending).
        """
        logger.info("Approving supplier", supplier_id=str(supplier_id))

        supplier = await self.repository.approve_supplier(supplier_id, approved_by)
        if not supplier:
            logger.warning("Failed to approve supplier - not found or not pending")
            return None

        if notes:
            updated = await self.repository.update(supplier_id, {
                'notes': (supplier.notes or "") + f"\nApproval notes: {notes}",
                'updated_at': datetime.utcnow()
            })
            # Prefer the post-update entity so the caller sees the new notes.
            if updated is not None:
                supplier = updated

        logger.info("Supplier approved successfully", supplier_id=str(supplier_id))
        return supplier

    async def reject_supplier(
        self,
        supplier_id: UUID,
        rejection_reason: str,
        rejected_by: UUID
    ) -> Optional[Supplier]:
        """Reject a pending supplier.

        Returns:
            The rejected supplier, or ``None`` when the repository could not
            reject it (not found or not pending).
        """
        logger.info("Rejecting supplier", supplier_id=str(supplier_id))

        supplier = await self.repository.reject_supplier(
            supplier_id, rejection_reason, rejected_by
        )
        if not supplier:
            logger.warning("Failed to reject supplier - not found or not pending")
            return None

        logger.info("Supplier rejected successfully", supplier_id=str(supplier_id))
        return supplier

    async def update_supplier_performance(
        self,
        supplier_id: UUID,
        quality_rating: Optional[float] = None,
        delivery_rating: Optional[float] = None,
        order_increment: int = 0,
        amount_increment: float = 0.0
    ) -> Optional[Supplier]:
        """Update supplier performance metrics (ratings and order totals)."""
        logger.info("Updating supplier performance", supplier_id=str(supplier_id))
        return await self._await_if_needed(
            self.repository.update_supplier_stats(
                supplier_id=supplier_id,
                total_orders_increment=order_increment,
                total_amount_increment=amount_increment,
                new_quality_rating=quality_rating,
                new_delivery_rating=delivery_rating
            )
        )

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------

    async def validate_supplier_data(
        self,
        tenant_id: UUID,
        supplier_data: Dict[str, Any],
        supplier_id: Optional[UUID] = None
    ) -> Dict[str, str]:
        """Validate supplier data and return errors.

        Args:
            tenant_id: Accepted for interface compatibility; not used.
            supplier_data: Raw supplier fields to validate.
            supplier_id: Accepted for interface compatibility; not used.

        Returns:
            Mapping of field name to error message; empty when valid.
        """
        import re

        errors: Dict[str, str] = {}

        # A name made only of whitespace is as good as missing.
        name = supplier_data.get('name')
        if not name or not str(name).strip():
            errors['name'] = "Supplier name is required"

        # fullmatch (rather than match + `$`) rejects a trailing newline.
        email = supplier_data.get('email')
        if email and not re.fullmatch(self._EMAIL_PATTERN, email):
            errors['email'] = "Invalid email format"

        # Basic phone validation (digits, spaces, dashes, parentheses, plus)
        phone = supplier_data.get('phone')
        if phone and not re.fullmatch(self._PHONE_PATTERN, phone):
            errors['phone'] = "Invalid phone format"

        lead_time = supplier_data.get('standard_lead_time')
        if lead_time is not None and not 0 <= lead_time <= 365:
            errors['standard_lead_time'] = "Lead time must be between 0 and 365 days"

        credit_limit = supplier_data.get('credit_limit')
        if credit_limit is not None and credit_limit < 0:
            errors['credit_limit'] = "Credit limit cannot be negative"

        min_order = supplier_data.get('minimum_order_amount')
        if min_order is not None and min_order < 0:
            errors['minimum_order_amount'] = "Minimum order amount cannot be negative"

        return errors

    # ------------------------------------------------------------------
    # Price lists
    # ------------------------------------------------------------------

    async def get_supplier_price_lists(
        self,
        supplier_id: UUID,
        tenant_id: UUID,
        is_active: bool = True
    ) -> List[Any]:
        """Get all price list items for a supplier, filtered by ``is_active``."""
        logger.info(
            "Getting supplier price lists",
            supplier_id=str(supplier_id),
            tenant_id=str(tenant_id),
            is_active=is_active
        )
        return await self.repository.get_supplier_price_lists(
            supplier_id=supplier_id,
            tenant_id=tenant_id,
            is_active=is_active
        )

    async def get_supplier_price_list(
        self,
        price_list_id: UUID,
        tenant_id: UUID
    ) -> Optional[Any]:
        """Get specific price list item."""
        logger.info(
            "Getting supplier price list item",
            price_list_id=str(price_list_id),
            tenant_id=str(tenant_id)
        )
        return await self.repository.get_supplier_price_list(
            price_list_id=price_list_id,
            tenant_id=tenant_id
        )

    async def create_supplier_price_list(
        self,
        supplier_id: UUID,
        price_list_data: SupplierPriceListCreate,
        tenant_id: UUID,
        created_by: UUID
    ) -> Any:
        """Create a new price list item for a supplier.

        ``price_per_unit`` defaults to ``unit_price`` when it is not provided.
        """
        logger.info(
            "Creating supplier price list item",
            supplier_id=str(supplier_id),
            tenant_id=str(tenant_id)
        )

        create_data = price_list_data.model_dump(exclude_unset=True)
        create_data.update({
            'tenant_id': tenant_id,
            'supplier_id': supplier_id,
            'created_by': created_by,
            'updated_by': created_by,
        })
        # Covers both "key absent" and "explicitly None".
        if create_data.get('price_per_unit') is None:
            create_data['price_per_unit'] = create_data['unit_price']

        price_list = await self.repository.create_supplier_price_list(create_data)
        logger.info(
            "Supplier price list item created successfully",
            price_list_id=str(price_list.id),
            supplier_id=str(supplier_id)
        )
        return price_list

    async def update_supplier_price_list(
        self,
        price_list_id: UUID,
        price_list_data: SupplierPriceListUpdate,
        tenant_id: UUID,
        updated_by: UUID
    ) -> Any:
        """Update a price list item with the fields set on ``price_list_data``."""
        logger.info(
            "Updating supplier price list item",
            price_list_id=str(price_list_id),
            tenant_id=str(tenant_id)
        )

        update_data = price_list_data.model_dump(exclude_unset=True)
        update_data['updated_by'] = updated_by
        # UTC, consistent with every other timestamp written by this service.
        update_data['updated_at'] = datetime.utcnow()

        # NOTE(review): tenant_id is only logged, not passed to the repository;
        # confirm tenant scoping is enforced by the caller.
        price_list = await self.repository.update_supplier_price_list(
            price_list_id=price_list_id,
            update_data=update_data
        )
        logger.info(
            "Supplier price list item updated successfully",
            price_list_id=str(price_list_id)
        )
        return price_list

    async def delete_supplier_price_list(
        self,
        price_list_id: UUID,
        tenant_id: UUID
    ) -> bool:
        """Delete a price list item and return the repository's success flag."""
        logger.info(
            "Deleting supplier price list item",
            price_list_id=str(price_list_id),
            tenant_id=str(tenant_id)
        )

        # NOTE(review): tenant_id is only logged, not passed to the repository;
        # confirm tenant scoping is enforced by the caller.
        success = await self.repository.delete_supplier_price_list(
            price_list_id=price_list_id
        )
        logger.info(
            "Supplier price list item deletion completed",
            price_list_id=str(price_list_id),
            success=success
        )
        return success