#!/usr/bin/env python3
"""
Migrate all demo JSON files from offset_days/ISO timestamps to BASE_TS markers.
This script performs a one-time migration to align with the new architecture.
"""

import json
import sys
from pathlib import Path
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

# Base reference date used in current JSON files
BASE_REFERENCE_ISO = "2025-01-15T06:00:00Z"
BASE_REFERENCE = datetime.fromisoformat(BASE_REFERENCE_ISO.replace('Z', '+00:00'))

# Suffix of the legacy "<field>_offset_days" keys removed by migrate_entity
_OFFSET_SUFFIX = '_offset_days'

# Date fields to transform by entity type
DATE_FIELDS_MAP = {
    'purchase_orders': [
        'order_date', 'required_delivery_date', 'estimated_delivery_date',
        'expected_delivery_date', 'sent_to_supplier_at',
        'supplier_confirmation_date', 'created_at', 'updated_at'
    ],
    'batches': [
        'planned_start_time', 'planned_end_time', 'actual_start_time',
        'actual_end_time', 'completed_at', 'created_at', 'updated_at'
    ],
    'equipment': [
        'install_date', 'last_maintenance_date', 'next_maintenance_date',
        'created_at', 'updated_at'
    ],
    'ingredients': ['created_at', 'updated_at'],
    'stock_batches': [
        'received_date', 'expiration_date', 'best_before_date',
        'created_at', 'updated_at'
    ],
    'customers': ['last_order_date', 'created_at', 'updated_at'],
    'orders': [
        'order_date', 'delivery_date', 'promised_date', 'completed_at',
        'created_at', 'updated_at'
    ],
    'completed_orders': [
        'order_date', 'delivery_date', 'promised_date', 'completed_at',
        'created_at', 'updated_at'
    ],
    'forecasts': ['forecast_date', 'created_at', 'updated_at'],
    'prediction_batches': ['prediction_date', 'created_at', 'updated_at'],
    'sales_data': ['created_at', 'updated_at'],
    'quality_controls': ['created_at', 'updated_at'],
    'quality_alerts': ['created_at', 'updated_at'],
    'customer_orders': [
        'order_date', 'delivery_date', 'promised_date', 'completed_at',
        'created_at', 'updated_at'
    ],
    'order_items': ['created_at', 'updated_at'],
    'procurement_requirements': ['created_at', 'updated_at'],
    'replenishment_plans': ['created_at', 'updated_at'],
    'production_schedules': ['schedule_date', 'created_at', 'updated_at'],
    'users': ['created_at', 'updated_at'],
    'stock': ['expiration_date', 'received_date', 'created_at', 'updated_at'],
    'recipes': ['created_at', 'updated_at'],
    'recipe_ingredients': ['created_at', 'updated_at'],
    'suppliers': ['created_at', 'updated_at'],
    'production_batches': ['start_time', 'end_time', 'created_at', 'updated_at'],
    'purchase_order_items': ['created_at', 'updated_at'],
    # Enterprise children files
    'local_inventory': ['expiration_date', 'received_date', 'created_at', 'updated_at'],
    'local_sales': ['created_at', 'updated_at'],
    'local_orders': ['order_date', 'delivery_date', 'created_at', 'updated_at'],
    'local_production_batches': [
        'planned_start_time', 'planned_end_time', 'actual_start_time',
        'actual_end_time', 'created_at', 'updated_at'
    ],
    'local_forecasts': ['forecast_date', 'created_at', 'updated_at']
}


def _format_marker(operator: str, days: Any, hours: Any, minutes: Any) -> str:
    """Build a BASE_TS expression from magnitudes; zero/None components are omitted."""
    parts = [
        f"{abs(amount)}{unit}"
        for amount, unit in ((days, 'd'), (hours, 'h'), (minutes, 'm'))
        if amount
    ]
    if not parts:
        return "BASE_TS"
    return f"BASE_TS {operator} {' '.join(parts)}"


