File size: 8,771 Bytes
0a4529c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# DEPENDENCIES
import re
import magic
from typing import List
from typing import Union
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
from config.settings import get_settings
from config.logging_config import get_logger
from utils.error_handler import FileTooLargeError
from utils.error_handler import InvalidFileTypeError


# Module-level settings and logger shared by the validators below:
# - `settings` supplies the defaults FileValidator reads when the caller passes none
#   (`max_file_size_bytes`, `ALLOWED_EXTENSIONS`)
# - `logger` is named after this module and receives the content-type warnings
settings = get_settings()
logger   = get_logger(__name__)


class FileValidator:
    """
    Comprehensive file validation utilities

    Every check is a static method that returns ``True`` when the input passes and raises
    when it does not, so the checks can be called on their own or chained together.
    """
    # MIME type -> extension lookup used for content sniffing (built once, not on every call)
    _MIME_TO_EXTENSION        = {'application/pdf'                                                         : 'pdf',
                                 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' : 'docx',
                                 'text/plain'                                                              : 'txt',
                                 'application/zip'                                                         : 'zip',
                                }

    # Characters rejected anywhere in a filename
    _DANGEROUS_FILENAME_CHARS = ('<', '>', ':', '"', '|', '?', '*')

    # Longest filename accepted, in characters
    _MAX_FILENAME_LENGTH      = 255


    @staticmethod
    def validate_file_size(file_path: Path, max_size_bytes: Optional[int] = None) -> bool:
        """
        Validate file size

        Arguments:
            file_path      : Path of the file to measure
            max_size_bytes : Upper limit in bytes; falls back to `settings.max_file_size_bytes` when None

        Returns:
            True when the file is no larger than the limit

        Raises:
            FileTooLargeError : The file is larger than the limit
            OSError           : The file cannot be stat-ed (e.g. it does not exist)
        """
        if max_size_bytes is None:
            max_size_bytes = settings.max_file_size_bytes

        file_size = file_path.stat().st_size

        if file_size > max_size_bytes:
            raise FileTooLargeError(file_size = file_size,
                                    max_size  = max_size_bytes,
                                   )

        return True


    @staticmethod
    def validate_file_type(file_path: Path, allowed_extensions: Optional[List[str]] = None) -> bool:
        """
        Validate file type by both extension and content

        The extension is the hard gate. The content check is best-effort: a mismatch between
        the sniffed MIME type and the extension, or a failure to sniff at all, is only logged.

        Arguments:
            file_path          : Path of the file to check
            allowed_extensions : Accepted extensions; falls back to `settings.ALLOWED_EXTENSIONS` when None.
                                 Entries may be written with or without a leading dot, in any case

        Returns:
            True when the extension is allowed

        Raises:
            InvalidFileTypeError : The extension is not in the allowed set
        """
        if allowed_extensions is None:
            allowed_extensions = settings.ALLOWED_EXTENSIONS

        # Check extension
        extension = file_path.suffix.lstrip('.').lower()

        # The original membership test runs first so everything it accepted is still accepted.
        # The normalised comparison then additionally accepts entries such as '.pdf' or 'PDF',
        # which could never equal the lower-cased, dot-less extension computed above
        if extension not in allowed_extensions:
            normalized_allowed = {str(item).lstrip('.').lower() for item in allowed_extensions}

            if extension not in normalized_allowed:
                raise InvalidFileTypeError(file_type     = extension,
                                           allowed_types = allowed_extensions,
                                          )

        # Verify actual file content
        try:
            mime      = magic.Magic(mime = True)
            mime_type = mime.from_file(str(file_path))

        except Exception as e:
            # Deliberate best-effort: fall back to extension validation only
            logger.warning(f"Could not verify file content type: {repr(e)}")
            return True

        detected_extension = FileValidator._MIME_TO_EXTENSION.get(mime_type)

        if (detected_extension and (detected_extension != extension)):
            # Still allowing it but logging warning message
            logger.warning(f"File extension mismatch: {extension} vs detected {detected_extension}")

        return True


    @staticmethod
    def validate_file_integrity(file_path: Path) -> bool:
        """
        Basic file integrity check

        Arguments:
            file_path : Path of the file to check

        Returns:
            True when the path exists, is a regular file and is not empty

        Raises:
            FileNotFoundError : The path does not exist
            ValueError        : The path is not a file, or the file has zero bytes
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        if not file_path.is_file():
            raise ValueError(f"Path is not a file: {file_path}")

        if (file_path.stat().st_size == 0):
            raise ValueError(f"File is empty: {file_path}")

        return True


    @staticmethod
    def validate_filename(filename: str) -> bool:
        """
        Validate filename safety

        Arguments:
            filename : Bare filename (no directory part) to check

        Returns:
            True when the filename passes every check

        Raises:
            ValueError : The filename is empty, contains a NUL byte, contains '..' or a path
                         separator, contains a dangerous character, or is longer than 255 characters
        """
        # An empty name cannot identify a file
        if not filename:
            raise ValueError("Filename is empty")

        # An embedded NUL byte makes OS-level path calls fail on the name
        if '\x00' in filename:
            raise ValueError("Filename contains null byte")

        # Check for path traversal attempts
        if (('..' in filename) or ('/' in filename) or ('\\' in filename)):
            raise ValueError("Filename contains path traversal attempts")

        # Check for dangerous characters (the first offender, in table order, is reported)
        offending_char = next((char for char in FileValidator._DANGEROUS_FILENAME_CHARS if char in filename), None)

        if offending_char is not None:
            raise ValueError(f"Filename contains dangerous character: {offending_char}")

        # Check length
        if (len(filename) > FileValidator._MAX_FILENAME_LENGTH):
            raise ValueError("Filename too long")

        return True


class URLValidator:
    """
    URL validation utilities
    """

    @staticmethod
    def _domain_matches(host: str, netloc: str, domain: str) -> bool:
        """
        Tell whether one allow-list entry covers the given host

        An entry matches when it equals the host, equals the full netloc (kept so entries
        written with a port, e.g. 'example.com:8080', continue to work), or when the host
        is a subdomain of it. The subdomain test is anchored on a dot so that
        'evilexample.com' does not pass for the entry 'example.com'.
        """
        entry = domain.strip().lower()

        # An empty entry would otherwise act as a wildcard
        if not entry:
            return False

        if entry in (host, netloc):
            return True

        return host.endswith('.' + entry.lstrip('.'))


    @staticmethod
    def validate_url(url: str, allowed_domains: Optional[List[str]] = None) -> bool:
        """
        Validate URL format and domain

        Arguments:
            url             : URL to validate
            allowed_domains : Optional allow-list; when given (and non-empty) the URL's host must
                              equal one of the entries or be a subdomain of one

        Returns:
            True when the URL is acceptable

        Raises:
            ValueError : The URL cannot be parsed, does not use http/https, has no host,
                         or its host is not covered by `allowed_domains`
        """
        # Only the parsing step is wrapped, so the specific messages below reach the caller
        # as-is instead of being re-wrapped into "Invalid URL: ValueError(...)"
        try:
            parsed   = urlparse(url)
            hostname = parsed.hostname

        except Exception as e:
            raise ValueError(f"Invalid URL: {repr(e)}") from e

        # Check scheme
        if parsed.scheme not in ('http', 'https'):
            raise ValueError("URL must use HTTP or HTTPS protocol")

        # Check netloc (domain); a netloc such as ':80' or 'user@' carries no host
        if ((not parsed.netloc) or (not hostname)):
            raise ValueError("Invalid URL: missing domain")

        # Check domain if restrictions exist
        if allowed_domains:
            # Compare on the host only: netloc also carries userinfo and port
            host           = hostname.lower().rstrip('.')
            netloc         = parsed.netloc.lower()
            domain_allowed = any(URLValidator._domain_matches(host, netloc, domain) for domain in allowed_domains)

            if not domain_allowed:
                raise ValueError(f"Domain not allowed: {parsed.netloc}")

        return True


    @staticmethod
    def is_valid_url(url: str, allowed_domains: Optional[List[str]] = None) -> bool:
        """
        Check if URL is valid without raising exceptions

        Arguments:
            url             : URL to validate
            allowed_domains : Optional allow-list forwarded to `validate_url`

        Returns:
            True when `validate_url` accepts the URL, False when it raises ValueError
        """
        try:
            return URLValidator.validate_url(url, allowed_domains)

        except ValueError:
            return False


    @staticmethod
    def extract_domain(url: str) -> str:
        """
        Extract domain from URL

        Returns the network-location part as parsed by `urlparse`, which includes any
        userinfo and port present in the URL (an empty string when there is none)
        """
        parsed = urlparse(url)

        return parsed.netloc


class TextValidator:
    """
    Checks on free-text content: length bounds, word/letter presence, and overall substance
    """
    @staticmethod
    def validate_text_length(text: str, min_length: int = 1, max_length: Optional[int] = None) -> bool:
        """
        Enforce lower and upper bounds on the text length

        The lower bound is measured after trimming surrounding whitespace; the upper bound
        is measured on the text as given and is skipped when `max_length` is falsy.

        Returns:
            True when both bounds hold

        Raises:
            ValueError : The trimmed text is shorter than `min_length`, or the text is
                         longer than `max_length`
        """
        trimmed_size = len(text.strip())

        if trimmed_size < min_length:
            raise ValueError(f"Text too short. Minimum {min_length} characters required.")

        if max_length:
            if len(text) > max_length:
                raise ValueError(f"Text too long. Maximum {max_length} characters allowed.")

        return True


    @staticmethod
    def is_meaningful_text(text: str, min_words: int = 3) -> bool:
        """
        Tell whether the text holds enough words and enough letters

        Returns:
            True when the text has at least `min_words` whitespace-separated words and at
            least `min_words` alphabetic characters; False otherwise
        """
        # split() with no separator already ignores leading and trailing whitespace
        word_total = len(text.split())

        # Guards against input made only of digits or punctuation
        letter_total = sum(map(str.isalpha, text))

        return (word_total >= min_words) and (letter_total >= min_words)


    @staticmethod
    def has_sufficient_content(text: str, min_chars: int = 50, min_sentences: int = 1) -> bool:
        """
        Tell whether the text is long enough, and has enough sentences, to be processed

        Sentences are estimated by counting runs of '.', '!' or '?'.

        Returns:
            True when the trimmed text has at least `min_chars` characters and at least
            `min_sentences` sentence-ending runs; False otherwise
        """
        if len(text.strip()) < min_chars:
            return False

        # Rough estimate: every run of terminators counts as one sentence
        terminator_runs = sum(1 for _ in re.finditer(r'[.!?]+', text))

        return terminator_runs >= min_sentences


class DocumentValidator:
    """
    Document-specific validation
    """
    # Compiled once at class creation. re.ASCII restricts \d to 0-9 so that other Unicode
    # digits are not accepted inside an identifier
    _DOCUMENT_ID_PATTERN = re.compile(r'^doc_\d+_[a-f0-9]{8}$', re.ASCII)
    _CHUNK_ID_PATTERN    = re.compile(r'^chunk_doc_\d+_[a-f0-9]{8}_\d+$', re.ASCII)


    @staticmethod
    def validate_document_id(doc_id: str) -> bool:
        """
        Validate document ID format

        Expected shape: 'doc_<digits>_<8 lowercase hex characters>'

        Returns:
            True when the ID has the expected shape

        Raises:
            ValueError : The ID does not have the expected shape
        """
        # fullmatch, not match: with match, '$' also matches just before a trailing
        # newline, so 'doc_1_abcdef12\n' used to be accepted
        if (DocumentValidator._DOCUMENT_ID_PATTERN.fullmatch(doc_id) is None):
            raise ValueError(f"Invalid document ID format: {doc_id}")

        return True


    @staticmethod
    def validate_chunk_id(chunk_id: str) -> bool:
        """
        Validate chunk ID format

        Expected shape: 'chunk_doc_<digits>_<8 lowercase hex characters>_<digits>'

        Returns:
            True when the ID has the expected shape

        Raises:
            ValueError : The ID does not have the expected shape
        """
        # fullmatch for the same reason as in validate_document_id: no trailing newline allowed
        if (DocumentValidator._CHUNK_ID_PATTERN.fullmatch(chunk_id) is None):
            raise ValueError(f"Invalid chunk ID format: {chunk_id}")

        return True


# Convenience functions
def validate_upload_file(file_path: Path) -> bool:
    """
    Run the full set of upload checks on a file

    The checks run in a fixed order (integrity, size, type, then the bare filename) and
    stop at the first one that does not return a truthy value. Any exception raised by
    an individual check propagates to the caller unchanged.
    """
    outcome = FileValidator.validate_file_integrity(file_path)

    if outcome:
        outcome = FileValidator.validate_file_size(file_path)

    if outcome:
        outcome = FileValidator.validate_file_type(file_path)

    if outcome:
        # Only the final path component is checked, not the directory part
        outcome = FileValidator.validate_filename(file_path.name)

    return outcome


def validate_query_text(text: str) -> bool:
    """
    Check that a query is usable: between 1 and 1000 characters and at least one word

    The length check raises on failure (via TextValidator.validate_text_length); the
    word check decides the returned value.
    """
    length_ok = TextValidator.validate_text_length(text, min_length = 1, max_length = 1000)

    if not length_ok:
        return length_ok

    return TextValidator.is_meaningful_text(text, min_words = 1)