Files
character-browser/services/sync.py
Aodhan Collins 55ff58aba6 Major refactor: deduplicate routes, sync, JS, and fix bugs
- Extract 8 common route patterns into factory functions in routes/shared.py
  (favourite, upload, replace cover, save defaults, clone, save JSON,
  get missing, clear covers) — removes ~1,100 lines across 9 route files
- Extract generic _sync_category() in sync.py — 7 sync functions become
  one-liner wrappers, removing ~350 lines
- Extract shared detail page JS into static/js/detail-common.js — all 9
  detail templates now call initDetailPage() with minimal config
- Extract layout inline JS into static/js/layout-utils.js (~185 lines)
- Extract library toolbar JS into static/js/library-toolbar.js
- Fix finalize missing-image bug: raise RuntimeError instead of logging
  warning so job is marked failed
- Fix missing scheduler default in _default_checkpoint_data()
- Fix N+1 query in Character.get_available_outfits() with batch IN query
- Convert all print() to logger across services and routes
- Add missing tags display to styles, scenes, detailers, checkpoints detail
- Update delete buttons to use trash.png icon with solid red background
- Update CLAUDE.md to reflect new architecture

Net reduction: ~1,600 lines

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 23:06:58 +00:00

421 lines
16 KiB
Python

import os
import json
import re
import random
import logging
from flask import current_app
from sqlalchemy.orm.attributes import flag_modified
from models import (
db, Character, Look, Outfit, Action, Style, Scene, Detailer, Checkpoint, Preset
)
logger = logging.getLogger('gaze')
def _sync_nsfw_from_tags(entity, data):
    """Mirror the ``nsfw`` tag of *data* onto ``entity.is_nsfw``.

    Only acts when ``data['tags']`` is a dict; for any other shape (key
    missing, list, ...) the entity is left unchanged. ``is_favourite`` is
    never touched here.
    """
    tag_map = data.get('tags')
    if not isinstance(tag_map, dict):
        return
    entity.is_nsfw = bool(tag_map.get('nsfw', False))
def sync_characters():
    """Synchronise ``Character`` rows with the JSON files in ``CHARACTERS_DIR``.

    Every ``*.json`` file that carries a truthy ``character_id`` is created
    or updated in the database. Rows whose ID was not seen during the scan
    are deleted, and the session is committed once at the end. Does nothing
    when the directory does not exist.

    Files that fail to load are logged and skipped. Rows previously imported
    from such a file are kept rather than deleted, so a temporarily malformed
    file does not wipe state that only lives in the database (for example the
    cover image path).
    """
    characters_dir = current_app.config['CHARACTERS_DIR']
    if not os.path.exists(characters_dir):
        return

    current_ids = set()       # IDs found on disk (set: O(1) membership below)
    unreadable_files = set()  # filenames that raised while being imported

    for filename in os.listdir(characters_dir):
        if not filename.endswith('.json'):
            continue
        file_path = os.path.join(characters_dir, filename)
        try:
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            char_id = data.get('character_id')
            if not char_id:
                continue
            current_ids.add(char_id)

            # Generate URL-safe slug: remove special characters from character_id
            slug = re.sub(r'[^a-zA-Z0-9_]', '', char_id)
            name = data.get('character_name', char_id.replace('_', ' ').title())

            character = Character.query.filter_by(character_id=char_id).first()
            if character:
                character.data = data
                character.name = name
                character.slug = slug
                character.filename = filename
                _sync_nsfw_from_tags(character, data)

                # Drop the cover reference if the image file has disappeared.
                if character.image_path:
                    full_img_path = os.path.join(
                        current_app.config['UPLOAD_FOLDER'], character.image_path)
                    if not os.path.exists(full_img_path):
                        logger.warning("Image missing for %s, clearing path.", character.name)
                        character.image_path = None

                # Explicitly tell SQLAlchemy the JSON field was modified
                flag_modified(character, "data")
            else:
                new_char = Character(
                    character_id=char_id,
                    slug=slug,
                    filename=filename,
                    name=name,
                    data=data,
                )
                _sync_nsfw_from_tags(new_char, data)
                db.session.add(new_char)
        except Exception as e:
            unreadable_files.add(filename)
            logger.error("Error importing %s: %s", filename, e)

    # Remove characters that are no longer in the folder
    for char in Character.query.all():
        if char.character_id in current_ids:
            continue
        if char.filename in unreadable_files:
            # The source file still exists but could not be imported; keep
            # the row instead of treating the character as removed.
            logger.warning("Keeping %s: its file %s could not be imported.",
                           char.character_id, char.filename)
            continue
        db.session.delete(char)

    db.session.commit()
