#!/usr/bin/env python3
"""
Script to populate the database with test data for orders and customers.

Run it directly; it creates a fixed set of sample customers and 25 random
orders (with items and status history) for ``TEST_TENANT_ID``.
"""

import os
import sys
import uuid
from datetime import datetime, timedelta
from decimal import ROUND_HALF_UP, Decimal
import asyncio
import random

# Add the parent directory to the path to import our modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_session
from app.models.customer import Customer, CustomerContact
from app.models.order import CustomerOrder, OrderItem, OrderStatusHistory

# Test tenant ID - in a real environment this would be provided
TEST_TENANT_ID = "946206b3-7446-436b-b29d-f265b28d9ff5"

# Number of sample orders generated by create_orders()
SAMPLE_ORDER_COUNT = 25

# Pricing rules applied when computing order totals
VAT_RATE = Decimal("0.21")  # 21% VAT
DELIVERY_FEE = Decimal("3.50")
FREE_DELIVERY_THRESHOLD = Decimal("25")  # deliveries below this subtotal pay the fee
CENT = Decimal("0.01")

# Discount percentage granted per customer segment (others get no discount)
SEGMENT_DISCOUNTS = {
    "vip": Decimal("5.0"),
    "wholesale": Decimal("10.0"),
}

# Order statuses in lifecycle order; "cancelled" is terminal and sits outside
# the normal progression even though it is listed last.
ORDER_STATUSES = [
    "pending",
    "confirmed",
    "in_production",
    "ready",
    "out_for_delivery",
    "delivered",
    "cancelled",
]

# Intermediate transitions that get their own status-history row
TRACKED_TRANSITIONS = ("confirmed", "in_production", "ready")

# Sample customer data
SAMPLE_CUSTOMERS = [
    {
        "name": "María García López",
        "customer_type": "individual",
        "email": "maria.garcia@email.com",
        "phone": "+34 612 345 678",
        "city": "Madrid",
        "country": "España",
        "customer_segment": "vip",
        "is_active": True,
    },
    {
        "name": "Panadería San Juan",
        "business_name": "Panadería San Juan S.L.",
        "customer_type": "business",
        "email": "pedidos@panaderiasjuan.com",
        "phone": "+34 687 654 321",
        "city": "Barcelona",
        "country": "España",
        "customer_segment": "wholesale",
        "is_active": True,
    },
    {
        "name": "Carlos Rodríguez Martín",
        "customer_type": "individual",
        "email": "carlos.rodriguez@email.com",
        "phone": "+34 698 765 432",
        "city": "Valencia",
        "country": "España",
        "customer_segment": "regular",
        "is_active": True,
    },
    {
        "name": "Ana Fernández Ruiz",
        "customer_type": "individual",
        "email": "ana.fernandez@email.com",
        "phone": "+34 634 567 890",
        "city": "Sevilla",
        "country": "España",
        "customer_segment": "regular",
        "is_active": True,
    },
    {
        "name": "Café Central",
        "business_name": "Café Central Madrid S.L.",
        "customer_type": "business",
        "email": "compras@cafecentral.es",
        "phone": "+34 623 456 789",
        "city": "Madrid",
        "country": "España",
        "customer_segment": "wholesale",
        "is_active": True,
    },
    {
        "name": "Laura Martínez Silva",
        "customer_type": "individual",
        "email": "laura.martinez@email.com",
        "phone": "+34 645 789 012",
        "city": "Bilbao",
        "country": "España",
        "customer_segment": "regular",
        "is_active": False,  # Inactive customer for testing
    },
]

# Sample products (in a real system these would come from a products service)
SAMPLE_PRODUCTS = [
    {"id": str(uuid.uuid4()), "name": "Pan Integral Artesano", "price": Decimal("2.50"), "category": "Panadería"},
    {"id": str(uuid.uuid4()), "name": "Croissant de Mantequilla", "price": Decimal("1.80"), "category": "Bollería"},
    {"id": str(uuid.uuid4()), "name": "Tarta de Santiago", "price": Decimal("18.90"), "category": "Repostería"},
    {"id": str(uuid.uuid4()), "name": "Magdalenas de Limón", "price": Decimal("0.90"), "category": "Bollería"},
    {"id": str(uuid.uuid4()), "name": "Empanada de Atún", "price": Decimal("3.50"), "category": "Salado"},
    {"id": str(uuid.uuid4()), "name": "Brownie de Chocolate", "price": Decimal("3.20"), "category": "Repostería"},
    {"id": str(uuid.uuid4()), "name": "Baguette Francesa", "price": Decimal("2.80"), "category": "Panadería"},
    {"id": str(uuid.uuid4()), "name": "Palmera de Chocolate", "price": Decimal("2.40"), "category": "Bollería"},
]


