feat: implement automation module

- Event-based triggers (Phase 1):
  - Trigger/TriggerLog models with field_change type
  - TriggerService for condition evaluation and action execution
  - Trigger CRUD API endpoints
  - Task integration (status, assignee, priority changes)
  - Frontend: TriggerList, TriggerForm components

- Weekly reports (Phase 2):
  - ScheduledReport/ReportHistory models
  - ReportService for stats generation
  - APScheduler for Friday 16:00 job
  - Report preview/generate/history API
  - Frontend: WeeklyReportPreview, ReportHistory components

- Tests: 23 new tests (14 triggers + 9 reports)
- OpenSpec: add-automation change archived

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
beabigegg
2025-12-29 22:59:00 +08:00
parent 3108fe1dff
commit 95c281d8e1
32 changed files with 3163 additions and 3 deletions

View File

@@ -0,0 +1,228 @@
import uuid
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.models import (
User, Task, Project, ScheduledReport, ReportHistory
)
from app.services.notification_service import NotificationService
class ReportService:
"""Service for generating and managing scheduled reports."""
@staticmethod
def get_week_start(date: Optional[datetime] = None) -> datetime:
"""Get the start of the week (Monday) for a given date."""
if date is None:
date = datetime.utcnow()
# Get Monday of the current week
days_since_monday = date.weekday()
week_start = date - timedelta(days=days_since_monday)
return week_start.replace(hour=0, minute=0, second=0, microsecond=0)
@staticmethod
def get_weekly_stats(db: Session, user_id: str, week_start: Optional[datetime] = None) -> Dict[str, Any]:
"""
Get weekly task statistics for a user's projects.
Returns stats for all projects where the user is the owner.
"""
if week_start is None:
week_start = ReportService.get_week_start()
week_end = week_start + timedelta(days=7)
# Get projects owned by the user
projects = db.query(Project).filter(Project.owner_id == user_id).all()
if not projects:
return {
"week_start": week_start.isoformat(),
"week_end": week_end.isoformat(),
"projects": [],
"summary": {
"completed_count": 0,
"in_progress_count": 0,
"overdue_count": 0,
"total_tasks": 0,
}
}
project_ids = [p.id for p in projects]
# Get all tasks for these projects
all_tasks = db.query(Task).filter(Task.project_id.in_(project_ids)).all()
# Categorize tasks
completed_tasks = []
in_progress_tasks = []
overdue_tasks = []
now = datetime.utcnow()
for task in all_tasks:
status_name = task.status.name.lower() if task.status else ""
# Check if completed (updated this week)
if status_name in ["done", "completed", "完成"]:
if task.updated_at and task.updated_at >= week_start:
completed_tasks.append(task)
# Check if in progress
elif status_name in ["in progress", "進行中", "doing"]:
in_progress_tasks.append(task)
# Check if overdue
if task.due_date and task.due_date < now and status_name not in ["done", "completed", "完成"]:
overdue_tasks.append(task)
# Build project details
project_details = []
for project in projects:
project_tasks = [t for t in all_tasks if t.project_id == project.id]
project_completed = [t for t in completed_tasks if t.project_id == project.id]
project_in_progress = [t for t in in_progress_tasks if t.project_id == project.id]
project_overdue = [t for t in overdue_tasks if t.project_id == project.id]
project_details.append({
"project_id": project.id,
"project_title": project.title,
"completed_count": len(project_completed),
"in_progress_count": len(project_in_progress),
"overdue_count": len(project_overdue),
"total_tasks": len(project_tasks),
"completed_tasks": [{"id": t.id, "title": t.title} for t in project_completed[:5]],
"overdue_tasks": [{"id": t.id, "title": t.title, "due_date": t.due_date.isoformat() if t.due_date else None} for t in project_overdue[:5]],
})
return {
"week_start": week_start.isoformat(),
"week_end": week_end.isoformat(),
"generated_at": datetime.utcnow().isoformat(),
"projects": project_details,
"summary": {
"completed_count": len(completed_tasks),
"in_progress_count": len(in_progress_tasks),
"overdue_count": len(overdue_tasks),
"total_tasks": len(all_tasks),
}
}
@staticmethod
def generate_weekly_report(db: Session, user_id: str) -> Optional[ReportHistory]:
"""
Generate a weekly report for a user and save to history.
"""
# Get or create scheduled report for this user
scheduled_report = db.query(ScheduledReport).filter(
ScheduledReport.recipient_id == user_id,
ScheduledReport.report_type == "weekly",
).first()
if not scheduled_report:
scheduled_report = ScheduledReport(
id=str(uuid.uuid4()),
report_type="weekly",
recipient_id=user_id,
is_active=True,
)
db.add(scheduled_report)
db.flush()
# Generate report content
content = ReportService.get_weekly_stats(db, user_id)
# Save to history
report_history = ReportHistory(
id=str(uuid.uuid4()),
report_id=scheduled_report.id,
content=content,
status="sent",
)
db.add(report_history)
# Update last_sent_at
scheduled_report.last_sent_at = datetime.utcnow()
db.commit()
return report_history
@staticmethod
def send_report_notification(
db: Session,
user_id: str,
report_content: Dict[str, Any],
) -> None:
"""Send a notification with the weekly report summary."""
summary = report_content.get("summary", {})
completed = summary.get("completed_count", 0)
in_progress = summary.get("in_progress_count", 0)
overdue = summary.get("overdue_count", 0)
message = f"本週完成 {completed} 項任務,進行中 {in_progress}"
if overdue > 0:
message += f",逾期 {overdue} 項需關注"
NotificationService.create_notification(
db=db,
user_id=user_id,
notification_type="status_change",
reference_type="report",
reference_id="weekly",
title="週報:專案進度彙整",
message=message,
)
@staticmethod
async def generate_all_weekly_reports(db: Session) -> List[str]:
"""
Generate weekly reports for all active subscriptions.
Called by the scheduler on Friday 16:00.
"""
generated_for = []
# Get all active scheduled reports
active_reports = db.query(ScheduledReport).filter(
ScheduledReport.is_active == True,
ScheduledReport.report_type == "weekly",
).all()
for scheduled_report in active_reports:
try:
# Generate report
content = ReportService.get_weekly_stats(db, scheduled_report.recipient_id)
# Save history
history = ReportHistory(
id=str(uuid.uuid4()),
report_id=scheduled_report.id,
content=content,
status="sent",
)
db.add(history)
# Update last_sent_at
scheduled_report.last_sent_at = datetime.utcnow()
# Send notification
ReportService.send_report_notification(db, scheduled_report.recipient_id, content)
generated_for.append(scheduled_report.recipient_id)
except Exception as e:
# Log failure
history = ReportHistory(
id=str(uuid.uuid4()),
report_id=scheduled_report.id,
content={},
status="failed",
error_message=str(e),
)
db.add(history)
db.commit()
return generated_for

