hbr-crawler/web_app.py
DonaldFang 方士碩 f524713cb6 Initial commit: HBR article crawler project
- Scrapy spider that crawls HBR Traditional Chinese articles
- Flask web application providing an article query interface
- SQL Server database integration
- Automated scheduling and email notifications

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 17:19:56 +08:00

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HBR crawler system web service
Provides the article query interface and statistics API
"""
import os
import sys
from pathlib import Path
from flask import Flask, render_template, request, jsonify
from datetime import datetime, timedelta
import logging

try:
    from flask_cors import CORS
    CORS_AVAILABLE = True
except ImportError:
    CORS_AVAILABLE = False

# Add the project root to the import path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))

from hbr_crawler.hbr_crawler.database import get_database_manager

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__,
            template_folder='templates',
            static_folder='static')

# Enable CORS (if cross-origin requests are needed)
if CORS_AVAILABLE:
    CORS(app)

# Database manager instance (created lazily)
db_manager = None


def get_db():
    """Return the database manager (singleton)."""
    global db_manager
    if db_manager is None:
        db_manager = get_database_manager()
    return db_manager


@app.route('/')
def index():
    """Home page."""
    return render_template('index.html')

@app.route('/api/articles', methods=['GET'])
def get_articles():
    """Article list API."""
    try:
        db = get_db()

        # Read query parameters
        page = int(request.args.get('page', 1))
        per_page = int(request.args.get('per_page', 20))
        category = request.args.get('category', '')
        tag = request.args.get('tag', '')
        start_date = request.args.get('start_date', '')
        end_date = request.args.get('end_date', '')
        keyword = request.args.get('keyword', '')
        is_paywalled = request.args.get('is_paywalled', '')
        language = request.args.get('language', '')

        # Build WHERE conditions
        where_conditions = []
        params = []

        # Build conditions against the alias 'a' (the query may use a JOIN)
        if category:
            where_conditions.append("a.category = %s")
            params.append(category)
        if tag:
            # Filter by tag name through the article_tags join table
            where_conditions.append("t.name LIKE %s")
            params.append(f'%{tag}%')
            use_join = True
        else:
            use_join = False
        if start_date:
            where_conditions.append("a.publish_date >= %s")
            params.append(start_date)
        if end_date:
            where_conditions.append("a.publish_date <= %s")
            params.append(end_date)
        if keyword:
            where_conditions.append("(a.title LIKE %s OR a.summary LIKE %s OR a.content LIKE %s)")
            params.extend([f'%{keyword}%', f'%{keyword}%', f'%{keyword}%'])
        if is_paywalled != '':
            where_conditions.append("a.is_paywalled = %s")
            params.append(int(is_paywalled))
        # The language column does not exist yet, so skip it for now
        # if language:
        #     where_conditions.append("a.language = %s")
        #     params.append(language)

        where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"

        # Count the total number of matching articles
        if use_join:
            count_query = f"""
                SELECT COUNT(DISTINCT a.id) as count
                FROM articles a
                LEFT JOIN article_tags at ON a.id = at.article_id
                LEFT JOIN tags t ON at.tag_id = t.id
                WHERE {where_clause}
            """
        else:
            count_query = f"SELECT COUNT(*) as count FROM articles a WHERE {where_clause}"
        count_params = tuple(params) if params else None
        count_result = db.execute_query(count_query, count_params, database='db_A101')
        total = count_result[0]['count'] if count_result and len(count_result) > 0 else 0

        # Fetch the article list
        offset = (page - 1) * per_page
        query_params = list(params)
        query_params.extend([per_page, offset])

        # Query the article list (use LEFT JOIN to fetch tags)
        if use_join:
            query = f"""
                SELECT a.id, a.title, a.url, a.author, a.publish_date, a.summary,
                       a.is_paywalled, a.category, a.crawled_at,
                       GROUP_CONCAT(DISTINCT t.name SEPARATOR ', ') as tags
                FROM articles a
                LEFT JOIN article_tags at ON a.id = at.article_id
                LEFT JOIN tags t ON at.tag_id = t.id
                WHERE {where_clause}
                GROUP BY a.id, a.title, a.url, a.author, a.publish_date, a.summary,
                         a.is_paywalled, a.category, a.crawled_at
                ORDER BY a.crawled_at DESC
                LIMIT %s OFFSET %s
            """
        else:
            # Without a tag filter, fetch tags through a subquery
            query = f"""
                SELECT a.id, a.title, a.url, a.author, a.publish_date, a.summary,
                       a.is_paywalled, a.category, a.crawled_at,
                       (SELECT GROUP_CONCAT(t.name SEPARATOR ', ')
                        FROM article_tags at
                        INNER JOIN tags t ON at.tag_id = t.id
                        WHERE at.article_id = a.id) as tags
                FROM articles a
                WHERE {where_clause}
                ORDER BY a.crawled_at DESC
                LIMIT %s OFFSET %s
            """
        articles = db.execute_query(query, tuple(query_params), database='db_A101')

        # Make sure articles is a list
        if not articles:
            articles = []

        # Fill in defaults and normalise field formats for each article
        for article in articles:
            if 'tags' not in article or article['tags'] is None:
                article['tags'] = ''
            if 'language' not in article:
                article['language'] = 'zh-TW'
            # Convert datetime fields to formatted strings
            if article.get('publish_date') and isinstance(article['publish_date'], datetime):
                article['publish_date'] = article['publish_date'].strftime('%Y-%m-%d %H:%M:%S')
            if article.get('crawled_at') and isinstance(article['crawled_at'], datetime):
                article['crawled_at'] = article['crawled_at'].strftime('%Y-%m-%d %H:%M:%S')

        logger.info(f"查詢到 {len(articles)} 篇文章,總數: {total}")
        return jsonify({
            'success': True,
            'data': articles,
            'pagination': {
                'page': page,
                'per_page': per_page,
                'total': total,
                'pages': (total + per_page - 1) // per_page
            }
        })
    except Exception as e:
        logger.error(f"取得文章列表失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
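
# Illustrative request/response for the endpoint above (a sketch, assuming the
# service runs on the default http://localhost:5000 and db_A101 has data; the
# category and keyword values are examples, not fixed vocabulary):
#
#   GET /api/articles?page=1&per_page=20&category=管理&keyword=領導
#
# returns JSON shaped like:
#
#   {"success": true,
#    "data": [{"id": 1, "title": "...", "tags": "...", "language": "zh-TW", ...}],
#    "pagination": {"page": 1, "per_page": 20, "total": 123, "pages": 7}}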

@app.route('/api/article/<int:article_id>', methods=['GET'])
def get_article(article_id):
    """Return the details of a single article."""
    try:
        db = get_db()
        query = "SELECT * FROM articles WHERE id = %s"
        result = db.execute_query(query, (article_id,), database='db_A101')
        if result:
            return jsonify({'success': True, 'data': result[0]})
        else:
            return jsonify({'success': False, 'error': '文章不存在'}), 404
    except Exception as e:
        logger.error(f"取得文章詳情失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500

@app.route('/api/statistics', methods=['GET'])
def get_statistics():
    """Return aggregate statistics."""
    try:
        db = get_db()
        db_name = 'db_A101'
        stats = {}

        # Total article count
        total_result = db.execute_query("SELECT COUNT(*) as count FROM articles", database=db_name)
        stats['total_articles'] = total_result[0]['count'] if total_result else 0

        # Paywalled vs. free article counts
        paywall_result = db.execute_query(
            "SELECT is_paywalled, COUNT(*) as count FROM articles GROUP BY is_paywalled",
            database=db_name
        )
        stats['paywall'] = {row['is_paywalled']: row['count'] for row in paywall_result}

        # Category distribution
        category_result = db.execute_query(
            "SELECT category, COUNT(*) as count FROM articles WHERE category IS NOT NULL AND category != '' GROUP BY category ORDER BY count DESC LIMIT 10",
            database=db_name
        )
        stats['categories'] = [{'name': row['category'], 'count': row['count']} for row in category_result]

        # Top authors
        author_result = db.execute_query(
            "SELECT author, COUNT(*) as count FROM articles WHERE author IS NOT NULL AND author != '' GROUP BY author ORDER BY count DESC LIMIT 10",
            database=db_name
        )
        stats['authors'] = [{'name': row['author'], 'count': row['count']} for row in author_result]

        # Language distribution
        language_result = db.execute_query(
            "SELECT language, COUNT(*) as count FROM articles GROUP BY language",
            database=db_name
        )
        stats['languages'] = {row['language']: row['count'] for row in language_result}

        # Daily article counts over the last 30 days
        date_result = db.execute_query(
            """
            SELECT DATE(crawled_at) as date, COUNT(*) as count
            FROM articles
            WHERE crawled_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)
            GROUP BY DATE(crawled_at)
            ORDER BY date DESC
            """,
            database=db_name
        )
        stats['daily_trend'] = [{'date': str(row['date']), 'count': row['count']} for row in date_result]

        return jsonify({'success': True, 'data': stats})
    except Exception as e:
        logger.error(f"取得統計資料失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
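
# A sketch of the payload GET /api/statistics returns (field names taken from
# the code above; the values shown are placeholders and depend on the db_A101
# data — note that jsonify turns the integer is_paywalled keys into strings):
#
#   {"success": true,
#    "data": {"total_articles": 500,
#             "paywall": {"0": 300, "1": 200},
#             "categories": [{"name": "...", "count": 80}, ...],
#             "authors": [{"name": "...", "count": 12}, ...],
#             "languages": {"zh-TW": 500},
#             "daily_trend": [{"date": "2025-12-03", "count": 15}, ...]}}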

@app.route('/api/categories', methods=['GET'])
def get_categories():
    """Return the list of all categories."""
    try:
        db = get_db()
        result = db.execute_query(
            "SELECT DISTINCT category FROM articles WHERE category IS NOT NULL AND category != '' ORDER BY category",
            database='db_A101'
        )
        categories = [row['category'] for row in result]
        return jsonify({'success': True, 'data': categories})
    except Exception as e:
        logger.error(f"取得分類列表失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500

@app.route('/api/tags', methods=['GET'])
def get_tags():
    """Return the list of all tags."""
    try:
        db = get_db()
        result = db.execute_query(
            "SELECT DISTINCT tags FROM articles WHERE tags IS NOT NULL AND tags != ''",
            database='db_A101'
        )
        # Split the comma-separated tag strings and de-duplicate them
        all_tags = set()
        for row in result:
            tags = [t.strip() for t in row['tags'].split(',') if t.strip()]
            all_tags.update(tags)
        return jsonify({'success': True, 'data': sorted(list(all_tags))})
    except Exception as e:
        logger.error(f"取得標籤列表失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500

@app.route('/api/run-crawler', methods=['POST'])
def run_crawler():
    """Trigger a crawler run manually."""
    try:
        import subprocess
        result = subprocess.run(
            [sys.executable, 'run_crawler.py'],
            capture_output=True,
            text=True,
            timeout=300  # 5-minute timeout
        )
        return jsonify({
            'success': result.returncode == 0,
            'output': result.stdout,
            'error': result.stderr
        })
    except subprocess.TimeoutExpired:
        return jsonify({'success': False, 'error': '爬蟲執行超時'}), 500
    except Exception as e:
        logger.error(f"執行爬蟲失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
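
# The crawler can be triggered over HTTP, e.g. (a sketch, assuming the service
# is reachable on localhost:5000 and the `requests` package is installed):
#
#   import requests
#   requests.post('http://localhost:5000/api/run-crawler')
#
# The handler runs run_crawler.py with the current interpreter, gives up after
# 5 minutes, and returns the captured stdout/stderr in the JSON response.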

@app.route('/api/save-crawler-config', methods=['POST'])
def save_crawler_config():
    """Save the crawler configuration."""
    try:
        config = request.get_json()
        # Validate the payload
        if not config:
            return jsonify({'success': False, 'error': '設定資料為空'}), 400
        # Persist to a file (optional)
        import json
        config_file = Path(__file__).parent / 'crawler_config.json'
        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(config, f, ensure_ascii=False, indent=2)
        logger.info(f"爬蟲設定已儲存: {config_file}")
        return jsonify({'success': True, 'message': '設定已儲存'})
    except Exception as e:
        logger.error(f"儲存爬蟲設定失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500

@app.route('/api/load-crawler-config', methods=['GET'])
def load_crawler_config():
    """Load the crawler configuration."""
    try:
        config_file = Path(__file__).parent / 'crawler_config.json'
        if config_file.exists():
            import json
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
            return jsonify({'success': True, 'data': config})
        else:
            # Return the default configuration
            default_config = {
                'urls': [
                    'https://www.hbrtaiwan.com/',
                    'https://www.hbrtaiwan.com/topic/management',
                    'https://www.hbrtaiwan.com/topic/leadership',
                    'https://www.hbrtaiwan.com/topic/strategy',
                    'https://www.hbrtaiwan.com/topic/innovation',
                    'https://www.hbrtaiwan.com/topic/technology'
                ],
                'downloadDelay': 1,
                'maxDepth': 3,
                'concurrentRequests': 16,
                'skipPaywalled': True,
                'followPagination': True,
                'obeyRobotsTxt': True,
                'articleListSelector': '.articleItem, article, .article-item, .post-item, .content-item',
                'titleSelector': 'h1.articleTitle, h1.article-title, h1, .article-title, .post-title',
                'authorSelector': '.authorName, .author, .byline, .writer, .author-name',
                'contentSelector': '.articleContent, .article-content, .post-content, .content, .articleText'
            }
            return jsonify({'success': True, 'data': default_config})
    except Exception as e:
        logger.error(f"載入爬蟲設定失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
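
# Save/load round trip (illustrative sketch, assuming localhost:5000 and the
# `requests` package; the keys mirror default_config above, and the saved file
# is crawler_config.json next to this module):
#
#   import requests
#   cfg = requests.get('http://localhost:5000/api/load-crawler-config').json()['data']
#   cfg['downloadDelay'] = 2
#   requests.post('http://localhost:5000/api/save-crawler-config', json=cfg)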

@app.route('/api/test-crawler-config', methods=['POST'])
def test_crawler_config():
    """Test the crawler configuration (only the first URL is crawled)."""
    try:
        config = request.get_json()
        if not config or not config.get('urls') or len(config['urls']) == 0:
            return jsonify({'success': False, 'error': '請至少提供一個起始 URL'}), 400

        # Run Scrapy against the first URL only
        import subprocess
        import tempfile
        import json

        # Write the configuration to a temporary file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False, encoding='utf-8') as f:
            json.dump(config, f, ensure_ascii=False, indent=2)
            temp_config_file = f.name

        try:
            # Run the test crawl (first URL only, depth 1).
            # Use the in-memory stats collector to get accurate counts.
            test_result = subprocess.run(
                [sys.executable, '-m', 'scrapy', 'crawl', 'hbr',
                 '-a', f'start_url={config["urls"][0]}',
                 '-a', 'test_mode=true',
                 '-s', 'LOG_LEVEL=INFO',
                 '-s', 'STATS_CLASS=scrapy.statscollectors.MemoryStatsCollector'],
                cwd=str(Path(__file__).parent / 'hbr_crawler'),
                capture_output=True,
                text=True,
                timeout=60,
                encoding='utf-8',
                errors='replace'
            )

            # Parse the number of scraped articles from the output
            articles_found = 0
            output_lines = test_result.stdout.split('\n') if test_result.stdout else []
            stderr_lines = test_result.stderr.split('\n') if test_result.stderr else []
            all_lines = output_lines + stderr_lines

            import re
            # Method 1: look for item_scraped_count in the Scrapy stats output.
            # Scrapy prints lines such as 'item_scraped_count': 5 or 'items': 5
            for line in all_lines:
                # Match 'item_scraped_count': <number>
                match = re.search(r"['\"]?item_scraped_count['\"]?\s*[:=]\s*(\d+)", line, re.IGNORECASE)
                if match:
                    articles_found = int(match.group(1))
                    break
                # Match 'items': <number> (seen in some Scrapy versions)
                match = re.search(r"['\"]?items['\"]?\s*[:=]\s*(\d+)", line, re.IGNORECASE)
                if match:
                    articles_found = int(match.group(1))
                    break

            # Method 2: count "Scraped from" / "item_scraped" log messages
            if articles_found == 0:
                for line in all_lines:
                    # Scrapy log format: [hbr] DEBUG: Scraped from <200 https://...>
                    if 'Scraped from' in line or 'item_scraped' in line.lower():
                        articles_found += 1

            # Method 3: look for "items": <number> in a JSON-style stats summary
            if articles_found == 0:
                for line in all_lines:
                    match = re.search(r"['\"]items['\"]\s*:\s*(\d+)", line, re.IGNORECASE)
                    if match:
                        articles_found = int(match.group(1))
                        break

            # Method 4: still zero, so check the output for errors or warnings
            if articles_found == 0:
                has_error = False
                error_lines = []
                for line in all_lines:
                    if 'ERROR' in line.upper() or 'CRITICAL' in line.upper():
                        has_error = True
                        error_lines.append(line)
                    elif 'No module named' in line or 'ImportError' in line:
                        has_error = True
                        error_lines.append(line)
                if has_error:
                    error_msg = '\n'.join(error_lines[:5])  # keep only the first 5 error lines
                    return jsonify({
                        'success': False,
                        'error': '爬蟲執行時發生錯誤',
                        'data': {
                            'articles_found': 0,
                            'output': test_result.stdout[:1000] if test_result.stdout else '',
                            'error': error_msg[:500],
                            'returncode': test_result.returncode
                        }
                    })

            # If articles were found, report success; if none were found but
            # there were no errors, the selectors are the likely culprit.
            if articles_found == 0:
                # Check whether the site was reached at all
                has_connection = False
                for line in all_lines:
                    if '200' in line or 'downloaded' in line.lower() or 'response' in line.lower():
                        has_connection = True
                        break
                if has_connection:
                    return jsonify({
                        'success': True,
                        'data': {
                            'articles_found': 0,
                            'output': test_result.stdout[:1000] if test_result.stdout else '',
                            'error': '成功連接到網站,但未找到文章。可能是 CSS 選擇器不正確,或網站結構已變更。',
                            'returncode': test_result.returncode,
                            'warning': '未找到文章,請檢查 CSS 選擇器設定'
                        }
                    })

            return jsonify({
                'success': True,
                'data': {
                    'articles_found': articles_found,
                    'output': test_result.stdout[:1000] if test_result.stdout else '',  # first 1000 characters
                    'error': test_result.stderr[:500] if test_result.stderr else '',
                    'returncode': test_result.returncode
                }
            })
        finally:
            # Clean up the temporary file
            try:
                os.unlink(temp_config_file)
            except OSError:
                pass
    except subprocess.TimeoutExpired:
        return jsonify({'success': False, 'error': '測試超時'}), 500
    except Exception as e:
        logger.error(f"測試爬蟲設定失敗: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
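
# Minimal test request (illustrative sketch, assuming localhost:5000 and the
# `requests` package; the handler passes only the first URL to the spider, as
# start_url with test_mode=true, and gives up after 60 seconds):
#
#   import requests
#   requests.post('http://localhost:5000/api/test-crawler-config',
#                 json={'urls': ['https://www.hbrtaiwan.com/']})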

if __name__ == '__main__':
    # Make sure the required directories exist
    os.makedirs('templates', exist_ok=True)
    os.makedirs('static', exist_ok=True)

    # Start the service
    print("=" * 60)
    print("HBR 爬蟲系統 Web 服務")
    print("=" * 60)
    print("服務地址: http://localhost:5000")
    print("按 Ctrl+C 停止服務")
    print("=" * 60)
    app.run(host='0.0.0.0', port=5000, debug=True)
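
# Running `python web_app.py` starts Flask's built-in development server on
# 0.0.0.0:5000 with debug=True, which is intended for local use. For anything
# beyond that, a WSGI server is the usual choice, e.g. (a sketch, assuming
# gunicorn is installed):
#
#   gunicorn -w 4 -b 0.0.0.0:5000 web_app:app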