This commit is contained in:
beabigegg
2025-08-29 19:02:19 +08:00
parent b0c86302ff
commit f3f2b7d596
17 changed files with 1632 additions and 157 deletions

81
backend/.env Normal file
View File

@@ -0,0 +1,81 @@
# ===========================================
# Flask 應用程式設定
# ===========================================
FLASK_ENV=development
SECRET_KEY=dev-secret-key-change-in-production
# ===========================================
# MySQL 資料庫連線
# ===========================================
MYSQL_HOST=mysql.theaken.com
MYSQL_PORT=33306
MYSQL_USER=A060
MYSQL_PASSWORD=WLeSCi0yhtc7
MYSQL_DATABASE=db_A060
# ===========================================
# JWT 設定
# ===========================================
JWT_SECRET_KEY=jwt-secret-key-change-in-production
JWT_ACCESS_TOKEN_EXPIRES_HOURS=8
JWT_REFRESH_TOKEN_EXPIRES_DAYS=30
# ===========================================
# AD/LDAP 設定
# ===========================================
USE_MOCK_LDAP=false
LDAP_SERVER=ldap://panjit.com.tw
LDAP_PORT=389
LDAP_USE_SSL=false
LDAP_USE_TLS=false
LDAP_SEARCH_BASE=OU=PANJIT,DC=panjit,DC=com,DC=tw
LDAP_BIND_USER_DN=CN=LdapBind,CN=Users,DC=PANJIT,DC=COM,DC=TW
LDAP_BIND_USER_PASSWORD=panjit2481
LDAP_USER_LOGIN_ATTR=userPrincipalName
# ===========================================
# SMTP 郵件設定
# ===========================================
SMTP_SERVER=mail.panjit.com.tw
SMTP_PORT=25
SMTP_USE_TLS=false
SMTP_USE_SSL=false
SMTP_AUTH_REQUIRED=false
SMTP_SENDER_EMAIL=todo-system@panjit.com.tw
SMTP_SENDER_PASSWORD=
# ===========================================
# Fire Email 限制設定
# ===========================================
FIRE_EMAIL_COOLDOWN_MINUTES=2
FIRE_EMAIL_DAILY_LIMIT=20
# ===========================================
# 排程提醒設定
# ===========================================
REMINDER_DAYS_BEFORE=3
REMINDER_DAYS_AFTER=1
WEEKLY_SUMMARY_DAY=0
WEEKLY_SUMMARY_HOUR=9
# ===========================================
# 檔案上傳設定
# ===========================================
MAX_CONTENT_LENGTH=16
UPLOAD_FOLDER=uploads
# ===========================================
# Redis 設定 (用於 Celery)
# ===========================================
REDIS_URL=redis://localhost:6379/0
# ===========================================
# CORS 設定
# ===========================================
CORS_ORIGINS=http://localhost:3000,http://localhost:3001,http://localhost:3002
# ===========================================
# 日誌設定
# ===========================================
LOG_LEVEL=INFO
LOG_FILE=logs/app.log

View File

