"""
Scheduler service module.

Runs the daily news crawl, matches crawled articles to groups, generates the
per-group daily reports, and flags reports that miss the publish deadline.
"""
import logging
from datetime import datetime, date
from typing import List, Tuple

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy.orm import Session

from app.db.session import SessionLocal
from app.core.config import settings
from app.models import (
    NewsSource, NewsArticle, CrawlJob, CrawlStatus,
    Group, Keyword, ArticleGroupMatch,
    Report, ReportArticle, ReportStatus
)
from app.services.crawler_service import get_crawler
from app.services.llm_service import generate_summary
from app.services.notification_service import send_delay_notification

logger = logging.getLogger(__name__)

scheduler = BackgroundScheduler()

# Fallback publish deadline ("HH:MM") used when the settings object does not
# define ``publish_deadline_time``.
_DEFAULT_DEADLINE_TIME = "09:00"


def run_daily_crawl():
    """Crawl every active news source, then generate today's reports.

    A ``CrawlJob`` row is written per source and ends up COMPLETED or FAILED.
    A failure in one source is logged and recorded on its job; the remaining
    sources are still processed. Any other error is logged and swallowed so
    the scheduler thread keeps running. The DB session is always closed.
    """
    logger.info("開始每日新聞抓取...")
    db = SessionLocal()

    try:
        # `== True` is intentional: it builds a SQL expression, not a Python bool.
        sources = db.query(NewsSource).filter(NewsSource.is_active == True).all()  # noqa: E712

        # De-duplicated keyword strings across all active keywords.
        all_keywords = db.query(Keyword).filter(Keyword.is_active == True).all()  # noqa: E712
        keywords_list = list({kw.keyword for kw in all_keywords})

        for source in sources:
            logger.info(f"抓取來源: {source.name}")

            # Record the crawl attempt before doing any work.
            now = datetime.now()
            job = CrawlJob(
                source_id=source.id,
                status=CrawlStatus.RUNNING,
                scheduled_at=now,
                started_at=now
            )
            db.add(job)
            db.commit()

            crawler = None
            try:
                crawler = get_crawler(source.code)
                articles_count = _store_new_articles(db, source, crawler, keywords_list)

                job.status = CrawlStatus.COMPLETED
                job.completed_at = datetime.now()
                job.articles_count = articles_count

            except Exception as e:
                # A failed flush/commit leaves the session unusable until it is
                # rolled back; without this the commit below would raise and
                # abort every remaining source.
                db.rollback()
                job.status = CrawlStatus.FAILED
                job.completed_at = datetime.now()
                job.error_message = str(e)
                # retry_count may be None if the column has no default.
                job.retry_count = (job.retry_count or 0) + 1
                logger.error(f"抓取失敗 (來源: {source.name})", exc_info=True)

            finally:
                # Release the crawler on both the success and the failure path.
                _close_crawler(crawler, source)

            db.commit()

        # Build today's reports once all sources have been processed.
        generate_daily_reports(db)

        logger.info("每日新聞抓取完成")

    except Exception:
        logger.error("抓取過程發生錯誤", exc_info=True)
    finally:
        db.close()


def _store_new_articles(db: Session, source: NewsSource, crawler, keywords_list: List[str]) -> int:
    """Fetch, persist and group-match the articles of one source.

    Articles whose (source, url) pair is already stored are skipped. Each new
    article is committed individually before it is matched to groups.

    Returns:
        The number of newly stored articles.
    """
    stored = 0
    for article_data in crawler.get_article_list(keywords_list):
        url = article_data["url"]

        # Skip articles already stored for this source.
        already_stored = db.query(NewsArticle).filter(
            NewsArticle.source_id == source.id,
            NewsArticle.url == url
        ).first()
        if already_stored:
            continue

        # Fetch the full text only for articles we are going to keep.
        content = crawler.get_article_content(url)

        article = NewsArticle(
            source_id=source.id,
            title=article_data["title"],
            url=url,
            content=content,
            published_at=article_data.get("published_at"),
            crawled_at=datetime.now()
        )
        db.add(article)
        db.commit()
        db.refresh(article)

        match_article_to_groups(db, article)
        stored += 1

    return stored


def _close_crawler(crawler, source: NewsSource) -> None:
    """Close *crawler* if one was created; log instead of raising on failure."""
    if crawler is None:
        return
    try:
        crawler.close()
    except Exception:
        logger.warning(f"關閉爬蟲失敗 (來源: {source.name})", exc_info=True)


def match_article_to_groups(db: Session, article: NewsArticle):
    """Link *article* to every active group whose keywords appear in it.

    Matching is a case-insensitive substring test against the article title
    plus content. For each group with at least one hit an ``ArticleGroupMatch``
    is added whose score is the percentage of the group's active keywords that
    matched. All matches are committed together at the end.
    """
    groups = db.query(Group).filter(Group.is_active == True).all()  # noqa: E712

    # Lower-case the searchable text once instead of once per keyword.
    haystack = f"{article.title} {article.content or ''}".lower()

    for group in groups:
        keywords = db.query(Keyword).filter(
            Keyword.group_id == group.id,
            Keyword.is_active == True  # noqa: E712
        ).all()

        # Empty/None keywords are skipped: "" is a substring of every text
        # and None has no .lower().
        matched_keywords = [
            kw.keyword
            for kw in keywords
            if kw.keyword and kw.keyword.lower() in haystack
        ]
        if not matched_keywords:
            continue

        # `keywords` is non-empty here because at least one of them matched.
        score = len(matched_keywords) / len(keywords) * 100

        db.add(ArticleGroupMatch(
            article_id=article.id,
            group_id=group.id,
            matched_keywords=matched_keywords,
            match_score=score
        ))

    db.commit()


