feat: character system v2 — schema upgrade, memory system, per-character TTS routing
Character schema v2: background, dialogue_style, appearance, skills, gaze_presets with automatic v1→v2 migration. LLM-assisted character creation via Character MCP server. Two-tier memory system (personal per-character + general shared) with budget-based injection into LLM system prompt. Per-character TTS voice routing via state file — Wyoming TTS server reads active config to route between Kokoro (local) and ElevenLabs (cloud PCM 24kHz). Dashboard: memories page, conversation history, character profile on cards, auto-TTS engine selection from character config. Also includes VTube Studio expression bridge and ComfyUI API guide. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
40
homeai-visual/launchd/com.homeai.vtube-bridge.plist
Normal file
40
homeai-visual/launchd/com.homeai.vtube-bridge.plist
Normal file
@@ -0,0 +1,40 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN"
|
||||
"http://www.apple.com/DTDs/PropertyList-1.0.dtd">
|
||||
<plist version="1.0">
|
||||
<dict>
|
||||
<key>Label</key>
|
||||
<string>com.homeai.vtube-bridge</string>
|
||||
|
||||
<key>ProgramArguments</key>
|
||||
<array>
|
||||
<string>/Users/aodhan/homeai-visual-env/bin/python3</string>
|
||||
<string>/Users/aodhan/gitea/homeai/homeai-visual/vtube-bridge.py</string>
|
||||
<string>--port</string>
|
||||
<string>8002</string>
|
||||
<string>--character</string>
|
||||
<string>/Users/aodhan/gitea/homeai/homeai-dashboard/characters/aria.json</string>
|
||||
</array>
|
||||
|
||||
<key>RunAtLoad</key>
|
||||
<true/>
|
||||
|
||||
<key>KeepAlive</key>
|
||||
<true/>
|
||||
|
||||
<key>StandardOutPath</key>
|
||||
<string>/tmp/homeai-vtube-bridge.log</string>
|
||||
|
||||
<key>StandardErrorPath</key>
|
||||
<string>/tmp/homeai-vtube-bridge-error.log</string>
|
||||
|
||||
<key>ThrottleInterval</key>
|
||||
<integer>10</integer>
|
||||
|
||||
<key>EnvironmentVariables</key>
|
||||
<dict>
|
||||
<key>PATH</key>
|
||||
<string>/opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin</string>
|
||||
</dict>
|
||||
</dict>
|
||||
</plist>
|
||||
170
homeai-visual/scripts/test-expressions.py
Normal file
170
homeai-visual/scripts/test-expressions.py
Normal file
@@ -0,0 +1,170 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script for VTube Studio Expression Bridge.
|
||||
|
||||
Usage:
|
||||
python3 test-expressions.py # test all expressions
|
||||
python3 test-expressions.py --auth # run auth flow first
|
||||
python3 test-expressions.py --lipsync # test lip sync parameter
|
||||
python3 test-expressions.py --latency # measure round-trip latency
|
||||
|
||||
Requires the vtube-bridge to be running on port 8002.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
|
||||
BRIDGE_URL = "http://localhost:8002"
|
||||
|
||||
EXPRESSIONS = ["idle", "listening", "thinking", "speaking", "happy", "sad", "surprised", "error"]
|
||||
|
||||
|
||||
def _post(path: str, data: dict | None = None) -> dict:
|
||||
body = json.dumps(data or {}).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{BRIDGE_URL}{path}",
|
||||
data=body,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
return json.loads(resp.read())
|
||||
|
||||
|
||||
def _get(path: str) -> dict:
|
||||
req = urllib.request.Request(f"{BRIDGE_URL}{path}")
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
return json.loads(resp.read())
|
||||
|
||||
|
||||
def check_bridge():
|
||||
"""Verify bridge is running and connected."""
|
||||
try:
|
||||
status = _get("/status")
|
||||
print(f"Bridge status: connected={status['connected']}, authenticated={status['authenticated']}")
|
||||
print(f" Expressions: {', '.join(status.get('expressions', []))}")
|
||||
if not status["connected"]:
|
||||
print("\n WARNING: Not connected to VTube Studio. Is it running?")
|
||||
if not status["authenticated"]:
|
||||
print(" WARNING: Not authenticated. Run with --auth to initiate auth flow.")
|
||||
return status
|
||||
except Exception as e:
|
||||
print(f"ERROR: Cannot reach bridge at {BRIDGE_URL}: {e}")
|
||||
print(" Is vtube-bridge.py running?")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def run_auth():
|
||||
"""Initiate auth flow — user must click Allow in VTube Studio."""
|
||||
print("Requesting authentication token...")
|
||||
print(" >>> Click 'Allow' in VTube Studio when prompted <<<")
|
||||
result = _post("/auth")
|
||||
print(f" Result: {json.dumps(result, indent=2)}")
|
||||
return result
|
||||
|
||||
|
||||
def test_expressions(delay: float = 2.0):
|
||||
"""Cycle through all expressions with a pause between each."""
|
||||
print(f"\nCycling through {len(EXPRESSIONS)} expressions ({delay}s each):\n")
|
||||
for expr in EXPRESSIONS:
|
||||
print(f" → {expr}...", end=" ", flush=True)
|
||||
t0 = time.monotonic()
|
||||
result = _post("/expression", {"event": expr})
|
||||
dt = (time.monotonic() - t0) * 1000
|
||||
if result.get("ok"):
|
||||
print(f"OK ({dt:.0f}ms)")
|
||||
else:
|
||||
print(f"FAILED: {result.get('error', 'unknown')}")
|
||||
time.sleep(delay)
|
||||
|
||||
# Return to idle
|
||||
_post("/expression", {"event": "idle"})
|
||||
print("\n Returned to idle.")
|
||||
|
||||
|
||||
def test_lipsync(duration: float = 3.0):
|
||||
"""Simulate lip sync by sweeping MouthOpen 0→1→0."""
|
||||
import math
|
||||
print(f"\nTesting lip sync (MouthOpen sweep, {duration}s)...\n")
|
||||
|
||||
fps = 20
|
||||
frames = int(duration * fps)
|
||||
for i in range(frames):
|
||||
t = i / frames
|
||||
# Sine wave for smooth open/close
|
||||
value = abs(math.sin(t * math.pi * 4))
|
||||
value = round(value, 3)
|
||||
_post("/parameter", {"name": "MouthOpen", "value": value})
|
||||
print(f"\r MouthOpen = {value:.3f}", end="", flush=True)
|
||||
time.sleep(1.0 / fps)
|
||||
|
||||
_post("/parameter", {"name": "MouthOpen", "value": 0.0})
|
||||
print("\r MouthOpen = 0.000 (done) ")
|
||||
|
||||
|
||||
def test_latency(iterations: int = 20):
|
||||
"""Measure expression trigger round-trip latency."""
|
||||
print(f"\nMeasuring latency ({iterations} iterations)...\n")
|
||||
times = []
|
||||
for i in range(iterations):
|
||||
expr = "thinking" if i % 2 == 0 else "idle"
|
||||
t0 = time.monotonic()
|
||||
_post("/expression", {"event": expr})
|
||||
dt = (time.monotonic() - t0) * 1000
|
||||
times.append(dt)
|
||||
print(f" {i+1:2d}. {expr:10s} → {dt:.1f}ms")
|
||||
|
||||
avg = sum(times) / len(times)
|
||||
mn = min(times)
|
||||
mx = max(times)
|
||||
print(f"\n Avg: {avg:.1f}ms Min: {mn:.1f}ms Max: {mx:.1f}ms")
|
||||
if avg < 100:
|
||||
print(" PASS: Average latency under 100ms target")
|
||||
else:
|
||||
print(" WARNING: Average latency exceeds 100ms target")
|
||||
|
||||
# Return to idle
|
||||
_post("/expression", {"event": "idle"})
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="VTube Studio Expression Bridge Tester")
|
||||
parser.add_argument("--auth", action="store_true", help="Run auth flow")
|
||||
parser.add_argument("--lipsync", action="store_true", help="Test lip sync parameter sweep")
|
||||
parser.add_argument("--latency", action="store_true", help="Measure round-trip latency")
|
||||
parser.add_argument("--delay", type=float, default=2.0, help="Delay between expressions (default: 2s)")
|
||||
parser.add_argument("--all", action="store_true", help="Run all tests")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("VTube Studio Expression Bridge Tester")
|
||||
print("=" * 42)
|
||||
|
||||
status = check_bridge()
|
||||
|
||||
if args.auth:
|
||||
run_auth()
|
||||
print()
|
||||
status = check_bridge()
|
||||
|
||||
if not status.get("authenticated") and not args.auth:
|
||||
print("\nNot authenticated — skipping expression tests.")
|
||||
print("Run with --auth to authenticate, or start VTube Studio first.")
|
||||
return
|
||||
|
||||
if args.all:
|
||||
test_expressions(args.delay)
|
||||
test_lipsync()
|
||||
test_latency()
|
||||
elif args.lipsync:
|
||||
test_lipsync()
|
||||
elif args.latency:
|
||||
test_latency()
|
||||
else:
|
||||
test_expressions(args.delay)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,17 +1,16 @@
|
||||
#!/usr/bin/env bash
|
||||
# homeai-visual/setup.sh — P7: VTube Studio bridge + Live2D expressions
|
||||
# homeai-visual/setup.sh — P7: VTube Studio Expression Bridge
|
||||
#
|
||||
# Components:
|
||||
# - vtube_studio.py — WebSocket client skill for OpenClaw
|
||||
# - lipsync.py — amplitude-based lip sync
|
||||
# - auth.py — VTube Studio token management
|
||||
# Sets up:
|
||||
# - Python venv with websockets
|
||||
# - vtube-bridge daemon (HTTP ↔ WebSocket bridge)
|
||||
# - vtube-ctl CLI (symlinked to PATH)
|
||||
# - launchd service
|
||||
#
|
||||
# Prerequisites:
|
||||
# - P4 (homeai-agent) — OpenClaw running
|
||||
# - P5 (homeai-character) — aria.json with live2d_expressions set
|
||||
# - macOS: VTube Studio installed (Mac App Store)
|
||||
# - Linux: N/A — VTube Studio is macOS/Windows/iOS only
|
||||
# Linux dev can test the skill code but not the VTube Studio side
|
||||
# - VTube Studio installed (Mac App Store) with WebSocket API enabled
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
@@ -19,42 +18,61 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
REPO_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)"
|
||||
source "${REPO_DIR}/scripts/common.sh"
|
||||
|
||||
log_section "P7: VTube Studio Bridge"
|
||||
detect_platform
|
||||
VENV_DIR="$HOME/homeai-visual-env"
|
||||
PLIST_SRC="${SCRIPT_DIR}/launchd/com.homeai.vtube-bridge.plist"
|
||||
PLIST_DST="$HOME/Library/LaunchAgents/com.homeai.vtube-bridge.plist"
|
||||
VTUBE_CTL_SRC="$HOME/.openclaw/skills/vtube-studio/scripts/vtube-ctl"
|
||||
|
||||
if [[ "$OS_TYPE" == "linux" ]]; then
|
||||
log_warn "VTube Studio is not available on Linux."
|
||||
log_warn "This sub-project requires macOS (Mac Mini)."
|
||||
log_section "P7: VTube Studio Expression Bridge"
|
||||
|
||||
# ─── Python venv ──────────────────────────────────────────────────────────────
|
||||
|
||||
if [[ ! -d "$VENV_DIR" ]]; then
|
||||
log_info "Creating Python venv at $VENV_DIR..."
|
||||
python3 -m venv "$VENV_DIR"
|
||||
fi
|
||||
|
||||
# ─── TODO: Implementation ──────────────────────────────────────────────────────
|
||||
cat <<'EOF'
|
||||
log_info "Installing dependencies..."
|
||||
"$VENV_DIR/bin/pip" install --upgrade pip -q
|
||||
"$VENV_DIR/bin/pip" install websockets -q
|
||||
log_ok "Python venv ready ($(${VENV_DIR}/bin/python3 --version))"
|
||||
|
||||
┌─────────────────────────────────────────────────────────────────┐
|
||||
│ P7: homeai-visual — NOT YET IMPLEMENTED │
|
||||
│ │
|
||||
│ macOS only (VTube Studio is macOS/iOS/Windows) │
|
||||
│ │
|
||||
│ Implementation steps: │
|
||||
│ 1. Install VTube Studio from Mac App Store │
|
||||
│ 2. Enable WebSocket API in VTube Studio (Settings → port 8001) │
|
||||
│ 3. Source/purchase Live2D model │
|
||||
│ 4. Create expression hotkeys for 8 states │
|
||||
│ 5. Implement skills/vtube_studio.py (WebSocket client) │
|
||||
│ 6. Implement skills/lipsync.py (amplitude → MouthOpen param) │
|
||||
│ 7. Implement skills/auth.py (token request + persistence) │
|
||||
│ 8. Register vtube_studio skill with OpenClaw │
|
||||
│ 9. Update aria.json live2d_expressions with hotkey IDs │
|
||||
│ 10. Test all 8 expression states │
|
||||
│ │
|
||||
│ On Linux: implement Python skills, test WebSocket protocol │
|
||||
│ with a mock server before connecting to real VTube Studio. │
|
||||
│ │
|
||||
│ Interface contracts: │
|
||||
│ VTUBE_WS_URL=ws://localhost:8001 │
|
||||
└─────────────────────────────────────────────────────────────────┘
|
||||
# ─── vtube-ctl symlink ───────────────────────────────────────────────────────
|
||||
|
||||
EOF
|
||||
if [[ -f "$VTUBE_CTL_SRC" ]]; then
|
||||
chmod +x "$VTUBE_CTL_SRC"
|
||||
ln -sf "$VTUBE_CTL_SRC" /opt/homebrew/bin/vtube-ctl
|
||||
log_ok "vtube-ctl symlinked to /opt/homebrew/bin/vtube-ctl"
|
||||
else
|
||||
log_warn "vtube-ctl not found at $VTUBE_CTL_SRC — skipping symlink"
|
||||
fi
|
||||
|
||||
log_info "P7 is not yet implemented. See homeai-visual/PLAN.md for details."
|
||||
exit 0
|
||||
# ─── launchd service ─────────────────────────────────────────────────────────
|
||||
|
||||
if [[ -f "$PLIST_SRC" ]]; then
|
||||
# Unload if already loaded
|
||||
launchctl bootout "gui/$(id -u)/com.homeai.vtube-bridge" 2>/dev/null || true
|
||||
|
||||
cp "$PLIST_SRC" "$PLIST_DST"
|
||||
launchctl bootstrap "gui/$(id -u)" "$PLIST_DST"
|
||||
log_ok "launchd service loaded: com.homeai.vtube-bridge"
|
||||
else
|
||||
log_warn "Plist not found at $PLIST_SRC — skipping launchd setup"
|
||||
fi
|
||||
|
||||
# ─── Status ──────────────────────────────────────────────────────────────────
|
||||
|
||||
echo ""
|
||||
log_info "VTube Bridge setup complete."
|
||||
log_info ""
|
||||
log_info "Next steps:"
|
||||
log_info " 1. Install VTube Studio from Mac App Store"
|
||||
log_info " 2. Enable WebSocket API: Settings > WebSocket API > port 8001"
|
||||
log_info " 3. Load a Live2D model"
|
||||
log_info " 4. Create expression hotkeys (idle, listening, thinking, speaking, happy, sad, surprised, error)"
|
||||
log_info " 5. Run: vtube-ctl auth (click Allow in VTube Studio)"
|
||||
log_info " 6. Run: python3 ${SCRIPT_DIR}/scripts/test-expressions.py --all"
|
||||
log_info " 7. Update aria.json with real hotkey UUIDs"
|
||||
log_info ""
|
||||
log_info "Logs: /tmp/homeai-vtube-bridge.log"
|
||||
log_info "Bridge: http://localhost:8002/status"
|
||||
|
||||
454
homeai-visual/vtube-bridge.py
Normal file
454
homeai-visual/vtube-bridge.py
Normal file
@@ -0,0 +1,454 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
VTube Studio Expression Bridge — persistent WebSocket ↔ HTTP bridge.
|
||||
|
||||
Maintains a long-lived WebSocket connection to VTube Studio and exposes
|
||||
a simple HTTP API so other HomeAI components can trigger expressions and
|
||||
inject parameters (lip sync) without managing their own WS connections.
|
||||
|
||||
HTTP API (port 8002):
|
||||
POST /expression {"event": "thinking"} → trigger hotkey
|
||||
POST /parameter {"name": "MouthOpen", "value": 0.5} → inject param
|
||||
POST /parameters [{"name": "MouthOpen", "value": 0.5}, ...]
|
||||
POST /auth {} → request new token
|
||||
GET /status → connection info
|
||||
GET /expressions → list available expressions
|
||||
|
||||
Requires: pip install websockets
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
from http import HTTPStatus
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import websockets
|
||||
from websockets.exceptions import ConnectionClosed
|
||||
except ImportError:
|
||||
print("ERROR: 'websockets' package required. Install with: pip install websockets", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
DEFAULT_VTUBE_WS_URL = "ws://localhost:8001"
|
||||
DEFAULT_HTTP_PORT = 8002
|
||||
TOKEN_PATH = Path.home() / ".openclaw" / "vtube_token.json"
|
||||
DEFAULT_CHARACTER_PATH = (
|
||||
Path.home() / "gitea" / "homeai" / "homeai-dashboard" / "characters" / "aria.json"
|
||||
)
|
||||
|
||||
logger = logging.getLogger("vtube-bridge")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# VTube Studio WebSocket Client
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class VTubeClient:
|
||||
"""Persistent async WebSocket client for VTube Studio API."""
|
||||
|
||||
def __init__(self, ws_url: str, character_path: Path):
|
||||
self.ws_url = ws_url
|
||||
self.character_path = character_path
|
||||
self._ws = None
|
||||
self._token: str | None = None
|
||||
self._authenticated = False
|
||||
self._current_expression: str | None = None
|
||||
self._connected = False
|
||||
self._request_id = 0
|
||||
self._lock = asyncio.Lock()
|
||||
self._load_token()
|
||||
self._load_character()
|
||||
|
||||
# ── Character config ──────────────────────────────────────────────
|
||||
|
||||
def _load_character(self):
|
||||
"""Load expression mappings from character JSON."""
|
||||
self.expression_map: dict[str, str] = {}
|
||||
self.ws_triggers: dict = {}
|
||||
try:
|
||||
if self.character_path.exists():
|
||||
cfg = json.loads(self.character_path.read_text())
|
||||
self.expression_map = cfg.get("live2d_expressions", {})
|
||||
self.ws_triggers = cfg.get("vtube_ws_triggers", {})
|
||||
logger.info("Loaded %d expressions from %s", len(self.expression_map), self.character_path.name)
|
||||
else:
|
||||
logger.warning("Character file not found: %s", self.character_path)
|
||||
except Exception as e:
|
||||
logger.error("Failed to load character config: %s", e)
|
||||
|
||||
def reload_character(self):
|
||||
"""Hot-reload character config without restarting."""
|
||||
self._load_character()
|
||||
return {"expressions": self.expression_map, "triggers": self.ws_triggers}
|
||||
|
||||
# ── Token persistence ─────────────────────────────────────────────
|
||||
|
||||
def _load_token(self):
|
||||
try:
|
||||
if TOKEN_PATH.exists():
|
||||
data = json.loads(TOKEN_PATH.read_text())
|
||||
self._token = data.get("token")
|
||||
logger.info("Loaded auth token from %s", TOKEN_PATH)
|
||||
except Exception as e:
|
||||
logger.warning("Could not load token: %s", e)
|
||||
|
||||
def _save_token(self, token: str):
|
||||
TOKEN_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
TOKEN_PATH.write_text(json.dumps({"token": token}, indent=2))
|
||||
self._token = token
|
||||
logger.info("Saved auth token to %s", TOKEN_PATH)
|
||||
|
||||
# ── WebSocket comms ───────────────────────────────────────────────
|
||||
|
||||
def _next_id(self) -> str:
|
||||
self._request_id += 1
|
||||
return f"homeai-{self._request_id}"
|
||||
|
||||
async def _send(self, message_type: str, data: dict | None = None) -> dict:
|
||||
"""Send a VTube Studio API message and return the response."""
|
||||
payload = {
|
||||
"apiName": "VTubeStudioPublicAPI",
|
||||
"apiVersion": "1.0",
|
||||
"requestID": self._next_id(),
|
||||
"messageType": message_type,
|
||||
"data": data or {},
|
||||
}
|
||||
await self._ws.send(json.dumps(payload))
|
||||
resp = json.loads(await asyncio.wait_for(self._ws.recv(), timeout=10))
|
||||
return resp
|
||||
|
||||
# ── Connection lifecycle ──────────────────────────────────────────
|
||||
|
||||
async def connect(self):
|
||||
"""Connect and authenticate to VTube Studio."""
|
||||
try:
|
||||
self._ws = await websockets.connect(self.ws_url, ping_interval=20, ping_timeout=10)
|
||||
self._connected = True
|
||||
logger.info("Connected to VTube Studio at %s", self.ws_url)
|
||||
|
||||
if self._token:
|
||||
await self._authenticate()
|
||||
else:
|
||||
logger.warning("No auth token — call POST /auth to initiate authentication")
|
||||
except Exception as e:
|
||||
self._connected = False
|
||||
self._authenticated = False
|
||||
logger.error("Connection failed: %s", e)
|
||||
raise
|
||||
|
||||
async def _authenticate(self):
|
||||
"""Authenticate with an existing token."""
|
||||
resp = await self._send("AuthenticationRequest", {
|
||||
"pluginName": "HomeAI",
|
||||
"pluginDeveloper": "HomeAI",
|
||||
"authenticationToken": self._token,
|
||||
})
|
||||
self._authenticated = resp.get("data", {}).get("authenticated", False)
|
||||
if self._authenticated:
|
||||
logger.info("Authenticated successfully")
|
||||
else:
|
||||
logger.warning("Token rejected — request a new one via POST /auth")
|
||||
self._authenticated = False
|
||||
|
||||
async def request_new_token(self) -> dict:
|
||||
"""Request a new auth token. User must click Allow in VTube Studio."""
|
||||
if not self._connected:
|
||||
return {"error": "Not connected to VTube Studio"}
|
||||
|
||||
resp = await self._send("AuthenticationTokenRequest", {
|
||||
"pluginName": "HomeAI",
|
||||
"pluginDeveloper": "HomeAI",
|
||||
"pluginIcon": None,
|
||||
})
|
||||
token = resp.get("data", {}).get("authenticationToken")
|
||||
if token:
|
||||
self._save_token(token)
|
||||
await self._authenticate()
|
||||
return {"authenticated": self._authenticated, "token_saved": True}
|
||||
return {"error": "No token received", "response": resp}
|
||||
|
||||
async def disconnect(self):
|
||||
if self._ws:
|
||||
await self._ws.close()
|
||||
self._connected = False
|
||||
self._authenticated = False
|
||||
|
||||
async def ensure_connected(self):
|
||||
"""Reconnect if the connection dropped."""
|
||||
if not self._connected or self._ws is None or self._ws.closed:
|
||||
logger.info("Reconnecting...")
|
||||
await self.connect()
|
||||
|
||||
# ── Expression & parameter API ────────────────────────────────────
|
||||
|
||||
async def trigger_expression(self, event: str) -> dict:
|
||||
"""Trigger a named expression from the character config."""
|
||||
async with self._lock:
|
||||
await self.ensure_connected()
|
||||
if not self._authenticated:
|
||||
return {"error": "Not authenticated"}
|
||||
|
||||
hotkey_id = self.expression_map.get(event)
|
||||
if not hotkey_id:
|
||||
return {"error": f"Unknown expression: {event}", "available": list(self.expression_map.keys())}
|
||||
|
||||
resp = await self._send("HotkeyTriggerRequest", {"hotkeyID": hotkey_id})
|
||||
self._current_expression = event
|
||||
return {"ok": True, "expression": event, "hotkey_id": hotkey_id}
|
||||
|
||||
async def set_parameter(self, name: str, value: float, weight: float = 1.0) -> dict:
|
||||
"""Inject a single VTube Studio parameter value."""
|
||||
async with self._lock:
|
||||
await self.ensure_connected()
|
||||
if not self._authenticated:
|
||||
return {"error": "Not authenticated"}
|
||||
|
||||
resp = await self._send("InjectParameterDataRequest", {
|
||||
"parameterValues": [{"id": name, "value": value, "weight": weight}],
|
||||
})
|
||||
return {"ok": True, "name": name, "value": value}
|
||||
|
||||
async def set_parameters(self, params: list[dict]) -> dict:
|
||||
"""Inject multiple VTube Studio parameters at once."""
|
||||
async with self._lock:
|
||||
await self.ensure_connected()
|
||||
if not self._authenticated:
|
||||
return {"error": "Not authenticated"}
|
||||
|
||||
param_values = [
|
||||
{"id": p["name"], "value": p["value"], "weight": p.get("weight", 1.0)}
|
||||
for p in params
|
||||
]
|
||||
resp = await self._send("InjectParameterDataRequest", {
|
||||
"parameterValues": param_values,
|
||||
})
|
||||
return {"ok": True, "count": len(param_values)}
|
||||
|
||||
async def list_hotkeys(self) -> dict:
|
||||
"""List all hotkeys available in the current model."""
|
||||
async with self._lock:
|
||||
await self.ensure_connected()
|
||||
if not self._authenticated:
|
||||
return {"error": "Not authenticated"}
|
||||
|
||||
resp = await self._send("HotkeysInCurrentModelRequest", {})
|
||||
return resp.get("data", {})
|
||||
|
||||
async def list_parameters(self) -> dict:
|
||||
"""List all input parameters for the current model."""
|
||||
async with self._lock:
|
||||
await self.ensure_connected()
|
||||
if not self._authenticated:
|
||||
return {"error": "Not authenticated"}
|
||||
|
||||
resp = await self._send("InputParameterListRequest", {})
|
||||
return resp.get("data", {})
|
||||
|
||||
def status(self) -> dict:
|
||||
return {
|
||||
"connected": self._connected,
|
||||
"authenticated": self._authenticated,
|
||||
"ws_url": self.ws_url,
|
||||
"current_expression": self._current_expression,
|
||||
"expression_count": len(self.expression_map),
|
||||
"expressions": list(self.expression_map.keys()),
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HTTP Server (asyncio-based, no external deps)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class BridgeHTTPHandler:
|
||||
"""Simple async HTTP request handler for the bridge API."""
|
||||
|
||||
def __init__(self, client: VTubeClient):
|
||||
self.client = client
|
||||
|
||||
async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
||||
try:
|
||||
request_line = await asyncio.wait_for(reader.readline(), timeout=5)
|
||||
if not request_line:
|
||||
writer.close()
|
||||
return
|
||||
|
||||
method, path, _ = request_line.decode().strip().split(" ", 2)
|
||||
path = path.split("?")[0] # strip query params
|
||||
|
||||
# Read headers
|
||||
content_length = 0
|
||||
while True:
|
||||
line = await reader.readline()
|
||||
if line == b"\r\n" or not line:
|
||||
break
|
||||
if line.lower().startswith(b"content-length:"):
|
||||
content_length = int(line.split(b":")[1].strip())
|
||||
|
||||
# Read body
|
||||
body = None
|
||||
if content_length > 0:
|
||||
body = await reader.read(content_length)
|
||||
|
||||
# Route
|
||||
try:
|
||||
result = await self._route(method, path, body)
|
||||
await self._respond(writer, 200, result)
|
||||
except Exception as e:
|
||||
logger.error("Handler error: %s", e, exc_info=True)
|
||||
await self._respond(writer, 500, {"error": str(e)})
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
writer.close()
|
||||
except Exception as e:
|
||||
logger.error("Connection error: %s", e)
|
||||
try:
|
||||
writer.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def _route(self, method: str, path: str, body: bytes | None) -> dict:
|
||||
data = {}
|
||||
if body:
|
||||
try:
|
||||
data = json.loads(body)
|
||||
except json.JSONDecodeError:
|
||||
return {"error": "Invalid JSON"}
|
||||
|
||||
if method == "GET" and path == "/status":
|
||||
return self.client.status()
|
||||
|
||||
if method == "GET" and path == "/expressions":
|
||||
return {
|
||||
"expressions": self.client.expression_map,
|
||||
"triggers": self.client.ws_triggers,
|
||||
}
|
||||
|
||||
if method == "GET" and path == "/hotkeys":
|
||||
return await self.client.list_hotkeys()
|
||||
|
||||
if method == "GET" and path == "/parameters":
|
||||
return await self.client.list_parameters()
|
||||
|
||||
if method == "POST" and path == "/expression":
|
||||
event = data.get("event")
|
||||
if not event:
|
||||
return {"error": "Missing 'event' field"}
|
||||
return await self.client.trigger_expression(event)
|
||||
|
||||
if method == "POST" and path == "/parameter":
|
||||
name = data.get("name")
|
||||
value = data.get("value")
|
||||
if name is None or value is None:
|
||||
return {"error": "Missing 'name' or 'value' field"}
|
||||
return await self.client.set_parameter(name, float(value), float(data.get("weight", 1.0)))
|
||||
|
||||
if method == "POST" and path == "/parameters":
|
||||
if not isinstance(data, list):
|
||||
return {"error": "Expected JSON array of {name, value} objects"}
|
||||
return await self.client.set_parameters(data)
|
||||
|
||||
if method == "POST" and path == "/auth":
|
||||
return await self.client.request_new_token()
|
||||
|
||||
if method == "POST" and path == "/reload":
|
||||
return self.client.reload_character()
|
||||
|
||||
return {"error": f"Unknown route: {method} {path}"}
|
||||
|
||||
async def _respond(self, writer: asyncio.StreamWriter, status: int, data: dict):
|
||||
body = json.dumps(data, indent=2).encode()
|
||||
status_text = HTTPStatus(status).phrase
|
||||
header = (
|
||||
f"HTTP/1.1 {status} {status_text}\r\n"
|
||||
f"Content-Type: application/json\r\n"
|
||||
f"Content-Length: {len(body)}\r\n"
|
||||
f"Access-Control-Allow-Origin: *\r\n"
|
||||
f"Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n"
|
||||
f"Access-Control-Allow-Headers: Content-Type\r\n"
|
||||
f"\r\n"
|
||||
)
|
||||
writer.write(header.encode() + body)
|
||||
await writer.drain()
|
||||
writer.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Auto-reconnect loop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def reconnect_loop(client: VTubeClient, interval: float = 5.0):
|
||||
"""Background task that keeps the VTube Studio connection alive."""
|
||||
while True:
|
||||
try:
|
||||
if not client._connected or client._ws is None or client._ws.closed:
|
||||
logger.info("Connection lost — attempting reconnect...")
|
||||
await client.connect()
|
||||
except Exception as e:
|
||||
logger.debug("Reconnect failed: %s (retrying in %.0fs)", e, interval)
|
||||
await asyncio.sleep(interval)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def main(args):
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.verbose else logging.INFO,
|
||||
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
|
||||
datefmt="%H:%M:%S",
|
||||
)
|
||||
|
||||
character_path = Path(args.character)
|
||||
client = VTubeClient(args.vtube_url, character_path)
|
||||
|
||||
# Try initial connection (don't fail if VTube Studio isn't running yet)
|
||||
try:
|
||||
await client.connect()
|
||||
except Exception as e:
|
||||
logger.warning("Initial connection failed: %s (will keep retrying)", e)
|
||||
|
||||
# Start reconnect loop
|
||||
reconnect_task = asyncio.create_task(reconnect_loop(client, interval=5.0))
|
||||
|
||||
# Start HTTP server
|
||||
handler = BridgeHTTPHandler(client)
|
||||
server = await asyncio.start_server(handler.handle, "0.0.0.0", args.port)
|
||||
logger.info("HTTP API listening on http://0.0.0.0:%d", args.port)
|
||||
logger.info("Endpoints: /status /expression /parameter /parameters /auth /reload /hotkeys")
|
||||
|
||||
# Graceful shutdown
|
||||
stop = asyncio.Event()
|
||||
|
||||
def _signal_handler():
|
||||
logger.info("Shutting down...")
|
||||
stop.set()
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||
loop.add_signal_handler(sig, _signal_handler)
|
||||
|
||||
async with server:
|
||||
await stop.wait()
|
||||
|
||||
reconnect_task.cancel()
|
||||
await client.disconnect()
|
||||
logger.info("Goodbye.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="VTube Studio Expression Bridge")
|
||||
parser.add_argument("--port", type=int, default=DEFAULT_HTTP_PORT, help="HTTP API port (default: 8002)")
|
||||
parser.add_argument("--vtube-url", default=DEFAULT_VTUBE_WS_URL, help="VTube Studio WebSocket URL")
|
||||
parser.add_argument("--character", default=str(DEFAULT_CHARACTER_PATH), help="Path to character JSON")
|
||||
parser.add_argument("--verbose", "-v", action="store_true", help="Debug logging")
|
||||
args = parser.parse_args()
|
||||
asyncio.run(main(args))
|
||||
Reference in New Issue
Block a user