Files
Document_translator/app/services/dify_client.py
2025-10-02 17:13:24 +08:00

494 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Dify API 客戶端服務
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
import time
import requests
from typing import Dict, Any, Optional
from flask import current_app
from app.utils.logger import get_logger
from app.utils.exceptions import APIError
from app.models.stats import APIUsageStats
logger = get_logger(__name__)
class DifyClient:
    """Client for the Dify translation and OCR HTTP APIs.

    Two independent Dify applications are addressed: one for translation and
    one for OCR. Each has its own base URL and API key, both read from the
    Flask application config when the client is constructed.
    """

    # Language codes understood by translate_text, mapped to the English
    # names inserted into the translation prompt. Unknown codes are passed
    # through to the prompt unchanged.
    _LANGUAGE_NAMES = {
        'zh-tw': 'Traditional Chinese',
        'zh-cn': 'Simplified Chinese',
        'en': 'English',
        'ja': 'Japanese',
        'ko': 'Korean',
        'vi': 'Vietnamese',
        'th': 'Thai',
        'id': 'Indonesian',
        'ms': 'Malay',
        'es': 'Spanish',
        'fr': 'French',
        'de': 'German',
        'ru': 'Russian',
        'ar': 'Arabic',
    }

    def __init__(self):
        """Load API settings from ``current_app.config``.

        Must be called inside a Flask application context. Incomplete
        settings only produce a warning here; the error is raised later by
        ``_make_request`` when the corresponding API is actually used.
        """
        config = current_app.config
        # Translation API settings
        self.translation_base_url = config.get('DIFY_TRANSLATION_BASE_URL', '')
        self.translation_api_key = config.get('DIFY_TRANSLATION_API_KEY', '')
        # OCR API settings
        self.ocr_base_url = config.get('DIFY_OCR_BASE_URL', '')
        self.ocr_api_key = config.get('DIFY_OCR_API_KEY', '')
        self.timeout = (10, 60)  # (connect timeout, read timeout) in seconds
        self.max_retries = 3
        self.retry_delay = 1.6  # base of the exponential back-off

        if not self.translation_base_url or not self.translation_api_key:
            logger.warning("Dify Translation API configuration is incomplete")
        if not self.ocr_base_url or not self.ocr_api_key:
            logger.warning("Dify OCR API configuration is incomplete")

    @staticmethod
    def _user_label(user_id: Optional[int]) -> str:
        """Return the ``user`` identifier sent to Dify for *user_id*."""
        return f"user_{user_id}" if user_id else "doc-translator-user"

    @staticmethod
    def _guess_image_mime(filename: Optional[str]) -> str:
        """Return an image MIME type guessed from *filename*.

        Falls back to ``image/png`` when the name is missing, has an unknown
        extension, or maps to a non-image type.
        """
        import mimetypes

        guessed, _ = mimetypes.guess_type(filename or '')
        if guessed and guessed.startswith('image/'):
            return guessed
        return 'image/png'

    @staticmethod
    def _build_translation_query(text: str, source_lang_name: str,
                                 target_lang_name: str) -> str:
        """Return the English instruction prompt followed by the stripped text."""
        return "\n".join((
            f"Task: Translate ONLY into {target_lang_name} from {source_lang_name}.",
            "Rules:",
            "- Output translation text ONLY (no source text, no notes, no questions, no language-detection remarks).",
            "- Preserve original line breaks.",
            "- Do NOT wrap in quotes or code blocks.",
            "- Maintain original formatting and structure.",
            text.strip(),
        ))

    def _make_request(self, method: str, endpoint: str, data: Optional[Dict[str, Any]] = None,
                      user_id: Optional[int] = None, job_id: Optional[int] = None,
                      files_data: Optional[Dict] = None,
                      api_type: str = 'translation') -> Dict[str, Any]:
        """Send an HTTP request to Dify, retrying on failure.

        Args:
            method: ``'GET'`` (case-insensitive) issues a GET with *data* as
                query parameters; any other value issues a POST.
            endpoint: Path appended to the selected base URL.
            data: Query parameters (GET), form fields (multipart upload) or
                JSON body (plain POST).
            user_id: When truthy, every attempt is recorded through
                ``_record_api_usage``.
            job_id: Optional job reference stored with the usage record.
            files_data: When given, the POST is sent as multipart/form-data.
            api_type: ``'ocr'`` selects the OCR credentials; anything else
                selects the translation credentials.

        Returns:
            The decoded JSON response body.

        Raises:
            APIError: If the selected API is not configured, or if every
                attempt fails.
        """
        # Pick the credentials for the requested API.
        if api_type == 'ocr':
            base_url, api_key = self.ocr_base_url, self.ocr_api_key
            if not base_url or not api_key:
                raise APIError("Dify OCR API 未配置完整")
        else:  # translation
            base_url, api_key = self.translation_base_url, self.translation_api_key
            if not base_url or not api_key:
                raise APIError("Dify Translation API 未配置完整")

        url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        headers = {
            'Authorization': f'Bearer {api_key}',
            'User-Agent': 'PANJIT-Document-Translator/1.0'
        }
        # For multipart uploads the Content-Type (with its boundary) must be
        # generated by requests, so it is only set for JSON requests.
        if not files_data:
            headers['Content-Type'] = 'application/json'

        is_get = method.upper() == 'GET'
        last_exception = None
        # NOTE(review): HTTP 4xx responses are retried like any other
        # failure; confirm whether client errors should fail fast instead.
        for attempt in range(self.max_retries):
            # Time each attempt on its own so that earlier failures and
            # back-off sleeps do not inflate the recorded latency.
            attempt_start = time.monotonic()
            try:
                if is_get:
                    response = requests.get(url, headers=headers, timeout=self.timeout, params=data)
                elif files_data:
                    # File upload: multipart/form-data
                    response = requests.post(url, headers=headers, timeout=self.timeout,
                                             files=files_data, data=data)
                else:
                    # Plain JSON request
                    response = requests.post(url, headers=headers, timeout=self.timeout, json=data)
                response_time_ms = int((time.monotonic() - attempt_start) * 1000)
                response.raise_for_status()
                result = response.json()
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers JSON decoding failures on requests
                # releases whose JSONDecodeError is not a RequestException.
                last_exception = e
                response_time_ms = int((time.monotonic() - attempt_start) * 1000)
                if user_id:
                    self._record_api_usage(
                        user_id=user_id,
                        job_id=job_id,
                        endpoint=endpoint,
                        response_data={},
                        response_time_ms=response_time_ms,
                        success=False,
                        error_message=str(e)
                    )
                logger.warning(f"Dify API request failed (attempt {attempt + 1}): {str(e)}")
                # No sleep after the final attempt; fall through to the error.
                if attempt == self.max_retries - 1:
                    break
                # Exponential back-off: retry_delay ** 0, ** 1, ...
                time.sleep(self.retry_delay ** attempt)
            else:
                if user_id:
                    self._record_api_usage(
                        user_id=user_id,
                        job_id=job_id,
                        endpoint=endpoint,
                        response_data=result,
                        response_time_ms=response_time_ms,
                        success=True
                    )
                return result

        # Every attempt failed.
        error_msg = f"Dify API request failed after {self.max_retries} attempts: {str(last_exception)}"
        logger.error(error_msg)
        raise APIError(error_msg) from last_exception

    def _record_api_usage(self, user_id: int, job_id: Optional[int], endpoint: str,
                          response_data: Dict, response_time_ms: int, success: bool,
                          error_message: Optional[str] = None):
        """Store one API call in ``APIUsageStats``.

        Recording is best effort: any failure is logged as a warning and
        never propagates to the caller.
        """
        try:
            # Usage details are taken from the response's 'metadata' entry.
            metadata = response_data.get('metadata', {})
            APIUsageStats.record_api_call(
                user_id=user_id,
                job_id=job_id,  # passed through as-is; may be None
                api_endpoint=endpoint,
                metadata=metadata,
                response_time_ms=response_time_ms,
                success=success,
                error_message=error_message
            )
        except Exception as e:
            logger.warning(f"Failed to record API usage: {str(e)}")

    def translate_text(self, text: str, source_language: str, target_language: str,
                       user_id: Optional[int] = None, job_id: Optional[int] = None,
                       conversation_id: Optional[str] = None) -> Dict[str, Any]:
        """Translate *text* through the Dify translation app.

        Args:
            text: Text to translate; must contain non-whitespace characters.
            source_language: Source language code (e.g. ``'en'``).
            target_language: Target language code (e.g. ``'zh-tw'``).
            user_id: Optional user id used for the Dify ``user`` field and
                for usage recording.
            job_id: Optional job id stored with the usage record.
            conversation_id: Optional Dify conversation to continue.

        Returns:
            A dict with ``success``, ``translated_text``, ``source_text``,
            ``source_language``, ``target_language``, ``conversation_id``
            and ``metadata``.

        Raises:
            APIError: If *text* is empty, the request fails, or Dify returns
                no usable answer.
        """
        # Guard against None as well as blank strings.
        if not text or not text.strip():
            raise APIError("翻譯文字不能為空")

        source_lang_name = self._LANGUAGE_NAMES.get(source_language, source_language)
        target_lang_name = self._LANGUAGE_NAMES.get(target_language, target_language)
        query = self._build_translation_query(text, source_lang_name, target_lang_name)

        request_data = {
            'inputs': {},
            'response_mode': 'blocking',
            'user': self._user_label(user_id),
            'query': query
        }
        # Passing the conversation id keeps the Dify conversation continuous.
        if conversation_id:
            request_data['conversation_id'] = conversation_id

        logger.info("[TRANSLATION] Sending translation request...")
        logger.info(f"[TRANSLATION] Request data: {request_data}")
        logger.info(f"[TRANSLATION] Text length: {len(text)} characters")
        try:
            response = self._make_request(
                method='POST',
                endpoint='/chat-messages',
                data=request_data,
                user_id=user_id,
                job_id=job_id
            )
            # The translation is the 'answer' field of the chat response.
            answer = response.get('answer')
            if not isinstance(answer, str) or not answer.strip():
                raise APIError("Dify API 返回空的翻譯結果")
            return {
                'success': True,
                'translated_text': answer,
                'source_text': text,
                'source_language': source_language,
                'target_language': target_language,
                'conversation_id': response.get('conversation_id'),
                'metadata': response.get('metadata', {})
            }
        except APIError:
            raise
        except Exception as e:
            error_msg = f"翻譯請求處理錯誤: {str(e)}"
            logger.error(error_msg)
            raise APIError(error_msg) from e

    def test_connection(self) -> bool:
        """Return True when a minimal chat request to the translation API succeeds."""
        try:
            test_data = {
                'inputs': {'text': 'test'},
                # The chat-messages endpoint requires a 'query'; without it
                # the probe is rejected even when the connection is healthy.
                'query': 'test',
                'response_mode': 'blocking',
                'user': 'health_check'
            }
            response = self._make_request(
                method='POST',
                endpoint='/chat-messages',
                data=test_data
            )
            return response is not None
        except Exception as e:
            logger.error(f"Dify API connection test failed: {str(e)}")
            return False

    def get_app_info(self) -> Dict[str, Any]:
        """Fetch the Dify app parameters.

        Returns:
            ``{'success': True, 'app_info': <response>}`` on success, or
            ``{'success': False, 'error': <message>}`` on any failure.
        """
        try:
            response = self._make_request(
                method='GET',
                endpoint='/parameters'
            )
            return {
                'success': True,
                'app_info': response
            }
        except Exception as e:
            logger.error(f"Failed to get Dify app info: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

    @classmethod
    def load_config_from_file(cls, file_path: str = 'api.txt'):
        """Load Dify settings from a ``key: value`` text file into the app config.

        Recognised keys are ``base_url`` / ``translation_base_url``,
        ``api`` / ``translation_api``, ``ocr_base_url`` and ``ocr_api``.
        Blank lines and lines starting with ``#`` are ignored. A missing
        file or any read error is logged and otherwise ignored. Must be
        called inside a Flask application context.
        """
        # File key -> config entries to set. The translation keys also fill
        # the legacy DIFY_API_* entries for backward compatibility.
        key_targets = {
            'base_url': ('DIFY_TRANSLATION_BASE_URL', 'DIFY_API_BASE_URL'),
            'translation_base_url': ('DIFY_TRANSLATION_BASE_URL', 'DIFY_API_BASE_URL'),
            'api': ('DIFY_TRANSLATION_API_KEY', 'DIFY_API_KEY'),
            'translation_api': ('DIFY_TRANSLATION_API_KEY', 'DIFY_API_KEY'),
            'ocr_base_url': ('DIFY_OCR_BASE_URL',),
            'ocr_api': ('DIFY_OCR_API_KEY',),
        }
        try:
            from pathlib import Path

            config_file = Path(file_path)
            if not config_file.exists():
                logger.warning(f"Dify config file not found: {file_path}")
                return
            with open(config_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line.startswith('#') or not line:
                        continue  # skip comments and blank lines
                    # Split on the first colon only, so URLs keep theirs.
                    key, sep, value = line.partition(':')
                    if not sep or key not in key_targets:
                        continue
                    value = value.strip()
                    for config_key in key_targets[key]:
                        current_app.config[config_key] = value
            logger.info("Dify API config loaded from file")
        except Exception as e:
            logger.error(f"Failed to load Dify config from file: {str(e)}")

    def upload_file(self, image_data: bytes, filename: str, user_id: Optional[int] = None) -> str:
        """Upload an image to the Dify OCR API and return its file id.

        Args:
            image_data: Raw image bytes; must not be empty.
            filename: Name sent with the upload; also used to guess the
                MIME type (PNG is assumed when it cannot be determined).
            user_id: Optional user id used for the Dify ``user`` field and
                for usage recording.

        Raises:
            APIError: If *image_data* is empty, the request fails, or the
                response carries no ``id``.
        """
        if not image_data:
            raise APIError("图片数据不能为空")
        logger.info("[OCR-UPLOAD] Starting file upload to Dify OCR API")
        logger.info(f"[OCR-UPLOAD] File: {filename}, Size: {len(image_data)} bytes, User: {user_id}")

        files_data = {
            'file': (filename, image_data, self._guess_image_mime(filename))
        }
        form_data = {
            'user': self._user_label(user_id)
        }
        try:
            response = self._make_request(
                method='POST',
                endpoint='/files/upload',
                data=form_data,
                files_data=files_data,
                user_id=user_id,
                api_type='ocr'  # use the OCR credentials
            )
            logger.info(f"[OCR-UPLOAD] Raw Dify upload response: {response}")
            file_id = response.get('id')
            if not file_id:
                logger.error(f"[OCR-UPLOAD] No file ID in response: {response}")
                raise APIError("Dify 文件上传失败未返回文件ID")
            logger.info(f"[OCR-UPLOAD] ✓ File uploaded successfully: {file_id}")
            return file_id
        except APIError:
            raise
        except Exception as e:
            error_msg = f"文件上传到Dify失败: {str(e)}"
            logger.error(f"[OCR-UPLOAD] ✗ Upload failed: {error_msg}")
            raise APIError(error_msg) from e

    def ocr_image_with_dify(self, image_data: bytes, filename: str = "image.png",
                            user_id: Optional[int] = None, job_id: Optional[int] = None) -> str:
        """Run OCR on an image through the Dify OCR app.

        The image is uploaded first, then referenced from a chat request
        whose answer is the recognised text.

        Returns:
            The recognised text with surrounding whitespace removed.

        Raises:
            APIError: If the upload or the OCR request fails, or Dify
                returns an empty answer.
        """
        # Compute the size defensively so that missing data reaches
        # upload_file's validation (APIError) instead of failing in len().
        image_size = len(image_data) if image_data else 0
        logger.info(f"[OCR-RECOGNITION] Starting OCR process for {filename}")
        logger.info(f"[OCR-RECOGNITION] Image size: {image_size} bytes, User: {user_id}, Job: {job_id}")
        try:
            # 1. Upload the image to obtain a file id.
            logger.info("[OCR-RECOGNITION] Step 1: Uploading image to Dify...")
            file_id = self.upload_file(image_data, filename, user_id)
            logger.info(f"[OCR-RECOGNITION] Step 1 ✓ File uploaded with ID: {file_id}")

            # 2. The system prompt is configured in the Dify chat flow, so
            #    only a short user query is sent here.
            query = "將圖片中的文字完整的提取出來"
            logger.info("[OCR-RECOGNITION] Step 2: Preparing OCR request...")

            # 3. Build the chat request; the image is referenced through the
            #    'files' array.
            request_data = {
                'inputs': {},
                'response_mode': 'blocking',
                'user': self._user_label(user_id),
                'query': query,
                'files': [
                    {
                        'type': 'image',
                        'transfer_method': 'local_file',
                        'upload_file_id': file_id
                    }
                ]
            }
            logger.info("[OCR-RECOGNITION] Step 3: Sending OCR request to Dify...")
            logger.info(f"[OCR-RECOGNITION] Request data: {request_data}")
            logger.info(f"[OCR-RECOGNITION] Using OCR API: {self.ocr_base_url}")
            response = self._make_request(
                method='POST',
                endpoint='/chat-messages',
                data=request_data,
                user_id=user_id,
                job_id=job_id,
                api_type='ocr'  # use the OCR credentials
            )
            logger.info("[OCR-RECOGNITION] Step 3 ✓ Received response from Dify")
            logger.info(f"[OCR-RECOGNITION] Raw Dify OCR response: {response}")

            # Extract the OCR result from the response.
            answer = response.get('answer', '')
            metadata = response.get('metadata', {})
            conversation_id = response.get('conversation_id', '')
            logger.info("[OCR-RECOGNITION] Response details:")
            logger.info(f"[OCR-RECOGNITION] - Answer length: {len(answer) if answer else 0} characters")
            logger.info(f"[OCR-RECOGNITION] - Conversation ID: {conversation_id}")
            logger.info(f"[OCR-RECOGNITION] - Metadata: {metadata}")
            if not isinstance(answer, str) or not answer.strip():
                logger.error("[OCR-RECOGNITION] ✗ Empty or invalid answer from Dify")
                logger.error(f"[OCR-RECOGNITION] Answer type: {type(answer)}, Content: '{answer}'")
                raise APIError("Dify OCR 返回空的识别结果")
            logger.info("[OCR-RECOGNITION] ✓ OCR completed successfully")
            logger.info(f"[OCR-RECOGNITION] Extracted {len(answer)} characters")
            return answer.strip()
        except APIError:
            raise
        except Exception as e:
            error_msg = f"Dify OCR识别失败: {str(e)}"
            logger.error(f"[OCR-RECOGNITION] ✗ OCR process failed: {error_msg}")
            logger.error(f"[OCR-RECOGNITION] Exception details: {type(e).__name__}: {str(e)}")
            raise APIError(error_msg) from e
def init_dify_config(app):
    """Load the Dify settings for *app* and log whether they are complete.

    Inside the application context, the settings are loaded from the
    default config file through ``DifyClient.load_config_from_file`` and the
    translation and OCR settings are then checked. Incomplete settings are
    reported as warnings only; nothing is raised.

    Args:
        app: The Flask application whose config is populated and inspected.
    """
    def _state(value):
        # Describe a setting without ever logging its (secret) value.
        return 'set' if value else 'missing'

    with app.app_context():
        # Load the settings from api.txt (the loader's default path).
        DifyClient.load_config_from_file()

        # Check that each API has both its base URL and its key.
        translation_base_url = app.config.get('DIFY_TRANSLATION_BASE_URL')
        translation_api_key = app.config.get('DIFY_TRANSLATION_API_KEY')
        ocr_base_url = app.config.get('DIFY_OCR_BASE_URL')
        ocr_api_key = app.config.get('DIFY_OCR_API_KEY')

        logger.info("Dify API Configuration Status:")
        if translation_base_url and translation_api_key:
            logger.info("✓ Translation API configured successfully")
        else:
            logger.warning("✗ Translation API configuration is incomplete")
            logger.warning(f" - Translation Base URL: {_state(translation_base_url)}")
            logger.warning(f" - Translation API Key: {_state(translation_api_key)}")

        if ocr_base_url and ocr_api_key:
            logger.info("✓ OCR API configured successfully")
        else:
            logger.warning("✗ OCR API configuration is incomplete (扫描PDF功能将不可用)")
            logger.warning(f" - OCR Base URL: {_state(ocr_base_url)}")
            logger.warning(f" - OCR API Key: {_state(ocr_api_key)}")