""" Direct Extraction Engine using PyMuPDF Handles direct text and structure extraction from editable PDFs without OCR. This provides much faster processing and perfect accuracy for documents with extractable text. """ import os import logging import fitz # PyMuPDF import uuid from pathlib import Path from typing import Dict, List, Optional, Tuple, Any, Union from datetime import datetime import re from ..models.unified_document import ( UnifiedDocument, DocumentElement, Page, DocumentMetadata, BoundingBox, StyleInfo, TableData, TableCell, Dimensions, ElementType, ProcessingTrack ) logger = logging.getLogger(__name__) class DirectExtractionEngine: """ Engine for direct text extraction from editable PDFs using PyMuPDF. This engine provides: - Fast text extraction with exact positioning - Font and style information preservation - Table structure detection - Image extraction with coordinates - Hyperlink and annotation extraction """ def __init__(self, enable_table_detection: bool = True, enable_image_extraction: bool = True, min_table_rows: int = 2, min_table_cols: int = 2): """ Initialize the extraction engine. Args: enable_table_detection: Whether to detect and extract tables enable_image_extraction: Whether to extract images min_table_rows: Minimum rows for table detection min_table_cols: Minimum columns for table detection """ self.enable_table_detection = enable_table_detection self.enable_image_extraction = enable_image_extraction self.min_table_rows = min_table_rows self.min_table_cols = min_table_cols def extract(self, file_path: Path, output_dir: Optional[Path] = None) -> UnifiedDocument: """ Extract content from PDF file to UnifiedDocument format. Args: file_path: Path to PDF file output_dir: Optional directory to save extracted images Returns: UnifiedDocument with extracted content """ start_time = datetime.now() document_id = str(uuid.uuid4()) try: doc = fitz.open(str(file_path)) # Extract document metadata metadata = self._extract_metadata(file_path, doc, start_time) # Extract pages pages = [] for page_num in range(len(doc)): logger.info(f"Extracting page {page_num + 1}/{len(doc)}") page = self._extract_page( doc[page_num], page_num + 1, document_id, output_dir ) pages.append(page) doc.close() # Calculate processing time processing_time = (datetime.now() - start_time).total_seconds() metadata.processing_time = processing_time logger.info(f"Direct extraction completed in {processing_time:.2f}s") return UnifiedDocument( document_id=document_id, metadata=metadata, pages=pages ) except Exception as e: logger.error(f"Error during direct extraction: {e}") # Return partial result with error information processing_time = (datetime.now() - start_time).total_seconds() if 'metadata' not in locals(): metadata = DocumentMetadata( filename=file_path.name, file_type="pdf", file_size=file_path.stat().st_size if file_path.exists() else 0, created_at=datetime.now(), processing_track=ProcessingTrack.DIRECT, processing_time=processing_time ) return UnifiedDocument( document_id=document_id, metadata=metadata, pages=pages if 'pages' in locals() else [], processing_errors=[{ "error": str(e), "type": type(e).__name__ }] ) def _extract_metadata(self, file_path: Path, doc: fitz.Document, start_time: datetime) -> DocumentMetadata: """Extract document metadata""" pdf_metadata = doc.metadata return DocumentMetadata( filename=file_path.name, file_type="pdf", file_size=file_path.stat().st_size, created_at=start_time, processing_track=ProcessingTrack.DIRECT, processing_time=0.0, # Will be updated later title=pdf_metadata.get("title"), author=pdf_metadata.get("author"), subject=pdf_metadata.get("subject"), keywords=pdf_metadata.get("keywords", "").split(",") if pdf_metadata.get("keywords") else None, producer=pdf_metadata.get("producer"), creator=pdf_metadata.get("creator"), creation_date=self._parse_pdf_date(pdf_metadata.get("creationDate")), modification_date=self._parse_pdf_date(pdf_metadata.get("modDate")) ) def _parse_pdf_date(self, date_str: str) -> Optional[datetime]: """Parse PDF date string to datetime""" if not date_str: return None try: # PDF date format: D:YYYYMMDDHHmmSSOHH'mm # Example: D:20240101120000+09'00 if date_str.startswith("D:"): date_str = date_str[2:] # Extract just the date/time part (first 14 characters) if len(date_str) >= 14: date_part = date_str[:14] return datetime.strptime(date_part, "%Y%m%d%H%M%S") except: pass return None def _extract_page(self, page: fitz.Page, page_num: int, document_id: str, output_dir: Optional[Path]) -> Page: """Extract content from a single page""" elements = [] element_counter = 0 # Get page dimensions rect = page.rect dimensions = Dimensions( width=rect.width, height=rect.height, dpi=72 # PDF standard DPI ) # Extract text blocks with formatting (sort=True for reading order) text_dict = page.get_text("dict", sort=True) for block_idx, block in enumerate(text_dict.get("blocks", [])): if block.get("type") == 0: # Text block element = self._process_text_block( block, page_num, element_counter ) if element: elements.append(element) element_counter += 1 # Extract tables (if enabled) if self.enable_table_detection: try: # Try native table detection (PyMuPDF 1.23.0+) tables = page.find_tables() for table_idx, table in enumerate(tables): element = self._process_native_table( table, page_num, element_counter ) if element: elements.append(element) element_counter += 1 except AttributeError: # Fallback to positional table detection logger.debug("Native table detection not available, using positional detection") table_elements = self._detect_tables_by_position(page, page_num, element_counter) elements.extend(table_elements) element_counter += len(table_elements) # Extract images (if enabled) if self.enable_image_extraction: image_elements = self._extract_images( page, page_num, document_id, element_counter, output_dir ) elements.extend(image_elements) element_counter += len(image_elements) # Extract hyperlinks links = page.get_links() for link_idx, link in enumerate(links): # Create link annotation element if it has URI if link.get("uri"): from_rect = link.get("from") if from_rect: element = DocumentElement( element_id=f"link_{page_num}_{element_counter}", type=ElementType.REFERENCE, content={"uri": link["uri"], "type": "hyperlink"}, bbox=BoundingBox( x0=from_rect.x0, y0=from_rect.y0, x1=from_rect.x1, y1=from_rect.y1 ), metadata={"link_type": "external" if link["uri"].startswith("http") else "internal"} ) elements.append(element) element_counter += 1 # Extract vector graphics (as metadata) drawings = page.get_drawings() if drawings: logger.debug(f"Page {page_num} contains {len(drawings)} vector drawing commands") # PyMuPDF's sort=True already provides good reading order for multi-column layouts # (top-to-bottom, left-to-right within each row). We don't need to re-sort. # NOTE: If sort=True is not used in get_text(), uncomment the line below: # elements = self._sort_elements_for_reading_order(elements, dimensions) # Post-process elements for header/footer detection and structure elements = self._detect_headers_footers(elements, dimensions) elements = self._build_section_hierarchy(elements) elements = self._build_nested_lists(elements) return Page( page_number=page_num, elements=elements, dimensions=dimensions, metadata={ "has_drawings": len(drawings) > 0, "drawing_count": len(drawings), "link_count": len(links) } ) def _sort_elements_for_reading_order(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]: """ Sort elements by reading order, handling multi-column layouts. For multi-column layouts (e.g., two-column documents), this ensures elements are ordered correctly: top-to-bottom, then left-to-right within each row. Args: elements: List of document elements dimensions: Page dimensions Returns: Sorted list of elements in reading order """ if not elements: return elements # Detect if page has multi-column layout text_elements = [e for e in elements if e.bbox and e.is_text] if len(text_elements) < 3: # Too few elements to determine layout, just sort by Y position return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0)) # Cluster x-positions to detect columns x_positions = [e.bbox.x0 for e in text_elements] columns = self._detect_columns(x_positions, dimensions.width) if len(columns) <= 1: # Single column layout - simple top-to-bottom sort logger.debug(f"Detected single-column layout") return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0)) logger.debug(f"Detected {len(columns)}-column layout at x positions: {[f'{x:.1f}' for x in columns]}") # Multi-column layout - use newspaper-style reading order # (complete left column, then right column, etc.) # This is more appropriate for technical documents and data sheets element_data = [] for elem in elements: if not elem.bbox: element_data.append((elem, 0, 0)) continue # Find which column this element belongs to col_idx = 0 min_dist = float('inf') for i, col_x in enumerate(columns): dist = abs(elem.bbox.x0 - col_x) if dist < min_dist: min_dist = dist col_idx = i element_data.append((elem, col_idx, elem.bbox.y0)) # Sort by: column first, then Y position within column # This gives newspaper-style reading: complete column 1, then column 2, etc. element_data.sort(key=lambda x: (x[1], x[2])) logger.debug(f"Using newspaper-style column reading order (column by column, top to bottom)") return [e[0] for e in element_data] def _detect_columns(self, x_positions: List[float], page_width: float) -> List[float]: """ Detect column positions from x-coordinates of text elements. Args: x_positions: List of x-coordinates (left edges of text) page_width: Page width in points Returns: List of column x-positions (sorted left to right) """ if not x_positions: return [] # Cluster x-positions to find column starts # Use k-means-like approach: find groups of x-positions threshold = page_width * 0.15 # 15% of page width as clustering threshold sorted_x = sorted(set(x_positions)) if not sorted_x: return [] clusters = [[sorted_x[0]]] for x in sorted_x[1:]: # Check if x belongs to current cluster cluster_center = sum(clusters[-1]) / len(clusters[-1]) if abs(x - cluster_center) < threshold: clusters[-1].append(x) else: # Start new cluster clusters.append([x]) # Return average x position of each cluster (column start) column_positions = [sum(cluster) / len(cluster) for cluster in clusters] # Filter out columns that are too close to each other min_column_width = page_width * 0.2 # Columns must be at least 20% of page width apart filtered_columns = [column_positions[0]] for col_x in column_positions[1:]: if col_x - filtered_columns[-1] >= min_column_width: filtered_columns.append(col_x) return filtered_columns def _detect_headers_footers(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]: """Detect and mark header/footer elements based on page position""" page_height = dimensions.height header_threshold = page_height * 0.1 # Top 10% of page footer_threshold = page_height * 0.9 # Bottom 10% of page for elem in elements: # Skip non-text elements if not elem.is_text: continue # Check if element is in header region if elem.bbox.y1 <= header_threshold: # Only mark as header if it's short text if isinstance(elem.content, str) and len(elem.content) < 200: elem.type = ElementType.HEADER elem.metadata['is_page_header'] = True # Check if element is in footer region elif elem.bbox.y0 >= footer_threshold: # Short text in footer region if isinstance(elem.content, str) and len(elem.content) < 200: elem.type = ElementType.FOOTER elem.metadata['is_page_footer'] = True return elements def _build_section_hierarchy(self, elements: List[DocumentElement]) -> List[DocumentElement]: """Build hierarchical section structure based on font sizes""" # Collect all headers with their font sizes headers = [] for elem in elements: if elem.type in [ElementType.TITLE, ElementType.HEADER]: # Get average font size from style font_size = 12.0 # Default if elem.style and elem.style.font_size: font_size = elem.style.font_size headers.append((elem, font_size)) if not headers: return elements # Sort headers by font size to determine hierarchy levels font_sizes = sorted(set(size for _, size in headers), reverse=True) size_to_level = {size: level for level, size in enumerate(font_sizes, 1)} # Assign section levels to headers for elem, font_size in headers: level = size_to_level.get(font_size, 1) elem.metadata['section_level'] = level elem.metadata['font_size'] = font_size # Build parent-child relationships between headers header_stack = [] # Stack of (element, level) for elem, font_size in headers: level = elem.metadata['section_level'] # Pop headers that are at same or lower level (larger font) while header_stack and header_stack[-1][1] >= level: header_stack.pop() # Set parent header if header_stack: parent = header_stack[-1][0] elem.metadata['parent_section'] = parent.element_id if 'child_sections' not in parent.metadata: parent.metadata['child_sections'] = [] parent.metadata['child_sections'].append(elem.element_id) header_stack.append((elem, level)) # Link content to nearest preceding header at same or higher level current_header = None for elem in elements: if elem.type in [ElementType.TITLE, ElementType.HEADER]: current_header = elem elif current_header and elem.type not in [ElementType.HEADER, ElementType.FOOTER]: elem.metadata['section_id'] = current_header.element_id return elements def _build_nested_lists(self, elements: List[DocumentElement]) -> List[DocumentElement]: """Build nested list structure from flat list items""" # Group list items list_items = [e for e in elements if e.type == ElementType.LIST_ITEM] if not list_items: return elements # Sort by position (top to bottom) list_items.sort(key=lambda e: (e.bbox.y0, e.bbox.x0)) # Detect indentation levels based on x position x_positions = [item.bbox.x0 for item in list_items] if not x_positions: return elements min_x = min(x_positions) indent_unit = 20 # Typical indent size in points # Assign nesting levels for item in list_items: indent = item.bbox.x0 - min_x level = int(indent / indent_unit) item.metadata['list_level'] = level # Build parent-child relationships item_stack = [] # Stack of (element, level) for item in list_items: level = item.metadata.get('list_level', 0) # Pop items at same or deeper level while item_stack and item_stack[-1][1] >= level: item_stack.pop() # Set parent if item_stack: parent = item_stack[-1][0] item.metadata['parent_item'] = parent.element_id if 'children' not in parent.metadata: parent.metadata['children'] = [] parent.metadata['children'].append(item.element_id) # Also add to actual children list parent.children.append(item) item_stack.append((item, level)) return elements def _process_text_block(self, block: Dict, page_num: int, counter: int) -> Optional[DocumentElement]: """Process a text block into a DocumentElement""" # Calculate block bounding box bbox_data = block.get("bbox", [0, 0, 0, 0]) bbox = BoundingBox( x0=bbox_data[0], y0=bbox_data[1], x1=bbox_data[2], y1=bbox_data[3] ) # Extract text content and span information text_parts = [] styles = [] span_children = [] # Store span-level children for inline styling span_counter = 0 for line in block.get("lines", []): for span in line.get("spans", []): text = span.get("text", "") if text: text_parts.append(text) # Extract style information style = StyleInfo( font_name=span.get("font"), font_size=span.get("size"), font_weight="bold" if span.get("flags", 0) & 2**4 else "normal", font_style="italic" if span.get("flags", 0) & 2**1 else "normal", text_color=span.get("color") ) styles.append(style) # Create span child element for inline styling span_bbox_data = span.get("bbox", bbox_data) span_bbox = BoundingBox( x0=span_bbox_data[0], y0=span_bbox_data[1], x1=span_bbox_data[2], y1=span_bbox_data[3] ) span_element = DocumentElement( element_id=f"span_{page_num}_{counter}_{span_counter}", type=ElementType.TEXT, # Spans are always text content=text, bbox=span_bbox, style=style, confidence=1.0, metadata={"span_index": span_counter} ) span_children.append(span_element) span_counter += 1 if not text_parts: return None full_text = "".join(text_parts) # Determine element type based on content and style element_type = self._infer_element_type(full_text, styles) # Use the most common style for the block if styles: block_style = styles[0] # Could be improved with style merging else: block_style = None return DocumentElement( element_id=f"text_{page_num}_{counter}", type=element_type, content=full_text, bbox=bbox, style=block_style, confidence=1.0, # Direct extraction has perfect confidence children=span_children # Store span children for inline styling ) def _infer_element_type(self, text: str, styles: List[StyleInfo]) -> ElementType: """Infer element type based on text content and styling""" text_lower = text.lower().strip() # Check for common patterns if len(text_lower) < 100 and styles: # Short text with large font might be title/header avg_size = sum(s.font_size or 12 for s in styles) / len(styles) if avg_size > 16: return ElementType.TITLE elif avg_size > 14: return ElementType.HEADER # Check for list patterns if re.match(r'^[\d•·▪▫◦‣⁃]\s', text_lower): return ElementType.LIST_ITEM # Check for page numbers if re.match(r'^page\s+\d+|^\d+\s*$|^-\s*\d+\s*-$', text_lower): return ElementType.PAGE_NUMBER # Check for footnote patterns if re.match(r'^[\[\d+\]]|^\d+\)', text_lower): return ElementType.FOOTNOTE # Default to paragraph for longer text, text for shorter return ElementType.PARAGRAPH if len(text) > 150 else ElementType.TEXT def _process_native_table(self, table, page_num: int, counter: int) -> Optional[DocumentElement]: """Process a natively detected table""" try: # Extract table data data = table.extract() if not data or len(data) < self.min_table_rows: return None # Get table bounding box bbox_data = table.bbox bbox = BoundingBox( x0=bbox_data[0], y0=bbox_data[1], x1=bbox_data[2], y1=bbox_data[3] ) # Create table cells cells = [] for row_idx, row in enumerate(data): for col_idx, cell_text in enumerate(row): if cell_text: cells.append(TableCell( row=row_idx, col=col_idx, content=str(cell_text) if cell_text else "" )) # Create table data table_data = TableData( rows=len(data), cols=max(len(row) for row in data) if data else 0, cells=cells, headers=data[0] if data else None # Assume first row is header ) return DocumentElement( element_id=f"table_{page_num}_{counter}", type=ElementType.TABLE, content=table_data, bbox=bbox, confidence=1.0 ) except Exception as e: logger.error(f"Error processing native table: {e}") return None def _detect_tables_by_position(self, page: fitz.Page, page_num: int, counter: int) -> List[DocumentElement]: """Detect tables by analyzing text positioning""" tables = [] # Get all words with positions words = page.get_text("words") # Returns (x0, y0, x1, y1, "word", block_no, line_no, word_no) if not words: return tables # Group words by approximate row (y-coordinate) rows = {} for word in words: y = round(word[1] / 5) * 5 # Round to nearest 5 points if y not in rows: rows[y] = [] rows[y].append({ 'x0': word[0], 'y0': word[1], 'x1': word[2], 'y1': word[3], 'text': word[4], 'block': word[5] if len(word) > 5 else 0 }) # Sort rows by y-coordinate sorted_rows = sorted(rows.items(), key=lambda x: x[0]) # Find potential tables (consecutive rows with multiple columns) current_table_rows = [] tables_found = [] for y, words_in_row in sorted_rows: words_in_row.sort(key=lambda w: w['x0']) if len(words_in_row) >= self.min_table_cols: # Check if this could be a table row x_positions = [w['x0'] for w in words_in_row] # Check for somewhat regular spacing if self._has_regular_spacing(x_positions): current_table_rows.append((y, words_in_row)) else: # End current table if exists if len(current_table_rows) >= self.min_table_rows: tables_found.append(current_table_rows) current_table_rows = [] else: # End current table if exists if len(current_table_rows) >= self.min_table_rows: tables_found.append(current_table_rows) current_table_rows = [] # Don't forget the last table if len(current_table_rows) >= self.min_table_rows: tables_found.append(current_table_rows) # Convert detected tables to DocumentElements for table_idx, table_rows in enumerate(tables_found): if not table_rows: continue # Calculate table bounding box all_words = [] for _, words in table_rows: all_words.extend(words) min_x = min(w['x0'] for w in all_words) min_y = min(w['y0'] for w in all_words) max_x = max(w['x1'] for w in all_words) max_y = max(w['y1'] for w in all_words) bbox = BoundingBox(x0=min_x, y0=min_y, x1=max_x, y1=max_y) # Create table cells cells = [] for row_idx, (y, words) in enumerate(table_rows): # Group words into columns columns = self._group_into_columns(words, table_rows) for col_idx, col_text in enumerate(columns): if col_text: cells.append(TableCell( row=row_idx, col=col_idx, content=col_text )) # Create table data table_data = TableData( rows=len(table_rows), cols=max(len(self._group_into_columns(words, table_rows)) for _, words in table_rows), cells=cells ) element = DocumentElement( element_id=f"table_{page_num}_{counter + table_idx}", type=ElementType.TABLE, content=table_data, bbox=bbox, confidence=0.8, # Lower confidence for positional detection metadata={"detection_method": "positional"} ) tables.append(element) return tables def _has_regular_spacing(self, x_positions: List[float], tolerance: float = 0.3) -> bool: """Check if x positions have somewhat regular spacing""" if len(x_positions) < 3: return False spacings = [x_positions[i+1] - x_positions[i] for i in range(len(x_positions)-1)] avg_spacing = sum(spacings) / len(spacings) # Check if spacings are within tolerance of average for spacing in spacings: if abs(spacing - avg_spacing) > avg_spacing * tolerance: return False return True def _group_into_columns(self, words: List[Dict], all_rows: List) -> List[str]: """Group words into columns based on x-position""" if not words: return [] # Find common column positions across all rows all_x_positions = [] for _, row_words in all_rows: all_x_positions.extend([w['x0'] for w in row_words]) # Cluster x-positions to find columns column_positions = self._cluster_positions(all_x_positions) # Assign words to columns columns = [""] * len(column_positions) for word in words: # Find closest column closest_col = 0 min_dist = float('inf') for col_idx, col_x in enumerate(column_positions): dist = abs(word['x0'] - col_x) if dist < min_dist: min_dist = dist closest_col = col_idx if columns[closest_col]: columns[closest_col] += " " + word['text'] else: columns[closest_col] = word['text'] return columns def _cluster_positions(self, positions: List[float], threshold: float = 20) -> List[float]: """Cluster positions to find common columns""" if not positions: return [] sorted_pos = sorted(positions) clusters = [[sorted_pos[0]]] for pos in sorted_pos[1:]: # Check if position belongs to current cluster if pos - clusters[-1][-1] < threshold: clusters[-1].append(pos) else: clusters.append([pos]) # Return average position of each cluster return [sum(cluster) / len(cluster) for cluster in clusters] def _extract_images(self, page: fitz.Page, page_num: int, document_id: str, counter: int, output_dir: Optional[Path]) -> List[DocumentElement]: """Extract images from page""" elements = [] image_list = page.get_images() for img_idx, img in enumerate(image_list): try: xref = img[0] # Get image position(s) img_rects = page.get_image_rects(xref) if not img_rects: continue rect = img_rects[0] # Use first occurrence bbox = BoundingBox( x0=rect.x0, y0=rect.y0, x1=rect.x1, y1=rect.y1 ) # Extract image data pix = fitz.Pixmap(page.parent, xref) image_data = { "width": pix.width, "height": pix.height, "colorspace": pix.colorspace.name if pix.colorspace else "unknown", "xref": xref } # Save image if output directory provided if output_dir: output_dir.mkdir(parents=True, exist_ok=True) image_filename = f"{document_id}_p{page_num}_img{img_idx}.png" image_path = output_dir / image_filename pix.save(str(image_path)) image_data["saved_path"] = str(image_path) logger.debug(f"Saved image to {image_path}") element = DocumentElement( element_id=f"image_{page_num}_{counter + img_idx}", type=ElementType.IMAGE, content=image_data, bbox=bbox, confidence=1.0, metadata={ "image_index": img_idx, "xref": xref } ) elements.append(element) pix = None # Free memory except Exception as e: logger.error(f"Error extracting image {img_idx}: {e}") return elements