"""KPI sheet service."""
from datetime import datetime
from decimal import Decimal
from typing import Optional, List

from fastapi import HTTPException, status
from sqlalchemy.orm import Session

from app.models.employee import Employee
from app.models.kpi_period import KPIPeriod
from app.models.kpi_sheet import KPISheet, KPISheetStatus, VALID_STATUS_TRANSITIONS
from app.models.kpi_item import KPIItem
from app.models.kpi_template import KPITemplate, KPIPreset
from app.models.kpi_review_log import KPIReviewLog
from app.schemas.kpi_sheet import KPISheetCreate, SelfEvalRequest, ManagerEvalRequest
from app.schemas.kpi_item import KPIItemCreate

# Rating level -> number of bonus months awarded for that level.
LEVEL_MONTHS = {
    0: Decimal("0"),
    1: Decimal("0.25"),
    2: Decimal("0.5"),
    3: Decimal("0.75"),
    4: Decimal("1.0"),
}


class KPISheetService:
    """Query, create, review and evaluate KPI sheets through one DB session."""

    def __init__(self, db: Session):
        """Store the SQLAlchemy session used by every method of the service."""
        self.db = db

    # ==================== Queries ====================

    def get_by_id(self, sheet_id: int) -> Optional[KPISheet]:
        """Return the sheet with the given id, or ``None`` if there is none."""
        return self.db.query(KPISheet).filter(KPISheet.id == sheet_id).first()

    def get_by_employee_period(
        self, employee_id: int, period_id: int
    ) -> Optional[KPISheet]:
        """Return the sheet of an employee for a period, or ``None``."""
        return (
            self.db.query(KPISheet)
            .filter(
                KPISheet.employee_id == employee_id,
                KPISheet.period_id == period_id,
            )
            .first()
        )

    def get_multi(
        self,
        period_id: Optional[int] = None,
        department_id: Optional[int] = None,
        status: Optional[str] = None,
        skip: int = 0,
        limit: int = 100,
    ) -> List[KPISheet]:
        """Return one page of sheets, optionally filtered.

        Args:
            period_id: Only sheets of this period when not ``None``.
            department_id: Only sheets of this department when not ``None``.
            status: Only sheets in this status when non-empty.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            The matching sheets ordered by id.
        """
        # NOTE: the ``status`` parameter shadows the ``fastapi.status`` import
        # inside this method; the module is not needed here.
        query = self.db.query(KPISheet)
        # Compare against None so that an explicit id of 0 is still a filter.
        if period_id is not None:
            query = query.filter(KPISheet.period_id == period_id)
        if department_id is not None:
            query = query.filter(KPISheet.department_id == department_id)
        # Truthiness is kept here so an empty string means "no status filter".
        if status:
            query = query.filter(KPISheet.status == status)
        # OFFSET/LIMIT without ORDER BY gives no guaranteed row order, so
        # pages could overlap or miss rows; order by primary key.
        return query.order_by(KPISheet.id).offset(skip).limit(limit).all()

    def get_my_sheets(self, employee_id: int) -> List[KPISheet]:
        """Return all sheets of an employee, newest ``created_at`` first."""
        return (
            self.db.query(KPISheet)
            .filter(KPISheet.employee_id == employee_id)
            .order_by(KPISheet.created_at.desc())
            .all()
        )

    def get_pending_for_manager(self, manager_id: int) -> List[KPISheet]:
        """Return PENDING sheets whose employee has ``manager_id`` as manager."""
        return (
            self.db.query(KPISheet)
            .join(Employee, KPISheet.employee_id == Employee.id)
            .filter(
                Employee.manager_id == manager_id,
                KPISheet.status == KPISheetStatus.PENDING,
            )
            .all()
        )

    # ==================== Creation ====================

    def create(self, employee: Employee, data: KPISheetCreate) -> KPISheet:
        """Create a DRAFT sheet with its items for ``employee``.

        Args:
            employee: Owner of the new sheet; its department is copied over.
            data: Target period id and the items to create.

        Returns:
            The committed and refreshed sheet.

        Raises:
            HTTPException: 404 if the period does not exist, 400 if the
                period status is not ``"setting"``, 409 if the employee
                already has a sheet for that period.
        """
        # The period must exist and be in its setting phase.
        period = self.db.query(KPIPeriod).filter(KPIPeriod.id == data.period_id).first()
        if not period:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail={"code": "PERIOD001", "message": "期間不存在"},
            )
        if period.status != "setting":
            # NOTE(review): this reuses code "PERIOD001" from the 404 above;
            # confirm whether a distinct code was intended.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={"code": "PERIOD001", "message": "目前不在 KPI 設定期間"},
            )

        # Reject a second sheet for the same employee and period.
        existing = self.get_by_employee_period(employee.id, data.period_id)
        if existing:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail={"code": "KPI005", "message": "該期間已有 KPI 表單"},
            )

        sheet = KPISheet(
            employee_id=employee.id,
            period_id=data.period_id,
            department_id=employee.department_id,
            status=KPISheetStatus.DRAFT,
        )
        self.db.add(sheet)
        # Flush so that sheet.id is populated before the items reference it.
        self.db.flush()

        # Items keep the order in which they were supplied.
        for i, item_data in enumerate(data.items):
            item = KPIItem(
                sheet_id=sheet.id,
                template_id=item_data.template_id,
                sort_order=i,
                name=item_data.name,
                category=item_data.category,
                weight=item_data.weight,
                level0_criteria=item_data.level0_criteria,
                level1_criteria=item_data.level1_criteria,
                level2_criteria=item_data.level2_criteria,
                level3_criteria=item_data.level3_criteria,
                level4_criteria=item_data.level4_criteria,
            )
            self.db.add(item)

        self.db.commit()
        self.db.refresh(sheet)
        return sheet

    def copy_from_previous(self, employee: Employee, period_id: int) -> KPISheet:
        """Create a sheet for ``period_id`` by copying an earlier sheet's items.

        The source is the employee's most recently created COMPLETED sheet
        that belongs to a different period.

        Raises:
            HTTPException: 404 if no such sheet exists, plus anything
                raised by :meth:`create`.
        """
        prev_sheet = (
            self.db.query(KPISheet)
            .filter(
                KPISheet.employee_id == employee.id,
                KPISheet.period_id != period_id,
                KPISheet.status == KPISheetStatus.COMPLETED,
            )
            .order_by(KPISheet.created_at.desc())
            .first()
        )
        if not prev_sheet:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail={"code": "KPI006", "message": "找不到上期 KPI 可供複製"},
            )

        # Convert the stored items into creation payloads.
        items = [
            KPIItemCreate(
                template_id=item.template_id,
                name=item.name,
                category=item.category,
                weight=item.weight,
                level0_criteria=item.level0_criteria,
                level1_criteria=item.level1_criteria,
                level2_criteria=item.level2_criteria,
                level3_criteria=item.level3_criteria,
                level4_criteria=item.level4_criteria,
            )
            for item in prev_sheet.items
        ]
        data = KPISheetCreate(period_id=period_id, items=items)
        return self.create(employee, data)

    def apply_preset(
        self, employee: Employee, period_id: int, preset_id: int
    ) -> KPISheet:
        """Create a sheet for ``period_id`` from the templates of a preset.

        Each item takes its name, category and level descriptions from the
        template and its weight from the preset item's ``default_weight``.

        Raises:
            HTTPException: 404 if the preset does not exist, plus anything
                raised by :meth:`create`.
        """
        preset = self.db.query(KPIPreset).filter(KPIPreset.id == preset_id).first()
        if not preset:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail={"code": "PRESET001", "message": "預設組合不存在"},
            )

        # Convert the preset entries into creation payloads.
        items = []
        for preset_item in preset.items:
            template = preset_item.template
            items.append(
                KPIItemCreate(
                    template_id=template.id,
                    name=template.name,
                    category=template.category,
                    weight=preset_item.default_weight,
                    level0_criteria=template.level0_desc,
                    level1_criteria=template.level1_desc,
                    level2_criteria=template.level2_desc,
                    level3_criteria=template.level3_desc,
                    level4_criteria=template.level4_desc,
                )
            )
        data = KPISheetCreate(period_id=period_id, items=items)
        return self.create(employee, data)

    # ==================== Validation ====================

    def validate_weight(self, sheet_id: int) -> bool:
        """Return ``True`` when the sheet's item weights sum to exactly 100."""
        items = self.db.query(KPIItem).filter(KPIItem.sheet_id == sheet_id).all()
        total_weight = sum(item.weight for item in items)
        return total_weight == 100

    def _check_status_transition(self, sheet: KPISheet, new_status: str) -> None:
        """Raise HTTP 400 unless ``sheet.can_transition_to(new_status)``."""
        if not sheet.can_transition_to(new_status):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={
                    "code": "KPI003",
                    "message": f"無法從 {sheet.status} 轉換到 {new_status}",
                },
            )

    def _log_action(
        self,
        sheet: KPISheet,
        action: str,
        actor: Employee,
        from_status: str,
        to_status: str,
        comment: Optional[str] = None,
    ) -> None:
        """Add a review-log row to the session; the caller commits."""
        log = KPIReviewLog(
            sheet_id=sheet.id,
            action=action,
            actor_id=actor.id,
            from_status=from_status,
            to_status=to_status,
            comment=comment,
        )
        self.db.add(log)

    # ==================== Review workflow ====================

    def submit(self, sheet: KPISheet, actor: Employee) -> KPISheet:
        """Move a sheet to PENDING after validating its items.

        Raises:
            HTTPException: 400 if the transition is not allowed, if the
                weights do not sum to 100, or if there are fewer than
                3 items.
        """
        from_status = sheet.status
        self._check_status_transition(sheet, KPISheetStatus.PENDING)

        # Weights must total 100; report the current total on failure.
        if not self.validate_weight(sheet.id):
            total = sum(item.weight for item in sheet.items)
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={
                    "code": "KPI001",
                    "message": "權重總和必須等於 100%",
                    "details": {"current_total": total, "expected": 100},
                },
            )

        # A sheet needs at least three items.
        if len(sheet.items) < 3:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={
                    "code": "KPI002",
                    "message": "KPI 項目數量不足,至少需要 3 項",
                },
            )

        sheet.status = KPISheetStatus.PENDING
        sheet.submitted_at = datetime.utcnow()
        self._log_action(sheet, "submit", actor, from_status, sheet.status)
        self.db.commit()
        self.db.refresh(sheet)
        return sheet

    def approve(
        self, sheet: KPISheet, actor: Employee, comment: Optional[str] = None
    ) -> KPISheet:
        """Move a sheet to APPROVED and record approver, time and comment.

        Raises:
            HTTPException: 400 if the transition is not allowed.
        """
        from_status = sheet.status
        self._check_status_transition(sheet, KPISheetStatus.APPROVED)

        sheet.status = KPISheetStatus.APPROVED
        sheet.approved_by = actor.id
        sheet.approved_at = datetime.utcnow()
        sheet.approve_comment = comment
        self._log_action(sheet, "approve", actor, from_status, sheet.status, comment)
        self.db.commit()
        self.db.refresh(sheet)
        return sheet

    def reject(self, sheet: KPISheet, actor: Employee, reason: str) -> KPISheet:
        """Send a sheet back to DRAFT and record who rejected it and why.

        Raises:
            HTTPException: 400 if the transition is not allowed.
        """
        from_status = sheet.status
        self._check_status_transition(sheet, KPISheetStatus.DRAFT)

        sheet.status = KPISheetStatus.DRAFT
        sheet.rejected_by = actor.id
        sheet.rejected_at = datetime.utcnow()
        sheet.reject_reason = reason
        self._log_action(sheet, "reject", actor, from_status, sheet.status, reason)
        self.db.commit()
        self.db.refresh(sheet)
        return sheet

    # ==================== Evaluation ====================

    def self_eval(
        self, sheet: KPISheet, actor: Employee, data: SelfEvalRequest
    ) -> KPISheet:
        """Store the employee's self-evaluation and move to MANAGER_EVAL.

        An APPROVED sheet is first advanced to SELF_EVAL. Entries in
        ``data.items`` whose id is not an item of this sheet are ignored.

        Raises:
            HTTPException: 400 if the transition to MANAGER_EVAL is not
                allowed; the sheet's status is left as it was on entry.
        """
        from_status = sheet.status

        # An approved sheet passes through SELF_EVAL on the way.
        if sheet.status == KPISheetStatus.APPROVED:
            sheet.status = KPISheetStatus.SELF_EVAL

        try:
            self._check_status_transition(sheet, KPISheetStatus.MANAGER_EVAL)
        except HTTPException:
            # Undo the provisional change so a rejected request does not
            # leave the session-attached sheet with an unvalidated status.
            if sheet.status != from_status:
                sheet.status = from_status
            raise

        # Apply the self-evaluation to the matching items.
        item_map = {item.id: item for item in sheet.items}
        for eval_item in data.items:
            item = item_map.get(eval_item.id)
            if item:
                item.self_eval_level = eval_item.level
                item.self_eval_note = eval_item.note

        sheet.status = KPISheetStatus.MANAGER_EVAL
        sheet.self_eval_at = datetime.utcnow()
        self._log_action(sheet, "self_eval", actor, from_status, sheet.status)
        self.db.commit()
        self.db.refresh(sheet)
        return sheet

    def manager_eval(
        self, sheet: KPISheet, actor: Employee, data: ManagerEvalRequest
    ) -> KPISheet:
        """Store the manager's ratings, compute the score and complete.

        The total score is the sum over rated items of
        ``weight / 100 * LEVEL_MONTHS[final_level]``. Items without a
        final level, and levels missing from ``LEVEL_MONTHS``, add zero.
        Entries in ``data.items`` whose id is not an item of this sheet
        are ignored.

        Raises:
            HTTPException: 400 if the transition to COMPLETED is not
                allowed.
        """
        from_status = sheet.status
        self._check_status_transition(sheet, KPISheetStatus.COMPLETED)

        # Apply the manager's ratings to the matching items.
        item_map = {item.id: item for item in sheet.items}
        for eval_item in data.items:
            item = item_map.get(eval_item.id)
            if item:
                item.final_level = eval_item.level
                item.final_note = eval_item.note

        # Weighted sum of bonus months over all rated items.
        total_score = Decimal("0")
        for item in sheet.items:
            if item.final_level is not None:
                # Go through str() so the Decimal is built from the
                # weight's printed value.
                weight_ratio = Decimal(str(item.weight)) / Decimal("100")
                level_month = LEVEL_MONTHS.get(item.final_level, Decimal("0"))
                total_score += weight_ratio * level_month

        sheet.status = KPISheetStatus.COMPLETED
        sheet.manager_eval_by = actor.id
        sheet.manager_eval_at = datetime.utcnow()
        sheet.manager_eval_comment = data.comment
        sheet.total_score = total_score
        self._log_action(
            sheet, "manager_eval", actor, from_status, sheet.status, data.comment
        )
        self.db.commit()
        self.db.refresh(sheet)
        return sheet

    # ==================== Deletion ====================

    def delete(self, sheet: KPISheet) -> None:
        """Delete a sheet; only sheets in DRAFT status may be deleted.

        Raises:
            HTTPException: 400 if the sheet is not in DRAFT status.
        """
        if sheet.status != KPISheetStatus.DRAFT:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={"code": "KPI007", "message": "只能刪除草稿狀態的表單"},
            )
        self.db.delete(sheet)
        self.db.commit()