Files
homeai/homeai-agent/openclaw-http-bridge.py
Aodhan Collins 56580a2cb2 feat: memory v2, prompt styles, Dream/GAZE integration, Wyoming TTS fix
SQLite + sqlite-vec replaces JSON memory files with semantic search,
follow-up injection, privacy levels, and lifecycle management.

Six prompt styles (quick/standard/creative/roleplayer/game-master/storyteller)
with per-style Claude model tiering (Haiku/Sonnet/Opus), temperature control,
and section stripping. Characters can set default style and per-style overrides.

Dream character import and GAZE character linking in the dashboard editor
with auto-populated fields, cover image resolution, and preset assignment.

Bridge: session isolation (conversation_id / 12h satellite buckets),
model routing refactor, PUT/DELETE support, memory REST endpoints.

Dashboard: mobile-responsive sidebar, retry button, style picker in chat,
follow-up banner, memory lifecycle/privacy UI, cloud model options in editor.

Wyoming TTS: upgraded to v1.8.0 for HA 1.7.2 compatibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-24 22:31:04 +00:00

1110 lines
44 KiB
Python

#!/usr/bin/env python3
"""
OpenClaw HTTP Bridge

A simple HTTP server that translates HTTP POST requests to OpenClaw CLI calls.
This allows Home Assistant (running in Docker on a different machine) to
communicate with OpenClaw via HTTP.

Usage:
    python3 openclaw-http-bridge.py [--port 8081]

Endpoints:
    POST /api/agent/message
        {
            "message": "Your message here",
            "agent": "main"
        }
        Returns:
        {
            "response": "OpenClaw response text"
        }

    Also served by this process (see OpenClawBridgeHandler):
        POST /wake              -- wake-word notification (body optional)
        POST /api/tts           -- text-to-speech (Kokoro via Wyoming, or ElevenLabs)
        POST /api/stt           -- speech-to-text (WAV in, JSON text out, via Wyoming)
        POST/PUT/DELETE /api/memories/...  -- memory REST API
        (a GET memory handler, _handle_memory_get, also exists; its do_GET
        route is defined elsewhere in this file)
"""
import argparse
import json
import os
import subprocess
import sys
import asyncio
import urllib.request
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from urllib.parse import urlparse
from pathlib import Path
import wave
import io
import re
from datetime import datetime, timezone
from urllib.parse import parse_qs
from memory_store import (
init_db as init_memory_db,
retrieve_memories as _retrieve_memories,
get_pending_followups,
auto_resolve_expired_followups,
auto_archive_old_resolved,
increment_surfaced_count,
add_memory as _add_memory,
add_or_merge_memory,
update_memory as _update_memory,
delete_memory as _delete_memory,
list_memories as _list_memories,
search_memories as _search_memories,
resolve_followup,
count_memories,
migrate_from_json,
)
from wyoming.client import AsyncTcpClient
from wyoming.tts import Synthesize, SynthesizeVoice
from wyoming.asr import Transcribe, Transcript
from wyoming.audio import AudioStart, AudioChunk, AudioStop
from wyoming.info import Info
# Timeout settings (seconds) for OpenClaw CLI calls. Which one applies is
# decided per request by is_model_warm() (probe of the Ollama /api/ps endpoint).
TIMEOUT_WARM = 120  # Model already loaded in VRAM
TIMEOUT_COLD = 180  # Model needs loading first (~10-20s load + inference)

