"""
File Utility Functions

Utilities for secure file operations including checksum verification
"""

import hashlib
import logging
import os
import stat
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# Number of bytes read per iteration while hashing, so large files are
# streamed instead of being loaded into memory at once.
_CHUNK_SIZE = 8192


def calculate_file_checksum(file_path: str, algorithm: str = "sha256") -> str:
    """
    Calculate checksum of a file.

    Args:
        file_path: Path to file
        algorithm: Hash algorithm name accepted by ``hashlib.new``
            (sha256, md5, etc.)

    Returns:
        Hexadecimal checksum string

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If algorithm not supported
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    try:
        hash_func = hashlib.new(algorithm)
    except ValueError as exc:
        # Keep the original error attached as the cause for easier debugging.
        raise ValueError(f"Unsupported hash algorithm: {algorithm}") from exc

    # Read file in chunks to handle large files efficiently
    with open(file_path, 'rb') as f:
        while chunk := f.read(_CHUNK_SIZE):
            hash_func.update(chunk)

    return hash_func.hexdigest()


def verify_file_checksum(file_path: str, expected_checksum: str, algorithm: str = "sha256") -> bool:
    """
    Verify file matches expected checksum.

    The comparison ignores letter case and surrounding whitespace in
    ``expected_checksum``, since hexadecimal digests are case-insensitive.
    This function never raises: any failure while computing the checksum is
    logged and reported as ``False``.

    Args:
        file_path: Path to file
        expected_checksum: Expected checksum value (hexadecimal string)
        algorithm: Hash algorithm used

    Returns:
        True if checksum matches, False otherwise (including on error)
    """
    try:
        actual_checksum = calculate_file_checksum(file_path, algorithm)
        # hexdigest() is always lowercase, so only the expected side needs
        # normalizing.
        matches = actual_checksum == expected_checksum.strip().lower()
    except Exception as e:
        # Deliberate best-effort: callers rely on a boolean result.
        logger.error("Error verifying checksum for %s: %s", file_path, e)
        return False

    if matches:
        logger.debug("Checksum verified for %s", file_path)
    else:
        # Standard-library loggers reject arbitrary keyword arguments, so the
        # digests are included in the message itself.
        logger.warning(
            "Checksum mismatch for %s (expected=%s, actual=%s)",
            file_path,
            expected_checksum,
            actual_checksum,
        )
    return matches


def get_file_size(file_path: str) -> int:
    """
    Get file size in bytes.

    Args:
        file_path: Path to file

    Returns:
        File size in bytes

    Raises:
        FileNotFoundError: If file doesn't exist
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    return os.path.getsize(file_path)


def ensure_directory_exists(directory: str) -> Path:
    """
    Ensure directory exists, create if necessary.

    Missing parent directories are created as well; an already existing
    directory is not an error.

    Args:
        directory: Directory path

    Returns:
        Path object for directory
    """
    path = Path(directory)
    path.mkdir(parents=True, exist_ok=True)
    return path


def safe_file_delete(file_path: str) -> bool:
    """
    Safely delete a file, logging any errors.

    This function never raises; every failure is logged and reported as
    ``False``.

    Args:
        file_path: Path to file

    Returns:
        True if deleted successfully, False otherwise
    """
    # Attempt the removal directly instead of checking for existence first,
    # so a file vanishing between check and removal is still reported as
    # "not found" rather than as an unexpected error.
    try:
        os.remove(file_path)
    except FileNotFoundError:
        logger.warning("File not found for deletion: %s", file_path)
        return False
    except Exception as e:
        logger.error("Error deleting file %s: %s", file_path, e)
        return False

    logger.info("Deleted file: %s", file_path)
    return True


