#!/usr/bin/env python3
"""
OpenClaw HTTP Bridge

A small HTTP server that translates HTTP requests into OpenClaw CLI calls and
Wyoming TTS/STT requests. This allows Home Assistant (running in Docker on a
different machine) to communicate with OpenClaw via HTTP.

Usage:
    python3 openclaw-http-bridge.py [--port 8081] [--host 0.0.0.0]

Endpoints:
    POST /api/agent/message
        Request:  { "message": "Your message here", "agent": "main" }
        Response: { "response": "OpenClaw response text" }

    POST /api/tts
        Request:  { "text": "Text to speak", "voice": "af_heart" }
        Response: WAV audio (Content-Type: audio/wav)

    POST /api/stt
        Request:  raw WAV file bytes
        Response: { "text": "Transcribed text" }

    POST /wake
        Request:  optional { "wake_word": "..." }
        Response: { "status": "ok", "message": "Wake word received" }

    GET / or GET /status
        Health check: { "status": "ok", "service": ..., "version": ... }
"""

import argparse
import json
import subprocess
import sys
import asyncio
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from urllib.parse import urlparse
from pathlib import Path
import wave
import io
import re

from wyoming.client import AsyncTcpClient
from wyoming.tts import Synthesize, SynthesizeVoice
from wyoming.asr import Transcribe, Transcript
from wyoming.audio import AudioStart, AudioChunk, AudioStop
from wyoming.info import Info

# Local Wyoming services used for speech synthesis and recognition.
WYOMING_HOST = "127.0.0.1"
WYOMING_STT_PORT = 10300
WYOMING_TTS_PORT = 10301

# Absolute path to the OpenClaw CLI (a full path is used for launchd
# compatibility) and how long one agent call may run.
OPENCLAW_CLI = "/opt/homebrew/bin/openclaw"
OPENCLAW_TIMEOUT_SECONDS = 120

# Emoji / pictograph ranges removed before TTS so they are not read aloud.
# Compiled once at import time rather than on every request.
_EMOJI_RE = re.compile(
    r'[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF'
    r'\U0001F1E0-\U0001F1FF\U0001F900-\U0001F9FF\U0001FA00-\U0001FAFF'
    r'\U00002702-\U000027B0\U0000FE00-\U0000FE0F\U0000200D'
    r'\U00002600-\U000026FF\U00002300-\U000023FF]+'
)


class _BadRequest(Exception):
    """Raised while parsing a request; its message becomes a 400 error body."""


