refactor: complete V1 to V2 migration and remove legacy architecture

Remove all V1 architecture components and promote V2 to primary:
- Delete all paddle_ocr_* table models (export, ocr, translation, user)
- Delete legacy routers (auth, export, ocr, translation)
- Delete legacy schemas and services
- Promote user_v2.py to user.py as primary user model
- Update all imports and dependencies to use V2 models only
- Update main.py version to 2.0.0

Database changes:
- Fix SQLAlchemy reserved word: rename audit_log.metadata to extra_data
- Add migration to drop all paddle_ocr_* tables
- Update alembic env to only import V2 models

Frontend fixes:
- Fix Select component exports in TaskHistoryPage.tsx
- Update to use simplified Select API with options prop
- Fix AxiosInstance TypeScript import syntax

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
egg · 2025-11-14 21:27:39 +08:00
parent ad2b832fb6 · commit fd98018ddd
34 changed files with 554 additions and 3787 deletions
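
The audit_log.metadata rename is forced by SQLAlchemy itself: on a declarative model, the attribute name metadata is reserved for the class-level MetaData registry, and mapping a column to it raises InvalidRequestError. A minimal sketch of the fix (the AuditLog model shown here is hypothetical; only the extra_data rename comes from this commit):

from sqlalchemy import Column, Integer, JSON
from app.core.database import Base  # assumed declarative base location

class AuditLog(Base):
    __tablename__ = "audit_log"

    id = Column(Integer, primary_key=True)
    # "metadata" as an attribute name collides with Base.metadata,
    # so the column is exposed as extra_data instead
    extra_data = Column(JSON, nullable=True)

If the database column itself had to keep its old name, mapping a differently named attribute with Column("metadata", JSON) would also sidestep the clash; the commit opts for the full rename plus a migration.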


@@ -9,7 +9,7 @@
 from sqlalchemy.orm import Session
 from sqlalchemy import func, and_
 from datetime import datetime, timedelta
-from app.models.user_v2 import User
+from app.models.user import User
 from app.models.task import Task, TaskStatus
 from app.models.session import Session as UserSession
 from app.models.audit_log import AuditLog


@@ -1,421 +0,0 @@
"""
Tool_OCR - Background Tasks Service
Handles async processing, cleanup, and scheduled tasks
"""
import logging
import asyncio
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Callable, Any
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models.ocr import OCRBatch, OCRFile, OCRResult, BatchStatus, FileStatus
from app.services.ocr_service import OCRService
from app.services.file_manager import FileManager
from app.services.pdf_generator import PDFGenerator
logger = logging.getLogger(__name__)
class BackgroundTaskManager:
"""
Manages background tasks including retry logic, cleanup, and scheduled jobs
"""
def __init__(
self,
max_retries: int = 3,
retry_delay: int = 5,
cleanup_interval: int = 3600, # 1 hour
file_retention_hours: int = 24
):
self.max_retries = max_retries
self.retry_delay = retry_delay
self.cleanup_interval = cleanup_interval
self.file_retention_hours = file_retention_hours
self.ocr_service = OCRService()
self.file_manager = FileManager()
self.pdf_generator = PDFGenerator()
async def execute_with_retry(
self,
func: Callable,
*args,
max_retries: Optional[int] = None,
retry_delay: Optional[int] = None,
**kwargs
) -> Any:
"""
Execute a function with retry logic
Args:
func: Function to execute
args: Positional arguments for func
max_retries: Maximum retry attempts (overrides default)
retry_delay: Delay between retries in seconds (overrides default)
kwargs: Keyword arguments for func
Returns:
Function result
Raises:
Exception: If all retries are exhausted
"""
max_retries = max_retries or self.max_retries
retry_delay = retry_delay or self.retry_delay
last_exception = None
for attempt in range(max_retries + 1):
try:
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_retries:
logger.warning(
f"Attempt {attempt + 1}/{max_retries + 1} failed for {func.__name__}: {e}. "
f"Retrying in {retry_delay}s..."
)
await asyncio.sleep(retry_delay)
else:
logger.error(
f"All {max_retries + 1} attempts failed for {func.__name__}: {e}"
)
raise last_exception
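
    # Usage sketch (illustrative, not part of the original module): the wrapper
    # accepts both sync and async callables, e.g.
    #
    #     result = await task_manager.execute_with_retry(
    #         fetch_remote_config,   # hypothetical callable that may raise
    #         max_retries=2,
    #         retry_delay=1,
    #     )
    #
    # Coroutine functions are awaited; plain functions are called directly.
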
    def process_single_file_with_retry(
        self,
        ocr_file: OCRFile,
        batch_id: int,
        lang: str,
        detect_layout: bool,
        db: Session
    ) -> bool:
        """
        Process a single file with retry logic

        Args:
            ocr_file: OCRFile instance
            batch_id: Batch ID
            lang: Language code
            detect_layout: Whether to detect layout
            db: Database session

        Returns:
            bool: True if successful, False otherwise
        """
        for attempt in range(self.max_retries + 1):
            try:
                # Update file status
                ocr_file.status = FileStatus.PROCESSING
                ocr_file.started_at = datetime.utcnow()
                ocr_file.retry_count = attempt
                db.commit()

                # Get file paths
                file_path = Path(ocr_file.file_path)
                paths = self.file_manager.get_file_paths(batch_id, ocr_file.id)

                # Process OCR
                result = self.ocr_service.process_image(
                    file_path,
                    lang=lang,
                    detect_layout=detect_layout
                )

                # Check if processing was successful
                if result['status'] != 'success':
                    raise Exception(result.get('error_message', 'Unknown error during OCR processing'))

                # Save results
                json_path, markdown_path = self.ocr_service.save_results(
                    result=result,
                    output_dir=paths["output_dir"],
                    file_id=str(ocr_file.id)
                )

                # Extract data from result
                text_regions = result.get('text_regions', [])
                layout_data = result.get('layout_data')
                images_metadata = result.get('images_metadata', [])

                # Calculate average confidence (or use from result)
                avg_confidence = result.get('average_confidence')

                # Create OCR result record
                ocr_result = OCRResult(
                    file_id=ocr_file.id,
                    markdown_path=str(markdown_path) if markdown_path else None,
                    json_path=str(json_path) if json_path else None,
                    images_dir=None,  # Images dir not used in current implementation
                    detected_language=lang,
                    total_text_regions=len(text_regions),
                    average_confidence=avg_confidence,
                    layout_data=layout_data,
                    images_metadata=images_metadata
                )
                db.add(ocr_result)

                # Update file status
                ocr_file.status = FileStatus.COMPLETED
                ocr_file.completed_at = datetime.utcnow()
                ocr_file.processing_time = (ocr_file.completed_at - ocr_file.started_at).total_seconds()

                # Commit with retry on connection errors
                try:
                    db.commit()
                except Exception as commit_error:
                    logger.warning(f"Commit failed, rolling back and retrying: {commit_error}")
                    db.rollback()
                    db.refresh(ocr_file)
                    ocr_file.status = FileStatus.COMPLETED
                    ocr_file.completed_at = datetime.utcnow()
                    ocr_file.processing_time = (ocr_file.completed_at - ocr_file.started_at).total_seconds()
                    db.commit()

                logger.info(f"Successfully processed file {ocr_file.id} ({ocr_file.original_filename})")
                return True

            except Exception as e:
                logger.error(f"Attempt {attempt + 1}/{self.max_retries + 1} failed for file {ocr_file.id}: {e}")
                db.rollback()  # Rollback failed transaction

                if attempt < self.max_retries:
                    # Wait before retry
                    time.sleep(self.retry_delay)
                else:
                    # Final failure
                    try:
                        ocr_file.status = FileStatus.FAILED
                        ocr_file.error_message = f"Failed after {self.max_retries + 1} attempts: {str(e)}"
                        ocr_file.completed_at = datetime.utcnow()
                        ocr_file.retry_count = attempt
                        db.commit()
                    except Exception as final_error:
                        logger.error(f"Failed to update error status: {final_error}")
                        db.rollback()
                    return False

        return False

    async def cleanup_expired_files(self, db: Session):
        """
        Clean up files and batches older than retention period

        Args:
            db: Database session
        """
        try:
            cutoff_time = datetime.utcnow() - timedelta(hours=self.file_retention_hours)

            # Find expired batches
            expired_batches = db.query(OCRBatch).filter(
                OCRBatch.created_at < cutoff_time,
                OCRBatch.status.in_([BatchStatus.COMPLETED, BatchStatus.FAILED, BatchStatus.PARTIAL])
            ).all()

            logger.info(f"Found {len(expired_batches)} expired batches to clean up")

            for batch in expired_batches:
                try:
                    # Get batch directory
                    batch_dir = self.file_manager.base_upload_dir / "batches" / str(batch.id)

                    # Delete physical files
                    if batch_dir.exists():
                        import shutil
                        shutil.rmtree(batch_dir)
                        logger.info(f"Deleted batch directory: {batch_dir}")

                    # Delete database records.
                    # Delete results first (foreign key constraint)
                    db.query(OCRResult).filter(
                        OCRResult.file_id.in_(
                            db.query(OCRFile.id).filter(OCRFile.batch_id == batch.id)
                        )
                    ).delete(synchronize_session=False)

                    # Delete files
                    db.query(OCRFile).filter(OCRFile.batch_id == batch.id).delete()

                    # Delete batch
                    db.delete(batch)
                    db.commit()

                    logger.info(f"Cleaned up expired batch {batch.id}")
                except Exception as e:
                    logger.error(f"Error cleaning up batch {batch.id}: {e}")
                    db.rollback()
        except Exception as e:
            logger.error(f"Error in cleanup_expired_files: {e}")

    async def generate_pdf_background(
        self,
        result_id: int,
        output_path: str,
        css_template: str = "default",
        db: Session = None
    ):
        """
        Generate PDF in background with retry logic

        Args:
            result_id: OCR result ID
            output_path: Output PDF path
            css_template: CSS template name
            db: Database session
        """
        should_close_db = False
        if db is None:
            db = SessionLocal()
            should_close_db = True

        try:
            # Get result
            result = db.query(OCRResult).filter(OCRResult.id == result_id).first()
            if not result:
                logger.error(f"Result {result_id} not found")
                return

            # Generate PDF with retry
            await self.execute_with_retry(
                self.pdf_generator.generate_pdf,
                markdown_path=result.markdown_path,
                output_path=output_path,
                css_template=css_template,
                max_retries=2,
                retry_delay=3
            )
            logger.info(f"Successfully generated PDF for result {result_id}: {output_path}")
        except Exception as e:
            logger.error(f"Failed to generate PDF for result {result_id}: {e}")
        finally:
            if should_close_db:
                db.close()

    async def start_cleanup_scheduler(self):
        """
        Start periodic cleanup scheduler

        Runs cleanup task at specified intervals
        """
        logger.info(f"Starting cleanup scheduler (interval: {self.cleanup_interval}s, retention: {self.file_retention_hours}h)")
        while True:
            try:
                db = SessionLocal()
                await self.cleanup_expired_files(db)
                db.close()
            except Exception as e:
                logger.error(f"Error in cleanup scheduler: {e}")

            # Wait for next interval
            await asyncio.sleep(self.cleanup_interval)


# Global task manager instance
task_manager = BackgroundTaskManager()
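
# Startup wiring sketch (assumed, not from the original module):
# start_cleanup_scheduler() loops forever, so it is meant to be launched once
# as a fire-and-forget task when the app boots, e.g. in a FastAPI startup hook:
#
#     asyncio.create_task(task_manager.start_cleanup_scheduler())
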
def process_batch_files_with_retry(
    batch_id: int,
    lang: str,
    detect_layout: bool,
    db: Session
):
    """
    Process all files in a batch with retry logic

    Args:
        batch_id: Batch ID
        lang: Language code
        detect_layout: Whether to detect layout
        db: Database session
    """
    try:
        # Get batch
        batch = db.query(OCRBatch).filter(OCRBatch.id == batch_id).first()
        if not batch:
            logger.error(f"Batch {batch_id} not found")
            return

        # Update batch status
        batch.status = BatchStatus.PROCESSING
        batch.started_at = datetime.utcnow()
        db.commit()

        # Get pending files
        files = db.query(OCRFile).filter(
            OCRFile.batch_id == batch_id,
            OCRFile.status == FileStatus.PENDING
        ).all()

        logger.info(f"Processing {len(files)} files in batch {batch_id} with retry logic")

        # Process each file with retry
        for ocr_file in files:
            success = task_manager.process_single_file_with_retry(
                ocr_file=ocr_file,
                batch_id=batch_id,
                lang=lang,
                detect_layout=detect_layout,
                db=db
            )

            # Update batch progress
            if success:
                batch.completed_files += 1
            else:
                batch.failed_files += 1
            db.commit()

        # Update batch final status
        if batch.failed_files == 0:
            batch.status = BatchStatus.COMPLETED
        elif batch.completed_files > 0:
            batch.status = BatchStatus.PARTIAL
        else:
            batch.status = BatchStatus.FAILED
        batch.completed_at = datetime.utcnow()

        # Commit with retry on connection errors
        try:
            db.commit()
        except Exception as commit_error:
            logger.warning(f"Batch commit failed, rolling back and retrying: {commit_error}")
            db.rollback()
            batch = db.query(OCRBatch).filter(OCRBatch.id == batch_id).first()
            if batch:
                batch.completed_at = datetime.utcnow()
                db.commit()

        logger.info(
            f"Batch {batch_id} processing complete: "
            f"{batch.completed_files} succeeded, {batch.failed_files} failed"
        )
    except Exception as e:
        logger.error(f"Fatal error processing batch {batch_id}: {e}")
        db.rollback()  # Rollback any failed transaction
        try:
            batch = db.query(OCRBatch).filter(OCRBatch.id == batch_id).first()
            if batch:
                batch.status = BatchStatus.FAILED
                batch.completed_at = datetime.utcnow()
                db.commit()
        except Exception as commit_error:
            logger.error(f"Error updating batch status: {commit_error}")
            db.rollback()
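
# Usage sketch (assumed, not from the original module): this entry point is a
# plain synchronous function, so it slots directly into FastAPI's
# BackgroundTasks from an upload endpoint, e.g.
#
#     background_tasks.add_task(
#         process_batch_files_with_retry,  # runs after the response is sent
#         batch_id=batch.id,
#         lang="ch",            # hypothetical language code
#         detect_layout=True,
#         db=db,
#     )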


@@ -1,512 +0,0 @@
"""
Tool_OCR - Export Service
Handles OCR result export in multiple formats with filtering and formatting rules
"""
import json
import logging
import zipfile
from pathlib import Path
from typing import List, Dict, Optional, Any
from datetime import datetime
import pandas as pd
from sqlalchemy.orm import Session
from app.core.config import settings
from app.models.ocr import OCRBatch, OCRFile, OCRResult, FileStatus
from app.models.export import ExportRule
from app.services.pdf_generator import PDFGenerator, PDFGenerationError
logger = logging.getLogger(__name__)
class ExportError(Exception):
"""Exception raised for export errors"""
pass
class ExportService:
"""
Export service for OCR results
Supported formats:
- TXT: Plain text export
- JSON: Full metadata export
- Excel: Tabular data export
- Markdown: Direct Markdown export
- PDF: Layout-preserved PDF export
- ZIP: Batch export archive
"""
def __init__(self):
"""Initialize export service"""
self.pdf_generator = PDFGenerator()
def apply_filters(
self,
results: List[OCRResult],
filters: Dict[str, Any]
) -> List[OCRResult]:
"""
Apply filters to OCR results
Args:
results: List of OCR results
filters: Filter configuration
- confidence_threshold: Minimum confidence (0.0-1.0)
- filename_pattern: Glob pattern for filename matching
- language: Filter by detected language
Returns:
List[OCRResult]: Filtered results
"""
filtered = results
# Confidence threshold filter
if "confidence_threshold" in filters:
threshold = filters["confidence_threshold"]
filtered = [r for r in filtered if r.average_confidence and r.average_confidence >= threshold]
# Filename pattern filter (using simple substring match)
if "filename_pattern" in filters:
pattern = filters["filename_pattern"].lower()
filtered = [
r for r in filtered
if pattern in r.file.original_filename.lower()
]
# Language filter
if "language" in filters:
lang = filters["language"]
filtered = [r for r in filtered if r.detected_language == lang]
return filtered
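
    # Example filter payload (illustrative, not from the original module):
    #
    #     filters = {
    #         "confidence_threshold": 0.85,   # drop low-confidence results
    #         "filename_pattern": "invoice",  # case-insensitive substring match
    #         "language": "ch",               # keep one detected language
    #     }
    #     kept = export_service.apply_filters(results, filters)
    #
    # The filters compose as a logical AND: each key narrows the previous set.
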
    def export_to_txt(
        self,
        results: List[OCRResult],
        output_path: Path,
        formatting: Optional[Dict] = None
    ) -> Path:
        """
        Export results to plain text file

        Args:
            results: List of OCR results
            output_path: Output file path
            formatting: Formatting options
                - add_line_numbers: Add line numbers
                - group_by_filename: Group text by source file
                - include_metadata: Add file metadata headers

        Returns:
            Path: Output file path

        Raises:
            ExportError: If export fails
        """
        try:
            formatting = formatting or {}
            output_lines = []

            for idx, result in enumerate(results, 1):
                # Read Markdown file
                if not result.markdown_path or not Path(result.markdown_path).exists():
                    logger.warning(f"Markdown file not found for result {result.id}")
                    continue

                markdown_content = Path(result.markdown_path).read_text(encoding="utf-8")

                # Add metadata header if requested
                if formatting.get("include_metadata", False):
                    output_lines.append("=" * 80)
                    output_lines.append(f"文件: {result.file.original_filename}")
                    output_lines.append(f"語言: {result.detected_language or '未知'}")
                    output_lines.append(f"信心度: {result.average_confidence:.2%}" if result.average_confidence else "信心度: N/A")
                    output_lines.append("=" * 80)
                    output_lines.append("")

                # Add content with optional line numbers
                if formatting.get("add_line_numbers", False):
                    for line_num, line in enumerate(markdown_content.split('\n'), 1):
                        output_lines.append(f"{line_num:4d} | {line}")
                else:
                    output_lines.append(markdown_content)

                # Add separator between files if grouping
                if formatting.get("group_by_filename", False) and idx < len(results):
                    output_lines.append("\n" + "-" * 80 + "\n")

            # Write to file
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_text("\n".join(output_lines), encoding="utf-8")

            logger.info(f"Exported {len(results)} results to TXT: {output_path}")
            return output_path
        except Exception as e:
            raise ExportError(f"TXT export failed: {str(e)}")

    def export_to_json(
        self,
        results: List[OCRResult],
        output_path: Path,
        include_layout: bool = True,
        include_images: bool = True
    ) -> Path:
        """
        Export results to JSON file with full metadata

        Args:
            results: List of OCR results
            output_path: Output file path
            include_layout: Include layout data
            include_images: Include images metadata

        Returns:
            Path: Output file path

        Raises:
            ExportError: If export fails
        """
        try:
            export_data = {
                "export_time": datetime.utcnow().isoformat(),
                "total_files": len(results),
                "results": []
            }

            for result in results:
                result_data = {
                    "file_id": result.file.id,
                    "filename": result.file.original_filename,
                    "file_format": result.file.file_format,
                    "file_size": result.file.file_size,
                    "processing_time": result.file.processing_time,
                    "detected_language": result.detected_language,
                    "total_text_regions": result.total_text_regions,
                    "average_confidence": result.average_confidence,
                    "markdown_path": result.markdown_path,
                }

                # Include layout data if requested
                if include_layout and result.layout_data:
                    result_data["layout_data"] = result.layout_data

                # Include images metadata if requested
                if include_images and result.images_metadata:
                    result_data["images_metadata"] = result.images_metadata

                export_data["results"].append(result_data)

            # Write to file
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_text(
                json.dumps(export_data, ensure_ascii=False, indent=2),
                encoding="utf-8"
            )

            logger.info(f"Exported {len(results)} results to JSON: {output_path}")
            return output_path
        except Exception as e:
            raise ExportError(f"JSON export failed: {str(e)}")

    def export_to_excel(
        self,
        results: List[OCRResult],
        output_path: Path,
        include_confidence: bool = True,
        include_processing_time: bool = True
    ) -> Path:
        """
        Export results to Excel file

        Args:
            results: List of OCR results
            output_path: Output file path
            include_confidence: Include confidence scores
            include_processing_time: Include processing time

        Returns:
            Path: Output file path

        Raises:
            ExportError: If export fails
        """
        try:
            rows = []
            for result in results:
                # Read Markdown content
                text_content = ""
                if result.markdown_path and Path(result.markdown_path).exists():
                    text_content = Path(result.markdown_path).read_text(encoding="utf-8")

                row = {
                    "文件名": result.file.original_filename,
                    "格式": result.file.file_format,
                    "大小(字節)": result.file.file_size,
                    "語言": result.detected_language or "未知",
                    "文本區域數": result.total_text_regions,
                    "提取內容": text_content[:1000] + "..." if len(text_content) > 1000 else text_content,
                }

                if include_confidence:
                    row["平均信心度"] = f"{result.average_confidence:.2%}" if result.average_confidence else "N/A"

                if include_processing_time:
                    row["處理時間(秒)"] = f"{result.file.processing_time:.2f}" if result.file.processing_time else "N/A"

                rows.append(row)

            # Create DataFrame and export
            df = pd.DataFrame(rows)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            df.to_excel(output_path, index=False, engine='openpyxl')

            logger.info(f"Exported {len(results)} results to Excel: {output_path}")
            return output_path
        except Exception as e:
            raise ExportError(f"Excel export failed: {str(e)}")

    def export_to_markdown(
        self,
        results: List[OCRResult],
        output_path: Path,
        combine: bool = True
    ) -> Path:
        """
        Export results to Markdown file(s)

        Args:
            results: List of OCR results
            output_path: Output file path (or directory if not combining)
            combine: Combine all results into one file

        Returns:
            Path: Output file/directory path

        Raises:
            ExportError: If export fails
        """
        try:
            if combine:
                # Combine all Markdown files into one
                combined_content = []
                for result in results:
                    if not result.markdown_path or not Path(result.markdown_path).exists():
                        continue

                    markdown_content = Path(result.markdown_path).read_text(encoding="utf-8")

                    # Add header
                    combined_content.append(f"# {result.file.original_filename}\n")
                    combined_content.append(markdown_content)
                    combined_content.append("\n---\n")  # Separator

                output_path.parent.mkdir(parents=True, exist_ok=True)
                output_path.write_text("\n".join(combined_content), encoding="utf-8")

                logger.info(f"Exported {len(results)} results to combined Markdown: {output_path}")
                return output_path
            else:
                # Export each result to separate file
                output_path.mkdir(parents=True, exist_ok=True)

                for result in results:
                    if not result.markdown_path or not Path(result.markdown_path).exists():
                        continue

                    # Copy Markdown file to output directory
                    src_path = Path(result.markdown_path)
                    dst_path = output_path / f"{result.file.original_filename}.md"
                    dst_path.write_text(src_path.read_text(encoding="utf-8"), encoding="utf-8")

                logger.info(f"Exported {len(results)} results to separate Markdown files: {output_path}")
                return output_path
        except Exception as e:
            raise ExportError(f"Markdown export failed: {str(e)}")

    def export_to_pdf(
        self,
        result: OCRResult,
        output_path: Path,
        css_template: str = "default",
        metadata: Optional[Dict] = None
    ) -> Path:
        """
        Export single result to PDF with layout preservation

        Args:
            result: OCR result
            output_path: Output PDF path
            css_template: CSS template name or custom CSS
            metadata: Optional PDF metadata

        Returns:
            Path: Output PDF path

        Raises:
            ExportError: If export fails
        """
        try:
            if not result.markdown_path or not Path(result.markdown_path).exists():
                raise ExportError(f"Markdown file not found for result {result.id}")

            markdown_path = Path(result.markdown_path)

            # Prepare metadata
            pdf_metadata = metadata or {}
            if "title" not in pdf_metadata:
                pdf_metadata["title"] = result.file.original_filename

            # Generate PDF
            self.pdf_generator.generate_pdf(
                markdown_path=markdown_path,
                output_path=output_path,
                css_template=css_template,
                metadata=pdf_metadata
            )

            logger.info(f"Exported result {result.id} to PDF: {output_path}")
            return output_path
        except PDFGenerationError as e:
            raise ExportError(f"PDF generation failed: {str(e)}")
        except Exception as e:
            raise ExportError(f"PDF export failed: {str(e)}")

    def export_batch_to_zip(
        self,
        db: Session,
        batch_id: int,
        output_path: Path,
        include_formats: Optional[List[str]] = None
    ) -> Path:
        """
        Export entire batch to ZIP archive

        Args:
            db: Database session
            batch_id: Batch ID
            output_path: Output ZIP path
            include_formats: List of formats to include (markdown, json, txt, excel, pdf)

        Returns:
            Path: Output ZIP path

        Raises:
            ExportError: If export fails
        """
        try:
            include_formats = include_formats or ["markdown", "json"]

            # Get batch and results
            batch = db.query(OCRBatch).filter(OCRBatch.id == batch_id).first()
            if not batch:
                raise ExportError(f"Batch {batch_id} not found")

            results = db.query(OCRResult).join(OCRFile).filter(
                OCRFile.batch_id == batch_id,
                OCRFile.status == FileStatus.COMPLETED
            ).all()

            if not results:
                raise ExportError(f"No completed results found for batch {batch_id}")

            # Create temporary export directory
            temp_dir = output_path.parent / f"temp_export_{batch_id}"
            temp_dir.mkdir(parents=True, exist_ok=True)

            try:
                # Export in requested formats
                if "markdown" in include_formats:
                    md_dir = temp_dir / "markdown"
                    self.export_to_markdown(results, md_dir, combine=False)

                if "json" in include_formats:
                    json_path = temp_dir / "batch_results.json"
                    self.export_to_json(results, json_path)

                if "txt" in include_formats:
                    txt_path = temp_dir / "batch_results.txt"
                    self.export_to_txt(results, txt_path)

                if "excel" in include_formats:
                    excel_path = temp_dir / "batch_results.xlsx"
                    self.export_to_excel(results, excel_path)

                # Create ZIP archive
                output_path.parent.mkdir(parents=True, exist_ok=True)
                with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                    for file_path in temp_dir.rglob('*'):
                        if file_path.is_file():
                            arcname = file_path.relative_to(temp_dir)
                            zipf.write(file_path, arcname)

                logger.info(f"Exported batch {batch_id} to ZIP: {output_path}")
                return output_path
            finally:
                # Clean up temporary directory
                import shutil
                shutil.rmtree(temp_dir, ignore_errors=True)
        except Exception as e:
            raise ExportError(f"Batch ZIP export failed: {str(e)}")
    def apply_export_rule(
        self,
        db: Session,
        results: List[OCRResult],
        rule_id: int
    ) -> List[OCRResult]:
        """
        Apply export rule to filter and format results

        Args:
            db: Database session
            results: List of OCR results
            rule_id: Export rule ID

        Returns:
            List[OCRResult]: Filtered results

        Raises:
            ExportError: If rule not found
        """
        rule = db.query(ExportRule).filter(ExportRule.id == rule_id).first()
        if not rule:
            raise ExportError(f"Export rule {rule_id} not found")

        config = rule.config_json

        # Apply filters
        if "filters" in config:
            results = self.apply_filters(results, config["filters"])

        # Note: Formatting options are applied in individual export methods
        return results

    def get_export_formats(self) -> Dict[str, str]:
        """
        Get available export formats

        Returns:
            Dict mapping format codes to descriptions
        """
        return {
            "txt": "純文本格式 (.txt)",
            "json": "JSON 格式 - 包含完整元數據 (.json)",
            "excel": "Excel 表格格式 (.xlsx)",
            "markdown": "Markdown 格式 (.md)",
            "pdf": "版面保留 PDF 格式 (.pdf)",
            "zip": "批次打包格式 (.zip)",
        }


@@ -1,420 +0,0 @@
"""
Tool_OCR - File Management Service
Handles file uploads, storage, validation, and cleanup
"""
import logging
import shutil
import uuid
from pathlib import Path
from typing import List, Tuple, Optional
from datetime import datetime, timedelta
from fastapi import UploadFile
from sqlalchemy.orm import Session
from app.core.config import settings
from app.models.ocr import OCRBatch, OCRFile, FileStatus
from app.services.preprocessor import DocumentPreprocessor
logger = logging.getLogger(__name__)
class FileManagementError(Exception):
"""Exception raised for file management errors"""
pass
class FileManager:
"""
File management service for upload, storage, and cleanup
Directory structure:
uploads/
├── batches/
│ └── {batch_id}/
│ ├── inputs/ # Original uploaded files
│ ├── outputs/ # OCR results
│ │ ├── markdown/ # Markdown files
│ │ ├── json/ # JSON files
│ │ └── images/ # Extracted images
│ └── exports/ # Export files (PDF, Excel, etc.)
"""
def __init__(self):
"""Initialize file manager"""
self.preprocessor = DocumentPreprocessor()
self.base_upload_dir = Path(settings.upload_dir)
self.base_upload_dir.mkdir(parents=True, exist_ok=True)
def create_batch_directory(self, batch_id: int) -> Path:
"""
Create directory structure for a batch
Args:
batch_id: Batch ID
Returns:
Path: Batch directory path
"""
batch_dir = self.base_upload_dir / "batches" / str(batch_id)
# Create subdirectories
(batch_dir / "inputs").mkdir(parents=True, exist_ok=True)
(batch_dir / "outputs" / "markdown").mkdir(parents=True, exist_ok=True)
(batch_dir / "outputs" / "json").mkdir(parents=True, exist_ok=True)
(batch_dir / "outputs" / "images").mkdir(parents=True, exist_ok=True)
(batch_dir / "exports").mkdir(parents=True, exist_ok=True)
logger.info(f"Created batch directory: {batch_dir}")
return batch_dir
def get_batch_directory(self, batch_id: int) -> Path:
"""
Get batch directory path
Args:
batch_id: Batch ID
Returns:
Path: Batch directory path
"""
return self.base_upload_dir / "batches" / str(batch_id)
def validate_upload(self, file: UploadFile) -> Tuple[bool, Optional[str]]:
"""
Validate uploaded file before saving
Args:
file: Uploaded file
Returns:
Tuple of (is_valid, error_message)
"""
# Check filename
if not file.filename:
return False, "文件名不能為空"
# Check file size (read content size)
file.file.seek(0, 2) # Seek to end
file_size = file.file.tell()
file.file.seek(0) # Reset to beginning
if file_size == 0:
return False, "文件為空"
if file_size > settings.max_upload_size:
max_mb = settings.max_upload_size / (1024 * 1024)
return False, f"文件大小超過限制 ({max_mb}MB)"
# Check file extension
file_ext = Path(file.filename).suffix.lower()
allowed_extensions = {'.png', '.jpg', '.jpeg', '.pdf', '.doc', '.docx', '.ppt', '.pptx'}
if file_ext not in allowed_extensions:
return False, f"不支持的文件格式 ({file_ext}),僅支持: {', '.join(allowed_extensions)}"
return True, None
def save_upload(
self,
file: UploadFile,
batch_id: int,
validate: bool = True
) -> Tuple[Path, str]:
"""
Save uploaded file to batch directory
Args:
file: Uploaded file
batch_id: Batch ID
validate: Whether to validate file
Returns:
Tuple of (file_path, original_filename)
Raises:
FileManagementError: If file validation or saving fails
"""
# Validate if requested
if validate:
is_valid, error_msg = self.validate_upload(file)
if not is_valid:
raise FileManagementError(error_msg)
# Generate unique filename to avoid conflicts
original_filename = file.filename
file_ext = Path(original_filename).suffix
unique_filename = f"{uuid.uuid4()}{file_ext}"
# Get batch input directory
batch_dir = self.get_batch_directory(batch_id)
input_dir = batch_dir / "inputs"
input_dir.mkdir(parents=True, exist_ok=True)
# Save file
file_path = input_dir / unique_filename
try:
with file_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)
logger.info(f"Saved upload: {file_path} (original: {original_filename})")
return file_path, original_filename
except Exception as e:
# Clean up partial file if exists
file_path.unlink(missing_ok=True)
raise FileManagementError(f"保存文件失敗: {str(e)}")
    def validate_saved_file(self, file_path: Path) -> Tuple[bool, Optional[str], Optional[str]]:
        """
        Validate saved file using preprocessor

        Args:
            file_path: Path to saved file

        Returns:
            Tuple of (is_valid, detected_format, error_message)
        """
        return self.preprocessor.validate_file(file_path)
    def create_batch(
        self,
        db: Session,
        user_id: int,
        batch_name: Optional[str] = None
    ) -> OCRBatch:
        """
        Create new OCR batch

        Args:
            db: Database session
            user_id: User ID
            batch_name: Optional batch name

        Returns:
            OCRBatch: Created batch object
        """
        # Create batch record
        batch = OCRBatch(
            user_id=user_id,
            batch_name=batch_name or f"Batch_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        )
        db.add(batch)
        db.commit()
        db.refresh(batch)

        # Create directory structure
        self.create_batch_directory(batch.id)

        logger.info(f"Created batch: {batch.id} for user {user_id}")
        return batch

    def add_file_to_batch(
        self,
        db: Session,
        batch_id: int,
        file: UploadFile
    ) -> OCRFile:
        """
        Add file to batch and save to disk

        Args:
            db: Database session
            batch_id: Batch ID
            file: Uploaded file

        Returns:
            OCRFile: Created file record

        Raises:
            FileManagementError: If file operations fail
        """
        # Save file to disk
        file_path, original_filename = self.save_upload(file, batch_id)

        # Validate saved file
        is_valid, detected_format, error_msg = self.validate_saved_file(file_path)

        # Create file record
        ocr_file = OCRFile(
            batch_id=batch_id,
            filename=file_path.name,
            original_filename=original_filename,
            file_path=str(file_path),
            file_size=file_path.stat().st_size,
            file_format=detected_format or Path(original_filename).suffix.lower().lstrip('.'),
            status=FileStatus.PENDING if is_valid else FileStatus.FAILED,
            error_message=error_msg if not is_valid else None
        )
        db.add(ocr_file)

        # Update batch total_files count
        batch = db.query(OCRBatch).filter(OCRBatch.id == batch_id).first()
        if batch:
            batch.total_files += 1
            if not is_valid:
                batch.failed_files += 1

        db.commit()
        db.refresh(ocr_file)

        logger.info(f"Added file to batch {batch_id}: {ocr_file.id} (status: {ocr_file.status})")
        return ocr_file

    def add_files_to_batch(
        self,
        db: Session,
        batch_id: int,
        files: List[UploadFile]
    ) -> List[OCRFile]:
        """
        Add multiple files to batch

        Args:
            db: Database session
            batch_id: Batch ID
            files: List of uploaded files

        Returns:
            List[OCRFile]: List of created file records
        """
        ocr_files = []
        for file in files:
            try:
                ocr_file = self.add_file_to_batch(db, batch_id, file)
                ocr_files.append(ocr_file)
            except FileManagementError as e:
                logger.error(f"Failed to add file {file.filename} to batch {batch_id}: {e}")
                # Continue with other files
                continue
        return ocr_files

    def get_file_paths(self, batch_id: int, file_id: int) -> dict:
        """
        Get all paths for a file in a batch

        Args:
            batch_id: Batch ID
            file_id: File ID

        Returns:
            Dict containing all relevant paths
        """
        batch_dir = self.get_batch_directory(batch_id)
        return {
            "input_dir": batch_dir / "inputs",
            "output_dir": batch_dir / "outputs",
            "markdown_dir": batch_dir / "outputs" / "markdown",
            "json_dir": batch_dir / "outputs" / "json",
            "images_dir": batch_dir / "outputs" / "images" / str(file_id),
            "export_dir": batch_dir / "exports",
        }

    def cleanup_expired_batches(self, db: Session, retention_hours: int = 24) -> int:
        """
        Clean up expired batch files

        Args:
            db: Database session
            retention_hours: Number of hours to retain files

        Returns:
            int: Number of batches cleaned up
        """
        cutoff_time = datetime.utcnow() - timedelta(hours=retention_hours)

        # Find expired batches
        expired_batches = db.query(OCRBatch).filter(
            OCRBatch.created_at < cutoff_time
        ).all()

        cleaned_count = 0
        for batch in expired_batches:
            try:
                # Delete batch directory
                batch_dir = self.get_batch_directory(batch.id)
                if batch_dir.exists():
                    shutil.rmtree(batch_dir)
                    logger.info(f"Deleted batch directory: {batch_dir}")

                # Delete database records (cascade will handle related records)
                db.delete(batch)
                cleaned_count += 1
            except Exception as e:
                logger.error(f"Failed to cleanup batch {batch.id}: {e}")
                continue

        if cleaned_count > 0:
            db.commit()
            logger.info(f"Cleaned up {cleaned_count} expired batches")

        return cleaned_count

    def verify_file_ownership(
        self,
        db: Session,
        user_id: int,
        batch_id: int
    ) -> bool:
        """
        Verify user owns the batch

        Args:
            db: Database session
            user_id: User ID
            batch_id: Batch ID

        Returns:
            bool: True if user owns batch, False otherwise
        """
        batch = db.query(OCRBatch).filter(
            OCRBatch.id == batch_id,
            OCRBatch.user_id == user_id
        ).first()
        return batch is not None
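
    # Usage sketch (assumed wiring, not from the original module): routers can
    # gate batch access with this check before touching files, e.g.
    #
    #     if not file_manager.verify_file_ownership(db, current_user.id, batch_id):
    #         raise HTTPException(status_code=403, detail="Not your batch")
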
    def get_batch_statistics(self, db: Session, batch_id: int) -> dict:
        """
        Get statistics for a batch

        Args:
            db: Database session
            batch_id: Batch ID

        Returns:
            Dict containing batch statistics
        """
        batch = db.query(OCRBatch).filter(OCRBatch.id == batch_id).first()
        if not batch:
            return {}

        # Calculate total file size
        total_size = sum(f.file_size for f in batch.files)

        # Calculate processing time
        processing_time = None
        if batch.completed_at and batch.started_at:
            processing_time = (batch.completed_at - batch.started_at).total_seconds()

        return {
            "batch_id": batch.id,
            "batch_name": batch.batch_name,
            "status": batch.status,
            "total_files": batch.total_files,
            "completed_files": batch.completed_files,
            "failed_files": batch.failed_files,
            "pending_files": batch.total_files - batch.completed_files - batch.failed_files,
            "progress_percentage": batch.progress_percentage,
            "total_file_size": total_size,
            "total_file_size_mb": round(total_size / (1024 * 1024), 2),
            "created_at": batch.created_at.isoformat(),
            "started_at": batch.started_at.isoformat() if batch.started_at else None,
            "completed_at": batch.completed_at.isoformat() if batch.completed_at else None,
            "processing_time": processing_time,
        }
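
# End-to-end sketch (illustrative, not from the original module): a typical
# upload flow chains the pieces above before handing off to OCR:
#
#     manager = FileManager()
#     batch = manager.create_batch(db, user_id=current_user.id)  # record + dirs
#     ocr_files = manager.add_files_to_batch(db, batch.id, uploads)
#     process_batch_files_with_retry(batch.id, "ch", True, db)   # background_tasks module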


@@ -1,282 +0,0 @@
"""
Tool_OCR - Translation Service (RESERVED)
Abstract interface and stub implementation for future translation feature
"""
from abc import ABC, abstractmethod
from typing import Dict, Optional, List
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class TranslationEngine(str, Enum):
"""Supported translation engines"""
OFFLINE = "offline" # Argos Translate (offline)
ERNIE = "ernie" # Baidu ERNIE API
GOOGLE = "google" # Google Translate API
DEEPL = "deepl" # DeepL API
class LanguageCode(str, Enum):
"""Supported language codes"""
CHINESE = "zh"
ENGLISH = "en"
JAPANESE = "ja"
KOREAN = "ko"
FRENCH = "fr"
GERMAN = "de"
SPANISH = "es"
class TranslationServiceInterface(ABC):
"""
Abstract interface for translation services
This interface defines the contract for all translation engine implementations.
Future implementations should inherit from this class.
"""
@abstractmethod
def translate_text(
self,
text: str,
source_lang: str,
target_lang: str,
**kwargs
) -> str:
"""
Translate a single text string
Args:
text: Text to translate
source_lang: Source language code
target_lang: Target language code
**kwargs: Engine-specific parameters
Returns:
str: Translated text
"""
pass
@abstractmethod
def translate_document(
self,
markdown_content: str,
source_lang: str,
target_lang: str,
preserve_structure: bool = True,
**kwargs
) -> Dict[str, any]:
"""
Translate a Markdown document while preserving structure
Args:
markdown_content: Markdown content to translate
source_lang: Source language code
target_lang: Target language code
preserve_structure: Whether to preserve markdown structure
**kwargs: Engine-specific parameters
Returns:
Dict containing:
- translated_content: Translated markdown
- metadata: Translation metadata (engine, time, etc.)
"""
pass
@abstractmethod
def batch_translate(
self,
texts: List[str],
source_lang: str,
target_lang: str,
**kwargs
) -> List[str]:
"""
Translate multiple texts in batch
Args:
texts: List of texts to translate
source_lang: Source language code
target_lang: Target language code
**kwargs: Engine-specific parameters
Returns:
List[str]: List of translated texts
"""
pass
@abstractmethod
def get_supported_languages(self) -> List[str]:
"""
Get list of supported language codes for this engine
Returns:
List[str]: List of supported language codes
"""
pass
@abstractmethod
def validate_config(self) -> bool:
"""
Validate engine configuration (API keys, model files, etc.)
Returns:
bool: True if configuration is valid
"""
pass
class TranslationEngineFactory:
"""
Factory for creating translation engine instances
RESERVED: This is a placeholder for future implementation.
When translation feature is implemented, this factory will instantiate
the appropriate translation engine based on configuration.
"""
@staticmethod
def create_engine(
engine_type: TranslationEngine,
config: Optional[Dict] = None
) -> TranslationServiceInterface:
"""
Create a translation engine instance
Args:
engine_type: Type of translation engine
config: Engine-specific configuration
Returns:
TranslationServiceInterface: Translation engine instance
Raises:
NotImplementedError: Always raised (stub implementation)
"""
raise NotImplementedError(
"Translation feature is not yet implemented. "
"This is a reserved placeholder for future development."
)
@staticmethod
def get_available_engines() -> List[str]:
"""
Get list of available translation engines
Returns:
List[str]: List of engine types (currently empty)
"""
return []
@staticmethod
def is_engine_available(engine_type: TranslationEngine) -> bool:
"""
Check if a specific engine is available
Args:
engine_type: Engine type to check
Returns:
bool: Always False (stub implementation)
"""
return False
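
# Usage sketch (illustrative): callers are expected to probe availability
# before requesting an engine, e.g.
#
#     if TranslationEngineFactory.is_engine_available(TranslationEngine.DEEPL):
#         engine = TranslationEngineFactory.create_engine(TranslationEngine.DEEPL)
#     else:
#         status = StubTranslationService.get_feature_status()
#
# With the current stub, the availability check is always False and
# create_engine always raises NotImplementedError.

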
class StubTranslationService:
    """
    Stub translation service for API endpoints

    This service provides placeholder responses for translation endpoints
    until the feature is fully implemented.
    """

    @staticmethod
    def get_feature_status() -> Dict[str, Any]:
        """
        Get translation feature status

        Returns:
            Dict with feature status information
        """
        return {
            "available": False,
            "status": "reserved",
            "message": "Translation feature is reserved for future implementation",
            "supported_engines": [],
            "planned_engines": [
                {
                    "type": "offline",
                    "name": "Argos Translate",
                    "description": "Offline neural translation",
                    "status": "planned"
                },
                {
                    "type": "ernie",
                    "name": "Baidu ERNIE",
                    "description": "Baidu AI translation API",
                    "status": "planned"
                },
                {
                    "type": "google",
                    "name": "Google Translate",
                    "description": "Google Cloud Translation API",
                    "status": "planned"
                },
                {
                    "type": "deepl",
                    "name": "DeepL",
                    "description": "DeepL translation API",
                    "status": "planned"
                }
            ],
            "roadmap": {
                "phase": "Phase 5",
                "priority": "low",
                "implementation_after": "Production deployment and user feedback"
            }
        }

    @staticmethod
    def get_supported_languages() -> List[Dict[str, str]]:
        """
        Get list of languages planned for translation support

        Returns:
            List of language info dicts
        """
        return [
            {"code": "zh", "name": "Chinese (Simplified)", "status": "planned"},
            {"code": "en", "name": "English", "status": "planned"},
            {"code": "ja", "name": "Japanese", "status": "planned"},
            {"code": "ko", "name": "Korean", "status": "planned"},
            {"code": "fr", "name": "French", "status": "planned"},
            {"code": "de", "name": "German", "status": "planned"},
            {"code": "es", "name": "Spanish", "status": "planned"},
        ]


# Example placeholder for future engine implementations:
#
# class ArgosTranslationEngine(TranslationServiceInterface):
#     """Offline translation using Argos Translate"""
#     def __init__(self, model_path: str):
#         self.model_path = model_path
#         # Initialize Argos models
#
#     def translate_text(self, text, source_lang, target_lang, **kwargs):
#         # Implementation here
#         pass
#
# class ERNIETranslationEngine(TranslationServiceInterface):
#     """Baidu ERNIE API translation"""
#     def __init__(self, api_key: str, api_secret: str):
#         self.api_key = api_key
#         self.api_secret = api_secret
#
#     def translate_text(self, text, source_lang, target_lang, **kwargs):
#         # Implementation here
#         pass