feat: implement kanban real-time sync and fix workload cache
## Kanban Real-time Sync (NEW-002)

- Backend:
  - WebSocket endpoint: `/ws/projects/{project_id}`
  - Project room management in ConnectionManager
  - Redis Pub/Sub: `project:{project_id}:tasks` channel
  - Task CRUD event publishing (5 event types)
  - Redis connection retry with exponential backoff
  - Race condition fix in `broadcast_to_project` (sketched below)
- Frontend:
  - ProjectSyncContext for WebSocket management
  - Reconnection with exponential backoff (max 5 attempts)
  - Multi-tab event deduplication via `event_id`
  - Live/Offline connection indicator
  - Optimistic updates with rollback
- Spec:
  - collaboration spec: +1 requirement (Project Real-time Sync)
  - 7 new scenarios for real-time sync
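To make the backend pieces concrete, here is a minimal sketch of the per-project room management and the locked broadcast, assuming FastAPI. The endpoint path and channel-per-project shape come from the list above; the ConnectionManager internals (field names, the `asyncio.Lock` strategy) are illustrative assumptions, not the code from this commit.

```python
import asyncio

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


class ConnectionManager:
    def __init__(self) -> None:
        # project_id -> set of live sockets in that project's "room"
        self._rooms: dict[str, set[WebSocket]] = {}
        # Guards room mutation vs. iteration during broadcast
        self._lock = asyncio.Lock()

    async def connect(self, project_id: str, ws: WebSocket) -> None:
        await ws.accept()
        async with self._lock:
            self._rooms.setdefault(project_id, set()).add(ws)

    async def disconnect(self, project_id: str, ws: WebSocket) -> None:
        async with self._lock:
            self._rooms.get(project_id, set()).discard(ws)

    async def broadcast_to_project(self, project_id: str, message: str) -> None:
        # Snapshot the room under the lock so a concurrent connect or
        # disconnect cannot mutate the set while we iterate over it.
        async with self._lock:
            targets = list(self._rooms.get(project_id, set()))
        for ws in targets:
            try:
                await ws.send_text(message)
            except Exception:
                await self.disconnect(project_id, ws)


manager = ConnectionManager()


@app.websocket("/ws/projects/{project_id}")
async def project_ws(websocket: WebSocket, project_id: str) -> None:
    await manager.connect(project_id, websocket)
    try:
        while True:
            await websocket.receive_text()  # keepalive / client pings
    except WebSocketDisconnect:
        await manager.disconnect(project_id, websocket)
```

Snapshotting the room under the lock is one common way to avoid the set-mutated-during-iteration race that the `broadcast_to_project` fix addresses.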
## Workload Cache Fix (NEW-001)

- Added cache invalidation to all task endpoints (sketched below):
  - create_task, update_task, update_task_status
  - delete_task, restore_task, assign_task
- Extended the fix to clear the heatmap cache as well
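A sketch of the invalidation pattern applied to one of the endpoints listed above. The cache helper names, the route shape, and the import path are assumptions for illustration; `publish_task_event` itself is the function added in this commit (see the diff below).

```python
from fastapi import APIRouter

# Assumed module path for the function added in this commit's diff.
from app.services.redis_pubsub import publish_task_event

router = APIRouter()


def invalidate_workload_cache(project_id: str) -> None:
    """Assumed helper: drop the cached workload aggregation for the project."""


def invalidate_heatmap_cache(project_id: str) -> None:
    """Assumed helper: drop the cached heatmap for the project."""


@router.patch("/projects/{project_id}/tasks/{task_id}/status")
async def update_task_status(
    project_id: str, task_id: str, status: str, user_id: str
) -> dict:
    task = {"id": task_id, "status": status}  # stand-in for the real DB update
    # Clear both caches so the next workload/heatmap read recomputes.
    invalidate_workload_cache(project_id)
    invalidate_heatmap_cache(project_id)
    # Broadcast the change to other clients in the project room.
    await publish_task_event(project_id, "task_status_changed", task, triggered_by=user_id)
    return task
```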
## OpenSpec Archive
- 2026-01-05-add-kanban-realtime-sync
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@@ -1,7 +1,10 @@
"""Redis Pub/Sub service for cross-process notification broadcasting."""

import asyncio
import json
import logging
import uuid
from datetime import datetime
from typing import Optional, Callable, Any
import redis.asyncio as aioredis

@@ -9,6 +12,10 @@ from app.core.config import settings

logger = logging.getLogger(__name__)

# Redis retry configuration
MAX_REDIS_RETRIES = 3
REDIS_RETRY_DELAY = 0.5  # seconds (base delay for exponential backoff)

# Global async Redis client for pub/sub
_pubsub_redis: Optional[aioredis.Redis] = None

@@ -18,6 +25,11 @@ def get_channel_name(user_id: str) -> str:
    return f"notifications:{user_id}"


def get_project_channel_name(project_id: str) -> str:
    """Get the Redis channel name for project task events."""
    return f"project:{project_id}:tasks"


async def get_pubsub_redis() -> aioredis.Redis:
    """Get or create the async Redis client for pub/sub."""
    global _pubsub_redis

@@ -120,3 +132,187 @@ class NotificationSubscriber:
    @property
    def is_running(self) -> bool:
        return self._running


async def _reset_pubsub_redis() -> None:
    """Reset the Redis connection on failure."""
    global _pubsub_redis
    if _pubsub_redis is not None:
        try:
            await _pubsub_redis.close()
        except Exception:
            pass
        _pubsub_redis = None

async def publish_task_event(
    project_id: str,
    event_type: str,
    task_data: dict,
    triggered_by: str
) -> bool:
    """
    Publish a task event to a project's channel with retry logic.

    Args:
        project_id: The project ID
        event_type: Event type (task_created, task_updated, task_status_changed, task_deleted, task_assigned)
        task_data: The task data to include in the event
        triggered_by: User ID who triggered this event

    Returns:
        True if published successfully, False otherwise
    """
    channel = get_project_channel_name(project_id)
    message = json.dumps({
        "type": event_type,
        "event_id": str(uuid.uuid4()),  # Unique event ID for multi-tab deduplication
        "data": task_data,
        "triggered_by": triggered_by,
        "timestamp": datetime.utcnow().isoformat(),
    }, default=str)

    for attempt in range(MAX_REDIS_RETRIES):
        try:
            redis_client = await get_pubsub_redis()
            # Test connection with ping before publishing
            await redis_client.ping()
            await redis_client.publish(channel, message)
            logger.debug(f"Published task event '{event_type}' to channel {channel}")
            return True
        except Exception as e:
            logger.warning(f"Redis publish attempt {attempt + 1}/{MAX_REDIS_RETRIES} failed: {e}")
            if attempt < MAX_REDIS_RETRIES - 1:
                # Exponential backoff: 0.5s, 1s, 2s, ...
                await asyncio.sleep(REDIS_RETRY_DELAY * (2 ** attempt))
                # Reset connection on failure
                await _reset_pubsub_redis()
            else:
                logger.error(f"Failed to publish task event '{event_type}' after {MAX_REDIS_RETRIES} attempts")
    return False


class ProjectTaskSubscriber:
    """
    Subscriber for project task events via Redis Pub/Sub.
    Used by WebSocket connections to receive real-time task updates.
    Includes automatic reconnection handling.
    """

    def __init__(self, project_id: str):
        self.project_id = project_id
        self.channel = get_project_channel_name(project_id)
        self.pubsub: Optional[aioredis.client.PubSub] = None
        self._running = False
        self._reconnect_attempts = 0

    async def start(self) -> None:
        """Start subscribing to the project's task channel with retry logic."""
        for attempt in range(MAX_REDIS_RETRIES):
            try:
                redis_client = await get_pubsub_redis()
                # Test connection health
                await redis_client.ping()
                self.pubsub = redis_client.pubsub()
                await self.pubsub.subscribe(self.channel)
                self._running = True
                self._reconnect_attempts = 0
                logger.debug(f"Subscribed to project task channel {self.channel}")
                return
            except Exception as e:
                logger.warning(f"Redis subscribe attempt {attempt + 1}/{MAX_REDIS_RETRIES} failed: {e}")
                if attempt < MAX_REDIS_RETRIES - 1:
                    # Exponential backoff before retrying
                    await asyncio.sleep(REDIS_RETRY_DELAY * (2 ** attempt))
                    await _reset_pubsub_redis()
                else:
                    logger.error(f"Failed to subscribe to channel {self.channel} after {MAX_REDIS_RETRIES} attempts")
                    raise

    async def _reconnect(self) -> bool:
        """Attempt to reconnect to Redis and resubscribe."""
        self._reconnect_attempts += 1
        if self._reconnect_attempts > MAX_REDIS_RETRIES:
            logger.error(f"Max reconnection attempts reached for channel {self.channel}")
            return False

        logger.info(f"Attempting to reconnect to Redis (attempt {self._reconnect_attempts}/{MAX_REDIS_RETRIES})")

        # Clean up old pubsub
        if self.pubsub:
            try:
                await self.pubsub.close()
            except Exception:
                pass
            self.pubsub = None

        # Reset global connection
        await _reset_pubsub_redis()

        # Wait with exponential backoff
        await asyncio.sleep(REDIS_RETRY_DELAY * (2 ** (self._reconnect_attempts - 1)))

        try:
            redis_client = await get_pubsub_redis()
            await redis_client.ping()
            self.pubsub = redis_client.pubsub()
            await self.pubsub.subscribe(self.channel)
            self._reconnect_attempts = 0
            logger.info(f"Successfully reconnected to channel {self.channel}")
            return True
        except Exception as e:
            logger.warning(f"Reconnection attempt failed: {e}")
            return False

    async def stop(self) -> None:
        """Stop subscribing and clean up."""
        self._running = False
        if self.pubsub:
            try:
                await self.pubsub.unsubscribe(self.channel)
                await self.pubsub.close()
            except Exception as e:
                logger.warning(f"Error during pubsub cleanup: {e}")
            self.pubsub = None
        logger.debug(f"Unsubscribed from project task channel {self.channel}")

    async def listen(self, callback: Callable[[dict], Any]) -> None:
        """
        Listen for task events and call the callback for each event.
        Includes automatic reconnection on connection failures.

        Args:
            callback: Async function to call with each task event dict.
                The dict contains: type, event_id, data, triggered_by, timestamp
        """
        if not self.pubsub:
            raise RuntimeError("Subscriber not started. Call start() first.")

        while self._running:
            try:
                async for message in self.pubsub.listen():
                    if not self._running:
                        break

                    if message["type"] == "message":
                        try:
                            data = json.loads(message["data"])
                            await callback(data)
                        except json.JSONDecodeError:
                            logger.warning(f"Invalid JSON in task event: {message['data']}")
                        except Exception as e:
                            logger.error(f"Error processing task event: {e}")
            except Exception as e:
                if not self._running:
                    break
                logger.warning(f"Redis connection error in task listener: {e}")
                # Attempt to reconnect
                if await self._reconnect():
                    continue  # Resume listening after successful reconnection
                else:
                    logger.error(f"Failed to recover connection for channel {self.channel}")
                    break

    @property
    def is_running(self) -> bool:
        return self._running
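For context, a short sketch of how a WebSocket handler might consume `ProjectTaskSubscriber` (defined above). The handler name and the `send_json` forwarding are illustrative assumptions, not code from this commit.

```python
from fastapi import WebSocket


async def forward_project_events(websocket: WebSocket, project_id: str) -> None:
    subscriber = ProjectTaskSubscriber(project_id)
    await subscriber.start()  # raises if Redis is unreachable after all retries

    async def on_event(event: dict) -> None:
        # Each event carries the event_id the frontend uses to drop
        # duplicates when several tabs share one browser session.
        await websocket.send_json(event)

    try:
        await subscriber.listen(on_event)  # blocks; reconnects internally
    finally:
        await subscriber.stop()
```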