import os import json import time import re import requests import random from flask import Flask, render_template, request, redirect, url_for, flash, session from werkzeug.utils import secure_filename from models import db, Character, Settings, Outfit, Action app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['UPLOAD_FOLDER'] = 'static/uploads' app.config['SECRET_KEY'] = 'dev-key-123' app.config['CHARACTERS_DIR'] = 'data/characters' app.config['CLOTHING_DIR'] = 'data/clothing' app.config['ACTIONS_DIR'] = 'data/actions' app.config['COMFYUI_URL'] = 'http://127.0.0.1:8188' app.config['ILLUSTRIOUS_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Illustrious/' app.config['NOOB_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Noob/' app.config['LORA_DIR'] = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Looks/' db.init_app(app) ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'} def get_available_loras(): loras = [] if os.path.exists(app.config['LORA_DIR']): for f in os.listdir(app.config['LORA_DIR']): if f.endswith('.safetensors'): # Using the format seen in character JSONs loras.append(f"Illustrious/Looks/{f}") return sorted(loras) def get_available_clothing_loras(): """Get LoRAs from the Clothing directory for outfit LoRAs.""" clothing_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Clothing/' loras = [] if os.path.exists(clothing_lora_dir): for f in os.listdir(clothing_lora_dir): if f.endswith('.safetensors'): loras.append(f"Illustrious/Clothing/{f}") return sorted(loras) def get_available_action_loras(): """Get LoRAs from the Poses directory for action LoRAs.""" poses_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Poses/' loras = [] if os.path.exists(poses_lora_dir): for f in os.listdir(poses_lora_dir): if f.endswith('.safetensors'): loras.append(f"Illustrious/Poses/{f}") return sorted(loras) def get_available_checkpoints(): checkpoints = [] # Scan Illustrious if os.path.exists(app.config['ILLUSTRIOUS_MODELS_DIR']): for f in os.listdir(app.config['ILLUSTRIOUS_MODELS_DIR']): if f.endswith('.safetensors') or f.endswith('.ckpt'): checkpoints.append(f"Illustrious/{f}") # Scan Noob if os.path.exists(app.config['NOOB_MODELS_DIR']): for f in os.listdir(app.config['NOOB_MODELS_DIR']): if f.endswith('.safetensors') or f.endswith('.ckpt'): checkpoints.append(f"Noob/{f}") return sorted(checkpoints) def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def build_prompt(data, selected_fields=None, default_fields=None, active_outfit='default'): def is_selected(section, key): # Priority: # 1. Manual selection from form (if list is not empty) # 2. Database defaults (if they exist) # 3. Select all (default behavior) if selected_fields: return f"{section}::{key}" in selected_fields if default_fields: return f"{section}::{key}" in default_fields return True identity = data.get('identity', {}) # Get wardrobe - handle both new nested format and legacy flat format wardrobe_data = data.get('wardrobe', {}) if 'default' in wardrobe_data and isinstance(wardrobe_data.get('default'), dict): # New nested format - get active outfit wardrobe = wardrobe_data.get(active_outfit or 'default', wardrobe_data.get('default', {})) else: # Legacy flat format wardrobe = wardrobe_data defaults = data.get('defaults', {}) action_data = data.get('action', {}) # Pre-calculate Hand/Glove priority # Priority: wardrobe gloves > wardrobe hands (outfit) > identity hands (character) hand_val = "" if wardrobe.get('gloves') and is_selected('wardrobe', 'gloves'): hand_val = wardrobe.get('gloves') elif wardrobe.get('hands') and is_selected('wardrobe', 'hands'): hand_val = wardrobe.get('hands') elif identity.get('hands') and is_selected('identity', 'hands'): hand_val = identity.get('hands') # 1. Main Prompt parts = ["(solo:1.2)"] # Use character_id (underscores to spaces) for tags compatibility char_tag = data.get('character_id', '').replace('_', ' ') if char_tag and is_selected('special', 'name'): parts.append(char_tag) for key in ['base_specs', 'hair', 'eyes', 'extra']: val = identity.get(key) if val and is_selected('identity', key): parts.append(val) # Add defaults (expression, pose, scene) for key in ['expression', 'pose', 'scene']: val = defaults.get(key) if val and is_selected('defaults', key): parts.append(val) # Add hand priority value to main prompt if hand_val: parts.append(hand_val) for key in ['top', 'headwear', 'legwear', 'footwear', 'accessories']: val = wardrobe.get(key) if val and is_selected('wardrobe', key): parts.append(val) style = data.get('styles', {}).get('aesthetic') if style and is_selected('styles', 'aesthetic'): parts.append(f"{style} style") tags = data.get('tags', []) if tags and is_selected('special', 'tags'): parts.extend(tags) lora = data.get('lora', {}) if lora.get('lora_triggers') and is_selected('lora', 'lora_triggers'): parts.append(lora.get('lora_triggers')) # 2. Face Prompt: Tag, Eyes, Expression, Headwear, Action details face_parts = [] if char_tag and is_selected('special', 'name'): face_parts.append(char_tag) if identity.get('eyes') and is_selected('identity', 'eyes'): face_parts.append(identity.get('eyes')) if defaults.get('expression') and is_selected('defaults', 'expression'): face_parts.append(defaults.get('expression')) if wardrobe.get('headwear') and is_selected('wardrobe', 'headwear'): face_parts.append(wardrobe.get('headwear')) # Add specific Action expression details if available if action_data.get('head') and is_selected('action', 'head'): face_parts.append(action_data.get('head')) if action_data.get('eyes') and is_selected('action', 'eyes'): face_parts.append(action_data.get('eyes')) # 3. Hand Prompt: Hand value (Gloves or Hands), Action details hand_parts = [hand_val] if hand_val else [] if action_data.get('arms') and is_selected('action', 'arms'): hand_parts.append(action_data.get('arms')) if action_data.get('hands') and is_selected('action', 'hands'): hand_parts.append(action_data.get('hands')) return { "main": ", ".join(parts), "face": ", ".join(face_parts), "hand": ", ".join(hand_parts) } def queue_prompt(prompt_workflow, client_id=None): p = {"prompt": prompt_workflow} if client_id: p["client_id"] = client_id data = json.dumps(p).encode('utf-8') response = requests.post(f"{app.config['COMFYUI_URL']}/prompt", data=data) return response.json() def get_history(prompt_id): response = requests.get(f"{app.config['COMFYUI_URL']}/history/{prompt_id}") return response.json() def get_image(filename, subfolder, folder_type): data = {"filename": filename, "subfolder": subfolder, "type": folder_type} response = requests.get(f"{app.config['COMFYUI_URL']}/view", params=data) return response.content from sqlalchemy.orm.attributes import flag_modified def sync_characters(): if not os.path.exists(app.config['CHARACTERS_DIR']): return current_ids = [] for filename in os.listdir(app.config['CHARACTERS_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['CHARACTERS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) char_id = data.get('character_id') if not char_id: continue current_ids.append(char_id) # Generate URL-safe slug: remove special characters from character_id slug = re.sub(r'[^a-zA-Z0-9_]', '', char_id) # Check if character already exists character = Character.query.filter_by(character_id=char_id).first() name = data.get('character_name', char_id.replace('_', ' ').title()) if character: character.data = data character.name = name character.slug = slug character.filename = filename # Check if cover image still exists if character.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], character.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {character.name}, clearing path.") character.image_path = None # Explicitly tell SQLAlchemy the JSON field was modified flag_modified(character, "data") else: new_char = Character( character_id=char_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_char) except Exception as e: print(f"Error importing {filename}: {e}") # Remove characters that are no longer in the folder all_characters = Character.query.all() for char in all_characters: if char.character_id not in current_ids: db.session.delete(char) db.session.commit() def sync_outfits(): if not os.path.exists(app.config['CLOTHING_DIR']): return current_ids = [] for filename in os.listdir(app.config['CLOTHING_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['CLOTHING_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) outfit_id = data.get('outfit_id') or filename.replace('.json', '') current_ids.append(outfit_id) # Generate URL-safe slug: remove special characters from outfit_id slug = re.sub(r'[^a-zA-Z0-9_]', '', outfit_id) # Check if outfit already exists outfit = Outfit.query.filter_by(outfit_id=outfit_id).first() name = data.get('outfit_name', outfit_id.replace('_', ' ').title()) if outfit: outfit.data = data outfit.name = name outfit.slug = slug outfit.filename = filename # Check if cover image still exists if outfit.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], outfit.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {outfit.name}, clearing path.") outfit.image_path = None # Explicitly tell SQLAlchemy the JSON field was modified flag_modified(outfit, "data") else: new_outfit = Outfit( outfit_id=outfit_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_outfit) except Exception as e: print(f"Error importing outfit {filename}: {e}") # Remove outfits that are no longer in the folder all_outfits = Outfit.query.all() for outfit in all_outfits: if outfit.outfit_id not in current_ids: db.session.delete(outfit) db.session.commit() def sync_actions(): if not os.path.exists(app.config['ACTIONS_DIR']): return current_ids = [] for filename in os.listdir(app.config['ACTIONS_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['ACTIONS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) action_id = data.get('action_id') or filename.replace('.json', '') current_ids.append(action_id) # Generate URL-safe slug slug = re.sub(r'[^a-zA-Z0-9_]', '', action_id) # Check if action already exists action = Action.query.filter_by(action_id=action_id).first() name = data.get('action_name', action_id.replace('_', ' ').title()) if action: action.data = data action.name = name action.slug = slug action.filename = filename # Check if cover image still exists if action.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], action.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {action.name}, clearing path.") action.image_path = None flag_modified(action, "data") else: new_action = Action( action_id=action_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_action) except Exception as e: print(f"Error importing action {filename}: {e}") # Remove actions that are no longer in the folder all_actions = Action.query.all() for action in all_actions: if action.action_id not in current_ids: db.session.delete(action) db.session.commit() def call_llm(prompt, system_prompt="You are a creative assistant."): settings = Settings.query.first() if not settings or not settings.openrouter_api_key: raise ValueError("OpenRouter API Key not configured. Please configure it in Settings.") headers = { "Authorization": f"Bearer {settings.openrouter_api_key}", "Content-Type": "application/json" } data = { "model": settings.openrouter_model or 'google/gemini-2.0-flash-001', "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": prompt} ] } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data) response.raise_for_status() result = response.json() return result['choices'][0]['message']['content'] except requests.exceptions.RequestException as e: raise RuntimeError(f"LLM API request failed: {str(e)}") from e except (KeyError, IndexError) as e: raise RuntimeError(f"Unexpected LLM response format: {str(e)}") from e @app.route('/get_openrouter_models', methods=['POST']) def get_openrouter_models(): api_key = request.form.get('api_key') if not api_key: return {'error': 'API key is required'}, 400 headers = {"Authorization": f"Bearer {api_key}"} try: response = requests.get("https://openrouter.ai/api/v1/models", headers=headers) response.raise_for_status() models = response.json().get('data', []) # Return simplified list of models return {'models': [{'id': m['id'], 'name': m.get('name', m['id'])} for m in models]} except Exception as e: return {'error': str(e)}, 500 @app.route('/settings', methods=['GET', 'POST']) def settings(): settings = Settings.query.first() if not settings: settings = Settings() db.session.add(settings) db.session.commit() if request.method == 'POST': settings.openrouter_api_key = request.form.get('api_key') settings.openrouter_model = request.form.get('model') db.session.commit() flash('Settings updated successfully!') return redirect(url_for('settings')) return render_template('settings.html', settings=settings) @app.route('/') def index(): characters = Character.query.order_by(Character.name).all() return render_template('index.html', characters=characters) @app.route('/rescan', methods=['POST']) def rescan(): sync_characters() flash('Database synced with character files.') return redirect(url_for('index')) @app.route('/generator', methods=['GET', 'POST']) def generator(): characters = Character.query.order_by(Character.name).all() checkpoints = get_available_checkpoints() if not checkpoints: checkpoints = ["Noob/oneObsession_v19Atypical.safetensors"] if request.method == 'POST': char_slug = request.form.get('character') checkpoint = request.form.get('checkpoint') custom_positive = request.form.get('positive_prompt', '') custom_negative = request.form.get('negative_prompt', '') character = Character.query.filter_by(slug=char_slug).first_or_404() try: with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) # Build base prompts from character defaults prompts = build_prompt(character.data, default_fields=character.default_fields) # Append custom additions to the "main" prompt if custom_positive: prompts["main"] = f"{prompts['main']}, {custom_positive}" # Prepare workflow with custom checkpoint and negative prompt workflow = _prepare_workflow(workflow, character, prompts, checkpoint, custom_negative) print(f"Queueing generator prompt for {character.character_id}") prompt_response = queue_prompt(workflow) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] flash("Generation started...") max_retries = 120 while max_retries > 0: history = get_history(prompt_id) if prompt_id in history: outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{character.slug}") os.makedirs(char_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(char_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) relative_path = f"characters/{character.slug}/{filename}" return render_template('generator.html', characters=characters, checkpoints=checkpoints, generated_image=relative_path, selected_char=char_slug, selected_ckpt=checkpoint) time.sleep(2) max_retries -= 1 flash("Generation timed out.") except Exception as e: print(f"Generator error: {e}") flash(f"Error: {str(e)}") return render_template('generator.html', characters=characters, checkpoints=checkpoints) @app.route('/character/') def detail(slug): character = Character.query.filter_by(slug=slug).first_or_404() # Load state from session preferences = session.get(f'prefs_{slug}') preview_image = session.get(f'preview_{slug}') return render_template('detail.html', character=character, preferences=preferences, preview_image=preview_image) @app.route('/create', methods=['GET', 'POST']) def create_character(): if request.method == 'POST': name = request.form.get('name') slug = request.form.get('filename', '').strip() prompt = request.form.get('prompt', '') use_llm = request.form.get('use_llm') == 'on' # Auto-generate slug from name if not provided if not slug: slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_') # Validate slug safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) if not safe_slug: safe_slug = 'character' # Find available filename (increment if exists) base_slug = safe_slug counter = 1 while os.path.exists(os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json")): safe_slug = f"{base_slug}_{counter}" counter += 1 # Check if LLM generation is requested if use_llm: if not prompt: flash("Description is required when AI generation is enabled.") return redirect(request.url) # Generate JSON with LLM system_prompt = """You are a JSON generator. output ONLY valid JSON matching this exact structure. Do not wrap in markdown blocks. Structure: { "character_id": "WILL_BE_REPLACED", "character_name": "WILL_BE_REPLACED", "identity": { "base_specs": "string (e.g. 1girl, build, skin)", "hair": "string", "eyes": "string", "hands": "string", "arms": "string", "torso": "string", "pelvis": "string", "legs": "string", "feet": "string", "extra": "string" }, "defaults": { "expression": "", "pose": "", "scene": "" }, "wardrobe": { "full_body": "string (e.g. bodysuit, dress, full outfit description)", "headwear": "string", "top": "string", "bottom": "string", "legwear": "string", "footwear": "string", "hands": "string", "accessories": "string" }, "styles": { "aesthetic": "string", "primary_color": "string", "secondary_color": "string", "tertiary_color": "string" }, "lora": { "lora_name": "", "lora_weight": 1.0, "lora_triggers": "" }, "tags": ["string", "string"] } Fill the fields based on the user's description. Use Danbooru-style tags for the values (e.g. 'long hair', 'blue eyes'). Keep values concise. Use empty strings "" for fields that are not applicable or unknown - never use words like "none" or "n/a". Leave defaults fields empty.""" try: llm_response = call_llm(f"Create a character profile for '{name}' based on this description: {prompt}", system_prompt) # Clean response (remove markdown if present) clean_json = llm_response.replace('```json', '').replace('```', '').strip() char_data = json.loads(clean_json) # Enforce IDs char_data['character_id'] = safe_slug char_data['character_name'] = name except Exception as e: print(f"LLM error: {e}") flash(f"Failed to generate character profile: {e}") return redirect(request.url) else: # Create blank character template char_data = { "character_id": safe_slug, "character_name": name, "identity": { "base_specs": "", "hair": "", "eyes": "", "hands": "", "arms": "", "torso": "", "pelvis": "", "legs": "", "feet": "", "extra": "" }, "defaults": { "expression": "", "pose": "", "scene": "" }, "wardrobe": { "full_body": "", "headwear": "", "top": "", "bottom": "", "legwear": "", "footwear": "", "hands": "", "accessories": "" }, "styles": { "aesthetic": "", "primary_color": "", "secondary_color": "", "tertiary_color": "" }, "lora": { "lora_name": "", "lora_weight": 1.0, "lora_triggers": "" }, "tags": [] } try: # Save file file_path = os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json") with open(file_path, 'w') as f: json.dump(char_data, f, indent=2) # Add to DB new_char = Character( character_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json", name=name, data=char_data ) db.session.add(new_char) db.session.commit() flash('Character created successfully!') return redirect(url_for('detail', slug=safe_slug)) except Exception as e: print(f"Save error: {e}") flash(f"Failed to create character: {e}") return redirect(request.url) return render_template('create.html') @app.route('/character//edit', methods=['GET', 'POST']) def edit_character(slug): character = Character.query.filter_by(slug=slug).first_or_404() loras = get_available_loras() if request.method == 'POST': try: # 1. Update basic fields character.name = request.form.get('character_name') # 2. Rebuild the data dictionary new_data = character.data.copy() new_data['character_name'] = character.name # Update nested sections (non-wardrobe) for section in ['identity', 'defaults', 'styles', 'lora']: if section in new_data: for key in new_data[section]: form_key = f"{section}_{key}" if form_key in request.form: val = request.form.get(form_key) # Handle numeric weight if key == 'lora_weight': try: val = float(val) except: val = 1.0 new_data[section][key] = val # Handle wardrobe - support both nested and flat formats wardrobe = new_data.get('wardrobe', {}) if 'default' in wardrobe and isinstance(wardrobe.get('default'), dict): # New nested format - update each outfit for outfit_name in wardrobe.keys(): for key in wardrobe[outfit_name].keys(): form_key = f"wardrobe_{outfit_name}_{key}" if form_key in request.form: wardrobe[outfit_name][key] = request.form.get(form_key) new_data['wardrobe'] = wardrobe else: # Legacy flat format if 'wardrobe' in new_data: for key in new_data['wardrobe'].keys(): form_key = f"wardrobe_{key}" if form_key in request.form: new_data['wardrobe'][key] = request.form.get(form_key) # Update Tags (comma separated string to list) tags_raw = request.form.get('tags', '') new_data['tags'] = [t.strip() for f in tags_raw.split(',') for t in [f.strip()] if t] character.data = new_data flag_modified(character, "data") # 3. Write back to JSON file # Use the filename we stored during sync, or fallback to a sanitized ID char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) db.session.commit() flash('Character profile updated successfully!') return redirect(url_for('detail', slug=slug)) except Exception as e: print(f"Edit error: {e}") flash(f"Error saving changes: {str(e)}") return render_template('edit.html', character=character, loras=loras) @app.route('/character//outfit/switch', methods=['POST']) def switch_outfit(slug): """Switch the active outfit for a character.""" character = Character.query.filter_by(slug=slug).first_or_404() outfit_name = request.form.get('outfit', 'default') # Validate outfit exists available_outfits = character.get_available_outfits() if outfit_name in available_outfits: character.active_outfit = outfit_name db.session.commit() flash(f'Switched to "{outfit_name}" outfit.') else: flash(f'Outfit "{outfit_name}" not found.', 'error') return redirect(url_for('detail', slug=slug)) @app.route('/character//outfit/add', methods=['POST']) def add_outfit(slug): """Add a new outfit to a character.""" character = Character.query.filter_by(slug=slug).first_or_404() outfit_name = request.form.get('outfit_name', '').strip() if not outfit_name: flash('Outfit name cannot be empty.', 'error') return redirect(url_for('edit_character', slug=slug)) # Sanitize outfit name for use as key safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', outfit_name.lower()) # Get wardrobe data wardrobe = character.data.get('wardrobe', {}) # Ensure wardrobe is in new nested format if 'default' not in wardrobe or not isinstance(wardrobe.get('default'), dict): # Convert legacy format wardrobe = {'default': wardrobe} # Check if outfit already exists if safe_name in wardrobe: flash(f'Outfit "{safe_name}" already exists.', 'error') return redirect(url_for('edit_character', slug=slug)) # Create new outfit (copy from default as template) default_outfit = wardrobe.get('default', { 'headwear': '', 'top': '', 'legwear': '', 'footwear': '', 'hands': '', 'accessories': '' }) wardrobe[safe_name] = default_outfit.copy() # Update character data character.data['wardrobe'] = wardrobe flag_modified(character, 'data') # Save to JSON file char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(character.data, f, indent=2) db.session.commit() flash(f'Added new outfit "{safe_name}".') return redirect(url_for('edit_character', slug=slug)) @app.route('/character//outfit/delete', methods=['POST']) def delete_outfit(slug): """Delete an outfit from a character.""" character = Character.query.filter_by(slug=slug).first_or_404() outfit_name = request.form.get('outfit', '') wardrobe = character.data.get('wardrobe', {}) # Cannot delete default if outfit_name == 'default': flash('Cannot delete the default outfit.', 'error') return redirect(url_for('edit_character', slug=slug)) if outfit_name not in wardrobe: flash(f'Outfit "{outfit_name}" not found.', 'error') return redirect(url_for('edit_character', slug=slug)) # Delete outfit del wardrobe[outfit_name] character.data['wardrobe'] = wardrobe flag_modified(character, 'data') # Switch active outfit if deleted was active if character.active_outfit == outfit_name: character.active_outfit = 'default' # Save to JSON file char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(character.data, f, indent=2) db.session.commit() flash(f'Deleted outfit "{outfit_name}".') return redirect(url_for('edit_character', slug=slug)) @app.route('/character//outfit/rename', methods=['POST']) def rename_outfit(slug): """Rename an outfit.""" character = Character.query.filter_by(slug=slug).first_or_404() old_name = request.form.get('old_name', '') new_name = request.form.get('new_name', '').strip() if not new_name: flash('New name cannot be empty.', 'error') return redirect(url_for('edit_character', slug=slug)) # Sanitize new name safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', new_name.lower()) wardrobe = character.data.get('wardrobe', {}) if old_name not in wardrobe: flash(f'Outfit "{old_name}" not found.', 'error') return redirect(url_for('edit_character', slug=slug)) if safe_name in wardrobe and safe_name != old_name: flash(f'Outfit "{safe_name}" already exists.', 'error') return redirect(url_for('edit_character', slug=slug)) # Rename (copy to new key, delete old) wardrobe[safe_name] = wardrobe.pop(old_name) character.data['wardrobe'] = wardrobe flag_modified(character, 'data') # Update active outfit if renamed was active if character.active_outfit == old_name: character.active_outfit = safe_name # Save to JSON file char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(character.data, f, indent=2) db.session.commit() flash(f'Renamed outfit "{old_name}" to "{safe_name}".') return redirect(url_for('edit_character', slug=slug)) @app.route('/character//upload', methods=['POST']) def upload_image(slug): character = Character.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(request.url) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): # Create character subfolder char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{slug}") os.makedirs(char_folder, exist_ok=True) filename = secure_filename(file.filename) file_path = os.path.join(char_folder, filename) file.save(file_path) # Store relative path in DB character.image_path = f"characters/{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('detail', slug=slug)) @app.route('/character//finalize_generation/', methods=['POST']) def finalize_generation(slug, prompt_id): character = Character.query.filter_by(slug=slug).first_or_404() action = request.form.get('action', 'preview') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) # Create character subfolder char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{slug}") os.makedirs(char_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(char_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) print(f"Image saved to: {os.path.abspath(file_path)}") # Handle actions - always save as preview relative_path = f"characters/{slug}/{filename}" session[f'preview_{slug}'] = relative_path # If action is 'replace', also update the character's cover image immediately if action == 'replace': character.image_path = relative_path db.session.commit() return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize error: {e}") return {'error': str(e)}, 500 @app.route('/character//replace_cover_from_preview', methods=['POST']) def replace_cover_from_preview(slug): character = Character.query.filter_by(slug=slug).first_or_404() preview_path = session.get(f'preview_{slug}') if preview_path: character.image_path = preview_path db.session.commit() flash('Cover image updated from preview!') else: flash('No preview image available', 'error') return redirect(url_for('detail', slug=slug)) def _prepare_workflow(workflow, character, prompts, checkpoint=None, custom_negative=None, outfit=None, action=None): # 1. Update prompts using replacement to preserve embeddings workflow["6"]["inputs"]["text"] = workflow["6"]["inputs"]["text"].replace("{{POSITIVE_PROMPT}}", prompts["main"]) if custom_negative: workflow["7"]["inputs"]["text"] = f"{workflow['7']['inputs']['text']}, {custom_negative}" if "14" in workflow: workflow["14"]["inputs"]["text"] = workflow["14"]["inputs"]["text"].replace("{{FACE_PROMPT}}", prompts["face"]) if "15" in workflow: workflow["15"]["inputs"]["text"] = workflow["15"]["inputs"]["text"].replace("{{HAND_PROMPT}}", prompts["hand"]) print("--- DEBUG: COMFYUI PROMPTS ---") print(f"Main Positive (6): {workflow['6']['inputs']['text']}") print(f"Main Negative (7): {workflow['7']['inputs']['text']}") if "14" in workflow: print(f"Face Detailer (14): {workflow['14']['inputs']['text']}") if "15" in workflow: print(f"Hand Detailer (15): {workflow['15']['inputs']['text']}") print("-------------------------------") # 2. Update Checkpoint if checkpoint: workflow["4"]["inputs"]["ckpt_name"] = checkpoint # 3. Handle LoRAs - Node 16 for character, Node 17 for outfit, Node 18 for action # Start with direct checkpoint connections model_source = ["4", 0] clip_source = ["4", 1] # Character LoRA (Node 16) char_lora_data = character.data.get('lora', {}) if character else {} char_lora_name = char_lora_data.get('lora_name') if char_lora_name and "16" in workflow: workflow["16"]["inputs"]["lora_name"] = char_lora_name workflow["16"]["inputs"]["strength_model"] = char_lora_data.get('lora_weight', 1.0) workflow["16"]["inputs"]["strength_clip"] = char_lora_data.get('lora_weight', 1.0) workflow["16"]["inputs"]["model"] = ["4", 0] # From checkpoint workflow["16"]["inputs"]["clip"] = ["4", 1] # From checkpoint model_source = ["16", 0] clip_source = ["16", 1] print(f"Character LoRA: {char_lora_name} @ {char_lora_data.get('lora_weight', 1.0)}") # Outfit LoRA (Node 17) - chains from character LoRA or checkpoint outfit_lora_data = outfit.data.get('lora', {}) if outfit else {} outfit_lora_name = outfit_lora_data.get('lora_name') if outfit_lora_name and "17" in workflow: workflow["17"]["inputs"]["lora_name"] = outfit_lora_name workflow["17"]["inputs"]["strength_model"] = outfit_lora_data.get('lora_weight', 0.8) workflow["17"]["inputs"]["strength_clip"] = outfit_lora_data.get('lora_weight', 0.8) # Chain from character LoRA (node 16) or checkpoint (node 4) workflow["17"]["inputs"]["model"] = model_source workflow["17"]["inputs"]["clip"] = clip_source model_source = ["17", 0] clip_source = ["17", 1] print(f"Outfit LoRA: {outfit_lora_name} @ {outfit_lora_data.get('lora_weight', 0.8)}") # Action LoRA (Node 18) - chains from previous LoRA or checkpoint action_lora_data = action.data.get('lora', {}) if action else {} action_lora_name = action_lora_data.get('lora_name') if action_lora_name and "18" in workflow: workflow["18"]["inputs"]["lora_name"] = action_lora_name workflow["18"]["inputs"]["strength_model"] = action_lora_data.get('lora_weight', 1.0) workflow["18"]["inputs"]["strength_clip"] = action_lora_data.get('lora_weight', 1.0) # Chain from previous source workflow["18"]["inputs"]["model"] = model_source workflow["18"]["inputs"]["clip"] = clip_source model_source = ["18", 0] clip_source = ["18", 1] print(f"Action LoRA: {action_lora_name} @ {action_lora_data.get('lora_weight', 1.0)}") # Apply connections to all model/clip consumers workflow["3"]["inputs"]["model"] = model_source workflow["11"]["inputs"]["model"] = model_source workflow["13"]["inputs"]["model"] = model_source workflow["6"]["inputs"]["clip"] = clip_source workflow["7"]["inputs"]["clip"] = clip_source workflow["11"]["inputs"]["clip"] = clip_source workflow["13"]["inputs"]["clip"] = clip_source workflow["14"]["inputs"]["clip"] = clip_source workflow["15"]["inputs"]["clip"] = clip_source # 4. Randomize seeds gen_seed = random.randint(1, 10**15) workflow["3"]["inputs"]["seed"] = gen_seed if "11" in workflow: workflow["11"]["inputs"]["seed"] = gen_seed if "13" in workflow: workflow["13"]["inputs"]["seed"] = gen_seed return workflow def _queue_generation(character, action='preview', selected_fields=None, client_id=None): # 1. Load workflow with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) # 2. Build prompts with active outfit prompts = build_prompt(character.data, selected_fields, character.default_fields, character.active_outfit) # 3. Prepare workflow workflow = _prepare_workflow(workflow, character, prompts) return queue_prompt(workflow, client_id=client_id) @app.route('/get_missing_characters') def get_missing_characters(): missing = Character.query.filter((Character.image_path == None) | (Character.image_path == '')).all() return {'missing': [{'slug': c.slug, 'name': c.name} for c in missing]} @app.route('/clear_all_covers', methods=['POST']) def clear_all_covers(): characters = Character.query.all() for char in characters: char.image_path = None db.session.commit() return {'success': True} @app.route('/generate_missing', methods=['POST']) def generate_missing(): # Query fresh from database for each check to avoid stale session issues def get_missing_count(): return Character.query.filter((Character.image_path == None) | (Character.image_path == '')).count() if get_missing_count() == 0: flash("No characters missing cover images.") return redirect(url_for('index')) success_count = 0 processed = 0 # Keep generating until no more missing while get_missing_count() > 0: # Get the next character in alphabetical order character = Character.query.filter( (Character.image_path == None) | (Character.image_path == '') ).order_by(Character.name).first() if not character: break character_slug = character.slug character_name = character.name processed += 1 try: print(f"Batch generating for: {character_name}") prompt_response = _queue_generation(character, action='replace') prompt_id = prompt_response['prompt_id'] # Simple synchronous wait for each max_retries = 120 while max_retries > 0: history = get_history(prompt_id) if prompt_id in history: outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{character_slug}") os.makedirs(char_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(char_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) # Re-query the character to ensure it's attached to the session character_to_update = Character.query.filter_by(slug=character_slug).first() if character_to_update: character_to_update.image_path = f"characters/{character_slug}/{filename}" db.session.commit() print(f"Saved cover for {character_name}: {character_to_update.image_path}") success_count += 1 break break time.sleep(2) max_retries -= 1 except Exception as e: print(f"Error generating for {character_name}: {e}") db.session.rollback() # Rollback on error to ensure clean state flash(f"Batch generation complete. Generated {success_count} images.") return redirect(url_for('index')) @app.route('/check_status/') def check_status(prompt_id): try: history = get_history(prompt_id) if prompt_id in history: return {'status': 'finished'} return {'status': 'pending'} except Exception: return {'status': 'error'}, 500 @app.route('/character//generate', methods=['POST']) def generate_image(slug): character = Character.query.filter_by(slug=slug).first_or_404() try: # Get action type action = request.form.get('action', 'preview') client_id = request.form.get('client_id') # Get selected fields selected_fields = request.form.getlist('include_field') # Save preferences session[f'prefs_{slug}'] = selected_fields # Queue generation using helper prompt_response = _queue_generation(character, action, selected_fields, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] # Return JSON if AJAX request if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('detail', slug=slug)) except Exception as e: print(f"Generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('detail', slug=slug)) @app.route('/character//save_defaults', methods=['POST']) def save_defaults(slug): character = Character.query.filter_by(slug=slug).first_or_404() selected_fields = request.form.getlist('include_field') character.default_fields = selected_fields db.session.commit() flash('Default prompt selection saved for this character!') return redirect(url_for('detail', slug=slug)) # ============ OUTFIT ROUTES ============ @app.route('/outfits') def outfits_index(): outfits = Outfit.query.order_by(Outfit.name).all() return render_template('outfits/index.html', outfits=outfits) @app.route('/outfits/rescan', methods=['POST']) def rescan_outfits(): sync_outfits() flash('Database synced with outfit files.') return redirect(url_for('outfits_index')) @app.route('/outfit/') def outfit_detail(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() characters = Character.query.order_by(Character.name).all() # Load state from session preferences = session.get(f'prefs_outfit_{slug}') preview_image = session.get(f'preview_outfit_{slug}') selected_character = session.get(f'char_outfit_{slug}') return render_template('outfits/detail.html', outfit=outfit, characters=characters, preferences=preferences, preview_image=preview_image, selected_character=selected_character) @app.route('/outfit//edit', methods=['GET', 'POST']) def edit_outfit(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() loras = get_available_clothing_loras() # Use clothing LoRAs for outfits if request.method == 'POST': try: # 1. Update basic fields outfit.name = request.form.get('outfit_name') # 2. Rebuild the data dictionary new_data = outfit.data.copy() new_data['outfit_name'] = outfit.name # Update outfit_id if provided new_outfit_id = request.form.get('outfit_id', outfit.outfit_id) new_data['outfit_id'] = new_outfit_id # Update wardrobe section if 'wardrobe' in new_data: for key in new_data['wardrobe'].keys(): form_key = f"wardrobe_{key}" if form_key in request.form: new_data['wardrobe'][key] = request.form.get(form_key) # Update lora section if 'lora' in new_data: for key in new_data['lora'].keys(): form_key = f"lora_{key}" if form_key in request.form: val = request.form.get(form_key) if key == 'lora_weight': try: val = float(val) except: val = 0.8 new_data['lora'][key] = val # Update Tags (comma separated string to list) tags_raw = request.form.get('tags', '') new_data['tags'] = [t.strip() for f in tags_raw.split(',') for t in [f.strip()] if t] outfit.data = new_data flag_modified(outfit, "data") # 3. Write back to JSON file outfit_file = outfit.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', outfit.outfit_id)}.json" file_path = os.path.join(app.config['CLOTHING_DIR'], outfit_file) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) db.session.commit() flash('Outfit profile updated successfully!') return redirect(url_for('outfit_detail', slug=slug)) except Exception as e: print(f"Edit error: {e}") flash(f"Error saving changes: {str(e)}") return render_template('outfits/edit.html', outfit=outfit, loras=loras) @app.route('/outfit//upload', methods=['POST']) def upload_outfit_image(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(request.url) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): # Create outfit subfolder outfit_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"outfits/{slug}") os.makedirs(outfit_folder, exist_ok=True) filename = secure_filename(file.filename) file_path = os.path.join(outfit_folder, filename) file.save(file_path) # Store relative path in DB outfit.image_path = f"outfits/{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('outfit_detail', slug=slug)) @app.route('/outfit//generate', methods=['POST']) def generate_outfit_image(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() try: # Get action type action = request.form.get('action', 'preview') client_id = request.form.get('client_id') # Get selected fields selected_fields = request.form.getlist('include_field') # Get selected character (if any) character_slug = request.form.get('character_slug', '') character = None # Handle random character selection if character_slug == '__random__': all_characters = Character.query.all() if all_characters: character = random.choice(all_characters) character_slug = character.slug elif character_slug: character = Character.query.filter_by(slug=character_slug).first() # Save preferences session[f'prefs_outfit_{slug}'] = selected_fields session[f'char_outfit_{slug}'] = character_slug # Build combined data for prompt building if character: # Combine character identity/defaults with outfit wardrobe combined_data = { 'character_id': character.character_id, 'identity': character.data.get('identity', {}), 'defaults': character.data.get('defaults', {}), 'wardrobe': outfit.data.get('wardrobe', {}), # Use outfit's wardrobe 'styles': character.data.get('styles', {}), # Use character's styles 'lora': outfit.data.get('lora', {}), # Use outfit's lora 'tags': outfit.data.get('tags', []) } # When character is selected, merge character identity fields into selected_fields # so they are included in the prompt if selected_fields: # Add character identity fields to selection if not already present for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']: if character.data.get('identity', {}).get(key): field_key = f'identity::{key}' if field_key not in selected_fields: selected_fields.append(field_key) # Add expression and pose, but NOT scene (outfit previews use simple background) for key in ['expression', 'pose']: if character.data.get('defaults', {}).get(key): field_key = f'defaults::{key}' if field_key not in selected_fields: selected_fields.append(field_key) # Always include character name if 'special::name' not in selected_fields: selected_fields.append('special::name') default_fields = character.default_fields else: # Outfit only - no character combined_data = { 'character_id': outfit.outfit_id, 'wardrobe': outfit.data.get('wardrobe', {}), 'lora': outfit.data.get('lora', {}), 'tags': outfit.data.get('tags', []) } default_fields = outfit.default_fields # Queue generation with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) # Build prompts for combined data prompts = build_prompt(combined_data, selected_fields, default_fields) # Add colored simple background to the main prompt for outfit previews # Use character's primary_color if available if character: primary_color = character.data.get('styles', {}).get('primary_color', '') if primary_color: prompts["main"] = f"{prompts['main']}, {primary_color} simple background" else: prompts["main"] = f"{prompts['main']}, simple background" else: prompts["main"] = f"{prompts['main']}, simple background" # Prepare workflow - pass both character and outfit for dual LoRA support workflow = _prepare_workflow(workflow, character, prompts, outfit=outfit) prompt_response = queue_prompt(workflow, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] # Return JSON if AJAX request if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('outfit_detail', slug=slug)) except Exception as e: print(f"Generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('outfit_detail', slug=slug)) @app.route('/outfit//finalize_generation/', methods=['POST']) def finalize_outfit_generation(slug, prompt_id): outfit = Outfit.query.filter_by(slug=slug).first_or_404() action = request.form.get('action', 'preview') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) # Create outfit subfolder outfit_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"outfits/{slug}") os.makedirs(outfit_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(outfit_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) print(f"Image saved to: {os.path.abspath(file_path)}") # Always save as preview relative_path = f"outfits/{slug}/{filename}" session[f'preview_outfit_{slug}'] = relative_path return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize error: {e}") return {'error': str(e)}, 500 @app.route('/outfit//replace_cover_from_preview', methods=['POST']) def replace_outfit_cover_from_preview(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() preview_path = session.get(f'preview_outfit_{slug}') if preview_path: outfit.image_path = preview_path db.session.commit() flash('Cover image updated from preview!') else: flash('No preview image available', 'error') return redirect(url_for('outfit_detail', slug=slug)) @app.route('/outfit/create', methods=['GET', 'POST']) def create_outfit(): if request.method == 'POST': name = request.form.get('name') slug = request.form.get('filename', '').strip() prompt = request.form.get('prompt', '') use_llm = request.form.get('use_llm') == 'on' # Auto-generate slug from name if not provided if not slug: slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_') # Validate slug safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) if not safe_slug: safe_slug = 'outfit' # Find available filename (increment if exists) base_slug = safe_slug counter = 1 while os.path.exists(os.path.join(app.config['CLOTHING_DIR'], f"{safe_slug}.json")): safe_slug = f"{base_slug}_{counter}" counter += 1 # Check if LLM generation is requested if use_llm: if not prompt: flash("Description is required when AI generation is enabled.") return redirect(request.url) # Generate JSON with LLM system_prompt = """You are a JSON generator. output ONLY valid JSON matching this exact structure. Do not wrap in markdown blocks. Structure: { "outfit_id": "WILL_BE_REPLACED", "outfit_name": "WILL_BE_REPLACED", "wardrobe": { "full_body": "string (e.g. bodysuit, dress, full outfit description)", "headwear": "string (e.g. hairband, cap)", "top": "string (e.g. blouse, corset, jacket)", "bottom": "string (e.g. skirt, pants, shorts)", "legwear": "string (e.g. stockings, tights, socks)", "footwear": "string (e.g. heels, boots, sneakers)", "hands": "string (e.g. gloves, sleeves)", "accessories": "string (e.g. necklace, belt, apron)" }, "lora": { "lora_name": "", "lora_weight": 0.8, "lora_triggers": "" }, "tags": ["string", "string"] } Fill the fields based on the user's description. Use Danbooru-style tags for the values (e.g. 'frilled skirt', 'lace stockings'). Keep values concise. Use empty strings "" for fields that are not applicable or unknown - never use words like "none" or "n/a". Leave lora fields empty - they can be configured later.""" try: llm_response = call_llm(f"Create an outfit profile for '{name}' based on this description: {prompt}", system_prompt) # Clean response (remove markdown if present) clean_json = llm_response.replace('```json', '').replace('```', '').strip() outfit_data = json.loads(clean_json) # Enforce IDs outfit_data['outfit_id'] = safe_slug outfit_data['outfit_name'] = name # Ensure required fields exist if 'wardrobe' not in outfit_data: outfit_data['wardrobe'] = { "full_body": "", "headwear": "", "top": "", "bottom": "", "legwear": "", "footwear": "", "hands": "", "accessories": "" } if 'lora' not in outfit_data: outfit_data['lora'] = { "lora_name": "", "lora_weight": 0.8, "lora_triggers": "" } if 'tags' not in outfit_data: outfit_data['tags'] = [] except Exception as e: print(f"LLM error: {e}") flash(f"Failed to generate outfit profile: {e}") return redirect(request.url) else: # Create blank outfit template outfit_data = { "outfit_id": safe_slug, "outfit_name": name, "wardrobe": { "full_body": "", "headwear": "", "top": "", "bottom": "", "legwear": "", "footwear": "", "hands": "", "accessories": "" }, "lora": { "lora_name": "", "lora_weight": 0.8, "lora_triggers": "" }, "tags": [] } try: # Save file file_path = os.path.join(app.config['CLOTHING_DIR'], f"{safe_slug}.json") with open(file_path, 'w') as f: json.dump(outfit_data, f, indent=2) # Add to DB new_outfit = Outfit( outfit_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json", name=name, data=outfit_data ) db.session.add(new_outfit) db.session.commit() flash('Outfit created successfully!') return redirect(url_for('outfit_detail', slug=safe_slug)) except Exception as e: print(f"Save error: {e}") flash(f"Failed to create outfit: {e}") return redirect(request.url) return render_template('outfits/create.html') @app.route('/outfit//save_defaults', methods=['POST']) def save_outfit_defaults(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() selected_fields = request.form.getlist('include_field') outfit.default_fields = selected_fields db.session.commit() flash('Default prompt selection saved for this outfit!') return redirect(url_for('outfit_detail', slug=slug)) @app.route('/outfit//clone', methods=['POST']) def clone_outfit(slug): outfit = Outfit.query.filter_by(slug=slug).first_or_404() # Find the next available number for the clone base_id = outfit.outfit_id # Extract base name without number suffix import re match = re.match(r'^(.+?)_(\d+)$', base_id) if match: base_name = match.group(1) current_num = int(match.group(2)) else: base_name = base_id current_num = 1 # Find next available number next_num = current_num + 1 while True: new_id = f"{base_name}_{next_num:02d}" new_filename = f"{new_id}.json" new_path = os.path.join(app.config['CLOTHING_DIR'], new_filename) if not os.path.exists(new_path): break next_num += 1 # Create new outfit data (copy of original) new_data = outfit.data.copy() new_data['outfit_id'] = new_id new_data['outfit_name'] = f"{outfit.name} (Copy)" # Save the new JSON file with open(new_path, 'w') as f: json.dump(new_data, f, indent=2) # Create new outfit in database new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id) new_outfit = Outfit( outfit_id=new_id, slug=new_slug, filename=new_filename, name=new_data['outfit_name'], data=new_data ) db.session.add(new_outfit) db.session.commit() flash(f'Outfit cloned as "{new_id}"!') return redirect(url_for('outfit_detail', slug=new_slug)) def sync_actions(): if not os.path.exists(app.config['ACTIONS_DIR']): return current_ids = [] for filename in os.listdir(app.config['ACTIONS_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['ACTIONS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) action_id = data.get('action_id') or filename.replace('.json', '') current_ids.append(action_id) # Generate URL-safe slug slug = re.sub(r'[^a-zA-Z0-9_]', '', action_id) # Check if action already exists action = Action.query.filter_by(action_id=action_id).first() name = data.get('action_name', action_id.replace('_', ' ').title()) if action: action.data = data action.name = name action.slug = slug action.filename = filename # Check if cover image still exists if action.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], action.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {action.name}, clearing path.") action.image_path = None flag_modified(action, "data") else: new_action = Action( action_id=action_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_action) except Exception as e: print(f"Error importing action {filename}: {e}") # Remove actions that are no longer in the folder all_actions = Action.query.all() for action in all_actions: if action.action_id not in current_ids: db.session.delete(action) db.session.commit() # ============ ACTION ROUTES ============ @app.route('/actions') def actions_index(): actions = Action.query.order_by(Action.name).all() return render_template('actions/index.html', actions=actions) @app.route('/actions/rescan', methods=['POST']) def rescan_actions(): sync_actions() flash('Database synced with action files.') return redirect(url_for('actions_index')) @app.route('/action/') def action_detail(slug): action = Action.query.filter_by(slug=slug).first_or_404() characters = Character.query.order_by(Character.name).all() # Load state from session preferences = session.get(f'prefs_action_{slug}') preview_image = session.get(f'preview_action_{slug}') selected_character = session.get(f'char_action_{slug}') return render_template('actions/detail.html', action=action, characters=characters, preferences=preferences, preview_image=preview_image, selected_character=selected_character) @app.route('/action//edit', methods=['GET', 'POST']) def edit_action(slug): action = Action.query.filter_by(slug=slug).first_or_404() loras = get_available_action_loras() if request.method == 'POST': try: # 1. Update basic fields action.name = request.form.get('action_name') # 2. Rebuild the data dictionary new_data = action.data.copy() new_data['action_name'] = action.name # Update action_id if provided new_action_id = request.form.get('action_id', action.action_id) new_data['action_id'] = new_action_id # Update action section if 'action' in new_data: for key in new_data['action'].keys(): form_key = f"action_{key}" if form_key in request.form: new_data['action'][key] = request.form.get(form_key) # Update lora section if 'lora' in new_data: for key in new_data['lora'].keys(): form_key = f"lora_{key}" if form_key in request.form: val = request.form.get(form_key) if key == 'lora_weight': try: val = float(val) except: val = 1.0 new_data['lora'][key] = val # Update Tags (comma separated string to list) tags_raw = request.form.get('tags', '') new_data['tags'] = [t.strip() for f in tags_raw.split(',') for t in [f.strip()] if t] action.data = new_data flag_modified(action, "data") # 3. Write back to JSON file action_file = action.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', action.action_id)}.json" file_path = os.path.join(app.config['ACTIONS_DIR'], action_file) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) db.session.commit() flash('Action profile updated successfully!') return redirect(url_for('action_detail', slug=slug)) except Exception as e: print(f"Edit error: {e}") flash(f"Error saving changes: {str(e)}") return render_template('actions/edit.html', action=action, loras=loras) @app.route('/action//upload', methods=['POST']) def upload_action_image(slug): action = Action.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(request.url) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): # Create action subfolder action_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}") os.makedirs(action_folder, exist_ok=True) filename = secure_filename(file.filename) file_path = os.path.join(action_folder, filename) file.save(file_path) # Store relative path in DB action.image_path = f"actions/{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('action_detail', slug=slug)) @app.route('/action//generate', methods=['POST']) def generate_action_image(slug): action_obj = Action.query.filter_by(slug=slug).first_or_404() try: # Get action type action_type = request.form.get('action', 'preview') client_id = request.form.get('client_id') # Get selected fields selected_fields = request.form.getlist('include_field') # Get selected character (if any) character_slug = request.form.get('character_slug', '') character = None # Handle random character selection if character_slug == '__random__': all_characters = Character.query.all() if all_characters: character = random.choice(all_characters) character_slug = character.slug elif character_slug: character = Character.query.filter_by(slug=character_slug).first() # Save preferences session[f'char_action_{slug}'] = character_slug session[f'prefs_action_{slug}'] = selected_fields # Build combined data for prompt building if character: # Combine character identity/wardrobe with action details # Action details replace character's 'defaults' (pose, etc.) combined_data = character.data.copy() # Update 'defaults' with action details action_data = action_obj.data.get('action', {}) combined_data['action'] = action_data # Ensure action section is present for routing # Aggregate pose-related fields into 'pose' pose_fields = ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet'] pose_parts = [action_data.get(k) for k in pose_fields if action_data.get(k)] # Aggregate expression-related fields into 'expression' expression_parts = [action_data.get(k) for k in ['head', 'eyes'] if action_data.get(k)] combined_data['defaults'] = { 'pose': ", ".join(pose_parts), 'expression': ", ".join(expression_parts), 'scene': action_data.get('additional', '') } # Merge lora triggers if present action_lora = action_obj.data.get('lora', {}) if action_lora.get('lora_triggers'): if 'lora' not in combined_data: combined_data['lora'] = {} combined_data['lora']['lora_triggers'] = f"{combined_data['lora'].get('lora_triggers', '')}, {action_lora['lora_triggers']}" # Merge tags combined_data['tags'] = list(set(combined_data.get('tags', []) + action_obj.data.get('tags', []))) # Use action's defaults if no manual selection if not selected_fields: selected_fields = list(action_obj.default_fields) if action_obj.default_fields else [] # Auto-include essential character fields if a character is selected if selected_fields: # Add character identity fields to selection if not already present for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']: if character.data.get('identity', {}).get(key): field_key = f'identity::{key}' if field_key not in selected_fields: selected_fields.append(field_key) # Always include character name if 'special::name' not in selected_fields: selected_fields.append('special::name') # Add active wardrobe fields wardrobe = character.get_active_wardrobe() for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']: if wardrobe.get(key): field_key = f'wardrobe::{key}' if field_key not in selected_fields: selected_fields.append(field_key) else: # Fallback to sensible defaults if still empty (no checkboxes and no action defaults) selected_fields = ['special::name', 'defaults::pose', 'defaults::expression'] # Add identity fields for key in ['base_specs', 'hair', 'eyes']: if character.data.get('identity', {}).get(key): selected_fields.append(f'identity::{key}') # Add wardrobe fields wardrobe = character.get_active_wardrobe() for key in ['full_body', 'top', 'bottom']: if wardrobe.get(key): selected_fields.append(f'wardrobe::{key}') default_fields = action_obj.default_fields active_outfit = character.active_outfit else: # Action only - no character (rarely makes sense for actions but let's handle it) action_data = action_obj.data.get('action', {}) # Aggregate pose-related fields into 'pose' pose_fields = ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet'] pose_parts = [action_data.get(k) for k in pose_fields if action_data.get(k)] # Aggregate expression-related fields into 'expression' expression_parts = [action_data.get(k) for k in ['head', 'eyes'] if action_data.get(k)] combined_data = { 'character_id': action_obj.action_id, 'defaults': { 'pose': ", ".join(pose_parts), 'expression': ", ".join(expression_parts), 'scene': action_data.get('additional', '') }, 'lora': action_obj.data.get('lora', {}), 'tags': action_obj.data.get('tags', []) } if not selected_fields: selected_fields = ['defaults::pose', 'defaults::expression', 'defaults::scene', 'lora::lora_triggers', 'special::tags'] default_fields = action_obj.default_fields active_outfit = 'default' # Queue generation with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) # Build prompts for combined data prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit) # Add colored simple background to the main prompt for action previews if character: primary_color = character.data.get('styles', {}).get('primary_color', '') if primary_color: prompts["main"] = f"{prompts['main']}, {primary_color} simple background" else: prompts["main"] = f"{prompts['main']}, simple background" else: prompts["main"] = f"{prompts['main']}, simple background" # Prepare workflow workflow = _prepare_workflow(workflow, character, prompts, action=action_obj) prompt_response = queue_prompt(workflow, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('action_detail', slug=slug)) except Exception as e: print(f"Generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('action_detail', slug=slug)) @app.route('/action//finalize_generation/', methods=['POST']) def finalize_action_generation(slug, prompt_id): action_obj = Action.query.filter_by(slug=slug).first_or_404() try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) # Create action subfolder action_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}") os.makedirs(action_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(action_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) # Always save as preview relative_path = f"actions/{slug}/{filename}" session[f'preview_action_{slug}'] = relative_path return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize error: {e}") return {'error': str(e)}, 500 @app.route('/action//replace_cover_from_preview', methods=['POST']) def replace_action_cover_from_preview(slug): action = Action.query.filter_by(slug=slug).first_or_404() preview_path = session.get(f'preview_action_{slug}') if preview_path: action.image_path = preview_path db.session.commit() flash('Cover image updated from preview!') else: flash('No preview image available', 'error') return redirect(url_for('action_detail', slug=slug)) @app.route('/action//save_defaults', methods=['POST']) def save_action_defaults(slug): action = Action.query.filter_by(slug=slug).first_or_404() selected_fields = request.form.getlist('include_field') action.default_fields = selected_fields db.session.commit() flash('Default prompt selection saved for this action!') return redirect(url_for('action_detail', slug=slug)) @app.route('/action/create', methods=['GET', 'POST']) def create_action(): if request.method == 'POST': name = request.form.get('name') slug = request.form.get('filename', '').strip() prompt = request.form.get('prompt', '') use_llm = request.form.get('use_llm') == 'on' if not slug: slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_') safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) if not safe_slug: safe_slug = 'action' base_slug = safe_slug counter = 1 while os.path.exists(os.path.join(app.config['ACTIONS_DIR'], f"{safe_slug}.json")): safe_slug = f"{base_slug}_{counter}" counter += 1 if use_llm: if not prompt: flash("Description is required when AI generation is enabled.") return redirect(request.url) system_prompt = """You are a JSON generator. output ONLY valid JSON matching this exact structure. Do not wrap in markdown blocks. Structure: { "action_id": "WILL_BE_REPLACED", "action_name": "WILL_BE_REPLACED", "action": { "full_body": "string (pose description)", "head": "string (expression/head position)", "eyes": "string", "arms": "string", "hands": "string", "torso": "string", "pelvis": "string", "legs": "string", "feet": "string", "additional": "string" }, "lora": { "lora_name": "", "lora_weight": 1.0, "lora_triggers": "" }, "tags": ["string", "string"] }""" try: llm_response = call_llm(f"Create an action profile for '{name}' based on this description: {prompt}", system_prompt) clean_json = llm_response.replace('```json', '').replace('```', '').strip() action_data = json.loads(clean_json) action_data['action_id'] = safe_slug action_data['action_name'] = name except Exception as e: print(f"LLM error: {e}") flash(f"Failed to generate action profile: {e}") return redirect(request.url) else: action_data = { "action_id": safe_slug, "action_name": name, "action": { "full_body": "", "head": "", "eyes": "", "arms": "", "hands": "", "torso": "", "pelvis": "", "legs": "", "feet": "", "additional": "" }, "lora": {"lora_name": "", "lora_weight": 1.0, "lora_triggers": ""}, "tags": [] } try: file_path = os.path.join(app.config['ACTIONS_DIR'], f"{safe_slug}.json") with open(file_path, 'w') as f: json.dump(action_data, f, indent=2) new_action = Action( action_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json", name=name, data=action_data ) db.session.add(new_action) db.session.commit() flash('Action created successfully!') return redirect(url_for('action_detail', slug=safe_slug)) except Exception as e: print(f"Save error: {e}") flash(f"Failed to create action: {e}") return redirect(request.url) return render_template('actions/create.html') @app.route('/action//clone', methods=['POST']) def clone_action(slug): action = Action.query.filter_by(slug=slug).first_or_404() # Find the next available number for the clone base_id = action.action_id import re match = re.match(r'^(.+?)_(\d+)$', base_id) if match: base_name = match.group(1) current_num = int(match.group(2)) else: base_name = base_id current_num = 1 next_num = current_num + 1 while True: new_id = f"{base_name}_{next_num:02d}" new_filename = f"{new_id}.json" new_path = os.path.join(app.config['ACTIONS_DIR'], new_filename) if not os.path.exists(new_path): break next_num += 1 new_data = action.data.copy() new_data['action_id'] = new_id new_data['action_name'] = f"{action.name} (Copy)" with open(new_path, 'w') as f: json.dump(new_data, f, indent=2) new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id) new_action = Action( action_id=new_id, slug=new_slug, filename=new_filename, name=new_data['action_name'], data=new_data ) db.session.add(new_action) db.session.commit() flash(f'Action cloned as "{new_id}"!') return redirect(url_for('action_detail', slug=new_slug)) if __name__ == '__main__': with app.app_context(): os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) db.create_all() # Migration: Add active_outfit column if it doesn't exist try: from sqlalchemy import text db.session.execute(text('ALTER TABLE character ADD COLUMN active_outfit VARCHAR(100) DEFAULT \'default\'')) db.session.commit() print("Added active_outfit column to character table") except Exception as e: if 'duplicate column name' in str(e).lower() or 'already exists' in str(e).lower(): print("active_outfit column already exists") else: print(f"Migration note: {e}") # Migration: Add default_fields column to action table if it doesn't exist try: from sqlalchemy import text db.session.execute(text('ALTER TABLE action ADD COLUMN default_fields JSON')) db.session.commit() print("Added default_fields column to action table") except Exception as e: if 'duplicate column name' in str(e).lower() or 'already exists' in str(e).lower(): print("default_fields column already exists in action table") else: print(f"Migration action note: {e}") sync_characters() sync_outfits() sync_actions() app.run(debug=True, port=5000)