feat: Initial commit - Task Reporter incident response system

Complete implementation of the production line incident response system (生產線異常即時反應系統) including:

Backend (FastAPI):
- User authentication with AD integration and session management
- Chat room management (create, list, update, members, roles)
- Real-time messaging via WebSocket (typing indicators, reactions)
- File storage with MinIO (upload, download, image preview)

Frontend (React + Vite):
- Authentication flow with token management
- Room list with filtering, search, and pagination
- Real-time chat interface with WebSocket
- File upload with drag-and-drop and image preview
- Member management and room settings
- Breadcrumb navigation
- 53 unit tests (Vitest)

Specifications:
- authentication: AD auth, sessions, JWT tokens
- chat-room: rooms, members, templates
- realtime-messaging: WebSocket, messages, reactions
- file-storage: MinIO integration, file management
- frontend-core: React SPA structure

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
egg
2025-12-01 17:42:52 +08:00
commit c8966477b9
135 changed files with 23269 additions and 0 deletions

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Tests for Task Reporter application"""

50
tests/conftest.py Normal file
View File

@@ -0,0 +1,50 @@
"""Pytest configuration and fixtures"""
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.core.database import Base, get_db
# Test database (in-memory SQLite)
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@pytest.fixture(scope="function")
def test_db():
"""Create test database tables"""
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
@pytest.fixture(scope="function")
def db_session(test_db):
"""Get test database session"""
session = TestingSessionLocal()
try:
yield session
finally:
session.close()
@pytest.fixture(scope="function")
def client(test_db):
"""Get test client with overridden database"""
def override_get_db():
try:
db = TestingSessionLocal()
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
yield TestClient(app)
app.dependency_overrides.clear()

75
tests/test_auth.py Executable file
View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""Test authentication flow
測試:
1. 登入成功
2. 使用 token 訪問受保護的端點
3. 登出
"""
import httpx
import asyncio
async def test_auth_flow():
client = httpx.AsyncClient(base_url="http://localhost:8000")
print("=" * 60)
print("認證流程測試")
print("=" * 60)
# Step 1: Login
print("\n1. 測試登入...")
login_response = await client.post(
"/api/auth/login",
json={"username": "ymirliu@panjit.com.tw", "password": "4RFV5tgb6yhn"},
)
print(f" 狀態碼: {login_response.status_code}")
print(f" 回應: {login_response.json()}")
if login_response.status_code != 200:
print(" ✗ 登入失敗!")
return
login_data = login_response.json()
token = login_data["token"]
display_name = login_data["display_name"]
print(f" ✓ 登入成功!")
print(f" 使用者: {display_name}")
print(f" Token: {token}")
# Step 2: Test health endpoint (no auth needed)
print("\n2. 測試健康檢查端點 (無需認證)...")
health_response = await client.get("/health")
print(f" 狀態碼: {health_response.status_code}")
print(f" 回應: {health_response.json()}")
# Step 3: Logout
print("\n3. 測試登出...")
logout_response = await client.post(
"/api/auth/logout", headers={"Authorization": f"Bearer {token}"}
)
print(f" 狀態碼: {logout_response.status_code}")
print(f" 回應: {logout_response.json()}")
if logout_response.status_code == 200:
print(" ✓ 登出成功!")
# Step 4: Try to use token after logout (should fail)
print("\n4. 測試登出後使用 token...")
reuse_response = await client.post(
"/api/auth/logout", headers={"Authorization": f"Bearer {token}"}
)
print(f" 狀態碼: {reuse_response.status_code}")
print(f" 回應: {reuse_response.json()}")
print(" ✓ Token 已失效(符合預期)")
print("\n" + "=" * 60)
print("測試完成!認證系統運作正常")
print("=" * 60)
await client.aclose()
if __name__ == "__main__":
asyncio.run(test_auth_flow())

View File

