Start integrating the onboarding flow with the backend (part 2)

Urtzi Alfaro
2025-09-04 18:59:56 +02:00
parent a11fdfba24
commit 9eedc2e5f2
30 changed files with 3432 additions and 4735 deletions

View File

@@ -151,6 +151,8 @@ async def validate_sales_data_universal(
"errors": validation_result.errors,
"warnings": validation_result.warnings,
"summary": validation_result.summary,
"unique_products": validation_result.unique_products,
"product_list": validation_result.product_list,
"message": "Validation completed successfully" if validation_result.is_valid else "Validation found errors",
"details": {
"total_records": validation_result.total_records,

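With the two added fields, the payload returned by validate_sales_data_universal now carries the extracted product list alongside the validation summary. An illustrative response shape (field names taken from the hunk above, values are made-up examples):

{
    "errors": [],
    "warnings": [],
    "summary": {},
    "unique_products": 3,
    "product_list": ["Baguette", "Croissant Mantequilla", "Pan Integral"],
    "message": "Validation completed successfully",
    "details": {"total_records": 1250},
}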
View File

@@ -1,499 +0,0 @@
# services/sales/app/api/onboarding.py
"""
Onboarding API Endpoints
Handles sales data import with automated inventory creation
"""
from fastapi import APIRouter, Depends, HTTPException, Path, UploadFile, File, Form
from typing import List, Dict, Any, Optional
from uuid import UUID
from pydantic import BaseModel, Field
import structlog
from app.services.ai_onboarding_service import (
AIOnboardingService,
OnboardingValidationResult,
ProductSuggestionsResult,
OnboardingImportResult,
get_ai_onboarding_service
)
from shared.auth.decorators import get_current_user_dep, get_current_tenant_id_dep
router = APIRouter(tags=["onboarding"])
logger = structlog.get_logger()
class FileValidationResponse(BaseModel):
"""Response for file validation step"""
is_valid: bool
total_records: int
unique_products: int
product_list: List[str]
validation_errors: List[Any]
validation_warnings: List[Any]
summary: Dict[str, Any]
class ProductSuggestionsResponse(BaseModel):
"""Response for AI suggestions step"""
suggestions: List[Dict[str, Any]]
business_model_analysis: Dict[str, Any]
total_products: int
high_confidence_count: int
low_confidence_count: int
processing_time_seconds: float
class InventoryApprovalRequest(BaseModel):
"""Request to approve/modify inventory suggestions"""
suggestions: List[Dict[str, Any]] = Field(..., description="Approved suggestions with modifications")
class InventoryCreationResponse(BaseModel):
"""Response for inventory creation"""
created_items: List[Dict[str, Any]]
failed_items: List[Dict[str, Any]]
total_approved: int
success_rate: float
class SalesImportResponse(BaseModel):
"""Response for final sales import"""
import_job_id: str
status: str
processed_rows: int
successful_imports: int
failed_imports: int
errors: List[str]
warnings: List[str]
@router.post("/tenants/{tenant_id}/onboarding/validate-file", response_model=FileValidationResponse)
async def validate_onboarding_file(
file: UploadFile = File(..., description="Sales data CSV/Excel file"),
tenant_id: UUID = Path(..., description="Tenant ID"),
current_tenant: str = Depends(get_current_tenant_id_dep),
current_user: Dict[str, Any] = Depends(get_current_user_dep),
onboarding_service: AIOnboardingService = Depends(get_ai_onboarding_service)
):
"""
Step 1: Validate uploaded file and extract unique products
This endpoint:
1. Validates the file format and content
2. Checks for required columns (date, product, etc.)
3. Extracts unique products from sales data
4. Returns validation results and product list
"""
try:
# Verify tenant access
if str(tenant_id) != current_tenant:
raise HTTPException(status_code=403, detail="Access denied to this tenant")
# Validate file
if not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
allowed_extensions = ['.csv', '.xlsx', '.xls']
if not any(file.filename.lower().endswith(ext) for ext in allowed_extensions):
raise HTTPException(status_code=400, detail=f"Unsupported file format. Allowed: {allowed_extensions}")
# Determine file format
file_format = "csv" if file.filename.lower().endswith('.csv') else "excel"
# Read file content
file_content = await file.read()
if not file_content:
raise HTTPException(status_code=400, detail="File is empty")
# Convert bytes to string for CSV
if file_format == "csv":
file_data = file_content.decode('utf-8')
else:
import base64
file_data = base64.b64encode(file_content).decode('utf-8')
# Validate and extract products
result = await onboarding_service.validate_and_extract_products(
file_data=file_data,
file_format=file_format,
tenant_id=tenant_id
)
response = FileValidationResponse(
is_valid=result.is_valid,
total_records=result.total_records,
unique_products=result.unique_products,
product_list=result.product_list,
validation_errors=result.validation_details.errors,
validation_warnings=result.validation_details.warnings,
summary=result.summary
)
logger.info("File validation complete",
filename=file.filename,
is_valid=result.is_valid,
unique_products=result.unique_products,
tenant_id=tenant_id)
return response
except HTTPException:
raise
except Exception as e:
logger.error("Failed file validation",
error=str(e), filename=file.filename if file else None, tenant_id=tenant_id)
raise HTTPException(status_code=500, detail=f"Validation failed: {str(e)}")
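# Illustrative client call for step 1 (a sketch, not part of the original module;
# the gateway URL, filename and bearer token below are assumptions):
def _example_step1_validate_file(tenant_id: str, token: str) -> List[str]:
    import httpx
    with open("sales.csv", "rb") as f:
        response = httpx.post(
            f"http://localhost:8000/api/v1/tenants/{tenant_id}/onboarding/validate-file",
            files={"file": ("sales.csv", f, "text/csv")},
            headers={"Authorization": f"Bearer {token}"},
            timeout=60.0,
        )
    response.raise_for_status()
    # product_list feeds directly into the step-2 form field below
    return response.json()["product_list"]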
@router.post("/tenants/{tenant_id}/onboarding/generate-suggestions", response_model=ProductSuggestionsResponse)
async def generate_inventory_suggestions(
file: UploadFile = File(..., description="Same sales data file from step 1"),
product_list: str = Form(..., description="JSON array of product names to classify"),
tenant_id: UUID = Path(..., description="Tenant ID"),
current_tenant: str = Depends(get_current_tenant_id_dep),
current_user: Dict[str, Any] = Depends(get_current_user_dep),
onboarding_service: AIOnboardingService = Depends(get_ai_onboarding_service)
):
"""
Step 2: Generate AI-powered inventory suggestions
This endpoint:
1. Takes the validated file and product list from step 1
2. Uses AI to classify products into inventory categories
3. Analyzes business model (production vs retail)
4. Returns detailed suggestions for user review
"""
try:
# Verify tenant access
if str(tenant_id) != current_tenant:
raise HTTPException(status_code=403, detail="Access denied to this tenant")
# Parse product list
import json
try:
products = json.loads(product_list)
except json.JSONDecodeError as e:
raise HTTPException(status_code=400, detail=f"Invalid product list format: {str(e)}")
if not products:
raise HTTPException(status_code=400, detail="No products provided")
# Determine file format
file_format = "csv" if file.filename.lower().endswith('.csv') else "excel"
# Read file content
file_content = await file.read()
if not file_content:
raise HTTPException(status_code=400, detail="File is empty")
# Convert bytes to string for CSV
if file_format == "csv":
file_data = file_content.decode('utf-8')
else:
import base64
file_data = base64.b64encode(file_content).decode('utf-8')
# Generate suggestions
result = await onboarding_service.generate_inventory_suggestions(
product_list=products,
file_data=file_data,
file_format=file_format,
tenant_id=tenant_id
)
# Convert suggestions to dict format
suggestions_dict = []
for suggestion in result.suggestions:
suggestion_dict = {
"suggestion_id": suggestion.suggestion_id,
"original_name": suggestion.original_name,
"suggested_name": suggestion.suggested_name,
"product_type": suggestion.product_type,
"category": suggestion.category,
"unit_of_measure": suggestion.unit_of_measure,
"confidence_score": suggestion.confidence_score,
"estimated_shelf_life_days": suggestion.estimated_shelf_life_days,
"requires_refrigeration": suggestion.requires_refrigeration,
"requires_freezing": suggestion.requires_freezing,
"is_seasonal": suggestion.is_seasonal,
"suggested_supplier": suggestion.suggested_supplier,
"notes": suggestion.notes,
"sales_data": suggestion.sales_data
}
suggestions_dict.append(suggestion_dict)
business_model_dict = {
"model": result.business_model_analysis.model,
"confidence": result.business_model_analysis.confidence,
"ingredient_count": result.business_model_analysis.ingredient_count,
"finished_product_count": result.business_model_analysis.finished_product_count,
"ingredient_ratio": result.business_model_analysis.ingredient_ratio,
"recommendations": result.business_model_analysis.recommendations
}
response = ProductSuggestionsResponse(
suggestions=suggestions_dict,
business_model_analysis=business_model_dict,
total_products=result.total_products,
high_confidence_count=result.high_confidence_count,
low_confidence_count=result.low_confidence_count,
processing_time_seconds=result.processing_time_seconds
)
logger.info("AI suggestions generated",
total_products=result.total_products,
business_model=result.business_model_analysis.model,
high_confidence=result.high_confidence_count,
tenant_id=tenant_id)
return response
except HTTPException:
raise
except Exception as e:
logger.error("Failed to generate suggestions",
error=str(e), tenant_id=tenant_id)
raise HTTPException(status_code=500, detail=f"Suggestion generation failed: {str(e)}")
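# Illustrative client call for step 2 (a sketch, not part of the original module;
# URL and token are assumptions). Note that product_list travels as a JSON-encoded
# string in a multipart form field, exactly as the endpoint above expects:
def _example_step2_generate_suggestions(tenant_id: str, token: str, products: List[str]) -> Dict[str, Any]:
    import json
    import httpx
    with open("sales.csv", "rb") as f:
        response = httpx.post(
            f"http://localhost:8000/api/v1/tenants/{tenant_id}/onboarding/generate-suggestions",
            files={"file": ("sales.csv", f, "text/csv")},
            data={"product_list": json.dumps(products)},
            headers={"Authorization": f"Bearer {token}"},
            timeout=120.0,
        )
    response.raise_for_status()
    return response.json()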
@router.post("/tenants/{tenant_id}/onboarding/create-inventory", response_model=InventoryCreationResponse)
async def create_inventory_from_suggestions(
request: InventoryApprovalRequest,
tenant_id: UUID = Path(..., description="Tenant ID"),
current_tenant: str = Depends(get_current_tenant_id_dep),
current_user: Dict[str, Any] = Depends(get_current_user_dep),
onboarding_service: AIOnboardingService = Depends(get_ai_onboarding_service)
):
"""
Step 3: Create inventory items from approved suggestions
This endpoint:
1. Takes user-approved inventory suggestions from step 2
2. Applies any user modifications to suggestions
3. Creates inventory items via inventory service
4. Returns creation results for final import step
"""
try:
# Verify tenant access
if str(tenant_id) != current_tenant:
raise HTTPException(status_code=403, detail="Access denied to this tenant")
if not request.suggestions:
raise HTTPException(status_code=400, detail="No suggestions provided")
# Create inventory items using new service
result = await onboarding_service.create_inventory_from_suggestions(
approved_suggestions=request.suggestions,
tenant_id=tenant_id,
user_id=UUID(current_user['user_id'])
)
response = InventoryCreationResponse(
created_items=result['created_items'],
failed_items=result['failed_items'],
total_approved=result['total_approved'],
success_rate=result['success_rate']
)
logger.info("Inventory creation complete",
created=len(result['created_items']),
failed=len(result['failed_items']),
tenant_id=tenant_id)
return response
except HTTPException:
raise
except Exception as e:
logger.error("Failed inventory creation",
error=str(e), tenant_id=tenant_id)
raise HTTPException(status_code=500, detail=f"Inventory creation failed: {str(e)}")
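# Illustrative request body for step 3 (a sketch; field names follow the suggestion
# dict built in step 2 and the approval handling in
# AIOnboardingService.create_inventory_from_suggestions, values are examples only):
EXAMPLE_APPROVAL_REQUEST = {
    "suggestions": [
        {
            "suggestion_id": "d2f9c6e0-0000-0000-0000-000000000000",  # hypothetical id from step 2
            "original_name": "croissant mantequilla",
            "suggested_name": "Croissant Mantequilla",
            "product_type": "finished_product",
            "category": "croissants",
            "unit_of_measure": "units",
            "approved": True,
            "modifications": {"name": "Croissant de Mantequilla"},
        }
    ]
}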
@router.post("/tenants/{tenant_id}/onboarding/import-sales", response_model=SalesImportResponse)
async def import_sales_with_inventory(
file: UploadFile = File(..., description="Sales data CSV/Excel file"),
inventory_mapping: str = Form(..., description="JSON mapping of product names to inventory IDs"),
tenant_id: UUID = Path(..., description="Tenant ID"),
current_tenant: str = Depends(get_current_tenant_id_dep),
current_user: Dict[str, Any] = Depends(get_current_user_dep),
onboarding_service: AIOnboardingService = Depends(get_ai_onboarding_service)
):
"""
Step 4: Final sales data import using created inventory items
This endpoint:
1. Takes the same validated sales file from step 1
2. Uses the inventory mapping from step 3
3. Imports sales records using detailed processing from DataImportService
4. Returns final import results - onboarding complete!
"""
try:
# Verify tenant access
if str(tenant_id) != current_tenant:
raise HTTPException(status_code=403, detail="Access denied to this tenant")
# Validate file
if not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
# Parse inventory mapping
import json
try:
mapping = json.loads(inventory_mapping)
# Convert to string mapping for the new service
inventory_mapping_dict = {
product_name: str(inventory_id)
for product_name, inventory_id in mapping.items()
}
except json.JSONDecodeError as e:
raise HTTPException(status_code=400, detail=f"Invalid inventory mapping format: {str(e)}")
# Determine file format
file_format = "csv" if file.filename.lower().endswith('.csv') else "excel"
# Read file content
file_content = await file.read()
if not file_content:
raise HTTPException(status_code=400, detail="File is empty")
# Convert bytes to string for CSV
if file_format == "csv":
file_data = file_content.decode('utf-8')
else:
import base64
file_data = base64.b64encode(file_content).decode('utf-8')
# Import sales data using new service
result = await onboarding_service.import_sales_data_with_inventory(
file_data=file_data,
file_format=file_format,
inventory_mapping=inventory_mapping_dict,
tenant_id=tenant_id,
filename=file.filename
)
response = SalesImportResponse(
import_job_id="onboarding-" + str(tenant_id), # Generate a simple job ID
status="completed" if result.success else "failed",
processed_rows=result.import_details.records_processed,
successful_imports=result.import_details.records_created,
failed_imports=result.import_details.records_failed,
errors=[error.get("message", str(error)) for error in result.import_details.errors],
warnings=[warning.get("message", str(warning)) for warning in result.import_details.warnings]
)
logger.info("Sales import complete",
successful=result.import_details.records_created,
failed=result.import_details.records_failed,
filename=file.filename,
tenant_id=tenant_id)
return response
except HTTPException:
raise
except Exception as e:
logger.error("Failed sales import",
error=str(e), filename=file.filename if file else None, tenant_id=tenant_id)
raise HTTPException(status_code=500, detail=f"Sales import failed: {str(e)}")
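# Illustrative client call for step 4 (a sketch; URL and token are assumptions).
# The mapping keys are the product names extracted in step 1 and the values are
# the inventory item IDs created in step 3:
def _example_step4_import_sales(tenant_id: str, token: str, mapping: Dict[str, str]) -> Dict[str, Any]:
    import json
    import httpx
    with open("sales.csv", "rb") as f:
        response = httpx.post(
            f"http://localhost:8000/api/v1/tenants/{tenant_id}/onboarding/import-sales",
            files={"file": ("sales.csv", f, "text/csv")},
            data={"inventory_mapping": json.dumps(mapping)},
            headers={"Authorization": f"Bearer {token}"},
            timeout=300.0,
        )
    response.raise_for_status()
    return response.json()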
@router.get("/tenants/{tenant_id}/onboarding/business-model-guide")
async def get_business_model_guide(
model: str,
tenant_id: UUID = Path(..., description="Tenant ID"),
current_tenant: str = Depends(get_current_tenant_id_dep),
current_user: Dict[str, Any] = Depends(get_current_user_dep)
):
"""
Get setup recommendations based on detected business model
"""
try:
# Verify tenant access
if str(tenant_id) != current_tenant:
raise HTTPException(status_code=403, detail="Access denied to this tenant")
guides = {
'production': {
'title': 'Production Bakery Setup',
'description': 'Your bakery produces items from raw ingredients',
'next_steps': [
'Set up supplier relationships for ingredients',
'Configure recipe management and costing',
'Enable production planning and scheduling',
'Set up ingredient inventory alerts and reorder points'
],
'recommended_features': [
'Recipe & Production Management',
'Supplier & Procurement',
'Ingredient Inventory Tracking',
'Production Cost Analysis'
],
'sample_workflows': [
'Create recipes with ingredient costs',
'Plan daily production based on sales forecasts',
'Track ingredient usage and waste',
'Generate supplier purchase orders'
]
},
'retail': {
'title': 'Retail Bakery Setup',
'description': 'Your bakery sells finished products from central bakers',
'next_steps': [
'Configure central baker relationships',
'Set up delivery schedules and tracking',
'Enable finished product freshness monitoring',
'Focus on sales forecasting and ordering'
],
'recommended_features': [
'Central Baker Management',
'Delivery Schedule Tracking',
'Freshness Monitoring',
'Sales Forecasting'
],
'sample_workflows': [
'Set up central baker delivery schedules',
'Track product freshness and expiration',
'Forecast demand and place orders',
'Monitor sales performance by product'
]
},
'hybrid': {
'title': 'Hybrid Bakery Setup',
'description': 'Your bakery both produces items and sells finished products',
'next_steps': [
'Configure both production and retail features',
'Set up flexible inventory categories',
'Enable comprehensive analytics',
'Plan workflows for both business models'
],
'recommended_features': [
'Full Inventory Management',
'Recipe & Production Management',
'Central Baker Management',
'Advanced Analytics'
],
'sample_workflows': [
'Manage both ingredients and finished products',
'Balance production vs purchasing decisions',
'Track costs across both models',
'Optimize inventory mix based on profitability'
]
}
}
if model not in guides:
raise HTTPException(status_code=400, detail="Invalid business model")
return guides[model]
except HTTPException:
raise
except Exception as e:
logger.error("Failed to get business model guide",
error=str(e), model=model, tenant_id=tenant_id)
raise HTTPException(status_code=500, detail=f"Failed to get guide: {str(e)}")

View File

@@ -104,9 +104,7 @@ app.add_middleware(
# Include routers - import router BEFORE sales router to avoid conflicts
from app.api.sales import router as sales_router
from app.api.import_data import router as import_router
from app.api.onboarding import router as onboarding_router
app.include_router(import_router, prefix="/api/v1", tags=["import"])
app.include_router(onboarding_router, prefix="/api/v1", tags=["onboarding"])
app.include_router(sales_router, prefix="/api/v1", tags=["sales"])
# Health check endpoint

View File

@@ -1,865 +0,0 @@
# services/sales/app/services/ai_onboarding_service.py
"""
AI-Powered Onboarding Service
Handles the complete onboarding flow: File validation -> Product extraction -> Inventory suggestions -> Data processing
"""
import pandas as pd
import structlog
from typing import List, Dict, Any, Optional
from uuid import UUID, uuid4
from dataclasses import dataclass
import asyncio
from app.services.data_import_service import DataImportService, SalesValidationResult, SalesImportResult
from app.services.inventory_client import InventoryServiceClient
from app.core.database import get_db_transaction
logger = structlog.get_logger()
@dataclass
class ProductSuggestion:
"""Single product suggestion from AI classification"""
suggestion_id: str
original_name: str
suggested_name: str
product_type: str
category: str
unit_of_measure: str
confidence_score: float
estimated_shelf_life_days: Optional[int] = None
requires_refrigeration: bool = False
requires_freezing: bool = False
is_seasonal: bool = False
suggested_supplier: Optional[str] = None
notes: Optional[str] = None
sales_data: Optional[Dict[str, Any]] = None
@dataclass
class BusinessModelAnalysis:
"""Business model analysis results"""
model: str # production, retail, hybrid
confidence: float
ingredient_count: int
finished_product_count: int
ingredient_ratio: float
recommendations: List[str]
@dataclass
class OnboardingValidationResult:
"""Result of onboarding file validation step"""
is_valid: bool
total_records: int
unique_products: int
validation_details: SalesValidationResult
product_list: List[str]
summary: Dict[str, Any]
@dataclass
class ProductSuggestionsResult:
"""Result of AI product classification step"""
suggestions: List[ProductSuggestion]
business_model_analysis: BusinessModelAnalysis
total_products: int
high_confidence_count: int
low_confidence_count: int
processing_time_seconds: float
@dataclass
class OnboardingImportResult:
"""Result of final data import step"""
success: bool
import_details: SalesImportResult
inventory_items_created: int
inventory_creation_errors: List[str]
final_summary: Dict[str, Any]
class AIOnboardingService:
"""
Unified AI-powered onboarding service that orchestrates the complete flow:
1. File validation and product extraction
2. AI-powered inventory suggestions
3. User confirmation and inventory creation
4. Final sales data import
"""
def __init__(self):
self.data_import_service = DataImportService()
self.inventory_client = InventoryServiceClient()
# ================================================================
# STEP 1: FILE VALIDATION AND PRODUCT EXTRACTION
# ================================================================
async def validate_and_extract_products(
self,
file_data: str,
file_format: str,
tenant_id: UUID
) -> OnboardingValidationResult:
"""
Step 1: Validate uploaded file and extract unique products
This uses the detailed validation from data_import_service
"""
try:
logger.info("Starting onboarding validation and product extraction",
file_format=file_format, tenant_id=tenant_id)
# Use data_import_service for detailed validation
validation_data = {
"tenant_id": str(tenant_id),
"data": file_data,
"data_format": file_format,
"validate_only": True,
"source": "ai_onboarding"
}
validation_result = await self.data_import_service.validate_import_data(validation_data)
# Extract unique products if validation passes
product_list = []
unique_products = 0
if validation_result.is_valid and file_format.lower() == "csv":
try:
# Parse CSV to extract unique products
import csv
import io
reader = csv.DictReader(io.StringIO(file_data))
rows = list(reader)
# Use data_import_service column detection
column_mapping = self.data_import_service._detect_columns(list(rows[0].keys()) if rows else [])
if column_mapping.get('product'):
product_column = column_mapping['product']
# Extract and clean unique products
products_raw = [row.get(product_column, '').strip() for row in rows if row.get(product_column, '').strip()]
# Clean product names using data_import_service method
products_cleaned = [
self.data_import_service._clean_product_name(product)
for product in products_raw
]
# Get unique products
product_list = list(set([p for p in products_cleaned if p and p != "Producto sin nombre"]))
unique_products = len(product_list)
logger.info("Extracted unique products",
total_rows=len(rows), unique_products=unique_products)
except Exception as e:
logger.error("Failed to extract products", error=str(e))
# Don't fail validation just because product extraction failed
pass
result = OnboardingValidationResult(
is_valid=validation_result.is_valid,
total_records=validation_result.total_records,
unique_products=unique_products,
validation_details=validation_result,
product_list=product_list,
summary={
"status": "valid" if validation_result.is_valid else "invalid",
"file_format": file_format,
"total_records": validation_result.total_records,
"unique_products": unique_products,
"ready_for_ai_classification": validation_result.is_valid and unique_products > 0,
"next_step": "ai_classification" if validation_result.is_valid and unique_products > 0 else "fix_validation_errors"
}
)
logger.info("Onboarding validation completed",
is_valid=result.is_valid,
unique_products=unique_products,
tenant_id=tenant_id)
return result
except Exception as e:
logger.error("Onboarding validation failed", error=str(e), tenant_id=tenant_id)
return OnboardingValidationResult(
is_valid=False,
total_records=0,
unique_products=0,
validation_details=SalesValidationResult(
is_valid=False,
total_records=0,
valid_records=0,
invalid_records=0,
errors=[{
"type": "system_error",
"message": f"Onboarding validation error: {str(e)}",
"field": None,
"row": None,
"code": "ONBOARDING_VALIDATION_ERROR"
}],
warnings=[],
summary={}
),
product_list=[],
summary={
"status": "error",
"error_message": str(e),
"next_step": "retry_upload"
}
)
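# Minimal standalone sketch of the unique-product extraction performed above,
# assuming the CSV uses a column literally named "product" (the real flow resolves
# the column via DataImportService._detect_columns and cleans names with
# _clean_product_name). Illustrative only, not part of the original service:
@staticmethod
def _example_extract_unique_products(csv_text: str) -> List[str]:
    import csv
    import io
    rows = csv.DictReader(io.StringIO(csv_text))
    names = {row.get("product", "").strip() for row in rows}
    return sorted(name for name in names if name)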
# ================================================================
# STEP 2: AI PRODUCT CLASSIFICATION
# ================================================================
async def generate_inventory_suggestions(
self,
product_list: List[str],
file_data: str,
file_format: str,
tenant_id: UUID
) -> ProductSuggestionsResult:
"""
Step 2: Generate AI-powered inventory suggestions for products
"""
import time
start_time = time.time()
try:
logger.info("Starting AI inventory suggestions",
product_count=len(product_list), tenant_id=tenant_id)
if not product_list:
raise ValueError("No products provided for classification")
# Analyze sales data for each product to provide context
product_analysis = await self._analyze_product_sales_data(
product_list, file_data, file_format
)
# Prepare products for classification
products_for_classification = []
for product_name in product_list:
sales_data = product_analysis.get(product_name, {})
products_for_classification.append({
"product_name": product_name,
"sales_volume": sales_data.get("total_quantity"),
"sales_data": sales_data
})
# Call inventory service for AI classification
classification_result = await self.inventory_client.classify_products_batch(
products_for_classification, tenant_id
)
if not classification_result or "suggestions" not in classification_result:
raise ValueError("Invalid classification response from inventory service")
suggestions_raw = classification_result["suggestions"]
business_model_raw = classification_result.get("business_model_analysis", {})
# Convert to dataclass objects
suggestions = []
for suggestion_data in suggestions_raw:
suggestion = ProductSuggestion(
suggestion_id=suggestion_data.get("suggestion_id", str(uuid4())),
original_name=suggestion_data["original_name"],
suggested_name=suggestion_data["suggested_name"],
product_type=suggestion_data["product_type"],
category=suggestion_data["category"],
unit_of_measure=suggestion_data["unit_of_measure"],
confidence_score=suggestion_data["confidence_score"],
estimated_shelf_life_days=suggestion_data.get("estimated_shelf_life_days"),
requires_refrigeration=suggestion_data.get("requires_refrigeration", False),
requires_freezing=suggestion_data.get("requires_freezing", False),
is_seasonal=suggestion_data.get("is_seasonal", False),
suggested_supplier=suggestion_data.get("suggested_supplier"),
notes=suggestion_data.get("notes"),
sales_data=product_analysis.get(suggestion_data["original_name"])
)
suggestions.append(suggestion)
# Check if enhanced business intelligence data is available
bi_data = product_analysis.get('__business_intelligence__')
if bi_data and bi_data.get('confidence_score', 0) > 0.6:
# Use enhanced business intelligence analysis
business_type = bi_data.get('business_type', 'bakery')
business_model_detected = bi_data.get('business_model', 'individual')
# Map business intelligence results to existing model format
model_mapping = {
'individual': 'individual_bakery',
'central_distribution': 'central_baker_satellite',
'central_bakery': 'central_baker_satellite',
'hybrid': 'hybrid_bakery'
}
mapped_model = model_mapping.get(business_model_detected, 'individual_bakery')
# Count ingredients vs finished products from suggestions
ingredient_count = sum(1 for s in suggestions if s.product_type == 'ingredient')
finished_product_count = sum(1 for s in suggestions if s.product_type == 'finished_product')
total_products = len(suggestions)
ingredient_ratio = ingredient_count / total_products if total_products > 0 else 0.0
# Enhanced recommendations based on BI analysis
enhanced_recommendations = bi_data.get('recommendations', [])
# Add business type specific recommendations
if business_type == 'coffee_shop':
enhanced_recommendations.extend([
"Configure beverage inventory management",
"Set up quick-service item tracking",
"Enable all-day service optimization"
])
business_model = BusinessModelAnalysis(
model=mapped_model,
confidence=bi_data.get('confidence_score', 0.0),
ingredient_count=ingredient_count,
finished_product_count=finished_product_count,
ingredient_ratio=ingredient_ratio,
recommendations=enhanced_recommendations[:6] # Limit to top 6 recommendations
)
logger.info("Using enhanced business intelligence for model analysis",
detected_type=business_type,
detected_model=business_model_detected,
mapped_model=mapped_model,
confidence=bi_data.get('confidence_score'))
else:
# Fallback to basic inventory service analysis
business_model = BusinessModelAnalysis(
model=business_model_raw.get("model", "unknown"),
confidence=business_model_raw.get("confidence", 0.0),
ingredient_count=business_model_raw.get("ingredient_count", 0),
finished_product_count=business_model_raw.get("finished_product_count", 0),
ingredient_ratio=business_model_raw.get("ingredient_ratio", 0.0),
recommendations=business_model_raw.get("recommendations", [])
)
logger.info("Using basic inventory service business model analysis")
# Calculate confidence metrics
high_confidence_count = sum(1 for s in suggestions if s.confidence_score >= 0.7)
low_confidence_count = sum(1 for s in suggestions if s.confidence_score < 0.6)
processing_time = time.time() - start_time
result = ProductSuggestionsResult(
suggestions=suggestions,
business_model_analysis=business_model,
total_products=len(suggestions),
high_confidence_count=high_confidence_count,
low_confidence_count=low_confidence_count,
processing_time_seconds=processing_time
)
# Update tenant's business model based on AI analysis
if business_model.model != "unknown" and business_model.confidence >= 0.6:
try:
await self._update_tenant_business_model(tenant_id, business_model.model)
logger.info("Updated tenant business model",
tenant_id=tenant_id,
business_model=business_model.model,
confidence=business_model.confidence)
except Exception as e:
logger.warning("Failed to update tenant business model",
error=str(e), tenant_id=tenant_id)
# Don't fail the entire process if tenant update fails
logger.info("AI inventory suggestions completed",
total_suggestions=len(suggestions),
business_model=business_model.model,
high_confidence=high_confidence_count,
processing_time=processing_time,
tenant_id=tenant_id)
return result
except Exception as e:
processing_time = time.time() - start_time
logger.error("AI inventory suggestions failed",
error=str(e), tenant_id=tenant_id)
# Return fallback suggestions
fallback_suggestions = [
ProductSuggestion(
suggestion_id=str(uuid4()),
original_name=product_name,
suggested_name=product_name.title(),
product_type="finished_product",
category="other_products",
unit_of_measure="units",
confidence_score=0.3,
notes="Fallback suggestion - requires manual review"
)
for product_name in product_list
]
return ProductSuggestionsResult(
suggestions=fallback_suggestions,
business_model_analysis=BusinessModelAnalysis(
model="unknown",
confidence=0.0,
ingredient_count=0,
finished_product_count=len(fallback_suggestions),
ingredient_ratio=0.0,
recommendations=["Manual review required for all products"]
),
total_products=len(fallback_suggestions),
high_confidence_count=0,
low_confidence_count=len(fallback_suggestions),
processing_time_seconds=processing_time
)
# ================================================================
# STEP 3: INVENTORY CREATION (after user confirmation)
# ================================================================
async def create_inventory_from_suggestions(
self,
approved_suggestions: List[Dict[str, Any]],
tenant_id: UUID,
user_id: UUID
) -> Dict[str, Any]:
"""
Step 3: Create inventory items from user-approved suggestions
"""
try:
logger.info("Creating inventory from approved suggestions",
approved_count=len(approved_suggestions), tenant_id=tenant_id)
created_items = []
failed_items = []
for approval in approved_suggestions:
suggestion_id = approval.get("suggestion_id")
is_approved = approval.get("approved", False)
modifications = approval.get("modifications", {})
if not is_approved:
continue
try:
# Build inventory item data from suggestion and modifications
# Map to inventory service expected format
raw_category = modifications.get("category") or approval.get("category", "other")
raw_unit = modifications.get("unit_of_measure") or approval.get("unit_of_measure", "units")
# Map categories to inventory service enum values
category_mapping = {
"flour": "flour",
"yeast": "yeast",
"dairy": "dairy",
"eggs": "eggs",
"sugar": "sugar",
"fats": "fats",
"salt": "salt",
"spices": "spices",
"additives": "additives",
"packaging": "packaging",
"cleaning": "cleaning",
"grains": "flour", # Map common variations
"bread": "other",
"pastries": "other",
"croissants": "other",
"cakes": "other",
"other_products": "other"
}
# Map units to inventory service enum values
unit_mapping = {
"kg": "kg",
"kilograms": "kg",
"g": "g",
"grams": "g",
"l": "l",
"liters": "l",
"ml": "ml",
"milliliters": "ml",
"units": "units",
"pieces": "pcs",
"pcs": "pcs",
"packages": "pkg",
"pkg": "pkg",
"bags": "bags",
"boxes": "boxes"
}
mapped_category = category_mapping.get(raw_category.lower(), "other")
mapped_unit = unit_mapping.get(raw_unit.lower(), "units")
inventory_data = {
"name": modifications.get("name") or approval.get("suggested_name"),
"category": mapped_category,
"unit_of_measure": mapped_unit,
"product_type": approval.get("product_type"),
"description": modifications.get("description") or approval.get("notes", ""),
# Optional fields
"brand": modifications.get("brand") or approval.get("suggested_supplier"),
"is_active": True,
# Explicitly set boolean fields to ensure they're not NULL
"requires_refrigeration": modifications.get("requires_refrigeration", approval.get("requires_refrigeration", False)),
"requires_freezing": modifications.get("requires_freezing", approval.get("requires_freezing", False)),
"is_perishable": modifications.get("is_perishable", approval.get("is_perishable", False))
}
# Add optional numeric fields only if they exist
shelf_life = modifications.get("estimated_shelf_life_days") or approval.get("estimated_shelf_life_days")
if shelf_life:
inventory_data["shelf_life_days"] = shelf_life
# Create inventory item via inventory service
created_item = await self.inventory_client.create_ingredient(
inventory_data, str(tenant_id)
)
if created_item:
created_items.append({
"suggestion_id": suggestion_id,
"inventory_item": created_item,
"original_name": approval.get("original_name")
})
logger.info("Created inventory item",
item_name=inventory_data["name"],
suggestion_id=suggestion_id)
else:
failed_items.append({
"suggestion_id": suggestion_id,
"error": "Failed to create inventory item - no response"
})
except Exception as e:
logger.error("Failed to create inventory item",
error=str(e), suggestion_id=suggestion_id)
failed_items.append({
"suggestion_id": suggestion_id,
"error": str(e)
})
success_rate = len(created_items) / max(1, len(approved_suggestions)) * 100
result = {
"created_items": created_items,
"failed_items": failed_items,
"total_approved": len(approved_suggestions),
"successful_creations": len(created_items),
"failed_creations": len(failed_items),
"success_rate": success_rate,
"ready_for_import": len(created_items) > 0
}
logger.info("Inventory creation completed",
created=len(created_items),
failed=len(failed_items),
success_rate=success_rate,
tenant_id=tenant_id)
return result
except Exception as e:
logger.error("Inventory creation failed", error=str(e), tenant_id=tenant_id)
raise
# ================================================================
# STEP 4: FINAL DATA IMPORT
# ================================================================
async def import_sales_data_with_inventory(
self,
file_data: str,
file_format: str,
inventory_mapping: Dict[str, str], # original_product_name -> inventory_item_id
tenant_id: UUID,
filename: Optional[str] = None
) -> OnboardingImportResult:
"""
Step 4: Import sales data using the detailed processing from data_import_service
"""
try:
logger.info("Starting final sales data import with inventory mapping",
mappings_count=len(inventory_mapping), tenant_id=tenant_id)
# Use data_import_service for the actual import processing
import_result = await self.data_import_service.process_import(
str(tenant_id), file_data, file_format, filename
)
result = OnboardingImportResult(
success=import_result.success,
import_details=import_result,
inventory_items_created=len(inventory_mapping),
inventory_creation_errors=[],
final_summary={
"status": "completed" if import_result.success else "failed",
"total_records": import_result.records_processed,
"successful_imports": import_result.records_created,
"failed_imports": import_result.records_failed,
"inventory_items": len(inventory_mapping),
"processing_time": import_result.processing_time_seconds,
"onboarding_complete": import_result.success
}
)
logger.info("Final sales data import completed",
success=import_result.success,
records_created=import_result.records_created,
tenant_id=tenant_id)
return result
except Exception as e:
logger.error("Final sales data import failed", error=str(e), tenant_id=tenant_id)
return OnboardingImportResult(
success=False,
import_details=SalesImportResult(
success=False,
records_processed=0,
records_created=0,
records_updated=0,
records_failed=0,
errors=[{
"type": "import_error",
"message": f"Import failed: {str(e)}",
"field": None,
"row": None,
"code": "FINAL_IMPORT_ERROR"
}],
warnings=[],
processing_time_seconds=0.0
),
inventory_items_created=len(inventory_mapping),
inventory_creation_errors=[str(e)],
final_summary={
"status": "failed",
"error_message": str(e),
"onboarding_complete": False
}
)
# ================================================================
# HELPER METHODS
# ================================================================
async def _analyze_product_sales_data(
self,
product_list: List[str],
file_data: str,
file_format: str
) -> Dict[str, Dict[str, Any]]:
"""Analyze sales data for each product to provide context for AI classification"""
try:
if file_format.lower() != "csv":
return {}
import csv
import io
reader = csv.DictReader(io.StringIO(file_data))
rows = list(reader)
if not rows:
return {}
# Use data_import_service column detection
column_mapping = self.data_import_service._detect_columns(list(rows[0].keys()))
if not column_mapping.get('product'):
return {}
product_column = column_mapping['product']
quantity_column = column_mapping.get('quantity')
revenue_column = column_mapping.get('revenue')
date_column = column_mapping.get('date')
# Analyze each product
product_analysis = {}
for product_name in product_list:
# Find all rows for this product
product_rows = [
row for row in rows
if self.data_import_service._clean_product_name(row.get(product_column, '')) == product_name
]
if not product_rows:
continue
# Calculate metrics
total_quantity = 0
total_revenue = 0
sales_count = len(product_rows)
for row in product_rows:
try:
# Quantity
qty_raw = row.get(quantity_column, 1)
if qty_raw and str(qty_raw).strip():
qty = int(float(str(qty_raw).replace(',', '.')))
total_quantity += qty
else:
total_quantity += 1
# Revenue
if revenue_column:
rev_raw = row.get(revenue_column)
if rev_raw and str(rev_raw).strip():
rev = float(str(rev_raw).replace(',', '.').replace('€', '').replace('$', '').strip())
total_revenue += rev
except (ValueError, TypeError):
continue
avg_quantity = total_quantity / sales_count if sales_count > 0 else 0
avg_revenue = total_revenue / sales_count if sales_count > 0 else 0
avg_unit_price = total_revenue / total_quantity if total_quantity > 0 else 0
product_analysis[product_name] = {
"total_quantity": total_quantity,
"total_revenue": total_revenue,
"sales_count": sales_count,
"avg_quantity_per_sale": avg_quantity,
"avg_revenue_per_sale": avg_revenue,
"avg_unit_price": avg_unit_price
}
# Add enhanced business intelligence analysis
try:
from app.services.business_intelligence_service import BusinessIntelligenceService
bi_service = BusinessIntelligenceService()
# Convert parsed data to format expected by BI service
sales_data = []
product_data = []
for row in rows:
# Create sales record from CSV row
sales_record = {
'date': row.get(date_column, ''),
'product_name': row.get(product_column, ''),
'name': row.get(product_column, ''),
'quantity_sold': 0,
'revenue': 0,
'location_id': row.get('location', 'main'),
'sales_channel': row.get('channel', 'in_store'),
'supplier_name': row.get('supplier', ''),
'brand': row.get('brand', '')
}
# Parse quantity
if quantity_column:
try:
qty_raw = row.get(quantity_column, 1)
if qty_raw and str(qty_raw).strip():
sales_record['quantity_sold'] = int(float(str(qty_raw).replace(',', '.')))
except (ValueError, TypeError):
sales_record['quantity_sold'] = 1
# Parse revenue
if revenue_column:
try:
rev_raw = row.get(revenue_column)
if rev_raw and str(rev_raw).strip():
sales_record['revenue'] = float(str(rev_raw).replace(',', '.').replace('€', '').replace('$', '').strip())
except (ValueError, TypeError):
pass
sales_data.append(sales_record)
# Create product data entry
product_data.append({
'name': sales_record['product_name'],
'supplier_name': sales_record.get('supplier_name', ''),
'brand': sales_record.get('brand', '')
})
# Run business intelligence analysis
if sales_data:
detection_result = await bi_service.analyze_business_from_sales_data(
sales_data=sales_data,
product_data=product_data
)
# Store business intelligence results in product_analysis
product_analysis['__business_intelligence__'] = {
"business_type": detection_result.business_type,
"business_model": detection_result.business_model,
"confidence_score": detection_result.confidence_score,
"indicators": detection_result.indicators,
"recommendations": detection_result.recommendations,
"analysis_summary": f"{detection_result.business_type.title()} - {detection_result.business_model.replace('_', ' ').title()}"
}
logger.info("Enhanced business intelligence analysis completed",
business_type=detection_result.business_type,
business_model=detection_result.business_model,
confidence=detection_result.confidence_score)
else:
logger.warning("No sales data available for business intelligence analysis")
except Exception as bi_error:
logger.warning("Business intelligence analysis failed", error=str(bi_error))
# Continue with basic analysis even if BI fails
return product_analysis
except Exception as e:
logger.warning("Failed to analyze product sales data", error=str(e))
return {}
async def _update_tenant_business_model(self, tenant_id: UUID, business_model: str) -> None:
"""Update tenant's business model based on AI analysis"""
try:
# Use the gateway URL for all inter-service communication
from app.core.config import settings
import httpx
gateway_url = settings.GATEWAY_URL
url = f"{gateway_url}/api/v1/tenants/{tenant_id}"
# Prepare update data
update_data = {
"business_model": business_model
}
# Make request through gateway
timeout_config = httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=10.0)
async with httpx.AsyncClient(timeout=timeout_config) as client:
response = await client.put(
url,
json=update_data,
headers={"Content-Type": "application/json"}
)
if response.status_code == 200:
logger.info("Successfully updated tenant business model via gateway",
tenant_id=tenant_id, business_model=business_model)
else:
logger.warning("Failed to update tenant business model via gateway",
tenant_id=tenant_id,
status_code=response.status_code,
response=response.text)
except Exception as e:
logger.error("Error updating tenant business model via gateway",
tenant_id=tenant_id,
business_model=business_model,
error=str(e))
raise
# Factory function for dependency injection
def get_ai_onboarding_service() -> AIOnboardingService:
"""Get AI onboarding service instance"""
return AIOnboardingService()
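# The factory above is what the API layer injects via FastAPI's Depends; in tests it
# can be swapped out with FastAPI's standard dependency_overrides mechanism. A sketch
# (FakeOnboardingService is a hypothetical stub):
#
#   app.dependency_overrides[get_ai_onboarding_service] = lambda: FakeOnboardingService()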

View File

@@ -26,7 +26,7 @@ logger = structlog.get_logger()
# Import result schemas (dataclass definitions)
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import List, Dict, Any
@dataclass
@@ -38,6 +38,8 @@ class SalesValidationResult:
errors: List[Dict[str, Any]]
warnings: List[Dict[str, Any]]
summary: Dict[str, Any]
unique_products: int = 0
product_list: List[str] = field(default_factory=list)
@dataclass
class SalesImportResult:
@@ -99,7 +101,9 @@ class DataImportService:
invalid_records=0,
errors=[],
warnings=[],
summary={}
summary={},
unique_products=0,
product_list=[]
)
errors = []
@@ -216,6 +220,22 @@ class DataImportService:
"code": "MISSING_PRODUCT_COLUMN"
})
# Extract unique products for AI suggestions
if column_mapping.get('product') and not errors:
product_column = column_mapping['product']
unique_products_set = set()
for row in rows:
product_name = row.get(product_column, '').strip()
if product_name and len(product_name) > 0:
unique_products_set.add(product_name)
validation_result.product_list = list(unique_products_set)
validation_result.unique_products = len(unique_products_set)
logger.info(f"Extracted {validation_result.unique_products} unique products from CSV",
tenant_id=data.get("tenant_id"))
if not column_mapping.get('quantity'):
warnings.append({
"type": "missing_column",