Files
hr-position-system/scripts/import_hierarchy_data.py
DonaldFang 方士碩 a6af297623 backup: 完成 HR_position_ 表格前綴重命名與欄位對照表整理
變更內容:
- 所有資料表加上 HR_position_ 前綴
- 整理完整欄位顯示名稱與 ID 對照表
- 模組化 JS 檔案 (admin.js, ai.js, csv.js 等)
- 專案結構優化 (docs/, scripts/, tests/ 等)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-09 12:05:20 +08:00

388 lines
14 KiB
Python

"""
Import organisation hierarchy data into the database.
Reads the data from hierarchical_data.js and imports it into MySQL.
"""
import os
import re
import json
import pymysql
from dotenv import load_dotenv
from datetime import datetime
# NOTE(review): `datetime` is not referenced in the visible part of this file;
# confirm it is unused before removing the import.
# Load variables from a .env file into the process environment up front, so the
# os.getenv('DB_*') lookups in import_to_database() can pick them up.
load_dotenv()
# Parse the data out of hierarchical_data.js
def parse_hierarchical_data():
    """Parse the ``hierarchical_data.js`` file that sits next to this script.

    The file is expected to declare four ``const`` literals:
    ``businessToDivision``, ``divisionToDepartment``, ``departmentToPosition``
    (object literals) and ``fullHierarchyData`` (an array literal).  Each
    literal is located with a regular expression and decoded as JSON.

    Returns:
        dict: The four decoded values under the keys ``'businessToDivision'``,
        ``'divisionToDepartment'``, ``'departmentToPosition'`` and
        ``'fullHierarchyData'``.  A declaration that is not found yields an
        empty ``dict`` (mappings) or an empty ``list`` (hierarchy records).

    Raises:
        OSError: If the JavaScript file cannot be opened.
        json.JSONDecodeError: If a literal is found but cannot be decoded.
    """
    js_file = os.path.join(os.path.dirname(__file__), 'hierarchical_data.js')
    with open(js_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # The non-greedy patterns stop at the first "};" / "];" after the
    # declaration, which is where a flat literal ends.
    return {
        'businessToDivision': _load_js_literal(
            content, r'const businessToDivision = (\{[\s\S]*?\});', {}),
        'divisionToDepartment': _load_js_literal(
            content, r'const divisionToDepartment = (\{[\s\S]*?\});', {}),
        'departmentToPosition': _load_js_literal(
            content, r'const departmentToPosition = (\{[\s\S]*?\});', {}),
        'fullHierarchyData': _load_js_literal(
            content, r'const fullHierarchyData = (\[[\s\S]*?\]);', []),
    }


def _load_js_literal(content, pattern, default):
    """Decode the literal captured by group 1 of *pattern*, or return *default*."""
    match = re.search(pattern, content)
    if match is None:
        return default
    literal = match.group(1)
    try:
        # Strict JSON first: blindly swapping quotes would corrupt a valid
        # double-quoted string that contains an apostrophe ("A's Group").
        return json.loads(literal)
    except json.JSONDecodeError:
        # Fall back to treating the literal as single-quoted JavaScript.
        return json.loads(literal.replace("'", '"'))
def generate_code(prefix: str, index: int, width: int = 3) -> str:
    """Build a sequential code such as ``BU001`` from a prefix and a number.

    Args:
        prefix: Text placed in front of the number (e.g. ``'BU'``, ``'DIV'``).
        index: Integer sequence number.
        width: Minimum number of digits; shorter numbers are left-padded with
            zeros, longer numbers are kept in full.  Defaults to 3.

    Returns:
        The prefix followed by the zero-padded number.
    """
    return f"{prefix}{index:0{width}d}"
# DDL for the four hierarchy tables.  The text is split on ';' and executed one
# statement at a time by _ensure_tables().
_CREATE_TABLES_SQL = """
    -- 事業體表
    CREATE TABLE IF NOT EXISTS business_units (
        id INT AUTO_INCREMENT PRIMARY KEY,
        business_code VARCHAR(20) NOT NULL UNIQUE,
        business_name VARCHAR(100) NOT NULL,
        sort_order INT DEFAULT 0,
        is_active BOOLEAN DEFAULT TRUE,
        remark VARCHAR(500),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        INDEX idx_business_name (business_name)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
    -- 處級單位表
    CREATE TABLE IF NOT EXISTS divisions (
        id INT AUTO_INCREMENT PRIMARY KEY,
        division_code VARCHAR(20) NOT NULL UNIQUE,
        division_name VARCHAR(100) NOT NULL,
        business_id INT,
        sort_order INT DEFAULT 0,
        is_active BOOLEAN DEFAULT TRUE,
        remark VARCHAR(500),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        INDEX idx_division_name (division_name),
        INDEX idx_business_id (business_id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
    -- 部級單位表
    CREATE TABLE IF NOT EXISTS departments (
        id INT AUTO_INCREMENT PRIMARY KEY,
        department_code VARCHAR(20) NOT NULL UNIQUE,
        department_name VARCHAR(100) NOT NULL,
        division_id INT,
        sort_order INT DEFAULT 0,
        is_active BOOLEAN DEFAULT TRUE,
        remark VARCHAR(500),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        INDEX idx_department_name (department_name),
        INDEX idx_division_id (division_id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
    -- 組織崗位關聯表
    CREATE TABLE IF NOT EXISTS organization_positions (
        id INT AUTO_INCREMENT PRIMARY KEY,
        business_id INT NOT NULL,
        division_id INT NOT NULL,
        department_id INT NOT NULL,
        position_title VARCHAR(100) NOT NULL,
        sort_order INT DEFAULT 0,
        is_active BOOLEAN DEFAULT TRUE,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        INDEX idx_business_id (business_id),
        INDEX idx_division_id (division_id),
        INDEX idx_department_id (department_id),
        INDEX idx_position_title (position_title),
        UNIQUE KEY uk_org_position (business_id, division_id, department_id, position_title)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
"""


def _ensure_tables(connection, cursor):
    """Step 3: create the hierarchy tables when they do not exist yet."""
    print("步驟 3: 確認資料表存在...")
    for statement in _CREATE_TABLES_SQL.split(';'):
        if not statement.strip():
            continue
        try:
            cursor.execute(statement)
        except pymysql.MySQLError as exc:
            # Still best effort (the import goes on), but no longer silent:
            # IF NOT EXISTS already covers the "table exists" case, so an
            # error here is a real problem the operator should see.
            print(f" 警告: 建立資料表時發生錯誤,已略過: {exc}")
    connection.commit()
    print(" 資料表已確認")
    print()


def _clear_tables(connection, cursor):
    """Step 4: empty the four hierarchy tables, children first."""
    print("步驟 4: 清空現有資料...")
    cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
    cursor.execute("TRUNCATE TABLE organization_positions")
    cursor.execute("TRUNCATE TABLE departments")
    cursor.execute("TRUNCATE TABLE divisions")
    cursor.execute("TRUNCATE TABLE business_units")
    cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
    connection.commit()
    print(" 資料已清空")
    print()


def _import_business_units(connection, cursor, business_to_division):
    """Step 5: insert one business unit per key; return a name -> row id map."""
    print("步驟 5: 匯入事業體...")
    business_ids = {}
    for order, business_name in enumerate(business_to_division, 1):
        cursor.execute(
            "INSERT INTO business_units (business_code, business_name, sort_order) VALUES (%s, %s, %s)",
            (generate_code('BU', order), business_name, order)
        )
        business_ids[business_name] = cursor.lastrowid
    connection.commit()
    print(f" 匯入 {len(business_ids)} 筆事業體")
    return business_ids


def _import_divisions(connection, cursor, business_to_division, business_ids):
    """Step 6: insert each distinct division name; return a name -> row id map."""
    print("步驟 6: 匯入處級單位...")
    division_ids = {}
    for business_name, division_names in business_to_division.items():
        business_id = business_ids.get(business_name)
        for division_name in division_names:
            # Divisions are keyed by name only, so a name listed under several
            # business units is inserted once, under the first one seen.
            if division_name in division_ids:
                continue
            order = len(division_ids) + 1
            cursor.execute(
                "INSERT INTO divisions (division_code, division_name, business_id, sort_order) VALUES (%s, %s, %s, %s)",
                (generate_code('DIV', order), division_name, business_id, order)
            )
            division_ids[division_name] = cursor.lastrowid
    connection.commit()
    print(f" 匯入 {len(division_ids)} 筆處級單位")
    return division_ids


def _import_departments(connection, cursor, division_to_department, division_ids):
    """Step 7: insert each distinct department name; return a name -> row id map."""
    print("步驟 7: 匯入部級單位...")
    department_ids = {}
    for division_name, department_names in division_to_department.items():
        # None when the division was not imported in step 6; the column is nullable.
        division_id = division_ids.get(division_name)
        for department_name in department_names:
            # Same name-only keying as divisions: the first parent wins.
            if department_name in department_ids:
                continue
            order = len(department_ids) + 1
            cursor.execute(
                "INSERT INTO departments (department_code, department_name, division_id, sort_order) VALUES (%s, %s, %s, %s)",
                (generate_code('DEPT', order), department_name, division_id, order)
            )
            department_ids[department_name] = cursor.lastrowid
    connection.commit()
    print(f" 匯入 {len(department_ids)} 筆部級單位")
    return department_ids


def _import_positions(connection, cursor, records, business_ids, division_ids, department_ids):
    """Step 8: insert one row per distinct (business, division, department, position)."""
    print("步驟 8: 匯入組織崗位關聯...")
    position_count = 0
    skipped = 0
    inserted_combinations = set()
    for record in records:
        combination_key = (
            business_ids.get(record.get('business')),
            division_ids.get(record.get('division')),
            department_ids.get(record.get('department')),
            record.get('position'),
        )
        # A record that names an unknown unit, or has no position, cannot be stored.
        if not all(combination_key):
            skipped += 1
            continue
        # Exact repeats of an already inserted combination are expected; skip quietly.
        if combination_key in inserted_combinations:
            continue
        inserted_combinations.add(combination_key)
        try:
            cursor.execute(
                """INSERT INTO organization_positions
                (business_id, division_id, department_id, position_title, sort_order)
                VALUES (%s, %s, %s, %s, %s)""",
                combination_key + (position_count + 1,)
            )
            position_count += 1
        except pymysql.err.IntegrityError:
            # Rejected by the unique key although it is distinct in Python.
            skipped += 1
    connection.commit()
    print(f" 匯入 {position_count} 筆組織崗位關聯")
    if skipped:
        print(f" 略過 {skipped} 筆無法對應或重複的記錄")
    print()


def _print_statistics(cursor):
    """Print the row count of each hierarchy table."""
    counts = {}
    for table in ('business_units', 'divisions', 'departments', 'organization_positions'):
        cursor.execute(f"SELECT COUNT(*) as cnt FROM {table}")
        counts[table] = cursor.fetchone()['cnt']
    print("資料庫統計:")
    print(f" - 事業體: {counts['business_units']}")
    print(f" - 處級單位: {counts['divisions']}")
    print(f" - 部級單位: {counts['departments']}")
    print(f" - 組織崗位關聯: {counts['organization_positions']}")


def import_to_database():
    """Rebuild the organisation hierarchy tables in MySQL from ``hierarchical_data.js``.

    Connection settings are read from the ``DB_HOST``, ``DB_PORT``, ``DB_USER``,
    ``DB_PASSWORD`` and ``DB_NAME`` environment variables.  The function parses
    the JavaScript data, makes sure the tables exist, empties them, then inserts
    business units, divisions, departments and position links, printing
    progress along the way.

    Returns:
        bool: ``True`` when the import ran to completion.  ``False`` when the
        parsed data contains no business units (nothing is touched in that
        case) or when any exception occurred while talking to the database
        (the message is printed).

    Raises:
        Whatever ``parse_hierarchical_data()`` raises; parsing runs before the
        ``try`` block and is not caught here.
    """
    # Database connection parameters
    db_config = {
        'host': os.getenv('DB_HOST', 'localhost'),
        'port': int(os.getenv('DB_PORT', 3306)),
        'user': os.getenv('DB_USER', 'root'),
        'password': os.getenv('DB_PASSWORD', ''),
        'database': os.getenv('DB_NAME', 'hr_position_system'),
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor
    }
    print("=" * 60)
    print("組織階層資料匯入工具")
    print("=" * 60)
    print()

    # Parse the JS data
    print("步驟 1: 解析 hierarchical_data.js...")
    data = parse_hierarchical_data()
    print(f" - 事業體數量: {len(data['businessToDivision'])}")
    print(f" - 處級單位對應數: {len(data['divisionToDepartment'])}")
    print(f" - 部級單位對應數: {len(data['departmentToPosition'])}")
    print(f" - 完整階層記錄數: {len(data['fullHierarchyData'])}")
    print()

    # Without business units nothing below can be inserted, so going on would
    # only truncate the existing tables.  TRUNCATE cannot be rolled back.
    if not data['businessToDivision']:
        print("\n錯誤: 未解析到任何事業體資料,已中止匯入以免清空現有資料")
        return False

    try:
        print("步驟 2: 連接資料庫...")
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                print(" 連接成功")
                print()
                _ensure_tables(connection, cursor)
                _clear_tables(connection, cursor)
                business_ids = _import_business_units(
                    connection, cursor, data['businessToDivision'])
                division_ids = _import_divisions(
                    connection, cursor, data['businessToDivision'], business_ids)
                department_ids = _import_departments(
                    connection, cursor, data['divisionToDepartment'], division_ids)
                _import_positions(
                    connection, cursor, data['fullHierarchyData'],
                    business_ids, division_ids, department_ids)

                print("=" * 60)
                print("匯入完成!")
                print("=" * 60)
                print()
                _print_statistics(cursor)
        finally:
            # Release the connection on failure paths too.
            connection.close()
        return True
    except Exception as e:
        print(f"\n錯誤: {str(e)}")
        return False
def import_to_memory():
    """Build the hierarchy as plain in-memory structures (for the Flask app).

    Returns:
        dict: ``'business_units'``, ``'divisions'`` and ``'departments'`` map
        each unit name to a record holding its sequential ``id``, generated
        ``code``, ``name`` and (for divisions/departments) the name of the
        first parent that listed it.  ``'organization_positions'`` is the list
        of distinct business/division/department/position records.  The three
        parsed mappings ``'businessToDivision'``, ``'divisionToDepartment'``
        and ``'departmentToPosition'`` are passed through unchanged.
    """
    parsed = parse_hierarchical_data()
    business_map = parsed['businessToDivision']
    division_map = parsed['divisionToDepartment']

    # Business units: numbered 1..n in the order the keys appear.
    business_units = {
        name: {'id': seq, 'code': generate_code('BU', seq), 'name': name}
        for seq, name in enumerate(business_map, start=1)
    }

    # Divisions: a name that appears more than once keeps its first owner.
    divisions = {}
    for owner, names in business_map.items():
        for name in names:
            if name in divisions:
                continue
            seq = len(divisions) + 1
            divisions[name] = {
                'id': seq,
                'code': generate_code('DIV', seq),
                'name': name,
                'business': owner,
            }

    # Departments: same first-owner rule, keyed by department name.
    departments = {}
    for owner, names in division_map.items():
        for name in names:
            if name in departments:
                continue
            seq = len(departments) + 1
            departments[name] = {
                'id': seq,
                'code': generate_code('DEPT', seq),
                'name': name,
                'division': owner,
            }

    # Position links: keep the first occurrence of every distinct record.
    fields = ('business', 'division', 'department', 'position')
    organization_positions = []
    already_seen = set()
    for row in parsed['fullHierarchyData']:
        entry = {field: row[field] for field in fields}
        signature = tuple(entry.values())
        if signature in already_seen:
            continue
        already_seen.add(signature)
        organization_positions.append(entry)

    return {
        'business_units': business_units,
        'divisions': divisions,
        'departments': departments,
        'organization_positions': organization_positions,
        'businessToDivision': business_map,
        'divisionToDepartment': division_map,
        'departmentToPosition': parsed['departmentToPosition'],
    }
if __name__ == '__main__':
    # import_to_database() reports failure through its return value; turn that
    # into a non-zero exit status so shells and schedulers can detect it.
    if not import_to_database():
        raise SystemExit(1)