@@ -0,0 +1,98 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Create test todos directly in database for testing public/private functionality"""
import pymysql
import uuid
import json
from datetime import datetime
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def create_test_todos():
"""Create test todos directly in database"""
try:
# Connect to database
conn = pymysql.connect(
host=os.getenv('MYSQL_HOST', 'mysql.theaken.com'),
port=int(os.getenv('MYSQL_PORT', 33306)),
user=os.getenv('MYSQL_USER', 'A060'),
password=os.getenv('MYSQL_PASSWORD', 'WLeSCi0yhtc7'),
database=os.getenv('MYSQL_DATABASE', 'db_A060')
)
cursor = conn.cursor()
# Test data
todos = [
{
'id': str(uuid.uuid4()),
'title': '公開測試任務 - ymirliu 建立',
'description': '這是一個公開任務,其他人可以看到並追蹤',
'status': 'NEW',
'priority': 'MEDIUM',
'created_at': datetime.utcnow(),
'creator_ad': '92367',
'creator_display_name': 'ymirliu 劉念蒨',
'creator_email': 'ymirliu@panjit.com.tw',
'starred': False,
'is_public': True,
'tags': json.dumps(['測試', '公開功能'])
},
{
'id': str(uuid.uuid4()),
'title': '私人測試任務 - ymirliu 建立',
'description': '這是一個私人任務,只有建立者可見',
'status': 'NEW',
'priority': 'HIGH',
'created_at': datetime.utcnow(),
'creator_ad': '92367',
'creator_display_name': 'ymirliu 劉念蒨',
'creator_email': 'ymirliu@panjit.com.tw',
'starred': True,
'is_public': False,
'tags': json.dumps(['測試', '私人功能'])
}
]
# Insert todos
for todo in todos:
sql = """INSERT INTO todo_item
(id, title, description, status, priority, created_at, creator_ad,
creator_display_name, creator_email, starred, is_public, tags)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
cursor.execute(sql, (
todo['id'], todo['title'], todo['description'], todo['status'],
todo['priority'], todo['created_at'], todo['creator_ad'],
todo['creator_display_name'], todo['creator_email'],
todo['starred'], todo['is_public'], todo['tags']
))
print(f"✓ 建立 {'公開' if todo['is_public'] else '私人'} Todo: {todo['title']}")
# Commit changes
conn.commit()
print(f"\n✅ 成功建立 {len(todos)} 個測試 Todo")
# Verify the insertion
cursor.execute("SELECT id, title, is_public FROM todo_item WHERE creator_ad = '92367'")
results = cursor.fetchall()
print(f"\n📊 資料庫中的測試數據:")
for result in results:
print(f" - {result[1]} ({'公開' if result[2] else '私人'})")
cursor.close()
conn.close()
except Exception as e:
print(f"❌ 建立測試數據失敗: {str(e)}")
return False
return True
if __name__ == "__main__":
create_test_todos()

View File

@@ -0,0 +1,19 @@
-- Add public/private feature to TodoItem table
-- Date: 2025-08-29
-- Add is_public column to todo_item table
ALTER TABLE todo_item
ADD COLUMN is_public BOOLEAN DEFAULT FALSE COMMENT '是否公開';
-- Add tags column to todo_item table (JSON type for flexible tagging)
ALTER TABLE todo_item
ADD COLUMN tags JSON DEFAULT NULL COMMENT '標籤';
-- Create index for public todos query performance
CREATE INDEX idx_is_public ON todo_item(is_public);
-- Create index for tags search (if MySQL version supports JSON index)
-- CREATE INDEX idx_tags ON todo_item((CAST(tags AS CHAR(255))));
-- Update existing todos to be private by default
UPDATE todo_item SET is_public = FALSE WHERE is_public IS NULL;

View File

@@ -24,6 +24,8 @@ class TodoItem(db.Model):
creator_display_name = db.Column(db.String(128))
creator_email = db.Column(db.String(256))
starred = db.Column(db.Boolean, default=False)
is_public = db.Column(db.Boolean, default=False)
tags = db.Column(JSON, default=list)
# Relationships
responsible_users = db.relationship('TodoItemResponsible', back_populates='todo', cascade='all, delete-orphan')
@@ -46,6 +48,8 @@ class TodoItem(db.Model):
'creator_display_name': self.creator_display_name,
'creator_email': self.creator_email,
'starred': self.starred,
'is_public': self.is_public,
'tags': self.tags if self.tags else [],
'responsible_users': [r.ad_account for r in self.responsible_users],
'followers': [f.ad_account for f in self.followers]
}
@@ -87,9 +91,21 @@ class TodoItem(db.Model):
def can_view(self, user_ad):
"""Check if user can view this todo"""
# Public todos can be viewed by anyone
if self.is_public:
return True
# Private todos can be viewed by creator, responsible users, and followers
if self.can_edit(user_ad):
return True
return any(f.ad_account == user_ad for f in self.followers)
def can_follow(self, user_ad):
"""Check if user can follow this todo"""
# Anyone can follow public todos
if self.is_public:
return True
# For private todos, only creator/responsible can add followers
return False
class TodoItemResponsible(db.Model):
__tablename__ = 'todo_item_responsible'

