Initial commit: HBR article crawler project

- Scrapy spider that crawls Traditional Chinese HBR (Harvard Business Review) articles
- Flask web application providing an article lookup interface
- MySQL database integration
- Automated scheduling and email notification

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 17:19:56 +08:00
commit f524713cb6
35 changed files with 6719 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
# This package will contain the spiders of your Scrapy project
#
# Please refer to the documentation for information on how to create and manage
# your spiders.

View File

@@ -0,0 +1,232 @@
"""
資料庫連線模組
提供資料庫連線、查詢、插入等功能
"""
import pymysql
import logging
from contextlib import contextmanager
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class DatabaseManager:
"""資料庫管理類別"""
def __init__(self, host: str, port: int, user: str, password: str,
database: str = None, charset: str = 'utf8mb4'):
"""
初始化資料庫連線參數
Args:
host: 資料庫主機位址
port: 資料庫埠號
user: 資料庫使用者名稱
password: 資料庫密碼
database: 資料庫名稱(可選,用於建立連線時指定)
charset: 字元集(預設 utf8mb4
"""
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.charset = charset
self.connection = None
@contextmanager
def get_connection(self, database: Optional[str] = None):
"""
取得資料庫連線(使用 context manager 自動管理連線)
Args:
database: 資料庫名稱(可選,覆蓋初始化時的設定)
Yields:
pymysql.Connection: 資料庫連線物件
"""
db_name = database or self.database
connection = None
try:
connection = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=db_name,
charset=self.charset,
cursorclass=pymysql.cursors.DictCursor,
autocommit=False
)
yield connection
connection.commit()
except Exception as e:
if connection:
connection.rollback()
logger.error(f"資料庫連線錯誤: {e}")
raise
finally:
if connection:
connection.close()
def test_connection(self, database: Optional[str] = None) -> bool:
"""
測試資料庫連線
Args:
database: 資料庫名稱(可選)
Returns:
bool: 連線成功返回 True失敗返回 False
"""
try:
with self.get_connection(database) as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT 1")
result = cursor.fetchone()
logger.info("資料庫連線測試成功")
return True
except Exception as e:
logger.error(f"資料庫連線測試失敗: {e}")
return False
def create_database(self, database_name: str) -> bool:
"""
建立資料庫(如果不存在)
Args:
database_name: 資料庫名稱
Returns:
bool: 建立成功返回 True失敗返回 False
"""
try:
# 先連接到系統資料庫(不指定資料庫)
with self.get_connection(None) as conn:
with conn.cursor() as cursor:
# 建立資料庫(如果不存在)
cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{database_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
conn.commit()
logger.info(f"資料庫 {database_name} 建立成功(或已存在)")
return True
except Exception as e:
logger.error(f"建立資料庫失敗: {e}")
return False
def execute_sql_file(self, sql_file_path: str, database: Optional[str] = None) -> bool:
"""
執行 SQL 檔案(用於建立資料表)
Args:
sql_file_path: SQL 檔案路徑
database: 資料庫名稱(可選)
Returns:
bool: 執行成功返回 True失敗返回 False
"""
try:
db_name = database or self.database
with open(sql_file_path, 'r', encoding='utf-8') as f:
sql_content = f.read()
with self.get_connection(db_name) as conn:
with conn.cursor() as cursor:
# 分割 SQL 語句(以分號分隔)
statements = [s.strip() for s in sql_content.split(';') if s.strip()]
for statement in statements:
if statement:
cursor.execute(statement)
conn.commit()
logger.info(f"SQL 檔案執行成功: {sql_file_path}")
return True
except Exception as e:
logger.error(f"執行 SQL 檔案失敗: {e}")
return False
def execute_query(self, query: str, params: tuple = None,
database: Optional[str] = None) -> list:
"""
執行查詢語句
Args:
query: SQL 查詢語句
params: 查詢參數(可選)
database: 資料庫名稱(可選)
Returns:
list: 查詢結果列表
"""
try:
db_name = database or self.database
with self.get_connection(db_name) as conn:
with conn.cursor() as cursor:
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
return cursor.fetchall()
except Exception as e:
logger.error(f"查詢執行失敗: {e}")
return []
def execute_update(self, query: str, params: tuple = None,
database: Optional[str] = None) -> int:
"""
執行更新語句INSERT, UPDATE, DELETE
Args:
query: SQL 更新語句
params: 更新參數(可選)
database: 資料庫名稱(可選)
Returns:
int: 受影響的列數
"""
try:
db_name = database or self.database
with self.get_connection(db_name) as conn:
with conn.cursor() as cursor:
if params:
affected_rows = cursor.execute(query, params)
else:
affected_rows = cursor.execute(query)
conn.commit()
return affected_rows
except Exception as e:
logger.error(f"更新執行失敗: {e}")
return 0
def get_database_manager() -> DatabaseManager:
"""
從環境變數或設定檔取得資料庫連線資訊,建立 DatabaseManager 實例
優先順序:
1. 環境變數
2. Scrapy settings如果可用
3. 預設值
Returns:
DatabaseManager: 資料庫管理物件
"""
import os
# 嘗試從 Scrapy settings 取得設定
try:
from scrapy.utils.project import get_project_settings
settings = get_project_settings()
host = settings.get('DB_HOST', os.environ.get('DB_HOST', 'mysql.theaken.com'))
port = settings.getint('DB_PORT', int(os.environ.get('DB_PORT', 33306)))
user = settings.get('DB_USER', os.environ.get('DB_USER', 'A101'))
password = settings.get('DB_PASSWORD', os.environ.get('DB_PASSWORD', 'Aa123456'))
database = settings.get('DB_NAME', os.environ.get('DB_NAME', 'db_A101'))
except:
# 如果無法取得 Scrapy settings使用環境變數或預設值
host = os.environ.get('DB_HOST', 'mysql.theaken.com')
port = int(os.environ.get('DB_PORT', 33306))
user = os.environ.get('DB_USER', 'A101')
password = os.environ.get('DB_PASSWORD', 'Aa123456')
database = os.environ.get('DB_NAME', 'db_A101')
return DatabaseManager(host, port, user, password, database)
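
Editor's note: a minimal usage sketch of the module above, assuming the MySQL instance configured in settings.py is reachable and the articles table described later in this commit exists.

# Usage sketch for DatabaseManager / get_database_manager().
from hbr_crawler.database import get_database_manager

db = get_database_manager()
if db.test_connection():
    rows = db.execute_query(
        "SELECT title, url FROM articles WHERE is_paywalled = %s LIMIT 5",
        params=(0,),
    )
    for row in rows:                      # DictCursor returns dict rows
        print(row["title"], row["url"])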

View File

@@ -0,0 +1,19 @@
# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html
import scrapy
class HbrArticleItem(scrapy.Item):
    # define the fields for your item here like:
    title = scrapy.Field()
    url = scrapy.Field()
    author = scrapy.Field()
    publish_date = scrapy.Field()
    summary = scrapy.Field()
    is_paywalled = scrapy.Field()
    category = scrapy.Field()
    tags = scrapy.Field()
    content = scrapy.Field()
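
Editor's note: an illustrative sketch of how the spider later in this commit populates the item; all values below are examples only.

from hbr_crawler.items import HbrArticleItem

item = HbrArticleItem()
item['title'] = '範例文章標題'             # article title
item['url'] = 'https://www.hbrtaiwan.com/article/example'
item['author'] = 'Example Author'
item['publish_date'] = '2025-01-01'
item['summary'] = 'Short teaser text.'
item['is_paywalled'] = 0                  # 1 when a paywall indicator is found
item['category'] = 'management'
item['tags'] = ['領導', '策略']            # list; pipelines join or split as needed
item['content'] = 'Full text for non-paywalled articles.'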

View File

@@ -0,0 +1,378 @@
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html

# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
import csv
import os
import logging
from datetime import datetime

from hbr_crawler.database import get_database_manager

logger = logging.getLogger(__name__)


class CsvExportPipeline:
    def __init__(self):
        self.file = None
        self.writer = None
        self.items = []

    def open_spider(self, spider):
        # Create the CSV file in the project root (two levels above this module)
        project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
        csv_path = os.path.join(project_root, 'hbr_articles.csv')
        self.file = open(csv_path, 'w', newline='', encoding='utf-8')
        fieldnames = ['title', 'url', 'author', 'publish_date', 'summary', 'is_paywalled', 'category', 'tags', 'content']
        self.writer = csv.DictWriter(self.file, fieldnames=fieldnames)
        self.writer.writeheader()

    def close_spider(self, spider):
        if self.file:
            self.file.close()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        # Join the tags list into a single string
        if 'tags' in adapter and isinstance(adapter['tags'], list):
            adapter['tags'] = ', '.join(adapter['tags'])
        # Convert every field to a string and replace None with ''
        row = {}
        for field in ['title', 'url', 'author', 'publish_date', 'summary', 'is_paywalled', 'category', 'tags', 'content']:
            value = adapter.get(field, '')
            row[field] = str(value) if value is not None else ''
        self.writer.writerow(row)
        return item


class DatabasePipeline:
    """Pipeline that stores items in the database."""

    def __init__(self):
        self.db_manager = None
        self.db_name = 'db_A101'  # default database name
        self.tag_cache = {}       # tag cache to avoid repeated lookups

    def open_spider(self, spider):
        """Initialize the database connection when the spider starts."""
        try:
            self.db_manager = get_database_manager()
            # Resolve the database name
            self.db_name = self.db_manager.database or 'db_A101'
            # Test the connection
            if not self.db_manager.test_connection(self.db_name):
                logger.warning("Database connection failed; DatabasePipeline will not store data")
                logger.warning("Please check that:")
                logger.warning(f"  1. the database {self.db_name} is reachable")
                logger.warning("  2. the user has permission to access it")
                logger.warning("  3. python test_db_connection.py runs cleanly")
                self.db_manager = None
            else:
                logger.info("Database connection established")
                # Warm up the tag cache
                self._load_tag_cache()
        except Exception as e:
            logger.warning(f"Failed to initialize the database connection: {e}")
            logger.warning("DatabasePipeline will not store data, but the crawl continues")
            self.db_manager = None

    def close_spider(self, spider):
        """Release resources when the spider finishes."""
        self.tag_cache = {}
        logger.info("Database pipeline closed")

    def _load_tag_cache(self):
        """Load existing tags into the cache."""
        try:
            tags = self.db_manager.execute_query(
                "SELECT id, name FROM tags",
                database=self.db_name
            )
            self.tag_cache = {tag['name']: tag['id'] for tag in tags}
            logger.info(f"Loaded {len(self.tag_cache)} tags into the cache")
        except Exception as e:
            logger.warning(f"Failed to load the tag cache: {e}")
            self.tag_cache = {}

    def _get_or_create_tag(self, tag_name: str) -> int:
        """
        Get or create a tag and return its ID.

        Args:
            tag_name: Tag name.

        Returns:
            int: Tag ID, or None if it could not be resolved.
        """
        if not tag_name or not tag_name.strip():
            return None
        tag_name = tag_name.strip()
        # Check the cache first
        if tag_name in self.tag_cache:
            return self.tag_cache[tag_name]
        try:
            # Look the tag up in the database
            tags = self.db_manager.execute_query(
                "SELECT id FROM tags WHERE name = %s",
                params=(tag_name,),
                database=self.db_name
            )
            if tags:
                tag_id = tags[0]['id']
                self.tag_cache[tag_name] = tag_id
                return tag_id
            # Create a new tag
            affected_rows = self.db_manager.execute_update(
                "INSERT INTO tags (name) VALUES (%s)",
                params=(tag_name,),
                database=self.db_name
            )
            if affected_rows > 0:
                # Fetch the ID of the newly created tag
                tags = self.db_manager.execute_query(
                    "SELECT id FROM tags WHERE name = %s",
                    params=(tag_name,),
                    database=self.db_name
                )
                if tags:
                    tag_id = tags[0]['id']
                    self.tag_cache[tag_name] = tag_id
                    logger.debug(f"Created new tag: {tag_name} (ID: {tag_id})")
                    return tag_id
            return None
        except Exception as e:
            logger.error(f"Failed to get or create tag: {e}")
            return None

    def _check_article_exists(self, url: str) -> int:
        """
        Check whether an article already exists and return its ID.

        Args:
            url: Article URL.

        Returns:
            int: Article ID, or None if it does not exist.
        """
        try:
            articles = self.db_manager.execute_query(
                "SELECT id FROM articles WHERE url = %s",
                params=(url,),
                database=self.db_name
            )
            if articles:
                return articles[0]['id']
            return None
        except Exception as e:
            logger.error(f"Failed to check whether the article exists: {e}")
            return None

    def _insert_article(self, item) -> int:
        """
        Insert a new article and return its ID.

        Args:
            item: Scrapy item.

        Returns:
            int: Article ID.
        """
        adapter = ItemAdapter(item)
        crawled_at = datetime.now()
        try:
            # Normalize the publish date
            publish_date = adapter.get('publish_date')
            if publish_date and str(publish_date).strip():
                try:
                    # Use datetime objects as-is, otherwise parse common date formats
                    if not isinstance(publish_date, datetime):
                        from dateutil import parser
                        publish_date = parser.parse(str(publish_date))
                except Exception:
                    publish_date = None
            else:
                publish_date = None
            affected_rows = self.db_manager.execute_update(
                """INSERT INTO articles
                   (title, url, author, publish_date, summary, is_paywalled,
                    category, content, crawled_at)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                params=(
                    adapter.get('title', ''),
                    adapter.get('url', ''),
                    adapter.get('author') or None,
                    publish_date,
                    adapter.get('summary') or None,
                    adapter.get('is_paywalled', 0),
                    adapter.get('category') or None,
                    adapter.get('content') or None,
                    crawled_at
                ),
                database=self.db_name
            )
            if affected_rows > 0:
                # Fetch the ID of the newly inserted article
                articles = self.db_manager.execute_query(
                    "SELECT id FROM articles WHERE url = %s",
                    params=(adapter.get('url'),),
                    database=self.db_name
                )
                if articles:
                    return articles[0]['id']
            return None
        except Exception as e:
            logger.error(f"Failed to insert article: {e}")
            return None

    def _update_article(self, article_id: int, item):
        """
        Update an existing article.

        Args:
            article_id: Article ID.
            item: Scrapy item.
        """
        adapter = ItemAdapter(item)
        crawled_at = datetime.now()
        try:
            # Normalize the publish date
            publish_date = adapter.get('publish_date')
            if publish_date and str(publish_date).strip():
                try:
                    if not isinstance(publish_date, datetime):
                        from dateutil import parser
                        publish_date = parser.parse(str(publish_date))
                except Exception:
                    publish_date = None
            else:
                publish_date = None
            self.db_manager.execute_update(
                """UPDATE articles
                   SET title = %s, author = %s, publish_date = %s,
                       summary = %s, is_paywalled = %s, category = %s,
                       content = %s, crawled_at = %s
                   WHERE id = %s""",
                params=(
                    adapter.get('title', ''),
                    adapter.get('author') or None,
                    publish_date,
                    adapter.get('summary') or None,
                    adapter.get('is_paywalled', 0),
                    adapter.get('category') or None,
                    adapter.get('content') or None,
                    crawled_at,
                    article_id
                ),
                database=self.db_name
            )
        except Exception as e:
            logger.error(f"Failed to update article: {e}")

    def _link_article_tags(self, article_id: int, tags: list):
        """
        Link an article to its tags.

        Args:
            article_id: Article ID.
            tags: List of tag names.
        """
        if not article_id or not tags:
            return
        try:
            for tag_name in tags:
                if not tag_name or not tag_name.strip():
                    continue
                tag_id = self._get_or_create_tag(tag_name.strip())
                if not tag_id:
                    continue
                # Skip if the link already exists
                existing = self.db_manager.execute_query(
                    "SELECT id FROM article_tags WHERE article_id = %s AND tag_id = %s",
                    params=(article_id, tag_id),
                    database=self.db_name
                )
                if not existing:
                    # Create the link
                    self.db_manager.execute_update(
                        "INSERT INTO article_tags (article_id, tag_id) VALUES (%s, %s)",
                        params=(article_id, tag_id),
                        database=self.db_name
                    )
        except Exception as e:
            logger.error(f"Failed to link article to tags: {e}")

    def process_item(self, item, spider):
        """Process a scraped item."""
        if not self.db_manager:
            return item
        adapter = ItemAdapter(item)
        url = adapter.get('url', '')
        if not url:
            logger.warning("Article URL is empty; skipping database storage")
            return item
        try:
            # Check whether the article already exists
            article_id = self._check_article_exists(url)
            if article_id:
                # Update the existing article
                self._update_article(article_id, item)
                logger.debug(f"Updated article: {url} (ID: {article_id})")
            else:
                # Insert a new article
                article_id = self._insert_article(item)
                if article_id:
                    logger.debug(f"Inserted new article: {url} (ID: {article_id})")
                else:
                    logger.warning(f"Failed to insert article: {url}")
                    return item
            # Handle tags
            tags = adapter.get('tags', [])
            if tags:
                if isinstance(tags, str):
                    # Comma-separated string: split into a list
                    tags = [t.strip() for t in tags.split(',') if t.strip()]
                elif isinstance(tags, list):
                    tags = [str(t).strip() for t in tags if t and str(t).strip()]
                if tags:
                    self._link_article_tags(article_id, tags)
        except Exception as e:
            logger.error(f"Failed to process article item: {e}")
        return item
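
Editor's note: a hedged sketch of the schema this pipeline expects. Table and column names are taken from the queries above; the column types and constraints are assumptions, not the project's actual SQL file.

from hbr_crawler.database import get_database_manager

SCHEMA_SKETCH = """
CREATE TABLE IF NOT EXISTS articles (
    id INT AUTO_INCREMENT PRIMARY KEY,
    title VARCHAR(500) NOT NULL,
    url VARCHAR(1000) NOT NULL,
    author VARCHAR(255) NULL,
    publish_date DATETIME NULL,
    summary TEXT NULL,
    is_paywalled TINYINT DEFAULT 0,
    category VARCHAR(255) NULL,
    content LONGTEXT NULL,
    crawled_at DATETIME NULL
);
CREATE TABLE IF NOT EXISTS tags (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL
);
CREATE TABLE IF NOT EXISTS article_tags (
    id INT AUTO_INCREMENT PRIMARY KEY,
    article_id INT NOT NULL,
    tag_id INT NOT NULL
);
"""

def create_schema_sketch():
    """Apply the sketched schema, one statement at a time."""
    db = get_database_manager()
    for statement in SCHEMA_SKETCH.split(';'):
        if statement.strip():
            db.execute_update(statement)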

View File

@@ -0,0 +1,94 @@
# Scrapy settings for hbr_crawler project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
# https://docs.scrapy.org/en/latest/topics/settings.html
# https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
BOT_NAME = 'hbr_crawler'
SPIDER_MODULES = ['hbr_crawler.spiders']
NEWSPIDER_MODULE = 'hbr_crawler.spiders'
# Obey robots.txt rules
ROBOTSTXT_OBEY = True
# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
DOWNLOAD_DELAY = 1
# The download delay setting will honor only one of:
#CONCURRENT_REQUESTS_PER_DOMAIN = 16
#CONCURRENT_REQUESTS_PER_IP = 16
# Disable cookies (enabled by default)
#COOKIES_ENABLED = False
# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False
# Override the default request headers:
#DEFAULT_REQUEST_HEADERS = {
# 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
# 'Accept-Language': 'en',
#}
# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
#SPIDER_MIDDLEWARES = {
# 'hbr_crawler.middlewares.HbrCrawlerSpiderMiddleware': 543,
#}
# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#DOWNLOADER_MIDDLEWARES = {
# 'hbr_crawler.middlewares.HbrCrawlerDownloaderMiddleware': 543,
#}
# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
#EXTENSIONS = {
# 'scrapy.extensions.telnet.TelnetConsole': None,
#}
# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    'hbr_crawler.pipelines.CsvExportPipeline': 300,
    'hbr_crawler.pipelines.DatabasePipeline': 400,
}
# Database settings
DB_HOST = 'mysql.theaken.com'
DB_PORT = 33306
DB_USER = 'A101'
DB_PASSWORD = 'Aa123456'
DB_NAME = 'db_A101'
# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 1
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False
# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = 'httpcache'
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'
# Set settings whose default value is deprecated to a future-proof value
REQUEST_FINGERPRINTER_IMPLEMENTATION = '2.7'
TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'
FEED_EXPORT_ENCODING = 'utf-8'
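
Editor's note: get_database_manager() reads the DB_* Scrapy settings first and only falls back to environment variables, so a sketch like the one below (placeholder values only) takes effect only if the hard-coded DB_* lines above are removed or left unset.

# Supply database credentials from the environment instead of settings.py.
import os

os.environ['DB_HOST'] = '127.0.0.1'      # placeholder
os.environ['DB_PORT'] = '3306'           # placeholder
os.environ['DB_USER'] = 'crawler'        # placeholder
os.environ['DB_PASSWORD'] = 'change-me'  # placeholder
os.environ['DB_NAME'] = 'db_A101'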

View File

@@ -0,0 +1,4 @@
# This package will contain the spiders of your Scrapy project
#
# Please refer to the documentation for information on how to create and manage
# your spiders.

View File

@@ -0,0 +1,158 @@
import scrapy

from hbr_crawler.items import HbrArticleItem


class HbrSpider(scrapy.Spider):
    name = 'hbr'
    allowed_domains = ['hbrtaiwan.com']
    start_urls = [
        'https://www.hbrtaiwan.com/',
        'https://www.hbrtaiwan.com/topic/management',
        'https://www.hbrtaiwan.com/topic/leadership',
        'https://www.hbrtaiwan.com/topic/strategy',
        'https://www.hbrtaiwan.com/topic/innovation',
        'https://www.hbrtaiwan.com/topic/technology',
    ]

    def __init__(self, start_url=None, test_mode=False, *args, **kwargs):
        super(HbrSpider, self).__init__(*args, **kwargs)
        self.test_mode = test_mode == 'true' or test_mode is True
        # If a start_url is supplied, use it as the only starting URL
        if start_url:
            self.start_urls = [start_url]
            self.logger.info(f"Using custom start URL: {start_url}")
        if self.test_mode:
            self.logger.info("Test mode: crawl only the first level, do not follow pagination")

    def parse(self, response):
        # Parse the article listing page.
        # Prefer the selectors actually used by HBR Taiwan.
        articles = response.css('.articleItem, article, .article-item, .post-item, .content-item')
        if not articles:
            # Try other likely selectors
            articles = response.css('.article, .post, .item')
        # If still nothing, fall back to any element that links to an article
        if not articles:
            # Collect every link whose path contains /article/
            article_links = response.css('a[href*="/article/"]')
            seen_urls = set()
            for link in article_links:
                href = link.css('::attr(href)').get()
                if href and '/article/' in href:
                    if not href.startswith('http'):
                        href = response.urljoin(href)
                    if href not in seen_urls:
                        seen_urls.add(href)
                        yield response.follow(href, self.parse_article)
            return
        for article in articles:
            # Extract the article link: prefer links inside h1/h3, then thumbnail/image boxes
            link = article.css('h1 a::attr(href), h3 a::attr(href), .itemthumb a::attr(href), .imgBox a::attr(href), a::attr(href)').get()
            if link and not link.startswith('javascript:') and not link.startswith('#'):
                if not link.startswith('http'):
                    link = response.urljoin(link)
                # Only follow article links
                if '/article/' in link:
                    yield response.follow(link, self.parse_article)
        # Follow pagination links (skipped in test mode)
        if not self.test_mode:
            next_page = response.css('a.next::attr(href), .pagination a:last-child::attr(href)').get()
            if next_page:
                yield response.follow(next_page, self.parse)

    def parse_article(self, response):
        item = HbrArticleItem()

        # Title: prefer the selectors actually used by HBR Taiwan
        title = response.css('h1.articleTitle::text, h1.article-title::text, h1::text, .article-title::text, .post-title::text').get()
        if not title:
            # Fall back to the <title> tag
            title = response.css('title::text').get()
            # Strip the site-name suffix (the separator character is an assumption)
            if title and '｜' in title:
                title = title.split('｜')[0].strip()
        item['title'] = title.strip() if title else ''

        # URL
        item['url'] = response.url

        # Author: prefer the selectors actually used by HBR Taiwan
        author = response.css('.authorName::text, .author::text, .byline::text, .writer::text, .author-name::text').get()
        if not author:
            # Fall back to the meta tag
            author = response.css('meta[name="author"]::attr(content)').get()
        if not author:
            # Fall back to the author info block
            author = response.css('.authorBox .authorName::text, .author-info .authorName::text').get()
        item['author'] = author.strip() if author else ''

        # Publish date
        publish_date = response.css('.date::text, .publish-date::text, .post-date::text').get()
        if not publish_date:
            publish_date = response.css('meta[property="article:published_time"]::attr(content)').get()
        item['publish_date'] = publish_date.strip() if publish_date else ''

        # Summary
        summary = response.css('.summary::text, .excerpt::text, .description::text').get()
        if not summary:
            summary = response.css('meta[name="description"]::attr(content)').get()
        item['summary'] = summary.strip() if summary else ''

        # Check whether the article is paywalled
        paywall_indicators = response.css('.paywall, .premium, .subscription-required, .member-only')
        is_paywalled = 1 if paywall_indicators else 0
        item['is_paywalled'] = is_paywalled

        # Category
        category = response.css('.category::text, .section::text, .topic::text').get()
        if not category:
            # Infer the category from the URL path
            url_parts = response.url.split('/')
            if len(url_parts) > 3:
                category = url_parts[3]
        item['category'] = category.strip() if category else ''

        # Tags
        tags = response.css('.tags a::text, .tag::text, .keywords a::text').getall()
        item['tags'] = [tag.strip() for tag in tags if tag.strip()]

        # Article body (non-paywalled articles only)
        content = ''
        if not is_paywalled:
            content_selectors = [
                '.articleContent',
                '.article-content',
                '.post-content',
                '.content',
                '.entry-content',
                '.article-body',
                '.post-body',
                '.articleText'
            ]
            for selector in content_selectors:
                content_elements = response.css(selector)
                if content_elements:
                    # Extract all paragraph text
                    paragraphs = content_elements.css('p::text').getall()
                    if paragraphs:
                        content = ' '.join(paragraphs)
                    else:
                        # No paragraphs: extract all text nodes
                        content = ' '.join(content_elements.css('::text').getall())
                    if content.strip():
                        break
        item['content'] = content.strip() if content else ''

        yield item
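
Editor's note: a minimal sketch of running this spider programmatically with its two optional arguments; the start_url value is illustrative.

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
process.crawl(
    'hbr',
    start_url='https://www.hbrtaiwan.com/topic/management',  # illustrative
    test_mode='true',   # crawl only the first level, skip pagination
)
process.start()

The command-line equivalent would be: scrapy crawl hbr -a start_url=https://www.hbrtaiwan.com/topic/management -a test_mode=true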

hbr_crawler/scrapy.cfg Normal file
View File

@@ -0,0 +1,11 @@
# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.io/en/latest/deploy.html
[settings]
default = hbr_crawler.settings
[deploy]
#url = http://localhost:6800/
project = hbr_crawler