#!/usr/bin/env python3
"""
HomeAI Memory Store — SQLite + Vector Search

Replaces flat JSON memory files with a structured SQLite database
using sqlite-vec for semantic similarity search.

Used by:
  - openclaw-http-bridge.py (memory retrieval + follow-up injection)
  - memory-ctl skill (CLI memory management)
  - Dashboard API (REST endpoints via bridge)
"""

import json
import os
import sqlite3
import struct
import time
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

import sqlite_vec

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

DATA_DIR = Path(os.environ.get("DATA_DIR", os.path.expanduser("~/homeai-data")))
MEMORIES_DIR = DATA_DIR / "memories"
DB_PATH = MEMORIES_DIR / "memories.db"
EMBEDDING_DIM = 384  # all-MiniLM-L6-v2

# Privacy keywords for rule-based classification.
# Dict order matters: the first level with a matching keyword wins.
PRIVACY_KEYWORDS = {
    "local_only": [
        "health", "illness", "sick", "doctor", "medical", "medication", "surgery",
        "salary", "bank", "financial", "debt", "mortgage", "tax",
        "depression", "anxiety", "therapy", "divorce", "breakup",
    ],
    "sensitive": [
        "address", "phone", "email", "password", "birthday",
    ],
}

# ---------------------------------------------------------------------------
# Embedding model (lazy-loaded singleton)
# ---------------------------------------------------------------------------

_embedder = None


def _get_embedder():
    """Lazy-load the sentence-transformers model (imported on first use)."""
    global _embedder
    if _embedder is None:
        from sentence_transformers import SentenceTransformer
        _embedder = SentenceTransformer("all-MiniLM-L6-v2")
    return _embedder


def get_embedding(text: str) -> list[float]:
    """Compute a unit-normalized 384-dim embedding for the given text."""
    model = _get_embedder()
    vec = model.encode(text, normalize_embeddings=True)
    return vec.tolist()


def _serialize_f32(vec: list[float]) -> bytes:
    """Serialize a float list to little-endian float32 bytes for sqlite-vec."""
    return struct.pack(f"<{len(vec)}f", *vec)


def _deserialize_f32(blob: bytes) -> list[float]:
    """Deserialize sqlite-vec float32 bytes back to a list."""
    n = len(blob) // 4
    return list(struct.unpack(f"<{n}f", blob))


def _distance_to_similarity(distance: float) -> float:
    """Convert a vec0 L2 distance into a cosine similarity clamped to [0, 1].

    The embeddings are unit-normalized (see ``get_embedding``), so for two
    vectors a, b: ``|a - b|^2 = 2 - 2*cos(a, b)``, i.e.
    ``cos = 1 - d^2 / 2``. Using ``1 - d`` would badly underestimate
    similarity (d = 0.5 is cos = 0.875, not 0.5).
    """
    return max(0.0, 1.0 - (distance * distance) / 2.0)


# ---------------------------------------------------------------------------
# Database initialization
# ---------------------------------------------------------------------------

_db: Optional[sqlite3.Connection] = None


def init_db() -> sqlite3.Connection:
    """Initialize the SQLite database with schema and sqlite-vec extension.

    Idempotent: returns the cached module-level connection when one exists.
    If schema setup fails the freshly opened connection is closed and the
    error is re-raised, so a broken connection is never cached.
    """
    global _db
    if _db is not None:
        return _db

    MEMORIES_DIR.mkdir(parents=True, exist_ok=True)
    db = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    try:
        # Extension loading is enabled only for the duration of the load.
        db.enable_load_extension(True)
        sqlite_vec.load(db)
        db.enable_load_extension(False)
        db.row_factory = sqlite3.Row

        db.executescript("""
            CREATE TABLE IF NOT EXISTS memories (
                id TEXT PRIMARY KEY,
                character_id TEXT NOT NULL,
                content TEXT NOT NULL,
                memory_type TEXT NOT NULL DEFAULT 'semantic',
                category TEXT NOT NULL DEFAULT 'other',
                privacy_level TEXT NOT NULL DEFAULT 'standard',
                importance REAL NOT NULL DEFAULT 0.5,
                lifecycle_state TEXT NOT NULL DEFAULT 'active',
                follow_up_due TEXT,
                follow_up_context TEXT,
                source TEXT DEFAULT 'user_explicit',
                created_at TEXT NOT NULL,
                last_accessed TEXT,
                expires_at TEXT,
                previous_value TEXT,
                tags TEXT,
                surfaced_count INTEGER DEFAULT 0
            );

            CREATE INDEX IF NOT EXISTS idx_memories_character
                ON memories(character_id);
            CREATE INDEX IF NOT EXISTS idx_memories_lifecycle
                ON memories(lifecycle_state);
            CREATE INDEX IF NOT EXISTS idx_memories_type
                ON memories(memory_type);
        """)

        # Create the vec0 virtual table for vector search
        # sqlite-vec requires this specific syntax
        db.execute(f"""
            CREATE VIRTUAL TABLE IF NOT EXISTS memory_embeddings USING vec0(
                id TEXT PRIMARY KEY,
                embedding float[{EMBEDDING_DIM}]
            )
        """)

        # Partial index for follow-ups. IF NOT EXISTS makes this idempotent
        # without swallowing unrelated OperationalErrors.
        db.execute("""
            CREATE INDEX IF NOT EXISTS idx_memories_followup
                ON memories(lifecycle_state, follow_up_due)
                WHERE lifecycle_state = 'pending_followup'
        """)

        db.commit()
    except Exception:
        db.close()
        raise

    _db = db
    return db


