Initial commit - production deployment
This commit is contained in:
216
services/training/app/utils/file_utils.py
Normal file
216
services/training/app/utils/file_utils.py
Normal file
@@ -0,0 +1,216 @@
|
||||
"""
|
||||
File Utility Functions
|
||||
Utilities for secure file operations including checksum verification
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def calculate_file_checksum(file_path: str, algorithm: str = "sha256") -> str:
    """
    Calculate the checksum of a file.

    The file is streamed in fixed-size chunks so that large files never have
    to be loaded into memory in one piece.

    Args:
        file_path: Path to file
        algorithm: Name of any hash algorithm accepted by ``hashlib.new``
            (sha256, md5, etc.)

    Returns:
        Hexadecimal checksum string

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If algorithm not supported (the original hashlib error is
            attached as ``__cause__``)
    """
    # The existence check runs before the algorithm check so that a missing
    # file is always reported first, with this module's own message.
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    try:
        hash_func = hashlib.new(algorithm)
    except ValueError as exc:
        # Chain explicitly so the traceback shows hashlib's error as the cause
        # instead of "During handling of the above exception, another
        # exception occurred".
        raise ValueError(f"Unsupported hash algorithm: {algorithm}") from exc

    # Read file in chunks to handle large files efficiently
    chunk_size = 8192
    with open(file_path, 'rb') as f:
        while chunk := f.read(chunk_size):
            hash_func.update(chunk)

    return hash_func.hexdigest()
|
||||
|
||||
|
||||
def verify_file_checksum(file_path: str, expected_checksum: str, algorithm: str = "sha256") -> bool:
    """
    Verify file matches expected checksum.

    This is a best-effort check: any failure while hashing the file (missing
    file, unsupported algorithm, I/O error) is logged and reported as a
    failed verification instead of being raised.

    Args:
        file_path: Path to file
        expected_checksum: Expected checksum value
        algorithm: Hash algorithm used

    Returns:
        True if checksum matches, False otherwise
    """
    # Only the hashing step can legitimately fail; keeping the try body small
    # stops unrelated errors from being reported as verification errors.
    try:
        actual_checksum = calculate_file_checksum(file_path, algorithm)
    except Exception as e:
        logger.error(f"Error verifying checksum for {file_path}: {e}")
        return False

    matches = actual_checksum == expected_checksum

    if matches:
        logger.debug(f"Checksum verified for {file_path}")
    else:
        # The standard-library logger rejects arbitrary keyword arguments
        # (TypeError), so the expected/actual values go into the message.
        logger.warning(
            f"Checksum mismatch for {file_path}: "
            f"expected={expected_checksum}, actual={actual_checksum}"
        )

    return matches
|
||||
|
||||
|
||||
def get_file_size(file_path: str) -> int:
    """
    Return the size of a filesystem entry in bytes.

    Args:
        file_path: Path to file

    Returns:
        File size in bytes

    Raises:
        FileNotFoundError: If file doesn't exist
    """
    if os.path.exists(file_path):
        # os.path.getsize is a thin wrapper around this stat field.
        return os.stat(file_path).st_size

    raise FileNotFoundError(f"File not found: {file_path}")
|
||||
|
||||
|
||||
def ensure_directory_exists(directory: str) -> Path:
    """
    Create a directory (and any missing parents) if it is not already there.

    Calling this on a directory that already exists is a no-op.

    Args:
        directory: Directory path

    Returns:
        Path object for directory
    """
    target = Path(directory)
    target.mkdir(exist_ok=True, parents=True)
    return target
|
||||
|
||||
|
||||
def safe_file_delete(file_path: str) -> bool:
    """
    Delete a file without ever raising; failures are logged instead.

    Args:
        file_path: Path to file

    Returns:
        True if deleted successfully, False otherwise (including when the
        file was not there to begin with)
    """
    try:
        # Guard clause: nothing to delete.
        if not os.path.exists(file_path):
            logger.warning(f"File not found for deletion: {file_path}")
            return False

        os.remove(file_path)
        logger.info(f"Deleted file: {file_path}")
        return True
    except Exception as exc:
        logger.error(f"Error deleting file {file_path}: {exc}")
        return False
|
||||
|
||||
|
||||
def get_file_metadata(file_path: str) -> dict:
    """
    Collect basic filesystem metadata for a path.

    The three timestamps are the raw values reported by ``os.stat``.
    NOTE(review): ``created_at`` is read from ``st_ctime``, whose meaning is
    platform-dependent (metadata-change time on Unix) - confirm callers do
    not rely on it being a true creation time.

    Args:
        file_path: Path to file

    Returns:
        Dictionary with file metadata (keys: path, size_bytes, created_at,
        modified_at, accessed_at, is_file, is_dir, exists)

    Raises:
        FileNotFoundError: If file doesn't exist
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    info = os.stat(file_path)

    metadata = {
        "path": file_path,
        "size_bytes": info.st_size,
        "created_at": info.st_ctime,
        "modified_at": info.st_mtime,
        "accessed_at": info.st_atime,
    }
    metadata["is_file"] = os.path.isfile(file_path)
    metadata["is_dir"] = os.path.isdir(file_path)
    # Always True here: a missing path has already raised above.
    metadata["exists"] = True
    return metadata
