#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HBR crawler system web service.
Provides the article query interface and statistics features.
"""
import logging
import os
import sys
from datetime import datetime
from pathlib import Path

from flask import Flask, render_template, request, jsonify

# Add the project root to the import path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))

from hbr_crawler.hbr_crawler.database import get_database_manager

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__,
            template_folder='templates',
            static_folder='static')

# Database manager instance (created lazily)
db_manager = None


def get_db():
    """Return the database manager (singleton pattern)."""
    global db_manager
    if db_manager is None:
        db_manager = get_database_manager()
    return db_manager
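
# NOTE: based on the calls in the routes below, the database manager is assumed
# to expose execute_query(sql, params=None, database=None) and to return rows
# as a list of dicts keyed by column name. A minimal smoke-test sketch under
# that assumption (run manually, not at import time):
#
#   db = get_db()
#   rows = db.execute_query("SELECT COUNT(*) AS count FROM articles",
#                           database='db_A101')
#   print(rows[0]['count'] if rows else 0)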


@app.route('/')
def index():
    """Home page."""
    return render_template('index.html')


@app.route('/api/articles', methods=['GET'])
def get_articles():
    """Article list API."""
    try:
        db = get_db()

        # Read query parameters
        page = int(request.args.get('page', 1))
        per_page = int(request.args.get('per_page', 20))
        category = request.args.get('category', '')
        tag = request.args.get('tag', '')
        start_date = request.args.get('start_date', '')
        end_date = request.args.get('end_date', '')
        keyword = request.args.get('keyword', '')
        is_paywalled = request.args.get('is_paywalled', '')

        # Build the WHERE conditions (articles is aliased as 'a'). Only these
        # fixed condition templates are interpolated into the SQL below;
        # user-supplied values are passed separately as bound parameters.
        where_conditions = []
        params = []
        use_join = False

        if category:
            where_conditions.append("a.category = %s")
            params.append(category)

        if tag:
            where_conditions.append("t.name LIKE %s")
            params.append(f'%{tag}%')
            use_join = True

        if start_date:
            where_conditions.append("a.publish_date >= %s")
            params.append(start_date)

        if end_date:
            where_conditions.append("a.publish_date <= %s")
            params.append(end_date)

        if keyword:
            where_conditions.append("(a.title LIKE %s OR a.summary LIKE %s OR a.content LIKE %s)")
            params.extend([f'%{keyword}%', f'%{keyword}%', f'%{keyword}%'])

        if is_paywalled != '':
            where_conditions.append("a.is_paywalled = %s")
            params.append(int(is_paywalled))

        where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"

        # Count the total number of matching articles
        if use_join:
            count_query = f"""
                SELECT COUNT(DISTINCT a.id) as count
                FROM articles a
                LEFT JOIN article_tags at ON a.id = at.article_id
                LEFT JOIN tags t ON at.tag_id = t.id
                WHERE {where_clause}
            """
        else:
            count_query = f"SELECT COUNT(*) as count FROM articles a WHERE {where_clause}"

        count_params = tuple(params) if params else None
        count_result = db.execute_query(count_query, count_params, database='db_A101')
        total = count_result[0]['count'] if count_result and len(count_result) > 0 else 0

        # Fetch the article list
        offset = (page - 1) * per_page
        query_params = list(params)
        query_params.extend([per_page, offset])

        # When filtering by tag, LEFT JOIN the tag tables to collect tags
        if use_join:
            query = f"""
                SELECT a.id, a.title, a.url, a.author, a.publish_date, a.summary,
                       a.is_paywalled, a.category, a.crawled_at,
                       GROUP_CONCAT(DISTINCT t.name SEPARATOR ', ') as tags
                FROM articles a
                LEFT JOIN article_tags at ON a.id = at.article_id
                LEFT JOIN tags t ON at.tag_id = t.id
                WHERE {where_clause}
                GROUP BY a.id, a.title, a.url, a.author, a.publish_date, a.summary,
                         a.is_paywalled, a.category, a.crawled_at
                ORDER BY a.crawled_at DESC
                LIMIT %s OFFSET %s
            """
        else:
            # Without a tag filter, use a subquery to fetch each article's tags
            query = f"""
                SELECT a.id, a.title, a.url, a.author, a.publish_date, a.summary,
                       a.is_paywalled, a.category, a.crawled_at,
                       (SELECT GROUP_CONCAT(t.name SEPARATOR ', ')
                        FROM article_tags at
                        INNER JOIN tags t ON at.tag_id = t.id
                        WHERE at.article_id = a.id) as tags
                FROM articles a
                WHERE {where_clause}
                ORDER BY a.crawled_at DESC
                LIMIT %s OFFSET %s
            """

        articles = db.execute_query(query, tuple(query_params), database='db_A101')

        # Make sure articles is a list
        if not articles:
            articles = []

        # Fill in defaults and normalise the data format for each article
        for article in articles:
            if 'tags' not in article or article['tags'] is None:
                article['tags'] = ''
            if 'language' not in article:
                article['language'] = 'zh-TW'
            # Serialise dates as strings
            if article.get('publish_date') and isinstance(article['publish_date'], datetime):
                article['publish_date'] = article['publish_date'].strftime('%Y-%m-%d %H:%M:%S')
            if article.get('crawled_at') and isinstance(article['crawled_at'], datetime):
                article['crawled_at'] = article['crawled_at'].strftime('%Y-%m-%d %H:%M:%S')

        logger.info(f"Query returned {len(articles)} articles, total: {total}")

        return jsonify({
            'success': True,
            'data': articles,
            'pagination': {
                'page': page,
                'per_page': per_page,
                'total': total,
                'pages': (total + per_page - 1) // per_page if total > 0 else 0
            }
        })
    except Exception as e:
        logger.error(f"Failed to fetch article list: {e}", exc_info=True)
        return jsonify({'success': False, 'error': str(e)}), 500
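
# Example request (assuming the dev server below is running on localhost:5000):
#   curl "http://localhost:5000/api/articles?page=1&per_page=10&keyword=AI&is_paywalled=0"
# Response shape (field values illustrative):
#   {"success": true,
#    "data": [{"id": 1, "title": "...", "tags": "strategy, leadership", ...}],
#    "pagination": {"page": 1, "per_page": 10, "total": 42, "pages": 5}}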


@app.route('/api/article/<int:article_id>', methods=['GET'])
def get_article(article_id):
    """Fetch the details of a single article."""
    try:
        db = get_db()
        query = """
            SELECT a.*,
                   (SELECT GROUP_CONCAT(t.name SEPARATOR ', ')
                    FROM article_tags at
                    INNER JOIN tags t ON at.tag_id = t.id
                    WHERE at.article_id = a.id) as tags
            FROM articles a
            WHERE a.id = %s
        """
        result = db.execute_query(query, (article_id,), database='db_A101')

        if result and len(result) > 0:
            article = result[0]
            # Serialise dates as strings
            if article.get('publish_date') and isinstance(article['publish_date'], datetime):
                article['publish_date'] = article['publish_date'].strftime('%Y-%m-%d %H:%M:%S')
            if article.get('crawled_at') and isinstance(article['crawled_at'], datetime):
                article['crawled_at'] = article['crawled_at'].strftime('%Y-%m-%d %H:%M:%S')
            if 'tags' not in article or article['tags'] is None:
                article['tags'] = ''
            return jsonify({'success': True, 'data': article})
        else:
            return jsonify({'success': False, 'error': 'Article not found'}), 404
    except Exception as e:
        logger.error(f"Failed to fetch article details: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
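
# Example request (assuming the dev server is running locally):
#   curl "http://localhost:5000/api/article/123"
# Returns {"success": true, "data": {...}} on a hit, or a 404 with
# {"success": false, "error": "Article not found"} otherwise.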


@app.route('/api/statistics', methods=['GET'])
def get_statistics():
    """Fetch statistics."""
    try:
        db = get_db()
        db_name = 'db_A101'

        stats = {}

        # Total number of articles
        total_result = db.execute_query("SELECT COUNT(*) as count FROM articles", database=db_name)
        stats['total_articles'] = total_result[0]['count'] if total_result and len(total_result) > 0 else 0

        # Paywalled vs. free article counts
        paywall_result = db.execute_query(
            "SELECT is_paywalled, COUNT(*) as count FROM articles GROUP BY is_paywalled",
            database=db_name
        )
        stats['paywall'] = {}
        if paywall_result:
            for row in paywall_result:
                stats['paywall'][row['is_paywalled']] = row['count']

        # Category distribution
        category_result = db.execute_query(
            "SELECT category, COUNT(*) as count FROM articles WHERE category IS NOT NULL AND category != '' GROUP BY category ORDER BY count DESC LIMIT 10",
            database=db_name
        )
        stats['categories'] = [{'name': row['category'], 'count': row['count']} for row in category_result] if category_result else []

        # Author statistics
        author_result = db.execute_query(
            "SELECT author, COUNT(*) as count FROM articles WHERE author IS NOT NULL AND author != '' GROUP BY author ORDER BY count DESC LIMIT 10",
            database=db_name
        )
        stats['authors'] = [{'name': row['author'], 'count': row['count']} for row in author_result] if author_result else []

        # Language distribution (skipped for now; the column does not exist)
        stats['languages'] = {}

        # Article count trend over the last 30 days
        try:
            date_result = db.execute_query(
                """
                SELECT DATE(crawled_at) as date, COUNT(*) as count
                FROM articles
                WHERE crawled_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)
                GROUP BY DATE(crawled_at)
                ORDER BY date DESC
                """,
                database=db_name
            )
            stats['daily_trend'] = [{'date': str(row['date']), 'count': row['count']} for row in date_result] if date_result else []
        except Exception:
            stats['daily_trend'] = []

        return jsonify({'success': True, 'data': stats})
    except Exception as e:
        logger.error(f"Failed to fetch statistics: {e}", exc_info=True)
        return jsonify({'success': False, 'error': str(e)}), 500
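
# Example request (assuming the dev server is running locally):
#   curl "http://localhost:5000/api/statistics"
# Response shape (values illustrative; integer paywall keys are serialised as
# strings by JSON encoding):
#   {"success": true,
#    "data": {"total_articles": 345,
#             "paywall": {"0": 200, "1": 145},
#             "categories": [{"name": "...", "count": 12}, ...],
#             "authors": [{"name": "...", "count": 8}, ...],
#             "languages": {},
#             "daily_trend": [{"date": "2024-01-01", "count": 3}, ...]}}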


@app.route('/api/categories', methods=['GET'])
def get_categories():
    """Fetch the list of all categories."""
    try:
        db = get_db()
        result = db.execute_query(
            "SELECT DISTINCT category FROM articles WHERE category IS NOT NULL AND category != '' ORDER BY category",
            database='db_A101'
        )
        categories = [row['category'] for row in result] if result else []
        return jsonify({'success': True, 'data': categories})
    except Exception as e:
        logger.error(f"Failed to fetch category list: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/tags', methods=['GET'])
def get_tags():
    """Fetch the list of all tags."""
    try:
        db = get_db()
        result = db.execute_query(
            "SELECT DISTINCT name FROM tags ORDER BY name",
            database='db_A101'
        )
        tags = [row['name'] for row in result] if result else []
        return jsonify({'success': True, 'data': tags})
    except Exception as e:
        logger.error(f"Failed to fetch tag list: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
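
# Both lookup endpoints return a flat list of strings, e.g. (illustrative values):
#   curl "http://localhost:5000/api/categories"  ->  {"success": true, "data": ["Leadership", "Strategy"]}
#   curl "http://localhost:5000/api/tags"        ->  {"success": true, "data": ["AI", "management"]}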


@app.route('/api/run-crawler', methods=['POST'])
def run_crawler():
    """Trigger a crawler run manually."""
    import subprocess

    try:
        # Run the crawler as a child process and block until it finishes
        result = subprocess.run(
            [sys.executable, 'run_crawler.py'],
            capture_output=True,
            text=True,
            timeout=300  # 5-minute timeout
        )

        return jsonify({
            'success': result.returncode == 0,
            'output': result.stdout,
            'error': result.stderr
        })
    except subprocess.TimeoutExpired:
        return jsonify({'success': False, 'error': 'Crawler run timed out'}), 500
    except Exception as e:
        logger.error(f"Failed to run crawler: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
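
# Example trigger (assuming the dev server is running locally):
#   curl -X POST "http://localhost:5000/api/run-crawler"
# Note: the request blocks until run_crawler.py exits (up to the 5-minute
# timeout above), so the caller should use a generous HTTP client timeout.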


if __name__ == '__main__':
    # Create the required directories
    os.makedirs('templates', exist_ok=True)
    os.makedirs('static', exist_ok=True)

    # Start the service
    print("=" * 60)
    print("HBR Crawler System Web Service")
    print("=" * 60)
    print("Service URL: http://localhost:5000")
    print("Press Ctrl+C to stop the service")
    print("=" * 60)

    app.run(host='0.0.0.0', port=5000, debug=True)
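    # Note: debug=True and host='0.0.0.0' are development settings; a production
    # deployment would typically run behind a WSGI server with debug disabled.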