def _get_db() -> sqlite3.Connection:
    """Get or initialize the database connection."""
    if _db is None:
        return init_db()
    return _db


def _row_to_dict(row: sqlite3.Row) -> dict:
    """Convert a sqlite3.Row to a plain dict."""
    return dict(row)


def _generate_id() -> str:
    """Generate a unique memory ID.

    Format: ``m_<epoch-ms>_<8 hex chars>``. The random suffix prevents
    PRIMARY KEY collisions when several memories are created within the same
    millisecond (e.g. during bulk migration).
    """
    return f"m_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"


def _now_iso() -> str:
    """Current UTC time as ISO string."""
    return datetime.now(timezone.utc).isoformat()


# ---------------------------------------------------------------------------
# Write-time classification (rule-based, Phase 1)
# ---------------------------------------------------------------------------


def classify_memory(content: str) -> dict:
    """Rule-based classification for memory properties.

    Returns defaults (``memory_type``, ``privacy_level``, ``importance``)
    that can be overridden by explicit parameters. Matching is a
    case-insensitive substring test against the keyword lists.
    """
    content_lower = content.lower()

    # Privacy detection: first level (in dict order) with any keyword hit wins
    privacy = "standard"
    for level, keywords in PRIVACY_KEYWORDS.items():
        if any(kw in content_lower for kw in keywords):
            privacy = level
            break

    # Memory type detection
    memory_type = "semantic"
    temporal_markers = [
        "today", "yesterday", "tonight", "this morning", "just now",
        "feeling", "right now", "this week", "earlier",
    ]
    if any(kw in content_lower for kw in temporal_markers):
        memory_type = "episodic"

    # Importance heuristic: more private content is treated as more important
    importance = 0.5
    if privacy == "local_only":
        importance = 0.7
    elif privacy == "sensitive":
        importance = 0.6

    return {
        "memory_type": memory_type,
        "privacy_level": privacy,
        "importance": importance,
    }


# ---------------------------------------------------------------------------
# CRUD operations
# ---------------------------------------------------------------------------


