Files
KPI-management/app/services/dashboard_service.py
DonaldFang 方士碩 f810ddc2ea Initial commit: KPI Management System Backend
Features:
- FastAPI backend with JWT authentication
- MySQL database with SQLAlchemy ORM
- KPI workflow: draft → pending → approved → evaluation → completed
- Ollama LLM API integration for AI features
- Gitea API integration for version control
- Complete API endpoints for KPI, dashboard, notifications

Tables: KPI_D_* prefix naming convention

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 16:20:57 +08:00

278 lines
8.8 KiB
Python

"""
儀表板服務
"""
from collections import Counter
from datetime import datetime, timezone
from decimal import Decimal
from typing import Optional, List

from sqlalchemy import func
from sqlalchemy.orm import Session

from app.models.kpi_sheet import KPISheet, KPISheetStatus
from app.models.kpi_period import KPIPeriod
from app.models.department import Department
from app.models.employee import Employee
from app.models.dashboard_alert import DashboardAlert
from app.schemas.dashboard import (
    ProgressStats,
    DistributionItem,
    TrendItem,
    DashboardProgressResponse,
    DashboardDistributionResponse,
    DashboardTrendsResponse,
)
class DashboardService:
    """Dashboard statistics (progress, distribution, trends) and alert handling.

    Every method reads or writes through the SQLAlchemy session passed to the
    constructor; the service keeps no other state.
    """

    # Exclusive upper bounds and labels of the score buckets, in ascending
    # order. Anything not below the last bound lands in ``_TOP_SCORE_BUCKET``.
    _SCORE_BUCKETS = (
        (0.25, "0-0.25"),
        (0.5, "0.25-0.5"),
        (0.75, "0.5-0.75"),
    )
    _TOP_SCORE_BUCKET = "0.75-1.0"

    # Label used when a sheet's department row cannot be found.
    _UNKNOWN_DEPARTMENT = "未知"

    def __init__(self, db: Session):
        """Store the database session used by all queries of this service."""
        self.db = db

    # ==================== Helpers ====================

    def _resolve_period(self, period_id: Optional[int]) -> Optional[KPIPeriod]:
        """Return the requested period, or the newest one that is not completed.

        Args:
            period_id: Primary key of the period. A falsy value (``None`` or
                ``0``) selects the period with the latest ``start_date`` whose
                status is not ``"completed"``.

        Returns:
            The matching ``KPIPeriod``, or ``None`` when no row matches.
        """
        if period_id:
            return self.db.query(KPIPeriod).filter(KPIPeriod.id == period_id).first()
        return (
            self.db.query(KPIPeriod)
            .filter(KPIPeriod.status != "completed")
            .order_by(KPIPeriod.start_date.desc())
            .first()
        )

    @staticmethod
    def _to_items(counts, denominator: int) -> List[DistributionItem]:
        """Turn a ``label -> count`` mapping into items with one-decimal percentages.

        ``denominator`` must be non-zero; callers substitute 1 for an empty set.
        """
        return [
            DistributionItem(
                label=label,
                count=count,
                percentage=round(count / denominator * 100, 1),
            )
            for label, count in counts.items()
        ]

    @classmethod
    def _score_bucket(cls, score: float) -> str:
        """Return the label of the score bucket that ``score`` falls into."""
        for upper_bound, label in cls._SCORE_BUCKETS:
            if score < upper_bound:
                return label
        return cls._TOP_SCORE_BUCKET

    # ==================== Statistics ====================

    def get_progress(
        self, period_id: Optional[int] = None, department_id: Optional[int] = None
    ) -> DashboardProgressResponse:
        """Return per-status sheet counts and the completion rate for a period.

        Args:
            period_id: Period to report on; when falsy the newest period that
                is not completed is used.
            department_id: When truthy, only sheets of this department count.

        Returns:
            A response with the period code/name, the count of sheets in each
            workflow status and the percentage of completed sheets rounded to
            one decimal. When no period is found, a placeholder response with
            all counts at zero is returned.
        """
        period = self._resolve_period(period_id)
        if not period:
            return DashboardProgressResponse(
                period_code="N/A",
                period_name="無期間資料",
                stats=ProgressStats(
                    total=0,
                    draft=0,
                    pending=0,
                    approved=0,
                    self_eval=0,
                    manager_eval=0,
                    completed=0,
                ),
                completion_rate=0.0,
            )

        query = self.db.query(KPISheet).filter(KPISheet.period_id == period.id)
        if department_id:
            query = query.filter(KPISheet.department_id == department_id)

        # Only the status of each sheet matters from here on.
        statuses = [sheet.status for sheet in query.all()]
        total = len(statuses)

        stats = ProgressStats(
            total=total,
            draft=statuses.count(KPISheetStatus.DRAFT),
            pending=statuses.count(KPISheetStatus.PENDING),
            approved=statuses.count(KPISheetStatus.APPROVED),
            self_eval=statuses.count(KPISheetStatus.SELF_EVAL),
            manager_eval=statuses.count(KPISheetStatus.MANAGER_EVAL),
            completed=statuses.count(KPISheetStatus.COMPLETED),
        )
        completion_rate = (stats.completed / total * 100) if total > 0 else 0.0

        return DashboardProgressResponse(
            period_code=period.code,
            period_name=period.name,
            stats=stats,
            completion_rate=round(completion_rate, 1),
        )

    def get_distribution(
        self, period_id: Optional[int] = None
    ) -> DashboardDistributionResponse:
        """Return sheet distributions by department, status and score range.

        Args:
            period_id: Period to report on; when falsy the newest period that
                is not completed is used.

        Returns:
            Three lists of distribution items. Department and status
            percentages are relative to all sheets of the period; score-range
            percentages are relative to completed sheets that have a score.
            All three lists are empty when no period is found.
        """
        period = self._resolve_period(period_id)
        if not period:
            return DashboardDistributionResponse(
                by_department=[],
                by_status=[],
                by_score_range=[],
            )

        sheets = (
            self.db.query(KPISheet).filter(KPISheet.period_id == period.id).all()
        )
        total = len(sheets) or 1  # avoid division by zero

        # --- By department ---
        # Load every referenced department in one query instead of issuing
        # one query per sheet.
        dept_ids = {
            sheet.department_id
            for sheet in sheets
            if sheet.department_id is not None
        }
        dept_names = {}
        if dept_ids:
            departments = (
                self.db.query(Department)
                .filter(Department.id.in_(list(dept_ids)))
                .all()
            )
            dept_names = {dept.id: dept.name for dept in departments}
        # Counting by name (not id) merges departments that share a name.
        dept_counts = Counter(
            dept_names.get(sheet.department_id, self._UNKNOWN_DEPARTMENT)
            for sheet in sheets
        )
        by_department = self._to_items(dept_counts, total)

        # --- By status ---
        status_labels = {
            KPISheetStatus.DRAFT: "草稿",
            KPISheetStatus.PENDING: "待審核",
            KPISheetStatus.APPROVED: "已核准",
            KPISheetStatus.SELF_EVAL: "自評中",
            KPISheetStatus.MANAGER_EVAL: "主管評核中",
            KPISheetStatus.COMPLETED: "已完成",
        }
        # Statuses without a label fall back to the raw status value.
        status_counts = Counter(
            status_labels.get(sheet.status, sheet.status) for sheet in sheets
        )
        by_status = self._to_items(status_counts, total)

        # --- By score range (completed sheets only) ---
        # ``is not None`` rather than truthiness: a score of exactly 0 is
        # falsy and must still be counted.
        completed_scores = [
            float(sheet.total_score)
            for sheet in sheets
            if sheet.status == KPISheetStatus.COMPLETED
            and sheet.total_score is not None
        ]
        # Pre-seed every bucket so empty ranges are still reported, in order.
        score_ranges = {label: 0 for _, label in self._SCORE_BUCKETS}
        score_ranges[self._TOP_SCORE_BUCKET] = 0
        for score in completed_scores:
            score_ranges[self._score_bucket(score)] += 1
        by_score_range = self._to_items(score_ranges, len(completed_scores) or 1)

        return DashboardDistributionResponse(
            by_department=by_department,
            by_status=by_status,
            by_score_range=by_score_range,
        )

    def get_trends(self, limit: int = 4) -> DashboardTrendsResponse:
        """Return average score and completed-sheet count for recent periods.

        Args:
            limit: Maximum number of completed periods to include, chosen by
                latest ``end_date``.

        Returns:
            One trend item per period, ordered oldest to newest. The average
            covers completed sheets that have a score and is 0.0 when there
            are none; ``completed_count`` counts all completed sheets.
        """
        periods = (
            self.db.query(KPIPeriod)
            .filter(KPIPeriod.status == "completed")
            .order_by(KPIPeriod.end_date.desc())
            .limit(limit)
            .all()
        )

        trends = []
        # The query returns newest first; reverse for chronological output.
        for period in reversed(periods):
            completed_sheets = (
                self.db.query(KPISheet)
                .filter(
                    KPISheet.period_id == period.id,
                    KPISheet.status == KPISheetStatus.COMPLETED,
                )
                .all()
            )
            # ``is not None`` so that a genuine score of 0 is part of the mean.
            scores = [
                float(sheet.total_score)
                for sheet in completed_sheets
                if sheet.total_score is not None
            ]
            avg_score = sum(scores) / len(scores) if scores else 0.0
            trends.append(
                TrendItem(
                    period=period.code,
                    average_score=round(avg_score, 3),
                    completed_count=len(completed_sheets),
                )
            )

        return DashboardTrendsResponse(trends=trends)

    # ==================== Alerts ====================

    def get_alerts(
        self, is_resolved: Optional[bool] = False, limit: int = 50
    ) -> List[DashboardAlert]:
        """Return alerts, newest first.

        Args:
            is_resolved: Filter on the resolved flag; ``None`` returns alerts
                regardless of the flag.
            limit: Maximum number of alerts to return.
        """
        query = self.db.query(DashboardAlert)
        if is_resolved is not None:
            query = query.filter(DashboardAlert.is_resolved == is_resolved)
        return query.order_by(DashboardAlert.created_at.desc()).limit(limit).all()

    def create_alert(
        self,
        alert_type: str,
        severity: str,
        title: str,
        description: Optional[str] = None,
        related_sheet_id: Optional[int] = None,
        related_employee_id: Optional[int] = None,
    ) -> DashboardAlert:
        """Create, commit and return a new alert.

        The returned instance is refreshed from the session after the commit.
        """
        alert = DashboardAlert(
            alert_type=alert_type,
            severity=severity,
            title=title,
            description=description,
            related_sheet_id=related_sheet_id,
            related_employee_id=related_employee_id,
        )
        self.db.add(alert)
        self.db.commit()
        self.db.refresh(alert)
        return alert

    def resolve_alert(self, alert_id: int, resolver_id: int) -> Optional[DashboardAlert]:
        """Mark an alert as resolved and record who resolved it and when.

        Args:
            alert_id: Primary key of the alert.
            resolver_id: Value stored in the alert's ``resolved_by`` field.

        Returns:
            The updated alert, or ``None`` when no alert has that id.
        """
        alert = (
            self.db.query(DashboardAlert)
            .filter(DashboardAlert.id == alert_id)
            .first()
        )
        if not alert:
            return None

        alert.is_resolved = True
        # Naive UTC timestamp, identical in value to the deprecated
        # ``datetime.utcnow()``.
        alert.resolved_at = datetime.now(timezone.utc).replace(tzinfo=None)
        alert.resolved_by = resolver_id
        self.db.commit()
        self.db.refresh(alert)
        return alert