import os import json import re import random import logging from flask import current_app from sqlalchemy.orm.attributes import flag_modified from models import ( db, Character, Look, Outfit, Action, Style, Scene, Detailer, Checkpoint, Preset ) logger = logging.getLogger('gaze') def sync_characters(): if not os.path.exists(current_app.config['CHARACTERS_DIR']): return current_ids = [] for filename in os.listdir(current_app.config['CHARACTERS_DIR']): if filename.endswith('.json'): file_path = os.path.join(current_app.config['CHARACTERS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) char_id = data.get('character_id') if not char_id: continue current_ids.append(char_id) # Generate URL-safe slug: remove special characters from character_id slug = re.sub(r'[^a-zA-Z0-9_]', '', char_id) # Check if character already exists character = Character.query.filter_by(character_id=char_id).first() name = data.get('character_name', char_id.replace('_', ' ').title()) if character: character.data = data character.name = name character.slug = slug character.filename = filename # Check if cover image still exists if character.image_path: full_img_path = os.path.join(current_app.config['UPLOAD_FOLDER'], character.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {character.name}, clearing path.") character.image_path = None # Explicitly tell SQLAlchemy the JSON field was modified flag_modified(character, "data") else: new_char = Character( character_id=char_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_char) except Exception as e: print(f"Error importing {filename}: {e}") # Remove characters that are no longer in the folder all_characters = Character.query.all() for char in all_characters: if char.character_id not in current_ids: db.session.delete(char) db.session.commit() def sync_outfits(): if not os.path.exists(current_app.config['CLOTHING_DIR']): return current_ids = [] for filename in os.listdir(current_app.config['CLOTHING_DIR']): if filename.endswith('.json'): file_path = os.path.join(current_app.config['CLOTHING_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) outfit_id = data.get('outfit_id') or filename.replace('.json', '') current_ids.append(outfit_id) # Generate URL-safe slug: remove special characters from outfit_id slug = re.sub(r'[^a-zA-Z0-9_]', '', outfit_id) # Check if outfit already exists outfit = Outfit.query.filter_by(outfit_id=outfit_id).first() name = data.get('outfit_name', outfit_id.replace('_', ' ').title()) if outfit: outfit.data = data outfit.name = name outfit.slug = slug outfit.filename = filename # Check if cover image still exists if outfit.image_path: full_img_path = os.path.join(current_app.config['UPLOAD_FOLDER'], outfit.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {outfit.name}, clearing path.") outfit.image_path = None # Explicitly tell SQLAlchemy the JSON field was modified flag_modified(outfit, "data") else: new_outfit = Outfit( outfit_id=outfit_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_outfit) except Exception as e: print(f"Error importing outfit {filename}: {e}") # Remove outfits that are no longer in the folder all_outfits = Outfit.query.all() for outfit in all_outfits: if outfit.outfit_id not in current_ids: db.session.delete(outfit) db.session.commit() def ensure_default_outfit(): """Ensure a default outfit file exists and is registered in the database. Checks if data/clothing/default.json exists, creates it with a minimal wardrobe structure if missing, and ensures a corresponding Outfit database entry exists. """ default_outfit_path = os.path.join(current_app.config['CLOTHING_DIR'], 'default.json') # Check if default outfit file exists if not os.path.exists(default_outfit_path): logger.info("Default outfit file not found at %s, creating it...", default_outfit_path) # Ensure the clothing directory exists os.makedirs(current_app.config['CLOTHING_DIR'], exist_ok=True) # Create minimal default outfit structure default_outfit_data = { "outfit_id": "default", "outfit_name": "Default", "wardrobe": { "base": "", "head": "", "upper_body": "", "lower_body": "", "hands": "", "feet": "", "additional": "" }, "lora": { "lora_name": "", "lora_weight": 0.8, "lora_triggers": "" }, "tags": [] } try: # Write the default outfit file with open(default_outfit_path, 'w') as f: json.dump(default_outfit_data, f, indent=2) logger.info("Created default outfit file at %s", default_outfit_path) except Exception as e: logger.error("Failed to create default outfit file: %s", e) return False # Check if Outfit database entry exists outfit = Outfit.query.filter_by(outfit_id='default').first() if not outfit: logger.info("Default Outfit database entry not found, creating it...") # Load the outfit data (either existing or newly created) try: with open(default_outfit_path, 'r') as f: outfit_data = json.load(f) except Exception as e: logger.error("Failed to read default outfit file: %s", e) return False # Create database entry try: new_outfit = Outfit( outfit_id='default', slug='default', filename='default.json', name='Default', data=outfit_data ) db.session.add(new_outfit) db.session.commit() logger.info("Created default Outfit database entry") except Exception as e: logger.error("Failed to create default Outfit database entry: %s", e) db.session.rollback() return False else: logger.debug("Default Outfit database entry already exists") logger.info("Default outfit verification complete") return True def sync_looks(): if not os.path.exists(current_app.config['LOOKS_DIR']): return current_ids = [] for filename in os.listdir(current_app.config['LOOKS_DIR']): if filename.endswith('.json'): file_path = os.path.join(current_app.config['LOOKS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) look_id = data.get('look_id') or filename.replace('.json', '') current_ids.append(look_id) slug = re.sub(r'[^a-zA-Z0-9_]', '', look_id) look = Look.query.filter_by(look_id=look_id).first() name = data.get('look_name', look_id.replace('_', ' ').title()) character_id = data.get('character_id', None) if look: look.data = data look.name = name look.slug = slug look.filename = filename look.character_id = character_id if look.image_path: full_img_path = os.path.join(current_app.config['UPLOAD_FOLDER'], look.image_path) if not os.path.exists(full_img_path): look.image_path = None flag_modified(look, "data") else: new_look = Look( look_id=look_id, slug=slug, filename=filename, name=name, character_id=character_id, data=data ) db.session.add(new_look) except Exception as e: print(f"Error importing look {filename}: {e}") all_looks = Look.query.all() for look in all_looks: if look.look_id not in current_ids: db.session.delete(look) db.session.commit() def sync_presets(): if not os.path.exists(current_app.config['PRESETS_DIR']): return current_ids = [] for filename in os.listdir(current_app.config['PRESETS_DIR']): if filename.endswith('.json'): file_path = os.path.join(current_app.config['PRESETS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) preset_id = data.get('preset_id') or filename.replace('.json', '') current_ids.append(preset_id) slug = re.sub(r'[^a-zA-Z0-9_]', '', preset_id) preset = Preset.query.filter_by(preset_id=preset_id).first() name = data.get('preset_name', preset_id.replace('_', ' ').title()) if preset: preset.data = data preset.name = name preset.slug = slug preset.filename = filename if preset.image_path: full_img_path = os.path.join(current_app.config['UPLOAD_FOLDER'], preset.image_path) if not os.path.exists(full_img_path): preset.image_path = None flag_modified(preset, "data") else: new_preset = Preset( preset_id=preset_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_preset) except Exception as e: print(f"Error importing preset {filename}: {e}") all_presets = Preset.query.all() for preset in all_presets: if preset.preset_id not in current_ids: db.session.delete(preset) db.session.commit() # --------------------------------------------------------------------------- # Preset helpers # --------------------------------------------------------------------------- _PRESET_ENTITY_MAP = { 'character': (Character, 'character_id'), 'outfit': (Outfit, 'outfit_id'), 'action': (Action, 'action_id'), 'style': (Style, 'style_id'), 'scene': (Scene, 'scene_id'), 'detailer': (Detailer, 'detailer_id'), 'look': (Look, 'look_id'), 'checkpoint': (Checkpoint, 'checkpoint_path'), } def _resolve_preset_entity(entity_type, entity_id): """Resolve a preset entity_id ('random', specific ID, or None) to an ORM object.""" if not entity_id: return None model_class, id_field = _PRESET_ENTITY_MAP[entity_type] if entity_id == 'random': return model_class.query.order_by(db.func.random()).first() return model_class.query.filter(getattr(model_class, id_field) == entity_id).first() def _resolve_preset_fields(preset_data): """Convert preset field toggle dicts into a selected_fields list. Each field value: True = include, False = exclude, 'random' = randomly decide. Returns a list of 'section::key' strings for fields that are active. """ selected = [] char_cfg = preset_data.get('character', {}) fields = char_cfg.get('fields', {}) from utils import _BODY_GROUP_KEYS for key in _BODY_GROUP_KEYS: val = fields.get('identity', {}).get(key, True) if val == 'random': val = random.choice([True, False]) if val: selected.append(f'identity::{key}') for key in ['expression', 'pose', 'scene']: val = fields.get('defaults', {}).get(key, False) if val == 'random': val = random.choice([True, False]) if val: selected.append(f'defaults::{key}') wardrobe_cfg = fields.get('wardrobe', {}) for key in _BODY_GROUP_KEYS: val = wardrobe_cfg.get('fields', {}).get(key, True) if val == 'random': val = random.choice([True, False]) if val: selected.append(f'wardrobe::{key}') # Always include name and lora triggers selected.append('special::name') if char_cfg.get('use_lora', True): selected.append('lora::lora_triggers') return selected def sync_actions(): if not os.path.exists(current_app.config['ACTIONS_DIR']): return current_ids = [] for filename in os.listdir(current_app.config['ACTIONS_DIR']): if filename.endswith('.json'): file_path = os.path.join(current_app.config['ACTIONS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) action_id = data.get('action_id') or filename.replace('.json', '') current_ids.append(action_id) # Generate URL-safe slug slug = re.sub(r'[^a-zA-Z0-9_]', '', action_id) # Check if action already exists action = Action.query.filter_by(action_id=action_id).first() name = data.get('action_name', action_id.replace('_', ' ').title()) if action: action.data = data action.name = name action.slug = slug action.filename = filename # Check if cover image still exists if action.image_path: full_img_path = os.path.join(current_app.config['UPLOAD_FOLDER'], action.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {action.name}, clearing path.") action.image_path = None flag_modified(action, "data") else: new_action = Action( action_id=action_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_action) except Exception as e: print(f"Error importing action {filename}: {e}") # Remove actions that are no longer in the folder all_actions = Action.query.all() for action in all_actions: if action.action_id not in current_ids: db.session.delete(action) db.session.commit() def sync_styles(): if not os.path.exists(current_app.config['STYLES_DIR']): return current_ids = [] for filename in os.listdir(current_app.config['STYLES_DIR']): if filename.endswith('.json'): file_path = os.path.join(current_app.config['STYLES_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) style_id = data.get('style_id') or filename.replace('.json', '') current_ids.append(style_id) # Generate URL-safe slug slug = re.sub(r'[^a-zA-Z0-9_]', '', style_id) # Check if style already exists style = Style.query.filter_by(style_id=style_id).first() name = data.get('style_name', style_id.replace('_', ' ').title()) if style: style.data = data style.name = name style.slug = slug style.filename = filename # Check if cover image still exists if style.image_path: full_img_path = os.path.join(current_app.config['UPLOAD_FOLDER'], style.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {style.name}, clearing path.") style.image_path = None flag_modified(style, "data") else: new_style = Style( style_id=style_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_style) except Exception as e: print(f"Error importing style {filename}: {e}") # Remove styles that are no longer in the folder all_styles = Style.query.all() for style in all_styles: if style.style_id not in current_ids: db.session.delete(style) db.session.commit() def sync_detailers(): if not os.path.exists(current_app.config['DETAILERS_DIR']): return current_ids = [] for filename in os.listdir(current_app.config['DETAILERS_DIR']): if filename.endswith('.json'): file_path = os.path.join(current_app.config['DETAILERS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) detailer_id = data.get('detailer_id') or filename.replace('.json', '') current_ids.append(detailer_id) # Generate URL-safe slug slug = re.sub(r'[^a-zA-Z0-9_]', '', detailer_id) # Check if detailer already exists detailer = Detailer.query.filter_by(detailer_id=detailer_id).first() name = data.get('detailer_name', detailer_id.replace('_', ' ').title()) if detailer: detailer.data = data detailer.name = name detailer.slug = slug detailer.filename = filename # Check if cover image still exists if detailer.image_path: full_img_path = os.path.join(current_app.config['UPLOAD_FOLDER'], detailer.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {detailer.name}, clearing path.") detailer.image_path = None flag_modified(detailer, "data") else: new_detailer = Detailer( detailer_id=detailer_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_detailer) except Exception as e: print(f"Error importing detailer {filename}: {e}") # Remove detailers that are no longer in the folder all_detailers = Detailer.query.all() for detailer in all_detailers: if detailer.detailer_id not in current_ids: db.session.delete(detailer) db.session.commit() def sync_scenes(): if not os.path.exists(current_app.config['SCENES_DIR']): return current_ids = [] for filename in os.listdir(current_app.config['SCENES_DIR']): if filename.endswith('.json'): file_path = os.path.join(current_app.config['SCENES_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) scene_id = data.get('scene_id') or filename.replace('.json', '') current_ids.append(scene_id) # Generate URL-safe slug slug = re.sub(r'[^a-zA-Z0-9_]', '', scene_id) # Check if scene already exists scene = Scene.query.filter_by(scene_id=scene_id).first() name = data.get('scene_name', scene_id.replace('_', ' ').title()) if scene: scene.data = data scene.name = name scene.slug = slug scene.filename = filename # Check if cover image still exists if scene.image_path: full_img_path = os.path.join(current_app.config['UPLOAD_FOLDER'], scene.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {scene.name}, clearing path.") scene.image_path = None flag_modified(scene, "data") else: new_scene = Scene( scene_id=scene_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_scene) except Exception as e: print(f"Error importing scene {filename}: {e}") # Remove scenes that are no longer in the folder all_scenes = Scene.query.all() for scene in all_scenes: if scene.scene_id not in current_ids: db.session.delete(scene) db.session.commit() def _default_checkpoint_data(checkpoint_path, filename): """Return template-default data for a checkpoint with no JSON file.""" name_base = filename.rsplit('.', 1)[0] return { "checkpoint_path": checkpoint_path, "checkpoint_name": filename, "base_positive": "anime", "base_negative": "text, logo", "steps": 25, "cfg": 5, "sampler_name": "euler_ancestral", "vae": "integrated" } def sync_checkpoints(): checkpoints_dir = current_app.config.get('CHECKPOINTS_DIR', 'data/checkpoints') os.makedirs(checkpoints_dir, exist_ok=True) # Load all JSON data files keyed by checkpoint_path json_data_by_path = {} for filename in os.listdir(checkpoints_dir): if filename.endswith('.json') and not filename.endswith('.template'): file_path = os.path.join(checkpoints_dir, filename) try: with open(file_path, 'r') as f: data = json.load(f) ckpt_path = data.get('checkpoint_path') if ckpt_path: json_data_by_path[ckpt_path] = data except Exception as e: print(f"Error reading checkpoint JSON {filename}: {e}") current_ids = [] dirs = [ (current_app.config.get('ILLUSTRIOUS_MODELS_DIR', ''), 'Illustrious'), (current_app.config.get('NOOB_MODELS_DIR', ''), 'Noob'), ] for dirpath, family in dirs: if not dirpath or not os.path.exists(dirpath): continue for f in sorted(os.listdir(dirpath)): if not (f.endswith('.safetensors') or f.endswith('.ckpt')): continue checkpoint_path = f"{family}/{f}" checkpoint_id = checkpoint_path slug = re.sub(r'[^a-zA-Z0-9_]', '_', checkpoint_path.rsplit('.', 1)[0]).lower().strip('_') name_base = f.rsplit('.', 1)[0] friendly_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).strip().title() current_ids.append(checkpoint_id) data = json_data_by_path.get(checkpoint_path, _default_checkpoint_data(checkpoint_path, f)) display_name = data.get('checkpoint_name', f).rsplit('.', 1)[0] display_name = re.sub(r'[^a-zA-Z0-9]+', ' ', display_name).strip().title() or friendly_name ckpt = Checkpoint.query.filter_by(checkpoint_id=checkpoint_id).first() if ckpt: ckpt.name = display_name ckpt.slug = slug ckpt.checkpoint_path = checkpoint_path ckpt.data = data flag_modified(ckpt, "data") if ckpt.image_path: full_img_path = os.path.join(current_app.config['UPLOAD_FOLDER'], ckpt.image_path) if not os.path.exists(full_img_path): ckpt.image_path = None else: db.session.add(Checkpoint( checkpoint_id=checkpoint_id, slug=slug, name=display_name, checkpoint_path=checkpoint_path, data=data, )) all_ckpts = Checkpoint.query.all() for ckpt in all_ckpts: if ckpt.checkpoint_id not in current_ids: db.session.delete(ckpt) db.session.commit()