Meeting_Assistant/sidecar/transcriber.py
egg 43c413c5ce feat: Upgrade Whisper model to medium and increase beam size
- Change default model from small to medium for better accuracy
- Increase beam_size from 5 to 8 for improved transcription quality
- Add Whisper environment variables to start.sh for centralized config

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 08:25:25 +08:00

#!/usr/bin/env python3
"""
Meeting Assistant Transcription Sidecar
Provides speech-to-text transcription using faster-whisper
with automatic Traditional Chinese conversion via OpenCC.
Modes:
1. File mode: transcriber.py <audio_file>
2. Server mode: transcriber.py (default, listens on stdin for JSON commands)
3. Streaming mode: Continuous audio processing with VAD segmentation
Uses ONNX Runtime for VAD (lightweight, ~20MB vs PyTorch ~2GB)
"""
import sys
import os
import json
import tempfile
import base64
import uuid
import re
import wave
import urllib.request
from pathlib import Path
from typing import Optional, List, Tuple
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
try:
from faster_whisper import WhisperModel
import opencc
import numpy as np
except ImportError as e:
print(json.dumps({"error": f"Missing dependency: {e}"}), file=sys.stderr)
sys.exit(1)
# Try to import ONNX Runtime for VAD
try:
import onnxruntime as ort
ONNX_AVAILABLE = True
except ImportError:
ONNX_AVAILABLE = False
class ChinesePunctuator:
"""Rule-based Chinese punctuation processor."""
QUESTION_PATTERNS = [
r'嗎$', r'呢$', r'什麼$', r'怎麼$', r'為什麼$', r'哪裡$', r'哪個$',
r'誰$', r'幾$', r'多少$', r'是否$', r'能否$', r'可否$', r'有沒有$',
r'是不是$', r'會不會$', r'能不能$', r'可不可以$', r'好不好$', r'對不對$'
]
def __init__(self):
self.question_regex = re.compile('|'.join(self.QUESTION_PATTERNS))
def add_punctuation(self, text: str, word_timestamps: Optional[List] = None) -> str:
"""Add punctuation to transcribed text."""
if not text:
return text
text = text.strip()
# Already has ending punctuation
if text and text[-1] in '。？！，；：':
return text
# Check for question patterns
if self.question_regex.search(text):
return text + '？'
# Default to period for statements
return text + '。'
def process_segments(self, segments: List[dict]) -> str:
"""Process multiple segments with timestamps to add punctuation."""
result_parts = []
for i, seg in enumerate(segments):
text = seg.get('text', '').strip()
if not text:
continue
# Check for long pause before next segment (comma opportunity)
if i < len(segments) - 1:
next_seg = segments[i + 1]
gap = next_seg.get('start', 0) - seg.get('end', 0)
if gap > 0.5 and text[-1] not in '。？！，；：':
# Long pause, add comma if not end of sentence
if not self.question_regex.search(text):
text = text + '，'
result_parts.append(text)
# Join and add final punctuation
result = ''.join(result_parts)
return self.add_punctuation(result)
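# Illustrative behaviour of the rules above (the example strings are made up):
#   ChinesePunctuator().add_punctuation("你吃飯了嗎")    ->  "你吃飯了嗎？"
#   ChinesePunctuator().add_punctuation("我們先開始開會")  ->  "我們先開始開會。"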
class SileroVAD:
"""Silero VAD using ONNX Runtime (lightweight alternative to PyTorch)."""
MODEL_URL = "https://github.com/snakers4/silero-vad/raw/master/src/silero_vad/data/silero_vad.onnx"
def __init__(self, model_path: Optional[str] = None, threshold: float = 0.5):
self.threshold = threshold
self.session = None
self._state = np.zeros((2, 1, 128), dtype=np.float32)
self.sample_rate = 16000
if not ONNX_AVAILABLE:
print(json.dumps({"warning": "onnxruntime not available, VAD disabled"}), file=sys.stderr)
return
# Determine model path
if model_path is None:
cache_dir = Path.home() / ".cache" / "silero-vad"
cache_dir.mkdir(parents=True, exist_ok=True)
model_path = cache_dir / "silero_vad.onnx"
# Download if not exists
if not Path(model_path).exists():
print(json.dumps({"status": "downloading_vad_model"}), file=sys.stderr)
try:
urllib.request.urlretrieve(self.MODEL_URL, model_path)
print(json.dumps({"status": "vad_model_downloaded"}), file=sys.stderr)
except Exception as e:
print(json.dumps({"warning": f"VAD model download failed: {e}"}), file=sys.stderr)
return
# Load ONNX model
try:
self.session = ort.InferenceSession(
str(model_path),
providers=['CPUExecutionProvider']
)
print(json.dumps({"status": "vad_loaded"}), file=sys.stderr)
except Exception as e:
print(json.dumps({"warning": f"VAD load failed: {e}"}), file=sys.stderr)
def reset_states(self):
"""Reset hidden states."""
self._state = np.zeros((2, 1, 128), dtype=np.float32)
def __call__(self, audio: np.ndarray) -> float:
"""Run VAD on audio chunk, return speech probability."""
if self.session is None:
return 0.5 # Neutral if VAD not available
# Ensure correct shape (batch, samples)
if audio.ndim == 1:
audio = audio[np.newaxis, :]
# Run inference with updated model format
ort_inputs = {
'input': audio.astype(np.float32),
'state': self._state,
'sr': np.array(self.sample_rate, dtype=np.int64)
}
output, self._state = self.session.run(None, ort_inputs)
return float(output[0][0])
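# A minimal sketch of calling the VAD directly, assuming the same 512-sample
# (32 ms) float32 chunks at 16 kHz that VADProcessor below feeds it:
#   vad = SileroVAD(threshold=0.5)
#   chunk = np.zeros(512, dtype=np.float32)   # pure silence
#   prob = vad(chunk)                         # low speech probability expected
#   is_speech = prob >= vad.threshold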
class VADProcessor:
"""Voice Activity Detection processor."""
def __init__(self, sample_rate: int = 16000, threshold: float = 0.5, vad_model: Optional[SileroVAD] = None):
self.sample_rate = sample_rate
self.threshold = threshold
# Reuse pre-loaded VAD model if provided
self.vad = vad_model if vad_model else (SileroVAD(threshold=threshold) if ONNX_AVAILABLE else None)
self.reset()
def reset(self):
"""Reset VAD state."""
self.audio_buffer = np.array([], dtype=np.float32)
self.speech_buffer = np.array([], dtype=np.float32)
self.speech_started = False
self.silence_samples = 0
self.speech_samples = 0
if self.vad:
self.vad.reset_states()
def process_chunk(self, audio_chunk: np.ndarray) -> Optional[np.ndarray]:
"""
Process audio chunk and return speech segment if speech end detected.
Returns:
Speech audio if end detected, None otherwise
"""
self.audio_buffer = np.concatenate([self.audio_buffer, audio_chunk])
# Fallback: time-based segmentation if no VAD
if self.vad is None or self.vad.session is None:
# Every 5 seconds, return the buffer
if len(self.audio_buffer) >= self.sample_rate * 5:
result = self.audio_buffer.copy()
self.audio_buffer = np.array([], dtype=np.float32)
return result
return None
# Process in 512-sample windows (32ms at 16kHz)
window_size = 512
silence_threshold_samples = int(0.5 * self.sample_rate) # 500ms
max_speech_samples = int(15 * self.sample_rate) # 15s max
while len(self.audio_buffer) >= window_size:
window = self.audio_buffer[:window_size]
self.audio_buffer = self.audio_buffer[window_size:]
# Run VAD
speech_prob = self.vad(window)
if speech_prob >= self.threshold:
if not self.speech_started:
self.speech_started = True
self.speech_buffer = np.array([], dtype=np.float32)
self.speech_buffer = np.concatenate([self.speech_buffer, window])
self.silence_samples = 0
self.speech_samples += window_size
else:
if self.speech_started:
self.speech_buffer = np.concatenate([self.speech_buffer, window])
self.silence_samples += window_size
# Force segment if speech too long
if self.speech_samples >= max_speech_samples:
result = self.speech_buffer.copy()
self.speech_started = False
self.speech_buffer = np.array([], dtype=np.float32)
self.silence_samples = 0
self.speech_samples = 0
return result
# Detect end of speech (500ms silence)
if self.speech_started and self.silence_samples >= silence_threshold_samples:
if len(self.speech_buffer) > self.sample_rate * 0.3: # At least 300ms
result = self.speech_buffer.copy()
self.speech_started = False
self.speech_buffer = np.array([], dtype=np.float32)
self.silence_samples = 0
self.speech_samples = 0
return result
return None
def flush(self) -> Optional[np.ndarray]:
"""Flush remaining audio."""
# Combine any remaining audio
remaining = np.concatenate([self.speech_buffer, self.audio_buffer])
if len(remaining) > self.sample_rate * 0.5: # At least 500ms
self.reset()
return remaining
self.reset()
return None
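# Typical feeding loop (the `mic_chunks` iterator is hypothetical; any source
# of mono float32 16 kHz arrays works):
#   vad_proc = VADProcessor(sample_rate=16000, threshold=0.5)
#   for chunk in mic_chunks:
#       segment = vad_proc.process_chunk(chunk)
#       if segment is not None:
#           pass  # hand the finished speech segment to the transcriber
#   tail = vad_proc.flush()  # remaining audio at end of stream, if any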
class StreamingSession:
"""Manages a streaming transcription session."""
def __init__(self, transcriber: 'Transcriber', vad_model: Optional[SileroVAD] = None):
self.session_id = str(uuid.uuid4())
self.transcriber = transcriber
self.vad = VADProcessor(vad_model=vad_model)
self.segment_id = 0
self.active = True
def process_chunk(self, audio_data: str) -> Optional[dict]:
"""Process base64-encoded audio chunk."""
try:
# Decode base64 to raw PCM (16-bit, 16kHz, mono)
pcm_data = base64.b64decode(audio_data)
audio = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
# Run VAD
speech_segment = self.vad.process_chunk(audio)
if speech_segment is not None and len(speech_segment) > 0:
return self._transcribe_segment(speech_segment)
return None
except Exception as e:
return {"error": f"Chunk processing error: {e}"}
def _transcribe_segment(self, audio: np.ndarray) -> dict:
"""Transcribe a speech segment."""
self.segment_id += 1
# Save to temp file for Whisper
temp_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
try:
with wave.open(temp_file.name, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(16000)
wf.writeframes((audio * 32768).astype(np.int16).tobytes())
# Transcribe
text = self.transcriber.transcribe_file(temp_file.name, add_punctuation=True)
return {
"segment_id": self.segment_id,
"text": text,
"is_final": True,
"duration": len(audio) / 16000
}
finally:
os.unlink(temp_file.name)
def stop(self) -> dict:
"""Stop the session and flush remaining audio."""
self.active = False
results = []
# Flush VAD buffer
remaining = self.vad.flush()
if remaining is not None and len(remaining) > 0:
result = self._transcribe_segment(remaining)
if result and not result.get('error'):
results.append(result)
return {
"status": "stream_stopped",
"session_id": self.session_id,
"total_segments": self.segment_id,
"final_segments": results
}
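# Streaming wire protocol, one JSON object per line on stdin/stdout (the
# payload values shown are illustrative):
#   -> {"action": "start_stream"}
#   <- {"status": "streaming", "session_id": "<uuid>"}
#   -> {"action": "audio_chunk", "data": "<base64 16-bit 16 kHz mono PCM>"}
#   <- {"segment_id": 1, "text": "...", "is_final": true, "duration": 3.2}
#   -> {"action": "stop_stream"}
#   <- {"status": "stream_stopped", "total_segments": 1, "final_segments": [...]}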
class Transcriber:
"""Main transcription engine."""
def __init__(self, model_size: str = "medium", device: str = "cpu", compute_type: str = "int8"):
self.model = None
self.converter = None
self.punctuator = ChinesePunctuator()
self.streaming_session: Optional[StreamingSession] = None
self.vad_model: Optional[SileroVAD] = None
try:
print(json.dumps({"status": "loading_model", "model": model_size}), file=sys.stderr)
self.model = WhisperModel(model_size, device=device, compute_type=compute_type)
self.converter = opencc.OpenCC("s2twp")
print(json.dumps({"status": "model_loaded"}), file=sys.stderr)
# Pre-load VAD model at startup (not when streaming starts)
if ONNX_AVAILABLE:
self.vad_model = SileroVAD()
except Exception as e:
print(json.dumps({"error": f"Failed to load model: {e}"}), file=sys.stderr)
raise
def transcribe_file(self, audio_path: str, add_punctuation: bool = False) -> str:
"""Transcribe an audio file to text."""
if not self.model:
return ""
if not os.path.exists(audio_path):
print(json.dumps({"error": f"File not found: {audio_path}"}), file=sys.stderr)
return ""
try:
segments, info = self.model.transcribe(
audio_path,
language="zh", # Use "nan" for Taiwanese/Hokkien, "zh" for Mandarin
beam_size=8,
vad_filter=True,
word_timestamps=add_punctuation,
# Anti-hallucination settings
condition_on_previous_text=False, # Prevents hallucination propagation
no_speech_threshold=0.6, # Higher = stricter silence detection
compression_ratio_threshold=2.4, # Filter repetitive/hallucinated text
log_prob_threshold=-1.0, # Filter low-confidence output
temperature=0.0, # Deterministic output (no sampling)
)
if add_punctuation:
# Collect segments with timestamps for punctuation
seg_list = []
for segment in segments:
seg_list.append({
'text': segment.text,
'start': segment.start,
'end': segment.end
})
text = self.punctuator.process_segments(seg_list)
else:
text = ""
for segment in segments:
text += segment.text
# Convert to Traditional Chinese
if text and self.converter:
text = self.converter.convert(text)
return text.strip()
except Exception as e:
print(json.dumps({"error": f"Transcription error: {e}"}), file=sys.stderr)
return ""
def segment_audio_file(
self,
audio_path: str,
max_chunk_seconds: int = 300,
min_silence_ms: int = 500,
output_dir: Optional[str] = None
) -> dict:
"""
Segment an audio file using VAD for natural speech boundaries.
Args:
audio_path: Path to the audio file
max_chunk_seconds: Maximum duration per chunk (default 5 minutes)
min_silence_ms: Minimum silence duration to consider as boundary (default 500ms)
output_dir: Directory to save chunks (default: temp directory)
Returns:
dict with segments list and metadata
"""
try:
# Import audio processing libraries
try:
from pydub import AudioSegment
except ImportError:
return {"error": "pydub not installed. Run: pip install pydub"}
if not os.path.exists(audio_path):
return {"error": f"File not found: {audio_path}"}
# Create output directory
if output_dir is None:
output_dir = tempfile.mkdtemp(prefix="audio_segments_")
else:
os.makedirs(output_dir, exist_ok=True)
# Load audio file and convert to mono 16kHz
print(json.dumps({"status": "loading_audio", "file": audio_path}), file=sys.stderr)
audio = AudioSegment.from_file(audio_path)
audio = audio.set_channels(1).set_frame_rate(16000)
total_duration_ms = len(audio)
total_duration_sec = total_duration_ms / 1000
print(json.dumps({
"status": "audio_loaded",
"duration_seconds": total_duration_sec
}), file=sys.stderr)
# Convert to numpy for VAD processing
samples = np.array(audio.get_array_of_samples(), dtype=np.float32) / 32768.0
# Run VAD to detect speech regions
segments = []
current_start = 0
max_chunk_samples = max_chunk_seconds * 16000
min_silence_samples = int(min_silence_ms * 16)  # 16 samples per ms at 16kHz
if self.vad_model is None or self.vad_model.session is None:
# No VAD available, use fixed-time splitting
print(json.dumps({"warning": "VAD not available, using fixed-time splitting"}), file=sys.stderr)
chunk_idx = 0
for start_sample in range(0, len(samples), max_chunk_samples):
end_sample = min(start_sample + max_chunk_samples, len(samples))
chunk_samples = samples[start_sample:end_sample]
# Export chunk
chunk_path = os.path.join(output_dir, f"chunk_{chunk_idx:03d}.wav")
self._export_wav(chunk_samples, chunk_path)
segments.append({
"index": chunk_idx,
"path": chunk_path,
"start": start_sample / 16000,
"end": end_sample / 16000,
"duration": (end_sample - start_sample) / 16000
})
chunk_idx += 1
else:
# Use VAD for intelligent splitting
print(json.dumps({"status": "running_vad"}), file=sys.stderr)
self.vad_model.reset_states()
# Find silence regions for splitting
window_size = 512
silence_starts = []
in_silence = False
silence_start = 0
for i in range(0, len(samples) - window_size, window_size):
window = samples[i:i + window_size]
speech_prob = self.vad_model(window)
if speech_prob < 0.3: # Silence threshold
if not in_silence:
in_silence = True
silence_start = i
else:
if in_silence:
silence_duration = i - silence_start
if silence_duration >= min_silence_samples:
# Mark middle of silence as potential split point
silence_starts.append(silence_start + silence_duration // 2)
in_silence = False
# Add end of file as final split point
silence_starts.append(len(samples))
# Create segments based on silence boundaries
chunk_idx = 0
current_start = 0
for split_point in silence_starts:
# Check if we need to split here
chunk_duration = split_point - current_start
if chunk_duration >= max_chunk_samples or split_point == len(samples):
# Find the best split point before max duration
if chunk_duration > max_chunk_samples:
# Find nearest silence point before max
best_split = current_start + max_chunk_samples
for sp in silence_starts:
if current_start < sp <= current_start + max_chunk_samples:
best_split = sp
split_point = best_split
# Export chunk
chunk_samples = samples[current_start:split_point]
if len(chunk_samples) > 8000: # At least 0.5 seconds
chunk_path = os.path.join(output_dir, f"chunk_{chunk_idx:03d}.wav")
self._export_wav(chunk_samples, chunk_path)
segments.append({
"index": chunk_idx,
"path": chunk_path,
"start": current_start / 16000,
"end": split_point / 16000,
"duration": (split_point - current_start) / 16000
})
chunk_idx += 1
current_start = split_point
# Handle any remaining audio - split into max_chunk_samples pieces
while current_start < len(samples):
remaining_len = len(samples) - current_start
if remaining_len < 8000: # Less than 0.5 seconds
break
# Determine chunk end (respect max_chunk_samples)
chunk_end = min(current_start + max_chunk_samples, len(samples))
chunk_samples = samples[current_start:chunk_end]
chunk_path = os.path.join(output_dir, f"chunk_{chunk_idx:03d}.wav")
self._export_wav(chunk_samples, chunk_path)
segments.append({
"index": chunk_idx,
"path": chunk_path,
"start": current_start / 16000,
"end": chunk_end / 16000,
"duration": len(chunk_samples) / 16000
})
chunk_idx += 1
current_start = chunk_end
print(json.dumps({
"status": "segmentation_complete",
"total_segments": len(segments)
}), file=sys.stderr)
return {
"status": "success",
"segments": segments,
"total_segments": len(segments),
"total_duration": total_duration_sec,
"output_dir": output_dir
}
except Exception as e:
return {"error": f"Segmentation error: {str(e)}"}
def _export_wav(self, samples: np.ndarray, output_path: str):
"""Export numpy samples to WAV file."""
with wave.open(output_path, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(16000)
wf.writeframes((samples * 32768).astype(np.int16).tobytes())
def handle_command(self, cmd: dict) -> Optional[dict]:
"""Handle a JSON command."""
action = cmd.get("action")
if action == "transcribe":
# File-based transcription (legacy)
audio_path = cmd.get("file")
if audio_path:
text = self.transcribe_file(audio_path, add_punctuation=True)
return {"result": text, "file": audio_path}
return {"error": "No file specified"}
elif action == "start_stream":
# Start streaming session
if self.streaming_session and self.streaming_session.active:
return {"error": "Stream already active"}
# Pass pre-loaded VAD model to avoid download delay
self.streaming_session = StreamingSession(self, vad_model=self.vad_model)
return {
"status": "streaming",
"session_id": self.streaming_session.session_id
}
elif action == "audio_chunk":
# Process audio chunk
if not self.streaming_session or not self.streaming_session.active:
return {"error": "No active stream"}
data = cmd.get("data")
if not data:
return {"error": "No audio data"}
result = self.streaming_session.process_chunk(data)
return result # May be None if no segment ready
elif action == "stop_stream":
# Stop streaming session
if not self.streaming_session:
return {"error": "No active stream"}
result = self.streaming_session.stop()
self.streaming_session = None
return result
elif action == "segment_audio":
# Segment audio file using VAD
file_path = cmd.get("file_path")
if not file_path:
return {"error": "No file_path specified"}
max_chunk_seconds = cmd.get("max_chunk_seconds", 300)
min_silence_ms = cmd.get("min_silence_ms", 500)
output_dir = cmd.get("output_dir")
return self.segment_audio_file(
file_path,
max_chunk_seconds=max_chunk_seconds,
min_silence_ms=min_silence_ms,
output_dir=output_dir
)
elif action == "ping":
return {"status": "pong"}
elif action == "quit":
return {"status": "exiting"}
else:
return {"error": f"Unknown action: {action}"}
def run_server(self):
"""Run in server mode, reading JSON commands from stdin."""
print(json.dumps({"status": "ready"}))
sys.stdout.flush()
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
cmd = json.loads(line)
result = self.handle_command(cmd)
if result:
print(json.dumps(result))
sys.stdout.flush()
if cmd.get("action") == "quit":
break
except json.JSONDecodeError as e:
print(json.dumps({"error": f"Invalid JSON: {e}"}))
sys.stdout.flush()
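# Illustrative environment configuration (normally exported from start.sh per
# the commit above; the values are examples, not requirements):
#   WHISPER_MODEL=medium WHISPER_DEVICE=cpu WHISPER_COMPUTE=int8 \
#       python3 transcriber.py --server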
def main():
model_size = os.environ.get("WHISPER_MODEL", "small")
device = os.environ.get("WHISPER_DEVICE", "cpu")
compute_type = os.environ.get("WHISPER_COMPUTE", "int8")
try:
transcriber = Transcriber(model_size, device, compute_type)
if len(sys.argv) > 1:
if sys.argv[1] == "--server":
transcriber.run_server()
else:
# File mode
text = transcriber.transcribe_file(sys.argv[1], add_punctuation=True)
print(text)
else:
# Default to server mode
transcriber.run_server()
except Exception as e:
print(json.dumps({"error": str(e)}), file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()