import os import json import time import re import requests import random import asyncio import subprocess from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from flask import Flask, render_template, request, redirect, url_for, flash, session from flask_session import Session from werkzeug.utils import secure_filename from models import db, Character, Settings, Outfit, Action, Style, Detailer, Scene, Checkpoint, Look app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['UPLOAD_FOLDER'] = 'static/uploads' app.config['SECRET_KEY'] = 'dev-key-123' app.config['CHARACTERS_DIR'] = 'data/characters' app.config['CLOTHING_DIR'] = 'data/clothing' app.config['ACTIONS_DIR'] = 'data/actions' app.config['STYLES_DIR'] = 'data/styles' app.config['SCENES_DIR'] = 'data/scenes' app.config['DETAILERS_DIR'] = 'data/detailers' app.config['CHECKPOINTS_DIR'] = 'data/checkpoints' app.config['LOOKS_DIR'] = 'data/looks' app.config['COMFYUI_URL'] = 'http://127.0.0.1:8188' app.config['ILLUSTRIOUS_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Illustrious/' app.config['NOOB_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Noob/' app.config['LORA_DIR'] = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Looks/' # Server-side session configuration to avoid cookie size limits app.config['SESSION_TYPE'] = 'filesystem' app.config['SESSION_FILE_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], '../flask_session') app.config['SESSION_PERMANENT'] = False db.init_app(app) Session(app) # Path to the danbooru-mcp docker-compose project, relative to this file. MCP_TOOLS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tools') MCP_COMPOSE_DIR = os.path.join(MCP_TOOLS_DIR, 'danbooru-mcp') MCP_REPO_URL = 'https://git.liveaodh.com/aodhan/danbooru-mcp' def _ensure_mcp_repo(): """Clone or update the danbooru-mcp source repository inside tools/. - If ``tools/danbooru-mcp/`` does not exist, clone from MCP_REPO_URL. - If it already exists, run ``git pull`` to fetch the latest changes. Errors are non-fatal. """ os.makedirs(MCP_TOOLS_DIR, exist_ok=True) try: if not os.path.isdir(MCP_COMPOSE_DIR): print(f'Cloning danbooru-mcp from {MCP_REPO_URL} …') subprocess.run( ['git', 'clone', MCP_REPO_URL, MCP_COMPOSE_DIR], timeout=120, check=True, ) print('danbooru-mcp cloned successfully.') else: print('Updating danbooru-mcp via git pull …') subprocess.run( ['git', 'pull'], cwd=MCP_COMPOSE_DIR, timeout=60, check=True, ) print('danbooru-mcp updated.') except FileNotFoundError: print('WARNING: git not found on PATH — danbooru-mcp repo will not be cloned/updated.') except subprocess.CalledProcessError as e: print(f'WARNING: git operation failed for danbooru-mcp: {e}') except subprocess.TimeoutExpired: print('WARNING: git timed out while cloning/updating danbooru-mcp.') except Exception as e: print(f'WARNING: Could not clone/update danbooru-mcp repo: {e}') def ensure_mcp_server_running(): """Ensure the danbooru-mcp repo is present/up-to-date, then start the Docker container if it is not already running. Uses ``docker compose up -d`` so the image is built automatically on first run. Errors are non-fatal — the app will still start even if Docker is unavailable. """ _ensure_mcp_repo() try: result = subprocess.run( ['docker', 'ps', '--filter', 'name=danbooru-mcp', '--format', '{{.Names}}'], capture_output=True, text=True, timeout=10, ) if 'danbooru-mcp' in result.stdout: print('danbooru-mcp container already running.') return # Container not running — start it via docker compose print('Starting danbooru-mcp container via docker compose …') subprocess.run( ['docker', 'compose', 'up', '-d'], cwd=MCP_COMPOSE_DIR, timeout=120, ) print('danbooru-mcp container started.') except FileNotFoundError: print('WARNING: docker not found on PATH — danbooru-mcp will not be started automatically.') except subprocess.TimeoutExpired: print('WARNING: docker timed out while starting danbooru-mcp.') except Exception as e: print(f'WARNING: Could not ensure danbooru-mcp is running: {e}') @app.context_processor def inject_comfyui_ws_url(): url = app.config.get('COMFYUI_URL', 'http://127.0.0.1:8188') # If the URL is localhost/127.0.0.1, replace it with the current request's host # so that remote clients connect to the correct machine for WebSockets. if '127.0.0.1' in url or 'localhost' in url: host = request.host.split(':')[0] url = url.replace('127.0.0.1', host).replace('localhost', host) # Convert http/https to ws/wss ws_url = url.replace('http://', 'ws://').replace('https://', 'wss://') return dict(COMFYUI_WS_URL=f"{ws_url}/ws") @app.context_processor def inject_default_checkpoint(): from models import Checkpoint checkpoints = Checkpoint.query.order_by(Checkpoint.name).all() return dict(all_checkpoints=checkpoints, default_checkpoint_path=session.get('default_checkpoint', '')) @app.route('/set_default_checkpoint', methods=['POST']) def set_default_checkpoint(): session['default_checkpoint'] = request.form.get('checkpoint_path', '') return {'status': 'ok'} @app.route('/api/status/comfyui') def api_status_comfyui(): """Return whether ComfyUI is reachable.""" url = app.config.get('COMFYUI_URL', 'http://127.0.0.1:8188') try: resp = requests.get(f'{url}/system_stats', timeout=3) if resp.ok: return {'status': 'ok'} except Exception: pass return {'status': 'error'} @app.route('/api/status/mcp') def api_status_mcp(): """Return whether the danbooru-mcp Docker container is running.""" try: result = subprocess.run( ['docker', 'ps', '--filter', 'name=danbooru-mcp', '--format', '{{.Names}}'], capture_output=True, text=True, timeout=5, ) if 'danbooru-mcp' in result.stdout: return {'status': 'ok'} except Exception: pass return {'status': 'error'} ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'} def get_available_loras(): loras = [] if os.path.exists(app.config['LORA_DIR']): for f in os.listdir(app.config['LORA_DIR']): if f.endswith('.safetensors'): # Using the format seen in character JSONs loras.append(f"Illustrious/Looks/{f}") return sorted(loras) def get_available_clothing_loras(): """Get LoRAs from the Clothing directory for outfit LoRAs.""" clothing_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Clothing/' loras = [] if os.path.exists(clothing_lora_dir): for f in os.listdir(clothing_lora_dir): if f.endswith('.safetensors'): loras.append(f"Illustrious/Clothing/{f}") return sorted(loras) def get_available_action_loras(): """Get LoRAs from the Poses directory for action LoRAs.""" poses_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Poses/' loras = [] if os.path.exists(poses_lora_dir): for f in os.listdir(poses_lora_dir): if f.endswith('.safetensors'): loras.append(f"Illustrious/Poses/{f}") return sorted(loras) def get_available_style_loras(): """Get LoRAs from the Styles directory for style LoRAs.""" styles_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Styles/' loras = [] if os.path.exists(styles_lora_dir): for f in os.listdir(styles_lora_dir): if f.endswith('.safetensors'): loras.append(f"Illustrious/Styles/{f}") return sorted(loras) def get_available_detailer_loras(): """Get LoRAs from the Detailers directory for detailer LoRAs.""" detailers_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Detailers/' loras = [] if os.path.exists(detailers_lora_dir): for f in os.listdir(detailers_lora_dir): if f.endswith('.safetensors'): loras.append(f"Illustrious/Detailers/{f}") return sorted(loras) def get_available_scene_loras(): """Get LoRAs from the Backgrounds directory for scene LoRAs.""" backgrounds_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Backgrounds/' loras = [] if os.path.exists(backgrounds_lora_dir): for f in os.listdir(backgrounds_lora_dir): if f.endswith('.safetensors'): loras.append(f"Illustrious/Backgrounds/{f}") return sorted(loras) def get_available_checkpoints(): checkpoints = [] # Scan Illustrious if os.path.exists(app.config['ILLUSTRIOUS_MODELS_DIR']): for f in os.listdir(app.config['ILLUSTRIOUS_MODELS_DIR']): if f.endswith('.safetensors') or f.endswith('.ckpt'): checkpoints.append(f"Illustrious/{f}") # Scan Noob if os.path.exists(app.config['NOOB_MODELS_DIR']): for f in os.listdir(app.config['NOOB_MODELS_DIR']): if f.endswith('.safetensors') or f.endswith('.ckpt'): checkpoints.append(f"Noob/{f}") return sorted(checkpoints) def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def parse_orientation(orientation_str): if not orientation_str: return [] m_count = orientation_str.upper().count('M') f_count = orientation_str.upper().count('F') total = m_count + f_count tags = [] # Gender counts if m_count == 1: tags.append("1boy") elif m_count > 1: tags.append(f"{m_count}boys") if f_count == 1: tags.append("1girl") elif f_count > 1: tags.append(f"{f_count}girls") # Relationships/Group type if total == 1: tags.append("solo") elif total > 1: if m_count > 0 and f_count > 0: tags.append("hetero") elif f_count > 1 and m_count == 0: tags.append("yuri") elif m_count > 1 and f_count == 0: tags.append("yaoi") return tags def _dedup_tags(prompt_str): """Remove duplicate tags from a comma-separated prompt string, preserving first-occurrence order.""" seen = set() result = [] for tag in prompt_str.split(','): t = tag.strip() if t and t.lower() not in seen: seen.add(t.lower()) result.append(t) return ', '.join(result) def _cross_dedup_prompts(positive, negative): """Remove tags shared between positive and negative prompts. Repeatedly strips the first occurrence from each side until the tag exists on only one side. Equal counts cancel out completely; any excess on one side retains the remainder, allowing deliberate overrides (e.g. adding a tag twice in the positive while it appears once in the negative leaves one copy positive). """ def parse_tags(s): return [t.strip() for t in s.split(',') if t.strip()] pos_tags = parse_tags(positive) neg_tags = parse_tags(negative) shared = {t.lower() for t in pos_tags} & {t.lower() for t in neg_tags} for tag_lower in shared: while ( any(t.lower() == tag_lower for t in pos_tags) and any(t.lower() == tag_lower for t in neg_tags) ): pos_tags.pop(next(i for i, t in enumerate(pos_tags) if t.lower() == tag_lower)) neg_tags.pop(next(i for i, t in enumerate(neg_tags) if t.lower() == tag_lower)) return ', '.join(pos_tags), ', '.join(neg_tags) def _resolve_lora_weight(lora_data, override=None): """Return effective LoRA weight, randomising between min/max when they differ. If *override* is provided it takes absolute precedence (used by the Strengths Gallery to pin a specific value for each step). """ if override is not None: return float(override) weight = float(lora_data.get('lora_weight', 1.0)) min_w = lora_data.get('lora_weight_min') max_w = lora_data.get('lora_weight_max') if min_w is not None and max_w is not None: min_w, max_w = float(min_w), float(max_w) if min_w != max_w: weight = random.uniform(min(min_w, max_w), max(min_w, max_w)) return weight def build_prompt(data, selected_fields=None, default_fields=None, active_outfit='default'): def is_selected(section, key): # Priority: # 1. Manual selection from form (if list is not empty) # 2. Database defaults (if they exist) # 3. Select all (default behavior) if selected_fields: return f"{section}::{key}" in selected_fields if default_fields: return f"{section}::{key}" in default_fields return True identity = data.get('identity', {}) # Get wardrobe - handle both new nested format and legacy flat format wardrobe_data = data.get('wardrobe', {}) if 'default' in wardrobe_data and isinstance(wardrobe_data.get('default'), dict): # New nested format - get active outfit wardrobe = wardrobe_data.get(active_outfit or 'default', wardrobe_data.get('default', {})) else: # Legacy flat format wardrobe = wardrobe_data defaults = data.get('defaults', {}) action_data = data.get('action', {}) style_data = data.get('style', {}) participants = data.get('participants', {}) # Pre-calculate Hand/Glove priority # Priority: wardrobe gloves > wardrobe hands (outfit) > identity hands (character) hand_val = "" if wardrobe.get('gloves') and is_selected('wardrobe', 'gloves'): hand_val = wardrobe.get('gloves') elif wardrobe.get('hands') and is_selected('wardrobe', 'hands'): hand_val = wardrobe.get('hands') elif identity.get('hands') and is_selected('identity', 'hands'): hand_val = identity.get('hands') # 1. Main Prompt parts = [] # Handle participants logic if participants: if participants.get('solo_focus') == 'true': parts.append('(solo focus:1.2)') orientation = participants.get('orientation', '') if orientation: parts.extend(parse_orientation(orientation)) else: # Default behavior parts.append("(solo:1.2)") # Use character_id (underscores to spaces) for tags compatibility char_tag = data.get('character_id', '').replace('_', ' ') if char_tag and is_selected('special', 'name'): parts.append(char_tag) for key in ['base_specs', 'hair', 'eyes', 'extra']: val = identity.get(key) if val and is_selected('identity', key): # Filter out conflicting tags if participants data is present if participants and key == 'base_specs': # Remove 1girl, 1boy, solo, etc. val = re.sub(r'\b(1girl|1boy|solo)\b', '', val).replace(', ,', ',').strip(', ') parts.append(val) # Add defaults (expression, pose, scene) for key in ['expression', 'pose', 'scene']: val = defaults.get(key) if val and is_selected('defaults', key): parts.append(val) # Add hand priority value to main prompt if hand_val: parts.append(hand_val) for key in ['full_body', 'top', 'bottom', 'headwear', 'legwear', 'footwear', 'accessories']: val = wardrobe.get(key) if val and is_selected('wardrobe', key): parts.append(val) # Standard character styles char_aesthetic = data.get('styles', {}).get('aesthetic') if char_aesthetic and is_selected('styles', 'aesthetic'): parts.append(f"{char_aesthetic} style") # New Styles Gallery logic if style_data.get('artist_name') and is_selected('style', 'artist_name'): parts.append(f"by {style_data['artist_name']}") if style_data.get('artistic_style') and is_selected('style', 'artistic_style'): parts.append(style_data['artistic_style']) tags = data.get('tags', []) if tags and is_selected('special', 'tags'): parts.extend(tags) lora = data.get('lora', {}) if lora.get('lora_triggers') and is_selected('lora', 'lora_triggers'): parts.append(lora.get('lora_triggers')) # 2. Face Prompt: Tag, Eyes, Expression, Headwear, Action details face_parts = [] if char_tag and is_selected('special', 'name'): face_parts.append(char_tag) if identity.get('eyes') and is_selected('identity', 'eyes'): face_parts.append(identity.get('eyes')) if defaults.get('expression') and is_selected('defaults', 'expression'): face_parts.append(defaults.get('expression')) if wardrobe.get('headwear') and is_selected('wardrobe', 'headwear'): face_parts.append(wardrobe.get('headwear')) # Add specific Action expression details if available if action_data.get('head') and is_selected('action', 'head'): face_parts.append(action_data.get('head')) if action_data.get('eyes') and is_selected('action', 'eyes'): face_parts.append(action_data.get('eyes')) # 3. Hand Prompt: Hand value (Gloves or Hands), Action details hand_parts = [hand_val] if hand_val else [] if action_data.get('arms') and is_selected('action', 'arms'): hand_parts.append(action_data.get('arms')) if action_data.get('hands') and is_selected('action', 'hands'): hand_parts.append(action_data.get('hands')) return { "main": _dedup_tags(", ".join(parts)), "face": _dedup_tags(", ".join(face_parts)), "hand": _dedup_tags(", ".join(hand_parts)) } def queue_prompt(prompt_workflow, client_id=None): p = {"prompt": prompt_workflow} if client_id: p["client_id"] = client_id data = json.dumps(p).encode('utf-8') response = requests.post(f"{app.config['COMFYUI_URL']}/prompt", data=data) return response.json() def get_history(prompt_id): response = requests.get(f"{app.config['COMFYUI_URL']}/history/{prompt_id}") return response.json() def get_image(filename, subfolder, folder_type): data = {"filename": filename, "subfolder": subfolder, "type": folder_type} response = requests.get(f"{app.config['COMFYUI_URL']}/view", params=data) return response.content from sqlalchemy.orm.attributes import flag_modified def sync_characters(): if not os.path.exists(app.config['CHARACTERS_DIR']): return current_ids = [] for filename in os.listdir(app.config['CHARACTERS_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['CHARACTERS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) char_id = data.get('character_id') if not char_id: continue current_ids.append(char_id) # Generate URL-safe slug: remove special characters from character_id slug = re.sub(r'[^a-zA-Z0-9_]', '', char_id) # Check if character already exists character = Character.query.filter_by(character_id=char_id).first() name = data.get('character_name', char_id.replace('_', ' ').title()) if character: character.data = data character.name = name character.slug = slug character.filename = filename # Check if cover image still exists if character.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], character.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {character.name}, clearing path.") character.image_path = None # Explicitly tell SQLAlchemy the JSON field was modified flag_modified(character, "data") else: new_char = Character( character_id=char_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_char) except Exception as e: print(f"Error importing {filename}: {e}") # Remove characters that are no longer in the folder all_characters = Character.query.all() for char in all_characters: if char.character_id not in current_ids: db.session.delete(char) db.session.commit() def sync_outfits(): if not os.path.exists(app.config['CLOTHING_DIR']): return current_ids = [] for filename in os.listdir(app.config['CLOTHING_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['CLOTHING_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) outfit_id = data.get('outfit_id') or filename.replace('.json', '') current_ids.append(outfit_id) # Generate URL-safe slug: remove special characters from outfit_id slug = re.sub(r'[^a-zA-Z0-9_]', '', outfit_id) # Check if outfit already exists outfit = Outfit.query.filter_by(outfit_id=outfit_id).first() name = data.get('outfit_name', outfit_id.replace('_', ' ').title()) if outfit: outfit.data = data outfit.name = name outfit.slug = slug outfit.filename = filename # Check if cover image still exists if outfit.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], outfit.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {outfit.name}, clearing path.") outfit.image_path = None # Explicitly tell SQLAlchemy the JSON field was modified flag_modified(outfit, "data") else: new_outfit = Outfit( outfit_id=outfit_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_outfit) except Exception as e: print(f"Error importing outfit {filename}: {e}") # Remove outfits that are no longer in the folder all_outfits = Outfit.query.all() for outfit in all_outfits: if outfit.outfit_id not in current_ids: db.session.delete(outfit) db.session.commit() def sync_looks(): if not os.path.exists(app.config['LOOKS_DIR']): return current_ids = [] for filename in os.listdir(app.config['LOOKS_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['LOOKS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) look_id = data.get('look_id') or filename.replace('.json', '') current_ids.append(look_id) slug = re.sub(r'[^a-zA-Z0-9_]', '', look_id) look = Look.query.filter_by(look_id=look_id).first() name = data.get('look_name', look_id.replace('_', ' ').title()) character_id = data.get('character_id', None) if look: look.data = data look.name = name look.slug = slug look.filename = filename look.character_id = character_id if look.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], look.image_path) if not os.path.exists(full_img_path): look.image_path = None flag_modified(look, "data") else: new_look = Look( look_id=look_id, slug=slug, filename=filename, name=name, character_id=character_id, data=data ) db.session.add(new_look) except Exception as e: print(f"Error importing look {filename}: {e}") all_looks = Look.query.all() for look in all_looks: if look.look_id not in current_ids: db.session.delete(look) db.session.commit() def sync_actions(): if not os.path.exists(app.config['ACTIONS_DIR']): return current_ids = [] for filename in os.listdir(app.config['ACTIONS_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['ACTIONS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) action_id = data.get('action_id') or filename.replace('.json', '') current_ids.append(action_id) # Generate URL-safe slug slug = re.sub(r'[^a-zA-Z0-9_]', '', action_id) # Check if action already exists action = Action.query.filter_by(action_id=action_id).first() name = data.get('action_name', action_id.replace('_', ' ').title()) if action: action.data = data action.name = name action.slug = slug action.filename = filename # Check if cover image still exists if action.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], action.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {action.name}, clearing path.") action.image_path = None flag_modified(action, "data") else: new_action = Action( action_id=action_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_action) except Exception as e: print(f"Error importing action {filename}: {e}") # Remove actions that are no longer in the folder all_actions = Action.query.all() for action in all_actions: if action.action_id not in current_ids: db.session.delete(action) db.session.commit() def sync_styles(): if not os.path.exists(app.config['STYLES_DIR']): return current_ids = [] for filename in os.listdir(app.config['STYLES_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['STYLES_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) style_id = data.get('style_id') or filename.replace('.json', '') current_ids.append(style_id) # Generate URL-safe slug slug = re.sub(r'[^a-zA-Z0-9_]', '', style_id) # Check if style already exists style = Style.query.filter_by(style_id=style_id).first() name = data.get('style_name', style_id.replace('_', ' ').title()) if style: style.data = data style.name = name style.slug = slug style.filename = filename # Check if cover image still exists if style.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], style.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {style.name}, clearing path.") style.image_path = None flag_modified(style, "data") else: new_style = Style( style_id=style_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_style) except Exception as e: print(f"Error importing style {filename}: {e}") # Remove styles that are no longer in the folder all_styles = Style.query.all() for style in all_styles: if style.style_id not in current_ids: db.session.delete(style) db.session.commit() def sync_detailers(): if not os.path.exists(app.config['DETAILERS_DIR']): return current_ids = [] for filename in os.listdir(app.config['DETAILERS_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['DETAILERS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) detailer_id = data.get('detailer_id') or filename.replace('.json', '') current_ids.append(detailer_id) # Generate URL-safe slug slug = re.sub(r'[^a-zA-Z0-9_]', '', detailer_id) # Check if detailer already exists detailer = Detailer.query.filter_by(detailer_id=detailer_id).first() name = data.get('detailer_name', detailer_id.replace('_', ' ').title()) if detailer: detailer.data = data detailer.name = name detailer.slug = slug detailer.filename = filename # Check if cover image still exists if detailer.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], detailer.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {detailer.name}, clearing path.") detailer.image_path = None flag_modified(detailer, "data") else: new_detailer = Detailer( detailer_id=detailer_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_detailer) except Exception as e: print(f"Error importing detailer {filename}: {e}") # Remove detailers that are no longer in the folder all_detailers = Detailer.query.all() for detailer in all_detailers: if detailer.detailer_id not in current_ids: db.session.delete(detailer) db.session.commit() def sync_scenes(): if not os.path.exists(app.config['SCENES_DIR']): return current_ids = [] for filename in os.listdir(app.config['SCENES_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['SCENES_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) scene_id = data.get('scene_id') or filename.replace('.json', '') current_ids.append(scene_id) # Generate URL-safe slug slug = re.sub(r'[^a-zA-Z0-9_]', '', scene_id) # Check if scene already exists scene = Scene.query.filter_by(scene_id=scene_id).first() name = data.get('scene_name', scene_id.replace('_', ' ').title()) if scene: scene.data = data scene.name = name scene.slug = slug scene.filename = filename # Check if cover image still exists if scene.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], scene.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {scene.name}, clearing path.") scene.image_path = None flag_modified(scene, "data") else: new_scene = Scene( scene_id=scene_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_scene) except Exception as e: print(f"Error importing scene {filename}: {e}") # Remove scenes that are no longer in the folder all_scenes = Scene.query.all() for scene in all_scenes: if scene.scene_id not in current_ids: db.session.delete(scene) db.session.commit() def _default_checkpoint_data(checkpoint_path, filename): """Return template-default data for a checkpoint with no JSON file.""" name_base = filename.rsplit('.', 1)[0] return { "checkpoint_path": checkpoint_path, "checkpoint_name": filename, "base_positive": "anime", "base_negative": "text, logo", "steps": 25, "cfg": 5, "sampler_name": "euler_ancestral", "vae": "integrated" } def sync_checkpoints(): checkpoints_dir = app.config.get('CHECKPOINTS_DIR', 'data/checkpoints') os.makedirs(checkpoints_dir, exist_ok=True) # Load all JSON data files keyed by checkpoint_path json_data_by_path = {} for filename in os.listdir(checkpoints_dir): if filename.endswith('.json') and not filename.endswith('.template'): file_path = os.path.join(checkpoints_dir, filename) try: with open(file_path, 'r') as f: data = json.load(f) ckpt_path = data.get('checkpoint_path') if ckpt_path: json_data_by_path[ckpt_path] = data except Exception as e: print(f"Error reading checkpoint JSON {filename}: {e}") current_ids = [] dirs = [ (app.config.get('ILLUSTRIOUS_MODELS_DIR', ''), 'Illustrious'), (app.config.get('NOOB_MODELS_DIR', ''), 'Noob'), ] for dirpath, family in dirs: if not dirpath or not os.path.exists(dirpath): continue for f in sorted(os.listdir(dirpath)): if not (f.endswith('.safetensors') or f.endswith('.ckpt')): continue checkpoint_path = f"{family}/{f}" checkpoint_id = checkpoint_path slug = re.sub(r'[^a-zA-Z0-9_]', '_', checkpoint_path.rsplit('.', 1)[0]).lower().strip('_') name_base = f.rsplit('.', 1)[0] friendly_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).strip().title() current_ids.append(checkpoint_id) data = json_data_by_path.get(checkpoint_path, _default_checkpoint_data(checkpoint_path, f)) display_name = data.get('checkpoint_name', f).rsplit('.', 1)[0] display_name = re.sub(r'[^a-zA-Z0-9]+', ' ', display_name).strip().title() or friendly_name ckpt = Checkpoint.query.filter_by(checkpoint_id=checkpoint_id).first() if ckpt: ckpt.name = display_name ckpt.slug = slug ckpt.checkpoint_path = checkpoint_path ckpt.data = data flag_modified(ckpt, "data") if ckpt.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], ckpt.image_path) if not os.path.exists(full_img_path): ckpt.image_path = None else: db.session.add(Checkpoint( checkpoint_id=checkpoint_id, slug=slug, name=display_name, checkpoint_path=checkpoint_path, data=data, )) all_ckpts = Checkpoint.query.all() for ckpt in all_ckpts: if ckpt.checkpoint_id not in current_ids: db.session.delete(ckpt) db.session.commit() DANBOORU_TOOLS = [ { "type": "function", "function": { "name": "search_tags", "description": "Prefix/full-text search for Danbooru tags. Returns rich tag objects ordered by relevance.", "parameters": { "type": "object", "properties": { "query": {"type": "string", "description": "Search string. Trailing * added automatically."}, "limit": {"type": "integer", "description": "Max results (1-200)", "default": 20}, "category": {"type": "string", "enum": ["general", "artist", "copyright", "character", "meta"], "description": "Optional category filter."} }, "required": ["query"] } } }, { "type": "function", "function": { "name": "validate_tags", "description": "Exact-match validation for a list of tags. Splits into valid, deprecated, and invalid.", "parameters": { "type": "object", "properties": { "tags": {"type": "array", "items": {"type": "string"}, "description": "Tags to validate."} }, "required": ["tags"] } } }, { "type": "function", "function": { "name": "suggest_tags", "description": "Autocomplete-style suggestions for a partial or approximate tag. Sorted by post count.", "parameters": { "type": "object", "properties": { "partial": {"type": "string", "description": "Partial tag or rough approximation."}, "limit": {"type": "integer", "description": "Max suggestions (1-50)", "default": 10}, "category": {"type": "string", "enum": ["general", "artist", "copyright", "character", "meta"], "description": "Optional category filter."} }, "required": ["partial"] } } } ] async def _run_mcp_tool(name, arguments): server_params = StdioServerParameters( command="docker", args=["run", "--rm", "-i", "danbooru-mcp:latest"], ) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize() result = await session.call_tool(name, arguments) return result.content[0].text def call_mcp_tool(name, arguments): try: return asyncio.run(_run_mcp_tool(name, arguments)) except Exception as e: print(f"MCP Tool Error: {e}") return json.dumps({"error": str(e)}) def load_prompt(filename): path = os.path.join('data/prompts', filename) if os.path.exists(path): with open(path, 'r') as f: return f.read() return None def call_llm(prompt, system_prompt="You are a creative assistant."): settings = Settings.query.first() if not settings: raise ValueError("Settings not configured.") is_local = settings.llm_provider != 'openrouter' if not is_local: if not settings.openrouter_api_key: raise ValueError("OpenRouter API Key not configured. Please configure it in Settings.") url = "https://openrouter.ai/api/v1/chat/completions" headers = { "Authorization": f"Bearer {settings.openrouter_api_key}", "Content-Type": "application/json", "HTTP-Referer": request.url_root, "X-Title": "Character Browser" } model = settings.openrouter_model or 'google/gemini-2.0-flash-001' else: # Local provider (Ollama or LMStudio) if not settings.local_base_url: raise ValueError(f"{settings.llm_provider.title()} Base URL not configured.") url = f"{settings.local_base_url.rstrip('/')}/chat/completions" headers = {"Content-Type": "application/json"} model = settings.local_model if not model: raise ValueError(f"No local model selected for {settings.llm_provider.title()}. Please select one in Settings.") messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": prompt} ] max_turns = 10 use_tools = True format_retries = 3 # retries allowed for unexpected response format while max_turns > 0: max_turns -= 1 data = { "model": model, "messages": messages, } # Only add tools if supported/requested if use_tools: data["tools"] = DANBOORU_TOOLS data["tool_choice"] = "auto" try: response = requests.post(url, headers=headers, json=data) # If 400 Bad Request and we were using tools, try once without tools if response.status_code == 400 and use_tools: print(f"LLM Provider {settings.llm_provider} rejected tools. Retrying without tool calling...") use_tools = False max_turns += 1 # Reset turn for the retry continue response.raise_for_status() result = response.json() # Validate expected OpenAI-compatible response shape if 'choices' not in result or not result['choices']: raise KeyError('choices') message = result['choices'][0].get('message') if message is None: raise KeyError('message') if message.get('tool_calls'): messages.append(message) for tool_call in message['tool_calls']: name = tool_call['function']['name'] args = json.loads(tool_call['function']['arguments']) print(f"Executing MCP tool: {name}({args})") tool_result = call_mcp_tool(name, args) messages.append({ "role": "tool", "tool_call_id": tool_call['id'], "name": name, "content": tool_result }) continue return message['content'] except requests.exceptions.RequestException as e: error_body = "" try: error_body = f" - Body: {response.text}" except: pass raise RuntimeError(f"LLM API request failed: {str(e)}{error_body}") from e except (KeyError, IndexError) as e: # Log the raw response to help diagnose the issue raw = "" try: raw = response.text[:500] except: pass print(f"Unexpected LLM response format (key={e}). Raw response: {raw}") if format_retries > 0: format_retries -= 1 max_turns += 1 # don't burn a turn on a format error # Ask the model to try again with the correct format messages.append({ "role": "user", "content": ( "Your previous response was not in the expected format. " "Please respond with valid JSON only, exactly as specified in the system prompt. " "Do not include any explanation or markdown — only the raw JSON object." ) }) print(f"Retrying after format error ({format_retries} retries left)…") continue raise RuntimeError(f"Unexpected LLM response format after retries: {str(e)}") from e raise RuntimeError("LLM tool calling loop exceeded maximum turns") @app.route('/get_openrouter_models', methods=['POST']) def get_openrouter_models(): api_key = request.form.get('api_key') if not api_key: return {'error': 'API key is required'}, 400 headers = {"Authorization": f"Bearer {api_key}"} try: response = requests.get("https://openrouter.ai/api/v1/models", headers=headers) response.raise_for_status() models = response.json().get('data', []) # Return simplified list of models return {'models': [{'id': m['id'], 'name': m.get('name', m['id'])} for m in models]} except Exception as e: return {'error': str(e)}, 500 @app.route('/get_local_models', methods=['POST']) def get_local_models(): base_url = request.form.get('base_url') if not base_url: return {'error': 'Base URL is required'}, 400 try: response = requests.get(f"{base_url.rstrip('/')}/models") response.raise_for_status() models = response.json().get('data', []) # Ollama/LMStudio often follow the same structure as OpenAI return {'models': [{'id': m['id'], 'name': m.get('name', m['id'])} for m in models]} except Exception as e: return {'error': str(e)}, 500 @app.route('/settings', methods=['GET', 'POST']) def settings(): settings = Settings.query.first() if not settings: settings = Settings() db.session.add(settings) db.session.commit() if request.method == 'POST': settings.llm_provider = request.form.get('llm_provider', 'openrouter') settings.openrouter_api_key = request.form.get('api_key') settings.openrouter_model = request.form.get('model') settings.local_base_url = request.form.get('local_base_url') settings.local_model = request.form.get('local_model') db.session.commit() flash('Settings updated successfully!') return redirect(url_for('settings')) return render_template('settings.html', settings=settings) @app.route('/') def index(): characters = Character.query.order_by(Character.name).all() return render_template('index.html', characters=characters) @app.route('/rescan', methods=['POST']) def rescan(): sync_characters() flash('Database synced with character files.') return redirect(url_for('index')) def build_extras_prompt(actions, outfits, scenes, styles, detailers): """Combine positive prompt text from all selected category items.""" parts = [] for action in actions: data = action.data lora = data.get('lora', {}) if lora.get('lora_triggers'): parts.append(lora['lora_triggers']) parts.extend(data.get('tags', [])) for key in ['full_body', 'additional']: val = data.get('action', {}).get(key) if val: parts.append(val) for outfit in outfits: data = outfit.data wardrobe = data.get('wardrobe', {}) for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'accessories']: val = wardrobe.get(key) if val: parts.append(val) lora = data.get('lora', {}) if lora.get('lora_triggers'): parts.append(lora['lora_triggers']) parts.extend(data.get('tags', [])) for scene in scenes: data = scene.data scene_fields = data.get('scene', {}) for key in ['background', 'foreground', 'lighting']: val = scene_fields.get(key) if val: parts.append(val) lora = data.get('lora', {}) if lora.get('lora_triggers'): parts.append(lora['lora_triggers']) parts.extend(data.get('tags', [])) for style in styles: data = style.data style_fields = data.get('style', {}) if style_fields.get('artist_name'): parts.append(f"by {style_fields['artist_name']}") if style_fields.get('artistic_style'): parts.append(style_fields['artistic_style']) lora = data.get('lora', {}) if lora.get('lora_triggers'): parts.append(lora['lora_triggers']) for detailer in detailers: data = detailer.data if data.get('prompt'): parts.append(data['prompt']) lora = data.get('lora', {}) if lora.get('lora_triggers'): parts.append(lora['lora_triggers']) return ", ".join(p for p in parts if p) @app.route('/generator', methods=['GET', 'POST']) def generator(): characters = Character.query.order_by(Character.name).all() checkpoints = get_available_checkpoints() actions = Action.query.order_by(Action.name).all() outfits = Outfit.query.order_by(Outfit.name).all() scenes = Scene.query.order_by(Scene.name).all() styles = Style.query.order_by(Style.name).all() detailers = Detailer.query.order_by(Detailer.name).all() if not checkpoints: checkpoints = ["Noob/oneObsession_v19Atypical.safetensors"] if request.method == 'POST': char_slug = request.form.get('character') checkpoint = request.form.get('checkpoint') custom_positive = request.form.get('positive_prompt', '') custom_negative = request.form.get('negative_prompt', '') client_id = request.form.get('client_id') action_slugs = request.form.getlist('action_slugs') outfit_slugs = request.form.getlist('outfit_slugs') scene_slugs = request.form.getlist('scene_slugs') style_slugs = request.form.getlist('style_slugs') detailer_slugs = request.form.getlist('detailer_slugs') override_prompt = request.form.get('override_prompt', '').strip() width = request.form.get('width') or 1024 height = request.form.get('height') or 1024 character = Character.query.filter_by(slug=char_slug).first_or_404() sel_actions = Action.query.filter(Action.slug.in_(action_slugs)).all() if action_slugs else [] sel_outfits = Outfit.query.filter(Outfit.slug.in_(outfit_slugs)).all() if outfit_slugs else [] sel_scenes = Scene.query.filter(Scene.slug.in_(scene_slugs)).all() if scene_slugs else [] sel_styles = Style.query.filter(Style.slug.in_(style_slugs)).all() if style_slugs else [] sel_detailers = Detailer.query.filter(Detailer.slug.in_(detailer_slugs)).all() if detailer_slugs else [] try: with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) # Build base prompts from character defaults prompts = build_prompt(character.data, default_fields=character.default_fields) if override_prompt: prompts["main"] = override_prompt else: extras = build_extras_prompt(sel_actions, sel_outfits, sel_scenes, sel_styles, sel_detailers) combined = prompts["main"] if extras: combined = f"{combined}, {extras}" if custom_positive: combined = f"{combined}, {custom_positive}" prompts["main"] = combined # Prepare workflow - first selected item per category supplies its LoRA slot ckpt_obj = Checkpoint.query.filter_by(checkpoint_path=checkpoint).first() if checkpoint else None workflow = _prepare_workflow( workflow, character, prompts, checkpoint, custom_negative, outfit=sel_outfits[0] if sel_outfits else None, action=sel_actions[0] if sel_actions else None, style=sel_styles[0] if sel_styles else None, detailer=sel_detailers[0] if sel_detailers else None, scene=sel_scenes[0] if sel_scenes else None, width=width, height=height, checkpoint_data=ckpt_obj.data if ckpt_obj else None, ) print(f"Queueing generator prompt for {character.character_id}") prompt_response = queue_prompt(workflow, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} flash("Generation started...") max_retries = 120 while max_retries > 0: history = get_history(prompt_id) if prompt_id in history: outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{character.slug}") os.makedirs(char_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(char_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) relative_path = f"characters/{character.slug}/{filename}" return render_template('generator.html', characters=characters, checkpoints=checkpoints, actions=actions, outfits=outfits, scenes=scenes, styles=styles, detailers=detailers, generated_image=relative_path, selected_char=char_slug, selected_ckpt=checkpoint) time.sleep(2) max_retries -= 1 flash("Generation timed out.") except Exception as e: print(f"Generator error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error: {str(e)}") return render_template('generator.html', characters=characters, checkpoints=checkpoints, actions=actions, outfits=outfits, scenes=scenes, styles=styles, detailers=detailers) @app.route('/generator/finalize//', methods=['POST']) def finalize_generator(slug, prompt_id): character = Character.query.filter_by(slug=slug).first_or_404() try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{slug}") os.makedirs(char_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(char_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) relative_path = f"characters/{slug}/{filename}" return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize error: {e}") return {'error': str(e)}, 500 @app.route('/generator/preview_prompt', methods=['POST']) def generator_preview_prompt(): char_slug = request.form.get('character') if not char_slug: return {'error': 'No character selected'}, 400 character = Character.query.filter_by(slug=char_slug).first() if not character: return {'error': 'Character not found'}, 404 action_slugs = request.form.getlist('action_slugs') outfit_slugs = request.form.getlist('outfit_slugs') scene_slugs = request.form.getlist('scene_slugs') style_slugs = request.form.getlist('style_slugs') detailer_slugs = request.form.getlist('detailer_slugs') custom_positive = request.form.get('positive_prompt', '') sel_actions = Action.query.filter(Action.slug.in_(action_slugs)).all() if action_slugs else [] sel_outfits = Outfit.query.filter(Outfit.slug.in_(outfit_slugs)).all() if outfit_slugs else [] sel_scenes = Scene.query.filter(Scene.slug.in_(scene_slugs)).all() if scene_slugs else [] sel_styles = Style.query.filter(Style.slug.in_(style_slugs)).all() if style_slugs else [] sel_detailers = Detailer.query.filter(Detailer.slug.in_(detailer_slugs)).all() if detailer_slugs else [] prompts = build_prompt(character.data, default_fields=character.default_fields) extras = build_extras_prompt(sel_actions, sel_outfits, sel_scenes, sel_styles, sel_detailers) combined = prompts["main"] if extras: combined = f"{combined}, {extras}" if custom_positive: combined = f"{combined}, {custom_positive}" return {'prompt': combined} @app.route('/character/') def detail(slug): character = Character.query.filter_by(slug=slug).first_or_404() # Load state from session preferences = session.get(f'prefs_{slug}') preview_image = session.get(f'preview_{slug}') return render_template('detail.html', character=character, preferences=preferences, preview_image=preview_image) @app.route('/create', methods=['GET', 'POST']) def create_character(): if request.method == 'POST': name = request.form.get('name') slug = request.form.get('filename', '').strip() prompt = request.form.get('prompt', '') use_llm = request.form.get('use_llm') == 'on' # Auto-generate slug from name if not provided if not slug: slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_') # Validate slug safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) if not safe_slug: safe_slug = 'character' # Find available filename (increment if exists) base_slug = safe_slug counter = 1 while os.path.exists(os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json")): safe_slug = f"{base_slug}_{counter}" counter += 1 # Check if LLM generation is requested if use_llm: if not prompt: flash("Description is required when AI generation is enabled.") return redirect(request.url) # Generate JSON with LLM system_prompt = load_prompt('character_system.txt') if not system_prompt: flash("System prompt file not found.") return redirect(request.url) try: llm_response = call_llm(f"Create a character profile for '{name}' based on this description: {prompt}", system_prompt) # Clean response (remove markdown if present) clean_json = llm_response.replace('```json', '').replace('```', '').strip() char_data = json.loads(clean_json) # Enforce IDs char_data['character_id'] = safe_slug char_data['character_name'] = name except Exception as e: print(f"LLM error: {e}") flash(f"Failed to generate character profile: {e}") return redirect(request.url) else: # Create blank character template char_data = { "character_id": safe_slug, "character_name": name, "identity": { "base_specs": "", "hair": "", "eyes": "", "hands": "", "arms": "", "torso": "", "pelvis": "", "legs": "", "feet": "", "extra": "" }, "defaults": { "expression": "", "pose": "", "scene": "" }, "wardrobe": { "full_body": "", "headwear": "", "top": "", "bottom": "", "legwear": "", "footwear": "", "hands": "", "accessories": "" }, "styles": { "aesthetic": "", "primary_color": "", "secondary_color": "", "tertiary_color": "" }, "lora": { "lora_name": "", "lora_weight": 1.0, "lora_triggers": "" }, "tags": [] } try: # Save file file_path = os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json") with open(file_path, 'w') as f: json.dump(char_data, f, indent=2) # Add to DB new_char = Character( character_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json", name=name, data=char_data ) db.session.add(new_char) db.session.commit() flash('Character created successfully!') return redirect(url_for('detail', slug=safe_slug)) except Exception as e: print(f"Save error: {e}") flash(f"Failed to create character: {e}") return redirect(request.url) return render_template('create.html') @app.route('/character//edit', methods=['GET', 'POST']) def edit_character(slug): character = Character.query.filter_by(slug=slug).first_or_404() loras = get_available_loras() char_looks = Look.query.filter_by(character_id=character.character_id).order_by(Look.name).all() if request.method == 'POST': try: # 1. Update basic fields character.name = request.form.get('character_name') # 2. Rebuild the data dictionary new_data = character.data.copy() new_data['character_name'] = character.name # Update nested sections (non-wardrobe) for section in ['identity', 'defaults', 'styles', 'lora']: if section in new_data: for key in new_data[section]: form_key = f"{section}_{key}" if form_key in request.form: val = request.form.get(form_key) # Handle numeric weight if key == 'lora_weight': try: val = float(val) except: val = 1.0 new_data[section][key] = val # LoRA weight randomization bounds (new fields not present in existing JSON) for bound in ['lora_weight_min', 'lora_weight_max']: val_str = request.form.get(f'lora_{bound}', '').strip() if val_str: try: new_data.setdefault('lora', {})[bound] = float(val_str) except ValueError: pass else: new_data.setdefault('lora', {}).pop(bound, None) # Handle wardrobe - support both nested and flat formats wardrobe = new_data.get('wardrobe', {}) if 'default' in wardrobe and isinstance(wardrobe.get('default'), dict): # New nested format - update each outfit for outfit_name in wardrobe.keys(): for key in wardrobe[outfit_name].keys(): form_key = f"wardrobe_{outfit_name}_{key}" if form_key in request.form: wardrobe[outfit_name][key] = request.form.get(form_key) new_data['wardrobe'] = wardrobe else: # Legacy flat format if 'wardrobe' in new_data: for key in new_data['wardrobe'].keys(): form_key = f"wardrobe_{key}" if form_key in request.form: new_data['wardrobe'][key] = request.form.get(form_key) # Update Tags (comma separated string to list) tags_raw = request.form.get('tags', '') new_data['tags'] = [t.strip() for f in tags_raw.split(',') for t in [f.strip()] if t] character.data = new_data flag_modified(character, "data") # 3. Write back to JSON file # Use the filename we stored during sync, or fallback to a sanitized ID char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) db.session.commit() flash('Character profile updated successfully!') return redirect(url_for('detail', slug=slug)) except Exception as e: print(f"Edit error: {e}") flash(f"Error saving changes: {str(e)}") return render_template('edit.html', character=character, loras=loras, char_looks=char_looks) @app.route('/character//outfit/switch', methods=['POST']) def switch_outfit(slug): """Switch the active outfit for a character.""" character = Character.query.filter_by(slug=slug).first_or_404() outfit_name = request.form.get('outfit', 'default') # Validate outfit exists available_outfits = character.get_available_outfits() if outfit_name in available_outfits: character.active_outfit = outfit_name db.session.commit() flash(f'Switched to "{outfit_name}" outfit.') else: flash(f'Outfit "{outfit_name}" not found.', 'error') return redirect(url_for('detail', slug=slug)) @app.route('/character//outfit/add', methods=['POST']) def add_outfit(slug): """Add a new outfit to a character.""" character = Character.query.filter_by(slug=slug).first_or_404() outfit_name = request.form.get('outfit_name', '').strip() if not outfit_name: flash('Outfit name cannot be empty.', 'error') return redirect(url_for('edit_character', slug=slug)) # Sanitize outfit name for use as key safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', outfit_name.lower()) # Get wardrobe data wardrobe = character.data.get('wardrobe', {}) # Ensure wardrobe is in new nested format if 'default' not in wardrobe or not isinstance(wardrobe.get('default'), dict): # Convert legacy format wardrobe = {'default': wardrobe} # Check if outfit already exists if safe_name in wardrobe: flash(f'Outfit "{safe_name}" already exists.', 'error') return redirect(url_for('edit_character', slug=slug)) # Create new outfit (copy from default as template) default_outfit = wardrobe.get('default', { 'headwear': '', 'top': '', 'legwear': '', 'footwear': '', 'hands': '', 'accessories': '' }) wardrobe[safe_name] = default_outfit.copy() # Update character data character.data['wardrobe'] = wardrobe flag_modified(character, 'data') # Save to JSON file char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(character.data, f, indent=2) db.session.commit() flash(f'Added new outfit "{safe_name}".') return redirect(url_for('edit_character', slug=slug)) @app.route('/character//outfit/delete', methods=['POST']) def delete_outfit(slug): """Delete an outfit from a character.""" character = Character.query.filter_by(slug=slug).first_or_404() outfit_name = request.form.get('outfit', '') wardrobe = character.data.get('wardrobe', {}) # Cannot delete default if outfit_name == 'default': flash('Cannot delete the default outfit.', 'error') return redirect(url_for('edit_character', slug=slug)) if outfit_name not in wardrobe: flash(f'Outfit "{outfit_name}" not found.', 'error') return redirect(url_for('edit_character', slug=slug)) # Delete outfit del wardrobe[outfit_name] character.data['wardrobe'] = wardrobe flag_modified(character, 'data') # Switch active outfit if deleted was active if character.active_outfit == outfit_name: character.active_outfit = 'default' # Save to JSON file char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(character.data, f, indent=2) db.session.commit() flash(f'Deleted outfit "{outfit_name}".') return redirect(url_for('edit_character', slug=slug)) @app.route('/character//outfit/rename', methods=['POST']) def rename_outfit(slug): """Rename an outfit.""" character = Character.query.filter_by(slug=slug).first_or_404() old_name = request.form.get('old_name', '') new_name = request.form.get('new_name', '').strip() if not new_name: flash('New name cannot be empty.', 'error') return redirect(url_for('edit_character', slug=slug)) # Sanitize new name safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', new_name.lower()) wardrobe = character.data.get('wardrobe', {}) if old_name not in wardrobe: flash(f'Outfit "{old_name}" not found.', 'error') return redirect(url_for('edit_character', slug=slug)) if safe_name in wardrobe and safe_name != old_name: flash(f'Outfit "{safe_name}" already exists.', 'error') return redirect(url_for('edit_character', slug=slug)) # Rename (copy to new key, delete old) wardrobe[safe_name] = wardrobe.pop(old_name) character.data['wardrobe'] = wardrobe flag_modified(character, 'data') # Update active outfit if renamed was active if character.active_outfit == old_name: character.active_outfit = safe_name # Save to JSON file char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(character.data, f, indent=2) db.session.commit() flash(f'Renamed outfit "{old_name}" to "{safe_name}".') return redirect(url_for('edit_character', slug=slug)) @app.route('/character//upload', methods=['POST']) def upload_image(slug): character = Character.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(request.url) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): # Create character subfolder char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{slug}") os.makedirs(char_folder, exist_ok=True) filename = secure_filename(file.filename) file_path = os.path.join(char_folder, filename) file.save(file_path) # Store relative path in DB character.image_path = f"characters/{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('detail', slug=slug)) @app.route('/character//finalize_generation/', methods=['POST']) def finalize_generation(slug, prompt_id): character = Character.query.filter_by(slug=slug).first_or_404() action = request.form.get('action', 'preview') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) # Create character subfolder char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{slug}") os.makedirs(char_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(char_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) print(f"Image saved to: {os.path.abspath(file_path)}") # Handle actions - always save as preview relative_path = f"characters/{slug}/{filename}" session[f'preview_{slug}'] = relative_path session.modified = True # Ensure session is saved for JSON response # If action is 'replace', also update the character's cover image immediately if action == 'replace': character.image_path = relative_path db.session.commit() return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize error: {e}") return {'error': str(e)}, 500 @app.route('/character//replace_cover_from_preview', methods=['POST']) def replace_cover_from_preview(slug): character = Character.query.filter_by(slug=slug).first_or_404() preview_path = session.get(f'preview_{slug}') if preview_path: character.image_path = preview_path db.session.commit() flash('Cover image updated from preview!') else: flash('No preview image available', 'error') return redirect(url_for('detail', slug=slug)) def _log_workflow_prompts(label, workflow): """Print the final assembled ComfyUI prompts in a consistent, readable block.""" sep = "=" * 72 print(f"\n{sep}") print(f" WORKFLOW PROMPTS [{label}]") print(sep) print(f" Checkpoint : {workflow['4']['inputs'].get('ckpt_name', '(not set)')}") print(f" Seed : {workflow['3']['inputs'].get('seed', '(not set)')}") print(f" Resolution : {workflow['5']['inputs'].get('width', '?')} x {workflow['5']['inputs'].get('height', '?')}") print(f" Sampler : {workflow['3']['inputs'].get('sampler_name', '?')} / {workflow['3']['inputs'].get('scheduler', '?')} steps={workflow['3']['inputs'].get('steps', '?')} cfg={workflow['3']['inputs'].get('cfg', '?')}") # LoRA chain summary active_loras = [] for node_id, label_str in [("16", "char/look"), ("17", "outfit"), ("18", "action"), ("19", "style/detail/scene")]: if node_id in workflow: name = workflow[node_id]["inputs"].get("lora_name", "") if name: w = workflow[node_id]["inputs"].get("strength_model", "?") active_loras.append(f"{label_str}:{name.split('/')[-1]}@{w:.3f}" if isinstance(w, float) else f"{label_str}:{name.split('/')[-1]}@{w}") print(f" LoRAs : {' | '.join(active_loras) if active_loras else '(none)'}") print(f" [+] Positive : {workflow['6']['inputs'].get('text', '')}") print(f" [-] Negative : {workflow['7']['inputs'].get('text', '')}") face_text = workflow.get('14', {}).get('inputs', {}).get('text', '') hand_text = workflow.get('15', {}).get('inputs', {}).get('text', '') if face_text: print(f" [F] Face : {face_text}") if hand_text: print(f" [H] Hand : {hand_text}") print(f"{sep}\n") def _prepare_workflow(workflow, character, prompts, checkpoint=None, custom_negative=None, outfit=None, action=None, style=None, detailer=None, scene=None, width=None, height=None, checkpoint_data=None, look=None, fixed_seed=None): # 1. Update prompts using replacement to preserve embeddings workflow["6"]["inputs"]["text"] = workflow["6"]["inputs"]["text"].replace("{{POSITIVE_PROMPT}}", prompts["main"]) if custom_negative: workflow["7"]["inputs"]["text"] = f"{workflow['7']['inputs']['text']}, {custom_negative}" if "14" in workflow: workflow["14"]["inputs"]["text"] = workflow["14"]["inputs"]["text"].replace("{{FACE_PROMPT}}", prompts["face"]) if "15" in workflow: workflow["15"]["inputs"]["text"] = workflow["15"]["inputs"]["text"].replace("{{HAND_PROMPT}}", prompts["hand"]) # 2. Update Checkpoint if checkpoint: workflow["4"]["inputs"]["ckpt_name"] = checkpoint # 3. Handle LoRAs - Node 16 for character, Node 17 for outfit, Node 18 for action, Node 19 for style/detailer # Start with direct checkpoint connections model_source = ["4", 0] clip_source = ["4", 1] # Look negative prompt (applied before character LoRA) if look: look_negative = look.data.get('negative', '') if look_negative: workflow["7"]["inputs"]["text"] = f"{look_negative}, {workflow['7']['inputs']['text']}" # Character LoRA (Node 16) — look LoRA overrides character LoRA when present if look: char_lora_data = look.data.get('lora', {}) else: char_lora_data = character.data.get('lora', {}) if character else {} char_lora_name = char_lora_data.get('lora_name') if char_lora_name and "16" in workflow: _w16 = _resolve_lora_weight(char_lora_data) workflow["16"]["inputs"]["lora_name"] = char_lora_name workflow["16"]["inputs"]["strength_model"] = _w16 workflow["16"]["inputs"]["strength_clip"] = _w16 workflow["16"]["inputs"]["model"] = ["4", 0] # From checkpoint workflow["16"]["inputs"]["clip"] = ["4", 1] # From checkpoint model_source = ["16", 0] clip_source = ["16", 1] print(f"Character LoRA: {char_lora_name} @ {_w16}") # Outfit LoRA (Node 17) - chains from character LoRA or checkpoint outfit_lora_data = outfit.data.get('lora', {}) if outfit else {} outfit_lora_name = outfit_lora_data.get('lora_name') if outfit_lora_name and "17" in workflow: _w17 = _resolve_lora_weight({**{'lora_weight': 0.8}, **outfit_lora_data}) workflow["17"]["inputs"]["lora_name"] = outfit_lora_name workflow["17"]["inputs"]["strength_model"] = _w17 workflow["17"]["inputs"]["strength_clip"] = _w17 # Chain from character LoRA (node 16) or checkpoint (node 4) workflow["17"]["inputs"]["model"] = model_source workflow["17"]["inputs"]["clip"] = clip_source model_source = ["17", 0] clip_source = ["17", 1] print(f"Outfit LoRA: {outfit_lora_name} @ {_w17}") # Action LoRA (Node 18) - chains from previous LoRA or checkpoint action_lora_data = action.data.get('lora', {}) if action else {} action_lora_name = action_lora_data.get('lora_name') if action_lora_name and "18" in workflow: _w18 = _resolve_lora_weight(action_lora_data) workflow["18"]["inputs"]["lora_name"] = action_lora_name workflow["18"]["inputs"]["strength_model"] = _w18 workflow["18"]["inputs"]["strength_clip"] = _w18 # Chain from previous source workflow["18"]["inputs"]["model"] = model_source workflow["18"]["inputs"]["clip"] = clip_source model_source = ["18", 0] clip_source = ["18", 1] print(f"Action LoRA: {action_lora_name} @ {_w18}") # Style/Detailer/Scene LoRA (Node 19) - chains from previous LoRA or checkpoint # Priority: Style > Detailer > Scene (Scene LoRAs are rare but supported) target_obj = style or detailer or scene style_lora_data = target_obj.data.get('lora', {}) if target_obj else {} style_lora_name = style_lora_data.get('lora_name') if style_lora_name and "19" in workflow: _w19 = _resolve_lora_weight(style_lora_data) workflow["19"]["inputs"]["lora_name"] = style_lora_name workflow["19"]["inputs"]["strength_model"] = _w19 workflow["19"]["inputs"]["strength_clip"] = _w19 # Chain from previous source workflow["19"]["inputs"]["model"] = model_source workflow["19"]["inputs"]["clip"] = clip_source model_source = ["19", 0] clip_source = ["19", 1] print(f"Style/Detailer LoRA: {style_lora_name} @ {_w19}") # Apply connections to all model/clip consumers workflow["3"]["inputs"]["model"] = model_source workflow["11"]["inputs"]["model"] = model_source workflow["13"]["inputs"]["model"] = model_source workflow["6"]["inputs"]["clip"] = clip_source workflow["7"]["inputs"]["clip"] = clip_source workflow["11"]["inputs"]["clip"] = clip_source workflow["13"]["inputs"]["clip"] = clip_source workflow["14"]["inputs"]["clip"] = clip_source workflow["15"]["inputs"]["clip"] = clip_source # 4. Randomize seeds (or use a fixed seed for reproducible batches like Strengths Gallery) gen_seed = fixed_seed if fixed_seed is not None else random.randint(1, 10**15) workflow["3"]["inputs"]["seed"] = gen_seed if "11" in workflow: workflow["11"]["inputs"]["seed"] = gen_seed if "13" in workflow: workflow["13"]["inputs"]["seed"] = gen_seed # 5. Set image dimensions if "5" in workflow: if width: workflow["5"]["inputs"]["width"] = int(width) if height: workflow["5"]["inputs"]["height"] = int(height) # 6. Apply checkpoint-specific settings (steps, cfg, sampler, base prompts, VAE) if checkpoint_data: workflow = _apply_checkpoint_settings(workflow, checkpoint_data) # 7. Sync sampler/scheduler from main KSampler to adetailer nodes sampler_name = workflow["3"]["inputs"].get("sampler_name") scheduler = workflow["3"]["inputs"].get("scheduler") for node_id in ["11", "13"]: if node_id in workflow: if sampler_name: workflow[node_id]["inputs"]["sampler_name"] = sampler_name if scheduler: workflow[node_id]["inputs"]["scheduler"] = scheduler # 8. Cross-deduplicate: remove tags shared between positive and negative pos_text, neg_text = _cross_dedup_prompts( workflow["6"]["inputs"]["text"], workflow["7"]["inputs"]["text"] ) workflow["6"]["inputs"]["text"] = pos_text workflow["7"]["inputs"]["text"] = neg_text # 9. Final prompt debug — logged after all transformations are complete _log_workflow_prompts("_prepare_workflow", workflow) return workflow def _get_default_checkpoint(): """Return (checkpoint_path, checkpoint_data) from the session default, or (None, None).""" ckpt_path = session.get('default_checkpoint') if not ckpt_path: return None, None ckpt = Checkpoint.query.filter_by(checkpoint_path=ckpt_path).first() if not ckpt: return None, None return ckpt.checkpoint_path, ckpt.data or {} def _queue_generation(character, action='preview', selected_fields=None, client_id=None): # 1. Load workflow with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) # 2. Build prompts with active outfit prompts = build_prompt(character.data, selected_fields, character.default_fields, character.active_outfit) # 3. Prepare workflow ckpt_path, ckpt_data = _get_default_checkpoint() workflow = _prepare_workflow(workflow, character, prompts, checkpoint=ckpt_path, checkpoint_data=ckpt_data) return queue_prompt(workflow, client_id=client_id) @app.route('/get_missing_characters') def get_missing_characters(): missing = Character.query.filter((Character.image_path == None) | (Character.image_path == '')).all() return {'missing': [{'slug': c.slug, 'name': c.name} for c in missing]} @app.route('/clear_all_covers', methods=['POST']) def clear_all_covers(): characters = Character.query.all() for char in characters: char.image_path = None db.session.commit() return {'success': True} @app.route('/generate_missing', methods=['POST']) def generate_missing(): # Query fresh from database for each check to avoid stale session issues def get_missing_count(): return Character.query.filter((Character.image_path == None) | (Character.image_path == '')).count() if get_missing_count() == 0: flash("No characters missing cover images.") return redirect(url_for('index')) success_count = 0 processed = 0 # Keep generating until no more missing while get_missing_count() > 0: # Get the next character in alphabetical order character = Character.query.filter( (Character.image_path == None) | (Character.image_path == '') ).order_by(Character.name).first() if not character: break character_slug = character.slug character_name = character.name processed += 1 try: print(f"Batch generating for: {character_name}") prompt_response = _queue_generation(character, action='replace') prompt_id = prompt_response['prompt_id'] # Simple synchronous wait for each max_retries = 120 while max_retries > 0: history = get_history(prompt_id) if prompt_id in history: outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{character_slug}") os.makedirs(char_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(char_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) # Re-query the character to ensure it's attached to the session character_to_update = Character.query.filter_by(slug=character_slug).first() if character_to_update: character_to_update.image_path = f"characters/{character_slug}/{filename}" db.session.commit() print(f"Saved cover for {character_name}: {character_to_update.image_path}") success_count += 1 break break time.sleep(2) max_retries -= 1 except Exception as e: print(f"Error generating for {character_name}: {e}") db.session.rollback() # Rollback on error to ensure clean state flash(f"Batch generation complete. Generated {success_count} images.") return redirect(url_for('index')) @app.route('/check_status/') def check_status(prompt_id): try: history = get_history(prompt_id) if prompt_id in history: return {'status': 'finished'} return {'status': 'pending'} except Exception: return {'status': 'error'}, 500 @app.route('/character//generate', methods=['POST']) def generate_image(slug): character = Character.query.filter_by(slug=slug).first_or_404() try: # Get action type action = request.form.get('action', 'preview') client_id = request.form.get('client_id') # Get selected fields selected_fields = request.form.getlist('include_field') # Save preferences session[f'prefs_{slug}'] = selected_fields # Queue generation using helper prompt_response = _queue_generation(character, action, selected_fields, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] # Return JSON if AJAX request if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('detail', slug=slug)) except Exception as e: print(f"Generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('detail', slug=slug)) @app.route('/character//save_defaults', methods=['POST']) def save_defaults(slug): character = Character.query.filter_by(slug=slug).first_or_404() selected_fields = request.form.getlist('include_field') character.default_fields = selected_fields db.session.commit() flash('Default prompt selection saved for this character!') return redirect(url_for('detail', slug=slug)) @app.route('/get_missing_outfits') def get_missing_outfits(): missing = Outfit.query.filter((Outfit.image_path == None) | (Outfit.image_path == '')).all() return {'missing': [{'slug': o.slug, 'name': o.name} for o in missing]} @app.route('/clear_all_outfit_covers', methods=['POST']) def clear_all_outfit_covers(): outfits = Outfit.query.all() for outfit in outfits: outfit.image_path = None db.session.commit() return {'success': True} @app.route('/get_missing_actions') def get_missing_actions(): missing = Action.query.filter((Action.image_path == None) | (Action.image_path == '')).all() return {'missing': [{'slug': a.slug, 'name': a.name} for a in missing]} @app.route('/clear_all_action_covers', methods=['POST']) def clear_all_action_covers(): actions = Action.query.all() for action in actions: action.image_path = None db.session.commit() return {'success': True} @app.route('/get_missing_scenes') def get_missing_scenes(): missing = Scene.query.filter((Scene.image_path == None) | (Scene.image_path == '')).all() return {'missing': [{'slug': s.slug, 'name': s.name} for s in missing]} @app.route('/clear_all_scene_covers', methods=['POST']) def clear_all_scene_covers(): scenes = Scene.query.all() for scene in scenes: scene.image_path = None db.session.commit() return {'success': True} # ============ OUTFIT ROUTES ============ @app.route('/outfits') def outfits_index(): outfits = Outfit.query.order_by(Outfit.name).all() return render_template('outfits/index.html', outfits=outfits) @app.route('/outfits/rescan', methods=['POST']) def rescan_outfits(): sync_outfits() flash('Database synced with outfit files.') return redirect(url_for('outfits_index')) @app.route('/outfits/bulk_create', methods=['POST']) def bulk_create_outfits_from_loras(): clothing_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Clothing/' if not os.path.exists(clothing_lora_dir): flash('Clothing LoRA directory not found.', 'error') return redirect(url_for('outfits_index')) overwrite = request.form.get('overwrite') == 'true' created_count = 0 skipped_count = 0 overwritten_count = 0 system_prompt = load_prompt('outfit_system.txt') if not system_prompt: flash('Outfit system prompt file not found.', 'error') return redirect(url_for('outfits_index')) for filename in os.listdir(clothing_lora_dir): if not filename.endswith('.safetensors'): continue name_base = filename.rsplit('.', 1)[0] outfit_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower()) outfit_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title() json_filename = f"{outfit_id}.json" json_path = os.path.join(app.config['CLOTHING_DIR'], json_filename) is_existing = os.path.exists(json_path) if is_existing and not overwrite: skipped_count += 1 continue html_filename = f"{name_base}.html" html_path = os.path.join(clothing_lora_dir, html_filename) html_content = "" if os.path.exists(html_path): try: with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf: html_raw = hf.read() clean_html = re.sub(r']*>.*?', '', html_raw, flags=re.DOTALL) clean_html = re.sub(r']*>.*?', '', clean_html, flags=re.DOTALL) clean_html = re.sub(r']*>', '', clean_html) clean_html = re.sub(r'<[^>]+>', ' ', clean_html) html_content = ' '.join(clean_html.split()) except Exception as e: print(f"Error reading HTML {html_filename}: {e}") try: print(f"Asking LLM to describe outfit: {outfit_name}") prompt = f"Create an outfit profile for a clothing LoRA based on the filename: '{filename}'" if html_content: prompt += f"\n\nHere is descriptive text extracted from an associated HTML file:\n###\n{html_content[:3000]}\n###" llm_response = call_llm(prompt, system_prompt) clean_json = llm_response.replace('```json', '').replace('```', '').strip() outfit_data = json.loads(clean_json) outfit_data['outfit_id'] = outfit_id outfit_data['outfit_name'] = outfit_name if 'lora' not in outfit_data: outfit_data['lora'] = {} outfit_data['lora']['lora_name'] = f"Illustrious/Clothing/{filename}" if not outfit_data['lora'].get('lora_triggers'): outfit_data['lora']['lora_triggers'] = name_base if outfit_data['lora'].get('lora_weight') is None: outfit_data['lora']['lora_weight'] = 0.8 if outfit_data['lora'].get('lora_weight_min') is None: outfit_data['lora']['lora_weight_min'] = 0.7 if outfit_data['lora'].get('lora_weight_max') is None: outfit_data['lora']['lora_weight_max'] = 1.0 os.makedirs(app.config['CLOTHING_DIR'], exist_ok=True) with open(json_path, 'w') as f: json.dump(outfit_data, f, indent=2) if is_existing: overwritten_count += 1 else: created_count += 1 time.sleep(0.5) except Exception as e: print(f"Error creating outfit for {filename}: {e}") if created_count > 0 or overwritten_count > 0: sync_outfits() msg = f'Successfully processed outfits: {created_count} created, {overwritten_count} overwritten.' if skipped_count > 0: msg += f' (Skipped {skipped_count} existing)' flash(msg) else: flash(f'No outfits created or overwritten. {skipped_count} existing entries found.') return redirect(url_for('outfits_index')) @app.route('/outfit/') def outfit_detail(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() characters = Character.query.order_by(Character.name).all() # Load state from session preferences = session.get(f'prefs_outfit_{slug}') preview_image = session.get(f'preview_outfit_{slug}') selected_character = session.get(f'char_outfit_{slug}') # List existing preview images upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"outfits/{slug}") existing_previews = [] if os.path.isdir(upload_dir): files = sorted([f for f in os.listdir(upload_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))], reverse=True) existing_previews = [f"outfits/{slug}/{f}" for f in files] return render_template('outfits/detail.html', outfit=outfit, characters=characters, preferences=preferences, preview_image=preview_image, selected_character=selected_character, existing_previews=existing_previews) @app.route('/outfit//edit', methods=['GET', 'POST']) def edit_outfit(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() loras = get_available_clothing_loras() # Use clothing LoRAs for outfits if request.method == 'POST': try: # 1. Update basic fields outfit.name = request.form.get('outfit_name') # 2. Rebuild the data dictionary new_data = outfit.data.copy() new_data['outfit_name'] = outfit.name # Update outfit_id if provided new_outfit_id = request.form.get('outfit_id', outfit.outfit_id) new_data['outfit_id'] = new_outfit_id # Update wardrobe section if 'wardrobe' in new_data: for key in new_data['wardrobe'].keys(): form_key = f"wardrobe_{key}" if form_key in request.form: new_data['wardrobe'][key] = request.form.get(form_key) # Update lora section if 'lora' in new_data: for key in new_data['lora'].keys(): form_key = f"lora_{key}" if form_key in request.form: val = request.form.get(form_key) if key == 'lora_weight': try: val = float(val) except: val = 0.8 new_data['lora'][key] = val # LoRA weight randomization bounds for bound in ['lora_weight_min', 'lora_weight_max']: val_str = request.form.get(f'lora_{bound}', '').strip() if val_str: try: new_data.setdefault('lora', {})[bound] = float(val_str) except ValueError: pass else: new_data.setdefault('lora', {}).pop(bound, None) # Update Tags (comma separated string to list) tags_raw = request.form.get('tags', '') new_data['tags'] = [t.strip() for f in tags_raw.split(',') for t in [f.strip()] if t] outfit.data = new_data flag_modified(outfit, "data") # 3. Write back to JSON file outfit_file = outfit.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', outfit.outfit_id)}.json" file_path = os.path.join(app.config['CLOTHING_DIR'], outfit_file) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) db.session.commit() flash('Outfit profile updated successfully!') return redirect(url_for('outfit_detail', slug=slug)) except Exception as e: print(f"Edit error: {e}") flash(f"Error saving changes: {str(e)}") return render_template('outfits/edit.html', outfit=outfit, loras=loras) @app.route('/outfit//upload', methods=['POST']) def upload_outfit_image(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(request.url) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): # Create outfit subfolder outfit_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"outfits/{slug}") os.makedirs(outfit_folder, exist_ok=True) filename = secure_filename(file.filename) file_path = os.path.join(outfit_folder, filename) file.save(file_path) # Store relative path in DB outfit.image_path = f"outfits/{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('outfit_detail', slug=slug)) @app.route('/outfit//generate', methods=['POST']) def generate_outfit_image(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() try: # Get action type action = request.form.get('action', 'preview') client_id = request.form.get('client_id') # Get selected fields selected_fields = request.form.getlist('include_field') # Get selected character (if any) character_slug = request.form.get('character_slug', '') character = None # Handle random character selection if character_slug == '__random__': all_characters = Character.query.all() if all_characters: character = random.choice(all_characters) character_slug = character.slug elif character_slug: character = Character.query.filter_by(slug=character_slug).first() # Save preferences session[f'prefs_outfit_{slug}'] = selected_fields session[f'char_outfit_{slug}'] = character_slug # Build combined data for prompt building if character: # Combine character identity/defaults with outfit wardrobe combined_data = { 'character_id': character.character_id, 'identity': character.data.get('identity', {}), 'defaults': character.data.get('defaults', {}), 'wardrobe': outfit.data.get('wardrobe', {}), # Use outfit's wardrobe 'styles': character.data.get('styles', {}), # Use character's styles 'lora': outfit.data.get('lora', {}), # Use outfit's lora 'tags': outfit.data.get('tags', []) } # When character is selected, merge character identity fields into selected_fields # so they are included in the prompt if selected_fields: # Add character identity fields to selection if not already present for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']: if character.data.get('identity', {}).get(key): field_key = f'identity::{key}' if field_key not in selected_fields: selected_fields.append(field_key) # Add expression and pose, but NOT scene (outfit previews use simple background) for key in ['expression', 'pose']: if character.data.get('defaults', {}).get(key): field_key = f'defaults::{key}' if field_key not in selected_fields: selected_fields.append(field_key) # Always include character name if 'special::name' not in selected_fields: selected_fields.append('special::name') else: # No explicit field selection (e.g. batch generation) — build a selection # that includes identity + wardrobe + name + lora triggers, but NOT character # defaults (expression, pose, scene), so outfit covers stay generic. for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']: if character.data.get('identity', {}).get(key): selected_fields.append(f'identity::{key}') outfit_wardrobe = outfit.data.get('wardrobe', {}) for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']: if outfit_wardrobe.get(key): selected_fields.append(f'wardrobe::{key}') selected_fields.append('special::name') if outfit.data.get('lora', {}).get('lora_triggers'): selected_fields.append('lora::lora_triggers') default_fields = character.default_fields else: # Outfit only - no character combined_data = { 'character_id': outfit.outfit_id, 'wardrobe': outfit.data.get('wardrobe', {}), 'lora': outfit.data.get('lora', {}), 'tags': outfit.data.get('tags', []) } default_fields = outfit.default_fields # Queue generation with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) # Build prompts for combined data prompts = build_prompt(combined_data, selected_fields, default_fields) # Add colored simple background to the main prompt for outfit previews # Use character's primary_color if available if character: primary_color = character.data.get('styles', {}).get('primary_color', '') if primary_color: prompts["main"] = f"{prompts['main']}, {primary_color} simple background" else: prompts["main"] = f"{prompts['main']}, simple background" else: prompts["main"] = f"{prompts['main']}, simple background" # Prepare workflow - pass both character and outfit for dual LoRA support ckpt_path, ckpt_data = _get_default_checkpoint() workflow = _prepare_workflow(workflow, character, prompts, outfit=outfit, checkpoint=ckpt_path, checkpoint_data=ckpt_data) prompt_response = queue_prompt(workflow, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] # Return JSON if AJAX request if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('outfit_detail', slug=slug)) except Exception as e: print(f"Generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('outfit_detail', slug=slug)) @app.route('/outfit//finalize_generation/', methods=['POST']) def finalize_outfit_generation(slug, prompt_id): outfit = Outfit.query.filter_by(slug=slug).first_or_404() action = request.form.get('action', 'preview') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) # Create outfit subfolder outfit_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"outfits/{slug}") os.makedirs(outfit_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(outfit_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) print(f"Image saved to: {os.path.abspath(file_path)}") # Always save as preview relative_path = f"outfits/{slug}/{filename}" session[f'preview_outfit_{slug}'] = relative_path session.modified = True # Ensure session is saved for JSON response # If action is 'replace', also update the outfit's cover image immediately if action == 'replace': outfit.image_path = relative_path db.session.commit() return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize error: {e}") return {'error': str(e)}, 500 @app.route('/outfit//replace_cover_from_preview', methods=['POST']) def replace_outfit_cover_from_preview(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() preview_path = session.get(f'preview_outfit_{slug}') if preview_path: outfit.image_path = preview_path db.session.commit() flash('Cover image updated from preview!') else: flash('No preview image available', 'error') return redirect(url_for('outfit_detail', slug=slug)) @app.route('/outfit/create', methods=['GET', 'POST']) def create_outfit(): if request.method == 'POST': name = request.form.get('name') slug = request.form.get('filename', '').strip() prompt = request.form.get('prompt', '') use_llm = request.form.get('use_llm') == 'on' # Auto-generate slug from name if not provided if not slug: slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_') # Validate slug safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) if not safe_slug: safe_slug = 'outfit' # Find available filename (increment if exists) base_slug = safe_slug counter = 1 while os.path.exists(os.path.join(app.config['CLOTHING_DIR'], f"{safe_slug}.json")): safe_slug = f"{base_slug}_{counter}" counter += 1 # Check if LLM generation is requested if use_llm: if not prompt: flash("Description is required when AI generation is enabled.") return redirect(request.url) # Generate JSON with LLM system_prompt = load_prompt('outfit_system.txt') if not system_prompt: flash("System prompt file not found.") return redirect(request.url) try: llm_response = call_llm(f"Create an outfit profile for '{name}' based on this description: {prompt}", system_prompt) # Clean response (remove markdown if present) clean_json = llm_response.replace('```json', '').replace('```', '').strip() outfit_data = json.loads(clean_json) # Enforce IDs outfit_data['outfit_id'] = safe_slug outfit_data['outfit_name'] = name # Ensure required fields exist if 'wardrobe' not in outfit_data: outfit_data['wardrobe'] = { "full_body": "", "headwear": "", "top": "", "bottom": "", "legwear": "", "footwear": "", "hands": "", "accessories": "" } if 'lora' not in outfit_data: outfit_data['lora'] = { "lora_name": "", "lora_weight": 0.8, "lora_triggers": "" } if 'tags' not in outfit_data: outfit_data['tags'] = [] except Exception as e: print(f"LLM error: {e}") flash(f"Failed to generate outfit profile: {e}") return redirect(request.url) else: # Create blank outfit template outfit_data = { "outfit_id": safe_slug, "outfit_name": name, "wardrobe": { "full_body": "", "headwear": "", "top": "", "bottom": "", "legwear": "", "footwear": "", "hands": "", "accessories": "" }, "lora": { "lora_name": "", "lora_weight": 0.8, "lora_triggers": "" }, "tags": [] } try: # Save file file_path = os.path.join(app.config['CLOTHING_DIR'], f"{safe_slug}.json") with open(file_path, 'w') as f: json.dump(outfit_data, f, indent=2) # Add to DB new_outfit = Outfit( outfit_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json", name=name, data=outfit_data ) db.session.add(new_outfit) db.session.commit() flash('Outfit created successfully!') return redirect(url_for('outfit_detail', slug=safe_slug)) except Exception as e: print(f"Save error: {e}") flash(f"Failed to create outfit: {e}") return redirect(request.url) return render_template('outfits/create.html') @app.route('/outfit//save_defaults', methods=['POST']) def save_outfit_defaults(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() selected_fields = request.form.getlist('include_field') outfit.default_fields = selected_fields db.session.commit() flash('Default prompt selection saved for this outfit!') return redirect(url_for('outfit_detail', slug=slug)) @app.route('/outfit//clone', methods=['POST']) def clone_outfit(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() # Find the next available number for the clone base_id = outfit.outfit_id # Extract base name without number suffix import re match = re.match(r'^(.+?)_(\d+)$', base_id) if match: base_name = match.group(1) current_num = int(match.group(2)) else: base_name = base_id current_num = 1 # Find next available number next_num = current_num + 1 while True: new_id = f"{base_name}_{next_num:02d}" new_filename = f"{new_id}.json" new_path = os.path.join(app.config['CLOTHING_DIR'], new_filename) if not os.path.exists(new_path): break next_num += 1 # Create new outfit data (copy of original) new_data = outfit.data.copy() new_data['outfit_id'] = new_id new_data['outfit_name'] = f"{outfit.name} (Copy)" # Save the new JSON file with open(new_path, 'w') as f: json.dump(new_data, f, indent=2) # Create new outfit in database new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id) new_outfit = Outfit( outfit_id=new_id, slug=new_slug, filename=new_filename, name=new_data['outfit_name'], data=new_data ) db.session.add(new_outfit) db.session.commit() flash(f'Outfit cloned as "{new_id}"!') return redirect(url_for('outfit_detail', slug=new_slug)) @app.route('/outfit//save_json', methods=['POST']) def save_outfit_json(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() try: new_data = json.loads(request.form.get('json_data', '')) except (ValueError, TypeError) as e: return {'success': False, 'error': f'Invalid JSON: {e}'}, 400 outfit.data = new_data flag_modified(outfit, 'data') db.session.commit() if outfit.filename: file_path = os.path.join(app.config['CLOTHING_DIR'], outfit.filename) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) return {'success': True} # ============ ACTION ROUTES ============ @app.route('/actions') def actions_index(): actions = Action.query.order_by(Action.name).all() return render_template('actions/index.html', actions=actions) @app.route('/actions/rescan', methods=['POST']) def rescan_actions(): sync_actions() flash('Database synced with action files.') return redirect(url_for('actions_index')) @app.route('/action/') def action_detail(slug): action = Action.query.filter_by(slug=slug).first_or_404() characters = Character.query.order_by(Character.name).all() # Load state from session preferences = session.get(f'prefs_action_{slug}') preview_image = session.get(f'preview_action_{slug}') selected_character = session.get(f'char_action_{slug}') # List existing preview images upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}") existing_previews = [] if os.path.isdir(upload_dir): files = sorted([f for f in os.listdir(upload_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))], reverse=True) existing_previews = [f"actions/{slug}/{f}" for f in files] return render_template('actions/detail.html', action=action, characters=characters, preferences=preferences, preview_image=preview_image, selected_character=selected_character, existing_previews=existing_previews) @app.route('/action//edit', methods=['GET', 'POST']) def edit_action(slug): action = Action.query.filter_by(slug=slug).first_or_404() loras = get_available_action_loras() if request.method == 'POST': try: # 1. Update basic fields action.name = request.form.get('action_name') # 2. Rebuild the data dictionary new_data = action.data.copy() new_data['action_name'] = action.name # Update action_id if provided new_action_id = request.form.get('action_id', action.action_id) new_data['action_id'] = new_action_id # Update action section if 'action' in new_data: for key in new_data['action'].keys(): form_key = f"action_{key}" if form_key in request.form: new_data['action'][key] = request.form.get(form_key) # Update lora section if 'lora' in new_data: for key in new_data['lora'].keys(): form_key = f"lora_{key}" if form_key in request.form: val = request.form.get(form_key) if key == 'lora_weight': try: val = float(val) except: val = 1.0 new_data['lora'][key] = val # LoRA weight randomization bounds for bound in ['lora_weight_min', 'lora_weight_max']: val_str = request.form.get(f'lora_{bound}', '').strip() if val_str: try: new_data.setdefault('lora', {})[bound] = float(val_str) except ValueError: pass else: new_data.setdefault('lora', {}).pop(bound, None) # Update Tags (comma separated string to list) tags_raw = request.form.get('tags', '') new_data['tags'] = [t.strip() for f in tags_raw.split(',') for t in [f.strip()] if t] action.data = new_data flag_modified(action, "data") # 3. Write back to JSON file action_file = action.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', action.action_id)}.json" file_path = os.path.join(app.config['ACTIONS_DIR'], action_file) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) db.session.commit() flash('Action profile updated successfully!') return redirect(url_for('action_detail', slug=slug)) except Exception as e: print(f"Edit error: {e}") flash(f"Error saving changes: {str(e)}") return render_template('actions/edit.html', action=action, loras=loras) @app.route('/action//upload', methods=['POST']) def upload_action_image(slug): action = Action.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(request.url) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): # Create action subfolder action_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}") os.makedirs(action_folder, exist_ok=True) filename = secure_filename(file.filename) file_path = os.path.join(action_folder, filename) file.save(file_path) # Store relative path in DB action.image_path = f"actions/{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('action_detail', slug=slug)) @app.route('/action//generate', methods=['POST']) def generate_action_image(slug): action_obj = Action.query.filter_by(slug=slug).first_or_404() try: # Get action type action_type = request.form.get('action', 'preview') client_id = request.form.get('client_id') # Get selected fields selected_fields = request.form.getlist('include_field') # Get selected character (if any) character_slug = request.form.get('character_slug', '') character = None # Handle random character selection if character_slug == '__random__': all_characters = Character.query.all() if all_characters: character = random.choice(all_characters) character_slug = character.slug elif character_slug: character = Character.query.filter_by(slug=character_slug).first() # Save preferences session[f'char_action_{slug}'] = character_slug session[f'prefs_action_{slug}'] = selected_fields # Build combined data for prompt building if character: # Combine character identity/wardrobe with action details # Action details replace character's 'defaults' (pose, etc.) combined_data = character.data.copy() # Update 'defaults' with action details action_data = action_obj.data.get('action', {}) combined_data['action'] = action_data # Ensure action section is present for routing combined_data['participants'] = action_obj.data.get('participants', {}) # Add participants # Aggregate pose-related fields into 'pose' pose_fields = ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet'] pose_parts = [action_data.get(k) for k in pose_fields if action_data.get(k)] # Aggregate expression-related fields into 'expression' expression_parts = [action_data.get(k) for k in ['head', 'eyes'] if action_data.get(k)] combined_data['defaults'] = { 'pose': ", ".join(pose_parts), 'expression': ", ".join(expression_parts), 'scene': action_data.get('additional', '') } # Merge lora triggers if present action_lora = action_obj.data.get('lora', {}) if action_lora.get('lora_triggers'): if 'lora' not in combined_data: combined_data['lora'] = {} combined_data['lora']['lora_triggers'] = f"{combined_data['lora'].get('lora_triggers', '')}, {action_lora['lora_triggers']}" # Merge tags combined_data['tags'] = list(set(combined_data.get('tags', []) + action_obj.data.get('tags', []))) # Use action's defaults if no manual selection if not selected_fields: selected_fields = list(action_obj.default_fields) if action_obj.default_fields else [] # Auto-include essential character fields if a character is selected if selected_fields: # Add character identity fields to selection if not already present for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']: if character.data.get('identity', {}).get(key): field_key = f'identity::{key}' if field_key not in selected_fields: selected_fields.append(field_key) # Always include character name if 'special::name' not in selected_fields: selected_fields.append('special::name') # Add active wardrobe fields wardrobe = character.get_active_wardrobe() for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']: if wardrobe.get(key): field_key = f'wardrobe::{key}' if field_key not in selected_fields: selected_fields.append(field_key) else: # Fallback to sensible defaults if still empty (no checkboxes and no action defaults) selected_fields = ['special::name', 'defaults::pose', 'defaults::expression'] # Add identity fields for key in ['base_specs', 'hair', 'eyes']: if character.data.get('identity', {}).get(key): selected_fields.append(f'identity::{key}') # Add wardrobe fields wardrobe = character.get_active_wardrobe() for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']: if wardrobe.get(key): selected_fields.append(f'wardrobe::{key}') default_fields = action_obj.default_fields active_outfit = character.active_outfit else: # Action only - no character (rarely makes sense for actions but let's handle it) action_data = action_obj.data.get('action', {}) # Aggregate pose-related fields into 'pose' pose_fields = ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet'] pose_parts = [action_data.get(k) for k in pose_fields if action_data.get(k)] # Aggregate expression-related fields into 'expression' expression_parts = [action_data.get(k) for k in ['head', 'eyes'] if action_data.get(k)] combined_data = { 'character_id': action_obj.action_id, 'defaults': { 'pose': ", ".join(pose_parts), 'expression': ", ".join(expression_parts), 'scene': action_data.get('additional', '') }, 'lora': action_obj.data.get('lora', {}), 'tags': action_obj.data.get('tags', []) } if not selected_fields: selected_fields = ['defaults::pose', 'defaults::expression', 'defaults::scene', 'lora::lora_triggers', 'special::tags'] default_fields = action_obj.default_fields active_outfit = 'default' # Queue generation with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) # Build prompts for combined data prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit) # Handle multiple female characters participants = action_obj.data.get('participants', {}) orientation = participants.get('orientation', '') f_count = orientation.upper().count('F') if f_count > 1: # We need f_count - 1 additional characters num_extras = f_count - 1 # Get all characters excluding the current one query = Character.query if character: query = query.filter(Character.id != character.id) all_others = query.all() if len(all_others) >= num_extras: extras = random.sample(all_others, num_extras) for extra_char in extras: extra_parts = [] # Identity ident = extra_char.data.get('identity', {}) for key in ['base_specs', 'hair', 'eyes', 'extra']: val = ident.get(key) if val: # Remove 1girl/solo val = re.sub(r'\b(1girl|1boy|solo)\b', '', val).replace(', ,', ',').strip(', ') extra_parts.append(val) # Wardrobe (active outfit) wardrobe = extra_char.get_active_wardrobe() for key in ['top', 'headwear', 'legwear', 'footwear', 'accessories']: val = wardrobe.get(key) if val: extra_parts.append(val) # Append to main prompt if extra_parts: prompts["main"] += ", " + ", ".join(extra_parts) print(f"Added extra character: {extra_char.name}") # Add colored simple background to the main prompt for action previews if character: primary_color = character.data.get('styles', {}).get('primary_color', '') if primary_color: prompts["main"] = f"{prompts['main']}, {primary_color} simple background" else: prompts["main"] = f"{prompts['main']}, simple background" else: prompts["main"] = f"{prompts['main']}, simple background" # Prepare workflow ckpt_path, ckpt_data = _get_default_checkpoint() workflow = _prepare_workflow(workflow, character, prompts, action=action_obj, checkpoint=ckpt_path, checkpoint_data=ckpt_data) prompt_response = queue_prompt(workflow, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('action_detail', slug=slug)) except Exception as e: print(f"Generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('action_detail', slug=slug)) @app.route('/action//finalize_generation/', methods=['POST']) def finalize_action_generation(slug, prompt_id): action_obj = Action.query.filter_by(slug=slug).first_or_404() action = request.form.get('action', 'preview') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) # Create action subfolder action_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}") os.makedirs(action_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(action_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) # Always save as preview relative_path = f"actions/{slug}/{filename}" session[f'preview_action_{slug}'] = relative_path session.modified = True # Ensure session is saved for JSON response # If action is 'replace', also update the action's cover image immediately if action == 'replace': action_obj.image_path = relative_path db.session.commit() return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize error: {e}") return {'error': str(e)}, 500 @app.route('/action//replace_cover_from_preview', methods=['POST']) def replace_action_cover_from_preview(slug): action = Action.query.filter_by(slug=slug).first_or_404() preview_path = session.get(f'preview_action_{slug}') if preview_path: action.image_path = preview_path db.session.commit() flash('Cover image updated from preview!') else: flash('No preview image available', 'error') return redirect(url_for('action_detail', slug=slug)) @app.route('/action//save_defaults', methods=['POST']) def save_action_defaults(slug): action = Action.query.filter_by(slug=slug).first_or_404() selected_fields = request.form.getlist('include_field') action.default_fields = selected_fields db.session.commit() flash('Default prompt selection saved for this action!') return redirect(url_for('action_detail', slug=slug)) @app.route('/actions/bulk_create', methods=['POST']) def bulk_create_actions_from_loras(): actions_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Poses/' if not os.path.exists(actions_lora_dir): flash('Actions LoRA directory not found.', 'error') return redirect(url_for('actions_index')) overwrite = request.form.get('overwrite') == 'true' created_count = 0 skipped_count = 0 overwritten_count = 0 system_prompt = load_prompt('action_system.txt') if not system_prompt: flash('Action system prompt file not found.', 'error') return redirect(url_for('actions_index')) for filename in os.listdir(actions_lora_dir): if filename.endswith('.safetensors'): name_base = filename.rsplit('.', 1)[0] action_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower()) action_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title() json_filename = f"{action_id}.json" json_path = os.path.join(app.config['ACTIONS_DIR'], json_filename) is_existing = os.path.exists(json_path) if is_existing and not overwrite: skipped_count += 1 continue html_filename = f"{name_base}.html" html_path = os.path.join(actions_lora_dir, html_filename) html_content = "" if os.path.exists(html_path): try: with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf: html_raw = hf.read() # Strip HTML tags but keep text content for LLM context clean_html = re.sub(r']*>.*?', '', html_raw, flags=re.DOTALL) clean_html = re.sub(r']*>.*?', '', clean_html, flags=re.DOTALL) clean_html = re.sub(r']*>', '', clean_html) clean_html = re.sub(r'<[^>]+>', ' ', clean_html) html_content = ' '.join(clean_html.split()) except Exception as e: print(f"Error reading HTML {html_filename}: {e}") try: print(f"Asking LLM to describe action: {action_name}") prompt = f"Describe an action/pose for an AI image generation model based on the LoRA filename: '{filename}'" if html_content: prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###" llm_response = call_llm(prompt, system_prompt) # Clean response clean_json = llm_response.replace('```json', '').replace('```', '').strip() action_data = json.loads(clean_json) # Enforce system values while preserving LLM-extracted metadata action_data['action_id'] = action_id action_data['action_name'] = action_name # Update lora dict safely if 'lora' not in action_data: action_data['lora'] = {} action_data['lora']['lora_name'] = f"Illustrious/Poses/{filename}" # Fallbacks if LLM failed to extract metadata if not action_data['lora'].get('lora_triggers'): action_data['lora']['lora_triggers'] = name_base if action_data['lora'].get('lora_weight') is None: action_data['lora']['lora_weight'] = 1.0 if action_data['lora'].get('lora_weight_min') is None: action_data['lora']['lora_weight_min'] = 0.7 if action_data['lora'].get('lora_weight_max') is None: action_data['lora']['lora_weight_max'] = 1.0 with open(json_path, 'w') as f: json.dump(action_data, f, indent=2) if is_existing: overwritten_count += 1 else: created_count += 1 # Small delay to avoid API rate limits if many files time.sleep(0.5) except Exception as e: print(f"Error creating action for {filename}: {e}") if created_count > 0 or overwritten_count > 0: sync_actions() msg = f'Successfully processed actions: {created_count} created, {overwritten_count} overwritten.' if skipped_count > 0: msg += f' (Skipped {skipped_count} existing)' flash(msg) else: flash(f'No actions created or overwritten. {skipped_count} existing actions found.') return redirect(url_for('actions_index')) @app.route('/action/create', methods=['GET', 'POST']) def create_action(): if request.method == 'POST': name = request.form.get('name') slug = request.form.get('filename', '').strip() prompt = request.form.get('prompt', '') use_llm = request.form.get('use_llm') == 'on' if not slug: slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_') safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) if not safe_slug: safe_slug = 'action' base_slug = safe_slug counter = 1 while os.path.exists(os.path.join(app.config['ACTIONS_DIR'], f"{safe_slug}.json")): safe_slug = f"{base_slug}_{counter}" counter += 1 if use_llm: if not prompt: flash("Description is required when AI generation is enabled.") return redirect(request.url) system_prompt = load_prompt('action_system.txt') if not system_prompt: flash("Action system prompt file not found.") return redirect(request.url) try: llm_response = call_llm(f"Create an action profile for '{name}' based on this description: {prompt}", system_prompt) clean_json = llm_response.replace('```json', '').replace('```', '').strip() action_data = json.loads(clean_json) action_data['action_id'] = safe_slug action_data['action_name'] = name except Exception as e: print(f"LLM error: {e}") flash(f"Failed to generate action profile: {e}") return redirect(request.url) else: action_data = { "action_id": safe_slug, "action_name": name, "action": { "full_body": "", "head": "", "eyes": "", "arms": "", "hands": "", "torso": "", "pelvis": "", "legs": "", "feet": "", "additional": "" }, "lora": {"lora_name": "", "lora_weight": 1.0, "lora_triggers": ""}, "tags": [] } try: file_path = os.path.join(app.config['ACTIONS_DIR'], f"{safe_slug}.json") with open(file_path, 'w') as f: json.dump(action_data, f, indent=2) new_action = Action( action_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json", name=name, data=action_data ) db.session.add(new_action) db.session.commit() flash('Action created successfully!') return redirect(url_for('action_detail', slug=safe_slug)) except Exception as e: print(f"Save error: {e}") flash(f"Failed to create action: {e}") return redirect(request.url) return render_template('actions/create.html') @app.route('/action//clone', methods=['POST']) def clone_action(slug): action = Action.query.filter_by(slug=slug).first_or_404() # Find the next available number for the clone base_id = action.action_id import re match = re.match(r'^(.+?)_(\d+)$', base_id) if match: base_name = match.group(1) current_num = int(match.group(2)) else: base_name = base_id current_num = 1 next_num = current_num + 1 while True: new_id = f"{base_name}_{next_num:02d}" new_filename = f"{new_id}.json" new_path = os.path.join(app.config['ACTIONS_DIR'], new_filename) if not os.path.exists(new_path): break next_num += 1 new_data = action.data.copy() new_data['action_id'] = new_id new_data['action_name'] = f"{action.name} (Copy)" with open(new_path, 'w') as f: json.dump(new_data, f, indent=2) new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id) new_action = Action( action_id=new_id, slug=new_slug, filename=new_filename, name=new_data['action_name'], data=new_data ) db.session.add(new_action) db.session.commit() flash(f'Action cloned as "{new_id}"!') return redirect(url_for('action_detail', slug=new_slug)) @app.route('/action//save_json', methods=['POST']) def save_action_json(slug): action = Action.query.filter_by(slug=slug).first_or_404() try: new_data = json.loads(request.form.get('json_data', '')) except (ValueError, TypeError) as e: return {'success': False, 'error': f'Invalid JSON: {e}'}, 400 action.data = new_data flag_modified(action, 'data') db.session.commit() if action.filename: file_path = os.path.join(app.config['ACTIONS_DIR'], action.filename) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) return {'success': True} # ============ STYLE ROUTES ============ @app.route('/styles') def styles_index(): styles = Style.query.order_by(Style.name).all() return render_template('styles/index.html', styles=styles) @app.route('/styles/rescan', methods=['POST']) def rescan_styles(): sync_styles() flash('Database synced with style files.') return redirect(url_for('styles_index')) @app.route('/style/') def style_detail(slug): style = Style.query.filter_by(slug=slug).first_or_404() characters = Character.query.order_by(Character.name).all() # Load state from session preferences = session.get(f'prefs_style_{slug}') preview_image = session.get(f'preview_style_{slug}') selected_character = session.get(f'char_style_{slug}') # List existing preview images upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}") existing_previews = [] if os.path.isdir(upload_dir): files = sorted([f for f in os.listdir(upload_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))], reverse=True) existing_previews = [f"styles/{slug}/{f}" for f in files] return render_template('styles/detail.html', style=style, characters=characters, preferences=preferences, preview_image=preview_image, selected_character=selected_character, existing_previews=existing_previews) @app.route('/style//edit', methods=['GET', 'POST']) def edit_style(slug): style = Style.query.filter_by(slug=slug).first_or_404() loras = get_available_style_loras() if request.method == 'POST': try: # 1. Update basic fields style.name = request.form.get('style_name') # 2. Rebuild the data dictionary new_data = style.data.copy() new_data['style_name'] = style.name # Update style section if 'style' in new_data: for key in new_data['style'].keys(): form_key = f"style_{key}" if form_key in request.form: new_data['style'][key] = request.form.get(form_key) # Update lora section if 'lora' in new_data: for key in new_data['lora'].keys(): form_key = f"lora_{key}" if form_key in request.form: val = request.form.get(form_key) if key == 'lora_weight': try: val = float(val) except: val = 1.0 new_data['lora'][key] = val # LoRA weight randomization bounds for bound in ['lora_weight_min', 'lora_weight_max']: val_str = request.form.get(f'lora_{bound}', '').strip() if val_str: try: new_data.setdefault('lora', {})[bound] = float(val_str) except ValueError: pass else: new_data.setdefault('lora', {}).pop(bound, None) style.data = new_data flag_modified(style, "data") # 3. Write back to JSON file style_file = style.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', style.style_id)}.json" file_path = os.path.join(app.config['STYLES_DIR'], style_file) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) db.session.commit() flash('Style updated successfully!') return redirect(url_for('style_detail', slug=slug)) except Exception as e: print(f"Edit error: {e}") flash(f"Error saving changes: {str(e)}") return render_template('styles/edit.html', style=style, loras=loras) @app.route('/style//upload', methods=['POST']) def upload_style_image(slug): style = Style.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(request.url) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): # Create style subfolder style_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}") os.makedirs(style_folder, exist_ok=True) filename = secure_filename(file.filename) file_path = os.path.join(style_folder, filename) file.save(file_path) # Store relative path in DB style.image_path = f"styles/{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('style_detail', slug=slug)) def _queue_style_generation(style_obj, character=None, selected_fields=None, client_id=None): if character: combined_data = character.data.copy() combined_data['character_id'] = character.character_id combined_data['style'] = style_obj.data.get('style', {}) # Merge style lora triggers if present style_lora = style_obj.data.get('lora', {}) if style_lora.get('lora_triggers'): if 'lora' not in combined_data: combined_data['lora'] = {} combined_data['lora']['lora_triggers'] = f"{combined_data['lora'].get('lora_triggers', '')}, {style_lora['lora_triggers']}" # Merge character identity and wardrobe fields into selected_fields if selected_fields: # Add character identity fields to selection if not already present for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']: if character.data.get('identity', {}).get(key): field_key = f'identity::{key}' if field_key not in selected_fields: selected_fields.append(field_key) # Always include character name if 'special::name' not in selected_fields: selected_fields.append('special::name') # Add active wardrobe fields wardrobe = character.get_active_wardrobe() for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']: if wardrobe.get(key): field_key = f'wardrobe::{key}' if field_key not in selected_fields: selected_fields.append(field_key) else: # Auto-include essential character fields selected_fields = [] for key in ['base_specs', 'hair', 'eyes']: if character.data.get('identity', {}).get(key): selected_fields.append(f'identity::{key}') selected_fields.append('special::name') # Add active wardrobe wardrobe = character.get_active_wardrobe() for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']: if wardrobe.get(key): selected_fields.append(f'wardrobe::{key}') # Add style fields selected_fields.extend(['style::artist_name', 'style::artistic_style', 'lora::lora_triggers']) default_fields = style_obj.default_fields active_outfit = character.active_outfit else: combined_data = { 'character_id': style_obj.style_id, 'style': style_obj.data.get('style', {}), 'lora': style_obj.data.get('lora', {}), 'tags': style_obj.data.get('tags', []) } if not selected_fields: selected_fields = ['style::artist_name', 'style::artistic_style', 'lora::lora_triggers'] default_fields = style_obj.default_fields active_outfit = 'default' with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit) if character: primary_color = character.data.get('styles', {}).get('primary_color', '') if primary_color: prompts["main"] = f"{prompts['main']}, {primary_color} simple background" else: prompts["main"] = f"{prompts['main']}, simple background" else: prompts["main"] = f"{prompts['main']}, simple background" ckpt_path, ckpt_data = _get_default_checkpoint() workflow = _prepare_workflow(workflow, character, prompts, style=style_obj, checkpoint=ckpt_path, checkpoint_data=ckpt_data) return queue_prompt(workflow, client_id=client_id) @app.route('/style//generate', methods=['POST']) def generate_style_image(slug): style_obj = Style.query.filter_by(slug=slug).first_or_404() try: # Get action type action = request.form.get('action', 'preview') client_id = request.form.get('client_id') # Get selected fields selected_fields = request.form.getlist('include_field') # Get selected character (if any) character_slug = request.form.get('character_slug', '') character = None if character_slug == '__random__': all_characters = Character.query.all() if all_characters: character = random.choice(all_characters) character_slug = character.slug elif character_slug: character = Character.query.filter_by(slug=character_slug).first() # Save preferences session[f'char_style_{slug}'] = character_slug session[f'prefs_style_{slug}'] = selected_fields # Queue generation using helper prompt_response = _queue_style_generation(style_obj, character, selected_fields, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('style_detail', slug=slug)) except Exception as e: print(f"Generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('style_detail', slug=slug)) @app.route('/style//finalize_generation/', methods=['POST']) def finalize_style_generation(slug, prompt_id): style_obj = Style.query.filter_by(slug=slug).first_or_404() action = request.form.get('action', 'preview') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) style_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}") os.makedirs(style_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(style_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) relative_path = f"styles/{slug}/{filename}" session[f'preview_style_{slug}'] = relative_path session.modified = True # Ensure session is saved for JSON response # If action is 'replace', also update the style's cover image immediately if action == 'replace': style_obj.image_path = relative_path db.session.commit() return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize error: {e}") return {'error': str(e)}, 500 @app.route('/style//save_defaults', methods=['POST']) def save_style_defaults(slug): style = Style.query.filter_by(slug=slug).first_or_404() selected_fields = request.form.getlist('include_field') style.default_fields = selected_fields db.session.commit() flash('Default prompt selection saved for this style!') return redirect(url_for('style_detail', slug=slug)) @app.route('/style//replace_cover_from_preview', methods=['POST']) def replace_style_cover_from_preview(slug): style = Style.query.filter_by(slug=slug).first_or_404() preview_path = session.get(f'preview_style_{slug}') if preview_path: style.image_path = preview_path db.session.commit() flash('Cover image updated from preview!') else: flash('No preview image available', 'error') return redirect(url_for('style_detail', slug=slug)) @app.route('/get_missing_styles') def get_missing_styles(): missing = Style.query.filter((Style.image_path == None) | (Style.image_path == '')).all() return {'missing': [{'slug': s.slug, 'name': s.name} for s in missing]} @app.route('/get_missing_detailers') def get_missing_detailers(): missing = Detailer.query.filter((Detailer.image_path == None) | (Detailer.image_path == '')).all() return {'missing': [{'slug': d.slug, 'name': d.name} for d in missing]} @app.route('/clear_all_detailer_covers', methods=['POST']) def clear_all_detailer_covers(): detailers = Detailer.query.all() for detailer in detailers: detailer.image_path = None db.session.commit() return {'success': True} @app.route('/clear_all_style_covers', methods=['POST']) def clear_all_style_covers(): styles = Style.query.all() for style in styles: style.image_path = None db.session.commit() return {'success': True} @app.route('/styles/generate_missing', methods=['POST']) def generate_missing_styles(): def get_missing_count(): return Style.query.filter((Style.image_path == None) | (Style.image_path == '')).count() if get_missing_count() == 0: flash("No styles missing cover images.") return redirect(url_for('styles_index')) # Get all characters once to pick from all_characters = Character.query.all() if not all_characters: flash("No characters available to preview styles with.", "error") return redirect(url_for('styles_index')) success_count = 0 while get_missing_count() > 0: style_obj = Style.query.filter((Style.image_path == None) | (Style.image_path == '')).order_by(Style.name).first() if not style_obj: break # Pick a random character for each style for variety character = random.choice(all_characters) style_slug = style_obj.slug try: print(f"Batch generating style: {style_obj.name} with character {character.name}") prompt_response = _queue_style_generation(style_obj, character=character) prompt_id = prompt_response['prompt_id'] max_retries = 120 while max_retries > 0: history = get_history(prompt_id) if prompt_id in history: outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) style_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{style_slug}") os.makedirs(style_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(style_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) style_to_update = Style.query.filter_by(slug=style_slug).first() if style_to_update: style_to_update.image_path = f"styles/{style_slug}/{filename}" db.session.commit() success_count += 1 break break time.sleep(2) max_retries -= 1 except Exception as e: print(f"Error generating for style {style_obj.name}: {e}") db.session.rollback() flash(f"Batch style generation complete. Generated {success_count} images.") return redirect(url_for('styles_index')) @app.route('/styles/bulk_create', methods=['POST']) def bulk_create_styles_from_loras(): styles_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Styles/' if not os.path.exists(styles_lora_dir): flash('Styles LoRA directory not found.', 'error') return redirect(url_for('styles_index')) overwrite = request.form.get('overwrite') == 'true' created_count = 0 skipped_count = 0 overwritten_count = 0 system_prompt = load_prompt('style_system.txt') if not system_prompt: flash('Style system prompt file not found.', 'error') return redirect(url_for('styles_index')) for filename in os.listdir(styles_lora_dir): if filename.endswith('.safetensors'): name_base = filename.rsplit('.', 1)[0] style_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower()) style_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title() json_filename = f"{style_id}.json" json_path = os.path.join(app.config['STYLES_DIR'], json_filename) is_existing = os.path.exists(json_path) if is_existing and not overwrite: skipped_count += 1 continue html_filename = f"{name_base}.html" html_path = os.path.join(styles_lora_dir, html_filename) html_content = "" if os.path.exists(html_path): try: with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf: html_raw = hf.read() clean_html = re.sub(r']*>.*?', '', html_raw, flags=re.DOTALL) clean_html = re.sub(r']*>.*?', '', clean_html, flags=re.DOTALL) clean_html = re.sub(r']*>', '', clean_html) clean_html = re.sub(r'<[^>]+>', ' ', clean_html) html_content = ' '.join(clean_html.split()) except Exception as e: print(f"Error reading HTML {html_filename}: {e}") try: print(f"Asking LLM to describe style: {style_name}") prompt = f"Describe an art style or artist LoRA for AI image generation based on the filename: '{filename}'" if html_content: prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###" llm_response = call_llm(prompt, system_prompt) clean_json = llm_response.replace('```json', '').replace('```', '').strip() style_data = json.loads(clean_json) style_data['style_id'] = style_id style_data['style_name'] = style_name if 'lora' not in style_data: style_data['lora'] = {} style_data['lora']['lora_name'] = f"Illustrious/Styles/{filename}" if not style_data['lora'].get('lora_triggers'): style_data['lora']['lora_triggers'] = name_base if style_data['lora'].get('lora_weight') is None: style_data['lora']['lora_weight'] = 1.0 if style_data['lora'].get('lora_weight_min') is None: style_data['lora']['lora_weight_min'] = 0.7 if style_data['lora'].get('lora_weight_max') is None: style_data['lora']['lora_weight_max'] = 1.0 with open(json_path, 'w') as f: json.dump(style_data, f, indent=2) if is_existing: overwritten_count += 1 else: created_count += 1 time.sleep(0.5) except Exception as e: print(f"Error creating style for {filename}: {e}") if created_count > 0 or overwritten_count > 0: sync_styles() msg = f'Successfully processed styles: {created_count} created, {overwritten_count} overwritten.' if skipped_count > 0: msg += f' (Skipped {skipped_count} existing)' flash(msg) else: flash(f'No styles created or overwritten. {skipped_count} existing styles found.') return redirect(url_for('styles_index')) @app.route('/style/create', methods=['GET', 'POST']) def create_style(): if request.method == 'POST': name = request.form.get('name') slug = request.form.get('filename', '').strip() if not slug: slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_') safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) if not safe_slug: safe_slug = 'style' base_slug = safe_slug counter = 1 while os.path.exists(os.path.join(app.config['STYLES_DIR'], f"{safe_slug}.json")): safe_slug = f"{base_slug}_{counter}" counter += 1 style_data = { "style_id": safe_slug, "style_name": name, "style": { "artist_name": "", "artistic_style": "" }, "lora": { "lora_name": "", "lora_weight": 1.0, "lora_triggers": "" } } try: file_path = os.path.join(app.config['STYLES_DIR'], f"{safe_slug}.json") with open(file_path, 'w') as f: json.dump(style_data, f, indent=2) new_style = Style( style_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json", name=name, data=style_data ) db.session.add(new_style) db.session.commit() flash('Style created successfully!') return redirect(url_for('style_detail', slug=safe_slug)) except Exception as e: print(f"Save error: {e}") flash(f"Failed to create style: {e}") return redirect(request.url) return render_template('styles/create.html') @app.route('/style//clone', methods=['POST']) def clone_style(slug): style = Style.query.filter_by(slug=slug).first_or_404() base_id = style.style_id import re match = re.match(r'^(.+?)_(\d+)$', base_id) if match: base_name = match.group(1) current_num = int(match.group(2)) else: base_name = base_id current_num = 1 next_num = current_num + 1 while True: new_id = f"{base_name}_{next_num:02d}" new_filename = f"{new_id}.json" new_path = os.path.join(app.config['STYLES_DIR'], new_filename) if not os.path.exists(new_path): break next_num += 1 new_data = style.data.copy() new_data['style_id'] = new_id new_data['style_name'] = f"{style.name} (Copy)" with open(new_path, 'w') as f: json.dump(new_data, f, indent=2) new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id) new_style = Style( style_id=new_id, slug=new_slug, filename=new_filename, name=new_data['style_name'], data=new_data ) db.session.add(new_style) db.session.commit() flash(f'Style cloned as "{new_id}"!') return redirect(url_for('style_detail', slug=new_slug)) @app.route('/style//save_json', methods=['POST']) def save_style_json(slug): style = Style.query.filter_by(slug=slug).first_or_404() try: new_data = json.loads(request.form.get('json_data', '')) except (ValueError, TypeError) as e: return {'success': False, 'error': f'Invalid JSON: {e}'}, 400 style.data = new_data flag_modified(style, 'data') db.session.commit() if style.filename: file_path = os.path.join(app.config['STYLES_DIR'], style.filename) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) return {'success': True} # ============ SCENE ROUTES ============ @app.route('/scenes') def scenes_index(): scenes = Scene.query.order_by(Scene.name).all() return render_template('scenes/index.html', scenes=scenes) @app.route('/scenes/rescan', methods=['POST']) def rescan_scenes(): sync_scenes() flash('Database synced with scene files.') return redirect(url_for('scenes_index')) @app.route('/scene/') def scene_detail(slug): scene = Scene.query.filter_by(slug=slug).first_or_404() characters = Character.query.order_by(Character.name).all() # Load state from session preferences = session.get(f'prefs_scene_{slug}') preview_image = session.get(f'preview_scene_{slug}') selected_character = session.get(f'char_scene_{slug}') # List existing preview images upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"scenes/{slug}") existing_previews = [] if os.path.isdir(upload_dir): files = sorted([f for f in os.listdir(upload_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))], reverse=True) existing_previews = [f"scenes/{slug}/{f}" for f in files] return render_template('scenes/detail.html', scene=scene, characters=characters, preferences=preferences, preview_image=preview_image, selected_character=selected_character, existing_previews=existing_previews) @app.route('/scene//edit', methods=['GET', 'POST']) def edit_scene(slug): scene = Scene.query.filter_by(slug=slug).first_or_404() loras = get_available_scene_loras() if request.method == 'POST': try: # 1. Update basic fields scene.name = request.form.get('scene_name') # 2. Rebuild the data dictionary new_data = scene.data.copy() new_data['scene_name'] = scene.name # Update scene section if 'scene' in new_data: for key in new_data['scene'].keys(): form_key = f"scene_{key}" if form_key in request.form: val = request.form.get(form_key) # Handle list for furniture/colors if they were originally lists if key in ['furniture', 'colors'] and isinstance(new_data['scene'][key], list): val = [v.strip() for v in val.split(',') if v.strip()] new_data['scene'][key] = val # Update lora section if 'lora' in new_data: for key in new_data['lora'].keys(): form_key = f"lora_{key}" if form_key in request.form: val = request.form.get(form_key) if key == 'lora_weight': try: val = float(val) except: val = 1.0 new_data['lora'][key] = val # LoRA weight randomization bounds for bound in ['lora_weight_min', 'lora_weight_max']: val_str = request.form.get(f'lora_{bound}', '').strip() if val_str: try: new_data.setdefault('lora', {})[bound] = float(val_str) except ValueError: pass else: new_data.setdefault('lora', {}).pop(bound, None) # Update Tags (comma separated string to list) tags_raw = request.form.get('tags', '') new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()] scene.data = new_data flag_modified(scene, "data") # 3. Write back to JSON file scene_file = scene.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', scene.scene_id)}.json" file_path = os.path.join(app.config['SCENES_DIR'], scene_file) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) db.session.commit() flash('Scene updated successfully!') return redirect(url_for('scene_detail', slug=slug)) except Exception as e: print(f"Edit error: {e}") flash(f"Error saving changes: {str(e)}") return render_template('scenes/edit.html', scene=scene, loras=loras) @app.route('/scene//upload', methods=['POST']) def upload_scene_image(slug): scene = Scene.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(request.url) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): # Create scene subfolder scene_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"scenes/{slug}") os.makedirs(scene_folder, exist_ok=True) filename = secure_filename(file.filename) file_path = os.path.join(scene_folder, filename) file.save(file_path) # Store relative path in DB scene.image_path = f"scenes/{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('scene_detail', slug=slug)) def _queue_scene_generation(scene_obj, character=None, selected_fields=None, client_id=None): if character: combined_data = character.data.copy() combined_data['character_id'] = character.character_id # Update character's 'defaults' with scene details scene_data = scene_obj.data.get('scene', {}) # Build scene tag string scene_tags = [] for key in ['background', 'foreground', 'furniture', 'colors', 'lighting', 'theme']: val = scene_data.get(key) if val: if isinstance(val, list): scene_tags.extend(val) else: scene_tags.append(val) combined_data['defaults']['scene'] = ", ".join(scene_tags) # Merge scene lora triggers if present scene_lora = scene_obj.data.get('lora', {}) if scene_lora.get('lora_triggers'): if 'lora' not in combined_data: combined_data['lora'] = {} combined_data['lora']['lora_triggers'] = f"{combined_data['lora'].get('lora_triggers', '')}, {scene_lora['lora_triggers']}" # Merge character identity and wardrobe fields into selected_fields if selected_fields: # Add character identity fields to selection if not already present for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']: if character.data.get('identity', {}).get(key): field_key = f'identity::{key}' if field_key not in selected_fields: selected_fields.append(field_key) # Always include character name if 'special::name' not in selected_fields: selected_fields.append('special::name') # Add active wardrobe fields wardrobe = character.get_active_wardrobe() for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']: if wardrobe.get(key): field_key = f'wardrobe::{key}' if field_key not in selected_fields: selected_fields.append(field_key) else: # Auto-include essential character fields selected_fields = [] for key in ['base_specs', 'hair', 'eyes']: if character.data.get('identity', {}).get(key): selected_fields.append(f'identity::{key}') selected_fields.append('special::name') # Add active wardrobe wardrobe = character.get_active_wardrobe() for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']: if wardrobe.get(key): selected_fields.append(f'wardrobe::{key}') # Add scene fields selected_fields.extend(['defaults::scene', 'lora::lora_triggers']) default_fields = scene_obj.default_fields active_outfit = character.active_outfit else: # Scene only - no character scene_data = scene_obj.data.get('scene', {}) scene_tags = [] for key in ['background', 'foreground', 'furniture', 'colors', 'lighting', 'theme']: val = scene_data.get(key) if val: if isinstance(val, list): scene_tags.extend(val) else: scene_tags.append(val) combined_data = { 'character_id': scene_obj.scene_id, 'defaults': { 'scene': ", ".join(scene_tags) }, 'lora': scene_obj.data.get('lora', {}), 'tags': scene_obj.data.get('tags', []) } if not selected_fields: selected_fields = ['defaults::scene', 'lora::lora_triggers'] default_fields = scene_obj.default_fields active_outfit = 'default' with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit) # For scene generation, we want to ensure Node 20 is handled in _prepare_workflow ckpt_path, ckpt_data = _get_default_checkpoint() workflow = _prepare_workflow(workflow, character, prompts, scene=scene_obj, checkpoint=ckpt_path, checkpoint_data=ckpt_data) return queue_prompt(workflow, client_id=client_id) @app.route('/scene//generate', methods=['POST']) def generate_scene_image(slug): scene_obj = Scene.query.filter_by(slug=slug).first_or_404() try: # Get action type action = request.form.get('action', 'preview') client_id = request.form.get('client_id') # Get selected fields selected_fields = request.form.getlist('include_field') # Get selected character (if any) character_slug = request.form.get('character_slug', '') character = None if character_slug == '__random__': all_characters = Character.query.all() if all_characters: character = random.choice(all_characters) character_slug = character.slug elif character_slug: character = Character.query.filter_by(slug=character_slug).first() # Save preferences session[f'char_scene_{slug}'] = character_slug session[f'prefs_scene_{slug}'] = selected_fields # Queue generation using helper prompt_response = _queue_scene_generation(scene_obj, character, selected_fields, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('scene_detail', slug=slug)) except Exception as e: print(f"Generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('scene_detail', slug=slug)) @app.route('/scene//finalize_generation/', methods=['POST']) def finalize_scene_generation(slug, prompt_id): scene_obj = Scene.query.filter_by(slug=slug).first_or_404() action = request.form.get('action', 'preview') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) scene_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"scenes/{slug}") os.makedirs(scene_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(scene_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) relative_path = f"scenes/{slug}/{filename}" session[f'preview_scene_{slug}'] = relative_path session.modified = True # Ensure session is saved for JSON response # If action is 'replace', also update the scene's cover image immediately if action == 'replace': scene_obj.image_path = relative_path db.session.commit() return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize error: {e}") return {'error': str(e)}, 500 @app.route('/scene//save_defaults', methods=['POST']) def save_scene_defaults(slug): scene = Scene.query.filter_by(slug=slug).first_or_404() selected_fields = request.form.getlist('include_field') scene.default_fields = selected_fields db.session.commit() flash('Default prompt selection saved for this scene!') return redirect(url_for('scene_detail', slug=slug)) @app.route('/scene//replace_cover_from_preview', methods=['POST']) def replace_scene_cover_from_preview(slug): scene = Scene.query.filter_by(slug=slug).first_or_404() preview_path = session.get(f'preview_scene_{slug}') if preview_path: scene.image_path = preview_path db.session.commit() flash('Cover image updated from preview!') else: flash('No preview image available', 'error') return redirect(url_for('scene_detail', slug=slug)) @app.route('/scenes/bulk_create', methods=['POST']) def bulk_create_scenes_from_loras(): backgrounds_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Backgrounds/' if not os.path.exists(backgrounds_lora_dir): flash('Backgrounds LoRA directory not found.', 'error') return redirect(url_for('scenes_index')) overwrite = request.form.get('overwrite') == 'true' created_count = 0 skipped_count = 0 overwritten_count = 0 system_prompt = load_prompt('scene_system.txt') if not system_prompt: flash('Scene system prompt file not found.', 'error') return redirect(url_for('scenes_index')) for filename in os.listdir(backgrounds_lora_dir): if filename.endswith('.safetensors'): name_base = filename.rsplit('.', 1)[0] scene_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower()) scene_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title() json_filename = f"{scene_id}.json" json_path = os.path.join(app.config['SCENES_DIR'], json_filename) is_existing = os.path.exists(json_path) if is_existing and not overwrite: skipped_count += 1 continue html_filename = f"{name_base}.html" html_path = os.path.join(backgrounds_lora_dir, html_filename) html_content = "" if os.path.exists(html_path): try: with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf: html_raw = hf.read() # Strip HTML tags but keep text content for LLM context clean_html = re.sub(r']*>.*?', '', html_raw, flags=re.DOTALL) clean_html = re.sub(r']*>.*?', '', clean_html, flags=re.DOTALL) clean_html = re.sub(r']*>', '', clean_html) clean_html = re.sub(r'<[^>]+>', ' ', clean_html) html_content = ' '.join(clean_html.split()) except Exception as e: print(f"Error reading HTML {html_filename}: {e}") try: print(f"Asking LLM to describe scene: {scene_name}") prompt = f"Describe a scene for an AI image generation model based on the LoRA filename: '{filename}'" if html_content: prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###" llm_response = call_llm(prompt, system_prompt) # Clean response clean_json = llm_response.replace('```json', '').replace('```', '').strip() scene_data = json.loads(clean_json) # Enforce system values while preserving LLM-extracted metadata scene_data['scene_id'] = scene_id scene_data['scene_name'] = scene_name if 'lora' not in scene_data: scene_data['lora'] = {} scene_data['lora']['lora_name'] = f"Illustrious/Backgrounds/{filename}" if not scene_data['lora'].get('lora_triggers'): scene_data['lora']['lora_triggers'] = name_base if scene_data['lora'].get('lora_weight') is None: scene_data['lora']['lora_weight'] = 1.0 if scene_data['lora'].get('lora_weight_min') is None: scene_data['lora']['lora_weight_min'] = 0.7 if scene_data['lora'].get('lora_weight_max') is None: scene_data['lora']['lora_weight_max'] = 1.0 with open(json_path, 'w') as f: json.dump(scene_data, f, indent=2) if is_existing: overwritten_count += 1 else: created_count += 1 # Small delay to avoid API rate limits if many files time.sleep(0.5) except Exception as e: print(f"Error creating scene for {filename}: {e}") if created_count > 0 or overwritten_count > 0: sync_scenes() msg = f'Successfully processed scenes: {created_count} created, {overwritten_count} overwritten.' if skipped_count > 0: msg += f' (Skipped {skipped_count} existing)' flash(msg) else: flash(f'No scenes created or overwritten. {skipped_count} existing scenes found.') return redirect(url_for('scenes_index')) @app.route('/scene/create', methods=['GET', 'POST']) def create_scene(): if request.method == 'POST': name = request.form.get('name') slug = request.form.get('filename', '').strip() if not slug: slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_') safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) if not safe_slug: safe_slug = 'scene' base_slug = safe_slug counter = 1 while os.path.exists(os.path.join(app.config['SCENES_DIR'], f"{safe_slug}.json")): safe_slug = f"{base_slug}_{counter}" counter += 1 scene_data = { "scene_id": safe_slug, "scene_name": name, "scene": { "background": "", "foreground": "", "furniture": [], "colors": [], "lighting": "", "theme": "" }, "lora": { "lora_name": "", "lora_weight": 1.0, "lora_triggers": "" } } try: file_path = os.path.join(app.config['SCENES_DIR'], f"{safe_slug}.json") with open(file_path, 'w') as f: json.dump(scene_data, f, indent=2) new_scene = Scene( scene_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json", name=name, data=scene_data ) db.session.add(new_scene) db.session.commit() flash('Scene created successfully!') return redirect(url_for('scene_detail', slug=safe_slug)) except Exception as e: print(f"Save error: {e}") flash(f"Failed to create scene: {e}") return redirect(request.url) return render_template('scenes/create.html') @app.route('/scene//clone', methods=['POST']) def clone_scene(slug): scene = Scene.query.filter_by(slug=slug).first_or_404() base_id = scene.scene_id import re match = re.match(r'^(.+?)_(\d+)$', base_id) if match: base_name = match.group(1) current_num = int(match.group(2)) else: base_name = base_id current_num = 1 next_num = current_num + 1 while True: new_id = f"{base_name}_{next_num:02d}" new_filename = f"{new_id}.json" new_path = os.path.join(app.config['SCENES_DIR'], new_filename) if not os.path.exists(new_path): break next_num += 1 new_data = scene.data.copy() new_data['scene_id'] = new_id new_data['scene_name'] = f"{scene.name} (Copy)" with open(new_path, 'w') as f: json.dump(new_data, f, indent=2) new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id) new_scene = Scene( scene_id=new_id, slug=new_slug, filename=new_filename, name=new_data['scene_name'], data=new_data ) db.session.add(new_scene) db.session.commit() flash(f'Scene cloned as "{new_id}"!') return redirect(url_for('scene_detail', slug=new_slug)) @app.route('/scene//save_json', methods=['POST']) def save_scene_json(slug): scene = Scene.query.filter_by(slug=slug).first_or_404() try: new_data = json.loads(request.form.get('json_data', '')) except (ValueError, TypeError) as e: return {'success': False, 'error': f'Invalid JSON: {e}'}, 400 scene.data = new_data flag_modified(scene, 'data') db.session.commit() if scene.filename: file_path = os.path.join(app.config['SCENES_DIR'], scene.filename) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) return {'success': True} # ============ DETAILER ROUTES ============ @app.route('/detailers') def detailers_index(): detailers = Detailer.query.order_by(Detailer.name).all() return render_template('detailers/index.html', detailers=detailers) @app.route('/detailers/rescan', methods=['POST']) def rescan_detailers(): sync_detailers() flash('Database synced with detailer files.') return redirect(url_for('detailers_index')) @app.route('/detailer/') def detailer_detail(slug): detailer = Detailer.query.filter_by(slug=slug).first_or_404() characters = Character.query.order_by(Character.name).all() actions = Action.query.order_by(Action.name).all() # Load state from session preferences = session.get(f'prefs_detailer_{slug}') preview_image = session.get(f'preview_detailer_{slug}') selected_character = session.get(f'char_detailer_{slug}') selected_action = session.get(f'action_detailer_{slug}') extra_positive = session.get(f'extra_pos_detailer_{slug}', '') extra_negative = session.get(f'extra_neg_detailer_{slug}', '') # List existing preview images upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"detailers/{slug}") existing_previews = [] if os.path.isdir(upload_dir): files = sorted([f for f in os.listdir(upload_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))], reverse=True) existing_previews = [f"detailers/{slug}/{f}" for f in files] return render_template('detailers/detail.html', detailer=detailer, characters=characters, actions=actions, preferences=preferences, preview_image=preview_image, selected_character=selected_character, selected_action=selected_action, extra_positive=extra_positive, extra_negative=extra_negative, existing_previews=existing_previews) @app.route('/detailer//edit', methods=['GET', 'POST']) def edit_detailer(slug): detailer = Detailer.query.filter_by(slug=slug).first_or_404() loras = get_available_detailer_loras() if request.method == 'POST': try: # 1. Update basic fields detailer.name = request.form.get('detailer_name') # 2. Rebuild the data dictionary new_data = detailer.data.copy() new_data['detailer_name'] = detailer.name # Update prompt (stored as a plain string) new_data['prompt'] = request.form.get('detailer_prompt', '') # Update lora section if 'lora' in new_data: for key in new_data['lora'].keys(): form_key = f"lora_{key}" if form_key in request.form: val = request.form.get(form_key) if key == 'lora_weight': try: val = float(val) except: val = 1.0 new_data['lora'][key] = val # LoRA weight randomization bounds for bound in ['lora_weight_min', 'lora_weight_max']: val_str = request.form.get(f'lora_{bound}', '').strip() if val_str: try: new_data.setdefault('lora', {})[bound] = float(val_str) except ValueError: pass else: new_data.setdefault('lora', {}).pop(bound, None) # Update Tags (comma separated string to list) tags_raw = request.form.get('tags', '') new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()] detailer.data = new_data flag_modified(detailer, "data") # 3. Write back to JSON file detailer_file = detailer.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', detailer.detailer_id)}.json" file_path = os.path.join(app.config['DETAILERS_DIR'], detailer_file) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) db.session.commit() flash('Detailer updated successfully!') return redirect(url_for('detailer_detail', slug=slug)) except Exception as e: print(f"Edit error: {e}") flash(f"Error saving changes: {str(e)}") return render_template('detailers/edit.html', detailer=detailer, loras=loras) @app.route('/detailer//upload', methods=['POST']) def upload_detailer_image(slug): detailer = Detailer.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(request.url) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): # Create detailer subfolder detailer_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"detailers/{slug}") os.makedirs(detailer_folder, exist_ok=True) filename = secure_filename(file.filename) file_path = os.path.join(detailer_folder, filename) file.save(file_path) # Store relative path in DB detailer.image_path = f"detailers/{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('detailer_detail', slug=slug)) def _queue_detailer_generation(detailer_obj, character=None, selected_fields=None, client_id=None, action=None, extra_positive=None, extra_negative=None): if character: combined_data = character.data.copy() combined_data['character_id'] = character.character_id # Merge detailer prompt into character's tags detailer_prompt = detailer_obj.data.get('prompt', '') if detailer_prompt: if 'tags' not in combined_data: combined_data['tags'] = [] combined_data['tags'].append(detailer_prompt) # Merge detailer lora triggers if present detailer_lora = detailer_obj.data.get('lora', {}) if detailer_lora.get('lora_triggers'): if 'lora' not in combined_data: combined_data['lora'] = {} combined_data['lora']['lora_triggers'] = f"{combined_data['lora'].get('lora_triggers', '')}, {detailer_lora['lora_triggers']}" # Merge character identity and wardrobe fields into selected_fields if selected_fields: # Add character identity fields to selection if not already present for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']: if character.data.get('identity', {}).get(key): field_key = f'identity::{key}' if field_key not in selected_fields: selected_fields.append(field_key) # Always include character name if 'special::name' not in selected_fields: selected_fields.append('special::name') # Add active wardrobe fields wardrobe = character.get_active_wardrobe() for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']: if wardrobe.get(key): field_key = f'wardrobe::{key}' if field_key not in selected_fields: selected_fields.append(field_key) else: # Auto-include essential character fields selected_fields = [] for key in ['base_specs', 'hair', 'eyes']: if character.data.get('identity', {}).get(key): selected_fields.append(f'identity::{key}') selected_fields.append('special::name') # Add active wardrobe wardrobe = character.get_active_wardrobe() for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']: if wardrobe.get(key): selected_fields.append(f'wardrobe::{key}') # Add detailer fields selected_fields.extend(['special::tags', 'lora::lora_triggers']) default_fields = detailer_obj.default_fields active_outfit = character.active_outfit else: # Detailer only - no character detailer_prompt = detailer_obj.data.get('prompt', '') detailer_tags = [detailer_prompt] if detailer_prompt else [] combined_data = { 'character_id': detailer_obj.detailer_id, 'tags': detailer_tags, 'lora': detailer_obj.data.get('lora', {}), } if not selected_fields: selected_fields = ['special::tags', 'lora::lora_triggers'] default_fields = detailer_obj.default_fields active_outfit = 'default' with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit) # Add colored simple background to the main prompt for detailer previews if character: primary_color = character.data.get('styles', {}).get('primary_color', '') if primary_color: prompts["main"] = f"{prompts['main']}, {primary_color} simple background" else: prompts["main"] = f"{prompts['main']}, simple background" else: prompts["main"] = f"{prompts['main']}, simple background" if extra_positive: prompts["main"] = f"{prompts['main']}, {extra_positive}" ckpt_path, ckpt_data = _get_default_checkpoint() workflow = _prepare_workflow(workflow, character, prompts, detailer=detailer_obj, action=action, custom_negative=extra_negative or None, checkpoint=ckpt_path, checkpoint_data=ckpt_data) return queue_prompt(workflow, client_id=client_id) @app.route('/detailer//generate', methods=['POST']) def generate_detailer_image(slug): detailer_obj = Detailer.query.filter_by(slug=slug).first_or_404() try: # Get action type action = request.form.get('action', 'preview') client_id = request.form.get('client_id') # Get selected fields selected_fields = request.form.getlist('include_field') # Get selected character (if any) character_slug = request.form.get('character_slug', '') character = None if character_slug == '__random__': all_characters = Character.query.all() if all_characters: character = random.choice(all_characters) character_slug = character.slug elif character_slug: character = Character.query.filter_by(slug=character_slug).first() # Get selected action (if any) action_slug = request.form.get('action_slug', '') action = Action.query.filter_by(slug=action_slug).first() if action_slug else None # Get additional prompts extra_positive = request.form.get('extra_positive', '').strip() extra_negative = request.form.get('extra_negative', '').strip() # Save preferences session[f'char_detailer_{slug}'] = character_slug session[f'action_detailer_{slug}'] = action_slug session[f'extra_pos_detailer_{slug}'] = extra_positive session[f'extra_neg_detailer_{slug}'] = extra_negative session[f'prefs_detailer_{slug}'] = selected_fields # Queue generation using helper prompt_response = _queue_detailer_generation(detailer_obj, character, selected_fields, client_id=client_id, action=action, extra_positive=extra_positive, extra_negative=extra_negative) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('detailer_detail', slug=slug)) except Exception as e: print(f"Generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('detailer_detail', slug=slug)) @app.route('/detailer//finalize_generation/', methods=['POST']) def finalize_detailer_generation(slug, prompt_id): detailer_obj = Detailer.query.filter_by(slug=slug).first_or_404() action = request.form.get('action', 'preview') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) detailer_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"detailers/{slug}") os.makedirs(detailer_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(detailer_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) relative_path = f"detailers/{slug}/{filename}" session[f'preview_detailer_{slug}'] = relative_path session.modified = True # Ensure session is saved for JSON response # If action is 'replace', also update the detailer's cover image immediately if action == 'replace': detailer_obj.image_path = relative_path db.session.commit() return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize error: {e}") return {'error': str(e)}, 500 @app.route('/detailer//save_defaults', methods=['POST']) def save_detailer_defaults(slug): detailer = Detailer.query.filter_by(slug=slug).first_or_404() selected_fields = request.form.getlist('include_field') detailer.default_fields = selected_fields db.session.commit() flash('Default prompt selection saved for this detailer!') return redirect(url_for('detailer_detail', slug=slug)) @app.route('/detailer//replace_cover_from_preview', methods=['POST']) def replace_detailer_cover_from_preview(slug): detailer = Detailer.query.filter_by(slug=slug).first_or_404() preview_path = session.get(f'preview_detailer_{slug}') if preview_path: detailer.image_path = preview_path db.session.commit() flash('Cover image updated from preview!') else: flash('No preview image available', 'error') return redirect(url_for('detailer_detail', slug=slug)) @app.route('/detailer//save_json', methods=['POST']) def save_detailer_json(slug): detailer = Detailer.query.filter_by(slug=slug).first_or_404() try: new_data = json.loads(request.form.get('json_data', '')) except (ValueError, TypeError) as e: return {'success': False, 'error': f'Invalid JSON: {e}'}, 400 detailer.data = new_data flag_modified(detailer, 'data') db.session.commit() if detailer.filename: file_path = os.path.join(app.config['DETAILERS_DIR'], detailer.filename) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) return {'success': True} @app.route('/detailers/bulk_create', methods=['POST']) def bulk_create_detailers_from_loras(): detailers_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Detailers/' if not os.path.exists(detailers_lora_dir): flash('Detailers LoRA directory not found.', 'error') return redirect(url_for('detailers_index')) overwrite = request.form.get('overwrite') == 'true' created_count = 0 skipped_count = 0 overwritten_count = 0 system_prompt = load_prompt('detailer_system.txt') if not system_prompt: flash('Detailer system prompt file not found.', 'error') return redirect(url_for('detailers_index')) for filename in os.listdir(detailers_lora_dir): if filename.endswith('.safetensors'): name_base = filename.rsplit('.', 1)[0] detailer_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower()) detailer_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title() json_filename = f"{detailer_id}.json" json_path = os.path.join(app.config['DETAILERS_DIR'], json_filename) is_existing = os.path.exists(json_path) if is_existing and not overwrite: skipped_count += 1 continue html_filename = f"{name_base}.html" html_path = os.path.join(detailers_lora_dir, html_filename) html_content = "" if os.path.exists(html_path): try: with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf: html_raw = hf.read() clean_html = re.sub(r']*>.*?', '', html_raw, flags=re.DOTALL) clean_html = re.sub(r']*>.*?', '', clean_html, flags=re.DOTALL) clean_html = re.sub(r']*>', '', clean_html) clean_html = re.sub(r'<[^>]+>', ' ', clean_html) html_content = ' '.join(clean_html.split()) except Exception as e: print(f"Error reading HTML {html_filename}: {e}") try: print(f"Asking LLM to describe detailer: {detailer_name}") prompt = f"Describe a detailer LoRA for AI image generation based on the filename: '{filename}'" if html_content: prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###" llm_response = call_llm(prompt, system_prompt) clean_json = llm_response.replace('```json', '').replace('```', '').strip() detailer_data = json.loads(clean_json) detailer_data['detailer_id'] = detailer_id detailer_data['detailer_name'] = detailer_name if 'lora' not in detailer_data: detailer_data['lora'] = {} detailer_data['lora']['lora_name'] = f"Illustrious/Detailers/{filename}" if not detailer_data['lora'].get('lora_triggers'): detailer_data['lora']['lora_triggers'] = name_base if detailer_data['lora'].get('lora_weight') is None: detailer_data['lora']['lora_weight'] = 1.0 if detailer_data['lora'].get('lora_weight_min') is None: detailer_data['lora']['lora_weight_min'] = 0.7 if detailer_data['lora'].get('lora_weight_max') is None: detailer_data['lora']['lora_weight_max'] = 1.0 with open(json_path, 'w') as f: json.dump(detailer_data, f, indent=2) if is_existing: overwritten_count += 1 else: created_count += 1 # Small delay to avoid API rate limits if many files time.sleep(0.5) except Exception as e: print(f"Error creating detailer for {filename}: {e}") if created_count > 0 or overwritten_count > 0: sync_detailers() msg = f'Successfully processed detailers: {created_count} created, {overwritten_count} overwritten.' if skipped_count > 0: msg += f' (Skipped {skipped_count} existing)' flash(msg) else: flash(f'No new detailers created or overwritten. {skipped_count} existing detailers found.') return redirect(url_for('detailers_index')) @app.route('/detailer/create', methods=['GET', 'POST']) def create_detailer(): if request.method == 'POST': name = request.form.get('name') slug = request.form.get('filename', '').strip() if not slug: slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_') safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) if not safe_slug: safe_slug = 'detailer' base_slug = safe_slug counter = 1 while os.path.exists(os.path.join(app.config['DETAILERS_DIR'], f"{safe_slug}.json")): safe_slug = f"{base_slug}_{counter}" counter += 1 detailer_data = { "detailer_id": safe_slug, "detailer_name": name, "prompt": "", "lora": { "lora_name": "", "lora_weight": 1.0, "lora_triggers": "" } } try: file_path = os.path.join(app.config['DETAILERS_DIR'], f"{safe_slug}.json") with open(file_path, 'w') as f: json.dump(detailer_data, f, indent=2) new_detailer = Detailer( detailer_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json", name=name, data=detailer_data ) db.session.add(new_detailer) db.session.commit() flash('Detailer created successfully!') return redirect(url_for('detailer_detail', slug=safe_slug)) except Exception as e: print(f"Save error: {e}") flash(f"Failed to create detailer: {e}") return redirect(request.url) return render_template('detailers/create.html') # --------------------------------------------------------------------------- # Checkpoints # --------------------------------------------------------------------------- @app.route('/checkpoints') def checkpoints_index(): checkpoints = Checkpoint.query.order_by(Checkpoint.name).all() return render_template('checkpoints/index.html', checkpoints=checkpoints) @app.route('/checkpoints/rescan', methods=['POST']) def rescan_checkpoints(): sync_checkpoints() flash('Checkpoint list synced from disk.') return redirect(url_for('checkpoints_index')) @app.route('/checkpoint/') def checkpoint_detail(slug): ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404() characters = Character.query.order_by(Character.name).all() preview_image = session.get(f'preview_checkpoint_{slug}') selected_character = session.get(f'char_checkpoint_{slug}') # List existing preview images upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"checkpoints/{slug}") existing_previews = [] if os.path.isdir(upload_dir): files = sorted([f for f in os.listdir(upload_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))], reverse=True) existing_previews = [f"checkpoints/{slug}/{f}" for f in files] return render_template('checkpoints/detail.html', ckpt=ckpt, characters=characters, preview_image=preview_image, selected_character=selected_character, existing_previews=existing_previews) @app.route('/checkpoint//upload', methods=['POST']) def upload_checkpoint_image(slug): ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(url_for('checkpoint_detail', slug=slug)) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(url_for('checkpoint_detail', slug=slug)) if file and allowed_file(file.filename): folder = os.path.join(app.config['UPLOAD_FOLDER'], f"checkpoints/{slug}") os.makedirs(folder, exist_ok=True) filename = secure_filename(file.filename) file.save(os.path.join(folder, filename)) ckpt.image_path = f"checkpoints/{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('checkpoint_detail', slug=slug)) def _apply_checkpoint_settings(workflow, ckpt_data): """Apply checkpoint-specific sampler/prompt/VAE settings to the workflow.""" steps = ckpt_data.get('steps') cfg = ckpt_data.get('cfg') sampler_name = ckpt_data.get('sampler_name') scheduler = ckpt_data.get('scheduler') base_positive = ckpt_data.get('base_positive', '') base_negative = ckpt_data.get('base_negative', '') vae = ckpt_data.get('vae', 'integrated') # KSampler (node 3) if steps and '3' in workflow: workflow['3']['inputs']['steps'] = int(steps) if cfg and '3' in workflow: workflow['3']['inputs']['cfg'] = float(cfg) if sampler_name and '3' in workflow: workflow['3']['inputs']['sampler_name'] = sampler_name if scheduler and '3' in workflow: workflow['3']['inputs']['scheduler'] = scheduler # Face/hand detailers (nodes 11, 13) for node_id in ['11', '13']: if node_id in workflow: if steps: workflow[node_id]['inputs']['steps'] = int(steps) if cfg: workflow[node_id]['inputs']['cfg'] = float(cfg) if sampler_name: workflow[node_id]['inputs']['sampler_name'] = sampler_name if scheduler: workflow[node_id]['inputs']['scheduler'] = scheduler # Prepend base_positive to positive prompts (main + face/hand detailers) if base_positive: for node_id in ['6', '14', '15']: if node_id in workflow: workflow[node_id]['inputs']['text'] = f"{base_positive}, {workflow[node_id]['inputs']['text']}" # Append base_negative to negative prompt (shared by main + detailers via node 7) if base_negative and '7' in workflow: workflow['7']['inputs']['text'] = f"{workflow['7']['inputs']['text']}, {base_negative}" # VAE: if not integrated, inject a VAELoader node and rewire if vae and vae != 'integrated': workflow['21'] = { 'inputs': {'vae_name': vae}, 'class_type': 'VAELoader' } if '8' in workflow: workflow['8']['inputs']['vae'] = ['21', 0] for node_id in ['11', '13']: if node_id in workflow: workflow[node_id]['inputs']['vae'] = ['21', 0] return workflow def _queue_checkpoint_generation(ckpt_obj, character=None, client_id=None): with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) if character: combined_data = character.data.copy() combined_data['character_id'] = character.character_id selected_fields = [] for key in ['base_specs', 'hair', 'eyes']: if character.data.get('identity', {}).get(key): selected_fields.append(f'identity::{key}') selected_fields.append('special::name') wardrobe = character.get_active_wardrobe() for key in ['full_body', 'top', 'bottom']: if wardrobe.get(key): selected_fields.append(f'wardrobe::{key}') prompts = build_prompt(combined_data, selected_fields, None, active_outfit=character.active_outfit) primary_color = character.data.get('styles', {}).get('primary_color', '') prompts["main"] = f"{prompts['main']}, {primary_color + ' ' if primary_color else ''}simple background" else: prompts = { "main": "masterpiece, best quality, 1girl, solo, simple background, looking at viewer", "face": "masterpiece, best quality", "hand": "masterpiece, best quality", } workflow = _prepare_workflow(workflow, character, prompts, checkpoint=ckpt_obj.checkpoint_path, checkpoint_data=ckpt_obj.data or {}) return queue_prompt(workflow, client_id=client_id) @app.route('/checkpoint//generate', methods=['POST']) def generate_checkpoint_image(slug): ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404() try: client_id = request.form.get('client_id') character_slug = request.form.get('character_slug', '') character = None if character_slug == '__random__': all_characters = Character.query.all() if all_characters: character = random.choice(all_characters) character_slug = character.slug elif character_slug: character = Character.query.filter_by(slug=character_slug).first() session[f'char_checkpoint_{slug}'] = character_slug prompt_response = _queue_checkpoint_generation(ckpt, character, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('checkpoint_detail', slug=slug)) except Exception as e: print(f"Checkpoint generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('checkpoint_detail', slug=slug)) @app.route('/checkpoint//finalize_generation/', methods=['POST']) def finalize_checkpoint_generation(slug, prompt_id): ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404() action = request.form.get('action', 'preview') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) folder = os.path.join(app.config['UPLOAD_FOLDER'], f"checkpoints/{slug}") os.makedirs(folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" with open(os.path.join(folder, filename), 'wb') as f: f.write(image_data) relative_path = f"checkpoints/{slug}/{filename}" session[f'preview_checkpoint_{slug}'] = relative_path session.modified = True if action == 'replace': ckpt.image_path = relative_path db.session.commit() return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize checkpoint error: {e}") return {'error': str(e)}, 500 @app.route('/checkpoint//replace_cover_from_preview', methods=['POST']) def replace_checkpoint_cover_from_preview(slug): ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404() preview_path = session.get(f'preview_checkpoint_{slug}') if preview_path: ckpt.image_path = preview_path db.session.commit() flash('Cover image updated from preview!') else: flash('No preview image available', 'error') return redirect(url_for('checkpoint_detail', slug=slug)) @app.route('/checkpoint//save_json', methods=['POST']) def save_checkpoint_json(slug): ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404() try: new_data = json.loads(request.form.get('json_data', '')) except (ValueError, TypeError) as e: return {'success': False, 'error': f'Invalid JSON: {e}'}, 400 ckpt.data = new_data flag_modified(ckpt, 'data') db.session.commit() checkpoints_dir = app.config.get('CHECKPOINTS_DIR', 'data/checkpoints') file_path = os.path.join(checkpoints_dir, f'{ckpt.slug}.json') with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) return {'success': True} @app.route('/get_missing_checkpoints') def get_missing_checkpoints(): missing = Checkpoint.query.filter((Checkpoint.image_path == None) | (Checkpoint.image_path == '')).all() return {'missing': [{'slug': c.slug, 'name': c.name} for c in missing]} @app.route('/clear_all_checkpoint_covers', methods=['POST']) def clear_all_checkpoint_covers(): for ckpt in Checkpoint.query.all(): ckpt.image_path = None db.session.commit() return {'success': True} @app.route('/checkpoints/bulk_create', methods=['POST']) def bulk_create_checkpoints(): checkpoints_dir = app.config.get('CHECKPOINTS_DIR', 'data/checkpoints') os.makedirs(checkpoints_dir, exist_ok=True) overwrite = request.form.get('overwrite') == 'true' created_count = 0 skipped_count = 0 overwritten_count = 0 system_prompt = load_prompt('checkpoint_system.txt') if not system_prompt: flash('Checkpoint system prompt file not found.', 'error') return redirect(url_for('checkpoints_index')) dirs = [ (app.config.get('ILLUSTRIOUS_MODELS_DIR', ''), 'Illustrious'), (app.config.get('NOOB_MODELS_DIR', ''), 'Noob'), ] for dirpath, family in dirs: if not dirpath or not os.path.exists(dirpath): continue for filename in sorted(os.listdir(dirpath)): if not (filename.endswith('.safetensors') or filename.endswith('.ckpt')): continue checkpoint_path = f"{family}/{filename}" name_base = filename.rsplit('.', 1)[0] safe_id = re.sub(r'[^a-zA-Z0-9_]', '_', checkpoint_path.rsplit('.', 1)[0]).lower().strip('_') json_filename = f"{safe_id}.json" json_path = os.path.join(checkpoints_dir, json_filename) is_existing = os.path.exists(json_path) if is_existing and not overwrite: skipped_count += 1 continue # Look for a matching HTML file alongside the model file html_path = os.path.join(dirpath, f"{name_base}.html") html_content = "" if os.path.exists(html_path): try: with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf: html_raw = hf.read() clean_html = re.sub(r']*>.*?', '', html_raw, flags=re.DOTALL) clean_html = re.sub(r']*>.*?', '', clean_html, flags=re.DOTALL) clean_html = re.sub(r']*>', '', clean_html) clean_html = re.sub(r'<[^>]+>', ' ', clean_html) html_content = ' '.join(clean_html.split()) except Exception as e: print(f"Error reading HTML for {filename}: {e}") defaults = _default_checkpoint_data(checkpoint_path, filename) if html_content: try: print(f"Asking LLM to describe checkpoint: {filename}") prompt = ( f"Generate checkpoint metadata JSON for the model file: '{filename}' " f"(checkpoint_path: '{checkpoint_path}').\n\n" f"Here is descriptive text extracted from an associated HTML file:\n###\n{html_content[:3000]}\n###" ) llm_response = call_llm(prompt, system_prompt) clean_json = llm_response.replace('```json', '').replace('```', '').strip() ckpt_data = json.loads(clean_json) # Enforce fixed fields ckpt_data['checkpoint_path'] = checkpoint_path ckpt_data['checkpoint_name'] = filename # Fill missing fields with defaults for key, val in defaults.items(): if key not in ckpt_data or ckpt_data[key] is None: ckpt_data[key] = val time.sleep(0.5) except Exception as e: print(f"LLM error for {filename}: {e}. Using defaults.") ckpt_data = defaults else: ckpt_data = defaults try: with open(json_path, 'w') as f: json.dump(ckpt_data, f, indent=2) if is_existing: overwritten_count += 1 else: created_count += 1 except Exception as e: print(f"Error saving JSON for {filename}: {e}") if created_count > 0 or overwritten_count > 0: sync_checkpoints() msg = f'Successfully processed checkpoints: {created_count} created, {overwritten_count} overwritten.' if skipped_count > 0: msg += f' (Skipped {skipped_count} existing)' flash(msg) else: flash(f'No checkpoints created or overwritten. {skipped_count} existing entries found.') return redirect(url_for('checkpoints_index')) # ============ LOOK ROUTES ============ @app.route('/looks') def looks_index(): looks = Look.query.order_by(Look.name).all() return render_template('looks/index.html', looks=looks) @app.route('/looks/rescan', methods=['POST']) def rescan_looks(): sync_looks() flash('Database synced with look files.') return redirect(url_for('looks_index')) @app.route('/look/') def look_detail(slug): look = Look.query.filter_by(slug=slug).first_or_404() characters = Character.query.order_by(Character.name).all() # Pre-select the linked character if set preferences = session.get(f'prefs_look_{slug}') preview_image = session.get(f'preview_look_{slug}') selected_character = session.get(f'char_look_{slug}', look.character_id or '') return render_template('looks/detail.html', look=look, characters=characters, preferences=preferences, preview_image=preview_image, selected_character=selected_character) @app.route('/look//edit', methods=['GET', 'POST']) def edit_look(slug): look = Look.query.filter_by(slug=slug).first_or_404() characters = Character.query.order_by(Character.name).all() loras = get_available_loras() if request.method == 'POST': look.name = request.form.get('look_name', look.name) character_id = request.form.get('character_id', '') look.character_id = character_id if character_id else None new_data = look.data.copy() new_data['look_name'] = look.name new_data['character_id'] = look.character_id new_data['positive'] = request.form.get('positive', '') new_data['negative'] = request.form.get('negative', '') lora_name = request.form.get('lora_lora_name', '') lora_weight = float(request.form.get('lora_lora_weight', 1.0) or 1.0) lora_triggers = request.form.get('lora_lora_triggers', '') new_data['lora'] = {'lora_name': lora_name, 'lora_weight': lora_weight, 'lora_triggers': lora_triggers} for bound in ['lora_weight_min', 'lora_weight_max']: val_str = request.form.get(f'lora_{bound}', '').strip() if val_str: try: new_data['lora'][bound] = float(val_str) except ValueError: pass tags_raw = request.form.get('tags', '') new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()] look.data = new_data flag_modified(look, 'data') db.session.commit() if look.filename: file_path = os.path.join(app.config['LOOKS_DIR'], look.filename) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) flash(f'Look "{look.name}" updated!') return redirect(url_for('look_detail', slug=look.slug)) return render_template('looks/edit.html', look=look, characters=characters, loras=loras) @app.route('/look//upload', methods=['POST']) def upload_look_image(slug): look = Look.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file selected') return redirect(url_for('look_detail', slug=slug)) file = request.files['image'] if file and allowed_file(file.filename): filename = secure_filename(file.filename) look_folder = os.path.join(app.config['UPLOAD_FOLDER'], f'looks/{slug}') os.makedirs(look_folder, exist_ok=True) file_path = os.path.join(look_folder, filename) file.save(file_path) look.image_path = f'looks/{slug}/{filename}' db.session.commit() return redirect(url_for('look_detail', slug=slug)) @app.route('/look//generate', methods=['POST']) def generate_look_image(slug): look = Look.query.filter_by(slug=slug).first_or_404() try: action = request.form.get('action', 'preview') client_id = request.form.get('client_id') selected_fields = request.form.getlist('include_field') character_slug = request.form.get('character_slug', '') character = None # Only load a character when the user explicitly selects one if character_slug == '__random__': all_characters = Character.query.all() if all_characters: character = random.choice(all_characters) character_slug = character.slug elif character_slug: character = Character.query.filter_by(slug=character_slug).first() if not character: character = Character.query.filter_by(character_id=character_slug).first() # No fallback to look.character_id — looks are self-contained session[f'prefs_look_{slug}'] = selected_fields session[f'char_look_{slug}'] = character_slug lora_triggers = look.data.get('lora', {}).get('lora_triggers', '') look_positive = look.data.get('positive', '') with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) if character: # Merge character identity with look LoRA and positive prompt combined_data = { 'character_id': character.character_id, 'identity': character.data.get('identity', {}), 'defaults': character.data.get('defaults', {}), 'wardrobe': character.data.get('wardrobe', {}).get(character.active_outfit or 'default', character.data.get('wardrobe', {}).get('default', {})), 'styles': character.data.get('styles', {}), 'lora': look.data.get('lora', {}), 'tags': look.data.get('tags', []) } for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']: if character.data.get('identity', {}).get(key): field_key = f'identity::{key}' if field_key not in selected_fields: selected_fields.append(field_key) for key in ['expression', 'pose']: if character.data.get('defaults', {}).get(key): field_key = f'defaults::{key}' if field_key not in selected_fields: selected_fields.append(field_key) if 'special::name' not in selected_fields: selected_fields.append('special::name') prompts = build_prompt(combined_data, selected_fields, character.default_fields) # Append look-specific triggers and positive extra = ', '.join(filter(None, [lora_triggers, look_positive])) if extra: prompts['main'] = _dedup_tags(f"{prompts['main']}, {extra}" if prompts['main'] else extra) primary_color = character.data.get('styles', {}).get('primary_color', '') bg = f"{primary_color} simple background" if primary_color else "simple background" else: # Look is self-contained: build prompt from its own positive and triggers only main = _dedup_tags(', '.join(filter(None, ['(solo:1.2)', lora_triggers, look_positive]))) prompts = {'main': main, 'face': '', 'hand': ''} bg = "simple background" prompts['main'] = _dedup_tags(f"{prompts['main']}, {bg}" if prompts['main'] else bg) ckpt_path, ckpt_data = _get_default_checkpoint() workflow = _prepare_workflow(workflow, character, prompts, checkpoint=ckpt_path, checkpoint_data=ckpt_data, look=look) prompt_response = queue_prompt(workflow, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('look_detail', slug=slug)) except Exception as e: print(f"Look generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('look_detail', slug=slug)) @app.route('/look//finalize_generation/', methods=['POST']) def finalize_look_generation(slug, prompt_id): look = Look.query.filter_by(slug=slug).first_or_404() action = request.form.get('action', 'preview') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) look_folder = os.path.join(app.config['UPLOAD_FOLDER'], f'looks/{slug}') os.makedirs(look_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(look_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) relative_path = f'looks/{slug}/{filename}' session[f'preview_look_{slug}'] = relative_path session.modified = True if action == 'replace': look.image_path = relative_path db.session.commit() return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize look error: {e}") return {'error': str(e)}, 500 @app.route('/look//replace_cover_from_preview', methods=['POST']) def replace_look_cover_from_preview(slug): look = Look.query.filter_by(slug=slug).first_or_404() preview_path = session.get(f'preview_look_{slug}') if preview_path: look.image_path = preview_path db.session.commit() flash('Cover image updated from preview!') else: flash('No preview image available', 'error') return redirect(url_for('look_detail', slug=slug)) @app.route('/look//save_defaults', methods=['POST']) def save_look_defaults(slug): look = Look.query.filter_by(slug=slug).first_or_404() look.default_fields = request.form.getlist('include_field') db.session.commit() flash('Default prompt selection saved!') return redirect(url_for('look_detail', slug=slug)) @app.route('/look//save_json', methods=['POST']) def save_look_json(slug): look = Look.query.filter_by(slug=slug).first_or_404() try: new_data = json.loads(request.form.get('json_data', '')) except (ValueError, TypeError) as e: return {'success': False, 'error': f'Invalid JSON: {e}'}, 400 look.data = new_data look.character_id = new_data.get('character_id', look.character_id) flag_modified(look, 'data') db.session.commit() if look.filename: file_path = os.path.join(app.config['LOOKS_DIR'], look.filename) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) return {'success': True} @app.route('/look/create', methods=['GET', 'POST']) def create_look(): characters = Character.query.order_by(Character.name).all() loras = get_available_loras() if request.method == 'POST': name = request.form.get('name', '').strip() look_id = re.sub(r'[^a-zA-Z0-9_]', '_', name.lower().replace(' ', '_')) filename = f'{look_id}.json' file_path = os.path.join(app.config['LOOKS_DIR'], filename) character_id = request.form.get('character_id', '') or None lora_name = request.form.get('lora_lora_name', '') lora_weight = float(request.form.get('lora_lora_weight', 1.0) or 1.0) lora_triggers = request.form.get('lora_lora_triggers', '') positive = request.form.get('positive', '') negative = request.form.get('negative', '') tags = [t.strip() for t in request.form.get('tags', '').split(',') if t.strip()] data = { 'look_id': look_id, 'look_name': name, 'character_id': character_id, 'positive': positive, 'negative': negative, 'lora': {'lora_name': lora_name, 'lora_weight': lora_weight, 'lora_triggers': lora_triggers}, 'tags': tags } os.makedirs(app.config['LOOKS_DIR'], exist_ok=True) with open(file_path, 'w') as f: json.dump(data, f, indent=2) slug = re.sub(r'[^a-zA-Z0-9_]', '', look_id) new_look = Look(look_id=look_id, slug=slug, filename=filename, name=name, character_id=character_id, data=data) db.session.add(new_look) db.session.commit() flash(f'Look "{name}" created!') return redirect(url_for('look_detail', slug=slug)) return render_template('looks/create.html', characters=characters, loras=loras) @app.route('/get_missing_looks') def get_missing_looks(): missing = Look.query.filter((Look.image_path == None) | (Look.image_path == '')).all() return {'missing': [{'slug': l.slug, 'name': l.name} for l in missing]} @app.route('/clear_all_look_covers', methods=['POST']) def clear_all_look_covers(): looks = Look.query.all() for look in looks: look.image_path = None db.session.commit() return {'success': True} @app.route('/looks/bulk_create', methods=['POST']) def bulk_create_looks_from_loras(): lora_dir = app.config['LORA_DIR'] if not os.path.exists(lora_dir): flash('Looks LoRA directory not found.', 'error') return redirect(url_for('looks_index')) overwrite = request.form.get('overwrite') == 'true' created_count = 0 skipped_count = 0 overwritten_count = 0 system_prompt = load_prompt('look_system.txt') if not system_prompt: flash('Look system prompt file not found.', 'error') return redirect(url_for('looks_index')) for filename in os.listdir(lora_dir): if not filename.endswith('.safetensors'): continue name_base = filename.rsplit('.', 1)[0] look_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower()) look_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title() json_filename = f"{look_id}.json" json_path = os.path.join(app.config['LOOKS_DIR'], json_filename) is_existing = os.path.exists(json_path) if is_existing and not overwrite: skipped_count += 1 continue html_filename = f"{name_base}.html" html_path = os.path.join(lora_dir, html_filename) html_content = "" if os.path.exists(html_path): try: with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf: html_raw = hf.read() clean_html = re.sub(r']*>.*?', '', html_raw, flags=re.DOTALL) clean_html = re.sub(r']*>.*?', '', clean_html, flags=re.DOTALL) clean_html = re.sub(r']*>', '', clean_html) clean_html = re.sub(r'<[^>]+>', ' ', clean_html) html_content = ' '.join(clean_html.split()) except Exception as e: print(f"Error reading HTML {html_filename}: {e}") try: print(f"Asking LLM to describe look: {look_name}") prompt = f"Create a look profile for a character appearance LoRA based on the filename: '{filename}'" if html_content: prompt += f"\n\nHere is descriptive text extracted from an associated HTML file:\n###\n{html_content[:3000]}\n###" llm_response = call_llm(prompt, system_prompt) clean_json = llm_response.replace('```json', '').replace('```', '').strip() look_data = json.loads(clean_json) look_data['look_id'] = look_id look_data['look_name'] = look_name if 'lora' not in look_data: look_data['lora'] = {} look_data['lora']['lora_name'] = f"Illustrious/Looks/{filename}" if not look_data['lora'].get('lora_triggers'): look_data['lora']['lora_triggers'] = name_base if look_data['lora'].get('lora_weight') is None: look_data['lora']['lora_weight'] = 0.8 if look_data['lora'].get('lora_weight_min') is None: look_data['lora']['lora_weight_min'] = 0.7 if look_data['lora'].get('lora_weight_max') is None: look_data['lora']['lora_weight_max'] = 1.0 os.makedirs(app.config['LOOKS_DIR'], exist_ok=True) with open(json_path, 'w') as f: json.dump(look_data, f, indent=2) if is_existing: overwritten_count += 1 else: created_count += 1 time.sleep(0.5) except Exception as e: print(f"Error creating look for {filename}: {e}") if created_count > 0 or overwritten_count > 0: sync_looks() msg = f'Successfully processed looks: {created_count} created, {overwritten_count} overwritten.' if skipped_count > 0: msg += f' (Skipped {skipped_count} existing)' flash(msg) else: flash(f'No looks created or overwritten. {skipped_count} existing entries found.') return redirect(url_for('looks_index')) # --------------------------------------------------------------------------- # Gallery # --------------------------------------------------------------------------- GALLERY_CATEGORIES = ['characters', 'actions', 'outfits', 'scenes', 'styles', 'detailers', 'checkpoints'] _MODEL_MAP = { 'characters': Character, 'actions': Action, 'outfits': Outfit, 'scenes': Scene, 'styles': Style, 'detailers': Detailer, 'checkpoints': Checkpoint, } def _scan_gallery_images(category_filter='all', slug_filter=''): """Return sorted list of image dicts from the uploads directory.""" upload_folder = app.config['UPLOAD_FOLDER'] images = [] cats = GALLERY_CATEGORIES if category_filter == 'all' else [category_filter] for cat in cats: cat_folder = os.path.join(upload_folder, cat) if not os.path.isdir(cat_folder): continue try: slugs = os.listdir(cat_folder) except OSError: continue for item_slug in slugs: if slug_filter and slug_filter != item_slug: continue item_folder = os.path.join(cat_folder, item_slug) if not os.path.isdir(item_folder): continue try: files = os.listdir(item_folder) except OSError: continue for filename in files: if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')): continue try: ts = int(filename.replace('gen_', '').rsplit('.', 1)[0]) except ValueError: ts = 0 images.append({ 'path': f"{cat}/{item_slug}/{filename}", 'category': cat, 'slug': item_slug, 'filename': filename, 'timestamp': ts, }) images.sort(key=lambda x: x['timestamp'], reverse=True) return images def _enrich_with_names(images): """Add item_name field to each image dict, querying DB once per category.""" by_cat = {} for img in images: by_cat.setdefault(img['category'], set()).add(img['slug']) name_map = {} for cat, slugs in by_cat.items(): Model = _MODEL_MAP.get(cat) if not Model: continue items = Model.query.filter(Model.slug.in_(slugs)).with_entities(Model.slug, Model.name).all() for slug, name in items: name_map[(cat, slug)] = name for img in images: img['item_name'] = name_map.get((img['category'], img['slug']), img['slug']) return images @app.route('/gallery') def gallery(): category = request.args.get('category', 'all') slug = request.args.get('slug', '') sort = request.args.get('sort', 'newest') page = max(1, int(request.args.get('page', 1))) per_page = int(request.args.get('per_page', 48)) per_page = per_page if per_page in (24, 48, 96) else 48 images = _scan_gallery_images(category, slug) if sort == 'oldest': images.reverse() total = len(images) total_pages = max(1, (total + per_page - 1) // per_page) page = min(page, total_pages) page_images = images[(page - 1) * per_page: page * per_page] _enrich_with_names(page_images) slug_options = [] if category != 'all': Model = _MODEL_MAP.get(category) if Model: slug_options = [(r.slug, r.name) for r in Model.query.order_by(Model.name).with_entities(Model.slug, Model.name).all()] return render_template( 'gallery.html', images=page_images, page=page, per_page=per_page, total=total, total_pages=total_pages, category=category, slug=slug, sort=sort, categories=GALLERY_CATEGORIES, slug_options=slug_options, ) def _parse_comfy_png_metadata(image_path): """Read ComfyUI generation metadata from a PNG's tEXt 'prompt' chunk. Returns a dict with keys: positive, negative, checkpoint, loras, seed, steps, cfg, sampler, scheduler. Any missing field is None/[]. """ from PIL import Image as PilImage result = { 'positive': None, 'negative': None, 'checkpoint': None, 'loras': [], # list of {name, strength} 'seed': None, 'steps': None, 'cfg': None, 'sampler': None, 'scheduler': None, } try: with PilImage.open(image_path) as im: raw = im.info.get('prompt') if not raw: return result nodes = json.loads(raw) except Exception: return result for node in nodes.values(): ct = node.get('class_type', '') inp = node.get('inputs', {}) if ct == 'KSampler': result['seed'] = inp.get('seed') result['steps'] = inp.get('steps') result['cfg'] = inp.get('cfg') result['sampler'] = inp.get('sampler_name') result['scheduler'] = inp.get('scheduler') elif ct == 'CheckpointLoaderSimple': result['checkpoint'] = inp.get('ckpt_name') elif ct == 'CLIPTextEncode': # Identify positive vs negative by which KSampler input they connect to. # Simpler heuristic: node "6" = positive, node "7" = negative (our fixed workflow). # But to be robust, we check both via node graph references where possible. # Fallback: first CLIPTextEncode = positive, second = negative. text = inp.get('text', '') if result['positive'] is None: result['positive'] = text elif result['negative'] is None: result['negative'] = text elif ct == 'LoraLoader': name = inp.get('lora_name', '') if name: result['loras'].append({ 'name': name, 'strength': inp.get('strength_model', 1.0), }) # Re-parse with fixed node IDs from the known workflow (more reliable) try: if '6' in nodes: result['positive'] = nodes['6']['inputs'].get('text', result['positive']) if '7' in nodes: result['negative'] = nodes['7']['inputs'].get('text', result['negative']) except Exception: pass return result @app.route('/gallery/prompt-data') def gallery_prompt_data(): """Return generation metadata for a specific image by reading its PNG tEXt chunk.""" img_path = request.args.get('path', '') if not img_path: return {'error': 'path parameter required'}, 400 # Validate path stays within uploads folder upload_folder = os.path.abspath(app.config['UPLOAD_FOLDER']) abs_img = os.path.abspath(os.path.join(upload_folder, img_path)) if not abs_img.startswith(upload_folder + os.sep): return {'error': 'Invalid path'}, 400 if not os.path.isfile(abs_img): return {'error': 'File not found'}, 404 meta = _parse_comfy_png_metadata(abs_img) meta['path'] = img_path return meta @app.route('/gallery/delete', methods=['POST']) def gallery_delete(): """Delete a generated image from the gallery. Only the image file is removed.""" data = request.get_json(silent=True) or {} img_path = data.get('path', '') if not img_path: return {'error': 'path required'}, 400 if len(img_path.split('/')) != 3: return {'error': 'invalid path format'}, 400 upload_folder = os.path.abspath(app.config['UPLOAD_FOLDER']) abs_img = os.path.abspath(os.path.join(upload_folder, img_path)) if not abs_img.startswith(upload_folder + os.sep): return {'error': 'Invalid path'}, 400 if os.path.isfile(abs_img): os.remove(abs_img) return {'status': 'ok'} @app.route('/resource///delete', methods=['POST']) def resource_delete(category, slug): """Delete a resource item from a category gallery. soft: removes JSON data file + DB record; LoRA/checkpoint file kept on disk. hard: removes JSON data file + LoRA/checkpoint safetensors + DB record. """ _RESOURCE_MODEL_MAP = { 'looks': Look, 'styles': Style, 'actions': Action, 'outfits': Outfit, 'scenes': Scene, 'detailers': Detailer, 'checkpoints': Checkpoint, } _RESOURCE_DATA_DIRS = { 'looks': app.config['LOOKS_DIR'], 'styles': app.config['STYLES_DIR'], 'actions': app.config['ACTIONS_DIR'], 'outfits': app.config['CLOTHING_DIR'], 'scenes': app.config['SCENES_DIR'], 'detailers': app.config['DETAILERS_DIR'], 'checkpoints': app.config['CHECKPOINTS_DIR'], } _LORA_BASE = '/mnt/alexander/AITools/Image Models/lora/' if category not in _RESOURCE_MODEL_MAP: return {'error': 'unknown category'}, 400 req = request.get_json(silent=True) or {} mode = req.get('mode', 'soft') data_dir = _RESOURCE_DATA_DIRS[category] json_path = os.path.join(data_dir, f'{slug}.json') deleted = [] asset_abs = None # Resolve asset path before deleting JSON (hard only) if mode == 'hard' and os.path.isfile(json_path): try: with open(json_path) as f: item_data = json.load(f) if category == 'checkpoints': ckpt_rel = item_data.get('checkpoint_path', '') if ckpt_rel.startswith('Illustrious/'): asset_abs = os.path.join(app.config['ILLUSTRIOUS_MODELS_DIR'], ckpt_rel[len('Illustrious/'):]) elif ckpt_rel.startswith('Noob/'): asset_abs = os.path.join(app.config['NOOB_MODELS_DIR'], ckpt_rel[len('Noob/'):]) else: lora_name = item_data.get('lora', {}).get('lora_name', '') if lora_name: asset_abs = os.path.join(_LORA_BASE, lora_name) except Exception: pass # Delete JSON if os.path.isfile(json_path): os.remove(json_path) deleted.append('json') # Delete LoRA/checkpoint file (hard only) if mode == 'hard' and asset_abs and os.path.isfile(asset_abs): os.remove(asset_abs) deleted.append('lora' if category != 'checkpoints' else 'checkpoint') # Remove DB record Model = _RESOURCE_MODEL_MAP[category] rec = Model.query.filter_by(slug=slug).first() if rec: db.session.delete(rec) db.session.commit() deleted.append('db') return {'status': 'ok', 'deleted': deleted} # --------------------------------------------------------------------------- # Strengths Gallery # --------------------------------------------------------------------------- _STRENGTHS_MODEL_MAP = { 'characters': Character, 'looks': Look, 'outfits': Outfit, 'actions': Action, 'styles': Style, 'scenes': Scene, 'detailers': Detailer, } # Which ComfyUI LoRA node each category occupies _CATEGORY_LORA_NODES = { 'characters': '16', 'looks': '16', 'outfits': '17', 'actions': '18', 'styles': '19', 'scenes': '19', 'detailers': '19', } def _build_strengths_prompts(category, entity, character, action=None, extra_positive=''): """Build main/face/hand prompt strings for the Strengths Gallery. Only includes prompt *content* from the entity and (optionally) the character. LoRA triggers from other nodes are intentionally excluded so the result reflects only the swept LoRA's contribution. action — optional Action model object (used for detailer category) extra_positive — additional free-text to append to the main prompt """ if category == 'characters': # The entity IS the character — build its full prompt normally return build_prompt(entity.data, [], entity.default_fields) if category == 'looks': # Start with linked character data, prepend Look positive tags base = build_prompt(character.data, [], character.default_fields) if character else {'main': '', 'face': '', 'hand': ''} look_pos = entity.data.get('positive', '') look_triggers = entity.data.get('lora', {}).get('lora_triggers', '') prefix_parts = [p for p in [look_triggers, look_pos] if p] prefix = ', '.join(prefix_parts) if prefix: base['main'] = f"{prefix}, {base['main']}" if base['main'] else prefix return base if category == 'outfits': wardrobe = entity.data.get('wardrobe', {}) outfit_triggers = entity.data.get('lora', {}).get('lora_triggers', '') tags = entity.data.get('tags', []) wardrobe_parts = [v for v in wardrobe.values() if isinstance(v, str) and v] char_parts = [] face_parts = [] hand_parts = [] if character: identity = character.data.get('identity', {}) defaults = character.data.get('defaults', {}) char_parts = [v for v in [identity.get('base_specs'), identity.get('hair'), identity.get('eyes'), defaults.get('expression')] if v] face_parts = [v for v in [identity.get('hair'), identity.get('eyes'), defaults.get('expression')] if v] hand_parts = [v for v in [wardrobe.get('hands'), wardrobe.get('gloves')] if v] main_parts = ([outfit_triggers] if outfit_triggers else []) + char_parts + wardrobe_parts + tags return { 'main': _dedup_tags(', '.join(p for p in main_parts if p)), 'face': _dedup_tags(', '.join(face_parts)), 'hand': _dedup_tags(', '.join(hand_parts)), } if category == 'actions': action_data = entity.data.get('action', {}) action_triggers = entity.data.get('lora', {}).get('lora_triggers', '') tags = entity.data.get('tags', []) pose_fields = ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet', 'additional'] pose_parts = [action_data.get(k, '') for k in pose_fields if action_data.get(k)] expr_parts = [action_data.get(k, '') for k in ['head', 'eyes'] if action_data.get(k)] char_parts = [] face_parts = list(expr_parts) hand_parts = [action_data.get('hands', '')] if action_data.get('hands') else [] if character: identity = character.data.get('identity', {}) char_parts = [v for v in [identity.get('base_specs'), identity.get('hair'), identity.get('eyes')] if v] face_parts = [v for v in [identity.get('hair'), identity.get('eyes')] + expr_parts if v] main_parts = ([action_triggers] if action_triggers else []) + char_parts + pose_parts + tags return { 'main': _dedup_tags(', '.join(p for p in main_parts if p)), 'face': _dedup_tags(', '.join(face_parts)), 'hand': _dedup_tags(', '.join(hand_parts)), } # styles / scenes / detailers — character prompt + entity tags/triggers entity_triggers = entity.data.get('lora', {}).get('lora_triggers', '') tags = entity.data.get('tags', []) if category == 'styles': sdata = entity.data.get('style', {}) artist = f"by {sdata['artist_name']}" if sdata.get('artist_name') else '' style_tags = sdata.get('artistic_style', '') entity_parts = [p for p in [entity_triggers, artist, style_tags] + tags if p] elif category == 'scenes': sdata = entity.data.get('scene', {}) scene_parts = [v for v in sdata.values() if isinstance(v, str) and v] entity_parts = [p for p in [entity_triggers] + scene_parts + tags if p] else: # detailers det_prompt = entity.data.get('prompt', '') entity_parts = [p for p in [entity_triggers, det_prompt] + tags if p] base = build_prompt(character.data, [], character.default_fields) if character else {'main': '', 'face': '', 'hand': ''} entity_str = ', '.join(entity_parts) if entity_str: base['main'] = f"{base['main']}, {entity_str}" if base['main'] else entity_str # Incorporate action prompt fields (for detailer category) if action is not None: action_data = action.data.get('action', {}) action_parts = [action_data.get(k, '') for k in ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet', 'additional', 'head', 'eyes'] if action_data.get(k)] action_str = ', '.join(action_parts) if action_str: base['main'] = f"{base['main']}, {action_str}" if base['main'] else action_str # Append any extra positive text if extra_positive: base['main'] = f"{base['main']}, {extra_positive}" if base['main'] else extra_positive return base def _prepare_strengths_workflow(workflow, category, entity, character, prompts, checkpoint, ckpt_data, strength_value, fixed_seed, custom_negative=''): """Wire a ComfyUI workflow with ONLY the entity's LoRA active at a specific strength. All other LoRA nodes are bypassed. A fixed seed ensures every step in the Strengths Gallery sweep produces a comparably composed image. """ active_node = _CATEGORY_LORA_NODES.get(category, '16') entity_lora = entity.data.get('lora', {}) entity_lora_name = entity_lora.get('lora_name', '') # 1. Set checkpoint if checkpoint and '4' in workflow: workflow['4']['inputs']['ckpt_name'] = checkpoint # 2. Default resolution if '5' in workflow: workflow['5']['inputs']['width'] = 1024 workflow['5']['inputs']['height'] = 1024 # 3. Inject prompts if '6' in workflow: workflow['6']['inputs']['text'] = workflow['6']['inputs']['text'].replace( '{{POSITIVE_PROMPT}}', prompts.get('main', '')) if '14' in workflow: workflow['14']['inputs']['text'] = workflow['14']['inputs']['text'].replace( '{{FACE_PROMPT}}', prompts.get('face', '')) if '15' in workflow: workflow['15']['inputs']['text'] = workflow['15']['inputs']['text'].replace( '{{HAND_PROMPT}}', prompts.get('hand', '')) # For looks, prepend the look's negative to node 7 if category == 'looks': look_neg = entity.data.get('negative', '') if look_neg and '7' in workflow: workflow['7']['inputs']['text'] = f"{look_neg}, {workflow['7']['inputs']['text']}" # Prepend any custom negative (e.g. extra_neg from detailer session) if custom_negative and '7' in workflow: workflow['7']['inputs']['text'] = f"{custom_negative}, {workflow['7']['inputs']['text']}" # 4. Wire LoRA chain — only activate the entity's node; skip all others model_source = ['4', 0] clip_source = ['4', 1] for node_id in ['16', '17', '18', '19']: if node_id not in workflow: continue if node_id == active_node and entity_lora_name: workflow[node_id]['inputs']['lora_name'] = entity_lora_name workflow[node_id]['inputs']['strength_model'] = float(strength_value) workflow[node_id]['inputs']['strength_clip'] = float(strength_value) workflow[node_id]['inputs']['model'] = list(model_source) workflow[node_id]['inputs']['clip'] = list(clip_source) model_source = [node_id, 0] clip_source = [node_id, 1] # else: skip — model_source/clip_source pass through unchanged # 5. Wire all consumers to the final model/clip source for consumer, needs_model, needs_clip in [ ('3', True, False), ('6', False, True), ('7', False, True), ('11', True, True), ('13', True, True), ('14', False, True), ('15', False, True), ]: if consumer in workflow: if needs_model: workflow[consumer]['inputs']['model'] = list(model_source) if needs_clip: workflow[consumer]['inputs']['clip'] = list(clip_source) # 6. Fixed seed for all samplers for seed_node in ['3', '11', '13']: if seed_node in workflow: workflow[seed_node]['inputs']['seed'] = int(fixed_seed) # 7. Apply checkpoint-specific settings (steps, cfg, sampler, base prompts, VAE) if ckpt_data: workflow = _apply_checkpoint_settings(workflow, ckpt_data) # 8. Sync sampler/scheduler to detailer nodes sampler_name = workflow['3']['inputs'].get('sampler_name') scheduler = workflow['3']['inputs'].get('scheduler') for node_id in ['11', '13']: if node_id in workflow: if sampler_name: workflow[node_id]['inputs']['sampler_name'] = sampler_name if scheduler: workflow[node_id]['inputs']['scheduler'] = scheduler # 9. Cross-dedup prompts pos_text, neg_text = _cross_dedup_prompts( workflow['6']['inputs']['text'], workflow['7']['inputs']['text'] ) workflow['6']['inputs']['text'] = pos_text workflow['7']['inputs']['text'] = neg_text _log_workflow_prompts(f"_prepare_strengths_workflow [node={active_node} lora={entity_lora_name} @ {strength_value} seed={fixed_seed}]", workflow) return workflow @app.route('/strengths///generate', methods=['POST']) def strengths_generate(category, slug): if category not in _STRENGTHS_MODEL_MAP: return {'error': 'unknown category'}, 400 Model = _STRENGTHS_MODEL_MAP[category] entity = Model.query.filter_by(slug=slug).first_or_404() try: strength_value = float(request.form.get('strength_value', 1.0)) fixed_seed = int(request.form.get('seed', random.randint(1, 10**15))) client_id = request.form.get('client_id', '') # Resolve character: prefer POST body value (reflects current page dropdown), # then fall back to session. # Session keys use the *singular* category name (char_outfit_, char_action_, …) # but the URL uses the plural (outfits, actions, …). _singular = { 'outfits': 'outfit', 'actions': 'action', 'styles': 'style', 'scenes': 'scene', 'detailers': 'detailer', 'looks': 'look', } session_prefix = _singular.get(category, category) char_slug = (request.form.get('character_slug') or session.get(f'char_{session_prefix}_{slug}')) if category == 'characters': character = entity # entity IS the character elif char_slug == '__random__': character = Character.query.order_by(db.func.random()).first() elif char_slug: character = Character.query.filter_by(slug=char_slug).first() else: character = None print(f"[Strengths] char_slug={char_slug!r} → character={character.slug if character else 'none'}") # Read extra context that may be stored in session for some categories action_obj = None extra_positive = '' extra_negative = '' if category == 'detailers': action_slug = session.get(f'action_detailer_{slug}') if action_slug: action_obj = Action.query.filter_by(slug=action_slug).first() extra_positive = session.get(f'extra_pos_detailer_{slug}', '') extra_negative = session.get(f'extra_neg_detailer_{slug}', '') print(f"[Strengths] detailer session — char={char_slug}, action={action_slug}, extra_pos={bool(extra_positive)}, extra_neg={bool(extra_negative)}") prompts = _build_strengths_prompts(category, entity, character, action=action_obj, extra_positive=extra_positive) checkpoint, ckpt_data = _get_default_checkpoint() workflow_path = os.path.join(os.path.dirname(__file__), 'comfy_workflow.json') with open(workflow_path, 'r') as f: workflow = json.load(f) workflow = _prepare_strengths_workflow( workflow, category, entity, character, prompts, checkpoint, ckpt_data, strength_value, fixed_seed, custom_negative=extra_negative ) result = queue_prompt(workflow, client_id) prompt_id = result.get('prompt_id', '') return {'status': 'queued', 'prompt_id': prompt_id} except Exception as e: print(f"[Strengths] generate error: {e}") return {'error': str(e)}, 500 @app.route('/strengths///finalize/', methods=['POST']) def strengths_finalize(category, slug, prompt_id): if category not in _STRENGTHS_MODEL_MAP: return {'error': 'unknown category'}, 400 strength_value = request.form.get('strength_value', '0.0') seed = request.form.get('seed', '0') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'prompt not found in history'}, 404 outputs = history[prompt_id].get('outputs', {}) img_data = None img_filename = None for node_output in outputs.values(): for img in node_output.get('images', []): img_data = get_image(img['filename'], img.get('subfolder', ''), img.get('type', 'output')) img_filename = img['filename'] break if img_data: break if not img_data: return {'error': 'no image in output'}, 500 # Save — encode strength value as two-decimal string in filename strength_str = f"{float(strength_value):.2f}".replace('.', '_') upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], category, slug, 'strengths') os.makedirs(upload_dir, exist_ok=True) out_filename = f"strength_{strength_str}_seed_{seed}.png" out_path = os.path.join(upload_dir, out_filename) with open(out_path, 'wb') as f: f.write(img_data) relative = f"{category}/{slug}/strengths/{out_filename}" return {'success': True, 'image_url': f"/static/uploads/{relative}", 'strength_value': strength_value} except Exception as e: print(f"[Strengths] finalize error: {e}") return {'error': str(e)}, 500 @app.route('/strengths///list') def strengths_list(category, slug): upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], category, slug, 'strengths') if not os.path.isdir(upload_dir): return {'images': []} images = [] for fname in sorted(os.listdir(upload_dir)): if not fname.endswith('.png'): continue # Parse strength value from filename: strength_0_50_seed_12345.png → "0.50" try: parts = fname.replace('strength_', '').split('_seed_') strength_raw = parts[0] # e.g. "0_50" strength_display = strength_raw.replace('_', '.') except Exception: strength_display = fname images.append({ 'url': f"/static/uploads/{category}/{slug}/strengths/{fname}", 'strength': strength_display, 'filename': fname, }) return {'images': images} @app.route('/strengths///clear', methods=['POST']) def strengths_clear(category, slug): upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], category, slug, 'strengths') if os.path.isdir(upload_dir): for fname in os.listdir(upload_dir): fpath = os.path.join(upload_dir, fname) if os.path.isfile(fpath): os.remove(fpath) return {'success': True} _STRENGTHS_DATA_DIRS = { 'characters': 'CHARACTERS_DIR', 'looks': 'LOOKS_DIR', 'outfits': 'CLOTHING_DIR', 'actions': 'ACTIONS_DIR', 'styles': 'STYLES_DIR', 'scenes': 'SCENES_DIR', 'detailers': 'DETAILERS_DIR', } @app.route('/strengths///save_range', methods=['POST']) def strengths_save_range(category, slug): """Save lora_weight_min / lora_weight_max from the Strengths Gallery back to the entity JSON + DB.""" if category not in _STRENGTHS_MODEL_MAP or category not in _STRENGTHS_DATA_DIRS: return {'error': 'unknown category'}, 400 try: min_w = float(request.form.get('min_weight', '')) max_w = float(request.form.get('max_weight', '')) except (ValueError, TypeError): return {'error': 'invalid weight values'}, 400 if min_w > max_w: min_w, max_w = max_w, min_w Model = _STRENGTHS_MODEL_MAP[category] entity = Model.query.filter_by(slug=slug).first_or_404() # Update in-memory data dict data = dict(entity.data) if 'lora' not in data or not isinstance(data.get('lora'), dict): return {'error': 'entity has no lora section'}, 400 data['lora']['lora_weight_min'] = min_w data['lora']['lora_weight_max'] = max_w entity.data = data flag_modified(entity, 'data') # Write back to JSON file on disk data_dir = app.config[_STRENGTHS_DATA_DIRS[category]] filename = getattr(entity, 'filename', None) or f"{slug}.json" file_path = os.path.join(data_dir, filename) if os.path.exists(file_path): with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) f.write('\n') db.session.commit() return {'success': True, 'lora_weight_min': min_w, 'lora_weight_max': max_w} if __name__ == '__main__': ensure_mcp_server_running() with app.app_context(): os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) db.create_all() # Migration: Add active_outfit column if it doesn't exist try: from sqlalchemy import text db.session.execute(text('ALTER TABLE character ADD COLUMN active_outfit VARCHAR(100) DEFAULT \'default\'')) db.session.commit() print("Added active_outfit column to character table") except Exception as e: if 'duplicate column name' in str(e).lower() or 'already exists' in str(e).lower(): print("active_outfit column already exists") else: print(f"Migration note: {e}") # Migration: Add default_fields column to action table if it doesn't exist try: from sqlalchemy import text db.session.execute(text('ALTER TABLE action ADD COLUMN default_fields JSON')) db.session.commit() print("Added default_fields column to action table") except Exception as e: if 'duplicate column name' in str(e).lower() or 'already exists' in str(e).lower(): print("default_fields column already exists in action table") else: print(f"Migration action note: {e}") # Migration: Add new columns to settings table columns_to_add = [ ('llm_provider', "VARCHAR(50) DEFAULT 'openrouter'"), ('local_base_url', "VARCHAR(255)"), ('local_model', "VARCHAR(100)") ] for col_name, col_type in columns_to_add: try: db.session.execute(text(f'ALTER TABLE settings ADD COLUMN {col_name} {col_type}')) db.session.commit() print(f"Added {col_name} column to settings table") except Exception as e: if 'duplicate column name' in str(e).lower() or 'already exists' in str(e).lower(): pass else: print(f"Migration settings note ({col_name}): {e}") # Ensure settings exist if not Settings.query.first(): db.session.add(Settings()) db.session.commit() print("Created default settings") sync_characters() sync_outfits() sync_actions() # Migration: Add data column to checkpoint table try: db.session.execute(text('ALTER TABLE checkpoint ADD COLUMN data JSON')) db.session.commit() print("Added data column to checkpoint table") except Exception as e: if 'duplicate column name' in str(e).lower() or 'already exists' in str(e).lower(): print("data column already exists in checkpoint table") else: print(f"Migration checkpoint note: {e}") sync_styles() sync_detailers() sync_scenes() sync_looks() sync_checkpoints() app.run(debug=True, host='0.0.0.0', port=5000)