# Ollama "loaded models" endpoint used to probe warm/cold state.
OLLAMA_PS_URL = "http://localhost:11434/api/ps"
# VTube Studio bridge for avatar expression events (best-effort; may be down).
VTUBE_BRIDGE_URL = "http://localhost:8002"
# Model the openclaw gateway config is restored to after per-request overrides
# (see _call_openclaw).
DEFAULT_MODEL = "anthropic/claude-sonnet-4-6"
def _vtube_fire_and_forget(path: str, data: dict):
"""Send a non-blocking POST to the VTube Studio bridge. Failures are silent."""
def _post():
try:
body = json.dumps(data).encode()
req = urllib.request.Request(
f"{VTUBE_BRIDGE_URL}{path}",
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
urllib.request.urlopen(req, timeout=2)
except Exception:
pass # bridge may not be running — that's fine
threading.Thread(target=_post, daemon=True).start()
def is_model_warm() -> bool:
    """Return True when Ollama reports at least one model resident in VRAM."""
    try:
        probe = urllib.request.Request(OLLAMA_PS_URL)
        with urllib.request.urlopen(probe, timeout=2) as resp:
            status = json.loads(resp.read())
            return len(status.get("models", [])) > 0
    except Exception:
        # Unreachable Ollama: assume cold so callers pick the longer timeout.
        return False
# Data directories / state files shared with the dashboard and Wyoming servers.
CHARACTERS_DIR = Path("/Users/aodhan/homeai-data/characters")
SATELLITE_MAP_PATH = Path("/Users/aodhan/homeai-data/satellite-map.json")
# Legacy JSON memory files (pre-SQLite migration fallback).
MEMORIES_DIR = Path("/Users/aodhan/homeai-data/memories")
ACTIVE_TTS_VOICE_PATH = Path("/Users/aodhan/homeai-data/active-tts-voice.json")
ACTIVE_MODE_PATH = Path("/Users/aodhan/homeai-data/active-mode.json")
ACTIVE_STYLE_PATH = Path("/Users/aodhan/homeai-data/active-prompt-style.json")
# Prompt style templates ship next to this script.
PROMPT_STYLES_DIR = Path(__file__).parent / "prompt-styles"

# Cloud provider model mappings for mode routing (fallback when style has no model)
CLOUD_MODELS = {
    "anthropic": "anthropic/claude-sonnet-4-6",
    "openai": "openai/gpt-4o",
}
# Default local model when the mode config has no explicit "local_model".
LOCAL_MODEL = "ollama/qwen3.5:35b-a3b"

# Lock to serialise model-switch + agent-call (openclaw config is global)
_model_lock = threading.Lock()

# Initialize memory database at module load
init_memory_db()
def load_mode() -> dict:
    """Load the public/private mode configuration.

    Falls back to a private-mode default when the state file is missing,
    unreadable, or not valid JSON.
    """
    fallback = {"mode": "private", "cloud_provider": "anthropic", "overrides": {}}
    try:
        with open(ACTIVE_MODE_PATH) as handle:
            return json.load(handle)
    except Exception:
        return fallback
def resolve_model(mode_data: dict) -> str:
"""Resolve which model to use based on mode."""
mode = mode_data.get("mode", "private")
if mode == "private":
return mode_data.get("local_model", LOCAL_MODEL)
provider = mode_data.get("cloud_provider", "anthropic")
return CLOUD_MODELS.get(provider, CLOUD_MODELS["anthropic"])
def load_prompt_style(style_id: str) -> dict:
    """Fetch a prompt-style template from PROMPT_STYLES_DIR by ID.

    An empty ID means 'standard'; unknown or unreadable styles fall back to
    a minimal standard template.
    """
    chosen = style_id or "standard"
    safe_id = chosen.replace("/", "_").replace("..", "")
    style_path = PROMPT_STYLES_DIR / f"{safe_id}.json"
    try:
        with open(style_path) as handle:
            return json.load(handle)
    except Exception:
        return {"id": "standard", "name": "Standard", "group": "cloud",
                "instruction": "", "strip_sections": []}
def load_active_style() -> str:
    """Return the globally active prompt style ID ('standard' when unset)."""
    try:
        with open(ACTIVE_STYLE_PATH) as handle:
            return json.load(handle).get("style", "standard")
    except Exception:
        return "standard"
def resolve_model_for_style(style: dict, mode_data: dict) -> str:
"""Resolve model based on prompt style, falling back to mode config.
Priority: style 'model' field > group-based routing > mode default."""
mode = mode_data.get("mode", "private")
group = style.get("group", "cloud")
# Private mode always uses local model regardless of style
if mode == "private" and group == "local":
return mode_data.get("local_model", LOCAL_MODEL)
# Per-style model override (e.g. haiku for quick, opus for roleplay)
style_model = style.get("model")
if style_model:
return style_model
# Fallback: cloud model from mode config
if group == "local":
return mode_data.get("local_model", LOCAL_MODEL)
provider = mode_data.get("cloud_provider", "anthropic")
return CLOUD_MODELS.get(provider, CLOUD_MODELS["anthropic"])
def clean_text_for_tts(text: str) -> str:
    """Strip markup that shouldn't be spoken aloud.

    Removes tags, *action* asterisk markup, markdown decoration, links and
    bare URLs, code, and emoji, then collapses leftover whitespace.
    """
    emoji_ranges = (
        r'[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF'
        r'\U0001F1E0-\U0001F1FF\U0001F900-\U0001F9FF\U0001FA00-\U0001FAFF'
        r'\U00002702-\U000027B0\U0000FE00-\U0000FE0F\U0000200D'
        r'\U00002600-\U000026FF\U00002300-\U000023FF]+'
    )
    # (pattern, replacement, flags) applied in order — order matters, e.g.
    # link text is rescued before bare URLs are dropped.
    passes = (
        (r'<[^>]+>', '', 0),                   # HTML/XML tags
        (r'\*[^*]+\*', '', 0),                 # *actions/emphasis* markup
        (r'[*_]{1,3}', '', 0),                 # stray bold/italic markers
        (r'^#{1,6}\s+', '', re.MULTILINE),     # markdown headers
        (r'\[([^\]]+)\]\([^)]+\)', r'\1', 0),  # [text](url) -> text
        (r'https?://\S+', '', 0),              # bare URLs
        (r'```[\s\S]*?```', '', 0),            # fenced code blocks
        (r'`[^`]+`', '', 0),                   # inline code
        (emoji_ranges, '', 0),                 # emoji
        (r'\n{2,}', '\n', 0),                  # blank-line runs
        (r'[ \t]{2,}', ' ', 0),                # space/tab runs
    )
    for pattern, replacement, flags in passes:
        text = re.sub(pattern, replacement, text, flags=flags)
    return text.strip()
def load_satellite_map() -> dict:
    """Load the satellite-to-character mapping from disk.

    Any read/parse failure yields an empty mapping with the stock default
    character.
    """
    mapping = {"default": "aria_default", "satellites": {}}
    try:
        with open(SATELLITE_MAP_PATH) as handle:
            mapping = json.load(handle)
    except Exception:
        pass
    return mapping
def set_active_tts_voice(character_id: str, tts_config: dict):
    """Persist the active TTS config so the Wyoming TTS server can pick it up.

    Write failures are logged as warnings, never raised.
    """
    field_defaults = (
        ("engine", "kokoro"),
        ("kokoro_voice", ""),
        ("elevenlabs_voice_id", ""),
        ("elevenlabs_model", "eleven_multilingual_v2"),
        ("speed", 1),
    )
    try:
        state = {"character_id": character_id}
        for key, fallback in field_defaults:
            state[key] = tts_config.get(key, fallback)
        ACTIVE_TTS_VOICE_PATH.parent.mkdir(parents=True, exist_ok=True)
        with open(ACTIVE_TTS_VOICE_PATH, "w") as handle:
            json.dump(state, handle)
    except Exception as e:
        print(f"[OpenClaw Bridge] Warning: could not write active TTS config: {e}")
def resolve_character_id(satellite_id: str = None) -> str:
    """Map a satellite ID to its character profile ID, or the mapped default."""
    mapping = load_satellite_map()
    if satellite_id:
        satellites = mapping.get("satellites", {})
        if satellite_id in satellites:
            return satellites[satellite_id]
    return mapping.get("default", "aria_default")
def load_character(character_id: str = None) -> dict:
    """Load a character profile's "data" payload.

    Resolves the default character when no ID is given; returns {} for a
    missing or unreadable profile file.
    """
    resolved = character_id or resolve_character_id()
    profile_path = CHARACTERS_DIR / (resolved.replace("/", "_") + ".json")
    if not profile_path.exists():
        return {}
    try:
        with open(profile_path) as handle:
            return json.load(handle).get("data", {})
    except Exception:
        return {}
def _profile_parts(char: dict, strip_sections: set) -> list[str]:
    """Build the [Character Profile] field blocks, omitting stripped sections."""
    parts = []
    if "background" not in strip_sections and char.get("background"):
        parts.append(f"## Background\n{char['background']}")
    if "appearance" not in strip_sections and char.get("appearance"):
        parts.append(f"## Appearance\n{char['appearance']}")
    if "dialogue_style" not in strip_sections and char.get("dialogue_style"):
        parts.append(f"## Dialogue Style\n{char['dialogue_style']}")
    if "skills" not in strip_sections and char.get("skills"):
        skills = char["skills"]
        # Cap list-style skills at 15 entries to bound prompt size.
        skills_text = ", ".join(skills[:15]) if isinstance(skills, list) else str(skills)
        parts.append(f"## Skills & Interests\n{skills_text}")
    return parts


def _style_override_parts(char: dict, style_id: str) -> list[str]:
    """Collect the character's per-style override blocks for *style_id*."""
    overrides = char.get("prompt_style_overrides", {}).get(style_id, {})
    parts = []
    if overrides.get("dialogue_style"):
        parts.append(f"## Dialogue Style Override\n{overrides['dialogue_style']}")
    if overrides.get("system_prompt_suffix"):
        parts.append(overrides["system_prompt_suffix"])
    return parts


def _metadata_lines(char: dict) -> list[str]:
    """Build [Character Metadata] lines: display name and GAZE preset hints."""
    lines = []
    if char.get("display_name"):
        lines.append(f"Your name is: {char['display_name']}")
    # Support both v1 (gaze_preset string) and v2 (gaze_presets array)
    gaze_presets = char.get("gaze_presets", [])
    if gaze_presets and isinstance(gaze_presets, list):
        for gp in gaze_presets:
            preset = gp.get("preset", "")
            trigger = gp.get("trigger", "self-portrait")
            if preset:
                lines.append(f"GAZE preset '{preset}' — use for: {trigger}")
    elif char.get("gaze_preset"):
        lines.append(f"Your gaze_preset for self-portraits is: {char['gaze_preset']}")
    return lines


def _followup_section(followups: list[dict]) -> str:
    """Render the pending follow-ups block (at most three entries)."""
    followup_lines = [
        f"- {fu['follow_up_context']} (from {fu['created_at'][:10]})"
        for fu in followups[:3]
    ]
    return (
        "[Pending Follow-ups — Bring these up naturally if relevant]\n"
        "You have unresolved topics to check on with the user. "
        "Weave them into conversation naturally — don't list them. "
        "If the user addresses one, use memory-ctl resolve <id> to mark it resolved.\n"
        + "\n".join(followup_lines)
    )


def load_character_prompt(satellite_id: str = None, character_id: str = None,
                          prompt_style: str = None, user_message: str = "",
                          is_cloud: bool = False) -> str:
    """Build the full system prompt for a character.

    Resolves the character by explicit ID or satellite mapping, then
    assembles, in order: style instruction, core identity, profile fields
    (filtered by the style's strip_sections), per-style overrides,
    metadata, retrieved memories, and pending follow-ups. The prompt_style
    controls HOW the character responds (brief, conversational, roleplay,
    etc.). Returns "" when the character profile cannot be loaded.
    """
    if not character_id:
        character_id = resolve_character_id(satellite_id)
    char = load_character(character_id)
    if not char:
        return ""
    # Load prompt style template
    style_id = prompt_style or load_active_style()
    style = load_prompt_style(style_id)
    strip_sections = set(style.get("strip_sections", []))
    sections = []
    # 1. Response style instruction (framing directive — goes first)
    instruction = style.get("instruction", "")
    if instruction:
        sections.append(f"[Response Style: {style.get('name', style_id)}]\n{instruction}")
    # 2. Core character identity (system_prompt)
    if char.get("system_prompt"):
        sections.append(char["system_prompt"])
    # 3. Character profile fields (filtered by style's strip_sections)
    profile_parts = _profile_parts(char, strip_sections)
    if profile_parts:
        sections.append("[Character Profile]\n" + "\n\n".join(profile_parts))
    # 4. Per-character style overrides (optional customization per style)
    override_parts = _style_override_parts(char, style_id)
    if override_parts:
        sections.append("[Style-Specific Notes]\n" + "\n\n".join(override_parts))
    # 5. Character metadata
    meta_lines = _metadata_lines(char)
    if meta_lines:
        sections.append("[Character Metadata]\n" + "\n".join(meta_lines))
    # 6. Memories (personal + general, context-aware retrieval)
    personal, general, followups = load_memories(character_id, context=user_message, is_cloud=is_cloud)
    if personal:
        sections.append("[Personal Memories]\n" + "\n".join(f"- {m}" for m in personal))
    if general:
        sections.append("[General Knowledge]\n" + "\n".join(f"- {m}" for m in general))
    # 7. Pending follow-ups (things the character should naturally bring up)
    if followups:
        sections.append(_followup_section(followups))
    return "\n\n".join(sections)
def _truncate_to_budget(contents: list[str], budget: int) -> list[str]:
"""Truncate a list of strings to fit within a character budget."""
result = []
used = 0
for content in contents:
if used + len(content) > budget:
break
result.append(content)
used += len(content)
return result
def load_memories(character_id: str, context: str = "", is_cloud: bool = False) -> tuple[list[str], list[str], list[dict]]:
    """Retrieve personal memories, general knowledge, and pending follow-ups.

    Uses the SQLite store's semantic + recency retrieval when it holds any
    data; otherwise falls back to the legacy JSON files (pre-migration).
    When is_cloud is set, privately-flagged memories are excluded.
    """
    personal_budget, general_budget = 4000, 3000
    sqlite_is_empty = (count_memories(character_id) == 0
                       and count_memories("shared") == 0)
    if sqlite_is_empty:
        return _load_memories_json_fallback(character_id), [], []
    personal_rows = _retrieve_memories(character_id, context, limit=15,
                                       exclude_private_for_cloud=is_cloud)
    general_rows = _retrieve_memories("shared", context, limit=10,
                                      exclude_private_for_cloud=is_cloud)
    followups = get_pending_followups(character_id)
    return (
        _truncate_to_budget([row["content"] for row in personal_rows], personal_budget),
        _truncate_to_budget([row["content"] for row in general_rows], general_budget),
        followups,
    )
def _load_memories_json_fallback(character_id: str) -> list[str]:
"""Legacy JSON fallback for pre-migration state."""
def _read(path: Path, budget: int) -> list[str]:
try:
with open(path) as f:
data = json.load(f)
except Exception:
return []
memories = data.get("memories", [])
memories.sort(key=lambda m: m.get("createdAt", ""), reverse=True)
result, used = [], 0
for m in memories:
content = m.get("content", "").strip()
if not content:
continue
if used + len(content) > 4000:
break
result.append(content)
used += len(content)
return result
safe_id = character_id.replace("/", "_")
return _read(MEMORIES_DIR / "personal" / f"{safe_id}.json", 4000)
class OpenClawBridgeHandler(BaseHTTPRequestHandler):
"""HTTP request handler for OpenClaw bridge."""
def log_message(self, format, *args):
"""Log requests to stderr."""
print(f"[OpenClaw Bridge] {self.address_string()} - {format % args}")
def _send_json_response(self, status_code: int, data: dict):
"""Send a JSON response."""
self.send_response(status_code)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(json.dumps(data).encode())
def do_POST(self):
"""Handle POST requests."""
parsed_path = urlparse(self.path)
# Handle wake word notification
if parsed_path.path == "/wake":
self._handle_wake_word()
return
# Handle TTS preview requests
if parsed_path.path == "/api/tts":
self._handle_tts_request()
return
# Handle STT requests
if parsed_path.path == "/api/stt":
self._handle_stt_request()
return
# Agent message endpoint
if parsed_path.path == "/api/agent/message":
self._handle_agent_request()
return
# Memory API: POST /api/memories/...
if parsed_path.path.startswith("/api/memories/"):
parts = parsed_path.path[len("/api/memories/"):].strip("/").split("/")
self._handle_memory_post(parts)
return
self._send_json_response(404, {"error": "Not found"})
def _handle_tts_request(self):
    """Synthesize speech for the posted text and return raw audio.

    Routes to ElevenLabs when engine == "elevenlabs", otherwise to the
    local Kokoro voice over Wyoming. Avatar expression events are fired
    around synthesis (speaking / idle / error, best effort).
    """
    length = int(self.headers.get("Content-Length", 0))
    if length == 0:
        self._send_json_response(400, {"error": "Empty body"})
        return
    try:
        payload = json.loads(self.rfile.read(length).decode())
    except json.JSONDecodeError:
        self._send_json_response(400, {"error": "Invalid JSON"})
        return
    speak_text = clean_text_for_tts(payload.get("text", "Hello, this is a test."))
    voice = payload.get("voice", "af_heart")
    engine = payload.get("engine", "kokoro")
    try:
        # Signal avatar: speaking
        _vtube_fire_and_forget("/expression", {"event": "speaking"})
        if engine == "elevenlabs":
            audio_bytes, content_type = self._synthesize_elevenlabs(
                speak_text, voice, payload.get("model"))
        else:
            # Default: local Kokoro via Wyoming
            audio_bytes = asyncio.run(self._synthesize_audio(speak_text, voice))
            content_type = "audio/wav"
        # Signal avatar: idle
        _vtube_fire_and_forget("/expression", {"event": "idle"})
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(audio_bytes)
    except Exception as exc:
        _vtube_fire_and_forget("/expression", {"event": "error"})
        self._send_json_response(500, {"error": str(exc)})
def _synthesize_elevenlabs(self, text: str, voice_id: str, model: str = None) -> tuple[bytes, str]:
"""Call ElevenLabs TTS API and return (audio_bytes, content_type)."""
api_key = os.environ.get("ELEVENLABS_API_KEY", "")
if not api_key:
raise RuntimeError("ELEVENLABS_API_KEY not set in environment")
if not voice_id:
raise RuntimeError("No ElevenLabs voice ID provided")
model = model or "eleven_multilingual_v2"
url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"
payload = json.dumps({
"text": text,
"model_id": model,
"voice_settings": {"stability": 0.5, "similarity_boost": 0.75},
}).encode()
req = urllib.request.Request(
url,
data=payload,
headers={
"Content-Type": "application/json",
"xi-api-key": api_key,
"Accept": "audio/mpeg",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=30) as resp:
audio_bytes = resp.read()
return audio_bytes, "audio/mpeg"
def do_PUT(self):
"""Handle PUT requests (memory updates)."""
parsed_path = urlparse(self.path)
if parsed_path.path.startswith("/api/memories/"):
parts = parsed_path.path[len("/api/memories/"):].strip("/").split("/")
self._handle_memory_put(parts)
return
self._send_json_response(404, {"error": "Not found"})
def do_DELETE(self):
"""Handle DELETE requests (memory deletion)."""
parsed_path = urlparse(self.path)
if parsed_path.path.startswith("/api/memories/"):
parts = parsed_path.path[len("/api/memories/"):].strip("/").split("/")
self._handle_memory_delete(parts)
return
self._send_json_response(404, {"error": "Not found"})
def do_OPTIONS(self):
"""Handle CORS preflight requests."""
self.send_response(204)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "POST, GET, PUT, DELETE, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.end_headers()
async def _synthesize_audio(self, text: str, voice: str) -> bytes:
    """Synthesize *text* via the local Wyoming TTS server; return WAV bytes.

    Protocol (order matters): connect to 127.0.0.1:10301, consume the
    server's initial Info event, send a Synthesize event, then accumulate
    AudioChunk payloads until AudioStop (or connection close). The raw PCM
    is wrapped in a WAV container using the format announced by
    AudioStart, defaulting to 24 kHz / 16-bit / mono if none arrives.
    """
    client = AsyncTcpClient("127.0.0.1", 10301)
    await client.connect()
    # Read the initial Info event
    await client.read_event()
    # Send Synthesize event
    await client.write_event(Synthesize(text=text, voice=SynthesizeVoice(name=voice)).event())
    audio_data = bytearray()
    # Fallback audio format, overridden by AudioStart when present.
    rate = 24000
    width = 2
    channels = 1
    while True:
        event = await client.read_event()
        if event is None:
            break  # connection closed without an AudioStop
        if AudioStart.is_type(event.type):
            # Server announces the actual PCM format; trust it over defaults.
            start = AudioStart.from_event(event)
            rate = start.rate
            width = start.width
            channels = start.channels
        elif AudioChunk.is_type(event.type):
            chunk = AudioChunk.from_event(event)
            audio_data.extend(chunk.audio)
        elif AudioStop.is_type(event.type):
            break
    await client.disconnect()
    # Package raw PCM into WAV
    wav_io = io.BytesIO()
    with wave.open(wav_io, 'wb') as wav_file:
        wav_file.setnchannels(channels)
        wav_file.setsampwidth(width)
        wav_file.setframerate(rate)
        wav_file.writeframes(audio_data)
    return wav_io.getvalue()
def _handle_stt_request(self):
"""Handle STT request — accept WAV audio, return transcribed text."""
content_length = int(self.headers.get("Content-Length", 0))
if content_length == 0:
self._send_json_response(400, {"error": "Empty body"})
return
try:
audio_bytes = self.rfile.read(content_length)
# Parse WAV to get PCM data and format
wav_io = io.BytesIO(audio_bytes)
with wave.open(wav_io, 'rb') as wav_file:
rate = wav_file.getframerate()
width = wav_file.getsampwidth()
channels = wav_file.getnchannels()
pcm_data = wav_file.readframes(wav_file.getnframes())
# Run the async Wyoming client
text = asyncio.run(self._transcribe_audio(pcm_data, rate, width, channels))
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(json.dumps({"text": text}).encode())
except wave.Error as e:
self._send_json_response(400, {"error": f"Invalid WAV: {e}"})
except Exception as e:
self._send_json_response(500, {"error": str(e)})
async def _transcribe_audio(self, pcm_data: bytes, rate: int, width: int, channels: int) -> str:
    """Stream PCM audio to the local Wyoming STT server; return the transcript.

    Protocol: Transcribe -> AudioStart -> one-second AudioChunks ->
    AudioStop, then wait for a Transcript event. Returns "" when the
    connection closes without producing one.
    """
    client = AsyncTcpClient("127.0.0.1", 10300)
    await client.connect()
    # Send Transcribe request (STT server does not send an initial Info event)
    await client.write_event(Transcribe(language="en").event())
    # Send audio
    await client.write_event(AudioStart(rate=rate, width=width, channels=channels).event())
    # Send in chunks (1 second each)
    bytes_per_second = rate * width * channels
    for offset in range(0, len(pcm_data), bytes_per_second):
        chunk = pcm_data[offset:offset + bytes_per_second]
        await client.write_event(AudioChunk(rate=rate, width=width, channels=channels, audio=chunk).event())
    await client.write_event(AudioStop().event())
    # Read transcript
    while True:
        event = await client.read_event()
        if event is None:
            break  # server closed without a Transcript event
        if Transcript.is_type(event.type):
            transcript = Transcript.from_event(event)
            await client.disconnect()
            return transcript.text
    await client.disconnect()
    return ""
def _handle_wake_word(self):
"""Handle wake word detection notification."""
content_length = int(self.headers.get("Content-Length", 0))
wake_word_data = {}
if content_length > 0:
try:
body = self.rfile.read(content_length).decode()
wake_word_data = json.loads(body)
except (json.JSONDecodeError, ConnectionResetError, OSError):
# Client may close connection early, that's ok
pass
print(f"[OpenClaw Bridge] Wake word detected: {wake_word_data.get('wake_word', 'unknown')}")
self._send_json_response(200, {"status": "ok", "message": "Wake word received"})
@staticmethod
def _config_set(path: str, value: str):
    """Run `openclaw config set <path> <value>` (output captured, 5s cap)."""
    cmd = ["/opt/homebrew/bin/openclaw", "config", "set", path, value]
    subprocess.run(cmd, capture_output=True, text=True, timeout=5)
@staticmethod
def _call_openclaw(message: str, agent: str, timeout: int,
                   model: str = None, session_id: str = None,
                   params: dict = None, thinking: str = None) -> str:
    """Invoke the OpenClaw CLI and return its stripped stdout.

    The gateway's model and inference params live in global openclaw
    config, so this temporarily rewrites them via `openclaw config set`
    and restores them in a finally block — all under _model_lock so
    concurrent requests cannot interleave switch/call/restore.

    Propagates subprocess.TimeoutExpired / CalledProcessError /
    FileNotFoundError to the caller (handled in _handle_agent_request).
    """
    cmd = ["/opt/homebrew/bin/openclaw", "agent", "--message", message, "--agent", agent]
    if session_id:
        cmd.extend(["--session-id", session_id])
    if thinking:
        cmd.extend(["--thinking", thinking])
    with _model_lock:
        if model:
            OpenClawBridgeHandler._config_set(
                "agents.defaults.model.primary", model)
        # Set per-style temperature if provided
        temp_path = None
        if model and params and params.get("temperature") is not None:
            temp_path = f'agents.defaults.models["{model}"].params.temperature'
            OpenClawBridgeHandler._config_set(
                temp_path, str(params["temperature"]))
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                check=True,  # non-zero exit -> CalledProcessError
            )
            return result.stdout.strip()
        finally:
            # Restore defaults
            if model and model != DEFAULT_MODEL:
                OpenClawBridgeHandler._config_set(
                    "agents.defaults.model.primary", DEFAULT_MODEL)
            if temp_path:
                # Restore to neutral default
                # NOTE(review): 0.5 is assumed to be the gateway's neutral
                # temperature — confirm against openclaw's actual default.
                OpenClawBridgeHandler._config_set(temp_path, "0.5")
@staticmethod
def _needs_followup(response: str) -> bool:
"""Detect if the model promised to act but didn't actually do it.
Returns True if the response looks like a 'will do' without a result."""
if not response:
return False
resp_lower = response.lower()
# If the response contains a URL or JSON-like output, it probably completed
if "http://" in response or "https://" in response or '"status"' in response:
return False
# If it contains a tool result indicator (ha-ctl output, gaze-ctl output)
if any(kw in resp_lower for kw in ["image_url", "seed", "entity_id", "state:", "turned on", "turned off"]):
return False
# Detect promise-like language without substance
promise_phrases = [
"let me", "i'll ", "i will ", "sure thing", "sure,", "right away",
"generating", "one moment", "working on", "hang on", "just a moment",
"on it", "let me generate", "let me create",
]
has_promise = any(phrase in resp_lower for phrase in promise_phrases)
# Short responses with promise language are likely incomplete
if has_promise and len(response) < 200:
return True
return False
def _handle_agent_request(self):
    """Handle POST /api/agent/message: run one agent turn through OpenClaw.

    Pipeline: parse body -> resolve character and prompt style -> build
    the system prompt (with memories) -> pick model, session, timeout ->
    invoke the CLI (re-prompting once if the reply promises action
    without doing it) -> respond with text, model, and style used.
    """
    content_length = int(self.headers.get("Content-Length", 0))
    if content_length == 0:
        self._send_json_response(400, {"error": "Empty body"})
        return
    try:
        body = self.rfile.read(content_length).decode()
        data = json.loads(body)
    except json.JSONDecodeError:
        self._send_json_response(400, {"error": "Invalid JSON"})
        return
    message = data.get("message")
    agent = data.get("agent", "main")
    satellite_id = data.get("satellite_id")
    explicit_character_id = data.get("character_id")
    requested_style = data.get("prompt_style")
    conversation_id = data.get("conversation_id")
    if not message:
        self._send_json_response(400, {"error": "Message is required"})
        return
    # Resolve character: explicit ID > satellite mapping > default
    if explicit_character_id:
        character_id = explicit_character_id
    else:
        character_id = resolve_character_id(satellite_id)
    # Resolve prompt style: explicit > character default > global active
    char = load_character(character_id)
    style_id = requested_style or char.get("default_prompt_style") or load_active_style()
    style = load_prompt_style(style_id)
    print(f"[OpenClaw Bridge] Prompt style: {style.get('name', style_id)} ({style.get('group', 'cloud')})")
    # Determine if routing to cloud (for privacy filtering)
    mode_data = load_mode()
    active_model = resolve_model_for_style(style, mode_data)
    is_cloud = style.get("group", "cloud") == "cloud" and mode_data.get("mode") != "private"
    system_prompt = load_character_prompt(
        character_id=character_id, prompt_style=style_id,
        user_message=message, is_cloud=is_cloud,
    )
    # Run lifecycle maintenance (cheap SQL updates)
    auto_resolve_expired_followups()
    auto_archive_old_resolved()
    # Set the active TTS config for the Wyoming server to pick up
    tts_config = char.get("tts", {})
    if tts_config:
        set_active_tts_voice(character_id, tts_config)
        engine = tts_config.get("engine", "kokoro")
        voice_label = tts_config.get("kokoro_voice", "") if engine == "kokoro" else tts_config.get("elevenlabs_voice_id", "")
        print(f"[OpenClaw Bridge] Active TTS: {engine} / {voice_label}")
    if satellite_id:
        print(f"[OpenClaw Bridge] Satellite: {satellite_id} → character: {character_id}")
    elif explicit_character_id:
        print(f"[OpenClaw Bridge] Character: {character_id}")
    # The CLI takes a single message string, so the character/system
    # context is prepended to the user message.
    if system_prompt:
        message = f"System Context: {system_prompt}\n\nUser Request: {message}"
    group = style.get("group", "cloud")
    # NOTE(review): no separator between group and model in this log line
    # (prints e.g. "CLOUDanthropic/...") — looks like a missing " → ";
    # confirm intent before changing.
    print(f"[OpenClaw Bridge] Routing: {group.upper()}{active_model}")
    # Resolve session ID for OpenClaw thread isolation
    # Dashboard chats: use conversation_id (each "New Chat" = fresh thread)
    # Satellites: use rotating 12-hour bucket so old context expires naturally
    if conversation_id:
        session_id = conversation_id
    elif satellite_id:
        now = datetime.now(timezone.utc)
        half = "am" if now.hour < 12 else "pm"
        session_id = f"sat_{satellite_id}_{now.strftime('%Y%m%d')}_{half}"
    else:
        # API call with no conversation or satellite — use a transient session
        session_id = f"api_{int(datetime.now(timezone.utc).timestamp())}"
    print(f"[OpenClaw Bridge] Session: {session_id}")
    # Extract style inference params (temperature, etc.) and thinking level
    style_params = style.get("params", {})
    style_thinking = style.get("thinking")
    if style_params:
        print(f"[OpenClaw Bridge] Style params: {style_params}")
    if style_thinking:
        print(f"[OpenClaw Bridge] Thinking: {style_thinking}")
    # Check if model is warm to set appropriate timeout
    warm = is_model_warm()
    timeout = TIMEOUT_WARM if warm else TIMEOUT_COLD
    print(f"[OpenClaw Bridge] Model {'warm' if warm else 'cold'}, timeout={timeout}s")
    # Signal avatar: thinking
    _vtube_fire_and_forget("/expression", {"event": "thinking"})
    # Call OpenClaw CLI (use full path for launchd compatibility)
    try:
        response_text = self._call_openclaw(message, agent, timeout, model=active_model, session_id=session_id, params=style_params, thinking=style_thinking)
        # Re-prompt if the model promised to act but didn't call a tool.
        # Detect "I'll do X" / "Let me X" responses that lack any result.
        if self._needs_followup(response_text):
            print(f"[OpenClaw Bridge] Response looks like a promise without action, re-prompting")
            followup = (
                "You just said you would do something but didn't actually call the exec tool. "
                "Do NOT explain what you will do — call the tool NOW using exec and return the result."
            )
            response_text = self._call_openclaw(followup, agent, timeout, model=active_model, session_id=session_id, params=style_params, thinking=style_thinking)
        # Increment surfaced_count on follow-ups that were injected into prompt
        # (the [:3] slice mirrors the prompt-injection limit).
        try:
            followups = get_pending_followups(character_id)
            for fu in followups[:3]:
                increment_surfaced_count(fu["id"])
        except Exception as e:
            print(f"[OpenClaw Bridge] Follow-up tracking error: {e}")
        # Signal avatar: idle (TTS handler will override to 'speaking' if voice is used)
        _vtube_fire_and_forget("/expression", {"event": "idle"})
        self._send_json_response(200, {"response": response_text, "model": active_model, "prompt_style": style_id})
    except subprocess.TimeoutExpired:
        self._send_json_response(504, {"error": f"OpenClaw command timed out after {timeout}s (model was {'warm' if warm else 'cold'})"})
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr.strip() if e.stderr else "OpenClaw command failed"
        self._send_json_response(500, {"error": error_msg})
    except FileNotFoundError:
        self._send_json_response(500, {"error": "OpenClaw CLI not found"})
    except Exception as e:
        self._send_json_response(500, {"error": str(e)})
# ------------------------------------------------------------------
# Memory REST API
# ------------------------------------------------------------------
def _read_json_body(self) -> dict | None:
"""Read and parse JSON body from request. Returns None on error (response already sent)."""
content_length = int(self.headers.get("Content-Length", 0))
if content_length == 0:
self._send_json_response(400, {"error": "Empty body"})
return None
try:
return json.loads(self.rfile.read(content_length).decode())
except json.JSONDecodeError:
self._send_json_response(400, {"error": "Invalid JSON"})
return None
def _handle_memory_get(self, path_parts: list[str], query_params: dict):
    """Handle GET /api/memories/... routes.

    Routes:
      GET /api/memories/general                 — list shared memories
      GET /api/memories/:characterId/followups  — pending follow-ups
      GET /api/memories/:characterId[?q=...]    — list, or search when ?q=

    Query params: limit, offset, type, lifecycle, category, q.
    """
    try:
        # Non-numeric limit/offset is a client error, not a server crash.
        limit = int(query_params.get("limit", ["50"])[0])
        offset = int(query_params.get("offset", ["0"])[0])
    except ValueError:
        self._send_json_response(400, {"error": "limit and offset must be integers"})
        return
    memory_type = query_params.get("type", [None])[0]
    lifecycle = query_params.get("lifecycle", [None])[0]
    category = query_params.get("category", [None])[0]

    # GET /api/memories/general — character-independent shared pool
    if len(path_parts) == 1 and path_parts[0] == "general":
        memories = _list_memories("shared", memory_type=memory_type,
                                  lifecycle_state=lifecycle, category=category,
                                  limit=limit, offset=offset)
        self._send_json_response(200, {"memories": memories})
        return

    # Reject both a missing segment and the empty id from "/api/memories/".
    if not path_parts or not path_parts[0]:
        self._send_json_response(400, {"error": "Character ID required"})
        return
    char_id = path_parts[0]

    # GET /api/memories/:characterId/followups
    if len(path_parts) == 2 and path_parts[1] == "followups":
        followups = get_pending_followups(char_id)
        self._send_json_response(200, {"followups": followups})
        return

    # GET /api/memories/:characterId — ?q= switches to semantic search
    query = query_params.get("q", [None])[0]
    if query:
        memories = _search_memories(char_id, query, memory_type=memory_type, limit=limit)
    else:
        memories = _list_memories(char_id, memory_type=memory_type,
                                  lifecycle_state=lifecycle, category=category,
                                  limit=limit, offset=offset)
    self._send_json_response(200, {"memories": memories, "characterId": char_id})
def _handle_memory_post(self, path_parts: list[str]):
    """Handle POST /api/memories/... routes.

    Routes:
      POST /api/memories/migrate            — one-shot JSON → SQLite migration
      POST /api/memories/:memoryId/resolve  — mark a follow-up resolved
      POST /api/memories/general            — add a shared memory
      POST /api/memories/:characterId       — add a personal memory
    """
    # migrate/resolve take no payload — route them BEFORE reading the body,
    # otherwise an empty POST body is rejected with a spurious 400.
    if len(path_parts) == 1 and path_parts[0] == "migrate":
        result = migrate_from_json()
        self._send_json_response(200, result)
        return
    if len(path_parts) == 2 and path_parts[1] == "resolve":
        ok = resolve_followup(path_parts[0])
        self._send_json_response(200 if ok else 404,
                                 {"ok": ok, "id": path_parts[0]})
        return

    data = self._read_json_body()
    if data is None:
        return  # error response already sent

    # POST /api/memories/general and POST /api/memories/:characterId share
    # the same add path; "general" writes to the shared pool.
    if len(path_parts) == 1:
        content = data.get("content", "").strip()
        if not content:
            self._send_json_response(400, {"error": "content is required"})
            return
        is_general = path_parts[0] == "general"
        kwargs = dict(
            character_id="shared" if is_general else path_parts[0],
            content=content,
            memory_type=data.get("memory_type"),
            category=data.get("category", "other"),
            importance=data.get("importance"),
            privacy_level=data.get("privacy_level"),
            tags=data.get("tags"),
            source=data.get("source", "dashboard"),
        )
        if not is_general:
            # Follow-up scheduling only applies to personal memories.
            kwargs["follow_up_due"] = data.get("follow_up_due")
            kwargs["follow_up_context"] = data.get("follow_up_context")
        mem = add_or_merge_memory(**kwargs)
        self._send_json_response(200, {"ok": True, "memory": mem})
        return

    self._send_json_response(404, {"error": "Not found"})
def _handle_memory_put(self, path_parts: list[str]):
    """Handle PUT /api/memories/:memoryId — partial update of one memory.

    The JSON body's keys are forwarded to the store as keyword arguments;
    unknown fields (or a non-object body) are rejected with 400.
    """
    if len(path_parts) != 1:
        self._send_json_response(400, {"error": "Memory ID required"})
        return
    data = self._read_json_body()
    if data is None:
        return  # error response already sent
    try:
        # Body keys are client-controlled: an unexpected field (or a JSON
        # array body) raises TypeError on the ** splat, which should be a
        # 400 client error rather than an unhandled 500.
        mem = _update_memory(path_parts[0], **data)
    except TypeError as e:
        self._send_json_response(400, {"error": f"Invalid update: {e}"})
        return
    if mem:
        self._send_json_response(200, {"ok": True, "memory": mem})
    else:
        self._send_json_response(404, {"error": "Memory not found"})
def _handle_memory_delete(self, path_parts: list[str]):
    """Handle DELETE /api/memories/:memoryId."""
    if len(path_parts) != 1:
        self._send_json_response(400, {"error": "Memory ID required"})
        return
    memory_id = path_parts[0]
    deleted = _delete_memory(memory_id)
    status = 200 if deleted else 404
    self._send_json_response(status, {"ok": deleted, "id": memory_id})
# ------------------------------------------------------------------
# HTTP method dispatchers
# ------------------------------------------------------------------
def do_GET(self):
    """Dispatch GET requests: health probe at / or /status, memory API reads."""
    parsed = urlparse(self.path)
    route = parsed.path

    # Health/status probe used by Home Assistant and the dashboard.
    if route in ("/", "/status"):
        self._send_json_response(200, {
            "status": "ok",
            "service": "OpenClaw HTTP Bridge",
            "version": "2.0.0"
        })
        return

    # Memory API: GET /api/memories/...
    prefix = "/api/memories/"
    if route.startswith(prefix):
        parts = route[len(prefix):].strip("/").split("/")
        self._handle_memory_get(parts, parse_qs(parsed.query))
        return

    self._send_json_response(404, {"error": "Not found"})
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer that serves each request in its own thread.

    daemon_threads=True lets the process exit without waiting for
    in-flight request threads to finish.
    """

    daemon_threads = True
def main():
    """Parse command-line options and run the bridge server until Ctrl+C."""
    parser = argparse.ArgumentParser(description="OpenClaw HTTP Bridge")
    parser.add_argument("--port", type=int, default=8081,
                        help="Port to listen on (default: 8081)")
    parser.add_argument("--host", default="0.0.0.0",
                        help="Host to bind to (default: 0.0.0.0)")
    opts = parser.parse_args()

    # Allow quick restarts without "address already in use" errors.
    ThreadingHTTPServer.allow_reuse_address = True
    bridge = ThreadingHTTPServer((opts.host, opts.port), OpenClawBridgeHandler)

    print(f"OpenClaw HTTP Bridge running on http://{opts.host}:{opts.port}")
    print(f"Endpoint: POST http://{opts.host}:{opts.port}/api/agent/message")
    print("Press Ctrl+C to stop")
    try:
        bridge.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down...")
        bridge.shutdown()
# Script entry point: start the bridge server when executed directly.
if __name__ == "__main__":
    main()