Spaces:
Running
Running
| # DEPENDENCIES | |
| from enum import Enum | |
| from typing import List | |
| from pathlib import Path | |
| from typing import Union | |
| from typing import Optional | |
| from utils.helpers import IDGenerator | |
| from config.models import DocumentType | |
| from config.models import DocumentMetadata | |
| from utils.file_handler import FileHandler | |
| from utils.error_handler import RAGException | |
| from config.logging_config import get_logger | |
| from document_parser.pdf_parser import PDFParser | |
| from document_parser.txt_parser import TXTParser | |
| from document_parser.ocr_engine import OCREngine | |
| from document_parser.docx_parser import DOCXParser | |
| from utils.error_handler import InvalidFileTypeError | |
| from document_parser.zip_handler import ArchiveHandler | |
| # Setup Logging | |
| logger = get_logger(__name__) | |
class ParserFactory:
    """
    Factory for selecting and invoking the appropriate document parser.

    Implements the Factory pattern for extensible parser selection:
    standard document types (PDF, DOCX, TXT) map to reusable parser
    instances, while images and archives are routed to lazily created
    OCR / archive handlers.
    """

    def __init__(self):
        self.logger = logger
        # Reusable parser instances for the standard document types
        self._parsers = {DocumentType.PDF  : PDFParser(),
                         DocumentType.DOCX : DOCXParser(),
                         DocumentType.TXT  : TXTParser(),
                         }
        # Helper components, lazily initialized on first use
        # (see _get_ocr_engine / _get_archive_handler)
        self._ocr_engine: Optional[OCREngine] = None
        self._archive_handler: Optional[ArchiveHandler] = None
        # File extension (lowercase, no dot) -> DocumentType mapping
        self._extension_mapping = {'pdf'  : DocumentType.PDF,
                                   'docx' : DocumentType.DOCX,
                                   # NOTE(review): legacy .doc is routed to DOCXParser —
                                   # confirm it can actually read pre-OOXML files
                                   'doc'  : DocumentType.DOCX,
                                   'txt'  : DocumentType.TXT,
                                   'text' : DocumentType.TXT,
                                   'md'   : DocumentType.TXT,
                                   'log'  : DocumentType.TXT,
                                   'csv'  : DocumentType.TXT,
                                   'json' : DocumentType.TXT,
                                   'xml'  : DocumentType.TXT,
                                   'png'  : DocumentType.IMAGE,
                                   'jpg'  : DocumentType.IMAGE,
                                   'jpeg' : DocumentType.IMAGE,
                                   'gif'  : DocumentType.IMAGE,
                                   'bmp'  : DocumentType.IMAGE,
                                   'tiff' : DocumentType.IMAGE,
                                   'webp' : DocumentType.IMAGE,
                                   'zip'  : DocumentType.ARCHIVE,
                                   'tar'  : DocumentType.ARCHIVE,
                                   'gz'   : DocumentType.ARCHIVE,
                                   'tgz'  : DocumentType.ARCHIVE,
                                   'rar'  : DocumentType.ARCHIVE,
                                   '7z'   : DocumentType.ARCHIVE,
                                   }

    def get_parser(self, file_path: Path):
        """
        Get appropriate parser for file
        Arguments:
        ----------
        file_path { Path } : Path to document
        Returns:
        --------
        { object } : Parser instance or handler
        Raises:
        -------
        InvalidFileTypeError : If file type not supported
        """
        doc_type = self.detect_document_type(file_path = file_path)
        # Special types get dedicated handlers rather than parsers
        if (doc_type == DocumentType.IMAGE):
            return self._get_ocr_engine()
        elif (doc_type == DocumentType.ARCHIVE):
            return self._get_archive_handler()
        # Standard document types map directly to a reusable parser
        elif doc_type in self._parsers:
            return self._parsers[doc_type]
        else:
            raise InvalidFileTypeError(file_type = str(doc_type),
                                       allowed_types = [t.value for t in self._parsers.keys()] + [DocumentType.IMAGE.value, DocumentType.ARCHIVE.value],
                                       )

    def detect_document_type(self, file_path: Path) -> DocumentType:
        """
        Detect document type from file extension and content
        Arguments:
        ----------
        file_path { Path } : Path to document
        Returns:
        --------
        { DocumentType } : DocumentType enum member
        Raises:
        -------
        InvalidFileTypeError : If type cannot be determined
        """
        file_path = Path(file_path)
        # Extension lookup first: cheap and covers the common case
        extension = file_path.suffix.lstrip('.').lower()
        if extension in self._extension_mapping:
            doc_type = self._extension_mapping[extension]
            self.logger.debug(f"Detected type {doc_type} from extension .{extension}")
            return doc_type
        # Fall back to content-based detection (e.g. missing/wrong extension)
        detected_type = FileHandler.detect_file_type(file_path)
        if (detected_type and (detected_type in self._extension_mapping)):
            doc_type = self._extension_mapping[detected_type]
            self.logger.debug(f"Detected type {doc_type} from content")
            return doc_type
        raise InvalidFileTypeError(file_type = extension, allowed_types = list(self._extension_mapping.keys()))

    def parse(self, file_path: Union[str, Path], extract_metadata: bool = True, clean_text: bool = True, **kwargs) -> tuple[str, Optional[DocumentMetadata]]:
        """
        Parse document using appropriate parser
        Arguments:
        ----------
        file_path { Path } : Path to document
        extract_metadata { bool } : Extract document metadata
        clean_text { bool } : Clean extracted text
        **kwargs : Additional parser-specific arguments
        Returns:
        --------
        { tuple } : Tuple of (extracted_text, metadata)
        Raises:
        -------
        InvalidFileTypeError : If file type not supported
        RAGException : If parsing fails
        """
        file_path = Path(file_path)
        self.logger.info(f"Parsing document: {file_path}")
        parser = self.get_parser(file_path)
        # Dispatch on handler type: each kind exposes a different API
        if isinstance(parser, (PDFParser, DOCXParser, TXTParser)):
            # Standard document parser
            text, metadata = parser.parse(file_path,
                                          extract_metadata = extract_metadata,
                                          clean_text       = clean_text,
                                          **kwargs
                                          )
        elif isinstance(parser, OCREngine):
            # Image file - use OCR
            text = parser.extract_text_from_image(file_path)
            metadata = self._create_image_metadata(file_path) if extract_metadata else None
        elif isinstance(parser, ArchiveHandler):
            # Archive file - extract and parse contents (handles its own logging)
            return self._parse_archive(file_path        = file_path,
                                       extract_metadata = extract_metadata,
                                       clean_text       = clean_text,
                                       **kwargs
                                       )
        else:
            raise InvalidFileTypeError(file_type = file_path.suffix, allowed_types = self.get_supported_extensions())
        self.logger.info(f"Successfully parsed {file_path.name}: {len(text)} chars, type={metadata.document_type if metadata else 'unknown'}")
        return text, metadata

    def _get_ocr_engine(self) -> OCREngine:
        """
        Get OCR engine instance (lazy initialization)
        Returns:
        --------
        { OCREngine } : OCR engine instance
        """
        if self._ocr_engine is None:
            self._ocr_engine = OCREngine()
        return self._ocr_engine

    def _get_archive_handler(self) -> ArchiveHandler:
        """
        Get archive handler instance (lazy initialization)
        Returns:
        --------
        { ArchiveHandler } : Archive handler instance
        """
        if self._archive_handler is None:
            self._archive_handler = ArchiveHandler()
        return self._archive_handler

    def _create_image_metadata(self, file_path: Path) -> DocumentMetadata:
        """
        Create metadata for image file
        Arguments:
        ----------
        file_path { Path } : Path to image file
        Returns:
        --------
        { DocumentMetadata } : DocumentMetadata object
        """
        stat = file_path.stat()
        # NOTE(review): st_ctime/st_mtime are epoch floats — confirm
        # DocumentMetadata accepts raw timestamps rather than datetimes
        return DocumentMetadata(document_id     = IDGenerator.generate_document_id(),
                                filename        = file_path.name,
                                file_path       = file_path,
                                document_type   = DocumentType.IMAGE,
                                file_size_bytes = stat.st_size,
                                created_date    = stat.st_ctime,
                                modified_date   = stat.st_mtime,
                                extra           = {"file_type": "image"},
                                )

    def _parse_archive(self, file_path: Path, extract_metadata: bool = True, clean_text: bool = True, **kwargs) -> tuple[str, Optional[DocumentMetadata]]:
        """
        Parse archive file: extract contents and parse all supported files
        Arguments:
        ----------
        file_path { Path } : Path to archive file
        extract_metadata { bool } : Extract document metadata
        clean_text { bool } : Clean extracted text
        **kwargs : Additional arguments
        Returns:
        --------
        { tuple } : Tuple of (combined_text, metadata)
        """
        archive_handler = self._get_archive_handler()
        extracted_files = archive_handler.extract_archive(file_path)
        # Parse every supported extracted file; unsupported files are skipped,
        # and individual parse failures are logged but do not abort the archive
        combined_text = ""
        all_metadata = list()
        for extracted_file in extracted_files:
            if self.is_supported(extracted_file):
                try:
                    file_text, file_metadata = self.parse(extracted_file,
                                                          extract_metadata = extract_metadata,
                                                          clean_text       = clean_text,
                                                          **kwargs
                                                          )
                    combined_text += f"\n\n[FILE: {extracted_file.name}]\n{file_text}"
                    if file_metadata:
                        all_metadata.append(file_metadata)
                except Exception as e:
                    self.logger.warning(f"Failed to parse extracted file {extracted_file}: {repr(e)}")
                    continue
        # Roll per-file metadata up into a single archive-level record
        combined_metadata = None
        if extract_metadata and all_metadata:
            combined_metadata = DocumentMetadata(document_id     = IDGenerator.generate_document_id(),
                                                 filename        = file_path.name,
                                                 file_path       = file_path,
                                                 document_type   = DocumentType.ARCHIVE,
                                                 file_size_bytes = file_path.stat().st_size,
                                                 extra           = {"archive_contents"    : len(extracted_files),
                                                                    "parsed_files"        : len(all_metadata),
                                                                    "contained_documents" : [meta.document_id for meta in all_metadata],
                                                                    }
                                                 )
        return combined_text.strip(), combined_metadata

    def is_supported(self, file_path: Path) -> bool:
        """
        Check if file type is supported.
        Arguments:
        ----------
        file_path { Path } : Path to document
        Returns:
        --------
        { bool } : True if supported
        """
        try:
            self.detect_document_type(file_path = file_path)
            return True
        except InvalidFileTypeError:
            return False

    def get_supported_extensions(self) -> list[str]:
        """
        Get list of supported file extensions.
        Returns:
        --------
        { list } : List of extensions (without dot)
        """
        return list(self._extension_mapping.keys())

    def register_parser(self, doc_type: DocumentType, parser_instance, extensions: Optional[list[str]] = None):
        """
        Register a new parser type (for extensibility)
        Arguments:
        ----------
        doc_type { DocumentType } : Document type enum
        parser_instance : Parser instance
        extensions { list } : File extensions to map to this parser
        """
        self._parsers[doc_type] = parser_instance
        if extensions:
            for ext in extensions:
                # Normalize to lowercase: detect_document_type lowercases
                # before lookup, so uppercase registrations would never match
                self._extension_mapping[ext.lstrip('.').lower()] = doc_type
        self.logger.info(f"Registered parser for {doc_type}")

    def batch_parse(self, file_paths: list[Path], extract_metadata: bool = True, clean_text: bool = True, skip_errors: bool = True) -> list[tuple[Path, str, Optional[DocumentMetadata]]]:
        """
        Parse multiple documents.
        Arguments:
        ----------
        file_paths { list } : List of file paths
        extract_metadata { bool } : Extract metadata
        clean_text { bool } : Clean text
        skip_errors { bool } : Skip files that fail to parse
        Returns:
        --------
        { list } : List of (file_path, text, metadata) tuples; failed files
                   (when skip_errors) appear as (file_path, "", None)
        """
        results = list()
        # Count successes separately: results also holds failure placeholders,
        # so len(results) would over-report the success count
        success_count = 0
        for file_path in file_paths:
            try:
                text, metadata = self.parse(file_path,
                                            extract_metadata = extract_metadata,
                                            clean_text       = clean_text,
                                            )
                results.append((file_path, text, metadata))
                success_count += 1
            except Exception as e:
                self.logger.error(f"Failed to parse {file_path}: {repr(e)}")
                if not skip_errors:
                    raise
                # Add placeholder for failed file
                results.append((file_path, "", None))
        self.logger.info(f"Batch parsed {success_count}/{len(file_paths)} files successfully")
        return results

    def parse_directory(self, directory: Path, recursive: bool = False, pattern: str = "*", **kwargs) -> list[tuple[Path, str, Optional[DocumentMetadata]]]:
        """
        Parse all supported documents in a directory
        Arguments:
        ----------
        directory { Path } : Directory path
        recursive { bool } : Search recursively
        pattern { str } : File pattern (glob)
        **kwargs : Additional parse arguments
        Returns:
        --------
        { list } : List of (file_path, text, metadata) tuples
        """
        directory = Path(directory)
        all_files = FileHandler.list_files(directory, pattern=pattern, recursive=recursive)
        # Filter up front so batch_parse only sees parseable files
        supported_files = [f for f in all_files if self.is_supported(f)]
        self.logger.info(f"Found {len(supported_files)} supported files in {directory} ({len(all_files) - len(supported_files)} unsupported)")
        return self.batch_parse(supported_files, **kwargs)

    def get_parser_info(self) -> dict:
        """
        Get information about registered parsers
        Returns:
        --------
        { dict } : Dictionary with parser information
        """
        info = {"supported_types"      : [t.value for t in self._parsers.keys()] + ['image', 'archive'],
                "supported_extensions" : self.get_supported_extensions(),
                "parser_classes"       : {t.value: type(p).__name__ for t, p in self._parsers.items()},
                "special_handlers"     : {"image"   : "OCREngine",
                                          "archive" : "ArchiveHandler"},
                }
        return info
