""" 匯入組織階層資料到資料庫 從 hierarchical_data.js 讀取資料並匯入 MySQL """ import os import re import json import pymysql from dotenv import load_dotenv from datetime import datetime # Load environment variables load_dotenv() # 從 hierarchical_data.js 解析資料 def parse_hierarchical_data(): """解析 hierarchical_data.js 檔案""" js_file = os.path.join(os.path.dirname(__file__), 'hierarchical_data.js') with open(js_file, 'r', encoding='utf-8') as f: content = f.read() # 提取 businessToDivision business_match = re.search(r'const businessToDivision = ({[\s\S]*?});', content) business_to_division = json.loads(business_match.group(1).replace("'", '"')) if business_match else {} # 提取 divisionToDepartment division_match = re.search(r'const divisionToDepartment = ({[\s\S]*?});', content) division_to_department = json.loads(division_match.group(1).replace("'", '"')) if division_match else {} # 提取 departmentToPosition dept_match = re.search(r'const departmentToPosition = ({[\s\S]*?});', content) department_to_position = json.loads(dept_match.group(1).replace("'", '"')) if dept_match else {} # 提取 fullHierarchyData hierarchy_match = re.search(r'const fullHierarchyData = (\[[\s\S]*?\]);', content) full_hierarchy_data = json.loads(hierarchy_match.group(1)) if hierarchy_match else [] return { 'businessToDivision': business_to_division, 'divisionToDepartment': division_to_department, 'departmentToPosition': department_to_position, 'fullHierarchyData': full_hierarchy_data } def generate_code(prefix, index): """生成代碼""" return f"{prefix}{index:03d}" def import_to_database(): """匯入資料到資料庫""" # Database connection parameters db_config = { 'host': os.getenv('DB_HOST', 'localhost'), 'port': int(os.getenv('DB_PORT', 3306)), 'user': os.getenv('DB_USER', 'root'), 'password': os.getenv('DB_PASSWORD', ''), 'database': os.getenv('DB_NAME', 'hr_position_system'), 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor } print("=" * 60) print("組織階層資料匯入工具") print("=" * 60) print() # 解析 JS 資料 print("步驟 1: 解析 hierarchical_data.js...") data = parse_hierarchical_data() print(f" - 事業體數量: {len(data['businessToDivision'])}") print(f" - 處級單位對應數: {len(data['divisionToDepartment'])}") print(f" - 部級單位對應數: {len(data['departmentToPosition'])}") print(f" - 完整階層記錄數: {len(data['fullHierarchyData'])}") print() try: # 連接資料庫 print("步驟 2: 連接資料庫...") connection = pymysql.connect(**db_config) cursor = connection.cursor() print(" 連接成功") print() # 先建立資料表(如果不存在) print("步驟 3: 確認資料表存在...") create_tables_sql = """ -- 事業體表 CREATE TABLE IF NOT EXISTS business_units ( id INT AUTO_INCREMENT PRIMARY KEY, business_code VARCHAR(20) NOT NULL UNIQUE, business_name VARCHAR(100) NOT NULL, sort_order INT DEFAULT 0, is_active BOOLEAN DEFAULT TRUE, remark VARCHAR(500), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_business_name (business_name) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; -- 處級單位表 CREATE TABLE IF NOT EXISTS divisions ( id INT AUTO_INCREMENT PRIMARY KEY, division_code VARCHAR(20) NOT NULL UNIQUE, division_name VARCHAR(100) NOT NULL, business_id INT, sort_order INT DEFAULT 0, is_active BOOLEAN DEFAULT TRUE, remark VARCHAR(500), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_division_name (division_name), INDEX idx_business_id (business_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; -- 部級單位表 CREATE TABLE IF NOT EXISTS departments ( id INT AUTO_INCREMENT PRIMARY KEY, department_code VARCHAR(20) NOT NULL UNIQUE, department_name VARCHAR(100) NOT NULL, division_id INT, sort_order INT DEFAULT 0, is_active BOOLEAN DEFAULT TRUE, remark VARCHAR(500), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_department_name (department_name), INDEX idx_division_id (division_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; -- 組織崗位關聯表 CREATE TABLE IF NOT EXISTS organization_positions ( id INT AUTO_INCREMENT PRIMARY KEY, business_id INT NOT NULL, division_id INT NOT NULL, department_id INT NOT NULL, position_title VARCHAR(100) NOT NULL, sort_order INT DEFAULT 0, is_active BOOLEAN DEFAULT TRUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_business_id (business_id), INDEX idx_division_id (division_id), INDEX idx_department_id (department_id), INDEX idx_position_title (position_title), UNIQUE KEY uk_org_position (business_id, division_id, department_id, position_title) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; """ for statement in create_tables_sql.split(';'): if statement.strip(): try: cursor.execute(statement) except Exception as e: pass # 表已存在時忽略錯誤 connection.commit() print(" 資料表已確認") print() # 清空現有資料 print("步驟 4: 清空現有資料...") cursor.execute("SET FOREIGN_KEY_CHECKS = 0") cursor.execute("TRUNCATE TABLE organization_positions") cursor.execute("TRUNCATE TABLE departments") cursor.execute("TRUNCATE TABLE divisions") cursor.execute("TRUNCATE TABLE business_units") cursor.execute("SET FOREIGN_KEY_CHECKS = 1") connection.commit() print(" 資料已清空") print() # 匯入事業體 print("步驟 5: 匯入事業體...") business_id_map = {} business_list = list(data['businessToDivision'].keys()) for idx, business_name in enumerate(business_list, 1): code = generate_code('BU', idx) cursor.execute( "INSERT INTO business_units (business_code, business_name, sort_order) VALUES (%s, %s, %s)", (code, business_name, idx) ) business_id_map[business_name] = cursor.lastrowid connection.commit() print(f" 匯入 {len(business_list)} 筆事業體") # 匯入處級單位 print("步驟 6: 匯入處級單位...") division_id_map = {} division_idx = 0 for business_name, divisions in data['businessToDivision'].items(): business_id = business_id_map.get(business_name) for division_name in divisions: if division_name not in division_id_map: division_idx += 1 code = generate_code('DIV', division_idx) cursor.execute( "INSERT INTO divisions (division_code, division_name, business_id, sort_order) VALUES (%s, %s, %s, %s)", (code, division_name, business_id, division_idx) ) division_id_map[division_name] = cursor.lastrowid connection.commit() print(f" 匯入 {division_idx} 筆處級單位") # 匯入部級單位 print("步驟 7: 匯入部級單位...") department_id_map = {} dept_idx = 0 for division_name, departments in data['divisionToDepartment'].items(): division_id = division_id_map.get(division_name) for dept_name in departments: if dept_name not in department_id_map: dept_idx += 1 code = generate_code('DEPT', dept_idx) cursor.execute( "INSERT INTO departments (department_code, department_name, division_id, sort_order) VALUES (%s, %s, %s, %s)", (code, dept_name, division_id, dept_idx) ) department_id_map[dept_name] = cursor.lastrowid connection.commit() print(f" 匯入 {dept_idx} 筆部級單位") # 匯入組織崗位關聯 print("步驟 8: 匯入組織崗位關聯...") position_count = 0 inserted_combinations = set() for record in data['fullHierarchyData']: business_name = record.get('business') division_name = record.get('division') department_name = record.get('department') position_title = record.get('position') business_id = business_id_map.get(business_name) division_id = division_id_map.get(division_name) department_id = department_id_map.get(department_name) if not all([business_id, division_id, department_id, position_title]): continue # 避免重複插入 combination_key = (business_id, division_id, department_id, position_title) if combination_key in inserted_combinations: continue inserted_combinations.add(combination_key) try: cursor.execute( """INSERT INTO organization_positions (business_id, division_id, department_id, position_title, sort_order) VALUES (%s, %s, %s, %s, %s)""", (business_id, division_id, department_id, position_title, position_count + 1) ) position_count += 1 except pymysql.err.IntegrityError: pass # 重複記錄跳過 connection.commit() print(f" 匯入 {position_count} 筆組織崗位關聯") print() # 顯示統計 print("=" * 60) print("匯入完成!") print("=" * 60) print() # 查詢統計 cursor.execute("SELECT COUNT(*) as cnt FROM business_units") business_count = cursor.fetchone()['cnt'] cursor.execute("SELECT COUNT(*) as cnt FROM divisions") division_count = cursor.fetchone()['cnt'] cursor.execute("SELECT COUNT(*) as cnt FROM departments") dept_count = cursor.fetchone()['cnt'] cursor.execute("SELECT COUNT(*) as cnt FROM organization_positions") org_pos_count = cursor.fetchone()['cnt'] print(f"資料庫統計:") print(f" - 事業體: {business_count} 筆") print(f" - 處級單位: {division_count} 筆") print(f" - 部級單位: {dept_count} 筆") print(f" - 組織崗位關聯: {org_pos_count} 筆") cursor.close() connection.close() return True except Exception as e: print(f"\n錯誤: {str(e)}") return False def import_to_memory(): """匯入資料到記憶體(用於 Flask 應用)""" # 解析 JS 資料 data = parse_hierarchical_data() # 建立記憶體資料結構 business_units = {} divisions = {} departments = {} organization_positions = [] business_idx = 0 division_idx = 0 dept_idx = 0 # 處理事業體 for business_name in data['businessToDivision'].keys(): business_idx += 1 business_units[business_name] = { 'id': business_idx, 'code': generate_code('BU', business_idx), 'name': business_name } # 處理處級單位 division_id_map = {} for business_name, division_list in data['businessToDivision'].items(): for div_name in division_list: if div_name not in division_id_map: division_idx += 1 division_id_map[div_name] = division_idx divisions[div_name] = { 'id': division_idx, 'code': generate_code('DIV', division_idx), 'name': div_name, 'business': business_name } # 處理部級單位 dept_id_map = {} for div_name, dept_list in data['divisionToDepartment'].items(): for dept_name in dept_list: if dept_name not in dept_id_map: dept_idx += 1 dept_id_map[dept_name] = dept_idx departments[dept_name] = { 'id': dept_idx, 'code': generate_code('DEPT', dept_idx), 'name': dept_name, 'division': div_name } # 處理組織崗位關聯 seen = set() for record in data['fullHierarchyData']: key = (record['business'], record['division'], record['department'], record['position']) if key not in seen: seen.add(key) organization_positions.append({ 'business': record['business'], 'division': record['division'], 'department': record['department'], 'position': record['position'] }) return { 'business_units': business_units, 'divisions': divisions, 'departments': departments, 'organization_positions': organization_positions, 'businessToDivision': data['businessToDivision'], 'divisionToDepartment': data['divisionToDepartment'], 'departmentToPosition': data['departmentToPosition'] } if __name__ == '__main__': import_to_database()