def _sync_category(config_key, model_class, id_field, name_field,
                   extra_fn=None, sync_nsfw=True):
    """Generic sync: load JSON files from a data directory into the database.

    Each ``*.json`` file is created or updated as a ``model_class`` row; rows
    whose ID was not seen during the scan are deleted, and the session is
    committed once at the end. Does nothing when the config key is unset or
    the directory does not exist.

    Files that fail to load are logged and skipped, and rows previously
    imported from such a file are kept rather than deleted.

    Args:
        config_key: app.config key for the data directory (e.g. 'CLOTHING_DIR')
        model_class: SQLAlchemy model class (e.g. Outfit)
        id_field: JSON key for the entity ID (e.g. 'outfit_id'); also used as
            the model attribute name for lookups
        name_field: JSON key for the display name (e.g. 'outfit_name')
        extra_fn: optional callable(entity, data) for category-specific field updates
        sync_nsfw: if True, call _sync_nsfw_from_tags on create/update
    """
    data_dir = current_app.config.get(config_key)
    if not data_dir or not os.path.exists(data_dir):
        return

    current_ids = set()       # IDs found on disk (set: O(1) membership below)
    unreadable_files = set()  # filenames that raised while being imported

    for filename in os.listdir(data_dir):
        if not filename.endswith('.json'):
            continue
        file_path = os.path.join(data_dir, filename)
        try:
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # Fall back to the file stem. splitext removes only the trailing
            # extension, whereas str.replace would also strip a '.json' that
            # appears in the middle of the name.
            entity_id = data.get(id_field) or os.path.splitext(filename)[0]
            current_ids.add(entity_id)

            slug = re.sub(r'[^a-zA-Z0-9_]', '', entity_id)
            name = data.get(name_field, entity_id.replace('_', ' ').title())

            entity = model_class.query.filter_by(**{id_field: entity_id}).first()
            if entity:
                entity.data = data
                entity.name = name
                entity.slug = slug
                entity.filename = filename
                if sync_nsfw:
                    _sync_nsfw_from_tags(entity, data)
                if extra_fn:
                    extra_fn(entity, data)

                # Drop the cover reference if the image file has disappeared.
                if entity.image_path:
                    full_img_path = os.path.join(
                        current_app.config['UPLOAD_FOLDER'], entity.image_path)
                    if not os.path.exists(full_img_path):
                        logger.warning("Image missing for %s, clearing path.", entity.name)
                        entity.image_path = None

                # In-place changes to a JSON column are not detected
                # automatically, so mark it dirty explicitly.
                flag_modified(entity, "data")
            else:
                kwargs = {
                    id_field: entity_id,
                    'slug': slug,
                    'filename': filename,
                    'name': name,
                    'data': data,
                }
                new_entity = model_class(**kwargs)
                if sync_nsfw:
                    _sync_nsfw_from_tags(new_entity, data)
                if extra_fn:
                    extra_fn(new_entity, data)
                db.session.add(new_entity)
        except Exception as e:
            unreadable_files.add(filename)
            logger.error("Error importing %s: %s", filename, e)

    # Remove rows whose source file is gone.
    for entity in model_class.query.all():
        if getattr(entity, id_field) in current_ids:
            continue
        if entity.filename in unreadable_files:
            # The source file still exists but could not be imported; keep
            # the row instead of treating the entity as removed.
            logger.warning("Keeping %s: its file %s could not be imported.",
                           getattr(entity, id_field), entity.filename)
            continue
        db.session.delete(entity)

    db.session.commit()
def sync_outfits():
    """Sync ``Outfit`` rows from the JSON files under ``CLOTHING_DIR``."""
    _sync_category(
        config_key='CLOTHING_DIR',
        model_class=Outfit,
        id_field='outfit_id',
        name_field='outfit_name',
    )
