"""
Tool_OCR - Document Preprocessor Service

Handles file validation, format detection, and preprocessing.
"""
|
|
|
|
import logging
import zipfile
from pathlib import Path
from typing import Tuple, Optional

import cv2
import magic
import numpy as np
from PIL import Image

from app.core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DocumentPreprocessor:
    """
    Document preprocessing service for format standardization.

    Validates documents (size, MIME-detected format, allow-list, basic
    integrity) and optionally prepares images for OCR processing.
    """

    SUPPORTED_IMAGE_FORMATS = ['png', 'jpg', 'jpeg', 'bmp', 'tiff', 'tif']
    SUPPORTED_PDF_FORMAT = ['pdf']
    ALL_SUPPORTED_FORMATS = SUPPORTED_IMAGE_FORMATS + SUPPORTED_PDF_FORMAT

    # MIME type (as reported by the `magic` module) -> canonical format name.
    _MIME_TO_FORMAT = {
        'image/png': 'png',
        'image/jpeg': 'jpg',
        'image/jpg': 'jpg',
        'image/bmp': 'bmp',
        'image/tiff': 'tiff',
        'image/x-tiff': 'tiff',
        'application/pdf': 'pdf',
        'application/msword': 'doc',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
        'application/vnd.ms-powerpoint': 'ppt',
        'application/vnd.openxmlformats-officedocument.presentationml.presentation': 'pptx',
    }

    # Canonical format -> every extension spelling that should be treated as
    # the same format when checking the configured allow-list.
    _FORMAT_ALIASES = {
        'jpg': ('jpg', 'jpeg'),
        'tiff': ('tiff', 'tif'),
    }

    # ZIP-based Office formats -> archive member that must be present.
    _ZIP_OFFICE_REQUIRED_MEMBER = {
        'docx': 'word/document.xml',
        'pptx': 'ppt/presentation.xml',
    }

    # Legacy binary Office formats (no structural validation available here).
    _LEGACY_OFFICE_FORMATS = ('doc', 'ppt')

    def __init__(self):
        """Read the extension allow-list and upload size limit from settings."""
        self.allowed_extensions = settings.allowed_extensions_list
        self.max_file_size = settings.max_upload_size
        logger.info(
            "DocumentPreprocessor initialized with allowed_extensions: %s",
            self.allowed_extensions,
        )

    def validate_file(self, file_path: Path) -> Tuple[bool, Optional[str], Optional[str]]:
        """
        Validate file format, size, and integrity.

        Checks run in order: existence, size limit, MIME-based format
        detection, allow-list membership, then an integrity check. The first
        failing check determines the returned error message.

        Args:
            file_path: Path to the file to validate

        Returns:
            Tuple of (is_valid, file_format, error_message). ``file_format``
            is set on success and when only the integrity check failed;
            ``error_message`` is None on success.
        """
        try:
            # Check file exists
            if not file_path.exists():
                return False, None, f"File not found: {file_path}"

            # Check file size
            file_size = file_path.stat().st_size
            if file_size > self.max_file_size:
                max_mb = self.max_file_size / (1024 * 1024)
                actual_mb = file_size / (1024 * 1024)
                return False, None, f"File too large: {actual_mb:.2f}MB (max {max_mb:.2f}MB)"

            # Detect file format from content rather than from the file name
            mime_type = magic.Magic(mime=True).from_file(str(file_path))

            # Map MIME type to format
            file_format = self._mime_to_format(mime_type)
            if not file_format:
                return False, None, f"Unsupported file type: {mime_type}"

            # Check if format is in allowed extensions (alias-aware, so an
            # allow-list containing only 'jpeg' still accepts JPEG files)
            if not self._is_format_allowed(file_format):
                return False, None, f"File format '{file_format}' not allowed"

            # Validate file integrity
            is_valid, error = self._validate_integrity(file_path, file_format)
            if not is_valid:
                return False, file_format, f"File corrupted: {error}"

            logger.info("File validated successfully: %s (%s)", file_path.name, file_format)
            return True, file_format, None

        except Exception as e:
            # Boundary handler: callers receive a tuple, never an exception.
            logger.exception("File validation error: %s", e)
            return False, None, f"Validation error: {str(e)}"

    def _is_format_allowed(self, file_format: str) -> bool:
        """
        Return True when ``file_format`` (or an alias of it) is allow-listed.

        The exact-match test is tried first so anything accepted before is
        still accepted; the alias pass only widens the match for equivalent
        spellings (jpg/jpeg, tiff/tif), ignoring case and a leading dot.
        """
        allowed = self.allowed_extensions
        if file_format in allowed:
            return True
        normalized = {str(ext).lower().lstrip('.') for ext in allowed}
        aliases = self._FORMAT_ALIASES.get(file_format, (file_format,))
        return any(alias in normalized for alias in aliases)

    def _mime_to_format(self, mime_type: str) -> Optional[str]:
        """Convert MIME type to file format, or None when it is not mapped."""
        return self._MIME_TO_FORMAT.get(mime_type)

    def _validate_integrity(self, file_path: Path, file_format: str) -> Tuple[bool, Optional[str]]:
        """
        Validate file integrity by attempting to open it.

        Args:
            file_path: Path to file
            file_format: Detected file format

        Returns:
            Tuple of (is_valid, error_message). Any exception raised while
            inspecting the file is reported as ``(False, str(exception))``.
        """
        try:
            if file_format in self.SUPPORTED_IMAGE_FORMATS:
                with Image.open(file_path) as img:
                    img.verify()  # Verify image integrity
                # The image must be reopened after verify() before it can be
                # used again. load() actually decodes the pixel data, which
                # is what surfaces truncated/corrupt image bodies; reading
                # img.size alone only touches header metadata.
                with Image.open(file_path) as img:
                    img.load()
                return True, None

            if file_format == 'pdf':
                # Basic PDF validation - check file starts with PDF signature
                with open(file_path, 'rb') as f:
                    header = f.read(5)
                if header != b'%PDF-':
                    return False, "Invalid PDF header"
                return True, None

            if file_format in self._ZIP_OFFICE_REQUIRED_MEMBER:
                # Modern Office formats (docx, pptx) are ZIP-based; require
                # the main document part to be present in the archive.
                required_member = self._ZIP_OFFICE_REQUIRED_MEMBER[file_format]
                try:
                    with zipfile.ZipFile(file_path, 'r') as zf:
                        if required_member not in zf.namelist():
                            return False, f"Invalid {file_format.upper()} structure"
                except zipfile.BadZipFile:
                    return False, "Invalid Office file (corrupt ZIP)"
                return True, None

            if file_format in self._LEGACY_OFFICE_FORMATS:
                # Old formats (doc, ppt) - no structural check is done here,
                # but an empty file can never be a valid document.
                if Path(file_path).stat().st_size == 0:
                    return False, "Empty file"
                return True, None

            return False, f"Unknown format: {file_format}"

        except Exception as e:
            return False, str(e)

    def preprocess_image(
        self,
        image_path: Path,
        enhance: bool = True,
        output_path: Optional[Path] = None
    ) -> Tuple[bool, Optional[Path], Optional[str]]:
        """
        Preprocess image to improve OCR accuracy.

        With ``enhance`` enabled the image is converted to grayscale,
        binarized with Gaussian adaptive thresholding, denoised, and written
        to ``output_path``.

        Args:
            image_path: Path to input image
            enhance: Whether to apply enhancement; when False the original
                path is returned unchanged (after confirming it is readable)
            output_path: Optional output path (defaults to
                ``settings.processed_dir / "processed_<input name>"``)

        Returns:
            Tuple of (success, processed_image_path, error_message)
        """
        try:
            # Read image
            img = cv2.imread(str(image_path))
            if img is None:
                return False, None, "Failed to read image"

            if not enhance:
                # No preprocessing, return original
                return True, image_path, None

            # Convert to grayscale
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

            # Apply adaptive thresholding to handle varying lighting
            # (max value 255, block size 11, constant C = 2)
            processed = cv2.adaptiveThreshold(
                gray,
                255,
                cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY,
                11,
                2
            )

            # Denoise (h = 10, template window 7, search window 21)
            processed = cv2.fastNlMeansDenoising(processed, None, 10, 7, 21)

            # Determine output path
            if output_path is None:
                output_path = Path(settings.processed_dir) / f"processed_{image_path.name}"
            output_path = Path(output_path)

            # Make sure the target directory exists before writing.
            output_path.parent.mkdir(parents=True, exist_ok=True)

            # Save processed image; imwrite reports failure through its
            # return value, so it must be checked explicitly.
            if not cv2.imwrite(str(output_path), processed):
                logger.error("Failed to write processed image: %s", output_path)
                return False, None, f"Failed to write processed image: {output_path}"

            logger.info("Image preprocessed: %s -> %s", image_path.name, output_path.name)
            return True, output_path, None

        except Exception as e:
            # Boundary handler: callers receive a tuple, never an exception.
            logger.exception("Image preprocessing error: %s", e)
            return False, None, f"Preprocessing error: {str(e)}"

    def get_file_info(self, file_path: Path) -> dict:
        """
        Get comprehensive file information.

        Unlike the validation methods, errors (e.g. a missing file) are not
        caught here and propagate to the caller.

        Args:
            file_path: Path to file

        Returns:
            Dictionary with name, path, size (bytes and MB), detected MIME
            type, mapped format (None if unmapped), and timestamps.
        """
        stat = file_path.stat()
        mime_type = magic.Magic(mime=True).from_file(str(file_path))

        return {
            'name': file_path.name,
            'path': str(file_path),
            'size': stat.st_size,
            'size_mb': stat.st_size / (1024 * 1024),
            'mime_type': mime_type,
            'format': self._mime_to_format(mime_type),
            # NOTE(review): st_ctime is the metadata-change time on Unix,
            # not the creation time - confirm this is what consumers expect.
            'created_at': stat.st_ctime,
            'modified_at': stat.st_mtime,
        }
|