feat: memory v2, prompt styles, Dream/GAZE integration, Wyoming TTS fix

SQLite + sqlite-vec replaces JSON memory files with semantic search,
follow-up injection, privacy levels, and lifecycle management.

Six prompt styles (quick/standard/creative/roleplayer/game-master/storyteller)
with per-style Claude model tiering (Haiku/Sonnet/Opus), temperature control,
and section stripping. Characters can set default style and per-style overrides.

Dream character import and GAZE character linking in the dashboard editor
with auto-populated fields, cover image resolution, and preset assignment.

Bridge: session isolation (conversation_id / 12h satellite buckets),
model routing refactor, PUT/DELETE support, memory REST endpoints.

Dashboard: mobile-responsive sidebar, retry button, style picker in chat,
follow-up banner, memory lifecycle/privacy UI, cloud model options in editor.

Wyoming TTS: upgraded to v1.8.0 for HA 1.7.2 compatibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Aodhan Collins
2026-03-24 22:31:04 +00:00
parent c3bae6fdc0
commit 56580a2cb2
34 changed files with 2891 additions and 467 deletions

View File

@@ -0,0 +1,865 @@
#!/usr/bin/env python3
"""
HomeAI Memory Store — SQLite + Vector Search
Replaces flat JSON memory files with a structured SQLite database
using sqlite-vec for semantic similarity search.
Used by:
- openclaw-http-bridge.py (memory retrieval + follow-up injection)
- memory-ctl skill (CLI memory management)
- Dashboard API (REST endpoints via bridge)
"""
import json
import os
import sqlite3
import struct
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import sqlite_vec
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
DATA_DIR = Path(os.environ.get("DATA_DIR", os.path.expanduser("~/homeai-data")))
MEMORIES_DIR = DATA_DIR / "memories"
DB_PATH = MEMORIES_DIR / "memories.db"
EMBEDDING_DIM = 384 # all-MiniLM-L6-v2
# Privacy keywords for rule-based classification
PRIVACY_KEYWORDS = {
"local_only": [
"health", "illness", "sick", "doctor", "medical", "medication", "surgery",
"salary", "bank", "financial", "debt", "mortgage", "tax",
"depression", "anxiety", "therapy", "divorce", "breakup",
],
"sensitive": [
"address", "phone", "email", "password", "birthday",
],
}
# ---------------------------------------------------------------------------
# Embedding model (lazy-loaded singleton)
# ---------------------------------------------------------------------------
_embedder = None
def _get_embedder():
"""Lazy-load the sentence-transformers model."""
global _embedder
if _embedder is None:
from sentence_transformers import SentenceTransformer
_embedder = SentenceTransformer("all-MiniLM-L6-v2")
return _embedder
def get_embedding(text: str) -> list[float]:
"""Compute a 384-dim embedding for the given text."""
model = _get_embedder()
vec = model.encode(text, normalize_embeddings=True)
return vec.tolist()
def _serialize_f32(vec: list[float]) -> bytes:
"""Serialize a float list to little-endian bytes for sqlite-vec."""
return struct.pack(f"<{len(vec)}f", *vec)
def _deserialize_f32(blob: bytes) -> list[float]:
"""Deserialize sqlite-vec float bytes back to a list."""
n = len(blob) // 4
return list(struct.unpack(f"<{n}f", blob))
# ---------------------------------------------------------------------------
# Database initialization
# ---------------------------------------------------------------------------
_db: Optional[sqlite3.Connection] = None
def init_db() -> sqlite3.Connection:
"""Initialize the SQLite database with schema and sqlite-vec extension."""
global _db
if _db is not None:
return _db
MEMORIES_DIR.mkdir(parents=True, exist_ok=True)
db = sqlite3.connect(str(DB_PATH), check_same_thread=False)
db.enable_load_extension(True)
sqlite_vec.load(db)
db.enable_load_extension(False)
db.row_factory = sqlite3.Row
db.executescript("""
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
character_id TEXT NOT NULL,
content TEXT NOT NULL,
memory_type TEXT NOT NULL DEFAULT 'semantic',
category TEXT NOT NULL DEFAULT 'other',
privacy_level TEXT NOT NULL DEFAULT 'standard',
importance REAL NOT NULL DEFAULT 0.5,
lifecycle_state TEXT NOT NULL DEFAULT 'active',
follow_up_due TEXT,
follow_up_context TEXT,
source TEXT DEFAULT 'user_explicit',
created_at TEXT NOT NULL,
last_accessed TEXT,
expires_at TEXT,
previous_value TEXT,
tags TEXT,
surfaced_count INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_memories_character
ON memories(character_id);
CREATE INDEX IF NOT EXISTS idx_memories_lifecycle
ON memories(lifecycle_state);
CREATE INDEX IF NOT EXISTS idx_memories_type
ON memories(memory_type);
""")
# Create the vec0 virtual table for vector search
# sqlite-vec requires this specific syntax
db.execute(f"""
CREATE VIRTUAL TABLE IF NOT EXISTS memory_embeddings USING vec0(
id TEXT PRIMARY KEY,
embedding float[{EMBEDDING_DIM}]
)
""")
# Partial index for follow-ups (created manually since executescript can't
# handle IF NOT EXISTS for partial indexes cleanly on all versions)
try:
db.execute("""
CREATE INDEX idx_memories_followup
ON memories(lifecycle_state, follow_up_due)
WHERE lifecycle_state = 'pending_followup'
""")
except sqlite3.OperationalError:
pass # index already exists
db.commit()
_db = db
return db
def _get_db() -> sqlite3.Connection:
"""Get or initialize the database connection."""
if _db is None:
return init_db()
return _db
def _row_to_dict(row: sqlite3.Row) -> dict:
"""Convert a sqlite3.Row to a plain dict."""
return dict(row)
def _generate_id() -> str:
"""Generate a unique memory ID."""
return f"m_{int(time.time() * 1000)}"
def _now_iso() -> str:
"""Current UTC time as ISO string."""
return datetime.now(timezone.utc).isoformat()
# ---------------------------------------------------------------------------
# Write-time classification (rule-based, Phase 1)
# ---------------------------------------------------------------------------
def classify_memory(content: str) -> dict:
"""Rule-based classification for memory properties.
Returns defaults that can be overridden by explicit parameters."""
content_lower = content.lower()
# Privacy detection
privacy = "standard"
for level, keywords in PRIVACY_KEYWORDS.items():
if any(kw in content_lower for kw in keywords):
privacy = level
break
# Memory type detection
memory_type = "semantic"
temporal_markers = [
"today", "yesterday", "tonight", "this morning", "just now",
"feeling", "right now", "this week", "earlier",
]
if any(kw in content_lower for kw in temporal_markers):
memory_type = "episodic"
# Importance heuristic
importance = 0.5
if privacy == "local_only":
importance = 0.7
elif privacy == "sensitive":
importance = 0.6
return {
"memory_type": memory_type,
"privacy_level": privacy,
"importance": importance,
}
# ---------------------------------------------------------------------------
# CRUD operations
# ---------------------------------------------------------------------------
def add_memory(
character_id: str,
content: str,
memory_type: str | None = None,
category: str = "other",
importance: float | None = None,
privacy_level: str | None = None,
tags: list[str] | None = None,
follow_up_due: str | None = None,
follow_up_context: str | None = None,
source: str = "user_explicit",
expires_at: str | None = None,
) -> dict:
"""Add a new memory record. Auto-classifies fields not explicitly set."""
db = _get_db()
classified = classify_memory(content)
memory_type = memory_type or classified["memory_type"]
privacy_level = privacy_level or classified["privacy_level"]
importance = importance if importance is not None else classified["importance"]
lifecycle_state = "active"
if follow_up_due or follow_up_context:
lifecycle_state = "pending_followup"
if not follow_up_due:
follow_up_due = "next_interaction"
mem_id = _generate_id()
now = _now_iso()
# Generate embedding
embedding = get_embedding(content)
db.execute("""
INSERT INTO memories (
id, character_id, content, memory_type, category,
privacy_level, importance, lifecycle_state,
follow_up_due, follow_up_context, source,
created_at, tags, surfaced_count
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0)
""", (
mem_id, character_id, content, memory_type, category,
privacy_level, importance, lifecycle_state,
follow_up_due, follow_up_context, source,
now, json.dumps(tags) if tags else None,
))
# Insert embedding into vec0 table
db.execute(
"INSERT INTO memory_embeddings (id, embedding) VALUES (?, ?)",
(mem_id, _serialize_f32(embedding)),
)
db.commit()
return {
"id": mem_id,
"character_id": character_id,
"content": content,
"memory_type": memory_type,
"category": category,
"privacy_level": privacy_level,
"importance": importance,
"lifecycle_state": lifecycle_state,
"follow_up_due": follow_up_due,
"follow_up_context": follow_up_context,
"source": source,
"created_at": now,
"tags": tags,
}
def update_memory(memory_id: str, **fields) -> dict | None:
"""Update specific fields on a memory record."""
db = _get_db()
# Validate that memory exists
row = db.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
if not row:
return None
allowed = {
"content", "memory_type", "category", "privacy_level", "importance",
"lifecycle_state", "follow_up_due", "follow_up_context", "source",
"last_accessed", "expires_at", "previous_value", "tags", "surfaced_count",
}
updates = {k: v for k, v in fields.items() if k in allowed}
if not updates:
return _row_to_dict(row)
# If content changed, update embedding and store previous value
if "content" in updates:
updates["previous_value"] = row["content"]
embedding = get_embedding(updates["content"])
# Update vec0 table: delete old, insert new
db.execute("DELETE FROM memory_embeddings WHERE id = ?", (memory_id,))
db.execute(
"INSERT INTO memory_embeddings (id, embedding) VALUES (?, ?)",
(memory_id, _serialize_f32(embedding)),
)
if "tags" in updates and isinstance(updates["tags"], list):
updates["tags"] = json.dumps(updates["tags"])
set_clause = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [memory_id]
db.execute(f"UPDATE memories SET {set_clause} WHERE id = ?", values)
db.commit()
row = db.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
return _row_to_dict(row) if row else None
def delete_memory(memory_id: str) -> bool:
"""Delete a memory record and its embedding."""
db = _get_db()
row = db.execute("SELECT id FROM memories WHERE id = ?", (memory_id,)).fetchone()
if not row:
return False
db.execute("DELETE FROM memories WHERE id = ?", (memory_id,))
db.execute("DELETE FROM memory_embeddings WHERE id = ?", (memory_id,))
db.commit()
return True
# ---------------------------------------------------------------------------
# Retrieval
# ---------------------------------------------------------------------------
def retrieve_memories(
character_id: str,
context_text: str = "",
limit: int = 20,
exclude_private_for_cloud: bool = False,
) -> list[dict]:
"""Dual retrieval: semantic similarity + recency, merged and ranked.
If context_text is empty, falls back to recency-only retrieval.
"""
db = _get_db()
privacy_filter = ""
if exclude_private_for_cloud:
privacy_filter = "AND m.privacy_level != 'local_only'"
# Always include high-importance memories
high_importance = db.execute(f"""
SELECT * FROM memories m
WHERE m.character_id = ?
AND m.lifecycle_state IN ('active', 'pending_followup')
AND m.importance > 0.8
{privacy_filter}
ORDER BY m.created_at DESC
LIMIT 5
""", (character_id,)).fetchall()
seen_ids = {r["id"] for r in high_importance}
results = {r["id"]: {**_row_to_dict(r), "_score": 1.0} for r in high_importance}
# Semantic search (if context provided and embeddings exist)
if context_text:
try:
query_emb = get_embedding(context_text)
vec_rows = db.execute("""
SELECT id, distance
FROM memory_embeddings
WHERE embedding MATCH ?
AND k = 30
""", (_serialize_f32(query_emb),)).fetchall()
vec_ids = [r["id"] for r in vec_rows if r["id"] not in seen_ids]
vec_distances = {r["id"]: r["distance"] for r in vec_rows}
if vec_ids:
placeholders = ",".join("?" * len(vec_ids))
sem_rows = db.execute(f"""
SELECT * FROM memories m
WHERE m.id IN ({placeholders})
AND m.character_id = ?
AND m.lifecycle_state IN ('active', 'pending_followup')
{privacy_filter}
""", (*vec_ids, character_id)).fetchall()
for r in sem_rows:
d = _row_to_dict(r)
# Convert cosine distance to similarity (sqlite-vec returns L2 distance for vec0)
dist = vec_distances.get(r["id"], 1.0)
semantic_score = max(0.0, 1.0 - dist)
d["_score"] = 0.6 * semantic_score + 0.1 * d["importance"]
results[r["id"]] = d
seen_ids.add(r["id"])
except Exception as e:
print(f"[MemoryStore] Vector search error: {e}")
# Recency search: last 7 days, ordered by importance + recency
recency_rows = db.execute(f"""
SELECT * FROM memories m
WHERE m.character_id = ?
AND m.lifecycle_state IN ('active', 'pending_followup')
AND m.created_at > datetime('now', '-7 days')
{privacy_filter}
ORDER BY m.importance DESC, m.created_at DESC
LIMIT 10
""", (character_id,)).fetchall()
for r in recency_rows:
if r["id"] not in seen_ids:
d = _row_to_dict(r)
# Recency score based on age in days (newer = higher)
try:
created = datetime.fromisoformat(d["created_at"])
age_days = (datetime.now(timezone.utc) - created).total_seconds() / 86400
recency_score = max(0.0, 1.0 - (age_days / 7.0))
except (ValueError, TypeError):
recency_score = 0.5
d["_score"] = 0.3 * recency_score + 0.1 * d["importance"]
results[r["id"]] = d
seen_ids.add(r["id"])
# Sort by score descending, return top N
ranked = sorted(results.values(), key=lambda x: x.get("_score", 0), reverse=True)
# Update last_accessed for returned memories
returned = ranked[:limit]
now = _now_iso()
for mem in returned:
mem.pop("_score", None)
db.execute(
"UPDATE memories SET last_accessed = ? WHERE id = ?",
(now, mem["id"]),
)
db.commit()
return returned
def get_pending_followups(character_id: str) -> list[dict]:
"""Get follow-up memories that are due for surfacing."""
db = _get_db()
now = _now_iso()
rows = db.execute("""
SELECT * FROM memories
WHERE character_id = ?
AND lifecycle_state = 'pending_followup'
AND (follow_up_due <= ? OR follow_up_due = 'next_interaction')
ORDER BY importance DESC, created_at DESC
LIMIT 5
""", (character_id, now)).fetchall()
return [_row_to_dict(r) for r in rows]
def search_memories(
character_id: str,
query: str,
memory_type: str | None = None,
limit: int = 10,
) -> list[dict]:
"""Semantic search for memories matching a query."""
db = _get_db()
query_emb = get_embedding(query)
vec_rows = db.execute("""
SELECT id, distance
FROM memory_embeddings
WHERE embedding MATCH ?
AND k = ?
""", (_serialize_f32(query_emb), limit * 3)).fetchall()
if not vec_rows:
return []
vec_ids = [r["id"] for r in vec_rows]
vec_distances = {r["id"]: r["distance"] for r in vec_rows}
placeholders = ",".join("?" * len(vec_ids))
type_filter = "AND m.memory_type = ?" if memory_type else ""
params = [*vec_ids, character_id]
if memory_type:
params.append(memory_type)
rows = db.execute(f"""
SELECT * FROM memories m
WHERE m.id IN ({placeholders})
AND m.character_id = ?
{type_filter}
ORDER BY m.created_at DESC
""", params).fetchall()
# Sort by similarity
results = []
for r in rows:
d = _row_to_dict(r)
d["_distance"] = vec_distances.get(r["id"], 1.0)
results.append(d)
results.sort(key=lambda x: x["_distance"])
for r in results:
r.pop("_distance", None)
return results[:limit]
def list_memories(
character_id: str,
memory_type: str | None = None,
lifecycle_state: str | None = None,
category: str | None = None,
limit: int = 20,
offset: int = 0,
) -> list[dict]:
"""List memories with optional filters."""
db = _get_db()
conditions = ["character_id = ?"]
params: list = [character_id]
if memory_type:
conditions.append("memory_type = ?")
params.append(memory_type)
if lifecycle_state:
conditions.append("lifecycle_state = ?")
params.append(lifecycle_state)
if category:
conditions.append("category = ?")
params.append(category)
where = " AND ".join(conditions)
params.extend([limit, offset])
rows = db.execute(f"""
SELECT * FROM memories
WHERE {where}
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""", params).fetchall()
return [_row_to_dict(r) for r in rows]
def count_memories(character_id: str) -> int:
"""Count memories for a character."""
db = _get_db()
row = db.execute(
"SELECT COUNT(*) as cnt FROM memories WHERE character_id = ?",
(character_id,),
).fetchone()
return row["cnt"] if row else 0
# ---------------------------------------------------------------------------
# Lifecycle management
# ---------------------------------------------------------------------------
def resolve_followup(memory_id: str) -> bool:
"""Mark a follow-up as resolved."""
db = _get_db()
result = db.execute("""
UPDATE memories
SET lifecycle_state = 'resolved',
follow_up_due = NULL
WHERE id = ? AND lifecycle_state = 'pending_followup'
""", (memory_id,))
db.commit()
return result.rowcount > 0
def archive_memory(memory_id: str) -> bool:
"""Archive a memory (keeps it for relational inference, not surfaced)."""
db = _get_db()
result = db.execute("""
UPDATE memories
SET lifecycle_state = 'archived'
WHERE id = ?
""", (memory_id,))
db.commit()
return result.rowcount > 0
def auto_resolve_expired_followups() -> int:
"""Auto-resolve follow-ups that are more than 48h past due."""
db = _get_db()
cutoff = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
result = db.execute("""
UPDATE memories
SET lifecycle_state = 'resolved',
follow_up_due = NULL
WHERE lifecycle_state = 'pending_followup'
AND follow_up_due != 'next_interaction'
AND follow_up_due < ?
""", (cutoff,))
db.commit()
return result.rowcount
def auto_archive_old_resolved() -> int:
"""Archive resolved memories older than 7 days."""
db = _get_db()
cutoff = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
result = db.execute("""
UPDATE memories
SET lifecycle_state = 'archived'
WHERE lifecycle_state = 'resolved'
AND created_at < ?
""", (cutoff,))
db.commit()
return result.rowcount
def increment_surfaced_count(memory_id: str) -> int:
"""Increment surfaced_count and return new value. Auto-resolves if >= 1."""
db = _get_db()
row = db.execute(
"SELECT surfaced_count FROM memories WHERE id = ?", (memory_id,)
).fetchone()
if not row:
return 0
new_count = (row["surfaced_count"] or 0) + 1
if new_count >= 2:
# Auto-resolve: surfaced twice without user engagement
db.execute("""
UPDATE memories
SET surfaced_count = ?, lifecycle_state = 'resolved', follow_up_due = NULL
WHERE id = ?
""", (new_count, memory_id))
else:
# Update next_interaction to actual timestamp so the 48h timer starts
db.execute("""
UPDATE memories
SET surfaced_count = ?,
follow_up_due = CASE
WHEN follow_up_due = 'next_interaction' THEN ?
ELSE follow_up_due
END
WHERE id = ?
""", (new_count, _now_iso(), memory_id))
db.commit()
return new_count
# ---------------------------------------------------------------------------
# Deduplication
# ---------------------------------------------------------------------------
def find_similar(
character_id: str,
content: str,
memory_type: str = "semantic",
threshold: float = 0.85,
) -> dict | None:
"""Find an existing memory that is semantically similar (>threshold).
Returns the matching memory dict or None."""
db = _get_db()
query_emb = get_embedding(content)
vec_rows = db.execute("""
SELECT id, distance
FROM memory_embeddings
WHERE embedding MATCH ?
AND k = 5
""", (_serialize_f32(query_emb),)).fetchall()
for vr in vec_rows:
similarity = max(0.0, 1.0 - vr["distance"])
if similarity >= threshold:
row = db.execute("""
SELECT * FROM memories
WHERE id = ? AND character_id = ? AND memory_type = ?
AND lifecycle_state = 'active'
""", (vr["id"], character_id, memory_type)).fetchone()
if row:
return _row_to_dict(row)
return None
def add_or_merge_memory(
character_id: str,
content: str,
memory_type: str | None = None,
category: str = "other",
importance: float | None = None,
privacy_level: str | None = None,
tags: list[str] | None = None,
follow_up_due: str | None = None,
follow_up_context: str | None = None,
source: str = "user_explicit",
expires_at: str | None = None,
dedup_threshold: float = 0.85,
) -> dict:
"""Add a memory, or merge with an existing similar one (semantic dedup).
For semantic memories, if a similar one exists (>threshold), update it
instead of creating a new record."""
resolved_type = memory_type or classify_memory(content)["memory_type"]
if resolved_type == "semantic":
existing = find_similar(character_id, content, "semantic", dedup_threshold)
if existing:
updated = update_memory(existing["id"], content=content)
if updated:
return updated
return add_memory(
character_id=character_id,
content=content,
memory_type=memory_type,
category=category,
importance=importance,
privacy_level=privacy_level,
tags=tags,
follow_up_due=follow_up_due,
follow_up_context=follow_up_context,
source=source,
expires_at=expires_at,
)
# ---------------------------------------------------------------------------
# Migration from JSON
# ---------------------------------------------------------------------------
# Mapping from old JSON categories to new memory types
_CATEGORY_TO_TYPE = {
"preference": "semantic",
"personal_info": "semantic",
"interaction": "episodic",
"emotional": "episodic",
"system": "semantic",
"tool_usage": "semantic",
"home_layout": "semantic",
"device": "semantic",
"routine": "semantic",
"other": "semantic",
}
_CATEGORY_TO_IMPORTANCE = {
"personal_info": 0.7,
"preference": 0.6,
"emotional": 0.5,
"interaction": 0.4,
"system": 0.4,
"tool_usage": 0.3,
"home_layout": 0.5,
"device": 0.4,
"routine": 0.5,
"other": 0.4,
}
_CATEGORY_TO_PRIVACY = {
"emotional": "sensitive",
"personal_info": "sensitive",
}
def migrate_from_json(memories_dir: str | None = None) -> dict:
"""Migrate all JSON memory files to SQLite.
Returns {migrated: int, skipped: int, errors: [str]}."""
db = _get_db()
mem_dir = Path(memories_dir) if memories_dir else MEMORIES_DIR
migrated = 0
skipped = 0
errors = []
# Migrate personal memories
personal_dir = mem_dir / "personal"
if personal_dir.exists():
for json_file in personal_dir.glob("*.json"):
try:
with open(json_file) as f:
data = json.load(f)
character_id = data.get("characterId", json_file.stem)
for mem in data.get("memories", []):
content = mem.get("content", "").strip()
if not content:
skipped += 1
continue
category = mem.get("category", "other")
created_at = mem.get("createdAt", _now_iso())
try:
add_memory(
character_id=character_id,
content=content,
memory_type=_CATEGORY_TO_TYPE.get(category, "semantic"),
category=category,
importance=_CATEGORY_TO_IMPORTANCE.get(category, 0.5),
privacy_level=_CATEGORY_TO_PRIVACY.get(category, "standard"),
source="migrated_json",
)
# Fix created_at to original value
db.execute(
"UPDATE memories SET created_at = ? WHERE id = (SELECT id FROM memories ORDER BY rowid DESC LIMIT 1)",
(created_at,),
)
db.commit()
migrated += 1
except Exception as e:
errors.append(f"personal/{json_file.name}: {e}")
# Rename to backup
backup = json_file.with_suffix(".json.bak")
json_file.rename(backup)
except Exception as e:
errors.append(f"personal/{json_file.name}: {e}")
# Migrate general memories
general_file = mem_dir / "general.json"
if general_file.exists():
try:
with open(general_file) as f:
data = json.load(f)
for mem in data.get("memories", []):
content = mem.get("content", "").strip()
if not content:
skipped += 1
continue
category = mem.get("category", "other")
created_at = mem.get("createdAt", _now_iso())
try:
add_memory(
character_id="shared",
content=content,
memory_type=_CATEGORY_TO_TYPE.get(category, "semantic"),
category=category,
importance=_CATEGORY_TO_IMPORTANCE.get(category, 0.5),
privacy_level="standard",
source="migrated_json",
)
db.execute(
"UPDATE memories SET created_at = ? WHERE id = (SELECT id FROM memories ORDER BY rowid DESC LIMIT 1)",
(created_at,),
)
db.commit()
migrated += 1
except Exception as e:
errors.append(f"general.json: {e}")
backup = general_file.with_suffix(".json.bak")
general_file.rename(backup)
except Exception as e:
errors.append(f"general.json: {e}")
return {"migrated": migrated, "skipped": skipped, "errors": errors}