217 lines
5.8 KiB
Python
217 lines
5.8 KiB
Python
"""
File Utility Functions
Utilities for secure file operations including checksum verification
"""
|
|
|
|
import hashlib
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
import logging
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def calculate_file_checksum(file_path: str, algorithm: str = "sha256") -> str:
    """
    Compute the hexadecimal digest of a file's contents.

    Args:
        file_path: Location of the file to hash
        algorithm: Name of a hashlib algorithm (sha256, md5, etc.)

    Returns:
        Digest of the file contents as a hexadecimal string

    Raises:
        FileNotFoundError: If nothing exists at file_path
        ValueError: If hashlib does not recognise the algorithm name
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    try:
        digest = hashlib.new(algorithm)
    except ValueError:
        raise ValueError(f"Unsupported hash algorithm: {algorithm}")

    # Stream the file in 8 KiB pieces so large files are never held
    # in memory all at once; read() returns b"" at end of file.
    with open(file_path, 'rb') as stream:
        for piece in iter(lambda: stream.read(8192), b""):
            digest.update(piece)

    return digest.hexdigest()
|
|
|
|
|
|
def verify_file_checksum(file_path: str, expected_checksum: str, algorithm: str = "sha256") -> bool:
    """
    Verify file matches expected checksum.

    The expected value is compared case-insensitively and with surrounding
    whitespace ignored, because the computed hex digest is always lowercase.

    Args:
        file_path: Path to file
        expected_checksum: Expected checksum value (hexadecimal string)
        algorithm: Hash algorithm used

    Returns:
        True if checksum matches, False otherwise. Any error while
        computing the checksum (missing file, unsupported algorithm, ...)
        is logged and reported as False rather than raised.
    """
    try:
        actual_checksum = calculate_file_checksum(file_path, algorithm)
        # hexdigest() yields lowercase hex; normalise the caller's value so
        # an uppercase or newline-terminated checksum still matches.
        matches = actual_checksum == expected_checksum.strip().lower()

        if matches:
            logger.debug("Checksum verified for %s", file_path)
        else:
            # The stdlib logger rejects arbitrary keyword arguments, so the
            # expected/actual values are passed as message arguments.
            logger.warning(
                "Checksum mismatch for %s (expected=%s, actual=%s)",
                file_path,
                expected_checksum,
                actual_checksum,
            )

        return matches

    except Exception as e:
        # Deliberate best-effort: verification failures never propagate.
        logger.error(f"Error verifying checksum for {file_path}: {e}")
        return False
|
|
|
|
|
|
def get_file_size(file_path: str) -> int:
    """
    Report how many bytes a file occupies.

    Args:
        file_path: Location of the file to measure

    Returns:
        Size of the file in bytes

    Raises:
        FileNotFoundError: If nothing exists at file_path
    """
    if os.path.exists(file_path):
        return os.path.getsize(file_path)

    raise FileNotFoundError(f"File not found: {file_path}")
|
|
|
|
|
|
def ensure_directory_exists(directory: str) -> Path:
    """
    Create a directory (and any missing parents) unless it is already there.

    Args:
        directory: Path of the directory that must exist

    Returns:
        Path object pointing at the directory
    """
    dir_path = Path(directory)
    # exist_ok makes repeated calls harmless; parents builds the full chain.
    dir_path.mkdir(exist_ok=True, parents=True)
    return dir_path
|
|
|
|
|
|
def safe_file_delete(file_path: str) -> bool:
    """
    Remove a file without ever raising; problems are logged instead.

    Args:
        file_path: Location of the file to remove

    Returns:
        True if the file was removed, False if it was absent or removal failed
    """
    try:
        # Guard clause: nothing to delete, report and bail out.
        if not os.path.exists(file_path):
            logger.warning(f"File not found for deletion: {file_path}")
            return False

        os.remove(file_path)
        logger.info(f"Deleted file: {file_path}")
        return True
    except Exception as exc:
        logger.error(f"Error deleting file {file_path}: {exc}")
        return False
|
|
|
|
|
|
def get_file_metadata(file_path: str) -> dict:
    """
    Collect basic filesystem facts about a path.

    Args:
        file_path: Location of the file or directory to inspect

    Returns:
        Dictionary with the keys "path", "size_bytes", "created_at",
        "modified_at", "accessed_at", "is_file", "is_dir" and "exists".
        The three timestamps are the raw st_ctime, st_mtime and st_atime
        values from os.stat(). NOTE: "created_at" is st_ctime, whose
        meaning is platform-dependent (see the os.stat documentation).

    Raises:
        FileNotFoundError: If nothing exists at file_path
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    info = os.stat(file_path)
    regular_file = os.path.isfile(file_path)
    directory = os.path.isdir(file_path)

    metadata = dict(
        path=file_path,
        size_bytes=info.st_size,
        created_at=info.st_ctime,
        modified_at=info.st_mtime,
        accessed_at=info.st_atime,
        is_file=regular_file,
        is_dir=directory,
        exists=True,
    )
    return metadata
