Files
bakery-ia/services/production/scripts/demo/seed_demo_equipment.py
2025-10-17 07:31:14 +02:00

236 lines
7.8 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Demo Equipment Seeding Script for Production Service
Creates production equipment for demo template tenants
This script runs as a Kubernetes init job inside the production-service container.
"""
import asyncio
import uuid
import sys
import os
import json
from datetime import datetime, timezone, timedelta
from pathlib import Path
# Add app to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
import structlog
from app.models.production import Equipment, EquipmentType, EquipmentStatus
# Module-level structlog logger used by the seeding functions and main() below.
# Only a logger is obtained here; no structlog configuration call is made in the code shown.
logger = structlog.get_logger()
# Fixed tenant IDs of the demo tenants that receive seeded equipment
DEMO_TENANT_SAN_PABLO = uuid.UUID(hex="a1b2c3d4-e5f6-47a8-b9c0-d1e2f3a4b5c6")  # Individual bakery
DEMO_TENANT_LA_ESPIGA = uuid.UUID(hex="b2c3d4e5-f6a7-48b9-c0d1-e2f3a4b5c6d7")  # Central bakery

# Fixed, timezone-aware (UTC) anchor: 2025-01-15 12:00:00.
# Day offsets read from the equipment data are resolved relative to this instant.
BASE_REFERENCE_DATE = datetime(
    year=2025,
    month=1,
    day=15,
    hour=12,
    minute=0,
    second=0,
    tzinfo=timezone.utc,
)
def load_equipment_data():
    """Read and parse ``equipos_es.json`` located next to this script.

    Returns:
        The decoded JSON document.

    Raises:
        FileNotFoundError: If the JSON file does not exist beside the script.
    """
    source = Path(__file__).parent / "equipos_es.json"

    if source.exists():
        with source.open('r', encoding='utf-8') as handle:
            return json.load(handle)

    raise FileNotFoundError(f"Equipment data file not found: {source}")
def calculate_date_from_offset(offset_days: int) -> datetime:
    """Return BASE_REFERENCE_DATE shifted by ``offset_days`` days.

    Negative offsets yield dates before the reference date, positive ones after it.
    """
    shift = timedelta(days=offset_days)
    return BASE_REFERENCE_DATE + shift
def _date_from_optional_offset(offset_days):
    """Resolve a day offset to a date, treating a missing/null offset as 'no date'."""
    if offset_days is None:
        return None
    return calculate_date_from_offset(offset_days)


async def seed_equipment_for_tenant(
    db: AsyncSession,
    tenant_id: uuid.UUID,
    tenant_name: str,
    equipment_list: list
):
    """Seed equipment for a specific tenant.

    The seed is skipped when the tenant already owns at least one equipment
    row. Otherwise one ``Equipment`` row is added per entry of
    ``equipment_list`` and all rows are committed together.

    Args:
        db: Async session used for the existence check, the inserts and the commit.
        tenant_id: Tenant that will own the created rows.
        tenant_name: Human-readable tenant label; only used in log messages.
        equipment_list: Equipment definitions (dicts). Each entry must provide
            ``id``, ``name``, ``type`` and ``status``; all other keys are
            optional and default to ``None``.

    Returns:
        A dict with ``tenant_id`` (str), ``equipment_created`` (int) and
        ``skipped`` (bool).

    Raises:
        KeyError: If an entry lacks one of the required keys.
        ValueError: If an entry's ``id`` is not a valid UUID string.
        Exception: Any error raised while adding or committing rows is
            re-raised after the session has been rolled back.
    """
    logger.info(f"Seeding equipment for: {tenant_name}", tenant_id=str(tenant_id))

    # Idempotency guard: a single existing row is enough to skip the whole tenant
    result = await db.execute(
        select(Equipment).where(Equipment.tenant_id == tenant_id).limit(1)
    )
    existing = result.scalar_one_or_none()
    if existing:
        logger.info(f"Equipment already exists for {tenant_name}, skipping seed")
        return {"tenant_id": str(tenant_id), "equipment_created": 0, "skipped": True}

    # String -> enum lookups are loop-invariant, so build them once up front.
    # Unrecognised values fall back to OPERATIONAL / OTHER respectively.
    status_mapping = {
        "operational": EquipmentStatus.OPERATIONAL,
        "warning": EquipmentStatus.WARNING,
        "maintenance": EquipmentStatus.MAINTENANCE,
        "down": EquipmentStatus.DOWN,
    }
    type_mapping = {
        "oven": EquipmentType.OVEN,
        "mixer": EquipmentType.MIXER,
        "proofer": EquipmentType.PROOFER,
        "freezer": EquipmentType.FREEZER,
        "packaging": EquipmentType.PACKAGING,
        "other": EquipmentType.OTHER,
    }

    count = 0
    try:
        for equip_data in equipment_list:
            # Offsets may be absent or explicitly null in the data; both mean "no date"
            install_date = _date_from_optional_offset(
                equip_data.get("install_date_offset_days")
            )
            last_maintenance_date = _date_from_optional_offset(
                equip_data.get("last_maintenance_offset_days")
            )

            # Next maintenance is only derivable when both the last date and
            # a non-zero interval are known
            next_maintenance_date = None
            if last_maintenance_date and equip_data.get("maintenance_interval_days"):
                next_maintenance_date = last_maintenance_date + timedelta(
                    days=equip_data["maintenance_interval_days"]
                )

            status = status_mapping.get(equip_data["status"], EquipmentStatus.OPERATIONAL)
            equipment_type = type_mapping.get(equip_data["type"], EquipmentType.OTHER)

            equipment = Equipment(
                id=uuid.UUID(equip_data["id"]),
                tenant_id=tenant_id,
                name=equip_data["name"],
                type=equipment_type,
                model=equip_data.get("model"),
                serial_number=equip_data.get("serial_number"),
                location=equip_data.get("location"),
                status=status,
                power_kw=equip_data.get("power_kw"),
                capacity=equip_data.get("capacity"),
                efficiency_percentage=equip_data.get("efficiency_percentage"),
                current_temperature=equip_data.get("current_temperature"),
                target_temperature=equip_data.get("target_temperature"),
                maintenance_interval_days=equip_data.get("maintenance_interval_days"),
                last_maintenance_date=last_maintenance_date,
                next_maintenance_date=next_maintenance_date,
                install_date=install_date,
                notes=equip_data.get("notes"),
                # Fixed timestamps keep the seeded rows identical across runs
                created_at=BASE_REFERENCE_DATE,
                updated_at=BASE_REFERENCE_DATE
            )
            db.add(equipment)
            count += 1
            logger.debug(f"Created equipment: {equipment.name}", equipment_id=str(equipment.id))

        await db.commit()
    except Exception:
        # Discard pending rows so the shared session is left in a usable state
        await db.rollback()
        raise

    logger.info(f"Successfully created {count} equipment items for {tenant_name}")
    return {
        "tenant_id": str(tenant_id),
        "equipment_created": count,
        "skipped": False
    }
async def seed_all(db: AsyncSession):
    """Seed equipment for every demo tenant, one after the other.

    Args:
        db: Async session handed to each per-tenant seeding call.

    Returns:
        A dict with the per-tenant ``results`` list, the summed
        ``total_equipment_created`` and a ``status`` of ``"completed"``.
    """
    logger.info("Starting demo equipment seed process")
    data = load_equipment_data()

    # (tenant id, label for log output, key of the tenant's list in the loaded data)
    seed_plan = (
        (DEMO_TENANT_SAN_PABLO, "San Pablo - Individual Bakery", "equipos_individual_bakery"),
        (DEMO_TENANT_LA_ESPIGA, "La Espiga - Central Bakery", "equipos_central_bakery"),
    )

    results = []
    for demo_tenant_id, label, data_key in seed_plan:
        # The data key is resolved only when its tenant is about to be seeded
        outcome = await seed_equipment_for_tenant(
            db,
            demo_tenant_id,
            label,
            data[data_key],
        )
        results.append(outcome)

    return {
        "results": results,
        "total_equipment_created": sum(entry["equipment_created"] for entry in results),
        "status": "completed",
    }
async def main():
    """Connect to the production database, seed demo equipment and print a summary.

    The connection string is read from ``PRODUCTION_DATABASE_URL``. A URL that
    starts with ``postgresql://`` is rewritten to ``postgresql+asyncpg://``.

    Returns:
        0 when seeding finished, 1 when the environment variable is unset or
        any exception was raised while seeding. The engine is disposed in
        both cases once it has been created.
    """
    db_url = os.getenv("PRODUCTION_DATABASE_URL")
    if not db_url:
        logger.error("PRODUCTION_DATABASE_URL environment variable must be set")
        return 1

    # Swap the leading scheme so SQLAlchemy selects the asyncpg driver
    plain_scheme = "postgresql://"
    if db_url.startswith(plain_scheme):
        db_url = "postgresql+asyncpg://" + db_url[len(plain_scheme):]

    engine = create_async_engine(db_url, echo=False)
    session_factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    banner = "=" * 60

    try:
        async with session_factory() as session:
            outcome = await seed_all(session)
            logger.info(
                "Equipment seed completed successfully!",
                total_equipment=outcome["total_equipment_created"],
                status=outcome["status"],
            )

            # Human-readable summary on stdout, one line per tenant
            print("\n" + banner)
            print("DEMO EQUIPMENT SEED SUMMARY")
            print(banner)
            for entry in outcome["results"]:
                tenant, created = entry["tenant_id"], entry["equipment_created"]
                if entry.get("skipped", False):
                    label = "SKIPPED (already exists)"
                else:
                    label = f"CREATED {created} items"
                print(f"Tenant {tenant}: {label}")
            print(f"\nTotal Equipment Created: {outcome['total_equipment_created']}")
            print(banner + "\n")
            return 0
    except Exception as exc:
        logger.error(f"Equipment seed failed: {str(exc)}", exc_info=True)
        return 1
    finally:
        await engine.dispose()
if __name__ == "__main__":
    # Run the async entry point and use its return value as the process exit status
    sys.exit(asyncio.run(main()))