Files
hbr-crawler/web_app_fixed.py
DonaldFang 方士碩 f524713cb6 Initial commit: HBR article crawler project
- Scrapy crawler framework that scrapes HBR Traditional Chinese articles
- Flask web application providing an article query interface
- SQL Server database integration
- Automated scheduling and email notifications

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 17:19:56 +08:00

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HBR crawler system web service.
Provides a query interface and statistics.
"""
import os
import sys
from pathlib import Path
from flask import Flask, render_template, request, jsonify
from datetime import datetime, timedelta
import logging

# Add the project root to the import path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))

from hbr_crawler.hbr_crawler.database import get_database_manager

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__,
            template_folder='templates',
            static_folder='static')

# Database manager handle (lazily initialized)
db_manager = None


def get_db():
    """Return the database manager (singleton pattern)."""
    global db_manager
    if db_manager is None:
        db_manager = get_database_manager()
    return db_manager
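
# Note: this module-level singleton is per-process. With debug=True, Flask's
# reloader runs the app in a child process, so each process builds its own
# db_manager on first use.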


@app.route('/')
def index():
    """Home page."""
    return render_template('index.html')


@app.route('/api/articles', methods=['GET'])
def get_articles():
    """Article list API."""
    try:
        db = get_db()

        # Read query parameters
        page = int(request.args.get('page', 1))
        per_page = int(request.args.get('per_page', 20))
        category = request.args.get('category', '')
        tag = request.args.get('tag', '')
        start_date = request.args.get('start_date', '')
        end_date = request.args.get('end_date', '')
        keyword = request.args.get('keyword', '')
        is_paywalled = request.args.get('is_paywalled', '')

        # Build the WHERE conditions (table alias 'a'); user values go into
        # params so the driver escapes them via %s placeholders
        where_conditions = []
        params = []
        use_join = False
        if category:
            where_conditions.append("a.category = %s")
            params.append(category)
        if tag:
            where_conditions.append("t.name LIKE %s")
            params.append(f'%{tag}%')
            use_join = True
        if start_date:
            where_conditions.append("a.publish_date >= %s")
            params.append(start_date)
        if end_date:
            where_conditions.append("a.publish_date <= %s")
            params.append(end_date)
        if keyword:
            where_conditions.append("(a.title LIKE %s OR a.summary LIKE %s OR a.content LIKE %s)")
            params.extend([f'%{keyword}%', f'%{keyword}%', f'%{keyword}%'])
        if is_paywalled != '':
            where_conditions.append("a.is_paywalled = %s")
            params.append(int(is_paywalled))
        where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
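
        # For example, with category and keyword set (hypothetical values),
        # where_clause becomes:
        #   a.category = %s AND (a.title LIKE %s OR a.summary LIKE %s OR a.content LIKE %s)
        # with params ['Strategy', '%leadership%', '%leadership%', '%leadership%']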
        # Count total rows
        if use_join:
            count_query = f"""
                SELECT COUNT(DISTINCT a.id) as count
                FROM articles a
                LEFT JOIN article_tags at ON a.id = at.article_id
                LEFT JOIN tags t ON at.tag_id = t.id
                WHERE {where_clause}
            """
        else:
            count_query = f"SELECT COUNT(*) as count FROM articles a WHERE {where_clause}"
        count_params = tuple(params) if params else None
        count_result = db.execute_query(count_query, count_params, database='db_A101')
        total = count_result[0]['count'] if count_result and len(count_result) > 0 else 0

        # Fetch the requested page of articles
        offset = (page - 1) * per_page
        query_params = list(params)
        query_params.extend([per_page, offset])

        # When filtering by tag, LEFT JOIN through article_tags to get tags
        if use_join:
            query = f"""
                SELECT a.id, a.title, a.url, a.author, a.publish_date, a.summary,
                       a.is_paywalled, a.category, a.crawled_at,
                       GROUP_CONCAT(DISTINCT t.name SEPARATOR ', ') as tags
                FROM articles a
                LEFT JOIN article_tags at ON a.id = at.article_id
                LEFT JOIN tags t ON at.tag_id = t.id
                WHERE {where_clause}
                GROUP BY a.id, a.title, a.url, a.author, a.publish_date, a.summary,
                         a.is_paywalled, a.category, a.crawled_at
                ORDER BY a.crawled_at DESC
                LIMIT %s OFFSET %s
            """
        else:
            # Without a tag filter, fetch tags via a correlated subquery
            query = f"""
                SELECT a.id, a.title, a.url, a.author, a.publish_date, a.summary,
                       a.is_paywalled, a.category, a.crawled_at,
                       (SELECT GROUP_CONCAT(t.name SEPARATOR ', ')
                        FROM article_tags at
                        INNER JOIN tags t ON at.tag_id = t.id
                        WHERE at.article_id = a.id) as tags
                FROM articles a
                WHERE {where_clause}
                ORDER BY a.crawled_at DESC
                LIMIT %s OFFSET %s
            """
        articles = db.execute_query(query, tuple(query_params), database='db_A101')

        # Make sure articles is a list
        if not articles:
            articles = []

        # Add defaults and normalize field formats for each article
        for article in articles:
            if 'tags' not in article or article['tags'] is None:
                article['tags'] = ''
            if 'language' not in article:
                article['language'] = 'zh-TW'
            # Serialize datetimes as strings
            if article.get('publish_date') and isinstance(article['publish_date'], datetime):
                article['publish_date'] = article['publish_date'].strftime('%Y-%m-%d %H:%M:%S')
            if article.get('crawled_at') and isinstance(article['crawled_at'], datetime):
                article['crawled_at'] = article['crawled_at'].strftime('%Y-%m-%d %H:%M:%S')

        logger.info(f"Fetched {len(articles)} articles, total: {total}")
        return jsonify({
            'success': True,
            'data': articles,
            'pagination': {
                'page': page,
                'per_page': per_page,
                'total': total,
                # Ceiling division gives the page count
                'pages': (total + per_page - 1) // per_page if total > 0 else 0
            }
        })
    except Exception as e:
        logger.error(f"Failed to fetch article list: {e}", exc_info=True)
        return jsonify({'success': False, 'error': str(e)}), 500
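
# A quick request sketch for the endpoint above (hypothetical values; assumes
# the service is running locally on port 5000):
#   GET /api/articles?page=1&per_page=20&keyword=leadership&is_paywalled=0
# returns JSON shaped like:
#   {"success": true, "data": [...],
#    "pagination": {"page": 1, "per_page": 20, "total": 42, "pages": 3}}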


@app.route('/api/article/<int:article_id>', methods=['GET'])
def get_article(article_id):
    """Fetch a single article's details."""
    try:
        db = get_db()
        query = """
            SELECT a.*,
                   (SELECT GROUP_CONCAT(t.name SEPARATOR ', ')
                    FROM article_tags at
                    INNER JOIN tags t ON at.tag_id = t.id
                    WHERE at.article_id = a.id) as tags
            FROM articles a
            WHERE a.id = %s
        """
        result = db.execute_query(query, (article_id,), database='db_A101')
        if result and len(result) > 0:
            article = result[0]
            # Serialize datetimes as strings
            if article.get('publish_date') and isinstance(article['publish_date'], datetime):
                article['publish_date'] = article['publish_date'].strftime('%Y-%m-%d %H:%M:%S')
            if article.get('crawled_at') and isinstance(article['crawled_at'], datetime):
                article['crawled_at'] = article['crawled_at'].strftime('%Y-%m-%d %H:%M:%S')
            if 'tags' not in article or article['tags'] is None:
                article['tags'] = ''
            return jsonify({'success': True, 'data': article})
        else:
            return jsonify({'success': False, 'error': 'Article not found'}), 404
    except Exception as e:
        logger.error(f"Failed to fetch article details: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
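
# e.g. GET /api/article/123 (hypothetical id) -> {"success": true, "data": {...}},
# or a 404 with {"success": false, "error": "Article not found"}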


@app.route('/api/statistics', methods=['GET'])
def get_statistics():
    """Fetch statistics."""
    try:
        db = get_db()
        db_name = 'db_A101'
        stats = {}

        # Total article count
        total_result = db.execute_query("SELECT COUNT(*) as count FROM articles", database=db_name)
        stats['total_articles'] = total_result[0]['count'] if total_result and len(total_result) > 0 else 0

        # Paywalled vs. free article counts
        paywall_result = db.execute_query(
            "SELECT is_paywalled, COUNT(*) as count FROM articles GROUP BY is_paywalled",
            database=db_name
        )
        stats['paywall'] = {}
        if paywall_result:
            for row in paywall_result:
                stats['paywall'][row['is_paywalled']] = row['count']

        # Category distribution (top 10)
        category_result = db.execute_query(
            "SELECT category, COUNT(*) as count FROM articles WHERE category IS NOT NULL AND category != '' GROUP BY category ORDER BY count DESC LIMIT 10",
            database=db_name
        )
        stats['categories'] = [{'name': row['category'], 'count': row['count']} for row in category_result] if category_result else []

        # Author counts (top 10)
        author_result = db.execute_query(
            "SELECT author, COUNT(*) as count FROM articles WHERE author IS NOT NULL AND author != '' GROUP BY author ORDER BY count DESC LIMIT 10",
            database=db_name
        )
        stats['authors'] = [{'name': row['author'], 'count': row['count']} for row in author_result] if author_result else []

        # Language distribution (skipped for now; the column does not exist)
        stats['languages'] = {}

        # Article count trend over the last 30 days
        try:
            date_result = db.execute_query(
                """
                SELECT DATE(crawled_at) as date, COUNT(*) as count
                FROM articles
                WHERE crawled_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)
                GROUP BY DATE(crawled_at)
                ORDER BY date DESC
                """,
                database=db_name
            )
            stats['daily_trend'] = [{'date': str(row['date']), 'count': row['count']} for row in date_result] if date_result else []
        except Exception:
            stats['daily_trend'] = []

        return jsonify({'success': True, 'data': stats})
    except Exception as e:
        logger.error(f"Failed to fetch statistics: {e}", exc_info=True)
        return jsonify({'success': False, 'error': str(e)}), 500
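
# The statistics payload (hypothetical values) looks like:
#   {"success": true, "data": {"total_articles": 120, "paywall": {"0": 80, "1": 40},
#    "categories": [...], "authors": [...], "languages": {}, "daily_trend": [...]}}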


@app.route('/api/categories', methods=['GET'])
def get_categories():
    """Fetch the list of all categories."""
    try:
        db = get_db()
        result = db.execute_query(
            "SELECT DISTINCT category FROM articles WHERE category IS NOT NULL AND category != '' ORDER BY category",
            database='db_A101'
        )
        categories = [row['category'] for row in result] if result else []
        return jsonify({'success': True, 'data': categories})
    except Exception as e:
        logger.error(f"Failed to fetch category list: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/tags', methods=['GET'])
def get_tags():
    """Fetch the list of all tags."""
    try:
        db = get_db()
        result = db.execute_query(
            "SELECT DISTINCT name FROM tags ORDER BY name",
            database='db_A101'
        )
        tags = [row['name'] for row in result] if result else []
        return jsonify({'success': True, 'data': tags})
    except Exception as e:
        logger.error(f"Failed to fetch tag list: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
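
# Both lookup endpoints return flat string arrays, e.g. (hypothetical values):
#   GET /api/categories -> {"success": true, "data": ["Leadership", "Strategy"]}
#   GET /api/tags       -> {"success": true, "data": ["AI", "management"]}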


@app.route('/api/run-crawler', methods=['POST'])
def run_crawler():
    """Manually trigger a crawler run."""
    try:
        import subprocess
        result = subprocess.run(
            [sys.executable, 'run_crawler.py'],
            capture_output=True,
            text=True,
            timeout=300  # 5-minute timeout
        )
        return jsonify({
            'success': result.returncode == 0,
            'output': result.stdout,
            'error': result.stderr
        })
    except subprocess.TimeoutExpired:
        return jsonify({'success': False, 'error': 'Crawler run timed out'}), 500
    except Exception as e:
        logger.error(f"Failed to run crawler: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
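
# Note: this handler blocks the HTTP request for up to 300 s while the
# subprocess runs. Example trigger (assumes the service is running locally):
#   curl -X POST http://localhost:5000/api/run-crawler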


if __name__ == '__main__':
    # Create the required directories
    os.makedirs('templates', exist_ok=True)
    os.makedirs('static', exist_ok=True)

    # Start the service
    print("=" * 60)
    print("HBR Crawler System Web Service")
    print("=" * 60)
    print("Service URL: http://localhost:5000")
    print("Press Ctrl+C to stop the service")
    print("=" * 60)
    # debug=True is for development only; disable it when binding 0.0.0.0
    app.run(host='0.0.0.0', port=5000, debug=True)
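
# To run (a minimal sketch, assuming Flask and the project's database module
# are installed and configured):
#   python web_app_fixed.py
# then browse to http://localhost:5000 or call the /api/* endpoints above.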