"""
File and URL validation utilities for legacy document processing.
"""

import ipaddress
import os
import re
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse

try:
    import structlog

    logger = structlog.get_logger(__name__)
except ImportError:
    import logging

    class _KeywordLoggerAdapter(logging.LoggerAdapter):
        """Let the stdlib logger accept structlog-style keyword fields.

        A plain ``logging.Logger`` raises ``TypeError`` on calls such as
        ``logger.error("msg", file_path=...)``. This adapter renders the
        extra fields into the message instead, so the same call sites work
        whether or not structlog is installed.
        """

        # Keyword arguments the stdlib logging API understands itself.
        _LOGGING_KWARGS = frozenset({"exc_info", "stack_info", "stacklevel", "extra"})

        def process(self, msg, kwargs):
            passthrough = {}
            fields = []
            for key, value in kwargs.items():
                if key in self._LOGGING_KWARGS:
                    passthrough[key] = value
                else:
                    fields.append(f"{key}={value!r}")
            if fields:
                msg = f"{msg} {' '.join(fields)}"
            return msg, passthrough

    logger = _KeywordLoggerAdapter(logging.getLogger(__name__), {})


# Largest file accepted by validate_file_path (500MB).
_MAX_FILE_SIZE_BYTES = 500 * 1024 * 1024

# Executable extensions that are refused outright.
_SUSPICIOUS_EXTENSIONS = frozenset({'.exe', '.com', '.bat', '.cmd', '.scr', '.pif'})

# Longest URL accepted by validate_url.
_MAX_URL_LENGTH = 2048

# Limits used by get_safe_filename.
_MAX_FILENAME_LENGTH = 100
_MIN_EXTENSION_RESERVE = 5   # room always kept free for the extension
_MAX_EXTENSION_LENGTH = 16   # longer "extensions" are treated as part of the name
_UNSAFE_FILENAME_CHARS = re.compile(r'[^a-zA-Z0-9._-]')

_LEGACY_EXTENSIONS = frozenset({
    # PC/DOS Era
    '.dbf', '.db', '.dbt',                    # dBASE
    '.wpd', '.wp', '.wp4', '.wp5', '.wp6',    # WordPerfect
    '.wk1', '.wk3', '.wk4', '.wks',           # Lotus 1-2-3
    '.wb1', '.wb2', '.wb3', '.qpw',           # Quattro Pro
    '.ws', '.wd',                             # WordStar
    '.sam',                                   # AmiPro
    '.wri',                                   # Write
    # Apple/Mac Era
    '.cwk', '.appleworks',                    # AppleWorks
    '.cws',                                   # ClarisWorks
    '.mac', '.mcw',                           # MacWrite
    '.wn',                                    # WriteNow
    '.hc', '.stack',                          # HyperCard
    '.pict', '.pic',                          # PICT
    '.pntg', '.drw',                          # MacPaint/MacDraw
    '.hqx',                                   # BinHex
    '.sit', '.sitx',                          # StuffIt
    '.rsrc',                                  # Resource fork
    '.scrapbook',                             # System 7 Scrapbook
    # Additional legacy formats
    '.vc',                                    # VisiCalc
    '.wrk', '.wr1',                           # Symphony
    '.proj', '.π',                            # Think C/Pascal
    '.fp3', '.fp5', '.fp7', '.fmp12',         # FileMaker
    '.px', '.mb',                             # Paradox
    '.fpt', '.cdx',                           # FoxPro
})

_VALID_PROCESSING_METHODS = frozenset({
    'auto', 'primary', 'fallback',
    # Format-specific methods
    'dbfread', 'simpledbf', 'pandas_dbf',
    'libwpd', 'wpd_python', 'strings_extract',
    'pylotus123', 'gnumeric', 'custom_wk_parser',
    'libcwk', 'resource_fork', 'mac_textutil',
    'hypercard_parser', 'hypertalk_extract',
})


class ValidationError(Exception):
    """Custom exception for validation errors."""


def validate_file_path(file_path: str) -> None:
    """
    Validate file path for legacy document processing.

    The path must name an existing, readable regular file that is at most
    500MB and does not carry an executable extension.

    Args:
        file_path: Path to validate

    Raises:
        ValidationError: If path is invalid or inaccessible
    """
    if not file_path:
        raise ValidationError("File path cannot be empty")

    if not isinstance(file_path, str):
        raise ValidationError("File path must be a string")

    # Convert to Path object for validation
    path = Path(file_path)

    # Check if file exists
    if not path.exists():
        raise ValidationError(f"File does not exist: {file_path}")

    # Check if it's actually a file (not directory)
    if not path.is_file():
        raise ValidationError(f"Path is not a file: {file_path}")

    # Check read permissions
    if not os.access(file_path, os.R_OK):
        raise ValidationError(f"File is not readable: {file_path}")

    # Check file size (prevent processing of extremely large files).
    # The file can disappear between the checks above and this call, so an
    # OSError here is reported as a validation failure rather than leaking.
    try:
        file_size = path.stat().st_size
    except OSError as e:
        raise ValidationError(f"File is not readable: {file_path}") from e

    if file_size > _MAX_FILE_SIZE_BYTES:
        raise ValidationError(
            f"File too large ({file_size} bytes). Maximum size: {_MAX_FILE_SIZE_BYTES} bytes"
        )

    # Check for suspicious file extensions that might be dangerous
    if path.suffix.lower() in _SUSPICIOUS_EXTENSIONS:
        raise ValidationError(f"Potentially dangerous file extension: {path.suffix}")

    logger.debug("File validation passed", file_path=file_path, size=file_size)