def add_memory(
    character_id: str,
    content: str,
    memory_type: str | None = None,
    category: str = "other",
    importance: float | None = None,
    privacy_level: str | None = None,
    tags: list[str] | None = None,
    follow_up_due: str | None = None,
    follow_up_context: str | None = None,
    source: str = "user_explicit",
    expires_at: str | None = None,
) -> dict:
    """Add a new memory record. Auto-classifies fields not explicitly set.

    If ``follow_up_due`` or ``follow_up_context`` is given the record starts
    in the ``pending_followup`` state (with ``follow_up_due`` defaulting to
    ``"next_interaction"``); otherwise it is ``active``.

    The row and its embedding are written in one transaction, so a failure
    leaves neither behind.

    Returns:
        A dict describing the stored record (``tags`` as the original list).
    """
    db = _get_db()
    classified = classify_memory(content)

    memory_type = memory_type or classified["memory_type"]
    privacy_level = privacy_level or classified["privacy_level"]
    importance = importance if importance is not None else classified["importance"]

    lifecycle_state = "active"
    if follow_up_due or follow_up_context:
        lifecycle_state = "pending_followup"
        if not follow_up_due:
            follow_up_due = "next_interaction"

    mem_id = _generate_id()
    now = _now_iso()

    # Generate embedding before opening the write transaction
    embedding = get_embedding(content)

    # `with db` commits on success and rolls back on any exception
    with db:
        db.execute("""
            INSERT INTO memories (
                id, character_id, content, memory_type, category,
                privacy_level, importance, lifecycle_state,
                follow_up_due, follow_up_context, source,
                created_at, expires_at, tags, surfaced_count
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0)
        """, (
            mem_id, character_id, content, memory_type, category,
            privacy_level, importance, lifecycle_state,
            follow_up_due, follow_up_context, source,
            now, expires_at, json.dumps(tags) if tags else None,
        ))

        # Insert embedding into vec0 table
        db.execute(
            "INSERT INTO memory_embeddings (id, embedding) VALUES (?, ?)",
            (mem_id, _serialize_f32(embedding)),
        )

    return {
        "id": mem_id,
        "character_id": character_id,
        "content": content,
        "memory_type": memory_type,
        "category": category,
        "privacy_level": privacy_level,
        "importance": importance,
        "lifecycle_state": lifecycle_state,
        "follow_up_due": follow_up_due,
        "follow_up_context": follow_up_context,
        "source": source,
        "created_at": now,
        "expires_at": expires_at,
        "tags": tags,
    }


def update_memory(memory_id: str, **fields) -> dict | None:
    """Update specific fields on a memory record.

    Unknown field names are ignored. When ``content`` changes, the old
    content is stored in ``previous_value`` and the embedding is replaced.

    Returns:
        The updated row as a dict, or None if the memory does not exist.
    """
    db = _get_db()

    # Validate that memory exists
    row = db.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
    if not row:
        return None

    # Whitelist: these names are interpolated into the SET clause below
    allowed = {
        "content", "memory_type", "category", "privacy_level", "importance",
        "lifecycle_state", "follow_up_due", "follow_up_context", "source",
        "last_accessed", "expires_at", "previous_value", "tags", "surfaced_count",
    }
    updates = {k: v for k, v in fields.items() if k in allowed}
    if not updates:
        return _row_to_dict(row)

    # If content changed, keep the previous value and re-embed
    new_embedding = None
    if "content" in updates:
        updates["previous_value"] = row["content"]
        new_embedding = get_embedding(updates["content"])

    if "tags" in updates and isinstance(updates["tags"], list):
        updates["tags"] = json.dumps(updates["tags"])

    set_clause = ", ".join(f"{k} = ?" for k in updates)
    values = list(updates.values()) + [memory_id]

    # Single transaction: embedding swap and row update succeed or fail together
    with db:
        if new_embedding is not None:
            # Update vec0 table: delete old, insert new
            db.execute("DELETE FROM memory_embeddings WHERE id = ?", (memory_id,))
            db.execute(
                "INSERT INTO memory_embeddings (id, embedding) VALUES (?, ?)",
                (memory_id, _serialize_f32(new_embedding)),
            )
        db.execute(f"UPDATE memories SET {set_clause} WHERE id = ?", values)

    row = db.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
    return _row_to_dict(row) if row else None


def delete_memory(memory_id: str) -> bool:
    """Delete a memory record and its embedding.

    Returns:
        True if the memory existed and was deleted, False otherwise.
    """
    db = _get_db()
    row = db.execute("SELECT id FROM memories WHERE id = ?", (memory_id,)).fetchone()
    if not row:
        return False
    with db:
        db.execute("DELETE FROM memories WHERE id = ?", (memory_id,))
        db.execute("DELETE FROM memory_embeddings WHERE id = ?", (memory_id,))
    return True


# ---------------------------------------------------------------------------
# Retrieval
# ---------------------------------------------------------------------------


