278 lines
12 KiB
Python
278 lines
12 KiB
Python
# ================================================================
# services/data/app/services/sales_service.py - SIMPLIFIED VERSION
# ================================================================
"""Sales service without notes column for now"""
|
|
|
|
import csv
import io
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

import structlog
from sqlalchemy import and_, desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.sales import SalesData
from app.schemas.sales import (
    SalesDataCreate,
    SalesDataResponse,
    SalesDataQuery,
)
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
class SalesService:
    """Stateless, tenant-scoped operations on ``SalesData`` rows.

    Every public method is a static coroutine that receives the
    ``AsyncSession`` to work on. The ``notes`` and ``updated_at`` columns are
    neither read nor written here; responses always report them as ``None``.
    """

    @staticmethod
    def _to_response(record: SalesData) -> SalesDataResponse:
        """Map a ``SalesData`` row onto the response schema."""
        return SalesDataResponse(
            id=record.id,
            tenant_id=record.tenant_id,
            date=record.date,
            product_name=record.product_name,
            quantity_sold=record.quantity_sold,
            revenue=record.revenue,
            location_id=record.location_id,
            source=record.source,
            notes=None,  # Always None for now
            created_at=record.created_at,
            updated_at=None,  # Always None for now
        )

    @staticmethod
    def _date_range_conditions(
        tenant_id: str,
        start_date: Optional[datetime],
        end_date: Optional[datetime],
    ) -> List[Any]:
        """Build the tenant filter plus the optional inclusive date bounds."""
        conditions: List[Any] = [SalesData.tenant_id == tenant_id]
        if start_date is not None:
            conditions.append(SalesData.date >= start_date)
        if end_date is not None:
            conditions.append(SalesData.date <= end_date)
        return conditions

    @staticmethod
    async def create_sales_record(sales_data: SalesDataCreate, db: AsyncSession) -> SalesDataResponse:
        """Create a new sales record.

        Args:
            sales_data: Field values for the new row.
            db: Session used to add, commit and refresh the row.

        Returns:
            The stored row as a ``SalesDataResponse``.

        Raises:
            Exception: Any failure is logged, the session is rolled back and
                the original exception is re-raised.
        """
        try:
            # notes and updated_at are skipped until the database is migrated.
            db_record = SalesData(
                id=uuid.uuid4(),
                tenant_id=sales_data.tenant_id,
                date=sales_data.date,
                product_name=sales_data.product_name,
                quantity_sold=sales_data.quantity_sold,
                revenue=sales_data.revenue,
                location_id=sales_data.location_id,
                source=sales_data.source,
                created_at=datetime.utcnow(),
            )

            db.add(db_record)
            await db.commit()
            await db.refresh(db_record)

            logger.debug("Sales record created", record_id=db_record.id, product=db_record.product_name)

            return SalesService._to_response(db_record)

        except Exception as e:
            await db.rollback()
            logger.error("Failed to create sales record", error=str(e))
            raise

    @staticmethod
    async def get_sales_data(query: SalesDataQuery, db: AsyncSession) -> List[SalesDataResponse]:
        """Get sales data based on query parameters.

        Args:
            query: Filters (tenant, date range, product/location/source lists,
                quantity and revenue bounds) plus optional limit and offset.
            db: Session used to run the SELECT.

        Returns:
            Matching rows, newest ``date`` first, as ``SalesDataResponse``
            objects.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            conditions = SalesService._date_range_conditions(
                query.tenant_id, query.start_date, query.end_date
            )

            # An empty list means "no filter", so truthiness is intended here.
            if query.product_names:
                conditions.append(SalesData.product_name.in_(query.product_names))
            if query.location_ids:
                conditions.append(SalesData.location_id.in_(query.location_ids))
            if query.sources:
                conditions.append(SalesData.source.in_(query.sources))

            # Compare against None: 0 is a legitimate bound (e.g. max_revenue=0)
            # and must not be dropped the way a truthiness test would drop it.
            if query.min_quantity is not None:
                conditions.append(SalesData.quantity_sold >= query.min_quantity)
            if query.max_quantity is not None:
                conditions.append(SalesData.quantity_sold <= query.max_quantity)
            if query.min_revenue is not None:
                conditions.append(SalesData.revenue >= query.min_revenue)
            if query.max_revenue is not None:
                conditions.append(SalesData.revenue <= query.max_revenue)

            stmt = select(SalesData).where(and_(*conditions)).order_by(desc(SalesData.date))

            # A limit/offset of 0 or None leaves the statement unbounded.
            if query.limit:
                stmt = stmt.limit(query.limit)
            if query.offset:
                stmt = stmt.offset(query.offset)

            result = await db.execute(stmt)
            records = result.scalars().all()

            logger.debug("Sales data retrieved", count=len(records), tenant_id=query.tenant_id)

            return [SalesService._to_response(record) for record in records]

        except Exception as e:
            logger.error("Failed to retrieve sales data", error=str(e))
            raise

    @staticmethod
    async def get_sales_analytics(tenant_id: str, start_date: Optional[datetime],
                                  end_date: Optional[datetime], db: AsyncSession) -> Dict[str, Any]:
        """Get basic sales analytics.

        Args:
            tenant_id: Tenant whose rows are aggregated.
            start_date: Optional inclusive lower bound on ``date``.
            end_date: Optional inclusive upper bound on ``date``.
            db: Session used to run the aggregate query.

        Returns:
            A dict with ``total_quantity``, ``total_revenue``,
            ``total_records``, ``average_order_value`` (revenue divided by
            record count, with the divisor floored at 1) and ``date_range``.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            conditions = SalesService._date_range_conditions(tenant_id, start_date, end_date)

            total_stmt = select(
                func.sum(SalesData.quantity_sold).label('total_quantity'),
                func.sum(SalesData.revenue).label('total_revenue'),
                func.count(SalesData.id).label('total_records'),
            ).where(and_(*conditions))

            total_result = await db.execute(total_stmt)
            totals = total_result.first()

            # SUM yields NULL when no rows match, hence the "or 0" fallbacks.
            total_quantity = int(totals.total_quantity or 0)
            total_revenue = float(totals.total_revenue or 0.0)
            total_records = int(totals.total_records or 0)

            analytics = {
                "total_quantity": total_quantity,
                "total_revenue": total_revenue,
                "total_records": total_records,
                # Floor the divisor at 1 so an empty result gives 0.0, not an error.
                "average_order_value": total_revenue / max(total_records, 1),
                "date_range": {
                    "start": start_date.isoformat() if start_date else None,
                    "end": end_date.isoformat() if end_date else None,
                },
            }

            logger.debug("Sales analytics generated", tenant_id=tenant_id, total_records=analytics["total_records"])
            return analytics

        except Exception as e:
            logger.error("Failed to generate sales analytics", error=str(e))
            raise

    @staticmethod
    async def export_sales_data(tenant_id: str, export_format: str, start_date: Optional[datetime],
                                end_date: Optional[datetime], products: Optional[List[str]],
                                db: AsyncSession) -> Optional[Dict[str, Any]]:
        """Export sales data in specified format.

        Only ``"csv"`` (case-insensitive) is supported.

        Args:
            tenant_id: Tenant whose rows are exported.
            export_format: Requested output format.
            start_date: Optional inclusive lower bound on ``date``.
            end_date: Optional inclusive upper bound on ``date``.
            products: Optional list of product names to restrict the export to.
            db: Session used to run the SELECT.

        Returns:
            A dict with ``content``, ``media_type`` and ``filename``, or
            ``None`` when no rows match or the format is not supported.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            conditions = SalesService._date_range_conditions(tenant_id, start_date, end_date)
            if products:
                conditions.append(SalesData.product_name.in_(products))

            stmt = select(SalesData).where(and_(*conditions)).order_by(desc(SalesData.date))
            result = await db.execute(stmt)
            records = result.scalars().all()

            if not records:
                return None

            if export_format.lower() == "csv":
                output = io.StringIO(newline="")
                # csv.writer quotes fields containing commas, quotes or
                # newlines, which hand-built rows would corrupt. "\n" keeps the
                # line endings identical to the previous output.
                writer = csv.writer(output, lineterminator="\n")
                writer.writerow(
                    ["date", "product_name", "quantity_sold", "revenue", "location_id", "source"]
                )
                writer.writerows(
                    [
                        record.date,
                        record.product_name,
                        record.quantity_sold,
                        record.revenue,
                        record.location_id or '',
                        record.source,
                    ]
                    for record in records
                )

                return {
                    "content": output.getvalue(),
                    "media_type": "text/csv",
                    "filename": f"sales_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                }

            return None

        except Exception as e:
            logger.error("Failed to export sales data", error=str(e))
            raise

    @staticmethod
    async def delete_sales_record(record_id: str, db: AsyncSession) -> bool:
        """Delete a sales record.

        Args:
            record_id: Primary key of the row to delete.
            db: Session used to look up and delete the row.

        Returns:
            ``True`` when the row was deleted, ``False`` when no row has that id.

        Raises:
            Exception: Any failure is logged, the session is rolled back and
                the original exception is re-raised.
        """
        # NOTE(review): the lookup is by id only, with no tenant filter; callers
        # presumably verify ownership before calling - confirm.
        try:
            stmt = select(SalesData).where(SalesData.id == record_id)
            result = await db.execute(stmt)
            record = result.scalar_one_or_none()

            if not record:
                return False

            await db.delete(record)
            await db.commit()

            logger.debug("Sales record deleted", record_id=record_id)
            return True

        except Exception as e:
            await db.rollback()
            logger.error("Failed to delete sales record", error=str(e))
            raise

    @staticmethod
    async def get_products_list(tenant_id: str, db: AsyncSession) -> List[Dict[str, Any]]:
        """Get list of all products with sales data for tenant.

        Args:
            tenant_id: Tenant whose rows are aggregated.
            db: Session used to run the grouped query.

        Returns:
            One dict per distinct ``product_name``, ordered by total revenue
            descending, holding counts, sums, averages and the first/last sale
            dates as ISO strings.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            query = (
                select(
                    SalesData.product_name,
                    func.count(SalesData.id).label('total_sales'),
                    func.sum(SalesData.quantity_sold).label('total_quantity'),
                    func.sum(SalesData.revenue).label('total_revenue'),
                    func.min(SalesData.date).label('first_sale_date'),
                    func.max(SalesData.date).label('last_sale_date'),
                    func.avg(SalesData.quantity_sold).label('avg_quantity'),
                    func.avg(SalesData.revenue).label('avg_revenue'),
                )
                .where(SalesData.tenant_id == tenant_id)
                .group_by(SalesData.product_name)
                .order_by(desc(func.sum(SalesData.revenue)))
            )

            result = await db.execute(query)
            products_data = result.all()

            # Falsy aggregates (NULL or 0) are normalised to numeric zero.
            products = [
                {
                    'product_name': row.product_name,
                    'total_sales': row.total_sales,
                    'total_quantity': int(row.total_quantity) if row.total_quantity else 0,
                    'total_revenue': float(row.total_revenue) if row.total_revenue else 0.0,
                    'first_sale_date': row.first_sale_date.isoformat() if row.first_sale_date else None,
                    'last_sale_date': row.last_sale_date.isoformat() if row.last_sale_date else None,
                    'avg_quantity': float(row.avg_quantity) if row.avg_quantity else 0.0,
                    'avg_revenue': float(row.avg_revenue) if row.avg_revenue else 0.0,
                }
                for row in products_data
            ]

            logger.debug("Products list retrieved successfully",
                         tenant_id=tenant_id,
                         product_count=len(products))

            return products

        except Exception as e:
            logger.error("Failed to get products list from database",
                         error=str(e),
                         tenant_id=tenant_id)
            raise