Files
homeai/homeai-voice/scripts/benchmark_pipeline.py
Aodhan Collins af6b7bd945 feat: upgrade voice pipeline — MLX Whisper STT (20x faster), Qwen3.5 MoE LLM, fix HA tool calling
- Replace faster-whisper with wyoming-mlx-whisper (whisper-large-v3-turbo, MLX Metal GPU)
  STT latency: 8.4s → 400ms for short voice commands
- Add Qwen3.5-35B-A3B (MoE, 3B active params, Q8_0) to Ollama — 26.7 tok/s vs 5.4 tok/s (70B)
- Add model preload launchd service to pin voice model in VRAM permanently
- Fix HA tool calling: set commands.native=true, symlink ha-ctl to PATH
- Add pipeline benchmark script (STT/LLM/TTS latency profiling)
- Add service restart buttons and STT endpoint to dashboard
- Bind Vite dev server to 0.0.0.0 for LAN access

Total estimated pipeline latency: ~27s → ~4s

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 18:03:12 +00:00

382 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Voice Pipeline Benchmark
Measures latency of each stage independently:
1. STT (Wyoming Whisper, port 10300)
2. LLM (Ollama API, port 11434) — multiple models
3. TTS (Wyoming Kokoro, port 10301)
4. End-to-end via OpenClaw HTTP Bridge (port 8081)
Usage:
python benchmark_pipeline.py [--rounds 3] [--models llama3.3:70b,qwen3:32b]
"""
import argparse
import asyncio
import io
import json
import statistics
import sys
import time
import wave
from urllib.request import Request, urlopen
from urllib.error import URLError
from wyoming.audio import AudioChunk, AudioStart, AudioStop
from wyoming.asr import Transcribe, Transcript
from wyoming.client import AsyncTcpClient
from wyoming.tts import Synthesize, SynthesizeVoice
# --- Config ---
# Wyoming STT service endpoint (Whisper, see module docstring).
STT_HOST, STT_PORT = "127.0.0.1", 10300
# Wyoming TTS service endpoint (Kokoro).
TTS_HOST, TTS_PORT = "127.0.0.1", 10301
# Ollama REST API base URL.
OLLAMA_URL = "http://localhost:11434"
# OpenClaw HTTP bridge base URL (used by the end-to-end benchmark).
BRIDGE_URL = "http://localhost:8081"
# Short utterances cycled through the TTS benchmark rounds.
TEST_PROMPTS = [
    "What is the capital of France?",
    "Turn on the living room lights.",
    "What's the weather like today?",
]
# Default LLM benchmark prompt (overridable via --prompt).
LONG_PROMPT = "Explain in two sentences how a heat pump works."
# --- Helpers ---
def http_post_json(url: str, data: dict, timeout: int = 180) -> tuple[dict, float]:
    """POST *data* as JSON to *url*; return (parsed_response, elapsed_seconds).

    Raises urllib.error.URLError/HTTPError on connection or HTTP failure.
    Timing covers the request plus reading the full response body.
    """
    body = json.dumps(data).encode()
    req = Request(url, data=body, headers={"Content-Type": "application/json"})
    t0 = time.perf_counter()
    # Context manager closes the response socket (the original leaked it).
    with urlopen(req, timeout=timeout) as resp:
        raw = resp.read()
    elapsed = time.perf_counter() - t0
    return json.loads(raw), elapsed
def http_post_raw(url: str, data: bytes, content_type: str, timeout: int = 180) -> tuple[bytes, float]:
    """POST raw *data* bytes to *url*; return (response_bytes, elapsed_seconds).

    Raises urllib.error.URLError/HTTPError on connection or HTTP failure.
    """
    req = Request(url, data=data, headers={"Content-Type": content_type})
    t0 = time.perf_counter()
    # Context manager closes the response socket (the original leaked it).
    with urlopen(req, timeout=timeout) as resp:
        raw = resp.read()
    elapsed = time.perf_counter() - t0
    return raw, elapsed
# --- Stage 1: TTS ---
async def benchmark_tts(text: str) -> tuple[bytes, float, float]:
    """Synthesize *text* via the Wyoming TTS service.

    Returns (wav_bytes, elapsed_seconds, audio_duration_seconds).
    (The original annotation claimed a 2-tuple, but three values are
    returned and all callers unpack three.)
    """
    t0 = time.perf_counter()
    client = AsyncTcpClient(TTS_HOST, TTS_PORT)
    await client.connect()
    await client.read_event()  # Info — per original; assumes server sends one first
    await client.write_event(
        Synthesize(text=text, voice=SynthesizeVoice(name="af_heart")).event()
    )
    audio_data = bytearray()
    # Fallback format in case no AudioStart arrives (24 kHz, 16-bit mono).
    rate, width, channels = 24000, 2, 1
    while True:
        event = await client.read_event()
        if event is None:
            break  # connection closed before AudioStop
        if AudioStart.is_type(event.type):
            start = AudioStart.from_event(event)
            rate, width, channels = start.rate, start.width, start.channels
        elif AudioChunk.is_type(event.type):
            audio_data.extend(AudioChunk.from_event(event).audio)
        elif AudioStop.is_type(event.type):
            break
    await client.disconnect()
    elapsed = time.perf_counter() - t0
    # Package raw PCM as a WAV container so the STT benchmark can reuse it.
    wav_io = io.BytesIO()
    with wave.open(wav_io, "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(width)
        wf.setframerate(rate)
        wf.writeframes(audio_data)
    duration_s = len(audio_data) / (rate * width * channels)
    return wav_io.getvalue(), elapsed, duration_s
# --- Stage 2: STT ---
async def benchmark_stt(wav_bytes: bytes) -> tuple[str, float]:
    """Transcribe WAV audio through the Wyoming STT service.

    Returns (transcript_text, elapsed_seconds). The clock starts after the
    WAV is decoded, so only network + inference time is measured.
    """
    # Decode the WAV container into raw PCM plus its format parameters.
    with wave.open(io.BytesIO(wav_bytes), "rb") as wav:
        sample_rate = wav.getframerate()
        sample_width = wav.getsampwidth()
        n_channels = wav.getnchannels()
        pcm = wav.readframes(wav.getnframes())

    start_time = time.perf_counter()
    client = AsyncTcpClient(STT_HOST, STT_PORT)
    await client.connect()
    await client.write_event(Transcribe(language="en").event())
    await client.write_event(
        AudioStart(rate=sample_rate, width=sample_width, channels=n_channels).event()
    )

    # Stream the PCM in one-second chunks.
    bytes_per_second = sample_rate * sample_width * n_channels
    offset = 0
    while offset < len(pcm):
        await client.write_event(
            AudioChunk(
                rate=sample_rate,
                width=sample_width,
                channels=n_channels,
                audio=pcm[offset:offset + bytes_per_second],
            ).event()
        )
        offset += bytes_per_second
    await client.write_event(AudioStop().event())

    # Wait for the transcript (or a dropped connection).
    transcript = ""
    while (event := await client.read_event()) is not None:
        if Transcript.is_type(event.type):
            transcript = Transcript.from_event(event).text
            break
    await client.disconnect()
    return transcript, time.perf_counter() - start_time
# --- Stage 3: LLM ---
def benchmark_llm(model: str, prompt: str, warm: bool = False) -> dict:
    """Call Ollama /api/generate once and return a timing-breakdown dict.

    model: Ollama model tag (e.g. "qwen3:32b").
    prompt: text sent with stream=False (single blocking response).
    warm: informational only — callers pass True after warm_model(); the
        flag does not change the request. The "load_s" field in the result
        shows the actual model-load cost either way.
    """
    data = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"num_ctx": 2048},  # small ctx for benchmark speed
    }
    body = json.dumps(data).encode()
    req = Request(
        f"{OLLAMA_URL}/api/generate",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    t0 = time.perf_counter()
    # Context manager closes the response socket (the original leaked it).
    with urlopen(req, timeout=300) as resp:
        raw = resp.read()
    wall_time = time.perf_counter() - t0
    result = json.loads(raw)
    # Ollama reports all durations in nanoseconds.
    load_ns = result.get("load_duration", 0)
    prompt_ns = result.get("prompt_eval_duration", 0)
    eval_ns = result.get("eval_duration", 0)
    total_ns = result.get("total_duration", 0)
    prompt_tokens = result.get("prompt_eval_count", 0)
    eval_tokens = result.get("eval_count", 0)
    return {
        "model": model,
        "wall_time_s": wall_time,
        "load_s": load_ns / 1e9,
        "prompt_eval_s": prompt_ns / 1e9,
        "eval_s": eval_ns / 1e9,
        "total_s": total_ns / 1e9,
        "prompt_tokens": prompt_tokens,
        "eval_tokens": eval_tokens,
        # Guard against zero durations (e.g. fully cached prompt).
        "tokens_per_sec": eval_tokens / (eval_ns / 1e9) if eval_ns > 0 else 0,
        "prompt_tokens_per_sec": prompt_tokens / (prompt_ns / 1e9) if prompt_ns > 0 else 0,
        "response": result.get("response", "")[:200],  # preview only
    }
def warm_model(model: str):
    """Send a tiny generate request so Ollama loads *model* into GPU memory.

    Best-effort: any failure is printed as a warning, never raised, so the
    benchmark continues even when a model is missing or the server is down.
    """
    print(f" Warming up {model}...", end=" ", flush=True)
    try:
        data = json.dumps({"model": model, "prompt": "hi", "stream": False, "options": {"num_ctx": 512}}).encode()
        req = Request(f"{OLLAMA_URL}/api/generate", data=data, headers={"Content-Type": "application/json"})
        # Context manager closes the response socket (the original leaked it).
        with urlopen(req, timeout=300) as resp:
            resp.read()
        print("ready")
    except Exception as e:
        # Deliberately broad: warm-up failures must not abort the benchmark.
        print(f"warning: {e}")
# --- Stage 4: End-to-end via bridge ---
def benchmark_e2e(message: str) -> tuple[str, float]:
    """Send *message* through the OpenClaw HTTP bridge; return (reply_text, elapsed_seconds)."""
    payload = {"message": message, "agent": "main"}
    reply, elapsed = http_post_json(f"{BRIDGE_URL}/api/agent/message", payload, timeout=300)
    return reply.get("response", ""), elapsed
# --- Formatting ---
def fmt_time(seconds: float) -> str:
    """Render a duration: sub-second values as "450ms", otherwise "2.3s"."""
    return f"{seconds * 1000:.0f}ms" if seconds < 1 else f"{seconds:.1f}s"
def print_table(rows: list[dict], columns: list[tuple[str, str, int]]):
    """Print *rows* as a fixed-width table.

    columns is a list of (header, row_key, width) triples; keys missing
    from a row render as the empty string.
    """
    header_line = " | ".join(header.ljust(width) for header, _, width in columns)
    print(header_line)
    print("-" * len(header_line))
    for row in rows:
        cells = (str(row.get(key, "")).ljust(width) for _, key, width in columns)
        print(" | ".join(cells))
# --- Main ---
def main():
    """Parse CLI flags and run the selected benchmark stages in order.

    Stages (each individually skippable): TTS, STT (reuses the last TTS
    WAV when available), LLM (per-model, with warm-up), end-to-end bridge.
    """
    parser = argparse.ArgumentParser(description="Voice Pipeline Benchmark")
    parser.add_argument("--rounds", type=int, default=2, help="Rounds per test (default: 2)")
    parser.add_argument(
        "--models",
        default="qwen2.5:7b,qwen3:32b,llama3.3:70b",
        help="Comma-separated Ollama models to test",
    )
    parser.add_argument("--skip-stt", action="store_true", help="Skip STT benchmark")
    parser.add_argument("--skip-tts", action="store_true", help="Skip TTS benchmark")
    parser.add_argument("--skip-llm", action="store_true", help="Skip LLM benchmark")
    parser.add_argument("--skip-e2e", action="store_true", help="Skip end-to-end benchmark")
    parser.add_argument("--prompt", default=None, help="Custom prompt for LLM benchmark")
    args = parser.parse_args()

    models = [m.strip() for m in args.models.split(",")]
    llm_prompt = args.prompt or LONG_PROMPT

    print("=" * 70)
    print(" VOICE PIPELINE BENCHMARK")
    print("=" * 70)
    print(f" Rounds: {args.rounds}")
    print(f" Models: {', '.join(models)}")
    print(f" LLM prompt: {llm_prompt!r}")
    print()

    # ── TTS Benchmark ──
    test_wav = None  # last synthesized WAV, reused by the STT benchmark
    if not args.skip_tts:
        print("── TTS (Kokoro, Wyoming port 10301) ──")
        tts_times = []
        tts_durations = []
        for i in range(args.rounds):
            text = TEST_PROMPTS[i % len(TEST_PROMPTS)]
            wav, elapsed, audio_dur = asyncio.run(benchmark_tts(text))
            tts_times.append(elapsed)
            tts_durations.append(audio_dur)
            test_wav = wav
            # Guard: zero-length audio would make the RTF division crash.
            rtf = elapsed / audio_dur if audio_dur > 0 else float("inf")
            print(f" Round {i+1}: {fmt_time(elapsed)} → {audio_dur:.1f}s audio (RTF: {rtf:.2f}x) text={text!r}")
        avg_tts = statistics.mean(tts_times)
        avg_dur = statistics.mean(tts_durations)
        avg_rtf = avg_tts / avg_dur if avg_dur > 0 else float("inf")
        print(f" Average: {fmt_time(avg_tts)} for {avg_dur:.1f}s audio (RTF: {avg_rtf:.2f}x)")
        print()

    # ── STT Benchmark ──
    if not args.skip_stt:
        print("── STT (Whisper large-v3, Wyoming port 10300) ──")
        if test_wav is None:
            # TTS stage was skipped — synthesize a fixed utterance to transcribe.
            print(" Generating test audio via TTS...")
            test_wav, _, _ = asyncio.run(benchmark_tts("The quick brown fox jumps over the lazy dog."))
        stt_times = []
        for i in range(args.rounds):
            text, elapsed = asyncio.run(benchmark_stt(test_wav))
            stt_times.append(elapsed)
            print(f" Round {i+1}: {fmt_time(elapsed)} → {text!r}")
        print(f" Average: {fmt_time(statistics.mean(stt_times))}")
        print()

    # ── LLM Benchmark ──
    if not args.skip_llm:
        print("── LLM (Ollama) ──")
        print(f" Prompt: {llm_prompt!r}")
        print()
        all_results = []
        for model in models:
            print(f" Model: {model}")
            # Load the model first so rounds measure steady-state speed.
            warm_model(model)
            model_runs = []
            for i in range(args.rounds):
                result = benchmark_llm(model, llm_prompt, warm=True)
                model_runs.append(result)
                print(
                    f" Round {i+1}: wall={fmt_time(result['wall_time_s'])} "
                    f"load={fmt_time(result['load_s'])} "
                    f"prompt_eval={fmt_time(result['prompt_eval_s'])} ({result['prompt_tokens']}tok, {result['prompt_tokens_per_sec']:.0f}t/s) "
                    f"gen={fmt_time(result['eval_s'])} ({result['eval_tokens']}tok, {result['tokens_per_sec']:.1f}t/s)"
                )
                # Truncate response for display
                resp_preview = result["response"][:100].replace("\n", " ")
                print(f" → {resp_preview}")
            # Summarize this model's rounds.
            avg_wall = statistics.mean(r["wall_time_s"] for r in model_runs)
            avg_tps = statistics.mean(r["tokens_per_sec"] for r in model_runs)
            avg_prompt_tps = statistics.mean(r["prompt_tokens_per_sec"] for r in model_runs)
            avg_tokens = statistics.mean(r["eval_tokens"] for r in model_runs)
            all_results.append({
                "model": model,
                "avg_wall": fmt_time(avg_wall),
                "avg_gen_tps": f"{avg_tps:.1f}",
                "avg_prompt_tps": f"{avg_prompt_tps:.0f}",
                "avg_tokens": f"{avg_tokens:.0f}",
            })
            print()
        # Summary table
        print(" ┌─ LLM Summary ─────────────────────────────────────────────┐")
        print(f" {'Model':<25s} {'Wall time':>10s} {'Gen t/s':>10s} {'Prompt t/s':>11s} {'Avg tokens':>11s}")
        # BUG FIX: the original printed {''*25} etc. (empty string repeated),
        # which produced a blank line instead of the intended header underline.
        print(f" {'─'*25} {'─'*10} {'─'*10} {'─'*11} {'─'*11}")
        for r in all_results:
            print(f" {r['model']:<25s} {r['avg_wall']:>10s} {r['avg_gen_tps']:>10s} {r['avg_prompt_tps']:>11s} {r['avg_tokens']:>11s}")
        print()

    # ── End-to-end ──
    if not args.skip_e2e:
        print("── End-to-End (Bridge → OpenClaw → Ollama → response) ──")
        print(" (Does not include STT/TTS — just text in → text out via bridge)")
        e2e_prompt = "What time is it?"
        for i in range(args.rounds):
            try:
                resp, elapsed = benchmark_e2e(e2e_prompt)
                preview = resp[:100].replace("\n", " ")
                print(f" Round {i+1}: {fmt_time(elapsed)} → {preview}")
            except Exception as e:
                # Best-effort: the bridge may be down — report and continue.
                print(f" Round {i+1}: ERROR - {e}")
        print()

    # ── Pipeline estimate ──
    print("=" * 70)
    print(" ESTIMATED PIPELINE LATENCY (per voice interaction)")
    print("=" * 70)
    print(" wake word detection ~instant (runs locally)")
    print(" + STT (Whisper) see above")
    print(" + LLM (inference) see above (dominant cost)")
    print(" + TTS (Kokoro) see above")
    print(" ─────────────────────────────────────")
    print(" Tip: smaller models (7B, 32B) dramatically reduce LLM latency.")
    print(" The 70B model at ~12 tok/s needs ~5-8s for a typical reply.")
    print(" A 7B model at ~80 tok/s would need <1s for the same reply.")
    print()
# Script entry point — run all benchmark stages per CLI flags.
if __name__ == "__main__":
    main()