Character schema v2: background, dialogue_style, appearance, skills, gaze_presets with automatic v1→v2 migration. LLM-assisted character creation via Character MCP server. Two-tier memory system (personal per-character + general shared) with budget-based injection into LLM system prompt. Per-character TTS voice routing via state file — Wyoming TTS server reads active config to route between Kokoro (local) and ElevenLabs (cloud PCM 24kHz). Dashboard: memories page, conversation history, character profile on cards, auto-TTS engine selection from character config. Also includes VTube Studio expression bridge and ComfyUI API guide. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
455 lines
18 KiB
Python
455 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
VTube Studio Expression Bridge — persistent WebSocket ↔ HTTP bridge.
|
|
|
|
Maintains a long-lived WebSocket connection to VTube Studio and exposes
|
|
a simple HTTP API so other HomeAI components can trigger expressions and
|
|
inject parameters (lip sync) without managing their own WS connections.
|
|
|
|
HTTP API (port 8002):
|
|
POST /expression {"event": "thinking"} → trigger hotkey
|
|
POST /parameter {"name": "MouthOpen", "value": 0.5} → inject param
|
|
POST /parameters [{"name": "MouthOpen", "value": 0.5}, ...]
|
|
POST /auth {} → request new token
|
|
GET /status → connection info
|
|
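GET /expressions → list available expressions
GET /hotkeys → list hotkeys in the current model
GET /parameters → list input parameters of the current model
POST /reload {} → reload the character config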
|
|
|
|
Requires: pip install websockets
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import signal
|
|
import sys
|
|
import time
|
|
from http import HTTPStatus
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import websockets
|
|
from websockets.exceptions import ConnectionClosed
|
|
except ImportError:
|
|
print("ERROR: 'websockets' package required. Install with: pip install websockets", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Defaults used when the matching command-line option is not given.
DEFAULT_VTUBE_WS_URL = "ws://localhost:8001"
DEFAULT_HTTP_PORT = 8002

# File in which the VTube Studio auth token is persisted between runs.
TOKEN_PATH = Path.home().joinpath(".openclaw", "vtube_token.json")

# Character JSON that supplies the expression and trigger mappings.
DEFAULT_CHARACTER_PATH = Path.home().joinpath(
    "gitea",
    "homeai",
    "homeai-dashboard",
    "characters",
    "aria.json",
)

logger = logging.getLogger("vtube-bridge")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# VTube Studio WebSocket Client
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class VTubeClient:
|
|
"""Persistent async WebSocket client for VTube Studio API."""
|
|
|
|
def __init__(self, ws_url: str, character_path: Path):
|
|
self.ws_url = ws_url
|
|
self.character_path = character_path
|
|
self._ws = None
|
|
self._token: str | None = None
|
|
self._authenticated = False
|
|
self._current_expression: str | None = None
|
|
self._connected = False
|
|
self._request_id = 0
|
|
self._lock = asyncio.Lock()
|
|
self._load_token()
|
|
self._load_character()
|
|
|
|
# ── Character config ──────────────────────────────────────────────
|
|
|
|
def _load_character(self):
|
|
"""Load expression mappings from character JSON."""
|
|
self.expression_map: dict[str, str] = {}
|
|
self.ws_triggers: dict = {}
|
|
try:
|
|
if self.character_path.exists():
|
|
cfg = json.loads(self.character_path.read_text())
|
|
self.expression_map = cfg.get("live2d_expressions", {})
|
|
self.ws_triggers = cfg.get("vtube_ws_triggers", {})
|
|
logger.info("Loaded %d expressions from %s", len(self.expression_map), self.character_path.name)
|
|
else:
|
|
logger.warning("Character file not found: %s", self.character_path)
|
|
except Exception as e:
|
|
logger.error("Failed to load character config: %s", e)
|
|
|
|
def reload_character(self):
|
|
"""Hot-reload character config without restarting."""
|
|
self._load_character()
|
|
return {"expressions": self.expression_map, "triggers": self.ws_triggers}
|
|
|
|
# ── Token persistence ─────────────────────────────────────────────
|
|
|
|
def _load_token(self):
|
|
try:
|
|
if TOKEN_PATH.exists():
|
|
data = json.loads(TOKEN_PATH.read_text())
|
|
self._token = data.get("token")
|
|
logger.info("Loaded auth token from %s", TOKEN_PATH)
|
|
except Exception as e:
|
|
logger.warning("Could not load token: %s", e)
|
|
|
|
def _save_token(self, token: str):
|
|
TOKEN_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
TOKEN_PATH.write_text(json.dumps({"token": token}, indent=2))
|
|
self._token = token
|
|
logger.info("Saved auth token to %s", TOKEN_PATH)
|
|
|
|
# ── WebSocket comms ───────────────────────────────────────────────
|
|
|
|
def _next_id(self) -> str:
|
|
self._request_id += 1
|
|
return f"homeai-{self._request_id}"
|
|
|
|
async def _send(self, message_type: str, data: dict | None = None) -> dict:
|
|
"""Send a VTube Studio API message and return the response."""
|
|
payload = {
|
|
"apiName": "VTubeStudioPublicAPI",
|
|
"apiVersion": "1.0",
|
|
"requestID": self._next_id(),
|
|
"messageType": message_type,
|
|
"data": data or {},
|
|
}
|
|
await self._ws.send(json.dumps(payload))
|
|
resp = json.loads(await asyncio.wait_for(self._ws.recv(), timeout=10))
|
|
return resp
|
|
|
|
# ── Connection lifecycle ──────────────────────────────────────────
|
|
|
|
async def connect(self):
|
|
"""Connect and authenticate to VTube Studio."""
|
|
try:
|
|
self._ws = await websockets.connect(self.ws_url, ping_interval=20, ping_timeout=10)
|
|
self._connected = True
|
|
logger.info("Connected to VTube Studio at %s", self.ws_url)
|
|
|
|
if self._token:
|
|
await self._authenticate()
|
|
else:
|
|
logger.warning("No auth token — call POST /auth to initiate authentication")
|
|
except Exception as e:
|
|
self._connected = False
|
|
self._authenticated = False
|
|
logger.error("Connection failed: %s", e)
|
|
raise
|
|
|
|
async def _authenticate(self):
|
|
"""Authenticate with an existing token."""
|
|
resp = await self._send("AuthenticationRequest", {
|
|
"pluginName": "HomeAI",
|
|
"pluginDeveloper": "HomeAI",
|
|
"authenticationToken": self._token,
|
|
})
|
|
self._authenticated = resp.get("data", {}).get("authenticated", False)
|
|
if self._authenticated:
|
|
logger.info("Authenticated successfully")
|
|
else:
|
|
logger.warning("Token rejected — request a new one via POST /auth")
|
|
self._authenticated = False
|
|
|
|
async def request_new_token(self) -> dict:
|
|
"""Request a new auth token. User must click Allow in VTube Studio."""
|
|
if not self._connected:
|
|
return {"error": "Not connected to VTube Studio"}
|
|
|
|
resp = await self._send("AuthenticationTokenRequest", {
|
|
"pluginName": "HomeAI",
|
|
"pluginDeveloper": "HomeAI",
|
|
"pluginIcon": None,
|
|
})
|
|
token = resp.get("data", {}).get("authenticationToken")
|
|
if token:
|
|
self._save_token(token)
|
|
await self._authenticate()
|
|
return {"authenticated": self._authenticated, "token_saved": True}
|
|
return {"error": "No token received", "response": resp}
|
|
|
|
async def disconnect(self):
|
|
if self._ws:
|
|
await self._ws.close()
|
|
self._connected = False
|
|
self._authenticated = False
|
|
|
|
async def ensure_connected(self):
|
|
"""Reconnect if the connection dropped."""
|
|
if not self._connected or self._ws is None or self._ws.closed:
|
|
logger.info("Reconnecting...")
|
|
await self.connect()
|
|
|
|
# ── Expression & parameter API ────────────────────────────────────
|
|
|
|
async def trigger_expression(self, event: str) -> dict:
|
|
"""Trigger a named expression from the character config."""
|
|
async with self._lock:
|
|
await self.ensure_connected()
|
|
if not self._authenticated:
|
|
return {"error": "Not authenticated"}
|
|
|
|
hotkey_id = self.expression_map.get(event)
|
|
if not hotkey_id:
|
|
return {"error": f"Unknown expression: {event}", "available": list(self.expression_map.keys())}
|
|
|
|
resp = await self._send("HotkeyTriggerRequest", {"hotkeyID": hotkey_id})
|
|
self._current_expression = event
|
|
return {"ok": True, "expression": event, "hotkey_id": hotkey_id}
|
|
|
|
async def set_parameter(self, name: str, value: float, weight: float = 1.0) -> dict:
|
|
"""Inject a single VTube Studio parameter value."""
|
|
async with self._lock:
|
|
await self.ensure_connected()
|
|
if not self._authenticated:
|
|
return {"error": "Not authenticated"}
|
|
|
|
resp = await self._send("InjectParameterDataRequest", {
|
|
"parameterValues": [{"id": name, "value": value, "weight": weight}],
|
|
})
|
|
return {"ok": True, "name": name, "value": value}
|
|
|
|
async def set_parameters(self, params: list[dict]) -> dict:
|
|
"""Inject multiple VTube Studio parameters at once."""
|
|
async with self._lock:
|
|
await self.ensure_connected()
|
|
if not self._authenticated:
|
|
return {"error": "Not authenticated"}
|
|
|
|
param_values = [
|
|
{"id": p["name"], "value": p["value"], "weight": p.get("weight", 1.0)}
|
|
for p in params
|
|
]
|
|
resp = await self._send("InjectParameterDataRequest", {
|
|
"parameterValues": param_values,
|
|
})
|
|
return {"ok": True, "count": len(param_values)}
|
|
|
|
async def list_hotkeys(self) -> dict:
|
|
"""List all hotkeys available in the current model."""
|
|
async with self._lock:
|
|
await self.ensure_connected()
|
|
if not self._authenticated:
|
|
return {"error": "Not authenticated"}
|
|
|
|
resp = await self._send("HotkeysInCurrentModelRequest", {})
|
|
return resp.get("data", {})
|
|
|
|
async def list_parameters(self) -> dict:
|
|
"""List all input parameters for the current model."""
|
|
async with self._lock:
|
|
await self.ensure_connected()
|
|
if not self._authenticated:
|
|
return {"error": "Not authenticated"}
|
|
|
|
resp = await self._send("InputParameterListRequest", {})
|
|
return resp.get("data", {})
|
|
|
|
def status(self) -> dict:
|
|
return {
|
|
"connected": self._connected,
|
|
"authenticated": self._authenticated,
|
|
"ws_url": self.ws_url,
|
|
"current_expression": self._current_expression,
|
|
"expression_count": len(self.expression_map),
|
|
"expressions": list(self.expression_map.keys()),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP Server (asyncio-based, no external deps)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class BridgeHTTPHandler:
|
|
"""Simple async HTTP request handler for the bridge API."""
|
|
|
|
def __init__(self, client: VTubeClient):
|
|
self.client = client
|
|
|
|
async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
|
try:
|
|
request_line = await asyncio.wait_for(reader.readline(), timeout=5)
|
|
if not request_line:
|
|
writer.close()
|
|
return
|
|
|
|
method, path, _ = request_line.decode().strip().split(" ", 2)
|
|
path = path.split("?")[0] # strip query params
|
|
|
|
# Read headers
|
|
content_length = 0
|
|
while True:
|
|
line = await reader.readline()
|
|
if line == b"\r\n" or not line:
|
|
break
|
|
if line.lower().startswith(b"content-length:"):
|
|
content_length = int(line.split(b":")[1].strip())
|
|
|
|
# Read body
|
|
body = None
|
|
if content_length > 0:
|
|
body = await reader.read(content_length)
|
|
|
|
# Route
|
|
try:
|
|
result = await self._route(method, path, body)
|
|
await self._respond(writer, 200, result)
|
|
except Exception as e:
|
|
logger.error("Handler error: %s", e, exc_info=True)
|
|
await self._respond(writer, 500, {"error": str(e)})
|
|
|
|
except asyncio.TimeoutError:
|
|
writer.close()
|
|
except Exception as e:
|
|
logger.error("Connection error: %s", e)
|
|
try:
|
|
writer.close()
|
|
except Exception:
|
|
pass
|
|
|
|
async def _route(self, method: str, path: str, body: bytes | None) -> dict:
|
|
data = {}
|
|
if body:
|
|
try:
|
|
data = json.loads(body)
|
|
except json.JSONDecodeError:
|
|
return {"error": "Invalid JSON"}
|
|
|
|
if method == "GET" and path == "/status":
|
|
return self.client.status()
|
|
|
|
if method == "GET" and path == "/expressions":
|
|
return {
|
|
"expressions": self.client.expression_map,
|
|
"triggers": self.client.ws_triggers,
|
|
}
|
|
|
|
if method == "GET" and path == "/hotkeys":
|
|
return await self.client.list_hotkeys()
|
|
|
|
if method == "GET" and path == "/parameters":
|
|
return await self.client.list_parameters()
|
|
|
|
if method == "POST" and path == "/expression":
|
|
event = data.get("event")
|
|
if not event:
|
|
return {"error": "Missing 'event' field"}
|
|
return await self.client.trigger_expression(event)
|
|
|
|
if method == "POST" and path == "/parameter":
|
|
name = data.get("name")
|
|
value = data.get("value")
|
|
if name is None or value is None:
|
|
return {"error": "Missing 'name' or 'value' field"}
|
|
return await self.client.set_parameter(name, float(value), float(data.get("weight", 1.0)))
|
|
|
|
if method == "POST" and path == "/parameters":
|
|
if not isinstance(data, list):
|
|
return {"error": "Expected JSON array of {name, value} objects"}
|
|
return await self.client.set_parameters(data)
|
|
|
|
if method == "POST" and path == "/auth":
|
|
return await self.client.request_new_token()
|
|
|
|
if method == "POST" and path == "/reload":
|
|
return self.client.reload_character()
|
|
|
|
return {"error": f"Unknown route: {method} {path}"}
|
|
|
|
async def _respond(self, writer: asyncio.StreamWriter, status: int, data: dict):
|
|
body = json.dumps(data, indent=2).encode()
|
|
status_text = HTTPStatus(status).phrase
|
|
header = (
|
|
f"HTTP/1.1 {status} {status_text}\r\n"
|
|
f"Content-Type: application/json\r\n"
|
|
f"Content-Length: {len(body)}\r\n"
|
|
f"Access-Control-Allow-Origin: *\r\n"
|
|
f"Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n"
|
|
f"Access-Control-Allow-Headers: Content-Type\r\n"
|
|
f"\r\n"
|
|
)
|
|
writer.write(header.encode() + body)
|
|
await writer.drain()
|
|
writer.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auto-reconnect loop
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def reconnect_loop(client: "VTubeClient", interval: float = 5.0):
    """Background task that keeps the VTube Studio connection alive.

    Runs until cancelled.  Every ``interval`` seconds it asks the client to
    re-establish the connection if it has dropped; failures are logged at
    debug level and retried on the next pass.

    Args:
        client: The shared VTube Studio client.
        interval: Seconds to sleep between checks.
    """
    while True:
        try:
            # Take the same lock the request methods hold while they check
            # and reconnect, so that this task and a request handler never
            # open two sockets at once or swap the socket under an
            # in-flight request.
            async with client._lock:
                await client.ensure_connected()
        except Exception as e:
            logger.debug("Reconnect failed: %s (retrying in %.0fs)", e, interval)
        await asyncio.sleep(interval)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def main(args):
    """Run the bridge until SIGINT or SIGTERM is received.

    Configures logging, opens the VTube Studio connection (retrying in the
    background if it is not available yet), serves the HTTP API on all
    interfaces and shuts everything down when a stop signal arrives.

    Args:
        args: Parsed command-line options; reads ``verbose``, ``character``,
            ``vtube_url`` and ``port``.
    """
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
        datefmt="%H:%M:%S",
    )

    character_path = Path(args.character)
    client = VTubeClient(args.vtube_url, character_path)

    # Try initial connection (don't fail if VTube Studio isn't running yet)
    try:
        await client.connect()
    except Exception as e:
        logger.warning("Initial connection failed: %s (will keep retrying)", e)

    # Start reconnect loop
    reconnect_task = asyncio.create_task(reconnect_loop(client, interval=5.0))

    try:
        # Start HTTP server
        handler = BridgeHTTPHandler(client)
        server = await asyncio.start_server(handler.handle, "0.0.0.0", args.port)
        logger.info("HTTP API listening on http://0.0.0.0:%d", args.port)
        logger.info("Endpoints: /status /expression /parameter /parameters /auth /reload /hotkeys")

        # Graceful shutdown
        stop = asyncio.Event()

        def _signal_handler():
            logger.info("Shutting down...")
            stop.set()

        # Inside a coroutine the running loop is the right one to use.
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, _signal_handler)
            except NotImplementedError:
                # Event loops without add_signal_handler support (Windows):
                # fall back to a plain handler that hands over to the loop.
                signal.signal(sig, lambda *_: loop.call_soon_threadsafe(_signal_handler))

        async with server:
            await stop.wait()
    finally:
        # Also runs when the server could not start, so the background task
        # and the socket are never left behind.
        reconnect_task.cancel()
        # Wait for the cancellation to finish; gather absorbs the
        # CancelledError instead of re-raising it here.
        await asyncio.gather(reconnect_task, return_exceptions=True)
        await client.disconnect()
        logger.info("Goodbye.")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line options as (flags, keyword arguments), in help order.
    option_table = (
        (("--port",), {"type": int, "default": DEFAULT_HTTP_PORT, "help": "HTTP API port (default: 8002)"}),
        (("--vtube-url",), {"default": DEFAULT_VTUBE_WS_URL, "help": "VTube Studio WebSocket URL"}),
        (("--character",), {"default": str(DEFAULT_CHARACTER_PATH), "help": "Path to character JSON"}),
        (("--verbose", "-v"), {"action": "store_true", "help": "Debug logging"}),
    )

    cli = argparse.ArgumentParser(description="VTube Studio Expression Bridge")
    for flags, spec in option_table:
        cli.add_argument(*flags, **spec)

    # Parse the command line, then run the bridge until it is stopped.
    asyncio.run(main(cli.parse_args()))
|