def get_file_metadata(file_path: str) -> dict:
    """
    Get comprehensive file metadata.

    All values come from a single ``os.stat`` call, so they describe one
    consistent snapshot of the path.

    Args:
        file_path: Path to file

    Returns:
        Dictionary with keys ``path``, ``size_bytes``, ``created_at``,
        ``modified_at``, ``accessed_at``, ``is_file``, ``is_dir`` and
        ``exists``. Timestamps are seconds since the epoch. Note that
        ``created_at`` is taken from ``st_ctime``, which is the creation time
        on Windows but the last metadata change time on Unix.

    Raises:
        FileNotFoundError: If file doesn't exist
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    info = os.stat(file_path)

    return {
        "path": file_path,
        "size_bytes": info.st_size,
        "created_at": info.st_ctime,
        "modified_at": info.st_mtime,
        "accessed_at": info.st_atime,
        "is_file": stat.S_ISREG(info.st_mode),
        "is_dir": stat.S_ISDIR(info.st_mode),
        "exists": True,
    }


class ChecksummedFile:
    """
    Context manager for working with checksummed files.

    Automatically calculates and stores checksum when file is written:
    leaving the ``with`` block without an exception computes the checksum of
    ``file_path`` and saves it to ``checksum_path``. The methods can also be
    called directly without using the ``with`` statement.
    """

    def __init__(self, file_path: str, checksum_path: Optional[str] = None, algorithm: str = "sha256"):
        """
        Initialize checksummed file handler.

        Args:
            file_path: Path to the file
            checksum_path: Path to store checksum (default: file_path + '.checksum')
            algorithm: Hash algorithm to use
        """
        self.file_path = file_path
        self.checksum_path = checksum_path or f"{file_path}.checksum"
        self.algorithm = algorithm
        # Most recently calculated checksum; None until one is calculated.
        self.checksum: Optional[str] = None

    def __enter__(self) -> "ChecksummedFile":
        """Enter the context; returns this handler."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        """
        Save the checksum when the block finished without an exception.

        Exceptions raised inside the block are never suppressed, and no
        checksum is written for a block that failed.
        """
        if exc_type is None:
            if os.path.exists(self.file_path):
                self.calculate_and_save_checksum()
            else:
                logger.warning("File not found, checksum not saved: %s", self.file_path)
        return False

    def _read_stored_checksum(self) -> Optional[str]:
        """
        Read the checksum token from the checksum file.

        Returns:
            The first whitespace-separated token, or None if the file
            contains no tokens.

        Raises:
            FileNotFoundError: If the checksum file doesn't exist
        """
        with open(self.checksum_path, 'r', encoding="utf-8") as f:
            tokens = f.read().split()
        return tokens[0] if tokens else None

    def calculate_and_save_checksum(self) -> str:
        """
        Calculate checksum and save to file.

        The checksum file holds one line: the checksum, a space, and the
        base name of the checksummed file.

        Returns:
            The calculated hexadecimal checksum

        Raises:
            FileNotFoundError: If the checksummed file doesn't exist
            ValueError: If the configured algorithm is not supported
        """
        self.checksum = calculate_file_checksum(self.file_path, self.algorithm)

        with open(self.checksum_path, 'w', encoding="utf-8") as f:
            f.write(f"{self.checksum} {os.path.basename(self.file_path)}\n")

        logger.info("Saved checksum for %s: %s", self.file_path, self.checksum)
        return self.checksum

    def load_and_verify_checksum(self) -> bool:
        """
        Load expected checksum and verify file.

        This method never raises; problems are logged and reported as False.

        Returns:
            True if the file matches the stored checksum, False if it does
            not match or the stored checksum cannot be loaded
        """
        try:
            expected_checksum = self._read_stored_checksum()
        except FileNotFoundError:
            logger.warning("Checksum file not found: %s", self.checksum_path)
            return False
        except Exception as e:
            logger.error("Error loading checksum: %s", e)
            return False

        if expected_checksum is None:
            logger.error("Error loading checksum: checksum file is empty: %s", self.checksum_path)
            return False

        return verify_file_checksum(self.file_path, expected_checksum, self.algorithm)

    def get_stored_checksum(self) -> Optional[str]:
        """
        Get checksum from stored file.

        Returns:
            The stored checksum, or None if the checksum file is missing or
            empty
        """
        try:
            return self._read_stored_checksum()
        except FileNotFoundError:
            return None