def retrieve_memories(
    character_id: str,
    context_text: str = "",
    limit: int = 20,
    exclude_private_for_cloud: bool = False,
) -> list[dict]:
    """Dual retrieval: semantic similarity + recency, merged and ranked.

    Up to five high-importance (> 0.8) memories are always included with the
    top score. If context_text is empty, falls back to recency-only
    retrieval. Vector search failures are logged and skipped (best effort).
    ``last_accessed`` is updated for every returned memory.

    Args:
        character_id: Owner of the memories.
        context_text: Text to embed for the semantic search; may be empty.
        limit: Maximum number of memories to return.
        exclude_private_for_cloud: Drop ``local_only`` memories when True.
    """
    db = _get_db()

    privacy_filter = ""
    if exclude_private_for_cloud:
        privacy_filter = "AND m.privacy_level != 'local_only'"

    # Always include high-importance memories
    high_importance = db.execute(f"""
        SELECT * FROM memories m
        WHERE m.character_id = ?
          AND m.lifecycle_state IN ('active', 'pending_followup')
          AND m.importance > 0.8
          {privacy_filter}
        ORDER BY m.created_at DESC
        LIMIT 5
    """, (character_id,)).fetchall()

    seen_ids = {r["id"] for r in high_importance}
    results = {r["id"]: {**_row_to_dict(r), "_score": 1.0} for r in high_importance}

    # Semantic search (if context provided and embeddings exist)
    if context_text:
        try:
            query_emb = get_embedding(context_text)
            vec_rows = db.execute("""
                SELECT id, distance
                FROM memory_embeddings
                WHERE embedding MATCH ?
                  AND k = 30
            """, (_serialize_f32(query_emb),)).fetchall()

            vec_ids = [r["id"] for r in vec_rows if r["id"] not in seen_ids]
            vec_distances = {r["id"]: r["distance"] for r in vec_rows}

            if vec_ids:
                placeholders = ",".join("?" * len(vec_ids))
                sem_rows = db.execute(f"""
                    SELECT * FROM memories m
                    WHERE m.id IN ({placeholders})
                      AND m.character_id = ?
                      AND m.lifecycle_state IN ('active', 'pending_followup')
                      {privacy_filter}
                """, (*vec_ids, character_id)).fetchall()

                for r in sem_rows:
                    d = _row_to_dict(r)
                    # vec0 reports L2 distance; convert to cosine similarity
                    dist = vec_distances.get(r["id"], 1.0)
                    semantic_score = _distance_to_similarity(dist)
                    d["_score"] = 0.6 * semantic_score + 0.1 * d["importance"]
                    results[r["id"]] = d
                    seen_ids.add(r["id"])
        except Exception as e:
            # Deliberate best effort: retrieval still works without vectors
            print(f"[MemoryStore] Vector search error: {e}")

    # Recency search: last 7 days, ordered by importance + recency.
    # The cutoff is bound as an ISO-8601 string so it compares correctly with
    # the stored created_at format (SQLite's datetime('now') uses a space
    # separator and would mis-compare against 'T'-separated values).
    recency_cutoff = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
    recency_rows = db.execute(f"""
        SELECT * FROM memories m
        WHERE m.character_id = ?
          AND m.lifecycle_state IN ('active', 'pending_followup')
          AND m.created_at > ?
          {privacy_filter}
        ORDER BY m.importance DESC, m.created_at DESC
        LIMIT 10
    """, (character_id, recency_cutoff)).fetchall()

    for r in recency_rows:
        if r["id"] not in seen_ids:
            d = _row_to_dict(r)
            # Recency score based on age in days (newer = higher)
            try:
                created = datetime.fromisoformat(d["created_at"])
                age_days = (datetime.now(timezone.utc) - created).total_seconds() / 86400
                recency_score = max(0.0, 1.0 - (age_days / 7.0))
            except (ValueError, TypeError):
                # Unparseable or naive timestamp: fall back to a neutral score
                recency_score = 0.5
            d["_score"] = 0.3 * recency_score + 0.1 * d["importance"]
            results[r["id"]] = d
            seen_ids.add(r["id"])

    # Sort by score descending, return top N
    ranked = sorted(results.values(), key=lambda x: x.get("_score", 0), reverse=True)
    returned = ranked[:max(0, limit)]

    # Update last_accessed for returned memories
    now = _now_iso()
    for mem in returned:
        mem.pop("_score", None)
    with db:
        db.executemany(
            "UPDATE memories SET last_accessed = ? WHERE id = ?",
            [(now, mem["id"]) for mem in returned],
        )

    return returned