|
|
|
|
|
|
class ChecksummedFile:
    """
    Helper that pairs a file with a sidecar file holding its checksum.

    The checksum is computed and written by calculate_and_save_checksum()
    and checked later by load_and_verify_checksum(). Both must be called
    explicitly: the class defines no __enter__/__exit__, so it cannot be
    used in a ``with`` statement.
    """

    def __init__(self, file_path: str, checksum_path: Optional[str] = None, algorithm: str = "sha256"):
        """
        Initialize checksummed file handler.

        Args:
            file_path: Path to the file
            checksum_path: Path to store checksum (default: file_path + '.checksum')
            algorithm: Hash algorithm to use
        """
        self.file_path = file_path
        self.checksum_path = checksum_path or f"{file_path}.checksum"
        self.algorithm = algorithm
        # Last checksum computed by calculate_and_save_checksum(), if any.
        self.checksum: Optional[str] = None

    def _read_stored_checksum(self) -> Optional[str]:
        """
        Read the first whitespace-separated token of the checksum file.

        Returns:
            The stored checksum, or None if the file contains no token
            (empty or whitespace only)

        Raises:
            FileNotFoundError: If the checksum file does not exist
        """
        with open(self.checksum_path, 'r') as f:
            tokens = f.read().split()
        return tokens[0] if tokens else None

    def calculate_and_save_checksum(self) -> str:
        """
        Calculate the file's checksum and write it to the checksum file.

        The checksum file holds one line: the checksum, a space, and the
        base name of the file.

        Returns:
            The calculated checksum (also stored on self.checksum)
        """
        self.checksum = calculate_file_checksum(self.file_path, self.algorithm)

        with open(self.checksum_path, 'w') as f:
            f.write(f"{self.checksum} {os.path.basename(self.file_path)}\n")

        logger.info(f"Saved checksum for {self.file_path}: {self.checksum}")
        return self.checksum

    def load_and_verify_checksum(self) -> bool:
        """
        Load the expected checksum and verify the file against it.

        Returns:
            True if the file matches the stored checksum. False if it does
            not match, or if the checksum file is missing, empty or
            unreadable (each case is logged).
        """
        try:
            expected_checksum = self._read_stored_checksum()
            if expected_checksum is None:
                # An empty checksum file can never verify; say so plainly.
                logger.error(
                    f"Error loading checksum: checksum file is empty: {self.checksum_path}"
                )
                return False

            return verify_file_checksum(self.file_path, expected_checksum, self.algorithm)

        except FileNotFoundError:
            logger.warning(f"Checksum file not found: {self.checksum_path}")
            return False
        except Exception as e:
            logger.error(f"Error loading checksum: {e}")
            return False

    def get_stored_checksum(self) -> Optional[str]:
        """
        Get the checksum recorded in the checksum file.

        Returns:
            The stored checksum, or None if the checksum file is missing
            or contains no checksum
        """
        try:
            return self._read_stored_checksum()
        except FileNotFoundError:
            return None
|