|
||||
|
||||
|
||||
class ChecksummedFile:
    """
    Pairs a file with a sidecar file that stores its checksum.

    The checksum is not maintained automatically and this class does not
    implement the context-manager protocol: call
    ``calculate_and_save_checksum()`` after the file has been written and
    ``load_and_verify_checksum()`` before trusting its contents.

    The sidecar file holds one line: the hex digest, a space, and the base
    name of the checksummed file. Only the first whitespace-separated token
    is used when reading it back.
    """

    def __init__(self, file_path: str, checksum_path: Optional[str] = None, algorithm: str = "sha256"):
        """
        Initialize checksummed file handler.

        Args:
            file_path: Path to the file
            checksum_path: Path to store checksum (default: file_path + '.checksum')
            algorithm: Hash algorithm to use
        """
        self.file_path = file_path
        self.checksum_path = checksum_path or f"{file_path}.checksum"
        self.algorithm = algorithm
        # Populated by calculate_and_save_checksum(); None until then.
        self.checksum: Optional[str] = None

    def _read_stored_checksum(self) -> Optional[str]:
        """
        Read the first token of the checksum file.

        Returns:
            The stored checksum, or None if the file is empty or contains
            only whitespace.

        Raises:
            FileNotFoundError: If the checksum file doesn't exist
        """
        with open(self.checksum_path, 'r') as f:
            tokens = f.read().split()
        # An empty token list means there is nothing to index into.
        return tokens[0] if tokens else None

    def calculate_and_save_checksum(self) -> str:
        """
        Calculate checksum and save to file.

        Returns:
            The freshly calculated checksum (also stored on ``self.checksum``)
        """
        self.checksum = calculate_file_checksum(self.file_path, self.algorithm)

        with open(self.checksum_path, 'w') as f:
            f.write(f"{self.checksum} {os.path.basename(self.file_path)}\n")

        logger.info(f"Saved checksum for {self.file_path}: {self.checksum}")
        return self.checksum

    def load_and_verify_checksum(self) -> bool:
        """
        Load expected checksum and verify file.

        Returns:
            True if the file matches the stored checksum; False if it does
            not, or if the checksum file is missing, empty or unreadable
        """
        try:
            expected_checksum = self._read_stored_checksum()
        except FileNotFoundError:
            logger.warning(f"Checksum file not found: {self.checksum_path}")
            return False
        except Exception as e:
            logger.error(f"Error loading checksum: {e}")
            return False

        if expected_checksum is None:
            logger.error(f"Error loading checksum: checksum file is empty: {self.checksum_path}")
            return False

        return verify_file_checksum(self.file_path, expected_checksum, self.algorithm)

    def get_stored_checksum(self) -> Optional[str]:
        """
        Get checksum from stored file.

        Returns:
            The stored checksum, or None if the checksum file is missing or
            contains no checksum
        """
        try:
            return self._read_stored_checksum()
        except FileNotFoundError:
            return None
|
||||
Reference in New Issue
Block a user