def get_pending_followups(character_id: str) -> list[dict]:
    """Get follow-up memories that are due for surfacing.

    A follow-up is due when its ``follow_up_due`` is at or before now, or is
    the sentinel ``'next_interaction'``. At most five are returned.
    """
    db = _get_db()
    now = _now_iso()

    rows = db.execute("""
        SELECT * FROM memories
        WHERE character_id = ?
          AND lifecycle_state = 'pending_followup'
          AND (follow_up_due <= ? OR follow_up_due = 'next_interaction')
        ORDER BY importance DESC, created_at DESC
        LIMIT 5
    """, (character_id, now)).fetchall()

    return [_row_to_dict(r) for r in rows]


def search_memories(
    character_id: str,
    query: str,
    memory_type: str | None = None,
    limit: int = 10,
) -> list[dict]:
    """Semantic search for memories matching a query.

    Over-fetches ``limit * 3`` nearest neighbours (the vector table is shared
    across characters), filters by character and optional type, and returns
    the closest ``limit`` rows ordered by ascending distance.
    """
    if limit <= 0:
        return []

    db = _get_db()

    query_emb = get_embedding(query)
    vec_rows = db.execute("""
        SELECT id, distance
        FROM memory_embeddings
        WHERE embedding MATCH ?
          AND k = ?
    """, (_serialize_f32(query_emb), limit * 3)).fetchall()

    if not vec_rows:
        return []

    vec_ids = [r["id"] for r in vec_rows]
    vec_distances = {r["id"]: r["distance"] for r in vec_rows}
    placeholders = ",".join("?" * len(vec_ids))

    type_filter = "AND m.memory_type = ?" if memory_type else ""
    params = [*vec_ids, character_id]
    if memory_type:
        params.append(memory_type)

    rows = db.execute(f"""
        SELECT * FROM memories m
        WHERE m.id IN ({placeholders})
          AND m.character_id = ?
          {type_filter}
        ORDER BY m.created_at DESC
    """, params).fetchall()

    # Sort by similarity (stable sort keeps newest-first among equal distances)
    results = [_row_to_dict(r) for r in rows]
    results.sort(key=lambda d: vec_distances.get(d["id"], 1.0))
    return results[:limit]


def list_memories(
    character_id: str,
    memory_type: str | None = None,
    lifecycle_state: str | None = None,
    category: str | None = None,
    limit: int = 20,
    offset: int = 0,
) -> list[dict]:
    """List memories (newest first) with optional filters and pagination."""
    db = _get_db()
    conditions = ["character_id = ?"]
    params: list = [character_id]

    if memory_type:
        conditions.append("memory_type = ?")
        params.append(memory_type)
    if lifecycle_state:
        conditions.append("lifecycle_state = ?")
        params.append(lifecycle_state)
    if category:
        conditions.append("category = ?")
        params.append(category)

    # Only fixed condition strings are interpolated; values are bound
    where = " AND ".join(conditions)
    params.extend([limit, offset])

    rows = db.execute(f"""
        SELECT * FROM memories
        WHERE {where}
        ORDER BY created_at DESC
        LIMIT ? OFFSET ?
    """, params).fetchall()

    return [_row_to_dict(r) for r in rows]


def count_memories(character_id: str) -> int:
    """Count memories for a character."""
    db = _get_db()
    row = db.execute(
        "SELECT COUNT(*) as cnt FROM memories WHERE character_id = ?",
        (character_id,),
    ).fetchone()
    return row["cnt"] if row else 0


# ---------------------------------------------------------------------------
# Lifecycle management
# ---------------------------------------------------------------------------


def resolve_followup(memory_id: str) -> bool:
    """Mark a follow-up as resolved.

    Returns:
        True if a pending follow-up with this ID was resolved.
    """
    db = _get_db()
    result = db.execute("""
        UPDATE memories
        SET lifecycle_state = 'resolved',
            follow_up_due = NULL
        WHERE id = ? AND lifecycle_state = 'pending_followup'
    """, (memory_id,))
    db.commit()
    return result.rowcount > 0


def archive_memory(memory_id: str) -> bool:
    """Archive a memory (keeps it for relational inference, not surfaced).

    Returns:
        True if a row with this ID was updated.
    """
    db = _get_db()
    result = db.execute("""
        UPDATE memories
        SET lifecycle_state = 'archived'
        WHERE id = ?
    """, (memory_id,))
    db.commit()
    return result.rowcount > 0