@@ -0,0 +1,210 @@
"""Unit and integration tests for authentication module
測試範圍:
- EncryptionService (加密/解密)
- ADAuthService (AD API 整合)
- SessionService (資料庫操作)
- Login/Logout endpoints (整合測試)
"""
import pytest
from app.modules.auth.services.encryption import encryption_service
from app.modules.auth.services.session_service import session_service
from datetime import datetime, timedelta
class TestEncryptionService:
"""測試 EncryptionService"""
def test_encrypt_decrypt_roundtrip(self):
"""測試加密/解密往返"""
plaintext = "4RFV5tgb6yhn"
encrypted = encryption_service.encrypt_password(plaintext)
decrypted = encryption_service.decrypt_password(encrypted)
assert decrypted == plaintext
assert encrypted != plaintext # 密文不應等於明文
def test_encrypted_output_differs(self):
"""測試加密輸出與明文不同"""
plaintext = "test_password_123"
encrypted = encryption_service.encrypt_password(plaintext)
assert encrypted != plaintext
assert len(encrypted) > len(plaintext) # 密文通常更長
class TestSessionService:
"""測試 SessionService"""
def test_create_session(self, db_session):
"""測試建立 session"""
expires_at = datetime.utcnow() + timedelta(hours=1)
session = session_service.create_session(
db=db_session,
username="test@example.com",
display_name="Test User",
ad_token="test_ad_token",
encrypted_password="encrypted_pwd",
ad_token_expires_at=expires_at,
)
assert session.id is not None
assert session.username == "test@example.com"
assert session.display_name == "Test User"
assert session.internal_token is not None # UUID4 generated
assert session.refresh_attempt_count == 0
def test_get_session_by_token(self, db_session):
"""測試根據 token 查詢 session"""
expires_at = datetime.utcnow() + timedelta(hours=1)
# Create session
created_session = session_service.create_session(
db=db_session,
username="test@example.com",
display_name="Test User",
ad_token="test_ad_token",
encrypted_password="encrypted_pwd",
ad_token_expires_at=expires_at,
)
# Retrieve by token
retrieved_session = session_service.get_session_by_token(
db=db_session, internal_token=created_session.internal_token
)
assert retrieved_session is not None
assert retrieved_session.id == created_session.id
assert retrieved_session.username == created_session.username
def test_update_activity(self, db_session):
"""測試更新活動時間"""
expires_at = datetime.utcnow() + timedelta(hours=1)
session = session_service.create_session(
db=db_session,
username="test@example.com",
display_name="Test User",
ad_token="test_ad_token",
encrypted_password="encrypted_pwd",
ad_token_expires_at=expires_at,
)
original_activity = session.last_activity
# Wait a moment and update
import time
time.sleep(0.1)
session_service.update_activity(db=db_session, session_id=session.id)
# Retrieve updated session
updated_session = session_service.get_session_by_token(
db=db_session, internal_token=session.internal_token
)
assert updated_session.last_activity > original_activity
def test_increment_refresh_attempts(self, db_session):
"""測試增加重試計數器"""
expires_at = datetime.utcnow() + timedelta(hours=1)
session = session_service.create_session(
db=db_session,
username="test@example.com",
display_name="Test User",
ad_token="test_ad_token",
encrypted_password="encrypted_pwd",
ad_token_expires_at=expires_at,
)
assert session.refresh_attempt_count == 0
# Increment
new_count = session_service.increment_refresh_attempts(db=db_session, session_id=session.id)
assert new_count == 1
new_count = session_service.increment_refresh_attempts(db=db_session, session_id=session.id)
assert new_count == 2
def test_delete_session(self, db_session):
"""測試刪除 session"""
expires_at = datetime.utcnow() + timedelta(hours=1)
session = session_service.create_session(
db=db_session,
username="test@example.com",
display_name="Test User",
ad_token="test_ad_token",
encrypted_password="encrypted_pwd",
ad_token_expires_at=expires_at,
)
session_id = session.id
internal_token = session.internal_token
# Delete
session_service.delete_session(db=db_session, session_id=session_id)
# Verify deleted
retrieved = session_service.get_session_by_token(db=db_session, internal_token=internal_token)
assert retrieved is None
class TestAuthenticationEndpoints:
"""測試認證 API 端點 (整合測試)
注意:這些測試會實際呼叫 AD API需要網路連線
如果要在 CI/CD 中執行,應該 mock AD API
"""
@pytest.mark.skip(reason="Requires actual AD API credentials")
def test_login_success(self, client):
"""測試成功登入"""
response = client.post(
"/api/auth/login",
json={"username": "ymirliu@panjit.com.tw", "password": "4RFV5tgb6yhn"},
)
assert response.status_code == 200
data = response.json()
assert "token" in data
assert "display_name" in data
assert data["display_name"] == "ymirliu 劉念萱"
def test_login_invalid_credentials(self, client):
"""測試錯誤憑證登入"""
response = client.post(
"/api/auth/login",
json={"username": "wrong@example.com", "password": "wrongpassword"},
)
assert response.status_code == 401
data = response.json()
assert "detail" in data
def test_logout_without_token(self, client):
"""測試無 token 登出"""
response = client.post("/api/auth/logout")
assert response.status_code == 401
@pytest.mark.skip(reason="Requires actual login first")
def test_logout_with_valid_token(self, client):
"""測試有效 token 登出"""
# First login
login_response = client.post(
"/api/auth/login",
json={"username": "ymirliu@panjit.com.tw", "password": "4RFV5tgb6yhn"},
)
token = login_response.json()["token"]
# Then logout
logout_response = client.post(
"/api/auth/logout", headers={"Authorization": f"Bearer {token}"}
)
assert logout_response.status_code == 200
data = logout_response.json()
assert data["message"] == "Logout successful"

