Files
SalesPipeline/backend/app/services/report_generator.py
2026-01-16 18:16:33 +08:00

191 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import io
from typing import List, Dict, Any
from datetime import datetime
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4, landscape
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from sqlalchemy.orm import Session
from app.models.dit import DitRecord
from app.models.sample import SampleRecord
from app.models.order import OrderRecord
from app.models.match import MatchResult, MatchStatus
class ReportGenerator:
def __init__(self, db: Session):
self.db = db
def get_attribution_data(self) -> List[Dict[str, Any]]:
"""取得歸因明細資料"""
dit_records = self.db.query(DitRecord).all()
result = []
for dit in dit_records:
row = {
'op_id': dit.op_id,
'customer': dit.customer,
'pn': dit.pn,
'eau': dit.eau,
'stage': dit.stage,
'sample_order': None,
'order_no': None,
'order_status': None,
'order_amount': None
}
# 找到已接受的樣品匹配
sample_match = self.db.query(MatchResult).filter(
MatchResult.dit_id == dit.id,
MatchResult.target_type == 'SAMPLE',
MatchResult.status.in_([MatchStatus.accepted, MatchStatus.auto_matched])
).first()
if sample_match:
sample = self.db.query(SampleRecord).filter(
SampleRecord.id == sample_match.target_id
).first()
if sample:
row['sample_order'] = sample.order_no
# 找到已接受的訂單匹配
order_match = self.db.query(MatchResult).filter(
MatchResult.dit_id == dit.id,
MatchResult.target_type == 'ORDER',
MatchResult.status.in_([MatchStatus.accepted, MatchStatus.auto_matched])
).first()
if order_match:
order = self.db.query(OrderRecord).filter(
OrderRecord.id == order_match.target_id
).first()
if order:
row['order_no'] = order.order_no
row['order_status'] = order.status.value if order.status else None
row['order_amount'] = order.amount
result.append(row)
return result
def generate_excel(self) -> io.BytesIO:
"""產生 Excel 報表 (包含三個分頁DIT歸因明細, 成功送樣, 取得訂單)"""
wb = Workbook()
# 取得所有資料
all_data = self.get_attribution_data()
# 定義樣式
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4F46E5", end_color="4F46E5", fill_type="solid")
header_alignment = Alignment(horizontal="center", vertical="center")
headers = ['OP編號', '客戶名稱', '料號', 'EAU', '階段', '樣品單號', '訂單單號', '訂單狀態', '訂單金額']
column_widths = [15, 30, 20, 12, 15, 15, 15, 12, 12]
def create_sheet(sheet_name, data_rows):
if sheet_name == "DIT歸因明細":
ws = wb.active
ws.title = sheet_name
else:
ws = wb.create_sheet(title=sheet_name)
# 表頭
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
# 資料
for row_idx, row_data in enumerate(data_rows, 2):
ws.cell(row=row_idx, column=1, value=row_data['op_id'])
ws.cell(row=row_idx, column=2, value=row_data['customer'])
ws.cell(row=row_idx, column=3, value=row_data['pn'])
ws.cell(row=row_idx, column=4, value=row_data['eau'])
ws.cell(row=row_idx, column=5, value=row_data['stage'])
ws.cell(row=row_idx, column=6, value=row_data['sample_order'] or '-')
ws.cell(row=row_idx, column=7, value=row_data['order_no'] or '-')
ws.cell(row=row_idx, column=8, value=row_data['order_status'] or '-')
ws.cell(row=row_idx, column=9, value=row_data['order_amount'] or 0)
# 調整欄寬
for col, width in enumerate(column_widths, 1):
ws.column_dimensions[chr(64 + col)].width = width
# 1. DIT歸因明細 (全部)
create_sheet("DIT歸因明細", all_data)
# 2. 成功送樣 (有樣品單號)
success_samples = [row for row in all_data if row['sample_order']]
create_sheet("成功送樣", success_samples)
# 3. 取得訂單 (有訂單單號)
orders_received = [row for row in all_data if row['order_no']]
create_sheet("取得訂單", orders_received)
# 儲存到 BytesIO
output = io.BytesIO()
wb.save(output)
output.seek(0)
return output
def generate_pdf(self) -> io.BytesIO:
"""產生 PDF 報表"""
output = io.BytesIO()
doc = SimpleDocTemplate(output, pagesize=landscape(A4))
elements = []
styles = getSampleStyleSheet()
# 標題
title = Paragraph("DIT Attribution Report", styles['Title'])
elements.append(title)
elements.append(Spacer(1, 20))
# 日期
date_text = Paragraph(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}", styles['Normal'])
elements.append(date_text)
elements.append(Spacer(1, 20))
# 表格資料
data = self.get_attribution_data()
table_data = [['OP No.', 'Customer', 'P/N', 'EAU', 'Stage', 'Sample', 'Order', 'Status', 'Amount']]
for row in data:
table_data.append([
row['op_id'],
row['customer'][:20] + '...' if len(row['customer']) > 20 else row['customer'],
row['pn'],
str(row['eau']),
row['stage'] or '-',
row['sample_order'] or '-',
row['order_no'] or '-',
row['order_status'] or '-',
f"${row['order_amount']:,.0f}" if row['order_amount'] else '-'
])
# 建立表格
table = Table(table_data)
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#4F46E5')),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTSIZE', (0, 0), (-1, 0), 10),
('FONTSIZE', (0, 1), (-1, -1), 8),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black),
('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.white, colors.HexColor('#F8FAFC')]),
]))
elements.append(table)
doc.build(elements)
output.seek(0)
return output