# Module-level singleton, created lazily by get_parser_factory()
_factory = None


def get_parser_factory() -> ParserFactory:
    """
    Get global parser factory instance (singleton)
    Returns:
    --------
    { ParserFactory } : ParserFactory instance
    """
    global _factory
    if _factory is None:
        # First call: build and cache the shared factory
        _factory = ParserFactory()
    return _factory
# Convenience functions
def parse_document(file_path: Union[str, Path], **kwargs) -> tuple[str, Optional[DocumentMetadata]]:
    """
    Convenience function to parse a document
    Arguments:
    ----------
    file_path { Path } : Path to document
    **kwargs : Additional arguments
    Returns:
    --------
    { tuple } : Tuple of (text, metadata)
    """
    # Delegate to the shared singleton factory
    return get_parser_factory().parse(file_path, **kwargs)
def is_supported_file(file_path: Union[str, Path]) -> bool:
    """
    Check if file is supported
    Arguments:
    ----------
    file_path { Path } : Path to file
    Returns:
    --------
    { bool } : True if supported
    """
    # Normalize to Path and delegate to the shared singleton factory
    return get_parser_factory().is_supported(Path(file_path))
def get_supported_extensions() -> list[str]:
    """
    Get list of supported extensions.
    Returns:
    --------
    { list } : List of extensions
    """
    # Delegate to the shared singleton factory
    return get_parser_factory().get_supported_extensions()