""" Database abstraction layer supporting both MySQL and SQLite. Usage: from app.database import init_db, get_db_cursor, init_tables # At application startup init_db() init_tables() # In request handlers with get_db_cursor() as cursor: cursor.execute("SELECT * FROM meeting_records") results = cursor.fetchall() with get_db_cursor(commit=True) as cursor: cursor.execute("INSERT INTO ...") """ import os import sqlite3 import threading from contextlib import contextmanager import mysql.connector from mysql.connector import pooling from .config import settings # Global state _db_type: str = "mysql" _mysql_pool = None _sqlite_conn = None _sqlite_lock = threading.Lock() # ============================================================================ # Initialization Functions # ============================================================================ def init_db(): """Initialize database based on DB_TYPE setting.""" global _db_type _db_type = settings.DB_TYPE.lower() if _db_type == "sqlite": init_sqlite() else: init_mysql() def init_mysql(): """Initialize MySQL connection pool.""" global _mysql_pool _mysql_pool = pooling.MySQLConnectionPool( pool_name="meeting_pool", pool_size=settings.DB_POOL_SIZE, host=settings.DB_HOST, port=settings.DB_PORT, user=settings.DB_USER, password=settings.DB_PASS, database=settings.DB_NAME, ) return _mysql_pool def init_sqlite(): """Initialize SQLite connection with row_factory for dict-like access.""" global _sqlite_conn db_path = settings.get_sqlite_path() db_dir = os.path.dirname(db_path) # Create directory if needed if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True) _sqlite_conn = sqlite3.connect(db_path, check_same_thread=False) _sqlite_conn.row_factory = sqlite3.Row _sqlite_conn.execute("PRAGMA foreign_keys=ON") print(f"SQLite database initialized at: {db_path}", flush=True) return _sqlite_conn # ============================================================================ # Legacy Compatibility # ============================================================================ def init_db_pool(): """Legacy function for backward compatibility. Use init_db() instead.""" return init_db() # ============================================================================ # Connection Context Managers # ============================================================================ @contextmanager def get_db_connection(): """Get a database connection (MySQL or SQLite).""" if _db_type == "sqlite": # SQLite uses a single connection with thread lock yield _sqlite_conn else: # MySQL uses connection pool conn = _mysql_pool.get_connection() try: yield conn finally: conn.close() class SQLiteCursorWrapper: """Wrapper to make SQLite cursor behave more like MySQL cursor with dictionary=True.""" def __init__(self, cursor): self._cursor = cursor self.lastrowid = None self.rowcount = 0 def execute(self, query, params=None): # Convert MySQL-style %s placeholders to SQLite ? placeholders query = query.replace("%s", "?") if params: self._cursor.execute(query, params) else: self._cursor.execute(query) self.lastrowid = self._cursor.lastrowid self.rowcount = self._cursor.rowcount def executemany(self, query, params_list): query = query.replace("%s", "?") self._cursor.executemany(query, params_list) self.lastrowid = self._cursor.lastrowid self.rowcount = self._cursor.rowcount def fetchone(self): row = self._cursor.fetchone() if row is None: return None return dict(row) def fetchall(self): rows = self._cursor.fetchall() return [dict(row) for row in rows] def fetchmany(self, size=None): if size: rows = self._cursor.fetchmany(size) else: rows = self._cursor.fetchmany() return [dict(row) for row in rows] def close(self): self._cursor.close() @contextmanager def get_db_cursor(commit=False): """Get a database cursor that returns dict-like rows. Args: commit: If True, commit the transaction after yield. Yields: cursor: A cursor that returns dict-like rows. """ if _db_type == "sqlite": with _sqlite_lock: cursor = SQLiteCursorWrapper(_sqlite_conn.cursor()) try: yield cursor if commit: _sqlite_conn.commit() except Exception: _sqlite_conn.rollback() raise finally: cursor.close() else: with get_db_connection() as conn: cursor = conn.cursor(dictionary=True) try: yield cursor if commit: conn.commit() finally: cursor.close() # ============================================================================ # Table Initialization # ============================================================================ MYSQL_TABLES = [ """ CREATE TABLE IF NOT EXISTS meeting_users ( user_id INT PRIMARY KEY AUTO_INCREMENT, email VARCHAR(100) UNIQUE NOT NULL, display_name VARCHAR(50), role ENUM('admin', 'user') DEFAULT 'user', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """, """ CREATE TABLE IF NOT EXISTS meeting_records ( meeting_id INT PRIMARY KEY AUTO_INCREMENT, uuid VARCHAR(64) UNIQUE, meeting_number VARCHAR(20), subject VARCHAR(200) NOT NULL, meeting_time DATETIME NOT NULL, location VARCHAR(100), chairperson VARCHAR(50), recorder VARCHAR(50), attendees TEXT, transcript_blob LONGTEXT, created_by VARCHAR(100), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """, """ CREATE TABLE IF NOT EXISTS meeting_conclusions ( conclusion_id INT PRIMARY KEY AUTO_INCREMENT, meeting_id INT, content TEXT, system_code VARCHAR(20), FOREIGN KEY (meeting_id) REFERENCES meeting_records(meeting_id) ON DELETE CASCADE ) """, """ CREATE TABLE IF NOT EXISTS meeting_action_items ( action_id INT PRIMARY KEY AUTO_INCREMENT, meeting_id INT, content TEXT, owner VARCHAR(50), due_date DATE, status ENUM('Open', 'In Progress', 'Done', 'Delayed') DEFAULT 'Open', system_code VARCHAR(20), FOREIGN KEY (meeting_id) REFERENCES meeting_records(meeting_id) ON DELETE CASCADE ) """, ] SQLITE_TABLES = [ """ CREATE TABLE IF NOT EXISTS meeting_users ( user_id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT UNIQUE NOT NULL, display_name TEXT, role TEXT CHECK(role IN ('admin', 'user')) DEFAULT 'user', created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """, """ CREATE TABLE IF NOT EXISTS meeting_records ( meeting_id INTEGER PRIMARY KEY AUTOINCREMENT, uuid TEXT UNIQUE, meeting_number TEXT, subject TEXT NOT NULL, meeting_time DATETIME NOT NULL, location TEXT, chairperson TEXT, recorder TEXT, attendees TEXT, transcript_blob TEXT, created_by TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """, """ CREATE TABLE IF NOT EXISTS meeting_conclusions ( conclusion_id INTEGER PRIMARY KEY AUTOINCREMENT, meeting_id INTEGER, content TEXT, system_code TEXT, FOREIGN KEY (meeting_id) REFERENCES meeting_records(meeting_id) ON DELETE CASCADE ) """, """ CREATE TABLE IF NOT EXISTS meeting_action_items ( action_id INTEGER PRIMARY KEY AUTOINCREMENT, meeting_id INTEGER, content TEXT, owner TEXT, due_date DATE, status TEXT CHECK(status IN ('Open', 'In Progress', 'Done', 'Delayed')) DEFAULT 'Open', system_code TEXT, FOREIGN KEY (meeting_id) REFERENCES meeting_records(meeting_id) ON DELETE CASCADE ) """, ] def init_tables(): """Create all required tables if they don't exist.""" tables = SQLITE_TABLES if _db_type == "sqlite" else MYSQL_TABLES if _db_type == "sqlite": with _sqlite_lock: cursor = _sqlite_conn.cursor() try: for statement in tables: cursor.execute(statement) _sqlite_conn.commit() finally: cursor.close() else: with get_db_cursor(commit=True) as cursor: for statement in tables: cursor.execute(statement)