import os import json import re import random import logging from flask import current_app from sqlalchemy.orm.attributes import flag_modified from models import ( db, Character, Look, Outfit, Action, Style, Scene, Detailer, Checkpoint, Preset ) logger = logging.getLogger('gaze') def _sync_nsfw_from_tags(entity, data): """Sync is_nsfw from data['tags']['nsfw'] if tags is a dict. Never touches is_favourite.""" tags = data.get('tags') if isinstance(tags, dict): entity.is_nsfw = bool(tags.get('nsfw', False)) def sync_characters(): if not os.path.exists(current_app.config['CHARACTERS_DIR']): return current_ids = [] for filename in os.listdir(current_app.config['CHARACTERS_DIR']): if filename.endswith('.json'): file_path = os.path.join(current_app.config['CHARACTERS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) char_id = data.get('character_id') if not char_id: continue current_ids.append(char_id) # Generate URL-safe slug: remove special characters from character_id slug = re.sub(r'[^a-zA-Z0-9_]', '', char_id) # Check if character already exists character = Character.query.filter_by(character_id=char_id).first() name = data.get('character_name', char_id.replace('_', ' ').title()) if character: character.data = data character.name = name character.slug = slug character.filename = filename _sync_nsfw_from_tags(character, data) # Check if cover image still exists if character.image_path: full_img_path = os.path.join(current_app.config['UPLOAD_FOLDER'], character.image_path) if not os.path.exists(full_img_path): logger.warning("Image missing for %s, clearing path.", character.name) character.image_path = None # Explicitly tell SQLAlchemy the JSON field was modified flag_modified(character, "data") else: new_char = Character( character_id=char_id, slug=slug, filename=filename, name=name, data=data ) _sync_nsfw_from_tags(new_char, data) db.session.add(new_char) except Exception as e: logger.error("Error importing %s: %s", filename, e) # Remove characters that are no longer in the folder all_characters = Character.query.all() for char in all_characters: if char.character_id not in current_ids: db.session.delete(char) db.session.commit() def _sync_category(config_key, model_class, id_field, name_field, extra_fn=None, sync_nsfw=True): """Generic sync: load JSON files from a data directory into the database. Args: config_key: app.config key for the data directory (e.g. 'CLOTHING_DIR') model_class: SQLAlchemy model class (e.g. Outfit) id_field: JSON key for the entity ID (e.g. 'outfit_id') name_field: JSON key for the display name (e.g. 'outfit_name') extra_fn: optional callable(entity, data) for category-specific field updates sync_nsfw: if True, call _sync_nsfw_from_tags on create/update """ data_dir = current_app.config.get(config_key) if not data_dir or not os.path.exists(data_dir): return current_ids = [] for filename in os.listdir(data_dir): if filename.endswith('.json'): file_path = os.path.join(data_dir, filename) try: with open(file_path, 'r') as f: data = json.load(f) entity_id = data.get(id_field) or filename.replace('.json', '') current_ids.append(entity_id) slug = re.sub(r'[^a-zA-Z0-9_]', '', entity_id) entity = model_class.query.filter_by(**{id_field: entity_id}).first() name = data.get(name_field, entity_id.replace('_', ' ').title()) if entity: entity.data = data entity.name = name entity.slug = slug entity.filename = filename if sync_nsfw: _sync_nsfw_from_tags(entity, data) if extra_fn: extra_fn(entity, data) if entity.image_path: full_img_path = os.path.join( current_app.config['UPLOAD_FOLDER'], entity.image_path) if not os.path.exists(full_img_path): logger.warning("Image missing for %s, clearing path.", entity.name) entity.image_path = None flag_modified(entity, "data") else: kwargs = { id_field: entity_id, 'slug': slug, 'filename': filename, 'name': name, 'data': data, } new_entity = model_class(**kwargs) if sync_nsfw: _sync_nsfw_from_tags(new_entity, data) if extra_fn: extra_fn(new_entity, data) db.session.add(new_entity) except Exception as e: logger.error("Error importing %s: %s", filename, e) for entity in model_class.query.all(): if getattr(entity, id_field) not in current_ids: db.session.delete(entity) db.session.commit() def sync_outfits(): _sync_category('CLOTHING_DIR', Outfit, 'outfit_id', 'outfit_name') def ensure_default_outfit(): """Ensure a default outfit file exists and is registered in the database. Checks if data/clothing/default.json exists, creates it with a minimal wardrobe structure if missing, and ensures a corresponding Outfit database entry exists. """ default_outfit_path = os.path.join(current_app.config['CLOTHING_DIR'], 'default.json') # Check if default outfit file exists if not os.path.exists(default_outfit_path): logger.info("Default outfit file not found at %s, creating it...", default_outfit_path) # Ensure the clothing directory exists os.makedirs(current_app.config['CLOTHING_DIR'], exist_ok=True) # Create minimal default outfit structure default_outfit_data = { "outfit_id": "default", "outfit_name": "Default", "wardrobe": { "base": "", "head": "", "upper_body": "", "lower_body": "", "hands": "", "feet": "", "additional": "" }, "lora": { "lora_name": "", "lora_weight": 0.8, "lora_triggers": "" }, "tags": {"outfit_type": "Default", "nsfw": False} } try: # Write the default outfit file with open(default_outfit_path, 'w') as f: json.dump(default_outfit_data, f, indent=2) logger.info("Created default outfit file at %s", default_outfit_path) except Exception as e: logger.error("Failed to create default outfit file: %s", e) return False # Check if Outfit database entry exists outfit = Outfit.query.filter_by(outfit_id='default').first() if not outfit: logger.info("Default Outfit database entry not found, creating it...") # Load the outfit data (either existing or newly created) try: with open(default_outfit_path, 'r') as f: outfit_data = json.load(f) except Exception as e: logger.error("Failed to read default outfit file: %s", e) return False # Create database entry try: new_outfit = Outfit( outfit_id='default', slug='default', filename='default.json', name='Default', data=outfit_data ) db.session.add(new_outfit) db.session.commit() logger.info("Created default Outfit database entry") except Exception as e: logger.error("Failed to create default Outfit database entry: %s", e) db.session.rollback() return False else: logger.debug("Default Outfit database entry already exists") logger.info("Default outfit verification complete") return True def _sync_look_extra(entity, data): entity.character_id = data.get('character_id', None) def sync_looks(): _sync_category('LOOKS_DIR', Look, 'look_id', 'look_name', extra_fn=_sync_look_extra) def sync_presets(): _sync_category('PRESETS_DIR', Preset, 'preset_id', 'preset_name', sync_nsfw=False) # --------------------------------------------------------------------------- # Preset helpers # --------------------------------------------------------------------------- _PRESET_ENTITY_MAP = { 'character': (Character, 'character_id'), 'outfit': (Outfit, 'outfit_id'), 'action': (Action, 'action_id'), 'style': (Style, 'style_id'), 'scene': (Scene, 'scene_id'), 'detailer': (Detailer, 'detailer_id'), 'look': (Look, 'look_id'), 'checkpoint': (Checkpoint, 'checkpoint_path'), } def _resolve_preset_entity(entity_type, entity_id): """Resolve a preset entity_id ('random', specific ID, or None) to an ORM object.""" if not entity_id: return None model_class, id_field = _PRESET_ENTITY_MAP[entity_type] if entity_id == 'random': return model_class.query.order_by(db.func.random()).first() return model_class.query.filter(getattr(model_class, id_field) == entity_id).first() def _resolve_preset_fields(preset_data): """Convert preset field toggle dicts into a selected_fields list. Each field value: True = include, False = exclude, 'random' = randomly decide. Returns a list of 'section::key' strings for fields that are active. """ selected = [] char_cfg = preset_data.get('character', {}) fields = char_cfg.get('fields', {}) from utils import _BODY_GROUP_KEYS for key in _BODY_GROUP_KEYS: val = fields.get('identity', {}).get(key, True) if val == 'random': val = random.choice([True, False]) if val: selected.append(f'identity::{key}') for key in ['expression', 'pose', 'scene']: val = fields.get('defaults', {}).get(key, False) if val == 'random': val = random.choice([True, False]) if val: selected.append(f'defaults::{key}') wardrobe_cfg = fields.get('wardrobe', {}) for key in _BODY_GROUP_KEYS: val = wardrobe_cfg.get('fields', {}).get(key, True) if val == 'random': val = random.choice([True, False]) if val: selected.append(f'wardrobe::{key}') # Always include name and lora triggers selected.append('special::name') if char_cfg.get('use_lora', True): selected.append('lora::lora_triggers') return selected def sync_actions(): _sync_category('ACTIONS_DIR', Action, 'action_id', 'action_name') def sync_styles(): _sync_category('STYLES_DIR', Style, 'style_id', 'style_name') def sync_detailers(): _sync_category('DETAILERS_DIR', Detailer, 'detailer_id', 'detailer_name') def sync_scenes(): _sync_category('SCENES_DIR', Scene, 'scene_id', 'scene_name') def _default_checkpoint_data(checkpoint_path, filename): """Return template-default data for a checkpoint with no JSON file.""" name_base = filename.rsplit('.', 1)[0] return { "checkpoint_path": checkpoint_path, "checkpoint_name": filename, "base_positive": "anime", "base_negative": "text, logo", "steps": 25, "cfg": 5, "sampler_name": "euler_ancestral", "scheduler": "normal", "vae": "integrated" } def sync_checkpoints(): checkpoints_dir = current_app.config.get('CHECKPOINTS_DIR', 'data/checkpoints') os.makedirs(checkpoints_dir, exist_ok=True) # Load all JSON data files keyed by checkpoint_path json_data_by_path = {} for filename in os.listdir(checkpoints_dir): if filename.endswith('.json') and not filename.endswith('.template'): file_path = os.path.join(checkpoints_dir, filename) try: with open(file_path, 'r') as f: data = json.load(f) ckpt_path = data.get('checkpoint_path') if ckpt_path: json_data_by_path[ckpt_path] = data except Exception as e: logger.error("Error reading checkpoint JSON %s: %s", filename, e) current_ids = [] dirs = [ (current_app.config.get('ILLUSTRIOUS_MODELS_DIR', ''), 'Illustrious'), (current_app.config.get('NOOB_MODELS_DIR', ''), 'Noob'), ] for dirpath, family in dirs: if not dirpath or not os.path.exists(dirpath): continue for f in sorted(os.listdir(dirpath)): if not (f.endswith('.safetensors') or f.endswith('.ckpt')): continue checkpoint_path = f"{family}/{f}" checkpoint_id = checkpoint_path slug = re.sub(r'[^a-zA-Z0-9_]', '_', checkpoint_path.rsplit('.', 1)[0]).lower().strip('_') name_base = f.rsplit('.', 1)[0] friendly_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).strip().title() current_ids.append(checkpoint_id) data = json_data_by_path.get(checkpoint_path, _default_checkpoint_data(checkpoint_path, f)) display_name = data.get('checkpoint_name', f).rsplit('.', 1)[0] display_name = re.sub(r'[^a-zA-Z0-9]+', ' ', display_name).strip().title() or friendly_name ckpt = Checkpoint.query.filter_by(checkpoint_id=checkpoint_id).first() if ckpt: ckpt.name = display_name ckpt.slug = slug ckpt.checkpoint_path = checkpoint_path ckpt.data = data _sync_nsfw_from_tags(ckpt, data) flag_modified(ckpt, "data") if ckpt.image_path: full_img_path = os.path.join(current_app.config['UPLOAD_FOLDER'], ckpt.image_path) if not os.path.exists(full_img_path): ckpt.image_path = None else: new_ckpt = Checkpoint( checkpoint_id=checkpoint_id, slug=slug, name=display_name, checkpoint_path=checkpoint_path, data=data, ) _sync_nsfw_from_tags(new_ckpt, data) db.session.add(new_ckpt) all_ckpts = Checkpoint.query.all() for ckpt in all_ckpts: if ckpt.checkpoint_id not in current_ids: db.session.delete(ckpt) db.session.commit()