def ensure_default_outfit():
    """Ensure a default outfit file exists and is registered in the database.

    Checks whether ``default.json`` exists in ``CLOTHING_DIR``, creates it
    with a minimal wardrobe structure if missing, and ensures an ``Outfit``
    row with ``outfit_id='default'`` exists (created from the file contents
    when absent).

    Returns:
        True when the file and the database entry are in place, False when
        writing the file, reading it back, or inserting the row failed (the
        error is logged; a failed insert is rolled back).
    """
    clothing_dir = current_app.config['CLOTHING_DIR']
    default_outfit_path = os.path.join(clothing_dir, 'default.json')

    # Check if default outfit file exists
    if not os.path.exists(default_outfit_path):
        logger.info("Default outfit file not found at %s, creating it...", default_outfit_path)

        # Ensure the clothing directory exists
        os.makedirs(clothing_dir, exist_ok=True)

        # Create minimal default outfit structure
        default_outfit_data = {
            "outfit_id": "default",
            "outfit_name": "Default",
            "wardrobe": {
                "base": "",
                "head": "",
                "upper_body": "",
                "lower_body": "",
                "hands": "",
                "feet": "",
                "additional": ""
            },
            "lora": {
                "lora_name": "",
                "lora_weight": 0.8,
                "lora_triggers": ""
            },
            "tags": []
        }

        try:
            # Explicit UTF-8 so the file does not depend on the host locale.
            with open(default_outfit_path, 'w', encoding='utf-8') as f:
                json.dump(default_outfit_data, f, indent=2)
            logger.info("Created default outfit file at %s", default_outfit_path)
        except Exception as e:
            logger.error("Failed to create default outfit file: %s", e)
            return False

    # Check if Outfit database entry exists
    outfit = Outfit.query.filter_by(outfit_id='default').first()
    if outfit:
        logger.debug("Default Outfit database entry already exists")
        logger.info("Default outfit verification complete")
        return True

    logger.info("Default Outfit database entry not found, creating it...")

    # Load the outfit data (either existing or newly created)
    try:
        with open(default_outfit_path, 'r', encoding='utf-8') as f:
            outfit_data = json.load(f)
    except Exception as e:
        logger.error("Failed to read default outfit file: %s", e)
        return False

    # Create database entry
    try:
        new_outfit = Outfit(
            outfit_id='default',
            slug='default',
            filename='default.json',
            name='Default',
            data=outfit_data,
        )
        db.session.add(new_outfit)
        db.session.commit()
        logger.info("Created default Outfit database entry")
    except Exception as e:
        logger.error("Failed to create default Outfit database entry: %s", e)
        db.session.rollback()
        return False

    logger.info("Default outfit verification complete")
    return True
def _sync_look_extra(entity, data):
    """Copy ``character_id`` from *data* onto the look (``None`` when absent)."""
    linked_character = data.get('character_id')
    entity.character_id = linked_character
def sync_looks():
    """Sync ``Look`` rows from ``LOOKS_DIR``, including the linked character ID."""
    _sync_category(
        config_key='LOOKS_DIR',
        model_class=Look,
        id_field='look_id',
        name_field='look_name',
        extra_fn=_sync_look_extra,
    )
def sync_presets():
    """Sync ``Preset`` rows from ``PRESETS_DIR`` without the NSFW tag sync."""
    _sync_category(
        config_key='PRESETS_DIR',
        model_class=Preset,
        id_field='preset_id',
        name_field='preset_name',
        sync_nsfw=False,
    )
