"""
Tool_OCR - Export Service
Handles OCR result export in multiple formats with filtering and formatting rules
"""

import json
import logging
import tempfile
import zipfile
from pathlib import Path
from typing import List, Dict, Optional, Any
from datetime import datetime, timezone

import pandas as pd
from sqlalchemy.orm import Session

from app.core.config import settings
from app.models.ocr import OCRBatch, OCRFile, OCRResult, FileStatus
from app.models.export import ExportRule
from app.services.pdf_generator import PDFGenerator, PDFGenerationError


logger = logging.getLogger(__name__)


class ExportError(Exception):
    """Exception raised for export errors"""
    pass


class ExportService:
    """
    Export service for OCR results

    Supported formats:
    - TXT: Plain text export
    - JSON: Full metadata export
    - Excel: Tabular data export
    - Markdown: Direct Markdown export
    - PDF: Layout-preserved PDF export
    - ZIP: Batch export archive
    """

    def __init__(self):
        """Initialize export service"""
        self.pdf_generator = PDFGenerator()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _read_markdown(result: OCRResult) -> Optional[str]:
        """Return the Markdown text of *result*, or None if it has no file on disk."""
        if not result.markdown_path:
            return None
        md_path = Path(result.markdown_path)
        if not md_path.exists():
            return None
        return md_path.read_text(encoding="utf-8")

    @staticmethod
    def _format_percent(value: Optional[float]) -> str:
        """Format *value* as a percentage; only a missing value becomes "N/A"."""
        # Compare against None explicitly: 0.0 is a real score, not "missing".
        return f"{value:.2%}" if value is not None else "N/A"

    @staticmethod
    def _safe_filename(name: Optional[str], fallback: str) -> str:
        """
        Reduce a user-supplied filename to its final path component.

        Uploaded filenames are untrusted; stripping directory parts prevents
        a name such as "../../x" from escaping the export directory.
        """
        # Treat Windows separators like POSIX ones before taking the last part.
        cleaned = Path(str(name or "").replace("\\", "/")).name
        if cleaned in ("", ".", ".."):
            return fallback
        return cleaned

    @staticmethod
    def _unique_name(stem: str, suffix: str, used: set) -> str:
        """
        Return "<stem><suffix>", adding "_<n>" when that name is already in *used*.

        *used* is updated in place. Names are compared case-insensitively so
        the result is also safe on case-insensitive filesystems.
        """
        candidate = f"{stem}{suffix}"
        counter = 1
        while candidate.casefold() in used:
            candidate = f"{stem}_{counter}{suffix}"
            counter += 1
        used.add(candidate.casefold())
        return candidate

    # ------------------------------------------------------------------
    # Filtering
    # ------------------------------------------------------------------

    def apply_filters(
        self,
        results: List[OCRResult],
        filters: Dict[str, Any]
    ) -> List[OCRResult]:
        """
        Apply filters to OCR results

        Args:
            results: List of OCR results
            filters: Filter configuration
                - confidence_threshold: Minimum confidence (0.0-1.0); results
                  without a confidence value are dropped when this is set
                - filename_pattern: Case-insensitive substring matched against
                  the original filename (not a glob)
                - language: Filter by detected language (exact match)

        Returns:
            List[OCRResult]: Filtered results
        """
        if not filters:
            return results

        filtered = results

        # Confidence threshold filter. `is not None` keeps a confidence of
        # exactly 0.0 eligible (it passes a threshold of 0.0).
        threshold = filters.get("confidence_threshold")
        if threshold is not None:
            filtered = [
                r for r in filtered
                if r.average_confidence is not None
                and r.average_confidence >= threshold
            ]

        # Filename pattern filter (simple substring match)
        pattern = filters.get("filename_pattern")
        if pattern:
            needle = pattern.lower()
            filtered = [
                r for r in filtered
                if needle in (r.file.original_filename or "").lower()
            ]

        # Language filter
        if "language" in filters:
            lang = filters["language"]
            filtered = [r for r in filtered if r.detected_language == lang]

        return filtered

    # ------------------------------------------------------------------
    # Single-format exporters
    # ------------------------------------------------------------------

    def export_to_txt(
        self,
        results: List[OCRResult],
        output_path: Path,
        formatting: Optional[Dict] = None
    ) -> Path:
        """
        Export results to plain text file

        Results whose Markdown file is missing are skipped with a warning.

        Args:
            results: List of OCR results
            output_path: Output file path
            formatting: Formatting options
                - add_line_numbers: Add line numbers
                - group_by_filename: Insert a separator between files
                - include_metadata: Add file metadata headers

        Returns:
            Path: Output file path

        Raises:
            ExportError: If export fails
        """
        try:
            formatting = formatting or {}
            include_metadata = formatting.get("include_metadata", False)
            add_line_numbers = formatting.get("add_line_numbers", False)
            group_by_filename = formatting.get("group_by_filename", False)

            output_lines = []
            exported = 0

            for result in results:
                markdown_content = self._read_markdown(result)
                if markdown_content is None:
                    logger.warning(f"Markdown file not found for result {result.id}")
                    continue

                # Separator goes *between* exported files only, so a skipped
                # trailing result never leaves a dangling separator.
                if group_by_filename and exported:
                    output_lines.append("\n" + "-" * 80 + "\n")

                # Add metadata header if requested
                if include_metadata:
                    output_lines.append("=" * 80)
                    output_lines.append(f"文件: {result.file.original_filename}")
                    output_lines.append(f"語言: {result.detected_language or '未知'}")
                    output_lines.append(
                        f"信心度: {self._format_percent(result.average_confidence)}"
                    )
                    output_lines.append("=" * 80)
                    output_lines.append("")

                # Add content with optional line numbers
                if add_line_numbers:
                    output_lines.extend(
                        f"{line_num:4d} | {line}"
                        for line_num, line in enumerate(markdown_content.split('\n'), 1)
                    )
                else:
                    output_lines.append(markdown_content)

                exported += 1

            # Write to file
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_text("\n".join(output_lines), encoding="utf-8")

            logger.info(f"Exported {exported} results to TXT: {output_path}")
            return output_path

        except Exception as e:
            raise ExportError(f"TXT export failed: {str(e)}") from e

    def export_to_json(
        self,
        results: List[OCRResult],
        output_path: Path,
        include_layout: bool = True,
        include_images: bool = True
    ) -> Path:
        """
        Export results to JSON file with full metadata

        Args:
            results: List of OCR results
            output_path: Output file path
            include_layout: Include layout data
            include_images: Include images metadata

        Returns:
            Path: Output file path

        Raises:
            ExportError: If export fails
        """
        try:
            # datetime.utcnow() is deprecated; this yields the same naive-UTC
            # ISO string so the export format is unchanged.
            export_time = datetime.now(timezone.utc).replace(tzinfo=None)

            export_data = {
                "export_time": export_time.isoformat(),
                "total_files": len(results),
                "results": []
            }

            for result in results:
                result_data = {
                    "file_id": result.file.id,
                    "filename": result.file.original_filename,
                    "file_format": result.file.file_format,
                    "file_size": result.file.file_size,
                    "processing_time": result.file.processing_time,
                    "detected_language": result.detected_language,
                    "total_text_regions": result.total_text_regions,
                    "average_confidence": result.average_confidence,
                    "markdown_path": result.markdown_path,
                }

                # Include layout data if requested
                if include_layout and result.layout_data:
                    result_data["layout_data"] = result.layout_data

                # Include images metadata if requested
                if include_images and result.images_metadata:
                    result_data["images_metadata"] = result.images_metadata

                export_data["results"].append(result_data)

            # Write to file
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_text(
                json.dumps(export_data, ensure_ascii=False, indent=2),
                encoding="utf-8"
            )

            logger.info(f"Exported {len(results)} results to JSON: {output_path}")
            return output_path

        except Exception as e:
            raise ExportError(f"JSON export failed: {str(e)}") from e

    def export_to_excel(
        self,
        results: List[OCRResult],
        output_path: Path,
        include_confidence: bool = True,
        include_processing_time: bool = True
    ) -> Path:
        """
        Export results to Excel file

        The extracted text column is truncated to 1000 characters (followed
        by "...") per file; a missing Markdown file yields an empty cell.

        Args:
            results: List of OCR results
            output_path: Output file path
            include_confidence: Include confidence scores
            include_processing_time: Include processing time

        Returns:
            Path: Output file path

        Raises:
            ExportError: If export fails
        """
        try:
            rows = []

            for result in results:
                text_content = self._read_markdown(result) or ""
                if len(text_content) > 1000:
                    text_content = text_content[:1000] + "..."

                row = {
                    "文件名": result.file.original_filename,
                    "格式": result.file.file_format,
                    "大小(字節)": result.file.file_size,
                    "語言": result.detected_language or "未知",
                    "文本區域數": result.total_text_regions,
                    "提取內容": text_content,
                }

                if include_confidence:
                    row["平均信心度"] = self._format_percent(result.average_confidence)

                if include_processing_time:
                    processing_time = result.file.processing_time
                    # `is not None` so a 0-second time is shown as "0.00".
                    row["處理時間(秒)"] = (
                        f"{processing_time:.2f}" if processing_time is not None else "N/A"
                    )

                rows.append(row)

            # Create DataFrame and export
            df = pd.DataFrame(rows)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            df.to_excel(output_path, index=False, engine='openpyxl')

            logger.info(f"Exported {len(results)} results to Excel: {output_path}")
            return output_path

        except Exception as e:
            raise ExportError(f"Excel export failed: {str(e)}") from e

    def export_to_markdown(
        self,
        results: List[OCRResult],
        output_path: Path,
        combine: bool = True
    ) -> Path:
        """
        Export results to Markdown file(s)

        Results whose Markdown file is missing are skipped. When not
        combining, each file is written as "<original filename>.md" inside
        *output_path*; directory parts of the original filename are stripped
        and duplicate names get a numeric "_<n>" suffix.

        Args:
            results: List of OCR results
            output_path: Output file path (or directory if not combining)
            combine: Combine all results into one file

        Returns:
            Path: Output file/directory path

        Raises:
            ExportError: If export fails
        """
        try:
            exported = 0

            if combine:
                # Combine all Markdown files into one
                combined_content = []

                for result in results:
                    markdown_content = self._read_markdown(result)
                    if markdown_content is None:
                        continue

                    combined_content.append(f"# {result.file.original_filename}\n")
                    combined_content.append(markdown_content)
                    combined_content.append("\n---\n")  # Separator
                    exported += 1

                output_path.parent.mkdir(parents=True, exist_ok=True)
                output_path.write_text("\n".join(combined_content), encoding="utf-8")

                logger.info(f"Exported {exported} results to combined Markdown: {output_path}")
                return output_path

            # Export each result to separate file
            output_path.mkdir(parents=True, exist_ok=True)
            used_names = set()

            for result in results:
                markdown_content = self._read_markdown(result)
                if markdown_content is None:
                    continue

                stem = self._safe_filename(
                    result.file.original_filename, f"result_{result.id}"
                )
                dst_path = output_path / self._unique_name(stem, ".md", used_names)
                dst_path.write_text(markdown_content, encoding="utf-8")
                exported += 1

            logger.info(f"Exported {exported} results to separate Markdown files: {output_path}")
            return output_path

        except Exception as e:
            raise ExportError(f"Markdown export failed: {str(e)}") from e

    def export_to_pdf(
        self,
        result: OCRResult,
        output_path: Path,
        css_template: str = "default",
        metadata: Optional[Dict] = None
    ) -> Path:
        """
        Export single result to PDF with layout preservation

        Args:
            result: OCR result
            output_path: Output PDF path
            css_template: CSS template name or custom CSS
            metadata: Optional PDF metadata; a "title" defaulting to the
                original filename is added to a copy (the caller's dict is
                not modified)

        Returns:
            Path: Output PDF path

        Raises:
            ExportError: If export fails
        """
        try:
            if not result.markdown_path or not Path(result.markdown_path).exists():
                raise ExportError(f"Markdown file not found for result {result.id}")

            markdown_path = Path(result.markdown_path)

            # Copy so the default title never leaks into the caller's dict.
            pdf_metadata = dict(metadata or {})
            pdf_metadata.setdefault("title", result.file.original_filename)

            # Same guarantee the other exporters give: the target dir exists.
            output_path.parent.mkdir(parents=True, exist_ok=True)

            # Generate PDF
            self.pdf_generator.generate_pdf(
                markdown_path=markdown_path,
                output_path=output_path,
                css_template=css_template,
                metadata=pdf_metadata
            )

            logger.info(f"Exported result {result.id} to PDF: {output_path}")
            return output_path

        except PDFGenerationError as e:
            raise ExportError(f"PDF generation failed: {str(e)}") from e
        except Exception as e:
            raise ExportError(f"PDF export failed: {str(e)}") from e

    # ------------------------------------------------------------------
    # Batch export
    # ------------------------------------------------------------------

    def export_batch_to_zip(
        self,
        db: Session,
        batch_id: int,
        output_path: Path,
        include_formats: Optional[List[str]] = None
    ) -> Path:
        """
        Export entire batch to ZIP archive

        Only files with status COMPLETED are included. Each requested format
        is written to a private temporary directory next to *output_path*,
        which is then zipped and removed.

        Args:
            db: Database session
            batch_id: Batch ID
            output_path: Output ZIP path
            include_formats: List of formats to include
                (markdown, json, txt, excel, pdf); defaults to
                ["markdown", "json"]. PDF generation is best-effort per
                file: a failing file is logged and skipped.

        Returns:
            Path: Output ZIP path

        Raises:
            ExportError: If export fails
        """
        try:
            include_formats = include_formats or ["markdown", "json"]

            # Get batch and results
            batch = db.query(OCRBatch).filter(OCRBatch.id == batch_id).first()
            if not batch:
                raise ExportError(f"Batch {batch_id} not found")

            results = db.query(OCRResult).join(OCRFile).filter(
                OCRFile.batch_id == batch_id,
                OCRFile.status == FileStatus.COMPLETED
            ).all()

            if not results:
                raise ExportError(f"No completed results found for batch {batch_id}")

            output_path.parent.mkdir(parents=True, exist_ok=True)

            # A uniquely named temp dir avoids picking up stale files from a
            # previous run and collisions between concurrent exports of the
            # same batch; the context manager removes it on every exit path.
            with tempfile.TemporaryDirectory(
                prefix=f"temp_export_{batch_id}_", dir=output_path.parent
            ) as tmp_name:
                temp_dir = Path(tmp_name)

                # Export in requested formats
                if "markdown" in include_formats:
                    self.export_to_markdown(results, temp_dir / "markdown", combine=False)

                if "json" in include_formats:
                    self.export_to_json(results, temp_dir / "batch_results.json")

                if "txt" in include_formats:
                    self.export_to_txt(results, temp_dir / "batch_results.txt")

                if "excel" in include_formats:
                    self.export_to_excel(results, temp_dir / "batch_results.xlsx")

                if "pdf" in include_formats:
                    pdf_dir = temp_dir / "pdf"
                    used_names = set()
                    for result in results:
                        stem = self._safe_filename(
                            result.file.original_filename, f"result_{result.id}"
                        )
                        pdf_name = self._unique_name(stem, ".pdf", used_names)
                        try:
                            self.export_to_pdf(result, pdf_dir / pdf_name)
                        except ExportError as pdf_error:
                            # One bad file should not sink the whole archive.
                            logger.warning(
                                f"Skipping PDF for result {result.id}: {pdf_error}"
                            )

                # Create ZIP archive (sorted for a deterministic entry order)
                with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                    for file_path in sorted(temp_dir.rglob('*')):
                        if file_path.is_file():
                            zipf.write(file_path, file_path.relative_to(temp_dir))

            logger.info(f"Exported batch {batch_id} to ZIP: {output_path}")
            return output_path

        except Exception as e:
            raise ExportError(f"Batch ZIP export failed: {str(e)}") from e

    # ------------------------------------------------------------------
    # Rules and format listing
    # ------------------------------------------------------------------

    def apply_export_rule(
        self,
        db: Session,
        results: List[OCRResult],
        rule_id: int
    ) -> List[OCRResult]:
        """
        Apply export rule to filter and format results

        Args:
            db: Database session
            results: List of OCR results
            rule_id: Export rule ID

        Returns:
            List[OCRResult]: Filtered results

        Raises:
            ExportError: If rule not found
        """
        rule = db.query(ExportRule).filter(ExportRule.id == rule_id).first()
        if not rule:
            raise ExportError(f"Export rule {rule_id} not found")

        # A rule saved without any configuration must not crash the export.
        config = rule.config_json or {}

        # Apply filters
        rule_filters = config.get("filters")
        if rule_filters:
            results = self.apply_filters(results, rule_filters)

        # Note: Formatting options are applied in individual export methods
        return results

    def get_export_formats(self) -> Dict[str, str]:
        """
        Get available export formats

        Returns:
            Dict mapping format codes to descriptions
        """
        return {
            "txt": "純文本格式 (.txt)",
            "json": "JSON 格式 - 包含完整元數據 (.json)",
            "excel": "Excel 表格格式 (.xlsx)",
            "markdown": "Markdown 格式 (.md)",
            "pdf": "版面保留 PDF 格式 (.pdf)",
            "zip": "批次打包格式 (.zip)",
        }