#!/usr/bin/env python3
"""
Cross-reference validation script for Bakery-IA demo data.

Validates UUID references across different services and fixtures.
"""

import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
from uuid import UUID

# Configuration
BASE_DIR = Path(__file__).parent.parent / "shared" / "demo"
FIXTURES_DIR = BASE_DIR / "fixtures" / "professional"
METADATA_DIR = BASE_DIR / "metadata"

# Fixture files loaded by load_fixtures(), in load order.
_FIXTURE_FILES = (
    "01-tenant.json",
    "02-auth.json",
    "03-inventory.json",
    "04-recipes.json",
    "05-suppliers.json",
    "06-production.json",
    "07-procurement.json",
    "08-orders.json",
    "09-sales.json",
    "10-forecasting.json",
)

# Service name -> fixture file searched when resolving a reference target.
_SERVICE_FIXTURES = {
    "inventory": "03-inventory.json",
    "recipes": "04-recipes.json",
    "suppliers": "05-suppliers.json",
    "production": "06-production.json",
    "procurement": "07-procurement.json",
    "orders": "08-orders.json",
    "sales": "09-sales.json",
    "forecasting": "10-forecasting.json",
}

# Target entity type -> top-level collection key inside the service fixture.
_TARGET_COLLECTIONS = {
    "Ingredient": "ingredients",
    "Recipe": "recipes",
    "Supplier": "suppliers",
    "ProductionBatch": "production_batches",
    "PurchaseOrder": "purchase_orders",
    "Customer": "customers",
    "SalesData": "sales_data",
    "Forecast": "forecasts",
}

# Source entity type -> (fixture file, top-level collection key).
_SOURCE_COLLECTIONS = {
    "ProductionBatch": ("06-production.json", "production_batches"),
    "RecipeIngredient": ("04-recipes.json", "recipe_ingredients"),
    "Stock": ("03-inventory.json", "stock"),
    "PurchaseOrder": ("07-procurement.json", "purchase_orders"),
    "PurchaseOrderItem": ("07-procurement.json", "purchase_order_items"),
    "OrderItem": ("08-orders.json", "order_items"),
    "SalesData": ("09-sales.json", "sales_data"),
    "Forecast": ("10-forecasting.json", "forecasts"),
}

# Keys every cross-reference rule must provide (as strings).
_REFERENCE_RULE_KEYS = (
    "from_service",
    "from_entity",
    "from_field",
    "to_service",
    "to_entity",
)

# Fixture file -> collection key -> fields every record must contain.
_REQUIRED_FIELDS = {
    "01-tenant.json": {
        "tenant": ["id", "name", "subscription_tier"],
    },
    "02-auth.json": {
        "users": ["id", "name", "email", "role"],
    },
    "03-inventory.json": {
        "ingredients": ["id", "name", "product_type", "ingredient_category"],
        "stock": ["id", "ingredient_id", "quantity", "location"],
    },
    "04-recipes.json": {
        "recipes": ["id", "name", "status", "difficulty_level"],
        "recipe_ingredients": ["id", "recipe_id", "ingredient_id", "quantity"],
    },
    "05-suppliers.json": {
        "suppliers": ["id", "name", "supplier_code", "status"],
    },
    "06-production.json": {
        "equipment": ["id", "name", "type", "status"],
        "production_batches": ["id", "product_id", "status", "start_time"],
    },
    "07-procurement.json": {
        "purchase_orders": ["id", "po_number", "supplier_id", "status"],
        "purchase_order_items": [
            "id",
            "purchase_order_id",
            "inventory_product_id",
            "ordered_quantity",
        ],
    },
    "08-orders.json": {
        "customers": ["id", "customer_code", "name", "customer_type"],
        "customer_orders": ["id", "customer_id", "order_number", "status"],
        "order_items": ["id", "order_id", "product_id", "quantity"],
    },
    "09-sales.json": {
        "sales_data": ["id", "product_id", "quantity_sold", "unit_price"],
    },
    "10-forecasting.json": {
        "forecasts": ["id", "product_id", "forecast_date", "predicted_quantity"],
    },
}

# Keys whose string values are checked by validate_date_formats().
_DATE_FIELDS = frozenset({
    "created_at",
    "updated_at",
    "start_time",
    "end_time",
    "order_date",
    "delivery_date",
    "expected_delivery_date",
    "sale_date",
    "forecast_date",
    "contract_start_date",
    "contract_end_date",
})

