Meeting_Assistant/sidecar/transcriber.py
d75789f23e fix: Improve Whisper model status verification and PyInstaller builds
- Add robust model cache verification (check model.bin + config.json)
- Add new status messages: model_cached, incomplete_cache, model_error
- Forward model status events to frontend for better UI feedback
- Add clean_build_cache() to remove stale spec files before build
- Add --clean flag to PyInstaller commands
- Change sidecar from --onefile to --onedir for faster startup
- Add missing hidden imports: onnxruntime, wave, huggingface_hub.utils

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-17 20:33:59 +08:00

#!/usr/bin/env python3
"""
Meeting Assistant Transcription Sidecar
Provides speech-to-text transcription using faster-whisper
with automatic Traditional Chinese conversion via OpenCC.
Modes:
1. File mode: transcriber.py <audio_file>
2. Server mode: transcriber.py (default, listens on stdin for JSON commands)
3. Streaming mode: Continuous audio processing with VAD segmentation
Uses ONNX Runtime for VAD (lightweight, ~20MB vs PyTorch ~2GB)
"""
import sys
import os
import json
import tempfile
import base64
import uuid
import re
import wave
import urllib.request
from pathlib import Path
from typing import Optional, List, Tuple
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
try:
from faster_whisper import WhisperModel
import opencc
import numpy as np
from huggingface_hub import snapshot_download, hf_hub_download
from huggingface_hub.utils import tqdm as hf_tqdm
except ImportError as e:
print(json.dumps({"error": f"Missing dependency: {e}"}), file=sys.stderr)
sys.exit(1)
# Try to import ONNX Runtime for VAD
try:
import onnxruntime as ort
ONNX_AVAILABLE = True
except ImportError:
ONNX_AVAILABLE = False
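# Note: check_and_download_whisper_model() below reports progress as JSON status events
# on stdout ("model_cached", "incomplete_cache", "downloading_model", "model_downloaded",
# "download_error") so the frontend can surface model download state to the user.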
def check_and_download_whisper_model(model_size: str) -> bool:
"""
Check if Whisper model is cached, download with progress if not.
Returns:
True if model is ready (cached or downloaded), False on error
"""
# faster-whisper model repository mapping
repo_id = f"Systran/faster-whisper-{model_size}"
# Check if model is already cached
cache_dir = Path.home() / ".cache" / "huggingface" / "hub"
repo_cache_name = f"models--Systran--faster-whisper-{model_size}"
model_cache_path = cache_dir / repo_cache_name
# Check if model files exist - verify essential files are present
if model_cache_path.exists():
snapshots_dir = model_cache_path / "snapshots"
if snapshots_dir.exists():
# Check for actual model files, not just any file
for snapshot in snapshots_dir.iterdir():
if snapshot.is_dir():
# Essential faster-whisper model files
required_files = ["model.bin", "config.json"]
has_all_files = all(
(snapshot / f).exists() for f in required_files
)
if has_all_files:
print(json.dumps({
"status": "model_cached",
"model": model_size,
"path": str(snapshot)
}), flush=True)
return True
# Snapshots exist but no valid model found
print(json.dumps({
"status": "incomplete_cache",
"model": model_size,
"message": "Model cache incomplete, will re-download"
}), flush=True)
# Model not cached, need to download
print(json.dumps({
"status": "downloading_model",
"model": model_size,
"repo": repo_id,
"progress": 0
}), flush=True)
try:
        # Custom progress callback class (not currently invoked; progress is instead
        # reported by polling the cache directory in the monitor loop below)
class DownloadProgressCallback:
def __init__(self):
self.total_files = 0
self.downloaded_files = 0
self.current_file_progress = 0
self.last_reported_percent = -5 # Report every 5%
def __call__(self, progress: float, total: float, filename: str = ""):
if total > 0:
percent = int((progress / total) * 100)
# Report every 5% or at completion
if percent >= self.last_reported_percent + 5 or percent == 100:
self.last_reported_percent = percent
downloaded_mb = progress / (1024 * 1024)
total_mb = total / (1024 * 1024)
print(json.dumps({
"status": "downloading_model",
"model": model_size,
"progress": percent,
"downloaded_mb": round(downloaded_mb, 1),
"total_mb": round(total_mb, 1),
"file": filename
}), flush=True)
# Use huggingface_hub to download with a simple approach
# We'll monitor the download by checking file sizes
import threading
import time
download_complete = False
download_error = None
def download_thread():
nonlocal download_complete, download_error
try:
snapshot_download(
repo_id,
local_dir=None, # Use default cache
local_dir_use_symlinks=False,
)
download_complete = True
except Exception as e:
download_error = str(e)
# Start download in background thread
thread = threading.Thread(target=download_thread)
thread.start()
# Monitor progress by checking cache directory
last_size = 0
last_report_time = time.time()
estimated_size_mb = {
"tiny": 77,
"base": 145,
"small": 488,
"medium": 1530,
"large": 3100,
"large-v2": 3100,
"large-v3": 3100,
}.get(model_size, 1530) # Default to medium size
while thread.is_alive():
time.sleep(1)
try:
# Check current download size
current_size = 0
if model_cache_path.exists():
for file in model_cache_path.rglob("*"):
if file.is_file():
current_size += file.stat().st_size
current_mb = current_size / (1024 * 1024)
progress = min(99, int((current_mb / estimated_size_mb) * 100))
# Report progress every 5 seconds or if significant change
now = time.time()
if now - last_report_time >= 5 or (current_mb - last_size / (1024 * 1024)) > 50:
if current_size > last_size:
print(json.dumps({
"status": "downloading_model",
"model": model_size,
"progress": progress,
"downloaded_mb": round(current_mb, 1),
"total_mb": estimated_size_mb
}), flush=True)
last_size = current_size
last_report_time = now
except Exception:
pass
thread.join()
if download_error:
print(json.dumps({
"status": "download_error",
"error": download_error
}), flush=True)
return False
print(json.dumps({
"status": "model_downloaded",
"model": model_size
}), flush=True)
return True
except Exception as e:
print(json.dumps({
"status": "download_error",
"error": str(e)
}), flush=True)
return False
class ChinesePunctuator:
"""Rule-based Chinese punctuation processor."""
QUESTION_PATTERNS = [
r'嗎$', r'呢$', r'什麼$', r'怎麼$', r'為什麼$', r'哪裡$', r'哪個$',
r'誰$', r'幾$', r'多少$', r'是否$', r'能否$', r'可否$', r'有沒有$',
r'是不是$', r'會不會$', r'能不能$', r'可不可以$', r'好不好$', r'對不對$'
]
def __init__(self):
self.question_regex = re.compile('|'.join(self.QUESTION_PATTERNS))
def add_punctuation(self, text: str, word_timestamps: Optional[List] = None) -> str:
"""Add punctuation to transcribed text."""
if not text:
return text
text = text.strip()
# Already has ending punctuation
if text and text[-1] in '。?!,;:':
return text
# Check for question patterns
        if self.question_regex.search(text):
            return text + '？'
        # Default to period for statements
        return text + '。'
def process_segments(self, segments: List[dict]) -> str:
"""Process multiple segments with timestamps to add punctuation."""
result_parts = []
for i, seg in enumerate(segments):
text = seg.get('text', '').strip()
if not text:
continue
# Check for long pause before next segment (comma opportunity)
if i < len(segments) - 1:
next_seg = segments[i + 1]
gap = next_seg.get('start', 0) - seg.get('end', 0)
                if gap > 0.5 and text[-1] not in '。?!,;:':
                    # Long pause: add a comma if this is not already the end of a sentence
                    if not self.question_regex.search(text):
                        text = text + '，'
result_parts.append(text)
# Join and add final punctuation
result = ''.join(result_parts)
return self.add_punctuation(result)
class SileroVAD:
"""Silero VAD using ONNX Runtime (lightweight alternative to PyTorch)."""
MODEL_URL = "https://github.com/snakers4/silero-vad/raw/master/src/silero_vad/data/silero_vad.onnx"
def __init__(self, model_path: Optional[str] = None, threshold: float = 0.5):
self.threshold = threshold
self.session = None
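        # Recurrent state carried between calls; the Silero VAD ONNX model expects shape (2, batch, 128)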
self._state = np.zeros((2, 1, 128), dtype=np.float32)
self.sample_rate = 16000
if not ONNX_AVAILABLE:
print(json.dumps({"warning": "onnxruntime not available, VAD disabled"}), file=sys.stderr)
return
# Determine model path
if model_path is None:
cache_dir = Path.home() / ".cache" / "silero-vad"
cache_dir.mkdir(parents=True, exist_ok=True)
model_path = cache_dir / "silero_vad.onnx"
# Download if not exists
if not Path(model_path).exists():
print(json.dumps({"status": "downloading_vad_model"}), file=sys.stderr)
try:
urllib.request.urlretrieve(self.MODEL_URL, model_path)
print(json.dumps({"status": "vad_model_downloaded"}), file=sys.stderr)
except Exception as e:
print(json.dumps({"warning": f"VAD model download failed: {e}"}), file=sys.stderr)
return
# Load ONNX model
try:
self.session = ort.InferenceSession(
str(model_path),
providers=['CPUExecutionProvider']
)
print(json.dumps({"status": "vad_loaded"}), file=sys.stderr)
except Exception as e:
print(json.dumps({"warning": f"VAD load failed: {e}"}), file=sys.stderr)
def reset_states(self):
"""Reset hidden states."""
self._state = np.zeros((2, 1, 128), dtype=np.float32)
def __call__(self, audio: np.ndarray) -> float:
"""Run VAD on audio chunk, return speech probability."""
if self.session is None:
return 0.5 # Neutral if VAD not available
# Ensure correct shape (batch, samples)
if audio.ndim == 1:
audio = audio[np.newaxis, :]
# Run inference with updated model format
ort_inputs = {
'input': audio.astype(np.float32),
'state': self._state,
'sr': np.array(self.sample_rate, dtype=np.int64)
}
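        # The ONNX session returns two outputs: the speech probability and the updated recurrent state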
output, self._state = self.session.run(None, ort_inputs)
return float(output[0][0])
class VADProcessor:
"""Voice Activity Detection processor."""
def __init__(self, sample_rate: int = 16000, threshold: float = 0.5, vad_model: Optional[SileroVAD] = None):
self.sample_rate = sample_rate
self.threshold = threshold
# Reuse pre-loaded VAD model if provided
self.vad = vad_model if vad_model else (SileroVAD(threshold=threshold) if ONNX_AVAILABLE else None)
self.reset()
def reset(self):
"""Reset VAD state."""
self.audio_buffer = np.array([], dtype=np.float32)
self.speech_buffer = np.array([], dtype=np.float32)
self.speech_started = False
self.silence_samples = 0
self.speech_samples = 0
if self.vad:
self.vad.reset_states()
def process_chunk(self, audio_chunk: np.ndarray) -> Optional[np.ndarray]:
"""
Process audio chunk and return speech segment if speech end detected.
Returns:
Speech audio if end detected, None otherwise
"""
self.audio_buffer = np.concatenate([self.audio_buffer, audio_chunk])
# Fallback: time-based segmentation if no VAD
if self.vad is None or self.vad.session is None:
# Every 5 seconds, return the buffer
if len(self.audio_buffer) >= self.sample_rate * 5:
result = self.audio_buffer.copy()
self.audio_buffer = np.array([], dtype=np.float32)
return result
return None
# Process in 512-sample windows (32ms at 16kHz)
window_size = 512
silence_threshold_samples = int(0.5 * self.sample_rate) # 500ms
max_speech_samples = int(15 * self.sample_rate) # 15s max
while len(self.audio_buffer) >= window_size:
window = self.audio_buffer[:window_size]
self.audio_buffer = self.audio_buffer[window_size:]
# Run VAD
speech_prob = self.vad(window)
if speech_prob >= self.threshold:
if not self.speech_started:
self.speech_started = True
self.speech_buffer = np.array([], dtype=np.float32)
self.speech_buffer = np.concatenate([self.speech_buffer, window])
self.silence_samples = 0
self.speech_samples += window_size
else:
if self.speech_started:
self.speech_buffer = np.concatenate([self.speech_buffer, window])
self.silence_samples += window_size
# Force segment if speech too long
if self.speech_samples >= max_speech_samples:
result = self.speech_buffer.copy()
self.speech_started = False
self.speech_buffer = np.array([], dtype=np.float32)
self.silence_samples = 0
self.speech_samples = 0
return result
# Detect end of speech (500ms silence)
if self.speech_started and self.silence_samples >= silence_threshold_samples:
if len(self.speech_buffer) > self.sample_rate * 0.3: # At least 300ms
result = self.speech_buffer.copy()
self.speech_started = False
self.speech_buffer = np.array([], dtype=np.float32)
self.silence_samples = 0
self.speech_samples = 0
return result
return None
def flush(self) -> Optional[np.ndarray]:
"""Flush remaining audio."""
# Combine any remaining audio
remaining = np.concatenate([self.speech_buffer, self.audio_buffer])
if len(remaining) > self.sample_rate * 0.5: # At least 500ms
self.reset()
return remaining
self.reset()
return None
class StreamingSession:
"""Manages a streaming transcription session."""
def __init__(self, transcriber: 'Transcriber', vad_model: Optional[SileroVAD] = None):
self.session_id = str(uuid.uuid4())
self.transcriber = transcriber
self.vad = VADProcessor(vad_model=vad_model)
self.segment_id = 0
self.active = True
def process_chunk(self, audio_data: str) -> Optional[dict]:
"""Process base64-encoded audio chunk."""
try:
# Decode base64 to raw PCM (16-bit, 16kHz, mono)
pcm_data = base64.b64decode(audio_data)
audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
# Run VAD
speech_segment = self.vad.process_chunk(audio)
if speech_segment is not None and len(speech_segment) > 0:
return self._transcribe_segment(speech_segment)
return None
except Exception as e:
return {"error": f"Chunk processing error: {e}"}
def _transcribe_segment(self, audio: np.ndarray) -> dict:
"""Transcribe a speech segment."""
self.segment_id += 1
# Save to temp file for Whisper
        temp_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
        temp_file.close()  # release the handle so wave.open can reopen the path (required on Windows)
        try:
            with wave.open(temp_file.name, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(16000)
wf.writeframes((audio * 32768).astype(np.int16).tobytes())
# Transcribe
text = self.transcriber.transcribe_file(temp_file.name, add_punctuation=True)
return {
"segment_id": self.segment_id,
"text": text,
"is_final": True,
"duration": len(audio) / 16000
}
finally:
os.unlink(temp_file.name)
def stop(self) -> dict:
"""Stop the session and flush remaining audio."""
self.active = False
results = []
# Flush VAD buffer
remaining = self.vad.flush()
if remaining is not None and len(remaining) > 0:
result = self._transcribe_segment(remaining)
if result and not result.get('error'):
results.append(result)
return {
"status": "stream_stopped",
"session_id": self.session_id,
"total_segments": self.segment_id,
"final_segments": results
}
class Transcriber:
"""Main transcription engine."""
def __init__(self, model_size: str = "medium", device: str = "cpu", compute_type: str = "int8"):
self.model = None
self.converter = None
self.punctuator = ChinesePunctuator()
self.streaming_session: Optional[StreamingSession] = None
self.vad_model: Optional[SileroVAD] = None
try:
# Check if model needs to be downloaded (with progress reporting)
download_ok = check_and_download_whisper_model(model_size)
if not download_ok:
print(json.dumps({
"status": "model_error",
"error": "Failed to download model"
}), flush=True)
raise RuntimeError("Failed to download Whisper model")
# Now load the model
print(json.dumps({"status": "loading_model", "model": model_size}), flush=True)
self.model = WhisperModel(model_size, device=device, compute_type=compute_type)
self.converter = opencc.OpenCC("s2twp")
print(json.dumps({"status": "model_loaded", "model": model_size}), flush=True)
# Pre-load VAD model at startup (not when streaming starts)
if ONNX_AVAILABLE:
self.vad_model = SileroVAD()
except Exception as e:
print(json.dumps({
"status": "model_error",
"error": f"Failed to load model: {e}"
}), flush=True)
raise
def transcribe_file(self, audio_path: str, add_punctuation: bool = False) -> str:
"""Transcribe an audio file to text."""
if not self.model:
return ""
if not os.path.exists(audio_path):
print(json.dumps({"error": f"File not found: {audio_path}"}), file=sys.stderr)
return ""
try:
segments, info = self.model.transcribe(
audio_path,
language="zh", # Use "nan" for Taiwanese/Hokkien, "zh" for Mandarin
beam_size=8,
vad_filter=True,
word_timestamps=add_punctuation,
# Anti-hallucination settings
condition_on_previous_text=False, # Prevents hallucination propagation
no_speech_threshold=0.6, # Higher = stricter silence detection
compression_ratio_threshold=2.4, # Filter repetitive/hallucinated text
log_prob_threshold=-1.0, # Filter low-confidence output
temperature=0.0, # Deterministic output (no sampling)
)
if add_punctuation:
# Collect segments with timestamps for punctuation
seg_list = []
for segment in segments:
seg_list.append({
'text': segment.text,
'start': segment.start,
'end': segment.end
})
text = self.punctuator.process_segments(seg_list)
else:
text = ""
for segment in segments:
text += segment.text
# Convert to Traditional Chinese
if text and self.converter:
text = self.converter.convert(text)
return text.strip()
except Exception as e:
print(json.dumps({"error": f"Transcription error: {e}"}), file=sys.stderr)
return ""
def segment_audio_file(
self,
audio_path: str,
max_chunk_seconds: int = 300,
min_silence_ms: int = 500,
output_dir: Optional[str] = None
) -> dict:
"""
Segment an audio file using VAD for natural speech boundaries.
Args:
audio_path: Path to the audio file
max_chunk_seconds: Maximum duration per chunk (default 5 minutes)
min_silence_ms: Minimum silence duration to consider as boundary (default 500ms)
output_dir: Directory to save chunks (default: temp directory)
Returns:
dict with segments list and metadata
"""
try:
# Import audio processing libraries
try:
from pydub import AudioSegment
except ImportError:
return {"error": "pydub not installed. Run: pip install pydub"}
if not os.path.exists(audio_path):
return {"error": f"File not found: {audio_path}"}
# Create output directory
if output_dir is None:
output_dir = tempfile.mkdtemp(prefix="audio_segments_")
else:
os.makedirs(output_dir, exist_ok=True)
# Load audio file and convert to mono 16kHz
print(json.dumps({"status": "loading_audio", "file": audio_path}), file=sys.stderr)
audio = AudioSegment.from_file(audio_path)
audio = audio.set_channels(1).set_frame_rate(16000)
total_duration_ms = len(audio)
total_duration_sec = total_duration_ms / 1000
print(json.dumps({
"status": "audio_loaded",
"duration_seconds": total_duration_sec
}), file=sys.stderr)
# Convert to numpy for VAD processing
samples = np.array(audio.get_array_of_samples(), dtype=np.float32) / 32768.0
# Run VAD to detect speech regions
segments = []
current_start = 0
max_chunk_samples = max_chunk_seconds * 16000
            min_silence_samples = int(min_silence_ms * 16)  # 16 samples per ms at 16 kHz
if self.vad_model is None or self.vad_model.session is None:
# No VAD available, use fixed-time splitting
print(json.dumps({"warning": "VAD not available, using fixed-time splitting"}), file=sys.stderr)
chunk_idx = 0
for start_sample in range(0, len(samples), max_chunk_samples):
end_sample = min(start_sample + max_chunk_samples, len(samples))
chunk_samples = samples[start_sample:end_sample]
# Export chunk
chunk_path = os.path.join(output_dir, f"chunk_{chunk_idx:03d}.wav")
self._export_wav(chunk_samples, chunk_path)
segments.append({
"index": chunk_idx,
"path": chunk_path,
"start": start_sample / 16000,
"end": end_sample / 16000,
"duration": (end_sample - start_sample) / 16000
})
chunk_idx += 1
else:
# Use VAD for intelligent splitting
print(json.dumps({"status": "running_vad"}), file=sys.stderr)
self.vad_model.reset_states()
# Find silence regions for splitting
window_size = 512
silence_starts = []
in_silence = False
silence_start = 0
for i in range(0, len(samples) - window_size, window_size):
window = samples[i:i + window_size]
speech_prob = self.vad_model(window)
if speech_prob < 0.3: # Silence threshold
if not in_silence:
in_silence = True
silence_start = i
else:
if in_silence:
silence_duration = i - silence_start
if silence_duration >= min_silence_samples:
# Mark middle of silence as potential split point
silence_starts.append(silence_start + silence_duration // 2)
in_silence = False
# Add end of file as final split point
silence_starts.append(len(samples))
# Create segments based on silence boundaries
chunk_idx = 0
current_start = 0
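                # Walk the candidate silence points in order; cut a chunk whenever the span since
                # current_start reaches max_chunk_samples (falling back to the last silence point
                # that still fits within the limit) or when the end of the file is reached.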
for split_point in silence_starts:
# Check if we need to split here
chunk_duration = split_point - current_start
if chunk_duration >= max_chunk_samples or split_point == len(samples):
# Find the best split point before max duration
if chunk_duration > max_chunk_samples:
# Find nearest silence point before max
best_split = current_start + max_chunk_samples
for sp in silence_starts:
if current_start < sp <= current_start + max_chunk_samples:
best_split = sp
split_point = best_split
# Export chunk
chunk_samples = samples[current_start:split_point]
if len(chunk_samples) > 8000: # At least 0.5 seconds
chunk_path = os.path.join(output_dir, f"chunk_{chunk_idx:03d}.wav")
self._export_wav(chunk_samples, chunk_path)
segments.append({
"index": chunk_idx,
"path": chunk_path,
"start": current_start / 16000,
"end": split_point / 16000,
"duration": (split_point - current_start) / 16000
})
chunk_idx += 1
current_start = split_point
# Handle any remaining audio - split into max_chunk_samples pieces
while current_start < len(samples):
remaining_len = len(samples) - current_start
if remaining_len < 8000: # Less than 0.5 seconds
break
# Determine chunk end (respect max_chunk_samples)
chunk_end = min(current_start + max_chunk_samples, len(samples))
chunk_samples = samples[current_start:chunk_end]
chunk_path = os.path.join(output_dir, f"chunk_{chunk_idx:03d}.wav")
self._export_wav(chunk_samples, chunk_path)
segments.append({
"index": chunk_idx,
"path": chunk_path,
"start": current_start / 16000,
"end": chunk_end / 16000,
"duration": len(chunk_samples) / 16000
})
chunk_idx += 1
current_start = chunk_end
print(json.dumps({
"status": "segmentation_complete",
"total_segments": len(segments)
}), file=sys.stderr)
return {
"status": "success",
"segments": segments,
"total_segments": len(segments),
"total_duration": total_duration_sec,
"output_dir": output_dir
}
except Exception as e:
return {"error": f"Segmentation error: {str(e)}"}
def _export_wav(self, samples: np.ndarray, output_path: str):
"""Export numpy samples to WAV file."""
with wave.open(output_path, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(16000)
wf.writeframes((samples * 32768).astype(np.int16).tobytes())
def handle_command(self, cmd: dict) -> Optional[dict]:
"""Handle a JSON command."""
action = cmd.get("action")
if action == "transcribe":
# File-based transcription (legacy)
audio_path = cmd.get("file")
if audio_path:
text = self.transcribe_file(audio_path, add_punctuation=True)
return {"result": text, "file": audio_path}
return {"error": "No file specified"}
elif action == "start_stream":
# Start streaming session
if self.streaming_session and self.streaming_session.active:
return {"error": "Stream already active"}
# Pass pre-loaded VAD model to avoid download delay
self.streaming_session = StreamingSession(self, vad_model=self.vad_model)
return {
"status": "streaming",
"session_id": self.streaming_session.session_id
}
elif action == "audio_chunk":
# Process audio chunk
if not self.streaming_session or not self.streaming_session.active:
return {"error": "No active stream"}
data = cmd.get("data")
if not data:
return {"error": "No audio data"}
result = self.streaming_session.process_chunk(data)
return result # May be None if no segment ready
elif action == "stop_stream":
# Stop streaming session
if not self.streaming_session:
return {"error": "No active stream"}
result = self.streaming_session.stop()
self.streaming_session = None
return result
elif action == "segment_audio":
# Segment audio file using VAD
file_path = cmd.get("file_path")
if not file_path:
return {"error": "No file_path specified"}
max_chunk_seconds = cmd.get("max_chunk_seconds", 300)
min_silence_ms = cmd.get("min_silence_ms", 500)
output_dir = cmd.get("output_dir")
return self.segment_audio_file(
file_path,
max_chunk_seconds=max_chunk_seconds,
min_silence_ms=min_silence_ms,
output_dir=output_dir
)
elif action == "ping":
return {"status": "pong"}
elif action == "quit":
return {"status": "exiting"}
else:
return {"error": f"Unknown action: {action}"}
def run_server(self):
"""Run in server mode, reading JSON commands from stdin."""
print(json.dumps({"status": "ready"}))
sys.stdout.flush()
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
cmd = json.loads(line)
result = self.handle_command(cmd)
if result:
print(json.dumps(result))
sys.stdout.flush()
if cmd.get("action") == "quit":
break
except json.JSONDecodeError as e:
print(json.dumps({"error": f"Invalid JSON: {e}"}))
sys.stdout.flush()
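# Runtime configuration comes from environment variables read in main():
#   WHISPER_MODEL (default "medium"), WHISPER_DEVICE (default "cpu"), WHISPER_COMPUTE (default "int8").
# Illustrative invocations (file names are placeholders):
#   WHISPER_MODEL=small python transcriber.py --server     # server mode
#   python transcriber.py meeting.wav                       # one-shot file mode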
def main():
model_size = os.environ.get("WHISPER_MODEL", "medium")
device = os.environ.get("WHISPER_DEVICE", "cpu")
compute_type = os.environ.get("WHISPER_COMPUTE", "int8")
try:
transcriber = Transcriber(model_size, device, compute_type)
if len(sys.argv) > 1:
if sys.argv[1] == "--server":
transcriber.run_server()
else:
# File mode
text = transcriber.transcribe_file(sys.argv[1], add_punctuation=True)
print(text)
else:
# Default to server mode
transcriber.run_server()
except Exception as e:
print(json.dumps({"error": str(e)}), file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()