def auto_resolve_expired_followups() -> int:
    """Auto-resolve follow-ups that are more than 48h past due.

    Follow-ups still marked ``'next_interaction'`` are left alone.

    Returns:
        Number of follow-ups resolved.
    """
    db = _get_db()
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
    result = db.execute("""
        UPDATE memories
        SET lifecycle_state = 'resolved',
            follow_up_due = NULL
        WHERE lifecycle_state = 'pending_followup'
          AND follow_up_due != 'next_interaction'
          AND follow_up_due < ?
    """, (cutoff,))
    db.commit()
    return result.rowcount


def auto_archive_old_resolved() -> int:
    """Archive resolved memories created more than 7 days ago.

    Returns:
        Number of memories archived.
    """
    db = _get_db()
    cutoff = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
    result = db.execute("""
        UPDATE memories
        SET lifecycle_state = 'archived'
        WHERE lifecycle_state = 'resolved'
          AND created_at < ?
    """, (cutoff,))
    db.commit()
    return result.rowcount


def increment_surfaced_count(memory_id: str) -> int:
    """Increment surfaced_count and return the new value.

    Auto-resolves the memory once it has been surfaced twice (count >= 2).
    On the first surfacing, a ``'next_interaction'`` due marker is replaced
    by the current timestamp so the 48h expiry timer starts.

    Returns:
        The new count, or 0 if the memory does not exist.
    """
    db = _get_db()
    row = db.execute(
        "SELECT surfaced_count FROM memories WHERE id = ?", (memory_id,)
    ).fetchone()
    if not row:
        return 0

    new_count = (row["surfaced_count"] or 0) + 1
    if new_count >= 2:
        # Auto-resolve: surfaced twice without user engagement
        db.execute("""
            UPDATE memories
            SET surfaced_count = ?,
                lifecycle_state = 'resolved',
                follow_up_due = NULL
            WHERE id = ?
        """, (new_count, memory_id))
    else:
        # Update next_interaction to actual timestamp so the 48h timer starts
        db.execute("""
            UPDATE memories
            SET surfaced_count = ?,
                follow_up_due = CASE
                    WHEN follow_up_due = 'next_interaction' THEN ?
                    ELSE follow_up_due
                END
            WHERE id = ?
        """, (new_count, _now_iso(), memory_id))
    db.commit()
    return new_count


# ---------------------------------------------------------------------------
# Deduplication
# ---------------------------------------------------------------------------


def find_similar(
    character_id: str,
    content: str,
    memory_type: str = "semantic",
    threshold: float = 0.85,
) -> dict | None:
    """Find an existing active memory that is semantically similar.

    ``threshold`` is a cosine similarity in [0, 1]. Only the five nearest
    neighbours (across all characters) are examined.

    Returns:
        The matching memory dict or None.
    """
    db = _get_db()
    query_emb = get_embedding(content)

    vec_rows = db.execute("""
        SELECT id, distance
        FROM memory_embeddings
        WHERE embedding MATCH ?
          AND k = 5
    """, (_serialize_f32(query_emb),)).fetchall()

    for vr in vec_rows:
        if _distance_to_similarity(vr["distance"]) < threshold:
            continue
        row = db.execute("""
            SELECT * FROM memories
            WHERE id = ?
              AND character_id = ?
              AND memory_type = ?
              AND lifecycle_state = 'active'
        """, (vr["id"], character_id, memory_type)).fetchone()
        if row:
            return _row_to_dict(row)
    return None


def add_or_merge_memory(
    character_id: str,
    content: str,
    memory_type: str | None = None,
    category: str = "other",
    importance: float | None = None,
    privacy_level: str | None = None,
    tags: list[str] | None = None,
    follow_up_due: str | None = None,
    follow_up_context: str | None = None,
    source: str = "user_explicit",
    expires_at: str | None = None,
    dedup_threshold: float = 0.85,
) -> dict:
    """Add a memory, or merge with an existing similar one (semantic dedup).

    For semantic memories, if a similar one exists (>= dedup_threshold), its
    content is updated instead of creating a new record; the other arguments
    are not applied in that case. Episodic memories are always added.
    """
    resolved_type = memory_type or classify_memory(content)["memory_type"]

    if resolved_type == "semantic":
        existing = find_similar(character_id, content, "semantic", dedup_threshold)
        if existing:
            updated = update_memory(existing["id"], content=content)
            if updated:
                return updated

    return add_memory(
        character_id=character_id,
        content=content,
        memory_type=memory_type,
        category=category,
        importance=importance,
        privacy_level=privacy_level,
        tags=tags,
        follow_up_due=follow_up_due,
        follow_up_context=follow_up_context,
        source=source,
        expires_at=expires_at,
    )


