171 lines
7.4 KiB
Python
171 lines
7.4 KiB
Python
# services/sales/app/models/sales.py
|
|
"""
Sales data models for the Sales Service.
Enhanced with additional fields and relationships.
"""
|
|
|
|
from sqlalchemy import Column, String, DateTime, Float, Integer, Text, Index, Boolean, Numeric, ForeignKey
|
|
from sqlalchemy.dialects.postgresql import UUID
|
|
from sqlalchemy.orm import relationship
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, Any, Optional
|
|
|
|
from shared.database.base import Base
|
|
|
|
|
|
class SalesData(Base):
    """Enhanced sales data model.

    Each row records the quantity sold and revenue for the product referenced
    by ``inventory_product_id`` under a given ``tenant_id``, together with
    optional pricing, location/channel, data-quality and audit fields.
    """

    __tablename__ = "sales_data"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True)
    date = Column(DateTime(timezone=True), nullable=False, index=True)

    # Product reference to inventory service (REQUIRED)
    inventory_product_id = Column(UUID(as_uuid=True), nullable=False, index=True)  # Reference to inventory.ingredients.id

    # Sales data
    quantity_sold = Column(Integer, nullable=False)
    unit_price = Column(Numeric(10, 2), nullable=True)
    revenue = Column(Numeric(10, 2), nullable=False)
    cost_of_goods = Column(Numeric(10, 2), nullable=True)  # For profit calculation
    discount_applied = Column(Numeric(5, 2), nullable=True, default=0.0)  # Percentage

    # Location and channel
    location_id = Column(String(100), nullable=True, index=True)
    sales_channel = Column(String(50), nullable=True, default="in_store")  # in_store, online, delivery

    # Data source and quality
    source = Column(String(50), nullable=False, default="manual")  # manual, pos, online, import
    is_validated = Column(Boolean, default=False)
    validation_notes = Column(Text, nullable=True)

    # Additional metadata
    notes = Column(Text, nullable=True)
    weather_condition = Column(String(50), nullable=True)  # For correlation analysis
    is_holiday = Column(Boolean, default=False)
    is_weekend = Column(Boolean, default=False)

    # Audit fields
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    updated_at = Column(DateTime(timezone=True),
                        default=lambda: datetime.now(timezone.utc),
                        onupdate=lambda: datetime.now(timezone.utc))
    created_by = Column(UUID(as_uuid=True), nullable=True)  # User ID

    # Performance-optimized indexes
    __table_args__ = (
        # Core query patterns
        Index('idx_sales_tenant_date', 'tenant_id', 'date'),
        Index('idx_sales_tenant_location', 'tenant_id', 'location_id'),

        # Analytics queries
        Index('idx_sales_date_range', 'date', 'tenant_id'),
        Index('idx_sales_channel_date', 'sales_channel', 'date', 'tenant_id'),

        # Data quality queries
        Index('idx_sales_source_validated', 'source', 'is_validated', 'tenant_id'),
        # Primary product reference index
        Index('idx_sales_inventory_product', 'inventory_product_id', 'tenant_id'),
        Index('idx_sales_product_date', 'inventory_product_id', 'date', 'tenant_id'),
    )

    @staticmethod
    def _to_float(value: Any) -> Optional[float]:
        """Convert a numeric column value to ``float``.

        Only ``None`` maps to ``None``; a zero amount is a real value and is
        returned as ``0.0`` instead of being dropped by a truthiness check.
        """
        return None if value is None else float(value)

    def to_dict(self) -> Dict[str, Any]:
        """Convert model to dictionary for API responses.

        Returns:
            A dictionary keyed by column name. UUIDs are rendered as strings,
            datetimes as ISO 8601 strings and numeric amounts as floats.
            Unset optional values are ``None``; zero amounts stay ``0.0``.
        """
        return {
            'id': str(self.id),
            'tenant_id': str(self.tenant_id),
            'date': self.date.isoformat() if self.date else None,
            'inventory_product_id': str(self.inventory_product_id),
            'quantity_sold': self.quantity_sold,
            # Explicit None checks: a zero price/revenue/discount must not
            # be reported as missing.
            'unit_price': self._to_float(self.unit_price),
            'revenue': self._to_float(self.revenue),
            'cost_of_goods': self._to_float(self.cost_of_goods),
            'discount_applied': self._to_float(self.discount_applied),
            'location_id': self.location_id,
            'sales_channel': self.sales_channel,
            'source': self.source,
            'is_validated': self.is_validated,
            'validation_notes': self.validation_notes,
            'notes': self.notes,
            'weather_condition': self.weather_condition,
            'is_holiday': self.is_holiday,
            'is_weekend': self.is_weekend,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None,
            'created_by': str(self.created_by) if self.created_by else None,
        }

    @property
    def profit_margin(self) -> Optional[float]:
        """Calculate profit margin if cost data is available.

        Returns:
            ``(revenue - cost_of_goods) / revenue * 100`` as a float, or
            ``None`` when revenue or cost is unset, or when revenue is zero
            (the ratio is undefined). A zero cost yields ``100.0``.
        """
        if self.revenue is None or self.cost_of_goods is None:
            return None
        if self.revenue == 0:
            # Avoid division by zero; no meaningful margin without revenue.
            return None
        return float((self.revenue - self.cost_of_goods) / self.revenue * 100)
