import os import json import time import re import requests import random from flask import Flask, render_template, request, redirect, url_for, flash, session from werkzeug.utils import secure_filename from models import db, Character, Settings app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['UPLOAD_FOLDER'] = 'static/uploads' app.config['SECRET_KEY'] = 'dev-key-123' app.config['CHARACTERS_DIR'] = 'characters' app.config['COMFYUI_URL'] = 'http://127.0.0.1:8188' app.config['ILLUSTRIOUS_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Illustrious/' app.config['NOOB_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Noob/' app.config['LORA_DIR'] = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Looks/' db.init_app(app) ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'} def get_available_loras(): loras = [] if os.path.exists(app.config['LORA_DIR']): for f in os.listdir(app.config['LORA_DIR']): if f.endswith('.safetensors'): # Using the format seen in character JSONs loras.append(f"Illustrious/Looks/{f}") return sorted(loras) def get_available_checkpoints(): checkpoints = [] # Scan Illustrious if os.path.exists(app.config['ILLUSTRIOUS_MODELS_DIR']): for f in os.listdir(app.config['ILLUSTRIOUS_MODELS_DIR']): if f.endswith('.safetensors') or f.endswith('.ckpt'): checkpoints.append(f"Illustrious/{f}") # Scan Noob if os.path.exists(app.config['NOOB_MODELS_DIR']): for f in os.listdir(app.config['NOOB_MODELS_DIR']): if f.endswith('.safetensors') or f.endswith('.ckpt'): checkpoints.append(f"Noob/{f}") return sorted(checkpoints) def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def build_prompt(data, selected_fields=None, default_fields=None, active_outfit='default'): def is_selected(section, key): # Priority: # 1. Manual selection from form (if list is not empty) # 2. Database defaults (if they exist) # 3. Select all (default behavior) if selected_fields: return f"{section}::{key}" in selected_fields if default_fields: return f"{section}::{key}" in default_fields return True identity = data.get('identity', {}) # Get wardrobe - handle both new nested format and legacy flat format wardrobe_data = data.get('wardrobe', {}) if 'default' in wardrobe_data and isinstance(wardrobe_data.get('default'), dict): # New nested format - get active outfit wardrobe = wardrobe_data.get(active_outfit or 'default', wardrobe_data.get('default', {})) else: # Legacy flat format wardrobe = wardrobe_data defaults = data.get('defaults', {}) # Pre-calculate Hand/Glove priority hand_val = "" if wardrobe.get('gloves') and is_selected('wardrobe', 'gloves'): hand_val = wardrobe.get('gloves') elif identity.get('hands') and is_selected('identity', 'hands'): hand_val = identity.get('hands') # 1. Main Prompt parts = ["(solo:1.2)"] # Use character_id (underscores to spaces) for tags compatibility char_tag = data.get('character_id', '').replace('_', ' ') if char_tag and is_selected('special', 'name'): parts.append(char_tag) for key in ['base_specs', 'hair', 'eyes', 'extra']: val = identity.get(key) if val and is_selected('identity', key): parts.append(val) # Add defaults (expression, pose, scene) for key in ['expression', 'pose', 'scene']: val = defaults.get(key) if val and is_selected('defaults', key): parts.append(val) # Add hand priority value to main prompt if hand_val: parts.append(hand_val) for key in ['top', 'headwear', 'legwear', 'footwear', 'accessories']: val = wardrobe.get(key) if val and is_selected('wardrobe', key): parts.append(val) style = data.get('styles', {}).get('aesthetic') if style and is_selected('styles', 'aesthetic'): parts.append(f"{style} style") tags = data.get('tags', []) if tags and is_selected('special', 'tags'): parts.extend(tags) lora = data.get('lora', {}) if lora.get('lora_triggers') and is_selected('lora', 'lora_triggers'): parts.append(lora.get('lora_triggers')) # 2. Face Prompt: Tag, Eyes, Expression face_parts = [] if char_tag and is_selected('special', 'name'): face_parts.append(char_tag) if identity.get('eyes') and is_selected('identity', 'eyes'): face_parts.append(identity.get('eyes')) if defaults.get('expression') and is_selected('defaults', 'expression'): face_parts.append(defaults.get('expression')) # 3. Hand Prompt: Hand value (Gloves or Hands) hand_parts = [hand_val] if hand_val else [] return { "main": ", ".join(parts), "face": ", ".join(face_parts), "hand": ", ".join(hand_parts) } def queue_prompt(prompt_workflow, client_id=None): p = {"prompt": prompt_workflow} if client_id: p["client_id"] = client_id data = json.dumps(p).encode('utf-8') response = requests.post(f"{app.config['COMFYUI_URL']}/prompt", data=data) return response.json() def get_history(prompt_id): response = requests.get(f"{app.config['COMFYUI_URL']}/history/{prompt_id}") return response.json() def get_image(filename, subfolder, folder_type): data = {"filename": filename, "subfolder": subfolder, "type": folder_type} response = requests.get(f"{app.config['COMFYUI_URL']}/view", params=data) return response.content from sqlalchemy.orm.attributes import flag_modified def sync_characters(): if not os.path.exists(app.config['CHARACTERS_DIR']): return current_ids = [] for filename in os.listdir(app.config['CHARACTERS_DIR']): if filename.endswith('.json'): file_path = os.path.join(app.config['CHARACTERS_DIR'], filename) try: with open(file_path, 'r') as f: data = json.load(f) char_id = data.get('character_id') if not char_id: continue current_ids.append(char_id) # Generate URL-safe slug: remove special characters from character_id slug = re.sub(r'[^a-zA-Z0-9_]', '', char_id) # Check if character already exists character = Character.query.filter_by(character_id=char_id).first() name = data.get('character_name', char_id.replace('_', ' ').title()) if character: character.data = data character.name = name character.slug = slug character.filename = filename # Check if cover image still exists if character.image_path: full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], character.image_path) if not os.path.exists(full_img_path): print(f"Image missing for {character.name}, clearing path.") character.image_path = None # Explicitly tell SQLAlchemy the JSON field was modified flag_modified(character, "data") else: new_char = Character( character_id=char_id, slug=slug, filename=filename, name=name, data=data ) db.session.add(new_char) except Exception as e: print(f"Error importing {filename}: {e}") # Remove characters that are no longer in the folder all_characters = Character.query.all() for char in all_characters: if char.character_id not in current_ids: db.session.delete(char) db.session.commit() def call_llm(prompt, system_prompt="You are a creative assistant."): settings = Settings.query.first() if not settings or not settings.openrouter_api_key: raise ValueError("OpenRouter API Key not configured. Please configure it in Settings.") headers = { "Authorization": f"Bearer {settings.openrouter_api_key}", "Content-Type": "application/json" } data = { "model": settings.openrouter_model or 'google/gemini-2.0-flash-001', "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": prompt} ] } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data) response.raise_for_status() result = response.json() return result['choices'][0]['message']['content'] except requests.exceptions.RequestException as e: raise RuntimeError(f"LLM API request failed: {str(e)}") from e except (KeyError, IndexError) as e: raise RuntimeError(f"Unexpected LLM response format: {str(e)}") from e @app.route('/get_openrouter_models', methods=['POST']) def get_openrouter_models(): api_key = request.form.get('api_key') if not api_key: return {'error': 'API key is required'}, 400 headers = {"Authorization": f"Bearer {api_key}"} try: response = requests.get("https://openrouter.ai/api/v1/models", headers=headers) response.raise_for_status() models = response.json().get('data', []) # Return simplified list of models return {'models': [{'id': m['id'], 'name': m.get('name', m['id'])} for m in models]} except Exception as e: return {'error': str(e)}, 500 @app.route('/settings', methods=['GET', 'POST']) def settings(): settings = Settings.query.first() if not settings: settings = Settings() db.session.add(settings) db.session.commit() if request.method == 'POST': settings.openrouter_api_key = request.form.get('api_key') settings.openrouter_model = request.form.get('model') db.session.commit() flash('Settings updated successfully!') return redirect(url_for('settings')) return render_template('settings.html', settings=settings) @app.route('/') def index(): characters = Character.query.order_by(Character.name).all() return render_template('index.html', characters=characters) @app.route('/rescan', methods=['POST']) def rescan(): sync_characters() flash('Database synced with character files.') return redirect(url_for('index')) @app.route('/generator', methods=['GET', 'POST']) def generator(): characters = Character.query.order_by(Character.name).all() checkpoints = get_available_checkpoints() if not checkpoints: checkpoints = ["Noob/oneObsession_v19Atypical.safetensors"] if request.method == 'POST': char_slug = request.form.get('character') checkpoint = request.form.get('checkpoint') custom_positive = request.form.get('positive_prompt', '') custom_negative = request.form.get('negative_prompt', '') character = Character.query.filter_by(slug=char_slug).first_or_404() try: with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) # Build base prompts from character defaults prompts = build_prompt(character.data, default_fields=character.default_fields) # Append custom additions to the "main" prompt if custom_positive: prompts["main"] = f"{prompts['main']}, {custom_positive}" # Prepare workflow with custom checkpoint and negative prompt workflow = _prepare_workflow(workflow, character, prompts, checkpoint, custom_negative) print(f"Queueing generator prompt for {character.character_id}") prompt_response = queue_prompt(workflow) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] flash("Generation started...") max_retries = 120 while max_retries > 0: history = get_history(prompt_id) if prompt_id in history: outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) char_folder = os.path.join(app.config['UPLOAD_FOLDER'], character.slug) os.makedirs(char_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(char_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) relative_path = f"{character.slug}/{filename}" return render_template('generator.html', characters=characters, checkpoints=checkpoints, generated_image=relative_path, selected_char=char_slug, selected_ckpt=checkpoint) time.sleep(2) max_retries -= 1 flash("Generation timed out.") except Exception as e: print(f"Generator error: {e}") flash(f"Error: {str(e)}") return render_template('generator.html', characters=characters, checkpoints=checkpoints) @app.route('/character/') def detail(slug): character = Character.query.filter_by(slug=slug).first_or_404() # Load state from session preferences = session.get(f'prefs_{slug}') preview_image = session.get(f'preview_{slug}') return render_template('detail.html', character=character, preferences=preferences, preview_image=preview_image) @app.route('/create', methods=['GET', 'POST']) def create_character(): if request.method == 'POST': name = request.form.get('name') slug = request.form.get('filename') prompt = request.form.get('prompt') # Validate slug safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) if not safe_slug: flash("Invalid filename.") return redirect(request.url) # Check if exists if os.path.exists(os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json")): flash("Character with this filename already exists.") return redirect(request.url) # Generate JSON with LLM system_prompt = """You are a JSON generator. output ONLY valid JSON matching this exact structure. Do not wrap in markdown blocks. Structure: { "character_id": "WILL_BE_REPLACED", "character_name": "WILL_BE_REPLACED", "identity": { "base_specs": "string (e.g. 1girl, build, skin)", "hair": "string", "eyes": "string", "hands": "string", "arms": "string", "torso": "string", "pelvis": "string", "legs": "string", "feet": "string", "extra": "string" }, "defaults": { "expression": "", "pose": "", "scene": "" }, "wardrobe": { "headwear": "string", "top": "string", "legwear": "string", "footwear": "string", "hands": "string", "accessories": "string" }, "styles": { "aesthetic": "string", "primary_color": "string", "secondary_color": "string", "tertiary_color": "string" }, "lora": { "lora_name": "", "lora_weight": 1.0, "lora_triggers": "" }, "tags": ["string", "string"] } Fill the fields based on the user's description. Use Danbooru-style tags for the values (e.g. 'long hair', 'blue eyes'). Keep values concise. Leave defaults fields empty.""" try: llm_response = call_llm(f"Create a character profile for '{name}' based on this description: {prompt}", system_prompt) # Clean response (remove markdown if present) clean_json = llm_response.replace('```json', '').replace('```', '').strip() char_data = json.loads(clean_json) # Enforce IDs char_data['character_id'] = safe_slug char_data['character_name'] = name # Save file file_path = os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json") with open(file_path, 'w') as f: json.dump(char_data, f, indent=2) # Add to DB new_char = Character( character_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json", name=name, data=char_data ) db.session.add(new_char) db.session.commit() flash('Character created successfully!') return redirect(url_for('detail', slug=safe_slug)) except Exception as e: print(f"LLM/Save error: {e}") flash(f"Failed to create character: {e}") return redirect(request.url) return render_template('create.html') @app.route('/character//edit', methods=['GET', 'POST']) def edit_character(slug): character = Character.query.filter_by(slug=slug).first_or_404() loras = get_available_loras() if request.method == 'POST': try: # 1. Update basic fields character.name = request.form.get('character_name') # 2. Rebuild the data dictionary new_data = character.data.copy() new_data['character_name'] = character.name # Update nested sections (non-wardrobe) for section in ['identity', 'defaults', 'styles', 'lora']: if section in new_data: for key in new_data[section]: form_key = f"{section}_{key}" if form_key in request.form: val = request.form.get(form_key) # Handle numeric weight if key == 'lora_weight': try: val = float(val) except: val = 1.0 new_data[section][key] = val # Handle wardrobe - support both nested and flat formats wardrobe = new_data.get('wardrobe', {}) if 'default' in wardrobe and isinstance(wardrobe.get('default'), dict): # New nested format - update each outfit for outfit_name in wardrobe.keys(): for key in wardrobe[outfit_name].keys(): form_key = f"wardrobe_{outfit_name}_{key}" if form_key in request.form: wardrobe[outfit_name][key] = request.form.get(form_key) new_data['wardrobe'] = wardrobe else: # Legacy flat format if 'wardrobe' in new_data: for key in new_data['wardrobe'].keys(): form_key = f"wardrobe_{key}" if form_key in request.form: new_data['wardrobe'][key] = request.form.get(form_key) # Update Tags (comma separated string to list) tags_raw = request.form.get('tags', '') new_data['tags'] = [t.strip() for f in tags_raw.split(',') for t in [f.strip()] if t] character.data = new_data flag_modified(character, "data") # 3. Write back to JSON file # Use the filename we stored during sync, or fallback to a sanitized ID char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) db.session.commit() flash('Character profile updated successfully!') return redirect(url_for('detail', slug=slug)) except Exception as e: print(f"Edit error: {e}") flash(f"Error saving changes: {str(e)}") return render_template('edit.html', character=character, loras=loras) @app.route('/character//outfit/switch', methods=['POST']) def switch_outfit(slug): """Switch the active outfit for a character.""" character = Character.query.filter_by(slug=slug).first_or_404() outfit_name = request.form.get('outfit', 'default') # Validate outfit exists available_outfits = character.get_available_outfits() if outfit_name in available_outfits: character.active_outfit = outfit_name db.session.commit() flash(f'Switched to "{outfit_name}" outfit.') else: flash(f'Outfit "{outfit_name}" not found.', 'error') return redirect(url_for('detail', slug=slug)) @app.route('/character//outfit/add', methods=['POST']) def add_outfit(slug): """Add a new outfit to a character.""" character = Character.query.filter_by(slug=slug).first_or_404() outfit_name = request.form.get('outfit_name', '').strip() if not outfit_name: flash('Outfit name cannot be empty.', 'error') return redirect(url_for('edit_character', slug=slug)) # Sanitize outfit name for use as key safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', outfit_name.lower()) # Get wardrobe data wardrobe = character.data.get('wardrobe', {}) # Ensure wardrobe is in new nested format if 'default' not in wardrobe or not isinstance(wardrobe.get('default'), dict): # Convert legacy format wardrobe = {'default': wardrobe} # Check if outfit already exists if safe_name in wardrobe: flash(f'Outfit "{safe_name}" already exists.', 'error') return redirect(url_for('edit_character', slug=slug)) # Create new outfit (copy from default as template) default_outfit = wardrobe.get('default', { 'headwear': '', 'top': '', 'legwear': '', 'footwear': '', 'hands': '', 'accessories': '' }) wardrobe[safe_name] = default_outfit.copy() # Update character data character.data['wardrobe'] = wardrobe flag_modified(character, 'data') # Save to JSON file char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(character.data, f, indent=2) db.session.commit() flash(f'Added new outfit "{safe_name}".') return redirect(url_for('edit_character', slug=slug)) @app.route('/character//outfit/delete', methods=['POST']) def delete_outfit(slug): """Delete an outfit from a character.""" character = Character.query.filter_by(slug=slug).first_or_404() outfit_name = request.form.get('outfit', '') wardrobe = character.data.get('wardrobe', {}) # Cannot delete default if outfit_name == 'default': flash('Cannot delete the default outfit.', 'error') return redirect(url_for('edit_character', slug=slug)) if outfit_name not in wardrobe: flash(f'Outfit "{outfit_name}" not found.', 'error') return redirect(url_for('edit_character', slug=slug)) # Delete outfit del wardrobe[outfit_name] character.data['wardrobe'] = wardrobe flag_modified(character, 'data') # Switch active outfit if deleted was active if character.active_outfit == outfit_name: character.active_outfit = 'default' # Save to JSON file char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(character.data, f, indent=2) db.session.commit() flash(f'Deleted outfit "{outfit_name}".') return redirect(url_for('edit_character', slug=slug)) @app.route('/character//outfit/rename', methods=['POST']) def rename_outfit(slug): """Rename an outfit.""" character = Character.query.filter_by(slug=slug).first_or_404() old_name = request.form.get('old_name', '') new_name = request.form.get('new_name', '').strip() if not new_name: flash('New name cannot be empty.', 'error') return redirect(url_for('edit_character', slug=slug)) # Sanitize new name safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', new_name.lower()) wardrobe = character.data.get('wardrobe', {}) if old_name not in wardrobe: flash(f'Outfit "{old_name}" not found.', 'error') return redirect(url_for('edit_character', slug=slug)) if safe_name in wardrobe and safe_name != old_name: flash(f'Outfit "{safe_name}" already exists.', 'error') return redirect(url_for('edit_character', slug=slug)) # Rename (copy to new key, delete old) wardrobe[safe_name] = wardrobe.pop(old_name) character.data['wardrobe'] = wardrobe flag_modified(character, 'data') # Update active outfit if renamed was active if character.active_outfit == old_name: character.active_outfit = safe_name # Save to JSON file char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json" file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file) with open(file_path, 'w') as f: json.dump(character.data, f, indent=2) db.session.commit() flash(f'Renamed outfit "{old_name}" to "{safe_name}".') return redirect(url_for('edit_character', slug=slug)) @app.route('/character//upload', methods=['POST']) def upload_image(slug): character = Character.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(request.url) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): # Create character subfolder char_folder = os.path.join(app.config['UPLOAD_FOLDER'], slug) os.makedirs(char_folder, exist_ok=True) filename = secure_filename(file.filename) file_path = os.path.join(char_folder, filename) file.save(file_path) # Store relative path in DB character.image_path = f"{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('detail', slug=slug)) @app.route('/character//finalize_generation/', methods=['POST']) def finalize_generation(slug, prompt_id): character = Character.query.filter_by(slug=slug).first_or_404() action = request.form.get('action', 'preview') try: history = get_history(prompt_id) if prompt_id not in history: return {'error': 'History not found'}, 404 outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) # Create character subfolder char_folder = os.path.join(app.config['UPLOAD_FOLDER'], slug) os.makedirs(char_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(char_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) print(f"Image saved to: {os.path.abspath(file_path)}") # Handle actions relative_path = f"{slug}/{filename}" if action == 'replace': character.image_path = relative_path db.session.commit() flash('Cover image updated!') else: # Preview mode session[f'preview_{slug}'] = relative_path return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')} return {'error': 'No image found in output'}, 404 except Exception as e: print(f"Finalize error: {e}") return {'error': str(e)}, 500 def _prepare_workflow(workflow, character, prompts, checkpoint=None, custom_negative=None): # 1. Update prompts using replacement to preserve embeddings workflow["6"]["inputs"]["text"] = workflow["6"]["inputs"]["text"].replace("{{POSITIVE_PROMPT}}", prompts["main"]) if custom_negative: workflow["7"]["inputs"]["text"] = f"{workflow['7']['inputs']['text']}, {custom_negative}" if "14" in workflow: workflow["14"]["inputs"]["text"] = workflow["14"]["inputs"]["text"].replace("{{FACE_PROMPT}}", prompts["face"]) if "15" in workflow: workflow["15"]["inputs"]["text"] = workflow["15"]["inputs"]["text"].replace("{{HAND_PROMPT}}", prompts["hand"]) print("--- DEBUG: COMFYUI PROMPTS ---") print(f"Main Positive (6): {workflow['6']['inputs']['text']}") print(f"Main Negative (7): {workflow['7']['inputs']['text']}") if "14" in workflow: print(f"Face Detailer (14): {workflow['14']['inputs']['text']}") if "15" in workflow: print(f"Hand Detailer (15): {workflow['15']['inputs']['text']}") print("-------------------------------") # 2. Update Checkpoint if checkpoint: workflow["4"]["inputs"]["ckpt_name"] = checkpoint # 3. Handle LoRA lora_data = character.data.get('lora', {}) lora_name = lora_data.get('lora_name') model_source = ["4", 0] clip_source = ["4", 1] if lora_name and "16" in workflow: workflow["16"]["inputs"]["lora_name"] = lora_name workflow["16"]["inputs"]["strength_model"] = lora_data.get('lora_weight', 1.0) workflow["16"]["inputs"]["strength_clip"] = lora_data.get('lora_weight', 1.0) model_source = ["16", 0] clip_source = ["16", 1] # Apply connections to all model/clip consumers workflow["3"]["inputs"]["model"] = model_source workflow["11"]["inputs"]["model"] = model_source workflow["13"]["inputs"]["model"] = model_source workflow["6"]["inputs"]["clip"] = clip_source workflow["7"]["inputs"]["clip"] = clip_source workflow["11"]["inputs"]["clip"] = clip_source workflow["13"]["inputs"]["clip"] = clip_source workflow["14"]["inputs"]["clip"] = clip_source workflow["15"]["inputs"]["clip"] = clip_source # 4. Randomize seeds gen_seed = random.randint(1, 10**15) workflow["3"]["inputs"]["seed"] = gen_seed if "11" in workflow: workflow["11"]["inputs"]["seed"] = gen_seed if "13" in workflow: workflow["13"]["inputs"]["seed"] = gen_seed return workflow def _queue_generation(character, action='preview', selected_fields=None, client_id=None): # 1. Load workflow with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) # 2. Build prompts with active outfit prompts = build_prompt(character.data, selected_fields, character.default_fields, character.active_outfit) # 3. Prepare workflow workflow = _prepare_workflow(workflow, character, prompts) return queue_prompt(workflow, client_id=client_id) @app.route('/get_missing_characters') def get_missing_characters(): missing = Character.query.filter((Character.image_path == None) | (Character.image_path == '')).all() return {'missing': [{'slug': c.slug, 'name': c.name} for c in missing]} @app.route('/clear_all_covers', methods=['POST']) def clear_all_covers(): characters = Character.query.all() for char in characters: char.image_path = None db.session.commit() return {'success': True} @app.route('/generate_missing', methods=['POST']) def generate_missing(): missing = Character.query.filter((Character.image_path == None) | (Character.image_path == '')).all() if not missing: flash("No characters missing cover images.") return redirect(url_for('index')) success_count = 0 for character in missing: try: print(f"Batch generating for: {character.name}") prompt_response = _queue_generation(character, action='replace') prompt_id = prompt_response['prompt_id'] # Simple synchronous wait for each max_retries = 120 while max_retries > 0: history = get_history(prompt_id) if prompt_id in history: outputs = history[prompt_id]['outputs'] for node_id in outputs: if 'images' in outputs[node_id]: image_info = outputs[node_id]['images'][0] image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type']) char_folder = os.path.join(app.config['UPLOAD_FOLDER'], character.slug) os.makedirs(char_folder, exist_ok=True) filename = f"gen_{int(time.time())}.png" file_path = os.path.join(char_folder, filename) with open(file_path, 'wb') as f: f.write(image_data) character.image_path = f"{character.slug}/{filename}" db.session.commit() success_count += 1 break break time.sleep(2) max_retries -= 1 except Exception as e: print(f"Error generating for {character.name}: {e}") flash(f"Batch generation complete. Generated {success_count} images.") return redirect(url_for('index')) @app.route('/check_status/') def check_status(prompt_id): try: history = get_history(prompt_id) if prompt_id in history: return {'status': 'finished'} return {'status': 'pending'} except Exception: return {'status': 'error'}, 500 @app.route('/character//generate', methods=['POST']) def generate_image(slug): character = Character.query.filter_by(slug=slug).first_or_404() try: # Get action type action = request.form.get('action', 'preview') client_id = request.form.get('client_id') # Get selected fields selected_fields = request.form.getlist('include_field') # Save preferences session[f'prefs_{slug}'] = selected_fields # Queue generation using helper prompt_response = _queue_generation(character, action, selected_fields, client_id=client_id) if 'prompt_id' not in prompt_response: raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}") prompt_id = prompt_response['prompt_id'] # Return JSON if AJAX request if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'prompt_id': prompt_id} return redirect(url_for('detail', slug=slug)) except Exception as e: print(f"Generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('detail', slug=slug)) @app.route('/character//save_defaults', methods=['POST']) def save_defaults(slug): character = Character.query.filter_by(slug=slug).first_or_404() selected_fields = request.form.getlist('include_field') character.default_fields = selected_fields db.session.commit() flash('Default prompt selection saved for this character!') return redirect(url_for('detail', slug=slug)) if __name__ == '__main__': with app.app_context(): os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) db.create_all() # Migration: Add active_outfit column if it doesn't exist try: from sqlalchemy import text db.session.execute(text('ALTER TABLE character ADD COLUMN active_outfit VARCHAR(100) DEFAULT \'default\'')) db.session.commit() print("Added active_outfit column to character table") except Exception as e: if 'duplicate column name' in str(e).lower() or 'already exists' in str(e).lower(): print("active_outfit column already exists") else: print(f"Migration note: {e}") sync_characters() app.run(debug=True, port=5000)