# ---------------------------------------------------------------------------
# Preset helpers
# ---------------------------------------------------------------------------
# Preset section name -> (model class, name of the model attribute that is
# compared against the ID stored in the preset).
_PRESET_ENTITY_MAP = {
    'character':  (Character,  'character_id'),
    'outfit':     (Outfit,     'outfit_id'),
    'action':     (Action,     'action_id'),
    'style':      (Style,      'style_id'),
    'scene':      (Scene,      'scene_id'),
    'detailer':   (Detailer,   'detailer_id'),
    'look':       (Look,       'look_id'),
    'checkpoint': (Checkpoint, 'checkpoint_path'),
}
def _resolve_preset_entity(entity_type, entity_id):
    """Turn a preset's entity reference into an ORM object.

    A falsy *entity_id* yields ``None``; the literal ``'random'`` picks an
    arbitrary row of the mapped model; anything else is looked up by the ID
    attribute registered for *entity_type* in ``_PRESET_ENTITY_MAP``.
    Raises ``KeyError`` for an *entity_type* that is not in the map.
    """
    if entity_id:
        model_class, id_field = _PRESET_ENTITY_MAP[entity_type]
        if entity_id == 'random':
            shuffled = model_class.query.order_by(db.func.random())
            return shuffled.first()
        id_column = getattr(model_class, id_field)
        return model_class.query.filter(id_column == entity_id).first()
    return None
def _resolve_preset_fields(preset_data):
    """Build the ``selected_fields`` list from a preset's field toggles.

    Toggle values: truthy = include, falsy = exclude, ``'random'`` = decided
    by a coin flip on every call. Identity and wardrobe toggles default to
    included, the ``defaults`` toggles (expression, pose, scene) default to
    excluded.

    Returns a list of ``'section::key'`` strings. ``'special::name'`` is
    always present; ``'lora::lora_triggers'`` is added unless the character
    config sets ``use_lora`` to a falsy value.
    """
    def _is_enabled(toggle):
        # Resolve the tri-state toggle to a plain boolean.
        if toggle == 'random':
            toggle = random.choice([True, False])
        return bool(toggle)

    char_cfg = preset_data.get('character', {})
    fields = char_cfg.get('fields', {})
    from utils import _BODY_GROUP_KEYS

    selected = [
        f'identity::{key}'
        for key in _BODY_GROUP_KEYS
        if _is_enabled(fields.get('identity', {}).get(key, True))
    ]

    selected.extend(
        f'defaults::{key}'
        for key in ['expression', 'pose', 'scene']
        if _is_enabled(fields.get('defaults', {}).get(key, False))
    )

    wardrobe_cfg = fields.get('wardrobe', {})
    selected.extend(
        f'wardrobe::{key}'
        for key in _BODY_GROUP_KEYS
        if _is_enabled(wardrobe_cfg.get('fields', {}).get(key, True))
    )

    # The name is unconditional; LoRA triggers can be switched off.
    selected.append('special::name')
    if char_cfg.get('use_lora', True):
        selected.append('lora::lora_triggers')
    return selected
def sync_actions():
    """Sync ``Action`` rows from the JSON files under ``ACTIONS_DIR``."""
    _sync_category(
        config_key='ACTIONS_DIR',
        model_class=Action,
        id_field='action_id',
        name_field='action_name',
    )
def sync_styles():
    """Sync ``Style`` rows from the JSON files under ``STYLES_DIR``."""
    _sync_category(
        config_key='STYLES_DIR',
        model_class=Style,
        id_field='style_id',
        name_field='style_name',
    )
def sync_detailers():
    """Sync ``Detailer`` rows from the JSON files under ``DETAILERS_DIR``."""
    _sync_category(
        config_key='DETAILERS_DIR',
        model_class=Detailer,
        id_field='detailer_id',
        name_field='detailer_name',
    )
def sync_scenes():
    """Sync ``Scene`` rows from the JSON files under ``SCENES_DIR``."""
    _sync_category(
        config_key='SCENES_DIR',
        model_class=Scene,
        id_field='scene_id',
        name_field='scene_name',
    )
def _default_checkpoint_data(checkpoint_path, filename):
    """Return template-default data for a checkpoint with no JSON file.

    Args:
        checkpoint_path: value stored under ``checkpoint_path``.
        filename: model file name, stored unchanged under ``checkpoint_name``.

    Returns:
        A new dict on every call, so callers may mutate it freely.
    """
    return {
        "checkpoint_path": checkpoint_path,
        "checkpoint_name": filename,
        "base_positive": "anime",
        "base_negative": "text, logo",
        "steps": 25,
        "cfg": 5,
        "sampler_name": "euler_ancestral",
        "scheduler": "normal",
        "vae": "integrated",
    }