# YYYY-MM-DDTHH:MM:SS[.fraction]Z; the captured groups are range-checked
# afterwards by constructing a datetime from them.
_ISO_UTC_TIMESTAMP = re.compile(
    r"(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(?:\.\d+)?Z",
    re.ASCII,
)


class ValidationError(Exception):
    """Custom exception for validation errors."""
    pass


class CrossReferenceValidator:
    """Load the demo fixtures and check them for consistency.

    Problems are accumulated in ``self.errors`` (validation fails) and
    ``self.warnings`` (reported, but validation still passes).
    """

    def __init__(self):
        self.fixtures = {}          # fixture filename -> parsed JSON content
        self.cross_refs_map = []    # list of cross-reference rule dicts
        self.errors = []
        self.warnings = []

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def load_fixtures(self):
        """Load all fixture files.

        A missing file is recorded as a warning; a file that cannot be read
        or parsed is recorded as an error.
        """
        for filename in _FIXTURE_FILES:
            filepath = FIXTURES_DIR / filename
            try:
                with open(filepath, 'r', encoding='utf-8') as f:
                    self.fixtures[filename] = json.load(f)
            except FileNotFoundError:
                self.warnings.append(f"Fixture file {filename} not found")
            except (OSError, ValueError) as e:
                # ValueError covers both json.JSONDecodeError and
                # UnicodeDecodeError (file is not valid UTF-8).
                self.errors.append(f"Failed to load {filename}: {e}")

    def load_cross_refs_map(self):
        """Load cross-reference mapping from metadata.

        Any failure (missing file, unreadable/invalid JSON, unexpected
        structure) is recorded as an error and leaves ``cross_refs_map``
        unchanged.
        """
        map_file = METADATA_DIR / "cross_refs_map.json"
        try:
            with open(map_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            self.errors.append("cross_refs_map.json not found")
            return
        except (OSError, ValueError) as e:
            self.errors.append(f"Failed to load cross_refs_map.json: {e}")
            return

        references = data.get("references", []) if isinstance(data, dict) else None
        if not isinstance(references, list):
            self.errors.append(
                "cross_refs_map.json: expected an object with a 'references' list"
            )
            return
        self.cross_refs_map = references

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------

    def is_valid_uuid(self, uuid_str: str) -> bool:
        """Check if a string is a valid UUID.

        Non-string values (numbers, None, lists, ...) are reported as
        invalid instead of raising.
        """
        try:
            UUID(uuid_str)
        except (ValueError, AttributeError, TypeError):
            # UUID() raises AttributeError/TypeError for non-string input.
            return False
        return True

    def get_entity_by_id(self, service: str, entity_type: str, entity_id: str) -> Optional[Dict]:
        """Find an entity by ID in the loaded fixtures.

        Returns the matching record, or None when the service or entity
        type is not supported, the service's fixture is not loaded, or no
        record has that ID.
        """
        fixture_file = _SERVICE_FIXTURES.get(service)
        collection = _TARGET_COLLECTIONS.get(entity_type)
        if fixture_file is None or collection is None:
            return None
        if fixture_file not in self.fixtures:
            return None
        return self._find_by_id(self.fixtures[fixture_file], collection, entity_id)

    @staticmethod
    def _find_by_id(data: Any, collection: str, entity_id: str) -> Optional[Dict]:
        """Return the first dict in ``data[collection]`` whose "id" equals entity_id."""
        if not isinstance(data, dict):
            return None
        records = data.get(collection)
        if not isinstance(records, list):
            return None
        for record in records:
            if isinstance(record, dict) and record.get("id") == entity_id:
                return record
        return None

    # The per-type finders are kept for backward compatibility; each one
    # delegates to _find_by_id with its collection key.

    def _find_in_ingredients(self, data: Dict, entity_id: str) -> Optional[Dict]:
        """Find ingredient by ID."""
        return self._find_by_id(data, "ingredients", entity_id)

    def _find_in_recipes(self, data: Dict, entity_id: str) -> Optional[Dict]:
        """Find recipe by ID."""
        return self._find_by_id(data, "recipes", entity_id)

    def _find_in_suppliers(self, data: Dict, entity_id: str) -> Optional[Dict]:
        """Find supplier by ID."""
        return self._find_by_id(data, "suppliers", entity_id)

    def _find_in_production_batches(self, data: Dict, entity_id: str) -> Optional[Dict]:
        """Find production batch by ID."""
        return self._find_by_id(data, "production_batches", entity_id)

    def _find_in_purchase_orders(self, data: Dict, entity_id: str) -> Optional[Dict]:
        """Find purchase order by ID."""
        return self._find_by_id(data, "purchase_orders", entity_id)

    def _find_in_customers(self, data: Dict, entity_id: str) -> Optional[Dict]:
        """Find customer by ID."""
        return self._find_by_id(data, "customers", entity_id)

    def _find_in_sales_data(self, data: Dict, entity_id: str) -> Optional[Dict]:
        """Find sales data by ID."""
        return self._find_by_id(data, "sales_data", entity_id)

    def _find_in_forecasts(self, data: Dict, entity_id: str) -> Optional[Dict]:
        """Find forecast by ID."""
        return self._find_by_id(data, "forecasts", entity_id)

    # ------------------------------------------------------------------
    # Cross-reference validation
    # ------------------------------------------------------------------

    def validate_cross_references(self):
        """Validate all cross-references defined in the map.

        For every rule, each source record's reference field must hold a
        valid UUID that resolves to an existing target record; a missing
        value or missing target is an error only when the rule is marked
        ``required`` (a missing target is a warning otherwise). Malformed
        rules are recorded as errors instead of aborting the run.
        """
        unmapped_reported = set()

        for index, ref in enumerate(self.cross_refs_map):
            if not isinstance(ref, dict):
                self.errors.append(
                    f"cross_refs_map.json: reference #{index} is not an object"
                )
                continue
            bad_keys = [
                key for key in _REFERENCE_RULE_KEYS
                if not isinstance(ref.get(key), str)
            ]
            if bad_keys:
                self.errors.append(
                    f"cross_refs_map.json: reference #{index} has missing or "
                    f"non-string keys: {', '.join(bad_keys)}"
                )
                continue

            from_service = ref["from_service"]
            from_entity = ref["from_entity"]
            from_field = ref["from_field"]
            to_service = ref["to_service"]
            to_entity = ref["to_entity"]
            required = ref.get("required", False)
            to_filter = ref.get("to_filter", {})

            # A rule whose source type has no known collection would be
            # skipped silently; surface that once per type.
            if from_entity not in _SOURCE_COLLECTIONS:
                if from_entity not in unmapped_reported:
                    unmapped_reported.add(from_entity)
                    self.warnings.append(
                        f"No fixture collection is mapped for source entity type "
                        f"{from_entity}; its references were not validated"
                    )
                continue

            for entity in self._get_all_entities(from_service, from_entity):
                if not isinstance(entity, dict):
                    continue

                ref_id = entity.get(from_field)
                if not ref_id:
                    if required:
                        self.errors.append(
                            f"{from_entity} {entity.get('id')} missing required field {from_field}"
                        )
                    continue

                if not self.is_valid_uuid(ref_id):
                    self.errors.append(
                        f"{from_entity} {entity.get('id')} has invalid UUID in {from_field}: {ref_id}"
                    )
                    continue

                # Check if the referenced entity exists
                target_entity = self.get_entity_by_id(to_service, to_entity, ref_id)
                if not target_entity:
                    message = (
                        f"{from_entity} {entity.get('id')} references non-existent {to_entity} {ref_id}"
                    )
                    if required:
                        self.errors.append(message)
                    else:
                        self.warnings.append(message)
                    continue

                # Check filters if specified
                if to_filter:
                    self._validate_filters_case_insensitive(target_entity, to_filter, entity, ref)

    def _get_all_entities(self, service: str, entity_type: str) -> List[Dict]:
        """Get all entities of a specific type from a service.

        The fixture is chosen from ``entity_type`` alone; ``service`` is
        accepted for interface compatibility but not consulted. Returns an
        empty list when the type is unmapped, the fixture is not loaded, or
        the collection is absent or not a list.
        """
        mapping = _SOURCE_COLLECTIONS.get(entity_type)
        if mapping is None:
            return []
        fixture_file, path = mapping
        data = self.fixtures.get(fixture_file)
        if not isinstance(data, dict):
            return []
        records = data.get(path)
        return records if isinstance(records, list) else []

    def _validate_filters_case_insensitive(self, target_entity: Dict, filters: Dict, source_entity: Dict, ref: Dict):
        """Validate that target entity matches specified filters (case-insensitive).

        Values are compared as lower-cased strings. ``ref`` is accepted for
        interface compatibility and is not used.
        """
        for filter_key, filter_value in filters.items():
            actual_value = target_entity.get(filter_key)
            if actual_value is None:
                self.errors.append(
                    f"{source_entity.get('id')} references {target_entity.get('id')} "
                    f"but {filter_key} is missing (expected {filter_value})"
                )
            elif str(actual_value).lower() != str(filter_value).lower():
                self.errors.append(
                    f"{source_entity.get('id')} references {target_entity.get('id')} "
                    f"but {filter_key}={actual_value} != {filter_value}"
                )

    # ------------------------------------------------------------------
    # Required fields
    # ------------------------------------------------------------------

    def validate_required_fields(self):
        """Validate required fields in all fixtures.

        Fixtures or collections that are not loaded/present are skipped.
        A collection may be a list of records or a single record dict
        (e.g. the tenant); non-dict list items are ignored.
        """
        for filename, required_structure in _REQUIRED_FIELDS.items():
            data = self.fixtures.get(filename)
            if not isinstance(data, dict):
                continue
            for entity_type, required_fields in required_structure.items():
                if entity_type not in data:
                    continue
                entities = data[entity_type]
                if isinstance(entities, dict):
                    # Handle tenant which is a single dict
                    records = [entities]
                elif isinstance(entities, list):
                    records = entities
                else:
                    continue
                for record in records:
                    if not isinstance(record, dict):
                        continue
                    for field in required_fields:
                        if field not in record:
                            entity_id = record.get('id', 'unknown')
                            self.errors.append(
                                f"{filename}: {entity_type} {entity_id} missing required field {field}"
                            )

    # ------------------------------------------------------------------
    # Date formats
    # ------------------------------------------------------------------

    def validate_date_formats(self):
        """Validate that all dates are in ISO format."""
        for filename, data in self.fixtures.items():
            self._check_date_fields(data, _DATE_FIELDS, filename)

    def _check_date_fields(self, data: Any, date_fields: List[str], context: str):
        """Recursively check for date fields.

        String values stored under a key in ``date_fields`` must satisfy
        ``_is_iso_format``; nested dicts and lists are walked recursively.
        """
        if isinstance(data, dict):
            for key, value in data.items():
                if key in date_fields and isinstance(value, str):
                    if not self._is_iso_format(value):
                        self.errors.append(f"{context}: Invalid date format in {key}: {value}")
                elif isinstance(value, (dict, list)):
                    self._check_date_fields(value, date_fields, context)
        elif isinstance(data, list):
            for item in data:
                self._check_date_fields(item, date_fields, context)

    def _is_iso_format(self, date_str: str) -> bool:
        """Check if a string is in ISO format or BASE_TS marker.

        Accepted values:
          * BASE_TS markers (e.g. "BASE_TS - 1h", "BASE_TS + 2d")
          * offset-based dates containing "_offset_"
          * UTC timestamps ``YYYY-MM-DDTHH:MM:SS[.fraction]Z`` whose
            date and time components are in range
        """
        if not isinstance(date_str, str):
            return False
        if date_str.startswith("BASE_TS") or "_offset_" in date_str:
            return True

        match = _ISO_UTC_TIMESTAMP.fullmatch(date_str)
        if match is None:
            return False
        try:
            # Rejects out-of-range components such as month 13 or Feb 30.
            datetime(*(int(part) for part in match.groups()))
        except ValueError:
            return False
        return True

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    @staticmethod
    def _print_messages(header: str, messages: List[str]):
        """Print a header line followed by one bullet line per message."""
        print(header)
        for message in messages:
            print(f" - {message}")

    def run_validation(self) -> bool:
        """Run all validation checks.

        Returns True when no errors were recorded. Loading errors abort
        the run before any validation check is executed.
        """
        print("🔍 Starting cross-reference validation...")

        # Load data
        self.load_fixtures()
        self.load_cross_refs_map()

        if self.errors:
            self._print_messages("❌ Errors during data loading:", self.errors)
            return False

        # Run validation checks
        print("📋 Validating cross-references...")
        self.validate_cross_references()

        print("📝 Validating required fields...")
        self.validate_required_fields()

        print("📅 Validating date formats...")
        self.validate_date_formats()

        # Report results
        if self.errors:
            self._print_messages(
                f"\n❌ Validation failed with {len(self.errors)} errors:", self.errors
            )
            if self.warnings:
                self._print_messages(
                    f"\n⚠️ {len(self.warnings)} warnings:", self.warnings
                )
            return False

        print("\n✅ All validation checks passed!")
        if self.warnings:
            self._print_messages(f"⚠️ {len(self.warnings)} warnings:", self.warnings)
        return True


def main() -> int:
    """Run the validator and return the process exit code (0 on success)."""
    validator = CrossReferenceValidator()
    return 0 if validator.run_validation() else 1


if __name__ == "__main__":
    sys.exit(main())