def calculate_offset_from_base(iso_timestamp: str) -> Optional[str]:
    """
    Calculate BASE_TS offset from an ISO timestamp.

    The offset is measured against BASE_REFERENCE and expressed in whole
    days, hours and minutes; any leftover seconds are dropped.

    Args:
        iso_timestamp: ISO 8601 timestamp string. A trailing 'Z' is accepted.
            A timestamp without a UTC offset is treated as UTC.

    Returns:
        BASE_TS marker string (e.g., "BASE_TS + 2d 3h", "BASE_TS - 1h 30m",
        or plain "BASE_TS" when the offset is under one minute), or None if
        the input cannot be parsed as an ISO timestamp.
    """
    try:
        target_time = datetime.fromisoformat(iso_timestamp.replace('Z', '+00:00'))
    except (ValueError, AttributeError, TypeError):
        return None

    # A naive datetime cannot be subtracted from the timezone-aware
    # BASE_REFERENCE (TypeError), so interpret it as UTC like the base.
    if target_time.tzinfo is None:
        target_time = target_time.replace(tzinfo=timezone.utc)

    total_seconds = int((target_time - BASE_REFERENCE).total_seconds())

    # Decompose the magnitude, not the signed timedelta: timedelta.days is
    # floored for negative values (-90 minutes has days == -1), which would
    # otherwise turn "- 1h 30m" into "- 1d 22h 30m".
    days, remainder = divmod(abs(total_seconds), 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes = remainder // 60

    operator = "+" if total_seconds > 0 else "-"
    return _format_marker(operator, days, hours, minutes)


def migrate_date_field(value: Any, field_name: str) -> Any:
    """
    Migrate a single date field to BASE_TS format.

    Args:
        value: Field value (can be ISO string, offset_days dict, or None)
        field_name: Name of the field being migrated (currently unused)

    Returns:
        BASE_TS marker string for ISO timestamps and offset_days dicts.
        None, existing BASE_TS markers and any value that is not recognised
        (or fails to parse) are returned unchanged, so the migration never
        discards data it does not understand.
    """
    if value is None:
        return None

    if isinstance(value, str):
        # Already a BASE_TS marker - keep as-is
        if value.startswith("BASE_TS"):
            return value

        # Handle ISO timestamp strings
        if 'T' in value or 'Z' in value:
            marker = calculate_offset_from_base(value)
            # Not actually a timestamp (e.g. free text containing a 'T'):
            # keep the original instead of overwriting it with None.
            return value if marker is None else marker

        return value

    # Handle offset_days dictionary format (from inventory stock)
    if isinstance(value, dict) and 'offset_days' in value:
        # `or 0` also covers explicit nulls in the JSON
        days = value.get('offset_days') or 0
        hour = value.get('hour') or 0
        minute = value.get('minute') or 0

        # NOTE(review): the sign of the whole expression follows offset_days
        # only; confirm that is the intended meaning when offset_days is
        # negative and hour/minute are set.
        operator = "+" if days >= 0 else "-"
        return _format_marker(operator, days, hour, minute)

    return value


def migrate_entity(entity: Dict[str, Any], date_fields: list) -> Dict[str, Any]:
    """
    Migrate all date fields in an entity to BASE_TS format.

    Also removes *_offset_days fields as they're now redundant: each numeric
    "<name>_offset_days" key is replaced by a "<name>" key holding the
    equivalent BASE_TS marker.

    Args:
        entity: Entity dictionary (not modified; a shallow copy is returned)
        date_fields: List of date field names to migrate

    Returns:
        Migrated entity dictionary
    """
    migrated = entity.copy()

    # Iterate over the original's keys so `migrated` can be mutated safely
    for key in list(entity):
        if not key.endswith(_OFFSET_SUFFIX):
            continue

        offset_days = migrated[key]
        # Leave non-numeric values (null, strings, booleans) untouched rather
        # than crashing on the comparison or producing a nonsensical marker.
        if isinstance(offset_days, bool) or not isinstance(offset_days, (int, float)):
            continue

        # Strip only the trailing suffix; str.replace would also remove an
        # occurrence in the middle of the key.
        base_field = key[:-len(_OFFSET_SUFFIX)]
        operator = "+" if offset_days > 0 else "-"
        migrated[base_field] = _format_marker(operator, offset_days, 0, 0)
        del migrated[key]

    # Migrate ISO timestamp fields
    for field in date_fields:
        if field in migrated:
            migrated[field] = migrate_date_field(migrated[field], field)

    return migrated


def _display_path(file_path: Path) -> Path:
    """Return file_path relative to its fourth ancestor, or unchanged if it has none."""
    try:
        return file_path.relative_to(file_path.parents[3])
    except (IndexError, ValueError):
        return file_path


def migrate_json_file(file_path: Path) -> bool:
    """
    Migrate a single JSON file to BASE_TS format.

    Every top-level key listed in DATE_FIELDS_MAP whose value is a list is
    migrated entity by entity. The file is rewritten (indent=2, UTF-8) only
    when at least one entity actually changed, so re-running the script on
    migrated files leaves them alone.

    Args:
        file_path: Path to JSON file

    Returns:
        True if file was modified, False otherwise (including when the file
        could not be read or parsed)
    """
    print(f"\n📄 Processing: {_display_path(file_path)}")

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError
        print(f" ❌ Failed to load: {e}")
        return False

    if not isinstance(data, dict):
        print(" ❌ Skipped: top-level JSON value is not an object")
        return False

    modified = False

    # Migrate each entity type
    for entity_type, date_fields in DATE_FIELDS_MAP.items():
        entities = data.get(entity_type)
        if not isinstance(entities, list):
            continue

        migrated = [
            migrate_entity(entity, date_fields) if isinstance(entity, dict) else entity
            for entity in entities
        ]
        changed = sum(1 for before, after in zip(entities, migrated) if before != after)
        if changed:
            data[entity_type] = migrated
            print(f" ✅ Migrated {changed} {entity_type}")
            modified = True

    if modified:
        # Serialise first so a failure cannot leave a truncated file behind
        payload = json.dumps(data, indent=2, ensure_ascii=False)
        # Write back with pretty formatting
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(payload)
        print(" 💾 File updated successfully")

    return modified


def main() -> int:
    """
    Main migration function.

    Migrates every *.json file found recursively under
    <script dir>/../shared/demo/fixtures.

    Returns:
        Process exit code: 0 on completion, 1 if the fixtures directory is
        missing or contains no JSON files.
    """
    # Find all JSON files in demo fixtures
    root_dir = Path(__file__).parent.parent
    fixtures_dir = root_dir / "shared" / "demo" / "fixtures"

    if not fixtures_dir.exists():
        print(f"❌ Fixtures directory not found: {fixtures_dir}")
        return 1

    # Find all JSON files
    json_files: List[Path] = sorted(fixtures_dir.rglob("*.json"))

    if not json_files:
        print(f"❌ No JSON files found in {fixtures_dir}")
        return 1

    print(f"🔍 Found {len(json_files)} JSON files to migrate")

    # Migrate each file
    total_modified = 0
    for json_file in json_files:
        if migrate_json_file(json_file):
            total_modified += 1

    print(f"\n✅ Migration complete: {total_modified}/{len(json_files)} files modified")
    return 0


if __name__ == "__main__":
    sys.exit(main())