def load_character_prompt() -> str:
    """Load the active character system prompt.

    Reads ``~/.openclaw/characters/aria.json`` and returns its
    ``system_prompt`` string. Returns "" when the file is missing, cannot be
    read or parsed, is not a JSON object, or the prompt is not a string.
    Failures to read/parse are logged to stderr rather than raised, so a
    broken character file never blocks agent requests.
    """
    character_path = Path.home() / ".openclaw" / "characters" / "aria.json"
    if not character_path.exists():
        return ""
    try:
        with open(character_path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode errors
        print(f"[OpenClaw Bridge] Could not load character prompt: {e}",
              file=sys.stderr)
        return ""
    if not isinstance(data, dict):
        return ""
    prompt = data.get("system_prompt", "")
    return prompt if isinstance(prompt, str) else ""


def _strip_emoji(text: str) -> str:
    """Remove emoji runs from *text* and trim surrounding whitespace."""
    return _EMOJI_RE.sub('', text).strip()


def _pcm_to_wav(pcm: bytes, rate: int, width: int, channels: int) -> bytes:
    """Wrap raw PCM samples in a WAV container and return the file bytes."""
    wav_io = io.BytesIO()
    with wave.open(wav_io, 'wb') as wav_file:
        wav_file.setnchannels(channels)
        wav_file.setsampwidth(width)
        wav_file.setframerate(rate)
        wav_file.writeframes(pcm)
    return wav_io.getvalue()


def _wav_to_pcm(wav_bytes: bytes):
    """Decode WAV file bytes into ``(pcm, rate, width, channels)``.

    Raises:
        wave.Error: the data is not a usable WAV file.
        EOFError: the WAV data is truncated.
    """
    with wave.open(io.BytesIO(wav_bytes), 'rb') as wav_file:
        rate = wav_file.getframerate()
        width = wav_file.getsampwidth()
        channels = wav_file.getnchannels()
        pcm = wav_file.readframes(wav_file.getnframes())
    if rate <= 0:
        # A zero rate would make the 1-second chunking loop step by 0.
        raise wave.Error("frame rate must be positive")
    return pcm, rate, width, channels


class OpenClawBridgeHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the OpenClaw bridge."""

    # POST path -> name of the method that serves it.
    _POST_ROUTES = {
        "/wake": "_handle_wake_word",
        "/api/tts": "_handle_tts_request",
        "/api/stt": "_handle_stt_request",
        "/api/agent/message": "_handle_agent_request",
    }

    def log_message(self, format, *args):
        """Log each request to stdout, prefixed with the bridge name."""
        print(f"[OpenClaw Bridge] {self.address_string()} - {format % args}")

    # ------------------------------------------------------------------ I/O

    def _send_bytes(self, status_code: int, content_type: str, payload: bytes):
        """Send a complete response with CORS and Content-Length headers."""
        self.send_response(status_code)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(payload)))
        # Allow CORS for local testing from Vite (applies to errors too, so
        # the browser can read error bodies).
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(payload)

    def _send_json_response(self, status_code: int, data: dict):
        """Send *data* serialized as a JSON response."""
        self._send_bytes(status_code, "application/json", json.dumps(data).encode())

    def _content_length(self) -> int:
        """Return the request's Content-Length (0 if absent).

        Raises:
            _BadRequest: the header is not a non-negative integer.
        """
        raw = self.headers.get("Content-Length", "0")
        try:
            length = int(raw)
        except ValueError:
            raise _BadRequest("Invalid Content-Length") from None
        if length < 0:
            raise _BadRequest("Invalid Content-Length")
        return length

    def _read_json_object(self) -> dict:
        """Read the request body and parse it as a JSON object.

        Raises:
            _BadRequest: empty body, undecodable/invalid JSON, or a JSON value
                that is not an object.
        """
        length = self._content_length()
        if length == 0:
            raise _BadRequest("Empty body")
        body = self.rfile.read(length)
        try:
            data = json.loads(body.decode())
        except ValueError:  # JSONDecodeError and UnicodeDecodeError
            raise _BadRequest("Invalid JSON") from None
        if not isinstance(data, dict):
            raise _BadRequest("JSON body must be an object")
        return data

    # -------------------------------------------------------------- routing

    def do_POST(self):
        """Dispatch POST requests by path; unknown paths get a 404."""
        handler_name = self._POST_ROUTES.get(urlparse(self.path).path)
        if handler_name is None:
            self._send_json_response(404, {"error": "Not found"})
            return
        getattr(self, handler_name)()

    def do_OPTIONS(self):
        """Handle CORS preflight requests."""
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()

    def do_GET(self):
        """Handle GET requests (health check at / and /status)."""
        parsed_path = urlparse(self.path)
        if parsed_path.path in ("/status", "/"):
            self._send_json_response(200, {
                "status": "ok",
                "service": "OpenClaw HTTP Bridge",
                "version": "1.0.0"
            })
        else:
            self._send_json_response(404, {"error": "Not found"})

    # ------------------------------------------------------------------ TTS

    def _handle_tts_request(self):
        """Synthesize ``text`` via the Wyoming TTS server and return WAV audio."""
        try:
            data = self._read_json_object()
        except _BadRequest as e:
            self._send_json_response(400, {"error": str(e)})
            return

        text = data.get("text", "Hello, this is a test.")
        voice = data.get("voice", "af_heart")
        if not isinstance(text, str) or not isinstance(voice, str):
            self._send_json_response(400, {"error": "text and voice must be strings"})
            return

        # Strip emojis so TTS doesn't try to read them out.
        text = _strip_emoji(text)

        try:
            audio_bytes = asyncio.run(self._synthesize_audio(text, voice))
        except Exception as e:
            self._send_json_response(500, {"error": str(e)})
            return
        # Sent outside the try so a write failure never triggers a second response.
        self._send_bytes(200, "audio/wav", audio_bytes)

    async def _synthesize_audio(self, text: str, voice: str) -> bytes:
        """Connect to the Wyoming TTS server and return the audio as WAV bytes."""
        client = AsyncTcpClient(WYOMING_HOST, WYOMING_TTS_PORT)
        await client.connect()
        try:
            # Read the initial Info event the TTS server sends on connect.
            await client.read_event()
            await client.write_event(
                Synthesize(text=text, voice=SynthesizeVoice(name=voice)).event()
            )

            audio_data = bytearray()
            # Format used if the server never sends an AudioStart event.
            rate, width, channels = 24000, 2, 1
            while True:
                event = await client.read_event()
                if event is None or AudioStop.is_type(event.type):
                    break
                if AudioStart.is_type(event.type):
                    start = AudioStart.from_event(event)
                    rate, width, channels = start.rate, start.width, start.channels
                elif AudioChunk.is_type(event.type):
                    audio_data.extend(AudioChunk.from_event(event).audio)
        finally:
            # Always release the connection, including on errors.
            await client.disconnect()

        return _pcm_to_wav(bytes(audio_data), rate, width, channels)

    # ------------------------------------------------------------------ STT

    def _handle_stt_request(self):
        """Accept WAV audio, transcribe it via Wyoming STT, return ``{"text": ...}``."""
        try:
            content_length = self._content_length()
        except _BadRequest as e:
            self._send_json_response(400, {"error": str(e)})
            return
        if content_length == 0:
            self._send_json_response(400, {"error": "Empty body"})
            return

        audio_bytes = self.rfile.read(content_length)
        try:
            pcm_data, rate, width, channels = _wav_to_pcm(audio_bytes)
        except (wave.Error, EOFError) as e:  # EOFError: truncated WAV data
            self._send_json_response(400, {"error": f"Invalid WAV: {e}"})
            return

        try:
            text = asyncio.run(self._transcribe_audio(pcm_data, rate, width, channels))
        except Exception as e:
            self._send_json_response(500, {"error": str(e)})
            return
        self._send_json_response(200, {"text": text})

    async def _transcribe_audio(self, pcm_data: bytes, rate: int, width: int, channels: int) -> str:
        """Stream PCM to the Wyoming STT server; return the transcript ("" if none)."""
        client = AsyncTcpClient(WYOMING_HOST, WYOMING_STT_PORT)
        await client.connect()
        try:
            # The STT server does not send an initial Info event.
            await client.write_event(Transcribe(language="en").event())
            await client.write_event(
                AudioStart(rate=rate, width=width, channels=channels).event()
            )

            # Send the audio in 1-second chunks.
            bytes_per_second = rate * width * channels
            for offset in range(0, len(pcm_data), bytes_per_second):
                chunk = pcm_data[offset:offset + bytes_per_second]
                await client.write_event(
                    AudioChunk(rate=rate, width=width, channels=channels, audio=chunk).event()
                )
            await client.write_event(AudioStop().event())

            # Return the first transcript received.
            while True:
                event = await client.read_event()
                if event is None:
                    return ""
                if Transcript.is_type(event.type):
                    return Transcript.from_event(event).text
        finally:
            await client.disconnect()

    # ------------------------------------------------------------ wake word

    def _handle_wake_word(self):
        """Log a wake word notification; always replies 200 (best effort)."""
        wake_word_data = {}
        try:
            content_length = self._content_length()
            if content_length > 0:
                parsed = json.loads(self.rfile.read(content_length).decode())
                if isinstance(parsed, dict):
                    wake_word_data = parsed
        except (_BadRequest, ValueError, OSError):
            # Client may close the connection early or send a malformed body;
            # the notification is informational only, so that's ok.
            pass

        print(f"[OpenClaw Bridge] Wake word detected: {wake_word_data.get('wake_word', 'unknown')}")
        self._send_json_response(200, {"status": "ok", "message": "Wake word received"})

    # ---------------------------------------------------------------- agent

    def _handle_agent_request(self):
        """Forward ``message`` to the OpenClaw CLI and return its stdout."""
        try:
            data = self._read_json_object()
        except _BadRequest as e:
            self._send_json_response(400, {"error": str(e)})
            return

        message = data.get("message")
        agent = data.get("agent", "main")
        if not message:
            self._send_json_response(400, {"error": "Message is required"})
            return
        if not isinstance(message, str):
            self._send_json_response(400, {"error": "Message must be a string"})
            return
        # Reject non-strings and values that the CLI could parse as an option.
        if not isinstance(agent, str) or not agent or agent.startswith("-"):
            self._send_json_response(400, {"error": "Invalid agent"})
            return

        # Inject the character system prompt, if one is configured.
        system_prompt = load_character_prompt()
        if system_prompt:
            message = f"System Context: {system_prompt}\n\nUser Request: {message}"

        try:
            # Argument list (no shell), so the message is never shell-interpreted.
            result = subprocess.run(
                [OPENCLAW_CLI, "agent", "--message", message, "--agent", agent],
                capture_output=True,
                text=True,
                timeout=OPENCLAW_TIMEOUT_SECONDS,
                check=True,
            )
        except subprocess.TimeoutExpired:
            self._send_json_response(504, {"error": "OpenClaw command timed out"})
            return
        except subprocess.CalledProcessError as e:
            error_msg = e.stderr.strip() if e.stderr else "OpenClaw command failed"
            self._send_json_response(500, {"error": error_msg})
            return
        except FileNotFoundError:
            self._send_json_response(500, {"error": "OpenClaw CLI not found"})
            return
        except Exception as e:
            self._send_json_response(500, {"error": str(e)})
            return

        self._send_json_response(200, {"response": result.stdout.strip()})


class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer that handles each request in its own daemon thread."""

    daemon_threads = True
    allow_reuse_address = True


def main():
    """Run the HTTP bridge server until interrupted with Ctrl+C."""
    parser = argparse.ArgumentParser(description="OpenClaw HTTP Bridge")
    parser.add_argument(
        "--port",
        type=int,
        default=8081,
        help="Port to listen on (default: 8081)"
    )
    parser.add_argument(
        "--host",
        default="0.0.0.0",
        help="Host to bind to (default: 0.0.0.0)"
    )
    args = parser.parse_args()

    server = ThreadingHTTPServer((args.host, args.port), OpenClawBridgeHandler)
    print(f"OpenClaw HTTP Bridge running on http://{args.host}:{args.port}")
    print(f"Endpoint: POST http://{args.host}:{args.port}/api/agent/message")
    print("Press Ctrl+C to stop")

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        # serve_forever has already returned here; release the listening socket.
        server.server_close()


if __name__ == "__main__":
    main()