""" Sidecar Process Manager Manages the Python sidecar process for speech-to-text transcription. Provides an interface for the backend to communicate with the sidecar via subprocess stdin/stdout. """ import asyncio import json import os import subprocess import sys import tempfile import base64 from pathlib import Path from typing import Optional, Dict, Any, Callable from threading import Thread, Lock import queue class SidecarManager: """ Manages the Whisper transcription sidecar process. The sidecar is a Python process running transcriber.py that handles speech-to-text conversion using faster-whisper. """ def __init__(self): self.process: Optional[subprocess.Popen] = None self.ready = False self.whisper_info: Optional[Dict] = None self._lock = Lock() self._response_queue = queue.Queue() self._reader_thread: Optional[Thread] = None self._progress_callbacks: list[Callable] = [] self._last_status: Dict[str, Any] = {} self._is_packaged = getattr(sys, 'frozen', False) # Paths - detect packaged vs development mode if self._is_packaged: # Packaged mode: executable at resources/backend/backend/backend.exe # Sidecar at resources/sidecar/transcriber/transcriber.exe exec_dir = Path(sys.executable).parent.parent # up from backend/backend.exe resources_dir = exec_dir.parent # up from backend/ to resources/ self.sidecar_dir = resources_dir / "sidecar" / "transcriber" self.transcriber_path = self.sidecar_dir / ("transcriber.exe" if sys.platform == "win32" else "transcriber") self.venv_python = None # Not used in packaged mode print(f"[Sidecar] Packaged mode: transcriber={self.transcriber_path}") else: # Development mode self.project_dir = Path(__file__).parent.parent.parent self.sidecar_dir = self.project_dir / "sidecar" self.transcriber_path = self.sidecar_dir / "transcriber.py" if sys.platform == "win32": self.venv_python = self.sidecar_dir / "venv" / "Scripts" / "python.exe" else: self.venv_python = self.sidecar_dir / "venv" / "bin" / "python" print(f"[Sidecar] Development mode: transcriber={self.transcriber_path}") def is_available(self) -> bool: """Check if sidecar is available (files exist).""" if self._is_packaged: # In packaged mode, just check the executable available = self.transcriber_path.exists() print(f"[Sidecar] is_available (packaged): {available}, path={self.transcriber_path}") return available else: # Development mode - need both script and venv available = self.transcriber_path.exists() and self.venv_python.exists() print(f"[Sidecar] is_available (dev): {available}, script={self.transcriber_path.exists()}, venv={self.venv_python.exists()}") return available def get_status(self) -> Dict[str, Any]: """Get current sidecar status.""" return { "ready": self.ready, "streaming": self._is_streaming(), "whisper": self.whisper_info, "available": self.is_available(), "browserMode": False, **self._last_status } def _is_streaming(self) -> bool: """Check if currently in streaming mode.""" return self._last_status.get("streaming", False) async def start(self) -> bool: """Start the sidecar process.""" if self.process and self.process.poll() is None: return True # Already running if not self.is_available(): return False try: # Get Whisper configuration from environment env = os.environ.copy() env["WHISPER_MODEL"] = os.getenv("WHISPER_MODEL", "medium") env["WHISPER_DEVICE"] = os.getenv("WHISPER_DEVICE", "cpu") env["WHISPER_COMPUTE"] = os.getenv("WHISPER_COMPUTE", "int8") print(f"[Sidecar] Starting with model={env['WHISPER_MODEL']}, device={env['WHISPER_DEVICE']}, compute={env['WHISPER_COMPUTE']}") # Build command based on mode if self._is_packaged: # Packaged mode: run the executable directly cmd = [str(self.transcriber_path)] cwd = str(self.sidecar_dir) else: # Development mode: use venv python cmd = [str(self.venv_python), str(self.transcriber_path), "--server"] cwd = str(self.sidecar_dir.parent) if self._is_packaged else str(self.sidecar_dir) print(f"[Sidecar] Command: {cmd}, cwd={cwd}") self.process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, cwd=cwd, bufsize=1, # Line buffered text=True ) # Start reader threads self._reader_thread = Thread(target=self._read_stdout, daemon=True) self._reader_thread.start() stderr_thread = Thread(target=self._read_stderr, daemon=True) stderr_thread.start() # Wait for ready signal try: response = await asyncio.wait_for( asyncio.get_event_loop().run_in_executor( None, self._wait_for_ready ), timeout=120.0 # 2 minutes for model download ) if response and response.get("status") == "ready": self.ready = True print("[Sidecar] Ready") return True except asyncio.TimeoutError: print("[Sidecar] Timeout waiting for ready") self.stop() return False except Exception as e: print(f"[Sidecar] Start error: {e}") return False return False def _wait_for_ready(self) -> Optional[Dict]: """Wait for the ready signal from sidecar.""" while True: try: response = self._response_queue.get(timeout=1.0) status = response.get("status", "") # Track progress events if status in ["downloading_model", "model_downloaded", "model_cached", "loading_model", "model_loaded", "model_error"]: self._last_status = response self._notify_progress(response) if status == "model_loaded": # Extract whisper info self.whisper_info = { "model": os.getenv("WHISPER_MODEL", "medium"), "device": os.getenv("WHISPER_DEVICE", "cpu"), "compute": os.getenv("WHISPER_COMPUTE", "int8"), "configSource": "environment" } elif status == "model_error": self.whisper_info = {"error": response.get("error", "Unknown error")} if status == "ready": return response except queue.Empty: if self.process and self.process.poll() is not None: return None # Process died continue def _read_stdout(self): """Read stdout from sidecar process.""" if not self.process or not self.process.stdout: return for line in self.process.stdout: line = line.strip() if not line: continue try: data = json.loads(line) self._response_queue.put(data) except json.JSONDecodeError as e: print(f"[Sidecar] Invalid JSON: {line[:100]}") def _read_stderr(self): """Read stderr from sidecar process.""" if not self.process or not self.process.stderr: return for line in self.process.stderr: line = line.strip() if line: # Try to parse as JSON (some status messages go to stderr) try: data = json.loads(line) if "status" in data or "warning" in data: self._notify_progress(data) except json.JSONDecodeError: print(f"[Sidecar stderr] {line}") def _notify_progress(self, data: Dict): """Notify all progress callbacks.""" for callback in self._progress_callbacks: try: callback(data) except Exception as e: print(f"[Sidecar] Progress callback error: {e}") def add_progress_callback(self, callback: Callable): """Add a callback for progress updates.""" self._progress_callbacks.append(callback) def remove_progress_callback(self, callback: Callable): """Remove a progress callback.""" if callback in self._progress_callbacks: self._progress_callbacks.remove(callback) async def send_command(self, command: Dict) -> Optional[Dict]: """Send a command to the sidecar and wait for response.""" if not self.process or self.process.poll() is not None: return {"error": "Sidecar not running"} with self._lock: try: # Clear queue before sending while not self._response_queue.empty(): try: self._response_queue.get_nowait() except queue.Empty: break # Send command cmd_json = json.dumps(command) + "\n" self.process.stdin.write(cmd_json) self.process.stdin.flush() # Wait for response try: response = await asyncio.wait_for( asyncio.get_event_loop().run_in_executor( None, lambda: self._response_queue.get(timeout=60.0) ), timeout=65.0 ) return response except (asyncio.TimeoutError, queue.Empty): return {"error": "Command timeout"} except Exception as e: return {"error": f"Command error: {e}"} async def transcribe_file(self, audio_path: str) -> Dict: """Transcribe an audio file.""" return await self.send_command({ "action": "transcribe", "file": audio_path }) or {"error": "No response"} async def start_stream(self) -> Dict: """Start a streaming transcription session.""" result = await self.send_command({"action": "start_stream"}) if result and result.get("status") == "streaming": self._last_status["streaming"] = True return result or {"error": "No response"} async def send_audio_chunk(self, base64_audio: str) -> Optional[Dict]: """Send an audio chunk for streaming transcription.""" return await self.send_command({ "action": "audio_chunk", "data": base64_audio }) async def stop_stream(self) -> Dict: """Stop the streaming session.""" result = await self.send_command({"action": "stop_stream"}) self._last_status["streaming"] = False return result or {"error": "No response"} async def segment_audio(self, file_path: str, max_chunk_seconds: int = 300) -> Dict: """Segment an audio file using VAD.""" return await self.send_command({ "action": "segment_audio", "file_path": file_path, "max_chunk_seconds": max_chunk_seconds }) or {"error": "No response"} def stop(self): """Stop the sidecar process.""" self.ready = False self._last_status = {} if self.process: try: # Try graceful shutdown self.process.stdin.write('{"action": "quit"}\n') self.process.stdin.flush() self.process.wait(timeout=5.0) except: pass finally: if self.process.poll() is None: self.process.terminate() try: self.process.wait(timeout=2.0) except: self.process.kill() self.process = None print("[Sidecar] Stopped") # Global instance _sidecar_manager: Optional[SidecarManager] = None def get_sidecar_manager() -> SidecarManager: """Get or create the global sidecar manager instance.""" global _sidecar_manager if _sidecar_manager is None: _sidecar_manager = SidecarManager() return _sidecar_manager