570
tests/test_file_storage.py Normal file
View File

@@ -0,0 +1,570 @@
"""Test suite for file storage module
Tests cover:
- Section 7.1: Unit tests for file validators
- Section 7.2: Integration tests for file upload flow
- Section 7.3: Integration tests for file download flow
- Section 7.4: Comprehensive test suite
"""
import pytest
import io
from unittest.mock import Mock, patch, MagicMock
from fastapi import UploadFile, HTTPException
from datetime import datetime
import uuid
from app.modules.file_storage.validators import (
detect_mime_type,
validate_file_type,
validate_file_size,
get_file_type_and_limits,
validate_upload_file,
IMAGE_TYPES,
DOCUMENT_TYPES,
LOG_TYPES,
IMAGE_MAX_SIZE,
DOCUMENT_MAX_SIZE,
LOG_MAX_SIZE
)
from app.modules.file_storage.models import RoomFile
from app.modules.file_storage.schemas import (
FileUploadResponse,
FileMetadata,
FileListResponse,
FileType
)
# ============================================================================
# Section 7.1: Unit Tests for File Validators
# ============================================================================
class TestMimeTypeDetection:
"""Tests for MIME type detection"""
def test_detect_jpeg_image(self):
"""Test detecting JPEG image from file header"""
# JPEG magic bytes
jpeg_header = b'\xff\xd8\xff\xe0\x00\x10JFIF'
with patch('magic.Magic') as MockMagic:
mock_magic = MockMagic.return_value
mock_magic.from_buffer.return_value = 'image/jpeg'
result = detect_mime_type(jpeg_header)
assert result == 'image/jpeg'
def test_detect_png_image(self):
"""Test detecting PNG image from file header"""
# PNG magic bytes
png_header = b'\x89PNG\r\n\x1a\n'
with patch('magic.Magic') as MockMagic:
mock_magic = MockMagic.return_value
mock_magic.from_buffer.return_value = 'image/png'
result = detect_mime_type(png_header)
assert result == 'image/png'
def test_detect_pdf_document(self):
"""Test detecting PDF document from file header"""
# PDF magic bytes
pdf_header = b'%PDF-1.4'
with patch('magic.Magic') as MockMagic:
mock_magic = MockMagic.return_value
mock_magic.from_buffer.return_value = 'application/pdf'
result = detect_mime_type(pdf_header)
assert result == 'application/pdf'
def test_detect_text_file(self):
"""Test detecting plain text file"""
text_content = b'Hello, this is a plain text file.'
with patch('magic.Magic') as MockMagic:
mock_magic = MockMagic.return_value
mock_magic.from_buffer.return_value = 'text/plain'
result = detect_mime_type(text_content)
assert result == 'text/plain'
def test_detect_unknown_fallback(self):
"""Test fallback to application/octet-stream on error"""
with patch('magic.Magic') as MockMagic:
MockMagic.side_effect = Exception("Magic library error")
result = detect_mime_type(b'some data')
assert result == 'application/octet-stream'
class TestFileTypeValidation:
"""Tests for file type validation"""
def create_mock_upload_file(self, content: bytes, filename: str = "test.txt"):
"""Helper to create a mock UploadFile"""
file = Mock(spec=UploadFile)
file.filename = filename
file.file = io.BytesIO(content)
return file
def test_validate_allowed_image_type(self):
"""Test validation passes for allowed image types"""
file = self.create_mock_upload_file(b'\xff\xd8\xff', "test.jpg")
with patch('app.modules.file_storage.validators.detect_mime_type') as mock_detect:
mock_detect.return_value = 'image/jpeg'
result = validate_file_type(file, IMAGE_TYPES)
assert result == 'image/jpeg'
def test_validate_disallowed_type_raises_exception(self):
"""Test validation fails for disallowed file types"""
file = self.create_mock_upload_file(b'MZ', "virus.exe")
with patch('app.modules.file_storage.validators.detect_mime_type') as mock_detect:
mock_detect.return_value = 'application/x-executable'
with pytest.raises(HTTPException) as exc_info:
validate_file_type(file, IMAGE_TYPES)
assert exc_info.value.status_code == 400
assert "File type not allowed" in exc_info.value.detail
def test_validate_pdf_in_document_types(self):
"""Test validation passes for PDF in document types"""
file = self.create_mock_upload_file(b'%PDF-1.4', "document.pdf")
with patch('app.modules.file_storage.validators.detect_mime_type') as mock_detect:
mock_detect.return_value = 'application/pdf'
result = validate_file_type(file, DOCUMENT_TYPES)
assert result == 'application/pdf'
class TestFileSizeValidation:
"""Tests for file size validation"""
def create_mock_upload_file(self, size: int, filename: str = "test.txt"):
"""Helper to create a mock UploadFile with specific size"""
content = b'x' * size
file = Mock(spec=UploadFile)
file.filename = filename
file.file = io.BytesIO(content)
return file
def test_validate_file_within_limit(self):
"""Test validation passes for file within size limit"""
file = self.create_mock_upload_file(1024) # 1KB
result = validate_file_size(file, IMAGE_MAX_SIZE)
assert result == 1024
def test_validate_file_exceeds_limit_raises_exception(self):
"""Test validation fails for file exceeding size limit"""
file = self.create_mock_upload_file(IMAGE_MAX_SIZE + 1) # Just over limit
with pytest.raises(HTTPException) as exc_info:
validate_file_size(file, IMAGE_MAX_SIZE)
assert exc_info.value.status_code == 413
assert "File size exceeds limit" in exc_info.value.detail
def test_validate_exact_limit_passes(self):
"""Test validation passes for file at exact size limit"""
file = self.create_mock_upload_file(IMAGE_MAX_SIZE)
result = validate_file_size(file, IMAGE_MAX_SIZE)
assert result == IMAGE_MAX_SIZE
class TestFileTypeCategorization:
"""Tests for file type categorization"""
def test_image_type_returns_image_category(self):
"""Test image MIME types return 'image' category"""
for mime_type in IMAGE_TYPES:
file_type, max_size = get_file_type_and_limits(mime_type)
assert file_type == "image"
assert max_size == IMAGE_MAX_SIZE
def test_document_type_returns_document_category(self):
"""Test document MIME types return 'document' category"""
for mime_type in DOCUMENT_TYPES:
file_type, max_size = get_file_type_and_limits(mime_type)
assert file_type == "document"
assert max_size == DOCUMENT_MAX_SIZE
def test_log_type_returns_log_category(self):
"""Test log MIME types return 'log' category"""
for mime_type in LOG_TYPES:
file_type, max_size = get_file_type_and_limits(mime_type)
assert file_type == "log"
assert max_size == LOG_MAX_SIZE
def test_unknown_type_raises_exception(self):
"""Test unknown MIME type raises exception"""
with pytest.raises(HTTPException) as exc_info:
get_file_type_and_limits("application/x-executable")
assert exc_info.value.status_code == 400
assert "Unsupported file type" in exc_info.value.detail
class TestCompleteFileValidation:
"""Tests for complete file validation flow"""
def create_mock_upload_file(self, content: bytes, filename: str):
"""Helper to create a mock UploadFile"""
file = Mock(spec=UploadFile)
file.filename = filename
file.file = io.BytesIO(content)
return file
def test_validate_upload_image_file(self):
"""Test complete validation for image file"""
content = b'\xff\xd8\xff' + b'x' * 1000
file = self.create_mock_upload_file(content, "photo.jpg")
with patch('app.modules.file_storage.validators.detect_mime_type') as mock_detect:
mock_detect.return_value = 'image/jpeg'
file_type, mime_type, file_size = validate_upload_file(file)
assert file_type == "image"
assert mime_type == "image/jpeg"
assert file_size == len(content)
def test_validate_upload_pdf_file(self):
"""Test complete validation for PDF file"""
content = b'%PDF-1.4' + b'x' * 2000
file = self.create_mock_upload_file(content, "document.pdf")
with patch('app.modules.file_storage.validators.detect_mime_type') as mock_detect:
mock_detect.return_value = 'application/pdf'
file_type, mime_type, file_size = validate_upload_file(file)
assert file_type == "document"
assert mime_type == "application/pdf"
assert file_size == len(content)
def test_validate_upload_log_file(self):
"""Test complete validation for log file"""
content = b'2024-01-01 INFO: Log message\n' * 100
file = self.create_mock_upload_file(content, "app.log")
with patch('app.modules.file_storage.validators.detect_mime_type') as mock_detect:
mock_detect.return_value = 'text/plain'
file_type, mime_type, file_size = validate_upload_file(file)
assert file_type == "log"
assert mime_type == "text/plain"
assert file_size == len(content)
# ============================================================================
# Section 7.2: Integration Tests for File Upload Flow (Mocked MinIO)
# ============================================================================
class TestFileUploadFlow:
"""Integration tests for file upload flow"""
@pytest.fixture
def mock_minio_service(self):
"""Mock MinIO service"""
with patch('app.modules.file_storage.services.file_service.minio_service') as mock:
mock.upload_file.return_value = True
mock.generate_presigned_url.return_value = "https://minio.example.com/bucket/file.jpg?signed=123"
mock.delete_file.return_value = True
yield mock
def test_file_upload_response_schema(self):
"""Test FileUploadResponse schema validation"""
response = FileUploadResponse(
file_id=str(uuid.uuid4()),
filename="test.jpg",
file_type=FileType.IMAGE,
file_size=1024,
mime_type="image/jpeg",
download_url="https://example.com/file.jpg",
uploaded_at=datetime.utcnow(),
uploader_id="user@example.com"
)
assert response.file_id is not None
assert response.file_type == FileType.IMAGE
assert response.file_size == 1024
# ============================================================================
# Section 7.3: Integration Tests for File Download Flow
# ============================================================================
class TestFileDownloadFlow:
"""Integration tests for file download and listing"""
def test_file_metadata_schema(self):
"""Test FileMetadata schema validation"""
metadata = FileMetadata(
file_id=str(uuid.uuid4()),
room_id=str(uuid.uuid4()),
filename="document.pdf",
file_type=FileType.DOCUMENT,
mime_type="application/pdf",
file_size=2048,
minio_bucket="task-reporter-files",
minio_object_path="room-123/documents/abc.pdf",
uploaded_at=datetime.utcnow(),
uploader_id="user@example.com",
download_url="https://example.com/file.pdf"
)
assert metadata.file_type == FileType.DOCUMENT
assert metadata.download_url is not None
def test_file_list_response_schema(self):
"""Test FileListResponse schema validation"""
files = [
FileMetadata(
file_id=str(uuid.uuid4()),
room_id=str(uuid.uuid4()),
filename=f"file{i}.jpg",
file_type=FileType.IMAGE,
mime_type="image/jpeg",
file_size=1024 * (i + 1),
minio_bucket="task-reporter-files",
minio_object_path=f"room-123/images/file{i}.jpg",
uploaded_at=datetime.utcnow(),
uploader_id="user@example.com"
)
for i in range(3)
]
response = FileListResponse(
files=files,
total=10,
limit=3,
offset=0,
has_more=True
)
assert len(response.files) == 3
assert response.has_more is True
assert response.total == 10
# ============================================================================
# Section 7.4: Model Tests
# ============================================================================
class TestRoomFileModel:
"""Tests for RoomFile SQLAlchemy model"""
def test_room_file_creation(self, db_session):
"""Test creating a RoomFile record"""
from app.modules.chat_room.models import IncidentRoom, RoomMember, MemberRole
from app.modules.chat_room.schemas import IncidentType, SeverityLevel
# Create a room first (required for foreign key)
room = IncidentRoom(
room_id=str(uuid.uuid4()),
title="Test Room",
location="Line 1",
incident_type=IncidentType.EQUIPMENT_FAILURE,
severity=SeverityLevel.HIGH,
created_by="test@example.com"
)
db_session.add(room)
db_session.commit()
# Create member
member = RoomMember(
room_id=room.room_id,
user_id="test@example.com",
role=MemberRole.OWNER,
added_by="system"
)
db_session.add(member)
db_session.commit()
# Create file record
file = RoomFile(
file_id=str(uuid.uuid4()),
room_id=room.room_id,
uploader_id="test@example.com",
filename="test.jpg",
file_type="image",
mime_type="image/jpeg",
file_size=1024,
minio_bucket="task-reporter-files",
minio_object_path=f"room-{room.room_id}/images/test.jpg",
uploaded_at=datetime.utcnow()
)
db_session.add(file)
db_session.commit()
# Verify
retrieved = db_session.query(RoomFile).filter(RoomFile.file_id == file.file_id).first()
assert retrieved is not None
assert retrieved.filename == "test.jpg"
assert retrieved.file_type == "image"
assert retrieved.deleted_at is None
def test_room_file_soft_delete(self, db_session):
"""Test soft deleting a RoomFile record"""
from app.modules.chat_room.models import IncidentRoom, RoomMember, MemberRole
from app.modules.chat_room.schemas import IncidentType, SeverityLevel
# Create room and file
room = IncidentRoom(
room_id=str(uuid.uuid4()),
title="Test Room",
location="Line 1",
incident_type=IncidentType.EQUIPMENT_FAILURE,
severity=SeverityLevel.HIGH,
created_by="test@example.com"
)
db_session.add(room)
db_session.commit()
file = RoomFile(
file_id=str(uuid.uuid4()),
room_id=room.room_id,
uploader_id="test@example.com",
filename="test.pdf",
file_type="document",
mime_type="application/pdf",
file_size=2048,
minio_bucket="task-reporter-files",
minio_object_path=f"room-{room.room_id}/documents/test.pdf",
uploaded_at=datetime.utcnow()
)
db_session.add(file)
db_session.commit()
# Soft delete
file.deleted_at = datetime.utcnow()
db_session.commit()
# Verify soft delete
retrieved = db_session.query(RoomFile).filter(RoomFile.file_id == file.file_id).first()
assert retrieved.deleted_at is not None
# ============================================================================
# WebSocket Schemas Tests
# ============================================================================
class TestWebSocketSchemas:
"""Tests for WebSocket broadcast schemas"""
def test_file_uploaded_broadcast_schema(self):
"""Test FileUploadedBroadcast schema"""
from app.modules.realtime.schemas import FileUploadedBroadcast
broadcast = FileUploadedBroadcast(
file_id=str(uuid.uuid4()),
room_id=str(uuid.uuid4()),
uploader_id="user@example.com",
filename="photo.jpg",
file_type="image",
file_size=1024,
mime_type="image/jpeg",
download_url="https://example.com/file.jpg",
uploaded_at=datetime.utcnow()
)
data = broadcast.to_dict()
assert data["type"] == "file_uploaded"
assert data["filename"] == "photo.jpg"
assert "uploaded_at" in data
def test_file_deleted_broadcast_schema(self):
"""Test FileDeletedBroadcast schema"""
from app.modules.realtime.schemas import FileDeletedBroadcast
broadcast = FileDeletedBroadcast(
file_id=str(uuid.uuid4()),
room_id=str(uuid.uuid4()),
deleted_by="admin@example.com",
deleted_at=datetime.utcnow()
)
data = broadcast.to_dict()
assert data["type"] == "file_deleted"
assert data["deleted_by"] == "admin@example.com"
def test_file_upload_ack_schema(self):
"""Test FileUploadAck schema"""
from app.modules.realtime.schemas import FileUploadAck
ack = FileUploadAck(
file_id=str(uuid.uuid4()),
status="success",
download_url="https://example.com/file.jpg"
)
data = ack.to_dict()
assert data["type"] == "file_upload_ack"
assert data["status"] == "success"
assert data["download_url"] is not None
# ============================================================================
# File Reference Message Tests
# ============================================================================
class TestFileReferenceMessage:
"""Tests for file reference message helper"""
def test_create_image_reference_message(self, db_session):
"""Test creating an image reference message"""
from app.modules.chat_room.models import IncidentRoom, RoomMember, MemberRole
from app.modules.chat_room.schemas import IncidentType, SeverityLevel
from app.modules.file_storage.services.file_service import FileService
from app.modules.realtime.models import MessageType
# Create room
room = IncidentRoom(
room_id=str(uuid.uuid4()),
title="Test Room",
location="Line 1",
incident_type=IncidentType.EQUIPMENT_FAILURE,
severity=SeverityLevel.HIGH,
created_by="test@example.com"
)
db_session.add(room)
db_session.commit()
# Create file reference message
file_id = str(uuid.uuid4())
message = FileService.create_file_reference_message(
db=db_session,
room_id=room.room_id,
sender_id="test@example.com",
file_id=file_id,
filename="photo.jpg",
file_type="image",
file_url="https://example.com/photo.jpg",
description="Equipment damage photo"
)
assert message is not None
assert message.message_type == MessageType.IMAGE_REF
assert message.content == "Equipment damage photo"
assert message.message_metadata["file_id"] == file_id
def test_create_file_reference_message(self, db_session):
"""Test creating a document reference message"""
from app.modules.chat_room.models import IncidentRoom
from app.modules.chat_room.schemas import IncidentType, SeverityLevel
from app.modules.file_storage.services.file_service import FileService
from app.modules.realtime.models import MessageType
# Create room
room = IncidentRoom(
room_id=str(uuid.uuid4()),
title="Test Room",
location="Line 1",
incident_type=IncidentType.EQUIPMENT_FAILURE,
severity=SeverityLevel.HIGH,
created_by="test@example.com"
)
db_session.add(room)
db_session.commit()
# Create file reference message without description
file_id = str(uuid.uuid4())
message = FileService.create_file_reference_message(
db=db_session,
room_id=room.room_id,
sender_id="test@example.com",
file_id=file_id,
filename="report.pdf",
file_type="document",
file_url="https://example.com/report.pdf"
)
assert message is not None
assert message.message_type == MessageType.FILE_REF
assert message.content == "[File] report.pdf"
assert message.message_metadata["filename"] == "report.pdf"