View File

@@ -49,12 +49,13 @@ def get_todos():
query = query.join(TodoItemFollower).filter(
TodoItemFollower.ad_account == identity
)
else: # all
else: # all - show todos user can view (public + private with access)
query = query.filter(
or_(
TodoItem.creator_ad == identity,
TodoItem.responsible_users.any(TodoItemResponsible.ad_account == identity),
TodoItem.followers.any(TodoItemFollower.ad_account == identity)
TodoItem.is_public == True, # All public todos
TodoItem.creator_ad == identity, # Created by user
TodoItem.responsible_users.any(TodoItemResponsible.ad_account == identity), # User is responsible
TodoItem.followers.any(TodoItemFollower.ad_account == identity) # User is follower
)
)
@@ -157,7 +158,9 @@ def create_todo():
creator_ad=identity,
creator_display_name=claims.get('display_name', identity),
creator_email=claims.get('email', ''),
starred=data.get('starred', False)
starred=data.get('starred', False),
is_public=data.get('is_public', False),
tags=data.get('tags', [])
)
db.session.add(todo)
@@ -262,6 +265,14 @@ def update_todo(todo_id):
changes['starred'] = {'old': todo.starred, 'new': data['starred']}
todo.starred = data['starred']
if 'is_public' in data:
changes['is_public'] = {'old': todo.is_public, 'new': data['is_public']}
todo.is_public = data['is_public']
if 'tags' in data:
changes['tags'] = {'old': todo.tags, 'new': data['tags']}
todo.tags = data['tags']
# Update responsible users
if 'responsible_users' in data:
# Remove existing
@@ -706,4 +717,228 @@ def star_todo(todo_id):
except Exception as e:
db.session.rollback()
logger.error(f"Star todo error: {str(e)}")
return jsonify({'error': 'Failed to star todo'}), 500
return jsonify({'error': 'Failed to star todo'}), 500
@todos_bp.route('/public', methods=['GET'])
@jwt_required()
def get_public_todos():
"""Get all public todos"""
try:
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
# Filters for public todos
status = request.args.get('status')
priority = request.args.get('priority')
search = request.args.get('search')
tags = request.args.getlist('tags')
# Query only public todos
query = TodoItem.query.filter(TodoItem.is_public == True).options(
joinedload(TodoItem.responsible_users),
joinedload(TodoItem.followers)
)
# Apply filters
if status:
query = query.filter(TodoItem.status == status)
if priority:
query = query.filter(TodoItem.priority == priority)
if search:
query = query.filter(
or_(
TodoItem.title.contains(search),
TodoItem.description.contains(search)
)
)
if tags:
for tag in tags:
query = query.filter(TodoItem.tags.contains(tag))
# Order by created_at desc
query = query.order_by(TodoItem.created_at.desc())
# Paginate
paginated = query.paginate(page=page, per_page=per_page, error_out=False)
return jsonify({
'todos': [todo.to_dict() for todo in paginated.items],
'total': paginated.total,
'pages': paginated.pages,
'current_page': page
}), 200
except Exception as e:
logger.error(f"Get public todos error: {str(e)}")
return jsonify({'error': 'Failed to fetch public todos'}), 500
@todos_bp.route('/following', methods=['GET'])
@jwt_required()
def get_following_todos():
"""Get todos that user is following"""
try:
identity = get_jwt_identity()
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
# Query todos where user is a follower
query = TodoItem.query.join(TodoItemFollower).filter(
TodoItemFollower.ad_account == identity
).options(
joinedload(TodoItem.responsible_users),
joinedload(TodoItem.followers)
)
# Order by created_at desc
query = query.order_by(TodoItem.created_at.desc())
# Paginate
paginated = query.paginate(page=page, per_page=per_page, error_out=False)
return jsonify({
'todos': [todo.to_dict() for todo in paginated.items],
'total': paginated.total,
'pages': paginated.pages,
'current_page': page
}), 200
except Exception as e:
logger.error(f"Get following todos error: {str(e)}")
return jsonify({'error': 'Failed to fetch following todos'}), 500
@todos_bp.route('/<todo_id>/visibility', methods=['PATCH'])
@jwt_required()
def update_todo_visibility(todo_id):
"""Toggle todo visibility (public/private)"""
try:
identity = get_jwt_identity()
# Get todo
todo = TodoItem.query.get(todo_id)
if not todo:
return jsonify({'error': 'Todo not found'}), 404
# Only creator can change visibility
if todo.creator_ad != identity:
return jsonify({'error': 'Only creator can change visibility'}), 403
# Toggle visibility
data = request.get_json()
is_public = data.get('is_public', not todo.is_public)
todo.is_public = is_public
# Log audit
audit = TodoAuditLog(
actor_ad=identity,
todo_id=todo_id,
action='UPDATE',
detail={
'field': 'is_public',
'old_value': not is_public,
'new_value': is_public
}
)
db.session.add(audit)
db.session.commit()
logger.info(f"Todo {todo_id} visibility changed to {'public' if is_public else 'private'} by {identity}")
return jsonify({
'message': f'Todo is now {"public" if is_public else "private"}',
'is_public': todo.is_public
}), 200
except Exception as e:
db.session.rollback()
logger.error(f"Update visibility error: {str(e)}")
return jsonify({'error': 'Failed to update visibility'}), 500
@todos_bp.route('/<todo_id>/follow', methods=['POST'])
@jwt_required()
def follow_todo(todo_id):
"""Follow a public todo"""
try:
identity = get_jwt_identity()
# Get todo
todo = TodoItem.query.get(todo_id)
if not todo:
return jsonify({'error': 'Todo not found'}), 404
# Check if todo is public or user has permission
if not todo.is_public and not todo.can_edit(identity):
return jsonify({'error': 'Cannot follow private todo'}), 403
# Check if already following
existing = TodoItemFollower.query.filter_by(
todo_id=todo_id,
ad_account=identity
).first()
if existing:
return jsonify({'message': 'Already following this todo'}), 200
# Add follower
follower = TodoItemFollower(
todo_id=todo_id,
ad_account=identity,
added_by=identity
)
db.session.add(follower)
# Log audit
audit = TodoAuditLog(
actor_ad=identity,
todo_id=todo_id,
action='FOLLOW',
detail={'follower': identity}
)
db.session.add(audit)
db.session.commit()
logger.info(f"User {identity} followed todo {todo_id}")
return jsonify({'message': 'Successfully followed todo'}), 200
except Exception as e:
db.session.rollback()
logger.error(f"Follow todo error: {str(e)}")
return jsonify({'error': 'Failed to follow todo'}), 500
@todos_bp.route('/<todo_id>/follow', methods=['DELETE'])
@jwt_required()
def unfollow_todo(todo_id):
"""Unfollow a todo"""
try:
identity = get_jwt_identity()
# Get follower record
follower = TodoItemFollower.query.filter_by(
todo_id=todo_id,
ad_account=identity
).first()
if not follower:
return jsonify({'message': 'Not following this todo'}), 200
# Remove follower
db.session.delete(follower)
# Log audit
audit = TodoAuditLog(
actor_ad=identity,
todo_id=todo_id,
action='UNFOLLOW',
detail={'follower': identity}
)
db.session.add(audit)
db.session.commit()
logger.info(f"User {identity} unfollowed todo {todo_id}")
return jsonify({'message': 'Successfully unfollowed todo'}), 200
except Exception as e:
db.session.rollback()
logger.error(f"Unfollow todo error: {str(e)}")
return jsonify({'error': 'Failed to unfollow todo'}), 500

