Migrate from Docker/macOS+Conda deployment to a native WSL2 Ubuntu development environment

Main changes:
- Remove all Docker-related configuration files (Dockerfile, docker-compose.yml, .dockerignore, etc.)
- Remove macOS/Conda setup scripts (SETUP.md, setup_conda.sh)
- Add automated WSL Ubuntu environment setup script (setup_dev_env.sh)
- Add backend/frontend quick-start scripts (start_backend.sh, start_frontend.sh)
- Unify development port configuration (backend: 8000, frontend: 5173)
- Improve database connection stability (connection pooling, timeout settings, retry logic)
- Update project documentation to reflect the current WSL development environment

Technical improvements:
- Database connection pooling with health checks and auto-reconnection
- Retry logic for long-running OCR tasks to prevent DB timeouts
- Extended JWT token expiration to 24 hours
- Support for Office documents (pptx, docx) via LibreOffice headless
- Comprehensive system dependency installation in a single script

Environment:
- OS: WSL2 Ubuntu 24.04
- Python: 3.12 (venv)
- Node.js: 24.x LTS (nvm)
- Backend Port: 8000
- Frontend Port: 5173

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
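Two of the improvements listed above live outside the file shown below. As a rough illustration only, the sketches that follow show what they typically involve: a SQLAlchemy engine configured with health checks and reconnection-friendly pool settings, and an Office-to-PDF conversion through the LibreOffice headless CLI. The module paths, variable names, and parameter values here are assumptions for illustration, not the project's actual configuration.

```python
# Hypothetical sketch of the pooling setup in app/core/database.py; the
# connection URL and the numeric values are assumptions, not the real config.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+psycopg2://ocr:ocr@localhost:5432/tool_ocr"  # assumed URL

engine = create_engine(
    DATABASE_URL,
    pool_size=5,         # keep a small pool of persistent connections
    max_overflow=10,     # allow short bursts beyond the pool size
    pool_pre_ping=True,  # health-check each connection before use, reconnect if stale
    pool_recycle=1800,   # recycle connections before server-side idle timeouts
    pool_timeout=30,     # fail fast if the pool is exhausted
)

SessionLocal = sessionmaker(bind=engine, autoflush=False)
```

```python
# Hypothetical sketch of Office (docx/pptx) conversion via LibreOffice headless;
# the helper name and paths are illustrative only.
import subprocess
from pathlib import Path


def convert_office_to_pdf(source: Path, out_dir: Path) -> Path:
    """Convert a .docx or .pptx file to PDF using the LibreOffice CLI."""
    subprocess.run(
        ["libreoffice", "--headless", "--convert-to", "pdf",
         "--outdir", str(out_dir), str(source)],
        check=True,
        timeout=120,
    )
    return out_dir / (source.stem + ".pdf")
```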
422 lines · 14 KiB · Python
"""
|
|
Tool_OCR - Background Tasks Service
|
|
Handles async processing, cleanup, and scheduled tasks
|
|
"""
|
|
|
|
import logging
|
|
import asyncio
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Optional, Callable, Any
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.database import SessionLocal
|
|
from app.models.ocr import OCRBatch, OCRFile, OCRResult, BatchStatus, FileStatus
|
|
from app.services.ocr_service import OCRService
|
|
from app.services.file_manager import FileManager
|
|
from app.services.pdf_generator import PDFGenerator
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BackgroundTaskManager:
    """
    Manages background tasks including retry logic, cleanup, and scheduled jobs
    """

    def __init__(
        self,
        max_retries: int = 3,
        retry_delay: int = 5,
        cleanup_interval: int = 3600,  # 1 hour
        file_retention_hours: int = 24
    ):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.cleanup_interval = cleanup_interval
        self.file_retention_hours = file_retention_hours
        self.ocr_service = OCRService()
        self.file_manager = FileManager()
        self.pdf_generator = PDFGenerator()

    async def execute_with_retry(
        self,
        func: Callable,
        *args,
        max_retries: Optional[int] = None,
        retry_delay: Optional[int] = None,
        **kwargs
    ) -> Any:
        """
        Execute a function with retry logic

        Args:
            func: Function to execute
            args: Positional arguments for func
            max_retries: Maximum retry attempts (overrides default)
            retry_delay: Delay between retries in seconds (overrides default)
            kwargs: Keyword arguments for func

        Returns:
            Function result

        Raises:
            Exception: If all retries are exhausted
        """
        max_retries = max_retries or self.max_retries
        retry_delay = retry_delay or self.retry_delay

        last_exception = None
        for attempt in range(max_retries + 1):
            try:
                if asyncio.iscoroutinefunction(func):
                    return await func(*args, **kwargs)
                else:
                    return func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                if attempt < max_retries:
                    logger.warning(
                        f"Attempt {attempt + 1}/{max_retries + 1} failed for {func.__name__}: {e}. "
                        f"Retrying in {retry_delay}s..."
                    )
                    await asyncio.sleep(retry_delay)
                else:
                    logger.error(
                        f"All {max_retries + 1} attempts failed for {func.__name__}: {e}"
                    )

        raise last_exception

    def process_single_file_with_retry(
        self,
        ocr_file: OCRFile,
        batch_id: int,
        lang: str,
        detect_layout: bool,
        db: Session
    ) -> bool:
        """
        Process a single file with retry logic

        Args:
            ocr_file: OCRFile instance
            batch_id: Batch ID
            lang: Language code
            detect_layout: Whether to detect layout
            db: Database session

        Returns:
            bool: True if successful, False otherwise
        """
        for attempt in range(self.max_retries + 1):
            try:
                # Update file status
                ocr_file.status = FileStatus.PROCESSING
                ocr_file.started_at = datetime.utcnow()
                ocr_file.retry_count = attempt
                db.commit()

                # Get file paths
                file_path = Path(ocr_file.file_path)
                paths = self.file_manager.get_file_paths(batch_id, ocr_file.id)

                # Process OCR
                result = self.ocr_service.process_image(
                    file_path,
                    lang=lang,
                    detect_layout=detect_layout
                )

                # Check if processing was successful
                if result['status'] != 'success':
                    raise Exception(result.get('error_message', 'Unknown error during OCR processing'))

                # Save results
                json_path, markdown_path = self.ocr_service.save_results(
                    result=result,
                    output_dir=paths["output_dir"],
                    file_id=str(ocr_file.id)
                )

                # Extract data from result
                text_regions = result.get('text_regions', [])
                layout_data = result.get('layout_data')
                images_metadata = result.get('images_metadata', [])

                # Calculate average confidence (or use from result)
                avg_confidence = result.get('average_confidence')

                # Create OCR result record
                ocr_result = OCRResult(
                    file_id=ocr_file.id,
                    markdown_path=str(markdown_path) if markdown_path else None,
                    json_path=str(json_path) if json_path else None,
                    images_dir=None,  # Images dir not used in current implementation
                    detected_language=lang,
                    total_text_regions=len(text_regions),
                    average_confidence=avg_confidence,
                    layout_data=layout_data,
                    images_metadata=images_metadata
                )
                db.add(ocr_result)

                # Update file status
                ocr_file.status = FileStatus.COMPLETED
                ocr_file.completed_at = datetime.utcnow()
                ocr_file.processing_time = (ocr_file.completed_at - ocr_file.started_at).total_seconds()

                # Commit with retry on connection errors
                try:
                    db.commit()
                except Exception as commit_error:
                    logger.warning(f"Commit failed, rolling back and retrying: {commit_error}")
                    db.rollback()
                    db.refresh(ocr_file)
                    ocr_file.status = FileStatus.COMPLETED
                    ocr_file.completed_at = datetime.utcnow()
                    ocr_file.processing_time = (ocr_file.completed_at - ocr_file.started_at).total_seconds()
                    db.commit()

                logger.info(f"Successfully processed file {ocr_file.id} ({ocr_file.original_filename})")
                return True

            except Exception as e:
                logger.error(f"Attempt {attempt + 1}/{self.max_retries + 1} failed for file {ocr_file.id}: {e}")
                db.rollback()  # Rollback failed transaction

                if attempt < self.max_retries:
                    # Wait before retry
                    time.sleep(self.retry_delay)
                else:
                    # Final failure
                    try:
                        ocr_file.status = FileStatus.FAILED
                        ocr_file.error_message = f"Failed after {self.max_retries + 1} attempts: {str(e)}"
                        ocr_file.completed_at = datetime.utcnow()
                        ocr_file.retry_count = attempt
                        db.commit()
                    except Exception as final_error:
                        logger.error(f"Failed to update error status: {final_error}")
                        db.rollback()
                    return False

        return False

    async def cleanup_expired_files(self, db: Session):
        """
        Clean up files and batches older than retention period

        Args:
            db: Database session
        """
        try:
            cutoff_time = datetime.utcnow() - timedelta(hours=self.file_retention_hours)

            # Find expired batches
            expired_batches = db.query(OCRBatch).filter(
                OCRBatch.created_at < cutoff_time,
                OCRBatch.status.in_([BatchStatus.COMPLETED, BatchStatus.FAILED, BatchStatus.PARTIAL])
            ).all()

            logger.info(f"Found {len(expired_batches)} expired batches to clean up")

            for batch in expired_batches:
                try:
                    # Get batch directory
                    batch_dir = self.file_manager.base_upload_dir / "batches" / str(batch.id)

                    # Delete physical files
                    if batch_dir.exists():
                        import shutil
                        shutil.rmtree(batch_dir)
                        logger.info(f"Deleted batch directory: {batch_dir}")

                    # Delete database records
                    # Delete results first (foreign key constraint)
                    db.query(OCRResult).filter(
                        OCRResult.file_id.in_(
                            db.query(OCRFile.id).filter(OCRFile.batch_id == batch.id)
                        )
                    ).delete(synchronize_session=False)

                    # Delete files
                    db.query(OCRFile).filter(OCRFile.batch_id == batch.id).delete()

                    # Delete batch
                    db.delete(batch)
                    db.commit()

                    logger.info(f"Cleaned up expired batch {batch.id}")

                except Exception as e:
                    logger.error(f"Error cleaning up batch {batch.id}: {e}")
                    db.rollback()

        except Exception as e:
            logger.error(f"Error in cleanup_expired_files: {e}")

    async def generate_pdf_background(
        self,
        result_id: int,
        output_path: str,
        css_template: str = "default",
        db: Session = None
    ):
        """
        Generate PDF in background with retry logic

        Args:
            result_id: OCR result ID
            output_path: Output PDF path
            css_template: CSS template name
            db: Database session
        """
        should_close_db = False
        if db is None:
            db = SessionLocal()
            should_close_db = True

        try:
            # Get result
            result = db.query(OCRResult).filter(OCRResult.id == result_id).first()
            if not result:
                logger.error(f"Result {result_id} not found")
                return

            # Generate PDF with retry
            await self.execute_with_retry(
                self.pdf_generator.generate_pdf,
                markdown_path=result.markdown_path,
                output_path=output_path,
                css_template=css_template,
                max_retries=2,
                retry_delay=3
            )

            logger.info(f"Successfully generated PDF for result {result_id}: {output_path}")

        except Exception as e:
            logger.error(f"Failed to generate PDF for result {result_id}: {e}")
        finally:
            if should_close_db:
                db.close()

    async def start_cleanup_scheduler(self):
        """
        Start periodic cleanup scheduler

        Runs cleanup task at specified intervals
        """
        logger.info(f"Starting cleanup scheduler (interval: {self.cleanup_interval}s, retention: {self.file_retention_hours}h)")

        while True:
            try:
                db = SessionLocal()
                await self.cleanup_expired_files(db)
                db.close()
            except Exception as e:
                logger.error(f"Error in cleanup scheduler: {e}")

            # Wait for next interval
            await asyncio.sleep(self.cleanup_interval)


# Global task manager instance
task_manager = BackgroundTaskManager()

def process_batch_files_with_retry(
    batch_id: int,
    lang: str,
    detect_layout: bool,
    db: Session
):
    """
    Process all files in a batch with retry logic

    Args:
        batch_id: Batch ID
        lang: Language code
        detect_layout: Whether to detect layout
        db: Database session
    """
    try:
        # Get batch
        batch = db.query(OCRBatch).filter(OCRBatch.id == batch_id).first()
        if not batch:
            logger.error(f"Batch {batch_id} not found")
            return

        # Update batch status
        batch.status = BatchStatus.PROCESSING
        batch.started_at = datetime.utcnow()
        db.commit()

        # Get pending files
        files = db.query(OCRFile).filter(
            OCRFile.batch_id == batch_id,
            OCRFile.status == FileStatus.PENDING
        ).all()

        logger.info(f"Processing {len(files)} files in batch {batch_id} with retry logic")

        # Process each file with retry
        for ocr_file in files:
            success = task_manager.process_single_file_with_retry(
                ocr_file=ocr_file,
                batch_id=batch_id,
                lang=lang,
                detect_layout=detect_layout,
                db=db
            )

            # Update batch progress
            if success:
                batch.completed_files += 1
            else:
                batch.failed_files += 1

            db.commit()

        # Update batch final status
        if batch.failed_files == 0:
            batch.status = BatchStatus.COMPLETED
        elif batch.completed_files > 0:
            batch.status = BatchStatus.PARTIAL
        else:
            batch.status = BatchStatus.FAILED

        batch.completed_at = datetime.utcnow()

        # Commit with retry on connection errors
        try:
            db.commit()
        except Exception as commit_error:
            logger.warning(f"Batch commit failed, rolling back and retrying: {commit_error}")
            db.rollback()
            batch = db.query(OCRBatch).filter(OCRBatch.id == batch_id).first()
            if batch:
                batch.completed_at = datetime.utcnow()
                db.commit()

        logger.info(
            f"Batch {batch_id} processing complete: "
            f"{batch.completed_files} succeeded, {batch.failed_files} failed"
        )

    except Exception as e:
        logger.error(f"Fatal error processing batch {batch_id}: {e}")
        db.rollback()  # Rollback any failed transaction
        try:
            batch = db.query(OCRBatch).filter(OCRBatch.id == batch_id).first()
            if batch:
                batch.status = BatchStatus.FAILED
                batch.completed_at = datetime.utcnow()
                db.commit()
        except Exception as commit_error:
            logger.error(f"Error updating batch status: {commit_error}")
            db.rollback()
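For context on how a module like this is typically wired in, here is a minimal sketch of a FastAPI integration, assuming the cleanup scheduler is started at application startup and batch processing is handed off to a background task that opens its own database session. The endpoint, the module path used to import this file, and the hard-coded language/layout arguments are illustrative assumptions, not code from this repository.

```python
# Illustrative wiring only; the endpoint and module paths are assumptions.
import asyncio

from fastapi import BackgroundTasks, FastAPI

from app.core.database import SessionLocal
from app.services.background_tasks import (  # assumed module path for this file
    process_batch_files_with_retry,
    task_manager,
)

app = FastAPI()


@app.on_event("startup")
async def start_scheduler() -> None:
    # Run the periodic cleanup loop for the lifetime of the process.
    asyncio.create_task(task_manager.start_cleanup_scheduler())


def run_batch(batch_id: int, lang: str, detect_layout: bool) -> None:
    # Open a dedicated session so the background task is not tied to the
    # request's session lifecycle.
    db = SessionLocal()
    try:
        process_batch_files_with_retry(batch_id, lang, detect_layout, db)
    finally:
        db.close()


@app.post("/batches/{batch_id}/process")
def start_batch(batch_id: int, background_tasks: BackgroundTasks) -> dict:
    # Hand the long-running OCR work to a background task; the HTTP response
    # returns immediately while retries happen inside the task.
    background_tasks.add_task(run_batch, batch_id, "en", True)
    return {"status": "processing", "batch_id": batch_id}
```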