# ---------------------------------------------------------------------------
# Migration from JSON
# ---------------------------------------------------------------------------

# Mapping from old JSON categories to new memory types
_CATEGORY_TO_TYPE = {
    "preference": "semantic",
    "personal_info": "semantic",
    "interaction": "episodic",
    "emotional": "episodic",
    "system": "semantic",
    "tool_usage": "semantic",
    "home_layout": "semantic",
    "device": "semantic",
    "routine": "semantic",
    "other": "semantic",
}

_CATEGORY_TO_IMPORTANCE = {
    "personal_info": 0.7,
    "preference": 0.6,
    "emotional": 0.5,
    "interaction": 0.4,
    "system": 0.4,
    "tool_usage": 0.3,
    "home_layout": 0.5,
    "device": 0.4,
    "routine": 0.5,
    "other": 0.4,
}

_CATEGORY_TO_PRIVACY = {
    "emotional": "sensitive",
    "personal_info": "sensitive",
}


def _migrate_entry(
    db: sqlite3.Connection,
    mem: dict,
    character_id: str,
    privacy_by_category: dict | None = None,
) -> bool:
    """Migrate one legacy JSON memory entry.

    Returns True if the entry was stored, False if it was skipped because
    its content is empty. Raises on failure (caller records the error).
    """
    content = mem.get("content", "").strip()
    if not content:
        return False

    category = mem.get("category", "other")
    created_at = mem.get("createdAt", _now_iso())
    privacy_level = "standard"
    if privacy_by_category is not None:
        privacy_level = privacy_by_category.get(category, "standard")

    record = add_memory(
        character_id=character_id,
        content=content,
        memory_type=_CATEGORY_TO_TYPE.get(category, "semantic"),
        category=category,
        importance=_CATEGORY_TO_IMPORTANCE.get(category, 0.5),
        privacy_level=privacy_level,
        source="migrated_json",
    )
    # Restore the original creation time on exactly the row just inserted
    with db:
        db.execute(
            "UPDATE memories SET created_at = ? WHERE id = ?",
            (created_at, record["id"]),
        )
    return True


def migrate_from_json(memories_dir: str | None = None) -> dict:
    """Migrate all JSON memory files to SQLite.

    Reads ``personal/*.json`` (one file per character) and ``general.json``
    (stored under character ``"shared"``). Each processed file is renamed to
    ``*.json.bak``. Per-entry failures are recorded and do not stop the run.

    Returns {migrated: int, skipped: int, errors: [str]}.
    """
    db = _get_db()
    mem_dir = Path(memories_dir) if memories_dir else MEMORIES_DIR

    migrated = 0
    skipped = 0
    errors = []

    # Migrate personal memories
    personal_dir = mem_dir / "personal"
    if personal_dir.exists():
        for json_file in personal_dir.glob("*.json"):
            try:
                with open(json_file, encoding="utf-8") as f:
                    data = json.load(f)
                character_id = data.get("characterId", json_file.stem)
                for mem in data.get("memories", []):
                    try:
                        if _migrate_entry(db, mem, character_id, _CATEGORY_TO_PRIVACY):
                            migrated += 1
                        else:
                            skipped += 1
                    except Exception as e:
                        errors.append(f"personal/{json_file.name}: {e}")

                # Rename to backup
                backup = json_file.with_suffix(".json.bak")
                json_file.rename(backup)
            except Exception as e:
                errors.append(f"personal/{json_file.name}: {e}")

    # Migrate general memories
    general_file = mem_dir / "general.json"
    if general_file.exists():
        try:
            with open(general_file, encoding="utf-8") as f:
                data = json.load(f)
            for mem in data.get("memories", []):
                try:
                    if _migrate_entry(db, mem, "shared"):
                        migrated += 1
                    else:
                        skipped += 1
                except Exception as e:
                    errors.append(f"general.json: {e}")

            backup = general_file.with_suffix(".json.bak")
            general_file.rename(backup)
        except Exception as e:
            errors.append(f"general.json: {e}")

    return {"migrated": migrated, "skipped": skipped, "errors": errors}