Spaces:
Sleeping
Sleeping
| # DEPENDENCIES | |
| import chardet | |
| import hashlib | |
| from pathlib import Path | |
| from typing import Optional | |
| from datetime import datetime | |
| from config.models import DocumentType | |
| from utils.text_cleaner import TextCleaner | |
| from config.models import DocumentMetadata | |
| from config.logging_config import get_logger | |
| from utils.error_handler import handle_errors | |
| from utils.error_handler import TextEncodingError | |
# Module-level logger for this parser module; TXTParser.__init__ stores it on each instance
logger = get_logger(__name__)
class TXTParser:
    """
    Plain text file parser with automatic encoding detection : handles various text encodings and formats
    """
    # Common encodings to try, in order.
    # NOTE(review): 'latin-1' maps every byte value and therefore never fails to decode, so the entries
    # listed after it are never reached by _try_common_encodings. Order kept as-is for compatibility.
    COMMON_ENCODINGS = ['utf-8',
                        'utf-16',
                        'ascii',
                        'latin-1',
                        'cp1252',
                        'iso-8859-1',
                        ]

    def __init__(self):
        # Reuse the module-level logger
        self.logger = logger

    def parse(self, file_path: Path, extract_metadata: bool = True, clean_text: bool = True, encoding: Optional[str] = None) -> tuple[str, Optional[DocumentMetadata]]:
        """
        Parse text file and extract content

        Arguments:
        -----------
            file_path        { Path } : Path to text file

            extract_metadata { bool } : Extract document metadata

            clean_text       { bool } : Clean extracted text

            encoding         { str }  : Force specific encoding (None = auto-detect)

        Returns:
        --------
                 { tuple }            : Tuple of (extracted_text, metadata); metadata is None when
                                        extract_metadata is False

        Raises:
        -------
            TextEncodingError         : If the file does not exist, or if reading, metadata extraction
                                        or text cleaning raises (the original error is attached)
        """
        file_path = Path(file_path)

        if not file_path.exists():
            raise TextEncodingError(str(file_path), encoding = "unknown", original_error = FileNotFoundError(f"Text file not found: {file_path}"))

        self.logger.info(f"Parsing TXT: {file_path}")

        # Detect encoding if not specified
        if encoding is None:
            encoding = self.detect_encoding(file_path)
            self.logger.info(f"Detected encoding: {encoding}")

        try:
            # Read file with detected/specified encoding; undecodable bytes become U+FFFD instead of raising
            with open(file_path, 'r', encoding = encoding, errors = 'replace') as f:
                text_content = f.read()

            # Extract metadata (computed on the raw text, before cleaning)
            metadata = None

            if extract_metadata:
                metadata = self._extract_metadata(file_path   = file_path,
                                                  encoding    = encoding,
                                                  text_length = len(text_content),
                                                  )

            # Clean text
            if clean_text:
                text_content = TextCleaner.clean(text_content,
                                                 remove_html          = False,
                                                 normalize_whitespace = True,
                                                 preserve_structure   = True,
                                                 )

            self.logger.info(f"Successfully parsed TXT: {len(text_content)} characters")

            return text_content, metadata

        except Exception as e:
            self.logger.error(f"Failed to parse TXT {file_path}: {repr(e)}")
            raise TextEncodingError(str(file_path), encoding = encoding, original_error = e) from e

    def detect_encoding(self, file_path: Path) -> str:
        """
        Detect file encoding using chardet

        Arguments:
        ----------
            file_path { Path } : Path to text file

        Returns:
        --------
               { str }         : Detected encoding name; 'utf-8' when nothing could be detected
                                 or detection itself failed
        """
        try:
            # Read raw bytes
            with open(file_path, 'rb') as f:
                # Read first 10KB for detection
                raw_data = f.read(10000)

            # Detect encoding
            result     = chardet.detect(raw_data)
            encoding   = result['encoding']
            # Treat a missing confidence as zero so the comparison and the log formatting below cannot fail
            confidence = result['confidence'] or 0.0

            self.logger.debug(f"Encoding detection: {encoding} (confidence: {confidence:.2%})")

            # If confidence is low, try common encodings
            if (confidence < 0.7):
                self.logger.warning(f"Low confidence ({confidence:.2%}) for detected encoding {encoding}")
                encoding = self._try_common_encodings(file_path = file_path)

            # Only a 10000-byte sample was inspected, so an 'ascii' verdict says nothing about the rest of
            # the file. UTF-8 decodes pure ASCII identically and keeps later non-ASCII characters intact.
            if encoding is not None and encoding.lower() == 'ascii':
                encoding = 'utf-8'

            # Fallback to UTF-8
            return encoding or 'utf-8'

        except Exception as e:
            # Best effort: detection problems must never stop parsing
            self.logger.warning(f"Encoding detection failed: {repr(e)}, using UTF-8")
            return 'utf-8'

    def _try_common_encodings(self, file_path: Path) -> Optional[str]:
        """
        Try reading file with common encodings

        Arguments:
        ----------
            file_path { Path } : Path to text file

        Returns:
        --------
               { str }         : First entry of COMMON_ENCODINGS that decodes the start of the file,
                                 or None if none of them does
        """
        for encoding in self.COMMON_ENCODINGS:
            try:
                with open(file_path, 'r', encoding = encoding) as f:
                    # Try reading first 1000 chars
                    f.read(1000)

            except (UnicodeError, LookupError):
                # UnicodeError rather than UnicodeDecodeError: the 'utf-16' codec reports a missing BOM
                # with the plain base class, which would otherwise abort the whole fallback chain
                continue

            self.logger.info(f"Successfully read with encoding: {encoding}")
            return encoding

        return None

    def _extract_metadata(self, file_path: Path, encoding: str, text_length: int) -> DocumentMetadata:
        """
        Extract metadata from text file

        Arguments:
        ----------
            file_path   { Path } : Path to text file

            encoding    { str }  : File encoding

            text_length { int }  : Length of text content

        Returns:
        --------
            { DocumentMetadata } : DocumentMetadata object
        """
        # Get file stats
        stat          = file_path.stat()
        file_size     = stat.st_size
        # NOTE(review): st_ctime is the creation time only on some platforms; elsewhere it is the time of
        # the last metadata change -- confirm this is acceptable for created_date
        created_time  = datetime.fromtimestamp(stat.st_ctime)
        modified_time = datetime.fromtimestamp(stat.st_mtime)

        # Generate document ID: current timestamp plus a hash of the path.
        # MD5 is only an identifier here, so mark it as non-security use (keeps FIPS-restricted builds working)
        doc_hash = hashlib.md5(str(file_path).encode(), usedforsecurity = False).hexdigest()
        doc_id   = f"doc_{int(datetime.now().timestamp())}_{doc_hash}"

        # Estimate pages (rough: 3000 characters per page, never less than one)
        estimated_pages = max(1, text_length // 3000)

        # Count lines
        with open(file_path, 'r', encoding = encoding, errors = 'replace') as f:
            num_lines = sum(1 for _ in f)

        # Create metadata object
        metadata = DocumentMetadata(document_id     = doc_id,
                                    filename        = file_path.name,
                                    file_path       = file_path,
                                    document_type   = DocumentType.TXT,
                                    title           = file_path.stem,
                                    created_date    = created_time,
                                    modified_date   = modified_time,
                                    file_size_bytes = file_size,
                                    num_pages       = estimated_pages,
                                    extra           = {"encoding"    : encoding,
                                                       "num_lines"   : num_lines,
                                                       "text_length" : text_length,
                                                       }
                                    )

        return metadata

    def read_lines(self, file_path: Path, start_line: int = 0, end_line: Optional[int] = None, encoding: Optional[str] = None) -> list[str]:
        """
        Read specific lines from file

        Arguments:
        -----------
            file_path  { Path } : Path to text file

            start_line { int }  : Starting line (0-indexed)

            end_line   { int }  : Ending line, exclusive (None = end of file)

            encoding   { str }  : File encoding (None = auto-detect)

        Returns:
        --------
               { list }         : List of lines, each keeping its line terminator

        Raises:
        -------
            TextEncodingError   : If the file cannot be opened or read
        """
        if encoding is None:
            encoding = self.detect_encoding(file_path)

        try:
            with open(file_path, 'r', encoding = encoding, errors = 'replace') as f:
                lines = f.readlines()

            # A None upper bound slices through to the end of the list
            return lines[start_line:end_line]

        except Exception as e:
            self.logger.error(f"Failed to read lines: {repr(e)}")
            raise TextEncodingError(str(file_path), encoding = encoding, original_error = e) from e

    def count_lines(self, file_path: Path, encoding: Optional[str] = None) -> int:
        """
        Count number of lines in file

        Arguments:
        ----------
            file_path { Path } : Path to text file

            encoding  { str }  : File encoding (None = auto-detect)

        Returns:
        --------
               { int }         : Number of lines

        Raises:
        -------
            TextEncodingError  : If the file cannot be opened or read
        """
        if encoding is None:
            encoding = self.detect_encoding(file_path)

        try:
            with open(file_path, 'r', encoding = encoding, errors = 'replace') as f:
                # Stream the file instead of loading it fully
                return sum(1 for _ in f)

        except Exception as e:
            self.logger.error(f"Failed to count lines: {repr(e)}")
            raise TextEncodingError(str(file_path), encoding = encoding, original_error = e) from e

    def get_file_info(self, file_path: Path) -> dict:
        """
        Get comprehensive file information

        Arguments:
        ----------
            file_path { Path } : Path to text file

        Returns:
        --------
               { dict }        : Dictionary with keys encoding, size_bytes, num_lines, num_characters,
                                 num_words and avg_line_length
        """
        # Accept plain string paths as parse() does; .stat() below needs a Path
        file_path = Path(file_path)
        encoding  = self.detect_encoding(file_path)

        with open(file_path, 'r', encoding = encoding, errors = 'replace') as f:
            content = f.read()

        lines = content.split('\n')

        # A trailing newline terminates the last line rather than starting a new one; dropping the empty
        # tail keeps num_lines in agreement with count_lines() and yields zero lines for an empty file
        if lines[-1] == '':
            lines.pop()

        return {"encoding"        : encoding,
                "size_bytes"      : file_path.stat().st_size,
                "num_lines"       : len(lines),
                "num_characters"  : len(content),
                "num_words"       : len(content.split()),
                "avg_line_length" : sum(len(line) for line in lines) / len(lines) if lines else 0,
                }

    def is_empty(self, file_path: Path) -> bool:
        """
        Check if file is empty or contains only whitespace

        Arguments:
        ----------
            file_path { Path } : Path to text file

        Returns:
        --------
               { bool }        : True if empty; also True (with a warning logged) when the file
                                 cannot be inspected at all
        """
        try:
            # Accept plain string paths as parse() does; .stat() below needs a Path
            file_path = Path(file_path)

            # Check file size first
            if file_path.stat().st_size == 0:
                return True

            # Read and check content, stopping at the first line that has visible text
            encoding = self.detect_encoding(file_path)

            with open(file_path, 'r', encoding = encoding, errors = 'replace') as f:
                return not any(line.strip() for line in f)

        except Exception as e:
            # Deliberate best effort: an unreadable file is reported as empty
            self.logger.warning(f"Error checking if file is empty: {repr(e)}")
            return True