|
|
|
|
|
|
# Product model removed: the inventory service is the single source of truth.
# Product data is now referenced via inventory_product_id in the SalesData model.
|
|
|
|
|
|
class SalesImportJob(Base):
    """Bookkeeping record for a single sales data import job."""

    __tablename__ = "sales_import_jobs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id = Column(UUID(as_uuid=True), nullable=False, index=True)

    # What is being imported
    filename = Column(String(255), nullable=False)
    file_size = Column(Integer, nullable=True)
    # One of: csv, xlsx, api
    import_type = Column(String(50), nullable=False, default="csv")

    # Where the job currently stands
    # One of: pending, processing, completed, failed
    status = Column(String(20), nullable=False, default="pending")
    progress_percentage = Column(Float, default=0.0)

    # Row counters
    total_rows = Column(Integer, default=0)
    processed_rows = Column(Integer, default=0)
    successful_imports = Column(Integer, default=0)
    failed_imports = Column(Integer, default=0)
    duplicate_rows = Column(Integer, default=0)

    # Failure details
    error_message = Column(Text, nullable=True)
    # JSON string of validation errors
    validation_errors = Column(Text, nullable=True)

    # Lifecycle timestamps and author
    started_at = Column(DateTime(timezone=True), nullable=True)
    completed_at = Column(DateTime(timezone=True), nullable=True)
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )
    created_by = Column(UUID(as_uuid=True), nullable=True)

    __table_args__ = (
        Index('idx_import_jobs_tenant_status', 'tenant_id', 'status', 'created_at'),
        Index('idx_import_jobs_status_date', 'status', 'created_at'),
    )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the job into a plain dictionary for API responses.

        UUIDs are rendered as strings and datetimes as ISO 8601 strings;
        all other columns are copied through unchanged.
        """
        # Columns whose values are emitted exactly as stored, in output order.
        passthrough = (
            'filename',
            'file_size',
            'import_type',
            'status',
            'progress_percentage',
            'total_rows',
            'processed_rows',
            'successful_imports',
            'failed_imports',
            'duplicate_rows',
            'error_message',
            'validation_errors',
        )

        payload: Dict[str, Any] = {
            'id': str(self.id),
            'tenant_id': str(self.tenant_id),
        }
        payload.update((field, getattr(self, field)) for field in passthrough)

        # Timestamps become ISO strings, or None when not set.
        for field in ('started_at', 'completed_at', 'created_at'):
            stamp = getattr(self, field)
            payload[field] = stamp.isoformat() if stamp else None

        author = self.created_by
        payload['created_by'] = str(author) if author else None
        return payload