def _to_cents(amount: Decimal) -> Decimal:
    """Round a monetary amount to two decimal places (half-up)."""
    return amount.quantize(CENT, rounding=ROUND_HALF_UP)


def _calculate_totals(
    subtotal: Decimal,
    discount_percentage: Decimal,
    delivery_method: str,
) -> tuple[Decimal, Decimal, Decimal, Decimal]:
    """Compute the financial totals of an order.

    Each component is rounded to cents before it is summed, so the returned
    total always equals ``subtotal - discount + tax + delivery_fee`` exactly.

    Returns:
        ``(discount_amount, tax_amount, delivery_fee, total_amount)``.
    """
    discount_amount = _to_cents(subtotal * discount_percentage / 100)
    tax_amount = _to_cents((subtotal - discount_amount) * VAT_RATE)
    charge_delivery = delivery_method == "delivery" and subtotal < FREE_DELIVERY_THRESHOLD
    delivery_fee = DELIVERY_FEE if charge_delivery else Decimal("0.00")
    total_amount = subtotal - discount_amount + tax_amount + delivery_fee
    return discount_amount, tax_amount, delivery_fee, total_amount


def _build_status_history(order_id, order_status: str, order_date: datetime) -> list[OrderStatusHistory]:
    """Build the status-history rows for one seeded order.

    The first row records the creation of the order with its final status.
    Orders that progressed past "pending" additionally get one row per
    tracked transition they reached, each 2-12 hours after the previous one.
    Cancelled orders get no intermediate rows: they are seeded as never
    confirmed, so a confirmed/in-production/ready trail would contradict them.
    """
    history = [
        OrderStatusHistory(
            order_id=order_id,
            from_status=None,
            to_status=order_status,
            event_type="status_change",
            event_description=f"Order created with status: {order_status}",
            change_source="system",
            changed_at=order_date,
            customer_notified=order_status != "pending",
        )
    ]

    if order_status in ("pending", "cancelled"):
        return history

    reached = ORDER_STATUSES.index(order_status)
    previous_status = "pending"
    changed_at = order_date
    for status in TRACKED_TRANSITIONS:
        # TRACKED_TRANSITIONS is in lifecycle order, so the first status
        # beyond the order's own status ends the chain.
        if ORDER_STATUSES.index(status) > reached:
            break
        changed_at += timedelta(hours=random.randint(2, 12))
        history.append(
            OrderStatusHistory(
                order_id=order_id,
                from_status=previous_status,
                to_status=status,
                event_type="status_change",
                event_description=f"Order status changed to: {status}",
                change_source="manual",
                changed_at=changed_at,
                customer_notified=True,
            )
        )
        previous_status = status

    return history


async def create_customers(session: AsyncSession) -> list[Customer]:
    """Create the sample customers, commit them and return them.

    Args:
        session: Open async session used to persist the customers.

    Returns:
        The persisted ``Customer`` objects, refreshed after the commit so
        their attributes can be read without triggering implicit IO.
    """
    customers = []

    for i, customer_data in enumerate(SAMPLE_CUSTOMERS):
        segment = customer_data["customer_segment"]
        customer = Customer(
            tenant_id=TEST_TENANT_ID,
            customer_code=f"CUST-{i+1:04d}",
            name=customer_data["name"],
            business_name=customer_data.get("business_name"),
            customer_type=customer_data["customer_type"],
            email=customer_data["email"],
            phone=customer_data["phone"],
            city=customer_data["city"],
            country=customer_data["country"],
            is_active=customer_data["is_active"],
            preferred_delivery_method="delivery" if random.choice([True, False]) else "pickup",
            payment_terms=random.choice(["immediate", "net_30"]),
            customer_segment=segment,
            priority_level=random.choice(["normal", "high"]) if segment == "vip" else "normal",
            discount_percentage=SEGMENT_DISCOUNTS.get(segment, Decimal("0.0")),
            total_orders=random.randint(5, 50),
            total_spent=Decimal(str(random.randint(100, 5000))),
            average_order_value=Decimal(str(random.randint(15, 150))),
            last_order_date=datetime.now() - timedelta(days=random.randint(1, 30)),
        )
        session.add(customer)
        customers.append(customer)

    await session.commit()

    # A commit expires loaded attributes unless the session was created with
    # expire_on_commit=False (get_session's configuration is not visible
    # here). Under AsyncSession, touching an expired attribute attempts
    # implicit IO and fails, so reload explicitly before handing them out.
    for customer in customers:
        await session.refresh(customer)

    return customers


