## Voice Pipeline (P3) - Replace openWakeWord daemon with Wyoming Satellite approach - Add Wyoming Satellite service on port 10700 for HA voice pipeline - Update setup.sh with cross-platform sed compatibility (macOS/Linux) - Add version field to Kokoro TTS voice info - Update launchd service loader to use Wyoming Satellite ## Home Assistant Integration (P4) - Add custom conversation agent component (openclaw_conversation) - Fix: Use IntentResponse instead of plain strings (HA API requirement) - Support both HTTP API and CLI fallback modes - Config flow for easy HA UI setup - Add OpenClaw bridge scripts (Python + Bash) - Add ha-ctl utility for HA entity control - Fix: Use context manager for token file reading - Add HA configuration examples and documentation ## Infrastructure - Add mem0 backup automation (launchd + script) - Add n8n workflow templates (morning briefing, notification router) - Add VS Code workspace configuration - Reorganize model files into categorized folders: - lmstudio-community/ - mlx-community/ - bartowski/ - mradermacher/ ## Documentation - Update PROJECT_PLAN.md with Wyoming Satellite architecture - Update TODO.md with completed Wyoming integration tasks - Add OPENCLAW_INTEGRATION.md for HA setup guide ## Testing - Verified Wyoming services running (STT:10300, TTS:10301, Satellite:10700) - Verified OpenClaw CLI accessibility - Confirmed cross-platform compatibility fixes
189 lines
5.9 KiB
Python
Executable File
189 lines
5.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
ha-ctl — Home Assistant control CLI for OpenClaw agent.
|
|
|
|
Usage:
|
|
ha-ctl list [domain] List entities (optionally filtered by domain)
|
|
ha-ctl state <entity_id_or_name> Get current state of an entity
|
|
ha-ctl on <entity_id_or_name> Turn entity on
|
|
ha-ctl off <entity_id_or_name> Turn entity off
|
|
ha-ctl toggle <entity_id_or_name> Toggle entity
|
|
ha-ctl set <entity_id_or_name> <attr> <val> Set attribute (e.g. brightness 128)
|
|
ha-ctl scene <scene_name> Activate a scene
|
|
|
|
Environment:
|
|
HASS_TOKEN Long-lived access token
|
|
HA_URL Base URL (default: https://10.0.0.199:8123)
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import urllib.request
|
|
import urllib.error
|
|
import ssl
|
|
|
|
# Base URL of the Home Assistant instance; a trailing slash is dropped so
# request paths can be appended directly.
HA_URL = os.environ.get("HA_URL", "https://10.0.0.199:8123").rstrip("/")

# Token lookup order: HASS_TOKEN, then HA_TOKEN, then the on-disk token file.
TOKEN = os.environ.get("HASS_TOKEN") or os.environ.get("HA_TOKEN")
if not TOKEN:
    token_file = os.path.expanduser("~/.homeai/hass_token")
    if os.path.exists(token_file):
        with open(token_file) as f:
            TOKEN = f.read().strip()
    # Still nothing (file missing or empty): there is no way to authenticate.
    if not TOKEN:
        print("ERROR: No HASS_TOKEN set. Export HASS_TOKEN or write to ~/.homeai/hass_token", file=sys.stderr)
        sys.exit(1)

# Skip SSL verification for self-signed certs on local HA
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
|
|
|
|
|
|
def api(method, path, data=None):
    """Send one authenticated JSON request to the Home Assistant REST API.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        path: Path below ``/api``, including its leading slash.
        data: Optional JSON-serialisable payload; sent as the request body
            when it is not None.

    Returns:
        The decoded JSON response body.

    Prints an ``ERROR:`` line to stderr and exits with status 1 when the
    server answers with an HTTP error status, cannot be reached (refused
    connection, DNS failure, TLS failure, timeout), or returns a body that
    is not valid JSON.
    """
    url = f"{HA_URL}/api{path}"
    headers = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(url, data=body, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # Must be handled before OSError: HTTPError is a URLError, which is
        # itself an OSError. errors="replace" keeps a non-UTF-8 error body
        # from masking the real failure with a UnicodeDecodeError.
        print(f"ERROR: HTTP {e.code} — {e.read().decode(errors='replace')}", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # URLError and socket timeouts land here: report them like every
        # other failure in this script instead of dumping a traceback.
        print(f"ERROR: Cannot reach {HA_URL} — {e}", file=sys.stderr)
        sys.exit(1)
    try:
        return json.loads(raw)
    except ValueError:
        # json.JSONDecodeError is a ValueError; also covers an empty body.
        print(f"ERROR: Invalid JSON response from {url}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
def get_states():
    """Return the result of ``GET /states`` as decoded by ``api``."""
    all_states = api("GET", "/states")
    return all_states
|
|
|
|
|
|
def resolve_entity(query, states=None):
    """Resolve a friendly name or partial entity_id to a full entity_id.

    Matching stages, tried in order; the first stage that matches wins:
      1. exact entity_id (case-sensitive)
      2. exact friendly name (case-insensitive, ``_`` in the query treated
         as a space)
      3. query is a substring of an entity_id (case-insensitive); the first
         hit in ``states`` order is returned
      4. query is a substring of a friendly name; the match must be unique

    Args:
        query: Entity id, friendly name, or a fragment of either.
        states: Optional list of state dicts (each with an ``entity_id`` key
            and, optionally, ``attributes``). Fetched via ``get_states()``
            when None.

    Returns:
        The matching entity_id string.

    Prints a message to stderr and exits with status 1 when the query is
    blank, matches nothing, or is ambiguous in stage 4.
    """
    # A blank query is a substring of everything and would otherwise resolve
    # to an arbitrary entity, so reject it before doing any work.
    if not query or not query.strip():
        print("ERROR: Empty entity name", file=sys.stderr)
        sys.exit(1)
    if states is None:
        states = get_states()

    def friendly(state):
        # Lower-cased friendly name, or "" when missing/None.
        return (state.get("attributes", {}).get("friendly_name") or "").lower()

    # Entity ids use underscores, friendly names use spaces: keep one form of
    # the query for each comparison instead of forcing spaces onto both.
    id_query = query.lower()
    name_query = id_query.replace("_", " ")

    # Exact entity_id match
    for s in states:
        if s["entity_id"] == query:
            return s["entity_id"]
    # Friendly name exact match
    for s in states:
        if friendly(s) == name_query:
            return s["entity_id"]
    # Partial entity_id match
    for s in states:
        if id_query in s["entity_id"].lower():
            return s["entity_id"]
    # Partial friendly name match
    matches = [s for s in states if name_query in friendly(s)]
    if len(matches) == 1:
        return matches[0]["entity_id"]
    if len(matches) > 1:
        names = [
            f"{m['entity_id']} ({m.get('attributes', {}).get('friendly_name', '')})"
            for m in matches
        ]
        print(f"Ambiguous: {', '.join(names)}", file=sys.stderr)
        sys.exit(1)
    print(f"ERROR: No entity found matching '{query}'", file=sys.stderr)
    sys.exit(1)
|
|
|
|
|
|
def call_service(domain, service, entity_id, extra=None):
    """POST a service call for ``entity_id`` and return the API result.

    ``extra`` (if truthy) is merged into the request payload on top of the
    ``entity_id`` field.
    """
    payload = dict(entity_id=entity_id)
    payload.update(extra or {})
    return api("POST", f"/services/{domain}/{service}", payload)
|
|
|
|
|
|
def cmd_list(args):
    """Print one tab-separated line (id, state, friendly name) per entity.

    When ``args[0]`` is present and non-empty, only entities whose id starts
    with ``<args[0]>.`` are shown. Output is sorted by entity id.
    """
    domain_filter = args[0] if args else None
    rows = get_states()
    if domain_filter:
        prefix = domain_filter + "."
        rows = [row for row in rows if row["entity_id"].startswith(prefix)]
    rows = sorted(rows, key=lambda row: row["entity_id"])
    for row in rows:
        label = row.get("attributes", {}).get("friendly_name", "")
        print(f"{row['entity_id']}\t{row['state']}\t{label}")
|
|
|
|
|
|
def cmd_state(args):
    """Print the state and all attributes of the entity named by ``args[0]``.

    Exits with status 1 after printing usage when no entity is given.
    """
    if not args:
        print("Usage: ha-ctl state <entity>", file=sys.stderr)
        sys.exit(1)
    all_states = get_states()
    eid = resolve_entity(args[0], all_states)
    record = next(st for st in all_states if st["entity_id"] == eid)
    print(f"Entity: {eid}")
    print(f"State: {record['state']}")
    for key, value in record.get("attributes", {}).items():
        print(f" {key}: {value}")
|
|
|
|
|
|
def cmd_control(action, args):
    """Run the on/off/toggle ``action`` against the entity named by ``args[0]``.

    The service is called in the entity's own domain (the part of the
    entity id before the first dot). Exits with status 1 after printing
    usage when no entity is given.
    """
    if not args:
        print(f"Usage: ha-ctl {action} <entity>", file=sys.stderr)
        sys.exit(1)
    all_states = get_states()
    eid = resolve_entity(args[0], all_states)
    domain = eid.partition(".")[0]
    service = {"on": "turn_on", "off": "turn_off", "toggle": "toggle"}[action]
    call_service(domain, service, eid)
    # Prefer the friendly name in the confirmation; fall back to the id.
    label = eid
    for st in all_states:
        if st["entity_id"] == eid:
            label = st.get("attributes", {}).get("friendly_name", eid)
            break
    print(f"OK: {service} → {label} ({eid})")
|
|
|
|
|
|
def cmd_scene(args):
    """Activate the scene named by ``args[0]``.

    The name is resolved only against entities whose id starts with
    ``scene.``, so a light or switch with a similar name cannot be picked
    and then passed to ``scene.turn_on``.

    Exits with status 1 after printing usage when no scene is given;
    ``resolve_entity`` exits with status 1 when no scene matches.
    """
    if not args:
        print("Usage: ha-ctl scene <name>", file=sys.stderr)
        sys.exit(1)
    scenes = [s for s in get_states() if s["entity_id"].startswith("scene.")]
    eid = resolve_entity(args[0], scenes)
    call_service("scene", "turn_on", eid)
    print(f"OK: scene activated → {eid}")
|
|
|
|
|
|
def cmd_set(args):
    """Set one attribute on an entity via its domain's ``turn_on`` service.

    ``args`` is ``[entity, attribute, value, ...]``. The value is sent as an
    int if it parses as one, else as a float, else as the original string.
    Exits with status 1 after printing usage when fewer than three
    arguments are given.
    """
    if len(args) < 3:
        print("Usage: ha-ctl set <entity> <attribute> <value>", file=sys.stderr)
        sys.exit(1)
    all_states = get_states()
    eid = resolve_entity(args[0], all_states)
    domain = eid.partition(".")[0]
    attr = args[1]
    val = args[2]
    # Try the narrowest numeric type first; keep the string if neither fits.
    for cast in (int, float):
        try:
            val = cast(val)
        except ValueError:
            continue
        break
    call_service(domain, "turn_on", eid, {attr: val})
    print(f"OK: set {attr}={val} → {eid}")
|
|
|
|
|
|
if __name__ == "__main__":
    args = sys.argv[1:]
    if not args:
        # No sub-command: show the usage text and succeed.
        print(__doc__)
        sys.exit(0)
    cmd, rest = args[0], args[1:]
    # Sub-commands whose handler takes only the remaining arguments.
    handlers = {
        "list": cmd_list,
        "state": cmd_state,
        "scene": cmd_scene,
        "set": cmd_set,
    }
    if cmd in handlers:
        handlers[cmd](rest)
    elif cmd in ("on", "off", "toggle"):
        # These share one handler that also needs the action name.
        cmd_control(cmd, rest)
    else:
        print(f"Unknown command: {cmd}", file=sys.stderr)
        print(__doc__)
        sys.exit(1)
|