def generate_daily_reports(db: Session):
    """Create today's report for each active group that has matched articles.

    Groups that already have a report dated today, or have no matches among
    articles crawled today, are skipped. A new report starts as DRAFT, gets
    its articles attached, and moves to PENDING once the AI summary is stored.
    If summary generation fails for a group the error is logged, that report
    stays DRAFT, and the remaining groups are still processed.
    """
    logger.info("產生今日報告...")

    today = date.today()
    day_start = datetime.combine(today, datetime.min.time())

    groups = db.query(Group).filter(Group.is_active == True).all()  # noqa: E712

    for group in groups:
        # Only one report per group per day.
        existing = db.query(Report).filter(
            Report.group_id == group.id,
            Report.report_date == today
        ).first()
        if existing:
            continue

        # Matches for this group restricted to articles crawled today.
        matches = db.query(ArticleGroupMatch).filter(
            ArticleGroupMatch.group_id == group.id
        ).join(NewsArticle).filter(
            NewsArticle.crawled_at >= day_start
        ).all()
        if not matches:
            continue

        # Capture plain values now, before the commit below touches the session.
        article_ids = [match.article_id for match in matches]
        report_title = f"{group.name}日報 - {today.strftime('%Y/%m/%d')}"

        report = Report(
            group_id=group.id,
            title=report_title,
            report_date=today,
            status=ReportStatus.DRAFT
        )
        db.add(report)
        db.commit()
        db.refresh(report)

        # Load all matched articles with one query instead of one per match.
        articles_by_id = {
            article.id: article
            for article in db.query(NewsArticle).filter(
                NewsArticle.id.in_(set(article_ids))
            ).all()
        }

        # Attach the articles to the report, preserving match order.
        articles = []
        for article_id in article_ids:
            article = articles_by_id.get(article_id)
            if article is None:
                continue
            db.add(ReportArticle(
                report_id=report.id,
                article_id=article.id,
                is_included=True
            ))
            articles.append(article)
        db.commit()

        if articles:
            try:
                report.ai_summary = generate_summary(group, articles)
                report.status = ReportStatus.PENDING
                db.commit()
            except Exception:
                # Keep the DRAFT report and carry on with the other groups.
                db.rollback()
                logger.error(f"AI 摘要產生失敗: {report_title}", exc_info=True)
                continue

        logger.info(f"已產生報告: {report.title} ({len(articles)} 篇文章)")


def check_publish_deadline():
    """Mark today's unpublished reports as DELAYED and send notifications.

    Every report dated today that is still DRAFT or PENDING is set to DELAYED
    and passed to ``send_delay_notification``. The changes are committed once
    after all reports are handled; the session is always closed.
    """
    db = SessionLocal()

    try:
        today = date.today()
        unpublished_states = [ReportStatus.DRAFT, ReportStatus.PENDING]

        pending_reports = db.query(Report).filter(
            Report.report_date == today,
            Report.status.in_(unpublished_states)
        ).all()

        for report in pending_reports:
            report.status = ReportStatus.DELAYED
            send_delay_notification(db, report)

        db.commit()

    finally:
        db.close()


def _parse_schedule_time(value: str) -> Tuple[int, int]:
    """Split an "HH:MM" string into ``(hour, minute)`` integers.

    Only the first two colon-separated fields are used.

    Raises:
        ValueError: if there are fewer than two fields or a field is not an
            integer.
    """
    hour_text, minute_text, *_ = value.split(":")
    return int(hour_text), int(minute_text)


def init_scheduler():
    """Register the daily jobs and start the scheduler if it is not running.

    The crawl time comes from ``settings.crawl_schedule_time``. The deadline
    time comes from ``settings.publish_deadline_time`` when that attribute
    exists and otherwise defaults to "09:00". Both jobs use fixed ids with
    ``replace_existing=True``, so calling this again replaces them.
    """
    crawl_hour, crawl_minute = _parse_schedule_time(settings.crawl_schedule_time)
    deadline_hour, deadline_minute = _parse_schedule_time(
        getattr(settings, "publish_deadline_time", _DEFAULT_DEADLINE_TIME)
    )

    # Daily crawl job.
    scheduler.add_job(
        run_daily_crawl,
        CronTrigger(hour=crawl_hour, minute=crawl_minute),
        id="daily_crawl",
        replace_existing=True
    )

    # Publish-deadline check job.
    scheduler.add_job(
        check_publish_deadline,
        CronTrigger(hour=deadline_hour, minute=deadline_minute),
        id="check_deadline",
        replace_existing=True
    )

    if not scheduler.running:
        scheduler.start()
        logger.info(f"排程器已啟動: 每日 {settings.crawl_schedule_time} 抓取")


def shutdown_scheduler():
    """Shut the scheduler down if it is currently running."""
    if scheduler.running:
        scheduler.shutdown()