60
backend/run_migration.py Normal file
View File

@@ -0,0 +1,60 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Run migration to add public/private feature"""
import pymysql
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def run_migration():
"""Execute the migration SQL"""
try:
# Connect to database
conn = pymysql.connect(
host=os.getenv('MYSQL_HOST', 'mysql.theaken.com'),
port=int(os.getenv('MYSQL_PORT', 33306)),
user=os.getenv('MYSQL_USER', 'A060'),
password=os.getenv('MYSQL_PASSWORD', 'WLeSCi0yhtc7'),
database=os.getenv('MYSQL_DATABASE', 'db_A060')
)
cursor = conn.cursor()
# Read migration file
with open('migrations/add_public_feature.sql', 'r', encoding='utf-8') as f:
sql_content = f.read()
# Execute each statement
statements = sql_content.split(';')
for statement in statements:
statement = statement.strip()
if statement and not statement.startswith('--'):
print(f"Executing: {statement[:50]}...")
cursor.execute(statement)
# Commit changes
conn.commit()
print("Migration completed successfully!")
# Verify the changes
cursor.execute("DESCRIBE todo_item")
columns = cursor.fetchall()
print("\nCurrent todo_item columns:")
for col in columns:
if col[0] in ['is_public', 'tags']:
print(f"{col[0]}: {col[1]}")
cursor.close()
conn.close()
except Exception as e:
print(f"Migration failed: {str(e)}")
return False
return True
if __name__ == "__main__":
run_migration()

181
backend/test_db.py Normal file
View File

@@ -0,0 +1,181 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Test database connection and check data"""
import os
import sys
from dotenv import load_dotenv
import pymysql
from datetime import datetime
# Load environment variables
load_dotenv()
def test_db_connection():
"""Test database connection and list tables"""
print("=" * 60)
print("Testing Database Connection")
print("=" * 60)
# Get database configuration
db_config = {
'host': os.getenv('MYSQL_HOST', 'mysql.theaken.com'),
'port': int(os.getenv('MYSQL_PORT', 33306)),
'user': os.getenv('MYSQL_USER', 'A060'),
'password': os.getenv('MYSQL_PASSWORD', 'WLeSCi0yhtc7'),
'database': os.getenv('MYSQL_DATABASE', 'db_A060'),
'charset': 'utf8mb4'
}
print(f"Host: {db_config['host']}")
print(f"Port: {db_config['port']}")
print(f"Database: {db_config['database']}")
print(f"User: {db_config['user']}")
print("-" * 60)
try:
# Connect to database
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("[OK] Successfully connected to database")
# List all tables
print("\n[1] Listing all todo tables:")
cursor.execute("SHOW TABLES LIKE 'todo%'")
tables = cursor.fetchall()
if tables:
for table in tables:
print(f" - {table[0]}")
else:
print(" No todo tables found")
# Check todo_item table
print("\n[2] Checking todo_item table:")
cursor.execute("SELECT COUNT(*) FROM todo_item")
count = cursor.fetchone()[0]
print(f" Total records: {count}")
if count > 0:
print("\n Sample data from todo_item:")
cursor.execute("""
SELECT id, title, status, priority, due_date, creator_ad
FROM todo_item
ORDER BY created_at DESC
LIMIT 5
""")
items = cursor.fetchall()
for item in items:
print(f" - {item[0][:8]}... | {item[1][:30]}... | {item[2]} | {item[5]}")
# Check todo_user_pref table
print("\n[3] Checking todo_user_pref table:")
cursor.execute("SELECT COUNT(*) FROM todo_user_pref")
count = cursor.fetchone()[0]
print(f" Total users: {count}")
if count > 0:
print("\n Sample users:")
cursor.execute("""
SELECT ad_account, display_name, email
FROM todo_user_pref
LIMIT 5
""")
users = cursor.fetchall()
for user in users:
print(f" - {user[0]} | {user[1]} | {user[2]}")
# Check todo_item_responsible table
print("\n[4] Checking todo_item_responsible table:")
cursor.execute("SELECT COUNT(*) FROM todo_item_responsible")
count = cursor.fetchone()[0]
print(f" Total assignments: {count}")
# Check todo_item_follower table
print("\n[5] Checking todo_item_follower table:")
cursor.execute("SELECT COUNT(*) FROM todo_item_follower")
count = cursor.fetchone()[0]
print(f" Total followers: {count}")
cursor.close()
connection.close()
print("\n" + "=" * 60)
print("[OK] Database connection test successful!")
print("=" * 60)
return True
except Exception as e:
print(f"\n[ERROR] Database connection failed: {str(e)}")
print(f"Error type: {type(e).__name__}")
return False
def create_sample_todo():
"""Create a sample todo item for testing"""
print("\n" + "=" * 60)
print("Creating Sample Todo Item")
print("=" * 60)
db_config = {
'host': os.getenv('MYSQL_HOST', 'mysql.theaken.com'),
'port': int(os.getenv('MYSQL_PORT', 33306)),
'user': os.getenv('MYSQL_USER', 'A060'),
'password': os.getenv('MYSQL_PASSWORD', 'WLeSCi0yhtc7'),
'database': os.getenv('MYSQL_DATABASE', 'db_A060'),
'charset': 'utf8mb4'
}
try:
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
# Generate a UUID
import uuid
todo_id = str(uuid.uuid4())
# Insert sample todo
sql = """
INSERT INTO todo_item
(id, title, description, status, priority, due_date, created_at, creator_ad, creator_display_name, creator_email, starred)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
values = (
todo_id,
'Test Todo Item - ' + datetime.now().strftime('%Y-%m-%d %H:%M'),
'This is a test todo item created from Python script',
'NEW',
'MEDIUM',
'2025-09-15',
datetime.now(),
'test_user',
'Test User',
'test@panjit.com.tw',
False
)
cursor.execute(sql, values)
connection.commit()
print(f"[OK] Created todo item with ID: {todo_id}")
cursor.close()
connection.close()
return True
except Exception as e:
print(f"[ERROR] Failed to create todo: {str(e)}")
return False
if __name__ == "__main__":
# Test database connection
if test_db_connection():
# Ask if user wants to create sample data
response = input("\nDo you want to create a sample todo item? (y/n): ")
if response.lower() == 'y':
create_sample_todo()
else:
print("\n[WARNING] Please check your database configuration in .env file")
sys.exit(1)

165
backend/test_ldap.py Normal file
View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Test LDAP connection and authentication"""
import os
import sys
from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def test_ldap_connection():
"""Test LDAP connection"""
print("=" * 50)
print("Testing LDAP Connection")
print("=" * 50)
# Get LDAP configuration
ldap_server = os.getenv('LDAP_SERVER', 'ldap://panjit.com.tw')
ldap_port = int(os.getenv('LDAP_PORT', 389))
ldap_bind_user = os.getenv('LDAP_BIND_USER_DN', '')
ldap_bind_password = os.getenv('LDAP_BIND_USER_PASSWORD', '')
ldap_search_base = os.getenv('LDAP_SEARCH_BASE', 'DC=panjit,DC=com,DC=tw')
print(f"LDAP Server: {ldap_server}")
print(f"LDAP Port: {ldap_port}")
print(f"Bind User: {ldap_bind_user}")
print(f"Search Base: {ldap_search_base}")
print("-" * 50)
try:
# Create server object
server = Server(
ldap_server,
port=ldap_port,
use_ssl=False,
get_info=ALL_ATTRIBUTES
)
print("Creating LDAP connection...")
# Create connection with bind user
conn = Connection(
server,
user=ldap_bind_user,
password=ldap_bind_password,
auto_bind=True,
raise_exceptions=True
)
print("[OK] Successfully connected to LDAP server")
print(f"[OK] Server info: {conn.server}")
# Test search
print("\nTesting LDAP search...")
search_filter = "(objectClass=person)"
conn.search(
ldap_search_base,
search_filter,
SUBTREE,
attributes=['sAMAccountName', 'displayName', 'mail'],
size_limit=5
)
print(f"[OK] Found {len(conn.entries)} entries")
if conn.entries:
print("\nSample users:")
for i, entry in enumerate(conn.entries[:3], 1):
print(f" {i}. {entry.sAMAccountName} - {entry.displayName}")
conn.unbind()
print("\n[OK] LDAP connection test successful!")
return True
except Exception as e:
print(f"\n[ERROR] LDAP connection failed: {str(e)}")
print(f"Error type: {type(e).__name__}")
return False
def test_user_authentication(username, password):
"""Test user authentication"""
print("\n" + "=" * 50)
print(f"Testing authentication for user: {username}")
print("=" * 50)
# Get LDAP configuration
ldap_server = os.getenv('LDAP_SERVER', 'ldap://panjit.com.tw')
ldap_port = int(os.getenv('LDAP_PORT', 389))
ldap_bind_user = os.getenv('LDAP_BIND_USER_DN', '')
ldap_bind_password = os.getenv('LDAP_BIND_USER_PASSWORD', '')
ldap_search_base = os.getenv('LDAP_SEARCH_BASE', 'DC=panjit,DC=com,DC=tw')
ldap_user_attr = os.getenv('LDAP_USER_LOGIN_ATTR', 'userPrincipalName')
try:
# Create server object
server = Server(
ldap_server,
port=ldap_port,
use_ssl=False,
get_info=ALL_ATTRIBUTES
)
# First, bind with service account to search for user
conn = Connection(
server,
user=ldap_bind_user,
password=ldap_bind_password,
auto_bind=True,
raise_exceptions=True
)
# Search for user
search_filter = f"(&(objectClass=person)({ldap_user_attr}={username}))"
print(f"Searching with filter: {search_filter}")
conn.search(
ldap_search_base,
search_filter,
SUBTREE,
attributes=['sAMAccountName', 'displayName', 'mail', 'userPrincipalName', 'distinguishedName']
)
if not conn.entries:
print(f"[ERROR] User not found: {username}")
return False
user_entry = conn.entries[0]
user_dn = user_entry.distinguishedName.value
print(f"[OK] User found:")
print(f" DN: {user_dn}")
print(f" sAMAccountName: {user_entry.sAMAccountName}")
print(f" displayName: {user_entry.displayName}")
print(f" mail: {user_entry.mail}")
# Try to bind with user credentials
print(f"\nAttempting to authenticate user...")
user_conn = Connection(
server,
user=user_dn,
password=password,
auto_bind=True,
raise_exceptions=True
)
print("[OK] Authentication successful!")
user_conn.unbind()
conn.unbind()
return True
except Exception as e:
print(f"[ERROR] Authentication failed: {str(e)}")
return False
if __name__ == "__main__":
# Test basic connection
if test_ldap_connection():
# If you want to test user authentication, uncomment and modify:
# test_user_authentication("your_username@panjit.com.tw", "your_password")
pass
else:
print("\n[WARNING] Please check your LDAP configuration in .env file")
sys.exit(1)

51
backend/test_ldap_auth.py Normal file
View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Test LDAP authentication with provided credentials"""
import os
import sys
from dotenv import load_dotenv
from app import create_app
from utils.ldap_utils import authenticate_user
# Load environment variables
load_dotenv()
def test_ldap_auth():
"""Test LDAP authentication"""
print("=" * 60)
print("Testing LDAP Authentication")
print("=" * 60)
print(f"LDAP Server: {os.getenv('LDAP_SERVER')}")
print(f"Search Base: {os.getenv('LDAP_SEARCH_BASE')}")
print(f"Login Attr: {os.getenv('LDAP_USER_LOGIN_ATTR')}")
print("-" * 60)
username = 'ymirliu@panjit.com.tw'
password = '3EDC4rfv5tgb'
print(f"Testing authentication for: {username}")
# Create Flask app and context
app = create_app()
with app.app_context():
try:
result = authenticate_user(username, password)
if result:
print("[SUCCESS] Authentication successful!")
print(f"User info: {result}")
else:
print("[FAILED] Authentication failed")
return result is not None
except Exception as e:
print(f"[ERROR] Exception during authentication: {str(e)}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
test_ldap_auth()