async def create_orders(session: AsyncSession, customers: list[Customer]):
    """Create sample orders in different statuses.

    Generates ``SAMPLE_ORDER_COUNT`` orders for randomly chosen customers,
    each with 1-5 items, computed totals and a status history, then commits.

    Args:
        session: Open async session used to persist the orders.
        customers: Persisted customers to attach the orders to.
    """
    order_types = ["standard", "rush", "recurring", "special"]
    priorities = ["low", "normal", "high"]
    delivery_methods = ["delivery", "pickup"]
    payment_statuses = ["pending", "partial", "paid", "failed"]

    now = datetime.now()

    for i in range(SAMPLE_ORDER_COUNT):
        customer = random.choice(customers)
        order_status = random.choice(ORDER_STATUSES)
        is_delivered = order_status == "delivered"
        is_confirmed = order_status not in ["pending", "cancelled"]

        # Create order date in the last 30 days
        order_date = now - timedelta(days=random.randint(0, 30))

        # Create delivery date (1-7 days after order date)
        delivery_date = order_date + timedelta(days=random.randint(1, 7))

        # Coerce once so the Decimal arithmetic below works even if the
        # value comes back from the database as a float or NULL.
        discount_percentage = Decimal(str(customer.discount_percentage or 0))

        order = CustomerOrder(
            tenant_id=TEST_TENANT_ID,
            order_number=f"ORD-{now.year}-{i+1:04d}",
            customer_id=customer.id,
            status=order_status,
            order_type=random.choice(order_types),
            priority=random.choice(priorities),
            order_date=order_date,
            requested_delivery_date=delivery_date,
            confirmed_delivery_date=delivery_date if is_confirmed else None,
            # An order that is already delivered cannot have been delivered
            # in the future, so clamp to the current time.
            actual_delivery_date=min(delivery_date, now) if is_delivered else None,
            delivery_method=random.choice(delivery_methods),
            delivery_instructions=random.choice([
                None, "Dejar en recepción", "Llamar al timbre", "Cuidado con el escalón"
            ]),
            discount_percentage=discount_percentage,
            payment_status=random.choice(payment_statuses) if order_status != "cancelled" else "failed",
            payment_method=random.choice(["cash", "card", "bank_transfer"]),
            payment_terms=customer.payment_terms,
            special_instructions=random.choice([
                None, "Sin gluten", "Decoración especial", "Entrega temprano", "Cliente VIP"
            ]),
            order_source=random.choice(["manual", "online", "phone"]),
            sales_channel=random.choice(["direct", "wholesale"]),
            customer_notified_confirmed=is_confirmed,
            customer_notified_ready=order_status in ["ready", "out_for_delivery", "delivered"],
            customer_notified_delivered=is_delivered,
            quality_score=Decimal(str(random.randint(70, 100) / 10)) if is_delivered else None,
            customer_rating=random.randint(3, 5) if is_delivered else None,
        )

        session.add(order)
        await session.flush()  # Flush to get the order ID

        # Create order items
        subtotal = Decimal("0.00")
        for _ in range(random.randint(1, 5)):
            product = random.choice(SAMPLE_PRODUCTS)
            quantity = random.randint(1, 10)
            unit_price = product["price"]
            line_total = unit_price * quantity

            session.add(
                OrderItem(
                    order_id=order.id,
                    product_id=product["id"],
                    product_name=product["name"],
                    product_category=product["category"],
                    quantity=quantity,
                    unit_of_measure="unidad",
                    unit_price=unit_price,
                    line_discount=Decimal("0.00"),
                    line_total=line_total,
                    status=order_status,
                )
            )
            subtotal += line_total

        # Calculate financial totals and update the order with them
        discount_amount, tax_amount, delivery_fee, total_amount = _calculate_totals(
            subtotal, discount_percentage, order.delivery_method
        )
        order.subtotal = subtotal
        order.discount_amount = discount_amount
        order.tax_amount = tax_amount
        order.delivery_fee = delivery_fee
        order.total_amount = total_amount

        # Create status history
        for entry in _build_status_history(order.id, order_status, order_date):
            session.add(entry)

    await session.commit()


async def main():
    """Main function to seed the database.

    Creates customers first, then orders. On any error the session is
    rolled back and the exception re-raised; the session is always closed.
    """
    print("🌱 Starting database seeding...")

    async for session in get_session():
        try:
            print("📋 Creating customers...")
            customers = await create_customers(session)
            print(f"✅ Created {len(customers)} customers")

            print("📦 Creating orders...")
            await create_orders(session, customers)
            print("✅ Created orders with different statuses")

            print("🎉 Database seeding completed successfully!")

        except Exception as e:
            print(f"❌ Error during seeding: {e}")
            await session.rollback()
            raise
        finally:
            await session.close()


if __name__ == "__main__":
    asyncio.run(main())