def sync_checkpoints():
    """Synchronise ``Checkpoint`` rows with the model files found on disk.

    Model files (``.safetensors`` / ``.ckpt``) are discovered in
    ``ILLUSTRIOUS_MODELS_DIR`` and ``NOOB_MODELS_DIR``. Each one is identified
    by ``"<family>/<filename>"``. Its settings come from the JSON file in
    ``CHECKPOINTS_DIR`` whose ``checkpoint_path`` matches, or from
    ``_default_checkpoint_data`` when there is none. Rows whose ID was not
    seen are deleted, and the session is committed once at the end.

    ``CHECKPOINTS_DIR`` is created if it does not exist. JSON files that fail
    to load are logged and ignored.
    """
    checkpoints_dir = current_app.config.get('CHECKPOINTS_DIR', 'data/checkpoints')
    os.makedirs(checkpoints_dir, exist_ok=True)

    # Load all JSON data files keyed by checkpoint_path
    json_data_by_path = {}
    for filename in os.listdir(checkpoints_dir):
        # Names ending in '.json' can never end in '.template', so the suffix
        # test alone already excludes '*.template' files.
        if not filename.endswith('.json'):
            continue
        file_path = os.path.join(checkpoints_dir, filename)
        try:
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            ckpt_path = data.get('checkpoint_path')
            if ckpt_path:
                json_data_by_path[ckpt_path] = data
        except Exception as e:
            logger.error("Error reading checkpoint JSON %s: %s", filename, e)

    current_ids = set()  # set: O(1) membership in the cleanup pass
    model_dirs = [
        (current_app.config.get('ILLUSTRIOUS_MODELS_DIR', ''), 'Illustrious'),
        (current_app.config.get('NOOB_MODELS_DIR', ''), 'Noob'),
    ]
    for dirpath, family in model_dirs:
        if not dirpath or not os.path.exists(dirpath):
            continue
        for model_file in sorted(os.listdir(dirpath)):
            if not model_file.endswith(('.safetensors', '.ckpt')):
                continue

            checkpoint_path = f"{family}/{model_file}"
            checkpoint_id = checkpoint_path
            slug = re.sub(r'[^a-zA-Z0-9_]', '_',
                          checkpoint_path.rsplit('.', 1)[0]).lower().strip('_')
            name_base = model_file.rsplit('.', 1)[0]
            friendly_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).strip().title()
            current_ids.add(checkpoint_id)

            # Build the default dict only when no JSON file matched.
            data = json_data_by_path.get(checkpoint_path)
            if data is None:
                data = _default_checkpoint_data(checkpoint_path, model_file)

            # A null/empty checkpoint_name falls back to the file name
            # instead of aborting the whole sync with an AttributeError.
            raw_name = str(data.get('checkpoint_name') or model_file)
            display_name = raw_name.rsplit('.', 1)[0]
            display_name = (re.sub(r'[^a-zA-Z0-9]+', ' ', display_name).strip().title()
                            or friendly_name)

            ckpt = Checkpoint.query.filter_by(checkpoint_id=checkpoint_id).first()
            if ckpt:
                ckpt.name = display_name
                ckpt.slug = slug
                ckpt.checkpoint_path = checkpoint_path
                ckpt.data = data
                _sync_nsfw_from_tags(ckpt, data)
                # In-place changes to a JSON column are not detected
                # automatically, so mark it dirty explicitly.
                flag_modified(ckpt, "data")

                # Drop the cover reference if the image file has disappeared.
                if ckpt.image_path:
                    full_img_path = os.path.join(
                        current_app.config['UPLOAD_FOLDER'], ckpt.image_path)
                    if not os.path.exists(full_img_path):
                        logger.warning("Image missing for %s, clearing path.", ckpt.name)
                        ckpt.image_path = None
            else:
                new_ckpt = Checkpoint(
                    checkpoint_id=checkpoint_id,
                    slug=slug,
                    name=display_name,
                    checkpoint_path=checkpoint_path,
                    data=data,
                )
                _sync_nsfw_from_tags(new_ckpt, data)
                db.session.add(new_ckpt)

    # Remove rows whose model file is no longer present.
    for ckpt in Checkpoint.query.all():
        if ckpt.checkpoint_id not in current_ids:
            db.session.delete(ckpt)

    db.session.commit()