View File

@@ -0,0 +1,200 @@
import uuid
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from app.models import Trigger, TriggerLog, Task, User, Project
from app.services.notification_service import NotificationService
class TriggerService:
"""Service for evaluating and executing triggers."""
SUPPORTED_FIELDS = ["status_id", "assignee_id", "priority"]
SUPPORTED_OPERATORS = ["equals", "not_equals", "changed_to", "changed_from"]
@staticmethod
def evaluate_triggers(
db: Session,
task: Task,
old_values: Dict[str, Any],
new_values: Dict[str, Any],
current_user: User,
) -> List[TriggerLog]:
"""Evaluate all active triggers for a project when task values change."""
logs = []
# Get active field_change triggers for the project
triggers = db.query(Trigger).filter(
Trigger.project_id == task.project_id,
Trigger.is_active == True,
Trigger.trigger_type == "field_change",
).all()
for trigger in triggers:
if TriggerService._check_conditions(trigger.conditions, old_values, new_values):
log = TriggerService._execute_actions(db, trigger, task, current_user, old_values, new_values)
logs.append(log)
return logs
@staticmethod
def _check_conditions(
conditions: Dict[str, Any],
old_values: Dict[str, Any],
new_values: Dict[str, Any],
) -> bool:
"""Check if trigger conditions are met."""
field = conditions.get("field")
operator = conditions.get("operator")
value = conditions.get("value")
if field not in TriggerService.SUPPORTED_FIELDS:
return False
old_value = old_values.get(field)
new_value = new_values.get(field)
if operator == "equals":
return new_value == value
elif operator == "not_equals":
return new_value != value
elif operator == "changed_to":
return old_value != value and new_value == value
elif operator == "changed_from":
return old_value == value and new_value != value
return False
@staticmethod
def _execute_actions(
db: Session,
trigger: Trigger,
task: Task,
current_user: User,
old_values: Dict[str, Any],
new_values: Dict[str, Any],
) -> TriggerLog:
"""Execute trigger actions and log the result."""
actions = trigger.actions if isinstance(trigger.actions, list) else [trigger.actions]
executed_actions = []
error_message = None
try:
for action in actions:
action_type = action.get("type")
if action_type == "notify":
TriggerService._execute_notify_action(db, action, task, current_user, old_values, new_values)
executed_actions.append({"type": action_type, "status": "success"})
status = "success"
except Exception as e:
status = "failed"
error_message = str(e)
executed_actions.append({"type": "error", "message": str(e)})
log = TriggerLog(
id=str(uuid.uuid4()),
trigger_id=trigger.id,
task_id=task.id,
status=status,
details={
"trigger_name": trigger.name,
"old_values": old_values,
"new_values": new_values,
"actions_executed": executed_actions,
},
error_message=error_message,
)
db.add(log)
return log
@staticmethod
def _execute_notify_action(
db: Session,
action: Dict[str, Any],
task: Task,
current_user: User,
old_values: Dict[str, Any],
new_values: Dict[str, Any],
) -> None:
"""Execute a notify action."""
target = action.get("target", "assignee")
template = action.get("template", "任務 {task_title} 已觸發自動化規則")
# Resolve target user
target_user_id = TriggerService._resolve_target(task, target)
if not target_user_id:
return
# Don't notify the user who triggered the action
if target_user_id == current_user.id:
return
# Format message with variables
message = TriggerService._format_template(template, task, old_values, new_values)
NotificationService.create_notification(
db=db,
user_id=target_user_id,
notification_type="status_change",
reference_type="task",
reference_id=task.id,
title=f"自動化通知: {task.title}",
message=message,
)
@staticmethod
def _resolve_target(task: Task, target: str) -> Optional[str]:
"""Resolve notification target to user ID."""
if target == "assignee":
return task.assignee_id
elif target == "creator":
return task.created_by
elif target == "project_owner":
return task.project.owner_id if task.project else None
elif target.startswith("user:"):
return target.split(":", 1)[1]
return None
@staticmethod
def _format_template(
template: str,
task: Task,
old_values: Dict[str, Any],
new_values: Dict[str, Any],
) -> str:
"""Format message template with task variables."""
replacements = {
"{task_title}": task.title,
"{task_id}": task.id,
"{old_value}": str(old_values.get("status_id", old_values.get("assignee_id", old_values.get("priority", "")))),
"{new_value}": str(new_values.get("status_id", new_values.get("assignee_id", new_values.get("priority", "")))),
}
result = template
for key, value in replacements.items():
result = result.replace(key, value)
return result
@staticmethod
def log_execution(
db: Session,
trigger: Trigger,
task: Optional[Task],
status: str,
details: Optional[Dict[str, Any]] = None,
error_message: Optional[str] = None,
) -> TriggerLog:
"""Log a trigger execution."""
log = TriggerLog(
id=str(uuid.uuid4()),
trigger_id=trigger.id,
task_id=task.id if task else None,
status=status,
details=details,
error_message=error_message,
)
db.add(log)
return log