#!/usr/bin/env python3 """ VTube Studio Expression Bridge — persistent WebSocket ↔ HTTP bridge. Maintains a long-lived WebSocket connection to VTube Studio and exposes a simple HTTP API so other HomeAI components can trigger expressions and inject parameters (lip sync) without managing their own WS connections. HTTP API (port 8002): POST /expression {"event": "thinking"} → trigger hotkey POST /parameter {"name": "MouthOpen", "value": 0.5} → inject param POST /parameters [{"name": "MouthOpen", "value": 0.5}, ...] POST /auth {} → request new token GET /status → connection info GET /expressions → list available expressions Requires: pip install websockets """ import argparse import asyncio import json import logging import signal import sys import time from http import HTTPStatus from pathlib import Path try: import websockets from websockets.exceptions import ConnectionClosed except ImportError: print("ERROR: 'websockets' package required. Install with: pip install websockets", file=sys.stderr) sys.exit(1) # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- DEFAULT_VTUBE_WS_URL = "ws://localhost:8001" DEFAULT_HTTP_PORT = 8002 TOKEN_PATH = Path.home() / ".openclaw" / "vtube_token.json" DEFAULT_CHARACTER_PATH = ( Path.home() / "gitea" / "homeai" / "homeai-dashboard" / "characters" / "aria.json" ) logger = logging.getLogger("vtube-bridge") # --------------------------------------------------------------------------- # VTube Studio WebSocket Client # --------------------------------------------------------------------------- class VTubeClient: """Persistent async WebSocket client for VTube Studio API.""" def __init__(self, ws_url: str, character_path: Path): self.ws_url = ws_url self.character_path = character_path self._ws = None self._token: str | None = None self._authenticated = False self._current_expression: str | None = None self._connected = False self._request_id = 0 self._lock = asyncio.Lock() self._load_token() self._load_character() # ── Character config ────────────────────────────────────────────── def _load_character(self): """Load expression mappings from character JSON.""" self.expression_map: dict[str, str] = {} self.ws_triggers: dict = {} try: if self.character_path.exists(): cfg = json.loads(self.character_path.read_text()) self.expression_map = cfg.get("live2d_expressions", {}) self.ws_triggers = cfg.get("vtube_ws_triggers", {}) logger.info("Loaded %d expressions from %s", len(self.expression_map), self.character_path.name) else: logger.warning("Character file not found: %s", self.character_path) except Exception as e: logger.error("Failed to load character config: %s", e) def reload_character(self): """Hot-reload character config without restarting.""" self._load_character() return {"expressions": self.expression_map, "triggers": self.ws_triggers} # ── Token persistence ───────────────────────────────────────────── def _load_token(self): try: if TOKEN_PATH.exists(): data = json.loads(TOKEN_PATH.read_text()) self._token = data.get("token") logger.info("Loaded auth token from %s", TOKEN_PATH) except Exception as e: logger.warning("Could not load token: %s", e) def _save_token(self, token: str): TOKEN_PATH.parent.mkdir(parents=True, exist_ok=True) TOKEN_PATH.write_text(json.dumps({"token": token}, indent=2)) self._token = token logger.info("Saved auth token to %s", TOKEN_PATH) # ── WebSocket comms ─────────────────────────────────────────────── def _next_id(self) -> str: self._request_id += 1 return f"homeai-{self._request_id}" async def _send(self, message_type: str, data: dict | None = None) -> dict: """Send a VTube Studio API message and return the response.""" payload = { "apiName": "VTubeStudioPublicAPI", "apiVersion": "1.0", "requestID": self._next_id(), "messageType": message_type, "data": data or {}, } await self._ws.send(json.dumps(payload)) resp = json.loads(await asyncio.wait_for(self._ws.recv(), timeout=10)) return resp # ── Connection lifecycle ────────────────────────────────────────── async def connect(self): """Connect and authenticate to VTube Studio.""" try: self._ws = await websockets.connect(self.ws_url, ping_interval=20, ping_timeout=10) self._connected = True logger.info("Connected to VTube Studio at %s", self.ws_url) if self._token: await self._authenticate() else: logger.warning("No auth token — call POST /auth to initiate authentication") except Exception as e: self._connected = False self._authenticated = False logger.error("Connection failed: %s", e) raise async def _authenticate(self): """Authenticate with an existing token.""" resp = await self._send("AuthenticationRequest", { "pluginName": "HomeAI", "pluginDeveloper": "HomeAI", "authenticationToken": self._token, }) self._authenticated = resp.get("data", {}).get("authenticated", False) if self._authenticated: logger.info("Authenticated successfully") else: logger.warning("Token rejected — request a new one via POST /auth") self._authenticated = False async def request_new_token(self) -> dict: """Request a new auth token. User must click Allow in VTube Studio.""" if not self._connected: return {"error": "Not connected to VTube Studio"} resp = await self._send("AuthenticationTokenRequest", { "pluginName": "HomeAI", "pluginDeveloper": "HomeAI", "pluginIcon": None, }) token = resp.get("data", {}).get("authenticationToken") if token: self._save_token(token) await self._authenticate() return {"authenticated": self._authenticated, "token_saved": True} return {"error": "No token received", "response": resp} async def disconnect(self): if self._ws: await self._ws.close() self._connected = False self._authenticated = False async def ensure_connected(self): """Reconnect if the connection dropped.""" if not self._connected or self._ws is None or self._ws.closed: logger.info("Reconnecting...") await self.connect() # ── Expression & parameter API ──────────────────────────────────── async def trigger_expression(self, event: str) -> dict: """Trigger a named expression from the character config.""" async with self._lock: await self.ensure_connected() if not self._authenticated: return {"error": "Not authenticated"} hotkey_id = self.expression_map.get(event) if not hotkey_id: return {"error": f"Unknown expression: {event}", "available": list(self.expression_map.keys())} resp = await self._send("HotkeyTriggerRequest", {"hotkeyID": hotkey_id}) self._current_expression = event return {"ok": True, "expression": event, "hotkey_id": hotkey_id} async def set_parameter(self, name: str, value: float, weight: float = 1.0) -> dict: """Inject a single VTube Studio parameter value.""" async with self._lock: await self.ensure_connected() if not self._authenticated: return {"error": "Not authenticated"} resp = await self._send("InjectParameterDataRequest", { "parameterValues": [{"id": name, "value": value, "weight": weight}], }) return {"ok": True, "name": name, "value": value} async def set_parameters(self, params: list[dict]) -> dict: """Inject multiple VTube Studio parameters at once.""" async with self._lock: await self.ensure_connected() if not self._authenticated: return {"error": "Not authenticated"} param_values = [ {"id": p["name"], "value": p["value"], "weight": p.get("weight", 1.0)} for p in params ] resp = await self._send("InjectParameterDataRequest", { "parameterValues": param_values, }) return {"ok": True, "count": len(param_values)} async def list_hotkeys(self) -> dict: """List all hotkeys available in the current model.""" async with self._lock: await self.ensure_connected() if not self._authenticated: return {"error": "Not authenticated"} resp = await self._send("HotkeysInCurrentModelRequest", {}) return resp.get("data", {}) async def list_parameters(self) -> dict: """List all input parameters for the current model.""" async with self._lock: await self.ensure_connected() if not self._authenticated: return {"error": "Not authenticated"} resp = await self._send("InputParameterListRequest", {}) return resp.get("data", {}) def status(self) -> dict: return { "connected": self._connected, "authenticated": self._authenticated, "ws_url": self.ws_url, "current_expression": self._current_expression, "expression_count": len(self.expression_map), "expressions": list(self.expression_map.keys()), } # --------------------------------------------------------------------------- # HTTP Server (asyncio-based, no external deps) # --------------------------------------------------------------------------- class BridgeHTTPHandler: """Simple async HTTP request handler for the bridge API.""" def __init__(self, client: VTubeClient): self.client = client async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): try: request_line = await asyncio.wait_for(reader.readline(), timeout=5) if not request_line: writer.close() return method, path, _ = request_line.decode().strip().split(" ", 2) path = path.split("?")[0] # strip query params # Read headers content_length = 0 while True: line = await reader.readline() if line == b"\r\n" or not line: break if line.lower().startswith(b"content-length:"): content_length = int(line.split(b":")[1].strip()) # Read body body = None if content_length > 0: body = await reader.read(content_length) # Route try: result = await self._route(method, path, body) await self._respond(writer, 200, result) except Exception as e: logger.error("Handler error: %s", e, exc_info=True) await self._respond(writer, 500, {"error": str(e)}) except asyncio.TimeoutError: writer.close() except Exception as e: logger.error("Connection error: %s", e) try: writer.close() except Exception: pass async def _route(self, method: str, path: str, body: bytes | None) -> dict: data = {} if body: try: data = json.loads(body) except json.JSONDecodeError: return {"error": "Invalid JSON"} if method == "GET" and path == "/status": return self.client.status() if method == "GET" and path == "/expressions": return { "expressions": self.client.expression_map, "triggers": self.client.ws_triggers, } if method == "GET" and path == "/hotkeys": return await self.client.list_hotkeys() if method == "GET" and path == "/parameters": return await self.client.list_parameters() if method == "POST" and path == "/expression": event = data.get("event") if not event: return {"error": "Missing 'event' field"} return await self.client.trigger_expression(event) if method == "POST" and path == "/parameter": name = data.get("name") value = data.get("value") if name is None or value is None: return {"error": "Missing 'name' or 'value' field"} return await self.client.set_parameter(name, float(value), float(data.get("weight", 1.0))) if method == "POST" and path == "/parameters": if not isinstance(data, list): return {"error": "Expected JSON array of {name, value} objects"} return await self.client.set_parameters(data) if method == "POST" and path == "/auth": return await self.client.request_new_token() if method == "POST" and path == "/reload": return self.client.reload_character() return {"error": f"Unknown route: {method} {path}"} async def _respond(self, writer: asyncio.StreamWriter, status: int, data: dict): body = json.dumps(data, indent=2).encode() status_text = HTTPStatus(status).phrase header = ( f"HTTP/1.1 {status} {status_text}\r\n" f"Content-Type: application/json\r\n" f"Content-Length: {len(body)}\r\n" f"Access-Control-Allow-Origin: *\r\n" f"Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n" f"Access-Control-Allow-Headers: Content-Type\r\n" f"\r\n" ) writer.write(header.encode() + body) await writer.drain() writer.close() # --------------------------------------------------------------------------- # Auto-reconnect loop # --------------------------------------------------------------------------- async def reconnect_loop(client: VTubeClient, interval: float = 5.0): """Background task that keeps the VTube Studio connection alive.""" while True: try: if not client._connected or client._ws is None or client._ws.closed: logger.info("Connection lost — attempting reconnect...") await client.connect() except Exception as e: logger.debug("Reconnect failed: %s (retrying in %.0fs)", e, interval) await asyncio.sleep(interval) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- async def main(args): logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", datefmt="%H:%M:%S", ) character_path = Path(args.character) client = VTubeClient(args.vtube_url, character_path) # Try initial connection (don't fail if VTube Studio isn't running yet) try: await client.connect() except Exception as e: logger.warning("Initial connection failed: %s (will keep retrying)", e) # Start reconnect loop reconnect_task = asyncio.create_task(reconnect_loop(client, interval=5.0)) # Start HTTP server handler = BridgeHTTPHandler(client) server = await asyncio.start_server(handler.handle, "0.0.0.0", args.port) logger.info("HTTP API listening on http://0.0.0.0:%d", args.port) logger.info("Endpoints: /status /expression /parameter /parameters /auth /reload /hotkeys") # Graceful shutdown stop = asyncio.Event() def _signal_handler(): logger.info("Shutting down...") stop.set() loop = asyncio.get_event_loop() for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, _signal_handler) async with server: await stop.wait() reconnect_task.cancel() await client.disconnect() logger.info("Goodbye.") if __name__ == "__main__": parser = argparse.ArgumentParser(description="VTube Studio Expression Bridge") parser.add_argument("--port", type=int, default=DEFAULT_HTTP_PORT, help="HTTP API port (default: 8002)") parser.add_argument("--vtube-url", default=DEFAULT_VTUBE_WS_URL, help="VTube Studio WebSocket URL") parser.add_argument("--character", default=str(DEFAULT_CHARACTER_PATH), help="Path to character JSON") parser.add_argument("--verbose", "-v", action="store_true", help="Debug logging") args = parser.parse_args() asyncio.run(main(args))