def _blocked_ip_reason(host: str) -> Optional[str]:
    """Return the rejection message if *host* is a non-public IP literal, else None."""
    try:
        # Drop any IPv6 zone identifier ("fe80::1%eth0") before parsing.
        address = ipaddress.ip_address(host.split('%', 1)[0])
    except ValueError:
        # Not an IP literal; the caller treats it as a DNS name.
        return None

    # ::ffff:a.b.c.d reaches the IPv4 host a.b.c.d, so classify that address.
    mapped = getattr(address, 'ipv4_mapped', None)
    if mapped is not None:
        address = mapped

    if address.is_loopback:
        return "Localhost URLs are not allowed"

    if (
        not address.is_global
        or address.is_multicast
        or address.is_reserved
        or address.is_unspecified
    ):
        return "Private IP addresses are not allowed"

    return None


def validate_url(url: str) -> None:
    """
    Validate URL for downloading legacy documents.

    Only HTTPS URLs with a hostname are accepted. Localhost names and IP
    literals that are loopback, private, link-local or otherwise not
    globally routable are rejected. This is a check on the URL text only:
    a DNS name that resolves to an internal address is not detected here.

    Args:
        url: URL to validate

    Raises:
        ValidationError: If URL is invalid or unsafe
    """
    if not url:
        raise ValidationError("URL cannot be empty")

    if not isinstance(url, str):
        raise ValidationError("URL must be a string")

    # Parse URL
    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
    except ValueError as e:
        raise ValidationError(f"Invalid URL format: {str(e)}") from e

    # Only allow HTTPS for security
    if parsed.scheme != 'https':
        raise ValidationError("Only HTTPS URLs are allowed for security")

    # Check for valid hostname. The netloc alone is not enough: "https://:443/x"
    # has a netloc but no host.
    if not parsed.netloc or not hostname:
        raise ValidationError("URL must have a valid hostname")

    # Block localhost and private IP ranges for security.
    # A trailing dot is the fully-qualified form of the same name.
    host = hostname.lower().rstrip('.')
    if host == 'localhost' or host.endswith('.localhost'):
        raise ValidationError("Localhost URLs are not allowed")

    reason = _blocked_ip_reason(host)
    if reason is not None:
        raise ValidationError(reason)

    # URL length limit
    if len(url) > _MAX_URL_LENGTH:
        raise ValidationError("URL too long (maximum 2048 characters)")

    logger.debug("URL validation passed", url=url)


def get_safe_filename(filename: str) -> str:
    """
    Generate safe filename for caching downloaded files.

    Directory components are dropped, every character outside
    ``[a-zA-Z0-9._-]`` becomes ``_``, names that are empty or start with a
    dot get a ``file_`` prefix, and the result is at most 100 characters.

    Args:
        filename: Original filename

    Returns:
        str: Safe filename for filesystem storage
    """
    if not filename:
        return "unknown_file"

    # Remove path components
    filename = os.path.basename(filename)

    # Replace unsafe characters
    safe_filename = _UNSAFE_FILENAME_CHARS.sub('_', filename)

    # Ensure it's not empty and doesn't start with dot. Done before the
    # length limit so the prefix cannot push the name past the limit.
    if not safe_filename or safe_filename.startswith('.'):
        safe_filename = "file_" + safe_filename

    # Limit length, keeping the extension when it is plausibly a real one
    if len(safe_filename) > _MAX_FILENAME_LENGTH:
        name, ext = os.path.splitext(safe_filename)
        if len(ext) > _MAX_EXTENSION_LENGTH:
            # Too long to be a real extension; truncate the name as a whole.
            name, ext = safe_filename, ""
        stem_limit = _MAX_FILENAME_LENGTH - max(len(ext), _MIN_EXTENSION_RESERVE)
        safe_filename = name[:stem_limit] + ext

    return safe_filename


def is_legacy_extension(file_path: str) -> bool:
    """
    Check if file extension indicates a legacy format.

    The comparison is case-insensitive and looks only at the final suffix.

    Args:
        file_path: Path to check

    Returns:
        bool: True if extension suggests legacy format
    """
    extension = Path(file_path).suffix.lower()
    return extension in _LEGACY_EXTENSIONS


def validate_processing_method(method: str) -> None:
    """
    Validate processing method parameter.

    Args:
        method: Processing method to validate

    Raises:
        ValidationError: If method is invalid
    """
    # The isinstance guard keeps unhashable values (e.g. a list) from raising
    # TypeError on the set lookup instead of ValidationError.
    if not isinstance(method, str) or method not in _VALID_PROCESSING_METHODS:
        raise ValidationError(f"Invalid processing method: {method}")


def get_file_info(file_path: str) -> dict:
    """
    Get basic file information for processing.

    This is best-effort: any failure is logged and reported through an
    ``"error"`` key in a placeholder result instead of being raised.

    Args:
        file_path: Path to analyze

    Returns:
        dict: File information including size, dates, extension. ``created``
        is ``st_ctime``, which on Unix is the last metadata change rather
        than the creation time.
    """
    try:
        path = Path(file_path)
        stat = path.stat()

        return {
            "filename": path.name,
            "extension": path.suffix.lower(),
            "size": stat.st_size,
            "created": stat.st_ctime,
            "modified": stat.st_mtime,
            "is_legacy_format": is_legacy_extension(file_path)
        }
    except Exception as e:
        # Deliberately broad: callers rely on always getting a dict back.
        logger.error("Failed to get file info", error=str(e), file_path=file_path)
        return {
            "filename": "unknown",
            "extension": "",
            "size": 0,
            "created": 0,
            "modified": 0,
            "is_legacy_format": False,
            "error": str(e)
        }