""" 儀表板服務 """ from datetime import datetime from typing import Optional, List from decimal import Decimal from sqlalchemy import func from sqlalchemy.orm import Session from app.models.kpi_sheet import KPISheet, KPISheetStatus from app.models.kpi_period import KPIPeriod from app.models.department import Department from app.models.employee import Employee from app.models.dashboard_alert import DashboardAlert from app.schemas.dashboard import ( ProgressStats, DistributionItem, TrendItem, DashboardProgressResponse, DashboardDistributionResponse, DashboardTrendsResponse, ) class DashboardService: """儀表板服務""" def __init__(self, db: Session): self.db = db def get_progress( self, period_id: Optional[int] = None, department_id: Optional[int] = None ) -> DashboardProgressResponse: """取得進度統計""" # 取得期間 if period_id: period = self.db.query(KPIPeriod).filter(KPIPeriod.id == period_id).first() else: period = ( self.db.query(KPIPeriod) .filter(KPIPeriod.status != "completed") .order_by(KPIPeriod.start_date.desc()) .first() ) if not period: return DashboardProgressResponse( period_code="N/A", period_name="無期間資料", stats=ProgressStats( total=0, draft=0, pending=0, approved=0, self_eval=0, manager_eval=0, completed=0, ), completion_rate=0.0, ) # 查詢統計 query = self.db.query(KPISheet).filter(KPISheet.period_id == period.id) if department_id: query = query.filter(KPISheet.department_id == department_id) sheets = query.all() total = len(sheets) stats = ProgressStats( total=total, draft=sum(1 for s in sheets if s.status == KPISheetStatus.DRAFT), pending=sum(1 for s in sheets if s.status == KPISheetStatus.PENDING), approved=sum(1 for s in sheets if s.status == KPISheetStatus.APPROVED), self_eval=sum(1 for s in sheets if s.status == KPISheetStatus.SELF_EVAL), manager_eval=sum( 1 for s in sheets if s.status == KPISheetStatus.MANAGER_EVAL ), completed=sum(1 for s in sheets if s.status == KPISheetStatus.COMPLETED), ) completion_rate = (stats.completed / total * 100) if total > 0 else 0.0 return DashboardProgressResponse( period_code=period.code, period_name=period.name, stats=stats, completion_rate=round(completion_rate, 1), ) def get_distribution( self, period_id: Optional[int] = None ) -> DashboardDistributionResponse: """取得分佈統計""" # 取得期間 if period_id: period = self.db.query(KPIPeriod).filter(KPIPeriod.id == period_id).first() else: period = ( self.db.query(KPIPeriod) .filter(KPIPeriod.status != "completed") .order_by(KPIPeriod.start_date.desc()) .first() ) if not period: return DashboardDistributionResponse( by_department=[], by_status=[], by_score_range=[], ) sheets = ( self.db.query(KPISheet).filter(KPISheet.period_id == period.id).all() ) total = len(sheets) or 1 # 避免除以零 # 按部門分佈 dept_counts = {} for sheet in sheets: dept_id = sheet.department_id dept = self.db.query(Department).filter(Department.id == dept_id).first() dept_name = dept.name if dept else "未知" dept_counts[dept_name] = dept_counts.get(dept_name, 0) + 1 by_department = [ DistributionItem( label=name, count=count, percentage=round(count / total * 100, 1) ) for name, count in dept_counts.items() ] # 按狀態分佈 status_labels = { KPISheetStatus.DRAFT: "草稿", KPISheetStatus.PENDING: "待審核", KPISheetStatus.APPROVED: "已核准", KPISheetStatus.SELF_EVAL: "自評中", KPISheetStatus.MANAGER_EVAL: "主管評核中", KPISheetStatus.COMPLETED: "已完成", } status_counts = {} for sheet in sheets: label = status_labels.get(sheet.status, sheet.status) status_counts[label] = status_counts.get(label, 0) + 1 by_status = [ DistributionItem( label=name, count=count, percentage=round(count / total * 100, 1) ) for name, count in status_counts.items() ] # 按分數區間分佈(僅已完成) completed_sheets = [ s for s in sheets if s.status == KPISheetStatus.COMPLETED and s.total_score ] score_ranges = { "0-0.25": 0, "0.25-0.5": 0, "0.5-0.75": 0, "0.75-1.0": 0, } for sheet in completed_sheets: score = float(sheet.total_score) if score < 0.25: score_ranges["0-0.25"] += 1 elif score < 0.5: score_ranges["0.25-0.5"] += 1 elif score < 0.75: score_ranges["0.5-0.75"] += 1 else: score_ranges["0.75-1.0"] += 1 completed_total = len(completed_sheets) or 1 by_score_range = [ DistributionItem( label=name, count=count, percentage=round(count / completed_total * 100, 1), ) for name, count in score_ranges.items() ] return DashboardDistributionResponse( by_department=by_department, by_status=by_status, by_score_range=by_score_range, ) def get_trends(self, limit: int = 4) -> DashboardTrendsResponse: """取得趨勢統計""" periods = ( self.db.query(KPIPeriod) .filter(KPIPeriod.status == "completed") .order_by(KPIPeriod.end_date.desc()) .limit(limit) .all() ) trends = [] for period in reversed(periods): completed_sheets = ( self.db.query(KPISheet) .filter( KPISheet.period_id == period.id, KPISheet.status == KPISheetStatus.COMPLETED, ) .all() ) if completed_sheets: scores = [ float(s.total_score) for s in completed_sheets if s.total_score ] avg_score = sum(scores) / len(scores) if scores else 0 else: avg_score = 0 trends.append( TrendItem( period=period.code, average_score=round(avg_score, 3), completed_count=len(completed_sheets), ) ) return DashboardTrendsResponse(trends=trends) # ==================== 警示 ==================== def get_alerts( self, is_resolved: Optional[bool] = False, limit: int = 50 ) -> List[DashboardAlert]: """取得警示""" query = self.db.query(DashboardAlert) if is_resolved is not None: query = query.filter(DashboardAlert.is_resolved == is_resolved) return query.order_by(DashboardAlert.created_at.desc()).limit(limit).all() def create_alert( self, alert_type: str, severity: str, title: str, description: Optional[str] = None, related_sheet_id: Optional[int] = None, related_employee_id: Optional[int] = None, ) -> DashboardAlert: """建立警示""" alert = DashboardAlert( alert_type=alert_type, severity=severity, title=title, description=description, related_sheet_id=related_sheet_id, related_employee_id=related_employee_id, ) self.db.add(alert) self.db.commit() self.db.refresh(alert) return alert def resolve_alert(self, alert_id: int, resolver_id: int) -> Optional[DashboardAlert]: """解決警示""" alert = self.db.query(DashboardAlert).filter(DashboardAlert.id == alert_id).first() if not alert: return None alert.is_resolved = True alert.resolved_at = datetime.utcnow() alert.resolved_by = resolver_id self.db.commit() self.db.refresh(alert) return alert