""" Sidecar Process Manager Manages the Python sidecar process for speech-to-text transcription. Provides an interface for the backend to communicate with the sidecar via subprocess stdin/stdout. """ import asyncio import json import os import subprocess import sys import tempfile import base64 from pathlib import Path from typing import Optional, Dict, Any, Callable from threading import Thread, Lock import queue class SidecarManager: """ Manages the Whisper transcription sidecar process. The sidecar is a Python process running transcriber.py that handles speech-to-text conversion using faster-whisper. """ def __init__(self): self.process: Optional[subprocess.Popen] = None self.ready = False self.whisper_info: Optional[Dict] = None self._lock = Lock() self._response_queue = queue.Queue() self._reader_thread: Optional[Thread] = None self._progress_callbacks: list[Callable] = [] self._last_status: Dict[str, Any] = {} # Paths self.project_dir = Path(__file__).parent.parent.parent self.sidecar_dir = self.project_dir / "sidecar" self.transcriber_path = self.sidecar_dir / "transcriber.py" self.venv_python = self.sidecar_dir / "venv" / "bin" / "python" def is_available(self) -> bool: """Check if sidecar is available (files exist).""" return self.transcriber_path.exists() and self.venv_python.exists() def get_status(self) -> Dict[str, Any]: """Get current sidecar status.""" return { "ready": self.ready, "streaming": self._is_streaming(), "whisper": self.whisper_info, "available": self.is_available(), "browserMode": False, **self._last_status } def _is_streaming(self) -> bool: """Check if currently in streaming mode.""" return self._last_status.get("streaming", False) async def start(self) -> bool: """Start the sidecar process.""" if self.process and self.process.poll() is None: return True # Already running if not self.is_available(): print(f"[Sidecar] Not available: transcriber={self.transcriber_path.exists()}, venv={self.venv_python.exists()}") return False try: # Get Whisper configuration from environment env = os.environ.copy() env["WHISPER_MODEL"] = os.getenv("WHISPER_MODEL", "medium") env["WHISPER_DEVICE"] = os.getenv("WHISPER_DEVICE", "cpu") env["WHISPER_COMPUTE"] = os.getenv("WHISPER_COMPUTE", "int8") print(f"[Sidecar] Starting with model={env['WHISPER_MODEL']}, device={env['WHISPER_DEVICE']}, compute={env['WHISPER_COMPUTE']}") self.process = subprocess.Popen( [str(self.venv_python), str(self.transcriber_path), "--server"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, cwd=str(self.sidecar_dir), bufsize=1, # Line buffered text=True ) # Start reader threads self._reader_thread = Thread(target=self._read_stdout, daemon=True) self._reader_thread.start() stderr_thread = Thread(target=self._read_stderr, daemon=True) stderr_thread.start() # Wait for ready signal try: response = await asyncio.wait_for( asyncio.get_event_loop().run_in_executor( None, self._wait_for_ready ), timeout=120.0 # 2 minutes for model download ) if response and response.get("status") == "ready": self.ready = True print("[Sidecar] Ready") return True except asyncio.TimeoutError: print("[Sidecar] Timeout waiting for ready") self.stop() return False except Exception as e: print(f"[Sidecar] Start error: {e}") return False return False def _wait_for_ready(self) -> Optional[Dict]: """Wait for the ready signal from sidecar.""" while True: try: response = self._response_queue.get(timeout=1.0) status = response.get("status", "") # Track progress events if status in ["downloading_model", "model_downloaded", "model_cached", "loading_model", "model_loaded", "model_error"]: self._last_status = response self._notify_progress(response) if status == "model_loaded": # Extract whisper info self.whisper_info = { "model": os.getenv("WHISPER_MODEL", "medium"), "device": os.getenv("WHISPER_DEVICE", "cpu"), "compute": os.getenv("WHISPER_COMPUTE", "int8"), "configSource": "environment" } elif status == "model_error": self.whisper_info = {"error": response.get("error", "Unknown error")} if status == "ready": return response except queue.Empty: if self.process and self.process.poll() is not None: return None # Process died continue def _read_stdout(self): """Read stdout from sidecar process.""" if not self.process or not self.process.stdout: return for line in self.process.stdout: line = line.strip() if not line: continue try: data = json.loads(line) self._response_queue.put(data) except json.JSONDecodeError as e: print(f"[Sidecar] Invalid JSON: {line[:100]}") def _read_stderr(self): """Read stderr from sidecar process.""" if not self.process or not self.process.stderr: return for line in self.process.stderr: line = line.strip() if line: # Try to parse as JSON (some status messages go to stderr) try: data = json.loads(line) if "status" in data or "warning" in data: self._notify_progress(data) except json.JSONDecodeError: print(f"[Sidecar stderr] {line}") def _notify_progress(self, data: Dict): """Notify all progress callbacks.""" for callback in self._progress_callbacks: try: callback(data) except Exception as e: print(f"[Sidecar] Progress callback error: {e}") def add_progress_callback(self, callback: Callable): """Add a callback for progress updates.""" self._progress_callbacks.append(callback) def remove_progress_callback(self, callback: Callable): """Remove a progress callback.""" if callback in self._progress_callbacks: self._progress_callbacks.remove(callback) async def send_command(self, command: Dict) -> Optional[Dict]: """Send a command to the sidecar and wait for response.""" if not self.process or self.process.poll() is not None: return {"error": "Sidecar not running"} with self._lock: try: # Clear queue before sending while not self._response_queue.empty(): try: self._response_queue.get_nowait() except queue.Empty: break # Send command cmd_json = json.dumps(command) + "\n" self.process.stdin.write(cmd_json) self.process.stdin.flush() # Wait for response try: response = await asyncio.wait_for( asyncio.get_event_loop().run_in_executor( None, lambda: self._response_queue.get(timeout=60.0) ), timeout=65.0 ) return response except (asyncio.TimeoutError, queue.Empty): return {"error": "Command timeout"} except Exception as e: return {"error": f"Command error: {e}"} async def transcribe_file(self, audio_path: str) -> Dict: """Transcribe an audio file.""" return await self.send_command({ "action": "transcribe", "file": audio_path }) or {"error": "No response"} async def start_stream(self) -> Dict: """Start a streaming transcription session.""" result = await self.send_command({"action": "start_stream"}) if result and result.get("status") == "streaming": self._last_status["streaming"] = True return result or {"error": "No response"} async def send_audio_chunk(self, base64_audio: str) -> Optional[Dict]: """Send an audio chunk for streaming transcription.""" return await self.send_command({ "action": "audio_chunk", "data": base64_audio }) async def stop_stream(self) -> Dict: """Stop the streaming session.""" result = await self.send_command({"action": "stop_stream"}) self._last_status["streaming"] = False return result or {"error": "No response"} async def segment_audio(self, file_path: str, max_chunk_seconds: int = 300) -> Dict: """Segment an audio file using VAD.""" return await self.send_command({ "action": "segment_audio", "file_path": file_path, "max_chunk_seconds": max_chunk_seconds }) or {"error": "No response"} def stop(self): """Stop the sidecar process.""" self.ready = False self._last_status = {} if self.process: try: # Try graceful shutdown self.process.stdin.write('{"action": "quit"}\n') self.process.stdin.flush() self.process.wait(timeout=5.0) except: pass finally: if self.process.poll() is None: self.process.terminate() try: self.process.wait(timeout=2.0) except: self.process.kill() self.process = None print("[Sidecar] Stopped") # Global instance _sidecar_manager: Optional[SidecarManager] = None def get_sidecar_manager() -> SidecarManager: """Get or create the global sidecar manager instance.""" global _sidecar_manager if _sidecar_manager is None: _sidecar_manager = SidecarManager() return _sidecar_manager