Files
ai-showcase-platform/lib/database-sync.ts

230 lines
7.1 KiB
TypeScript

// =====================================================
// 資料庫雙寫同步服務
// =====================================================
import mysql from 'mysql2/promise';
import { dbFailover } from './database-failover';
// 雙寫配置
interface DualWriteConfig {
enabled: boolean;
masterPriority: boolean; // 主機優先,如果主機失敗則只寫備機
conflictResolution: 'master' | 'slave' | 'timestamp'; // 衝突解決策略
retryAttempts: number;
retryDelay: number;
}
// 寫入結果
interface WriteResult {
success: boolean;
masterSuccess: boolean;
slaveSuccess: boolean;
masterError?: string;
slaveError?: string;
conflictDetected?: boolean;
}
export class DatabaseSync {
private static instance: DatabaseSync;
private config: DualWriteConfig;
private constructor() {
this.config = {
enabled: process.env.DB_DUAL_WRITE_ENABLED === 'true',
masterPriority: process.env.DB_MASTER_PRIORITY !== 'false',
conflictResolution: (process.env.DB_CONFLICT_RESOLUTION as any) || 'master',
retryAttempts: parseInt(process.env.DB_RETRY_ATTEMPTS || '3'),
retryDelay: parseInt(process.env.DB_RETRY_DELAY || '1000')
};
}
public static getInstance(): DatabaseSync {
if (!DatabaseSync.instance) {
DatabaseSync.instance = new DatabaseSync();
}
return DatabaseSync.instance;
}
// 雙寫插入
async dualInsert(sql: string, params?: any[]): Promise<WriteResult> {
if (!this.config.enabled) {
// 如果雙寫未啟用,使用備援系統選擇的資料庫
try {
await dbFailover.insert(sql, params);
return {
success: true,
masterSuccess: true,
slaveSuccess: false
};
} catch (error) {
return {
success: false,
masterSuccess: false,
slaveSuccess: false,
masterError: error instanceof Error ? error.message : '未知錯誤'
};
}
}
const result: WriteResult = {
success: false,
masterSuccess: false,
slaveSuccess: false
};
// 獲取主機和備機連接
const masterPool = dbFailover.getMasterPool();
const slavePool = dbFailover.getSlavePool();
if (!masterPool || !slavePool) {
result.masterError = '無法獲取資料庫連接池';
return result;
}
// 根據優先級決定寫入順序
const writeOrder = this.config.masterPriority
? [{ pool: masterPool, name: 'master' }, { pool: slavePool, name: 'slave' }]
: [{ pool: slavePool, name: 'slave' }, { pool: masterPool, name: 'master' }];
// 執行雙寫
for (const { pool, name } of writeOrder) {
try {
await this.executeWithRetry(pool, sql, params);
result[`${name}Success`] = true;
console.log(`${name} 資料庫寫入成功`);
} catch (error) {
const errorMsg = error instanceof Error ? error.message : '未知錯誤';
result[`${name}Error`] = errorMsg;
console.error(`${name} 資料庫寫入失敗:`, errorMsg);
// 如果主機優先且主機失敗,嘗試備機
if (this.config.masterPriority && name === 'master') {
continue;
}
}
}
// 判斷整體成功狀態
result.success = result.masterSuccess || result.slaveSuccess;
// 檢查衝突
if (result.masterSuccess && result.slaveSuccess) {
result.conflictDetected = await this.checkForConflicts(sql, params);
}
return result;
}
// 雙寫更新
async dualUpdate(sql: string, params?: any[]): Promise<WriteResult> {
return await this.dualInsert(sql, params);
}
// 雙寫刪除
async dualDelete(sql: string, params?: any[]): Promise<WriteResult> {
return await this.dualInsert(sql, params);
}
// 帶重試的執行
private async executeWithRetry(pool: mysql.Pool, sql: string, params?: any[]): Promise<any> {
let lastError: Error | null = null;
for (let attempt = 1; attempt <= this.config.retryAttempts; attempt++) {
try {
const connection = await pool.getConnection();
try {
const [result] = await connection.execute(sql, params);
return result;
} finally {
connection.release();
}
} catch (error) {
lastError = error instanceof Error ? error : new Error('未知錯誤');
if (attempt < this.config.retryAttempts) {
console.log(`重試 ${attempt}/${this.config.retryAttempts}: ${lastError.message}`);
await new Promise(resolve => setTimeout(resolve, this.config.retryDelay * attempt));
}
}
}
throw lastError;
}
// 檢查衝突(簡化版本,實際應用中可能需要更複雜的邏輯)
private async checkForConflicts(sql: string, params?: any[]): Promise<boolean> {
// 這裡可以實現更複雜的衝突檢測邏輯
// 例如:比較主機和備機的資料是否一致
return false;
}
// 同步資料(從主機到備機)
async syncFromMasterToSlave(tableName: string, condition?: string): Promise<boolean> {
try {
const masterPool = dbFailover.getMasterPool();
const slavePool = dbFailover.getSlavePool();
if (!masterPool || !slavePool) {
throw new Error('無法獲取資料庫連接池');
}
// 從主機讀取資料
const masterConnection = await masterPool.getConnection();
const slaveConnection = await slavePool.getConnection();
try {
const selectSql = condition
? `SELECT * FROM ${tableName} WHERE ${condition}`
: `SELECT * FROM ${tableName}`;
const [rows] = await masterConnection.execute(selectSql);
if (Array.isArray(rows) && rows.length > 0) {
// 清空備機表(可選)
await slaveConnection.execute(`DELETE FROM ${tableName}${condition ? ` WHERE ${condition}` : ''}`);
// 插入資料到備機
for (const row of rows as any[]) {
const columns = Object.keys(row);
const values = columns.map(() => '?').join(', ');
const insertSql = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${values})`;
const insertParams = columns.map(col => row[col]);
await slaveConnection.execute(insertSql, insertParams);
}
}
console.log(`✅ 成功同步 ${tableName} 表資料到備機`);
return true;
} finally {
masterConnection.release();
slaveConnection.release();
}
} catch (error) {
console.error(`❌ 同步 ${tableName} 表資料失敗:`, error);
return false;
}
}
// 獲取同步狀態
async getSyncStatus(): Promise<{
enabled: boolean;
masterHealthy: boolean;
slaveHealthy: boolean;
lastSyncTime?: string;
}> {
const masterPool = dbFailover.getMasterPool();
const slavePool = dbFailover.getSlavePool();
return {
enabled: this.config.enabled,
masterHealthy: masterPool ? true : false,
slaveHealthy: slavePool ? true : false,
lastSyncTime: new Date().toISOString()
};
}
}
// 導出單例實例
export const dbSync = DatabaseSync.getInstance();