- 移除 database.py、settings.py、auth_service.py 中的硬編碼預設值 - 移除 tools/*.py 中的硬編碼 IP 位址和服務名稱 - 更新 generate_documentation.py 移除硬編碼的帳號密碼 - 更新 MES_Database_Reference.md 和 Oracle_Authorized_Objects.md 移除敏感資訊 - 更新 .env.example 和 README.md 使用 placeholder 值 - 所有敏感設定現在必須透過 .env 檔案配置 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
195 lines
5.8 KiB
Python
195 lines
5.8 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Generate a list of accessible TABLE/VIEW objects under a specific owner (default: DWH)
|
||
and update docs/Oracle_Authorized_Objects.md.
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
from collections import Counter
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import oracledb
|
||
|
||
|
||
def load_env() -> None:
|
||
"""Load .env if available (best-effort)."""
|
||
try:
|
||
from dotenv import load_dotenv # type: ignore
|
||
|
||
env_path = Path(__file__).resolve().parent.parent / ".env"
|
||
load_dotenv(env_path)
|
||
return
|
||
except Exception:
|
||
pass
|
||
|
||
env_path = Path(__file__).resolve().parent.parent / ".env"
|
||
if not env_path.exists():
|
||
return
|
||
for line in env_path.read_text().splitlines():
|
||
if not line or line.strip().startswith("#") or "=" not in line:
|
||
continue
|
||
key, value = line.split("=", 1)
|
||
os.environ.setdefault(key.strip(), value.strip())
|
||
|
||
|
||
def get_connection():
|
||
host = os.getenv("DB_HOST", "")
|
||
port = os.getenv("DB_PORT", "1521")
|
||
service = os.getenv("DB_SERVICE", "")
|
||
user = os.getenv("DB_USER", "")
|
||
password = os.getenv("DB_PASSWORD", "")
|
||
dsn = (
|
||
"(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)"
|
||
f"(HOST={host})(PORT={port})))(CONNECT_DATA=(SERVICE_NAME={service})))"
|
||
)
|
||
return oracledb.connect(user=user, password=password, dsn=dsn)
|
||
|
||
|
||
def main() -> int:
|
||
owner = "DWH"
|
||
output_path = Path("docs/Oracle_Authorized_Objects.md")
|
||
if len(sys.argv) > 1:
|
||
owner = sys.argv[1].strip().upper()
|
||
|
||
load_env()
|
||
conn = get_connection()
|
||
cur = conn.cursor()
|
||
|
||
cur.execute("SELECT USER FROM DUAL")
|
||
user = cur.fetchone()[0]
|
||
|
||
# Roles
|
||
cur.execute("SELECT GRANTED_ROLE FROM USER_ROLE_PRIVS")
|
||
roles = [r[0] for r in cur.fetchall()]
|
||
|
||
# Accessible objects under owner
|
||
cur.execute(
|
||
"""
|
||
SELECT OBJECT_NAME, OBJECT_TYPE
|
||
FROM ALL_OBJECTS
|
||
WHERE OWNER = :p_owner
|
||
AND OBJECT_TYPE IN ('TABLE', 'VIEW')
|
||
ORDER BY OBJECT_NAME
|
||
""",
|
||
p_owner=owner,
|
||
)
|
||
objects = cur.fetchall()
|
||
|
||
# Direct + PUBLIC grants
|
||
cur.execute(
|
||
"""
|
||
SELECT o.object_name, o.object_type, p.privilege,
|
||
CASE WHEN p.grantee = 'PUBLIC' THEN 'PUBLIC' ELSE 'DIRECT' END AS source
|
||
FROM all_tab_privs p
|
||
JOIN all_objects o
|
||
ON o.owner = p.table_schema
|
||
AND o.object_name = p.table_name
|
||
WHERE p.grantee IN (:p_user, 'PUBLIC')
|
||
AND o.owner = :p_owner
|
||
AND o.object_type IN ('TABLE', 'VIEW')
|
||
""",
|
||
p_user=user,
|
||
p_owner=owner,
|
||
)
|
||
direct_rows = cur.fetchall()
|
||
|
||
# Role grants
|
||
role_rows = []
|
||
for role in roles:
|
||
cur.execute(
|
||
"""
|
||
SELECT o.object_name, o.object_type, p.privilege, p.role AS source
|
||
FROM role_tab_privs p
|
||
JOIN all_objects o
|
||
ON o.owner = p.owner
|
||
AND o.object_name = p.table_name
|
||
WHERE p.role = :p_role
|
||
AND o.owner = :p_owner
|
||
AND o.object_type IN ('TABLE', 'VIEW')
|
||
""",
|
||
p_role=role,
|
||
p_owner=owner,
|
||
)
|
||
role_rows.extend(cur.fetchall())
|
||
|
||
# Aggregate privileges by object
|
||
info = {}
|
||
for name, otype in objects:
|
||
info[(name, otype)] = {"privs": set(), "sources": set()}
|
||
|
||
for name, otype, priv, source in direct_rows + role_rows:
|
||
key = (name, otype)
|
||
if key not in info:
|
||
info[key] = {"privs": set(), "sources": set()}
|
||
info[key]["privs"].add(priv)
|
||
info[key]["sources"].add(source)
|
||
|
||
# Fill in missing privilege/source if object is visible but not in grants
|
||
for key, data in info.items():
|
||
if not data["privs"]:
|
||
data["privs"].add("UNKNOWN")
|
||
if not data["sources"]:
|
||
data["sources"].add("SYSTEM")
|
||
|
||
type_counts = Counter(k[1] for k in info.keys())
|
||
source_counts = Counter()
|
||
for data in info.values():
|
||
for s in data["sources"]:
|
||
if s in ("DIRECT", "PUBLIC"):
|
||
source_counts[s] += 1
|
||
elif s == "SYSTEM":
|
||
source_counts["SYSTEM"] += 1
|
||
else:
|
||
source_counts["ROLE"] += 1
|
||
|
||
# Render markdown
|
||
lines = []
|
||
lines.append("# Oracle 可使用 TABLE/VIEW 清單(DWH)")
|
||
lines.append("")
|
||
lines.append(f"**產生時間**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||
lines.append(f"**使用者**: {user}")
|
||
lines.append(f"**Schema**: {owner}")
|
||
lines.append("")
|
||
lines.append("## 摘要")
|
||
lines.append("")
|
||
lines.append(f"- 可使用物件總數: {len(info):,}")
|
||
lines.append(f"- TABLE: {type_counts.get('TABLE', 0):,}")
|
||
lines.append(f"- VIEW: {type_counts.get('VIEW', 0):,}")
|
||
lines.append(
|
||
"- 來源 (去重後物件數): "
|
||
f"DIRECT {source_counts.get('DIRECT', 0):,}, "
|
||
f"PUBLIC {source_counts.get('PUBLIC', 0):,}, "
|
||
f"ROLE {source_counts.get('ROLE', 0):,}, "
|
||
f"SYSTEM {source_counts.get('SYSTEM', 0):,}"
|
||
)
|
||
lines.append("")
|
||
lines.append("## 物件清單")
|
||
lines.append("")
|
||
lines.append("| 物件 | 類型 | 權限 | 授權來源 |")
|
||
lines.append("|------|------|------|----------|")
|
||
|
||
for name, otype in sorted(info.keys()):
|
||
data = info[(name, otype)]
|
||
obj = f"{owner}.{name}"
|
||
privs = ", ".join(sorted(data["privs"]))
|
||
sources = ", ".join(
|
||
sorted(
|
||
"ROLE" if s not in ("DIRECT", "PUBLIC", "SYSTEM") else s
|
||
for s in data["sources"]
|
||
)
|
||
)
|
||
lines.append(f"| `{obj}` | {otype} | {privs} | {sources} |")
|
||
|
||
output_path.write_text("\n".join(lines), encoding="utf-8")
|
||
|
||
cur.close()
|
||
conn.close()
|
||
print(f"Wrote {output_path} ({len(info)} objects)")
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
raise SystemExit(main())
|