fix: OCR Track reflow PDF and translation with image text filtering

- Add OCR Track support for reflow PDF generation using raw_ocr_regions.json
- Add OCR Track translation extraction from raw_ocr_regions instead of elements
- Add raw_ocr_translations output format for OCR Track documents
- Add exclusion zone filtering to remove text overlapping with images
- Update API validation to accept both translations and raw_ocr_translations
- Add page_number field to TranslatedItem for proper tracking

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
egg
2025-12-12 11:02:35 +08:00
parent 24253ac15e
commit 1f18010040
11 changed files with 1040 additions and 149 deletions

View File

@@ -578,7 +578,10 @@ async def download_translated_pdf(
with open(translation_file, 'r', encoding='utf-8') as f:
translation_data = json.load(f)
if not translation_data.get('translations'):
# Check for translations (Direct Track) or raw_ocr_translations (OCR Track)
has_translations = translation_data.get('translations')
has_raw_ocr_translations = translation_data.get('raw_ocr_translations')
if not has_translations and not has_raw_ocr_translations:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Translation file is empty or incomplete"

View File

@@ -146,6 +146,7 @@ class TranslatedItem:
original_content: str
translated_content: str
element_type: str
page_number: int = 1
cell_position: Optional[Tuple[int, int]] = None

View File

@@ -4701,11 +4701,242 @@ class PDFGeneratorService:
logger.error(f"Failed to embed image for reflow: {e}")
return None
def _collect_exclusion_zones(self, page_data: Dict) -> List[Tuple[float, float, float, float]]:
"""
Collect exclusion zones (image bboxes) from page elements.
These zones are used to filter out OCR text that overlaps with images,
preventing text inside images from appearing in reflow PDFs.
Args:
page_data: Page dictionary containing 'elements'
Returns:
List of (x0, y0, x1, y1) tuples representing image bounding boxes
"""
exclusion_zones = []
elements = page_data.get('elements', [])
for elem in elements:
elem_type = elem.get('type', '')
# Collect image/chart bboxes
if elem_type in ('image', 'Image', 'figure', 'Figure', 'chart', 'Chart'):
bbox = elem.get('bbox', {})
if isinstance(bbox, dict):
x0 = bbox.get('x0', 0)
y0 = bbox.get('y0', 0)
x1 = bbox.get('x1', 0)
y1 = bbox.get('y1', 0)
if x1 > x0 and y1 > y0:
exclusion_zones.append((x0, y0, x1, y1))
# Collect embedded images in tables
if elem_type in ('table', 'Table'):
metadata = elem.get('metadata', {})
embedded_images = metadata.get('embedded_images', [])
for emb_img in embedded_images:
emb_bbox = emb_img.get('bbox', [])
if isinstance(emb_bbox, list) and len(emb_bbox) >= 4:
x0, y0, x1, y1 = emb_bbox[0], emb_bbox[1], emb_bbox[2], emb_bbox[3]
if x1 > x0 and y1 > y0:
exclusion_zones.append((x0, y0, x1, y1))
return exclusion_zones
def _is_region_overlapping_exclusion(
self,
region_bbox: List,
exclusion_zones: List[Tuple[float, float, float, float]],
ioa_threshold: float = 0.3
) -> bool:
"""
Check if a text region overlaps significantly with any exclusion zone.
Uses IoA (Intersection over Area) to determine overlap.
Args:
region_bbox: Quadrilateral bbox [[x0,y0], [x1,y1], [x2,y2], [x3,y3]]
exclusion_zones: List of (x0, y0, x1, y1) tuples
ioa_threshold: Overlap threshold (default 0.3 = 30%)
Returns:
True if region should be excluded
"""
if not exclusion_zones or not region_bbox:
return False
# Convert quadrilateral to rectangular bbox
if len(region_bbox) >= 4:
xs = [p[0] for p in region_bbox]
ys = [p[1] for p in region_bbox]
tx0, ty0, tx1, ty1 = min(xs), min(ys), max(xs), max(ys)
else:
return False
text_area = (tx1 - tx0) * (ty1 - ty0)
if text_area <= 0:
return False
for zx0, zy0, zx1, zy1 in exclusion_zones:
# Calculate intersection
ix0 = max(tx0, zx0)
iy0 = max(ty0, zy0)
ix1 = min(tx1, zx1)
iy1 = min(ty1, zy1)
if ix1 > ix0 and iy1 > iy0:
intersection_area = (ix1 - ix0) * (iy1 - iy0)
ioa = intersection_area / text_area
if ioa >= ioa_threshold:
return True
return False
def _filter_regions_by_exclusion(
self,
regions: List[Dict],
exclusion_zones: List[Tuple[float, float, float, float]],
ioa_threshold: float = 0.3
) -> List[Dict]:
"""
Filter out text regions that overlap with exclusion zones (images).
Args:
regions: List of raw OCR regions with 'text' and 'bbox'
exclusion_zones: List of (x0, y0, x1, y1) tuples
ioa_threshold: Overlap threshold
Returns:
Filtered list of regions
"""
if not exclusion_zones:
return regions
filtered = []
excluded_count = 0
for region in regions:
bbox = region.get('bbox', [])
if self._is_region_overlapping_exclusion(bbox, exclusion_zones, ioa_threshold):
excluded_count += 1
text = region.get('text', '')[:20]
logger.debug(f"Excluding text '{text}...' due to image overlap")
else:
filtered.append(region)
if excluded_count > 0:
logger.info(f"Filtered {excluded_count} text regions overlapping with images")
return filtered
def _render_reflow_elements(
    self,
    page_data: Dict,
    result_dir: Path,
    styles: Dict,
    story: List
) -> None:
    """
    Append a page's elements to the reflow story (Direct Track logic).

    Walks the page's elements in reading order and emits flowing content:
    tables (plus any images embedded in their metadata), inline images and
    charts, and styled paragraphs for titles/headings/body text.

    Args:
        page_data: Page dictionary containing 'elements'
        result_dir: Path to result directory for images
        styles: Style dictionary for paragraphs
        story: List to append rendered elements to
    """
    table_types = ('table', 'Table')
    image_types = ('image', 'figure', 'Image', 'Figure', 'chart', 'Chart')
    # Element types whose 'content' may legitimately be a dict; their
    # handlers read the element itself rather than 'content'.
    dict_content_types = table_types + image_types

    # Map heading-like element types to the paragraph style they use;
    # anything unmapped falls through to body text.
    paragraph_style_for = {
        'title': 'Title', 'Title': 'Title',
        'section_header': 'Heading1', 'SectionHeader': 'Heading1',
        'h1': 'Heading1', 'H1': 'Heading1',
        'h2': 'Heading2', 'H2': 'Heading2', 'Heading2': 'Heading2',
    }

    def escaped(raw: str) -> str:
        # Escape markup-significant characters for ReportLab Paragraph text.
        return raw.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')

    for elem in self._get_elements_in_reading_order(page_data):
        elem_type = elem.get('type', elem.get('element_type', 'text'))
        content = elem.get('content', elem.get('text', ''))

        if isinstance(content, dict):
            # Only tables/images/charts are allowed dict content; skip
            # any other element carrying one.
            if elem_type not in dict_content_types:
                continue
        elif not isinstance(content, str):
            content = str(content) if content else ''

        if elem_type in table_types:
            table = self._create_reflow_table(elem, styles)
            if table:
                story.append(table)
                story.append(Spacer(1, 12))
            # Render any images that were embedded inside the table metadata.
            for emb_img in elem.get('metadata', {}).get('embedded_images', []):
                img_path_str = emb_img.get('saved_path', '')
                if not img_path_str:
                    continue
                img_path = result_dir / img_path_str
                if not img_path.exists():
                    # Fallback: image saved flat under result_dir.
                    img_path = result_dir / Path(img_path_str).name
                if not img_path.exists():
                    continue
                try:
                    img = PlatypusImage(str(img_path))
                    max_width = 450  # keep within printable page width
                    if img.drawWidth > max_width:
                        ratio = max_width / img.drawWidth
                        img.drawWidth = max_width
                        img.drawHeight *= ratio
                    story.append(img)
                    story.append(Spacer(1, 8))
                    logger.info(f"Embedded table image in reflow: {img_path.name}")
                except Exception as e:
                    logger.warning(f"Failed to embed table image: {e}")
        elif elem_type in image_types:
            # Standalone image/figure/chart element.
            img = self._embed_image_reflow(elem, result_dir)
            if img:
                story.append(img)
                story.append(Spacer(1, 8))
        elif content:
            style_name = paragraph_style_for.get(elem_type, 'Body')
            story.append(Paragraph(escaped(content), styles[style_name]))
def generate_reflow_pdf(
self,
json_path: Path,
output_path: Path,
source_file_path: Optional[Path] = None
source_file_path: Optional[Path] = None,
use_elements_only: bool = False
) -> bool:
"""
Generate reflow layout PDF from OCR/Direct JSON data.
@@ -4713,10 +4944,15 @@ class PDFGeneratorService:
This creates a flowing document with consistent font sizes,
proper reading order, and inline tables/images.
For OCR Track: Uses raw_ocr_regions.json for text content (ensures all text is included)
For Direct Track: Uses content.cells for tables (structured data available)
Args:
json_path: Path to result JSON file (UnifiedDocument format)
output_path: Path to save generated PDF
source_file_path: Optional path to original source file (for images)
use_elements_only: If True, always use elements from JSON (for translated PDFs
where translations are applied to elements, not raw_ocr_regions)
Returns:
True if successful, False otherwise
@@ -4727,6 +4963,12 @@ class PDFGeneratorService:
with open(json_path, 'r', encoding='utf-8') as f:
json_data = json.load(f)
# Detect processing track
metadata = json_data.get('metadata', {})
processing_track = metadata.get('processing_track', 'direct')
is_ocr_track = processing_track == 'ocr'
logger.info(f"Reflow PDF generation - Processing track: {processing_track}")
# Get styles
styles = self._get_reflow_styles()
@@ -4741,93 +4983,88 @@ class PDFGeneratorService:
else:
result_dir = json_path.parent
# Extract task_id from result_dir (directory name is the task_id)
task_id = result_dir.name
# Process each page
pages = json_data.get('pages', [])
for page_idx, page_data in enumerate(pages):
page_num = page_idx + 1 # 1-indexed
if page_idx > 0:
# Add page break between pages
story.append(Spacer(1, 30))
# Get elements in reading order
elements = self._get_elements_in_reading_order(page_data)
# === OCR Track: Use raw_ocr_regions.json for text ===
# But for translated PDFs (use_elements_only=True), use elements which have translations applied
if is_ocr_track and not use_elements_only and TEXT_REGION_RENDERER_AVAILABLE and load_raw_ocr_regions:
# Load raw OCR regions for this page
raw_regions = load_raw_ocr_regions(str(result_dir), task_id, page_num)
for elem in elements:
elem_type = elem.get('type', elem.get('element_type', 'text'))
content = elem.get('content', elem.get('text', ''))
if raw_regions:
logger.info(f"OCR Track reflow: Using {len(raw_regions)} raw OCR regions for page {page_num}")
# Types that can have dict content (handled specially)
dict_content_types = ('table', 'Table', 'image', 'figure', 'Image', 'Figure', 'chart', 'Chart')
# Collect exclusion zones (image bboxes) to filter text inside images
exclusion_zones = self._collect_exclusion_zones(page_data)
if exclusion_zones:
logger.info(f"Collected {len(exclusion_zones)} exclusion zones for text filtering")
raw_regions = self._filter_regions_by_exclusion(raw_regions, exclusion_zones)
# Ensure content is a string for text elements
if isinstance(content, dict):
# Tables, images, charts have dict content - handled by their respective methods
if elem_type not in dict_content_types:
# Skip other elements with dict content
continue
elif not isinstance(content, str):
content = str(content) if content else ''
# Sort by Y coordinate (top to bottom reading order)
def get_y_coord(region):
bbox = region.get('bbox', [])
if bbox and len(bbox) >= 4:
# bbox is [[x0,y0], [x1,y1], [x2,y2], [x3,y3]]
# Get average Y of top-left and top-right corners
return (bbox[0][1] + bbox[1][1]) / 2
return 0
if elem_type in ('table', 'Table'):
# Handle table
table = self._create_reflow_table(elem, styles)
if table:
story.append(table)
story.append(Spacer(1, 12))
sorted_regions = sorted(raw_regions, key=get_y_coord)
# Handle embedded images in table (from metadata)
metadata = elem.get('metadata', {})
embedded_images = metadata.get('embedded_images', [])
for emb_img in embedded_images:
img_path_str = emb_img.get('saved_path', '')
if img_path_str:
img_path = result_dir / img_path_str
if not img_path.exists():
img_path = result_dir / Path(img_path_str).name
if img_path.exists():
try:
img = PlatypusImage(str(img_path))
# Scale to fit page width if necessary
max_width = 450
if img.drawWidth > max_width:
ratio = max_width / img.drawWidth
img.drawWidth = max_width
img.drawHeight *= ratio
story.append(img)
story.append(Spacer(1, 8))
logger.info(f"Embedded table image in reflow: {img_path.name}")
except Exception as e:
logger.warning(f"Failed to embed table image: {e}")
elif elem_type in ('image', 'figure', 'Image', 'Figure', 'chart', 'Chart'):
# Handle image/chart
img = self._embed_image_reflow(elem, result_dir)
if img:
story.append(img)
story.append(Spacer(1, 8))
elif elem_type in ('title', 'Title'):
# Title text
if content:
content = content.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
story.append(Paragraph(content, styles['Title']))
elif elem_type in ('section_header', 'SectionHeader', 'h1', 'H1'):
# Heading 1
if content:
content = content.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
story.append(Paragraph(content, styles['Heading1']))
elif elem_type in ('h2', 'H2', 'Heading2'):
# Heading 2
if content:
content = content.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
story.append(Paragraph(content, styles['Heading2']))
# Render text blocks as paragraphs
for region in sorted_regions:
text = region.get('text', '')
if text:
text = text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
story.append(Paragraph(text, styles['Body']))
# Also render images/charts from elements
elements = self._get_elements_in_reading_order(page_data)
for elem in elements:
elem_type = elem.get('type', elem.get('element_type', ''))
if elem_type in ('image', 'figure', 'Image', 'Figure', 'chart', 'Chart'):
img = self._embed_image_reflow(elem, result_dir)
if img:
story.append(img)
story.append(Spacer(1, 8))
# Handle embedded images in tables
elif elem_type in ('table', 'Table'):
elem_metadata = elem.get('metadata', {})
embedded_images = elem_metadata.get('embedded_images', [])
for emb_img in embedded_images:
img_path_str = emb_img.get('saved_path', '')
if img_path_str:
img_path = result_dir / img_path_str
if not img_path.exists():
img_path = result_dir / Path(img_path_str).name
if img_path.exists():
try:
img = PlatypusImage(str(img_path))
max_width = 450
if img.drawWidth > max_width:
ratio = max_width / img.drawWidth
img.drawWidth = max_width
img.drawHeight *= ratio
story.append(img)
story.append(Spacer(1, 8))
except Exception as e:
logger.warning(f"Failed to embed table image: {e}")
else:
# Body text (default)
if content:
content = content.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
story.append(Paragraph(content, styles['Body']))
# Fallback to elements if raw OCR regions not found
logger.warning(f"OCR Track: No raw OCR regions found for page {page_num}, falling back to elements")
self._render_reflow_elements(page_data, result_dir, styles, story)
else:
# === Direct Track: Use structured content ===
self._render_reflow_elements(page_data, result_dir, styles, story)
if not story:
logger.warning("No content to generate reflow PDF")
@@ -4869,6 +5106,9 @@ class PDFGeneratorService:
merges them to replace original content with translations, and
generates a PDF with the translated content at original positions.
For OCR Track: Uses raw_ocr_translations to translate raw OCR regions
For Direct Track: Uses translations dict to translate elements
Args:
result_json_path: Path to original result JSON file (UnifiedDocument format)
translation_json_path: Path to translation JSON file
@@ -4894,7 +5134,25 @@ class PDFGeneratorService:
with open(translation_json_path, 'r', encoding='utf-8') as f:
translation_json = json.load(f)
# Extract translations dict from translation JSON
# Check if this is OCR Track with raw_ocr_translations
raw_ocr_translations = translation_json.get('raw_ocr_translations', [])
processing_track = translation_json.get('processing_track', '')
target_lang = translation_json.get('target_lang', 'unknown')
if raw_ocr_translations and processing_track == 'ocr':
# OCR Track: Generate PDF using translated raw OCR regions
logger.info(
f"Generating translated PDF (OCR Track): {len(raw_ocr_translations)} "
f"raw OCR translations, target_lang={target_lang}"
)
return self._generate_translated_pdf_ocr_track(
result_json=result_json,
raw_ocr_translations=raw_ocr_translations,
output_path=output_path,
result_dir=result_json_path.parent
)
# Direct Track: Use element-based translations
translations = translation_json.get('translations', {})
if not translations:
logger.warning("No translations found in translation JSON")
@@ -4908,9 +5166,8 @@ class PDFGeneratorService:
# Apply translations to result JSON
translated_doc = apply_translations(result_json, translations)
target_lang = translation_json.get('target_lang', 'unknown')
logger.info(
f"Generating translated PDF: {len(translations)} translations applied, "
f"Generating translated PDF (Direct Track): {len(translations)} translations applied, "
f"target_lang={target_lang}"
)
@@ -4927,10 +5184,12 @@ class PDFGeneratorService:
try:
# Use reflow PDF generation for better translated content display
# Pass result_json_path.parent as image directory (not the temp file's parent)
# use_elements_only=True ensures we use translated elements, not raw_ocr_regions
success = self.generate_reflow_pdf(
json_path=tmp_path,
output_path=output_path,
source_file_path=result_json_path.parent # Contains extracted images
source_file_path=result_json_path.parent, # Contains extracted images
use_elements_only=True # Use elements with translations applied
)
return success
finally:
@@ -4950,6 +5209,165 @@ class PDFGeneratorService:
traceback.print_exc()
return False
def _generate_translated_pdf_ocr_track(
    self,
    result_json: Dict,
    raw_ocr_translations: List[Dict],
    output_path: Path,
    result_dir: Path
) -> bool:
    """
    Generate translated reflow PDF for OCR Track documents.

    Uses raw_ocr_translations to render translated text in reading order.
    Per-page text comes from the raw_ocr_regions files loaded via
    load_raw_ocr_regions; each region's translation is looked up by
    (page, original region index) and falls back to the region's original
    text when no translation exists. Regions overlapping image exclusion
    zones are skipped; images/charts and table-embedded images are rendered
    from the page's elements.

    Args:
        result_json: Original result JSON data
        raw_ocr_translations: List of {page, index, original, translated}
        output_path: Path to save generated PDF
        result_dir: Path to result directory for images

    Returns:
        True if successful, False otherwise
    """
    try:
        # Get styles
        styles = self._get_reflow_styles()

        # Build document content
        story = []

        # Build translation lookup: {(page, index): translated_text}
        translation_lookup: Dict[Tuple[int, int], str] = {}
        for trans in raw_ocr_translations:
            page = trans.get('page', 1)
            idx = trans.get('index', 0)
            translated = trans.get('translated', '')
            if translated:
                translation_lookup[(page, idx)] = translated
        logger.info(f"Built translation lookup with {len(translation_lookup)} entries")

        # Process each page
        pages = result_json.get('pages', [])
        # The result directory's name doubles as the task_id used in the
        # raw_ocr_regions filenames.
        task_id = result_dir.name
        for page_idx, page_data in enumerate(pages):
            page_num = page_idx + 1  # 1-indexed

            if page_idx > 0:
                # Add page break between pages
                story.append(Spacer(1, 30))

            # Load raw OCR regions for this page
            if TEXT_REGION_RENDERER_AVAILABLE and load_raw_ocr_regions:
                raw_regions = load_raw_ocr_regions(str(result_dir), task_id, page_num)

                if raw_regions:
                    logger.info(
                        f"OCR Track translated PDF: Processing {len(raw_regions)} regions "
                        f"for page {page_num}"
                    )

                    # Collect exclusion zones (image bboxes) to filter text inside images
                    exclusion_zones = self._collect_exclusion_zones(page_data)

                    # Sort by Y coordinate (top to bottom reading order)
                    # Keep original indices for translation lookup
                    def get_y_coord(region_tuple):
                        region = region_tuple[1]
                        bbox = region.get('bbox', [])
                        if bbox and len(bbox) >= 4:
                            # Average Y of the two top corners of the quad bbox
                            return (bbox[0][1] + bbox[1][1]) / 2
                        return 0

                    indexed_regions = list(enumerate(raw_regions))
                    sorted_regions = sorted(indexed_regions, key=get_y_coord)

                    # Render translated text blocks as paragraphs (skip those overlapping images)
                    for original_idx, region in sorted_regions:
                        # Skip regions overlapping with images
                        bbox = region.get('bbox', [])
                        if exclusion_zones and self._is_region_overlapping_exclusion(bbox, exclusion_zones):
                            continue

                        # Look up translation
                        translated_text = translation_lookup.get(
                            (page_num, original_idx),
                            region.get('text', '')  # Fallback to original
                        )

                        if translated_text:
                            # Escape HTML special chars
                            translated_text = (translated_text
                                               .replace('&', '&amp;')
                                               .replace('<', '&lt;')
                                               .replace('>', '&gt;'))
                            story.append(Paragraph(translated_text, styles['Body']))

                    # Also render images/charts from elements
                    elements = self._get_elements_in_reading_order(page_data)
                    for elem in elements:
                        elem_type = elem.get('type', elem.get('element_type', ''))

                        if elem_type in ('image', 'figure', 'Image', 'Figure', 'chart', 'Chart'):
                            img = self._embed_image_reflow(elem, result_dir)
                            if img:
                                story.append(img)
                                story.append(Spacer(1, 8))

                        # Handle embedded images in tables
                        elif elem_type in ('table', 'Table'):
                            elem_metadata = elem.get('metadata', {})
                            embedded_images = elem_metadata.get('embedded_images', [])
                            for emb_img in embedded_images:
                                img_path_str = emb_img.get('saved_path', '')
                                if img_path_str:
                                    img_path = result_dir / img_path_str
                                    if not img_path.exists():
                                        # Fallback: image saved flat under result_dir
                                        img_path = result_dir / Path(img_path_str).name
                                    if img_path.exists():
                                        try:
                                            img = PlatypusImage(str(img_path))
                                            # Scale down to fit the printable page width
                                            max_width = 450
                                            if img.drawWidth > max_width:
                                                ratio = max_width / img.drawWidth
                                                img.drawWidth = max_width
                                                img.drawHeight *= ratio
                                            story.append(img)
                                            story.append(Spacer(1, 8))
                                        except Exception as e:
                                            logger.warning(f"Failed to embed table image: {e}")
                else:
                    logger.warning(
                        f"No raw OCR regions found for page {page_num}, skipping"
                    )

        if not story:
            logger.warning("No content to generate translated OCR Track PDF")
            return False

        # Create PDF document
        doc = SimpleDocTemplate(
            str(output_path),
            pagesize=A4,
            leftMargin=50,
            rightMargin=50,
            topMargin=50,
            bottomMargin=50
        )

        # Build PDF
        doc.build(story)

        logger.info(
            f"Generated translated OCR Track PDF: {output_path} "
            f"({output_path.stat().st_size} bytes)"
        )
        return True

    except Exception as e:
        logger.error(f"Failed to generate translated OCR Track PDF: {e}")
        import traceback
        traceback.print_exc()
        return False
def generate_translated_layout_pdf(
self,
result_json_path: Path,

View File

@@ -233,19 +233,118 @@ class TranslationService:
self._total_tokens = 0
self._total_latency = 0.0
def _load_raw_ocr_regions(
    self,
    result_dir: Path,
    task_id: str,
    page_num: int
) -> List[Dict]:
    """
    Load the raw OCR regions saved for a single page, if present.

    Searches result_dir for a file matching
    ``{task_id}_*_page_{page_num}_raw_ocr_regions.json`` and parses it.

    Args:
        result_dir: Path to result directory
        task_id: Task ID
        page_num: Page number (1-indexed)

    Returns:
        List of raw OCR region dictionaries with 'text' and 'bbox';
        empty list when the file is missing or unreadable.
    """
    import glob

    # Pattern: {task_id}_*_page_{page_num}_raw_ocr_regions.json
    search_pattern = str(
        result_dir / f"{task_id}_*_page_{page_num}_raw_ocr_regions.json"
    )
    candidates = glob.glob(search_pattern)
    if not candidates:
        logger.warning(f"No raw OCR regions file found for page {page_num}")
        return []

    # When several files match, the first glob hit is used.
    source_file = candidates[0]
    try:
        with open(source_file, 'r', encoding='utf-8') as fh:
            regions = json.load(fh)
        logger.info(f"Loaded {len(regions)} raw OCR regions from {source_file}")
        return regions
    except Exception as e:
        logger.error(f"Failed to load raw OCR regions: {e}")
        return []
def extract_translatable_elements_ocr_track(
    self,
    result_json: Dict,
    result_dir: Path,
    task_id: str
) -> Tuple[List[TranslatableItem], int]:
    """
    Build translatable items from raw OCR regions (OCR Track documents).

    Every non-empty region becomes one TranslatableItem whose element_id
    encodes page and region index ("raw_ocr_{page}_{idx}"); the region's
    original index is also stored in cell_position for later round-trip.

    Args:
        result_json: UnifiedDocument JSON data
        result_dir: Path to result directory
        task_id: Task ID

    Returns:
        Tuple of (list of TranslatableItem, total region count)
    """
    translatable: List[TranslatableItem] = []
    region_count = 0

    for page in result_json.get('pages', []):
        page_no = page.get('page_number', 1)

        # Load this page's raw OCR regions from disk.
        regions = self._load_raw_ocr_regions(result_dir, task_id, page_no)

        for idx, region in enumerate(regions):
            region_count += 1
            content = region.get('text', '').strip()
            if not content:
                continue
            translatable.append(TranslatableItem(
                element_id=f"raw_ocr_{page_no}_{idx}",
                content=content,
                element_type='raw_ocr_region',
                page_number=page_no,
                cell_position=(idx, 0)  # original region index for lookup
            ))

    logger.info(
        f"Extracted {len(translatable)} translatable items from {region_count} raw OCR regions (OCR Track)"
    )
    return translatable, region_count
def extract_translatable_elements(
self,
result_json: Dict
result_json: Dict,
result_dir: Optional[Path] = None,
task_id: Optional[str] = None
) -> Tuple[List[TranslatableItem], int]:
"""
Extract all translatable elements from a result JSON.
For OCR Track documents, extracts from raw_ocr_regions.json files.
For Direct Track documents, extracts from elements in result JSON.
Args:
result_json: UnifiedDocument JSON data
result_dir: Path to result directory (required for OCR Track)
task_id: Task ID (required for OCR Track)
Returns:
Tuple of (list of TranslatableItem, total element count)
"""
# Check processing track
metadata = result_json.get('metadata', {})
processing_track = metadata.get('processing_track', 'direct')
# For OCR Track, use raw OCR regions
if processing_track == 'ocr' and result_dir and task_id:
return self.extract_translatable_elements_ocr_track(
result_json, result_dir, task_id
)
# For Direct Track, use element-based extraction
items = []
total_elements = 0
@@ -290,7 +389,7 @@ class TranslationService:
))
logger.info(
f"Extracted {len(items)} translatable items from {total_elements} elements"
f"Extracted {len(items)} translatable items from {total_elements} elements (Direct Track)"
)
return items, total_elements
@@ -378,6 +477,7 @@ class TranslationService:
original_content=item.content,
translated_content=translated_content,
element_type=item.element_type,
page_number=item.page_number,
cell_position=item.cell_position
))
@@ -392,6 +492,7 @@ class TranslationService:
original_content=item.content,
translated_content=item.content, # Keep original
element_type=item.element_type,
page_number=item.page_number,
cell_position=item.cell_position
)
for item in batch.items
@@ -429,6 +530,7 @@ class TranslationService:
original_content=item.content,
translated_content=response.translated_text,
element_type=item.element_type,
page_number=item.page_number,
cell_position=item.cell_position
)
@@ -440,6 +542,7 @@ class TranslationService:
original_content=item.content,
translated_content=item.content, # Keep original
element_type=item.element_type,
page_number=item.page_number,
cell_position=item.cell_position
)
@@ -451,7 +554,8 @@ class TranslationService:
target_lang: str,
total_elements: int,
processing_time: float,
batch_count: int
batch_count: int,
processing_track: str = 'direct'
) -> Dict:
"""
Build the translation result JSON structure.
@@ -464,52 +568,98 @@ class TranslationService:
total_elements: Total elements in document
processing_time: Processing time in seconds
batch_count: Number of batches used
processing_track: 'ocr' or 'direct' - determines output format
Returns:
Translation result dictionary
"""
# Build translations dict
translations: Dict[str, Any] = {}
total_chars = 0
is_ocr_track = processing_track == 'ocr'
for item in translated_items:
total_chars += len(item.translated_content)
if is_ocr_track:
# OCR Track: Build raw_ocr_translations list
raw_ocr_translations: List[Dict] = []
if item.element_type == 'table_cell':
# Group table cells by element_id
if item.element_id not in translations:
translations[item.element_id] = {'cells': []}
for item in translated_items:
total_chars += len(item.translated_content)
translations[item.element_id]['cells'].append({
'row': item.cell_position[0] if item.cell_position else 0,
'col': item.cell_position[1] if item.cell_position else 0,
'content': item.translated_content
})
else:
translations[item.element_id] = item.translated_content
if item.element_type == 'raw_ocr_region':
# Extract page and index from element_id: "raw_ocr_{page}_{idx}"
page_num = item.page_number
original_idx = item.cell_position[0] if item.cell_position else 0
# Build statistics
translated_element_ids = set(item.element_id for item in translated_items)
skipped = total_elements - len(translated_element_ids)
raw_ocr_translations.append({
'page': page_num,
'index': original_idx,
'original': item.original_content,
'translated': item.translated_content
})
result = {
'schema_version': '1.0.0',
'source_document': source_document,
'source_lang': source_lang,
'target_lang': target_lang,
'provider': 'dify',
'translated_at': datetime.utcnow().isoformat() + 'Z',
'statistics': {
'total_elements': total_elements,
'translated_elements': len(translated_element_ids),
'skipped_elements': skipped,
'total_characters': total_chars,
'processing_time_seconds': round(processing_time, 2),
'total_tokens': self._total_tokens,
'batch_count': batch_count
},
'translations': translations
}
# Build statistics
skipped = total_elements - len(raw_ocr_translations)
result = {
'schema_version': '1.0.0',
'source_document': source_document,
'source_lang': source_lang,
'target_lang': target_lang,
'provider': 'dify',
'translated_at': datetime.utcnow().isoformat() + 'Z',
'processing_track': 'ocr',
'statistics': {
'total_elements': total_elements,
'translated_elements': len(raw_ocr_translations),
'skipped_elements': skipped,
'total_characters': total_chars,
'processing_time_seconds': round(processing_time, 2),
'total_tokens': self._total_tokens,
'batch_count': batch_count
},
'translations': {}, # Empty for OCR Track
'raw_ocr_translations': raw_ocr_translations
}
else:
# Direct Track: Build translations dict (existing logic)
translations: Dict[str, Any] = {}
for item in translated_items:
total_chars += len(item.translated_content)
if item.element_type == 'table_cell':
# Group table cells by element_id
if item.element_id not in translations:
translations[item.element_id] = {'cells': []}
translations[item.element_id]['cells'].append({
'row': item.cell_position[0] if item.cell_position else 0,
'col': item.cell_position[1] if item.cell_position else 0,
'content': item.translated_content
})
else:
translations[item.element_id] = item.translated_content
# Build statistics
translated_element_ids = set(item.element_id for item in translated_items)
skipped = total_elements - len(translated_element_ids)
result = {
'schema_version': '1.0.0',
'source_document': source_document,
'source_lang': source_lang,
'target_lang': target_lang,
'provider': 'dify',
'translated_at': datetime.utcnow().isoformat() + 'Z',
'statistics': {
'total_elements': total_elements,
'translated_elements': len(translated_element_ids),
'skipped_elements': skipped,
'total_characters': total_chars,
'processing_time_seconds': round(processing_time, 2),
'total_tokens': self._total_tokens,
'batch_count': batch_count
},
'translations': translations
}
return result
@@ -548,9 +698,13 @@ class TranslationService:
result_json = json.load(f)
source_document = result_json.get('metadata', {}).get('filename', 'unknown')
processing_track = result_json.get('metadata', {}).get('processing_track', 'direct')
result_dir = result_json_path.parent
# Extract translatable elements
items, total_elements = self.extract_translatable_elements(result_json)
# Extract translatable elements (passes result_dir and task_id for OCR Track)
items, total_elements = self.extract_translatable_elements(
result_json, result_dir, task_id
)
if not items:
logger.warning("No translatable elements found")
@@ -597,7 +751,8 @@ class TranslationService:
target_lang=target_lang,
total_elements=total_elements,
processing_time=processing_time,
batch_count=len(batches)
batch_count=len(batches),
processing_track=processing_track
)
# Save result