"""Flask app for browsing characters and generating their images via ComfyUI."""

import json
import os
import random
import re
import time

import requests
from flask import Flask, render_template, request, redirect, url_for, flash, session
from sqlalchemy.orm.attributes import flag_modified
from werkzeug.utils import secure_filename

from models import db, Character

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = 'static/uploads'
# The literal fallback keeps local development working; set SECRET_KEY in the
# environment for any deployment where session cookies must not be forgeable.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-key-123')
app.config['CHARACTERS_DIR'] = 'characters'
app.config['COMFYUI_URL'] = 'http://127.0.0.1:8188'
app.config['ILLUSTRIOUS_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Illustrious/'
app.config['NOOB_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Noob/'

db.init_app(app)

ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
CHECKPOINT_EXTENSIONS = ('.safetensors', '.ckpt')
WORKFLOW_PATH = 'comfy_workflow.json'

# Seconds to wait on any single HTTP call to ComfyUI before giving up.
COMFYUI_TIMEOUT = 30
# Polling budget while waiting for a queued prompt: 120 checks, 2 seconds apart.
POLL_MAX_RETRIES = 120
POLL_INTERVAL_SECONDS = 2

# Workflow nodes that receive the model / clip connections. Nodes "3", "6"
# and "7" are required; the remaining ones are only wired when present.
_REQUIRED_MODEL_NODES = ("3",)
_OPTIONAL_MODEL_NODES = ("11", "13")
_REQUIRED_CLIP_NODES = ("6", "7")
_OPTIONAL_CLIP_NODES = ("11", "13", "14", "15")


def get_available_checkpoints():
    """Return the sorted checkpoint names found in the configured model folders.

    Each entry is prefixed with its family ("Illustrious/" or "Noob/").
    Folders that do not exist are skipped.
    """
    sources = (
        ('Illustrious', app.config['ILLUSTRIOUS_MODELS_DIR']),
        ('Noob', app.config['NOOB_MODELS_DIR']),
    )
    checkpoints = []
    for prefix, directory in sources:
        if not os.path.isdir(directory):
            continue
        checkpoints.extend(
            f"{prefix}/{entry}"
            for entry in os.listdir(directory)
            if entry.endswith(CHECKPOINT_EXTENSIONS)
        )
    return sorted(checkpoints)


def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def build_prompt(data, selected_fields=None, default_fields=None):
    """Build the main, face and hand prompt strings for a character.

    Args:
        data: Character definition dict (reads 'identity', 'wardrobe',
            'styles', 'tags', 'lora' and 'character_id').
        selected_fields: Optional list of "section::key" entries chosen in
            the form. When non-empty it decides which fields are included.
        default_fields: Optional list of "section::key" entries stored for
            the character; used only when *selected_fields* is empty.

    Returns:
        A dict with the keys "main", "face" and "hand", each a
        comma-separated prompt string.
    """
    def is_selected(section, key):
        # Priority:
        # 1. Manual selection from form (if list is not empty)
        # 2. Database defaults (if they exist)
        # 3. Select all (default behavior)
        if selected_fields:
            return f"{section}::{key}" in selected_fields
        if default_fields:
            return f"{section}::{key}" in default_fields
        return True

    # `or {}` also covers sections that are present but null in the JSON.
    identity = data.get('identity') or {}
    wardrobe = data.get('wardrobe') or {}

    # Pre-calculate Hand/Glove priority: gloves win over bare hands.
    hand_val = ""
    if wardrobe.get('gloves') and is_selected('wardrobe', 'gloves'):
        hand_val = wardrobe.get('gloves')
    elif identity.get('hands') and is_selected('identity', 'hands'):
        hand_val = identity.get('hands')

    # 1. Main Prompt
    parts = ["(solo:1.2)"]

    # Use character_id (underscores to spaces) for tags compatibility
    char_tag = (data.get('character_id') or '').replace('_', ' ')
    include_name = bool(char_tag) and is_selected('special', 'name')
    if include_name:
        parts.append(char_tag)

    for key in ['base_specs', 'hair', 'eyes', 'expression', 'distinguishing_marks']:
        val = identity.get(key)
        if val and is_selected('identity', key):
            parts.append(val)

    # Add hand priority value to main prompt
    if hand_val:
        parts.append(hand_val)

    for key in ['outer_layer', 'inner_layer', 'lower_body', 'footwear', 'accessories']:
        val = wardrobe.get(key)
        if val and is_selected('wardrobe', key):
            parts.append(val)

    style = (data.get('styles') or {}).get('aesthetic')
    if style and is_selected('styles', 'aesthetic'):
        parts.append(f"{style} style")

    tags = data.get('tags') or []
    if tags and is_selected('special', 'tags'):
        parts.extend(tags)

    lora = data.get('lora') or {}
    if lora.get('lora_triggers') and is_selected('lora', 'lora_triggers'):
        parts.append(lora.get('lora_triggers'))

    # 2. Face Prompt: Tag, Eyes, Expression
    face_parts = []
    if include_name:
        face_parts.append(char_tag)
    for key in ('eyes', 'expression'):
        if identity.get(key) and is_selected('identity', key):
            face_parts.append(identity.get(key))

    # 3. Hand Prompt: Hand value (Gloves or Hands)
    hand_parts = [hand_val] if hand_val else []

    return {
        "main": ", ".join(parts),
        "face": ", ".join(face_parts),
        "hand": ", ".join(hand_parts),
    }


def queue_prompt(prompt_workflow, client_id=None):
    """POST a workflow to ComfyUI's /prompt endpoint and return the parsed JSON.

    The HTTP status is deliberately not checked here: callers inspect the
    returned dict for 'prompt_id' and report its 'error' entry otherwise.
    """
    payload = {"prompt": prompt_workflow}
    if client_id:
        payload["client_id"] = client_id
    data = json.dumps(payload).encode('utf-8')
    response = requests.post(
        f"{app.config['COMFYUI_URL']}/prompt",
        data=data,
        timeout=COMFYUI_TIMEOUT,
    )
    return response.json()


def get_history(prompt_id):
    """Return the parsed JSON of ComfyUI's /history/<prompt_id> endpoint.

    Raises:
        requests.RequestException: on timeout, connection failure or a
            non-success HTTP status.
    """
    response = requests.get(
        f"{app.config['COMFYUI_URL']}/history/{prompt_id}",
        timeout=COMFYUI_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


def get_image(filename, subfolder, folder_type):
    """Download an image from ComfyUI's /view endpoint and return its bytes.

    Raises:
        requests.RequestException: on timeout, connection failure or a
            non-success HTTP status (so an error body is never saved as an
            image by the callers).
    """
    params = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    response = requests.get(
        f"{app.config['COMFYUI_URL']}/view",
        params=params,
        timeout=COMFYUI_TIMEOUT,
    )
    response.raise_for_status()
    return response.content


def _load_workflow():
    """Read and parse the workflow template from WORKFLOW_PATH."""
    with open(WORKFLOW_PATH, 'r', encoding='utf-8') as f:
        return json.load(f)


def _wait_for_history(prompt_id, max_retries=POLL_MAX_RETRIES, interval=POLL_INTERVAL_SECONDS):
    """Poll ComfyUI until *prompt_id* shows up in the history.

    Returns:
        The history entry for the prompt, or None when it did not appear
        within *max_retries* checks.
    """
    for _ in range(max_retries):
        history = get_history(prompt_id)
        if prompt_id in history:
            return history[prompt_id]
        time.sleep(interval)
    return None


def _first_output_image(history_entry):
    """Return the first image descriptor in a history entry, or None."""
    for node_output in history_entry.get('outputs', {}).values():
        images = node_output.get('images')
        if images:
            return images[0]
    return None


def _save_generated_image(slug, image_info):
    """Download the described image into the character's upload subfolder.

    Returns:
        The stored path relative to UPLOAD_FOLDER ("<slug>/<filename>").
    """
    image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
    char_folder = os.path.join(app.config['UPLOAD_FOLDER'], slug)
    os.makedirs(char_folder, exist_ok=True)
    filename = f"gen_{int(time.time())}.png"
    with open(os.path.join(char_folder, filename), 'wb') as f:
        f.write(image_data)
    return f"{slug}/{filename}"


def _missing_cover_query():
    """Return the query selecting characters whose image_path is NULL or empty."""
    return Character.query.filter(
        (Character.image_path.is_(None)) | (Character.image_path == '')
    )


def sync_characters():
    """Mirror the JSON files in CHARACTERS_DIR into the Character table.

    Existing rows are updated, new ones are inserted, and rows whose
    character_id was not seen in any successfully read file are deleted.
    Does nothing when CHARACTERS_DIR does not exist.
    """
    characters_dir = app.config['CHARACTERS_DIR']
    if not os.path.exists(characters_dir):
        return

    current_ids = set()
    for filename in os.listdir(characters_dir):
        if not filename.endswith('.json'):
            continue
        file_path = os.path.join(characters_dir, filename)
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            char_id = data.get('character_id')
            if not char_id:
                continue
            current_ids.add(char_id)

            # Generate URL-safe slug: remove special characters from character_id
            slug = re.sub(r'[^a-zA-Z0-9_]', '', char_id)
            name = data.get('character_name', char_id.replace('_', ' ').title())

            character = Character.query.filter_by(character_id=char_id).first()
            if character is None:
                db.session.add(Character(character_id=char_id, slug=slug, name=name, data=data))
                continue

            character.data = data
            character.name = name
            character.slug = slug

            # Check if cover image still exists
            if character.image_path:
                full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], character.image_path)
                if not os.path.exists(full_img_path):
                    print(f"Image missing for {character.name}, clearing path.")
                    character.image_path = None

            # Explicitly tell SQLAlchemy the JSON field was modified
            flag_modified(character, "data")
        except Exception as e:
            # Best effort: one broken file must not abort the whole sync.
            # NOTE(review): a file that fails to load is not counted in
            # current_ids, so its character is removed by the pass below.
            print(f"Error importing {filename}: {e}")

    # Remove characters that are no longer in the folder
    for char in Character.query.all():
        if char.character_id not in current_ids:
            db.session.delete(char)

    db.session.commit()


@app.route('/')
def index():
    """Render the character list ordered by name."""
    characters = Character.query.order_by(Character.name).all()
    return render_template('index.html', characters=characters)


@app.route('/rescan', methods=['POST'])
def rescan():
    """Re-run the file/database sync and return to the index page."""
    sync_characters()
    flash('Database synced with character files.')
    return redirect(url_for('index'))


@app.route('/generator', methods=['GET', 'POST'])
def generator():
    """Show the generator form and, on POST, generate one image synchronously."""
    characters = Character.query.order_by(Character.name).all()
    checkpoints = get_available_checkpoints()
    if not checkpoints:
        checkpoints = ["Noob/oneObsession_v19Atypical.safetensors"]

    if request.method == 'POST':
        char_slug = request.form.get('character')
        checkpoint = request.form.get('checkpoint')
        custom_positive = request.form.get('positive_prompt', '')
        custom_negative = request.form.get('negative_prompt', '')

        character = Character.query.filter_by(slug=char_slug).first_or_404()

        try:
            workflow = _load_workflow()

            # Build base prompts from character defaults
            prompts = build_prompt(character.data, default_fields=character.default_fields)

            # Append custom additions to the "main" prompt
            if custom_positive:
                prompts["main"] = f"{prompts['main']}, {custom_positive}"

            # Prepare workflow with custom checkpoint and negative prompt
            workflow = _prepare_workflow(workflow, character, prompts, checkpoint, custom_negative)

            print(f"Queueing generator prompt for {character.character_id}")
            prompt_response = queue_prompt(workflow)

            if 'prompt_id' not in prompt_response:
                raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}")

            prompt_id = prompt_response['prompt_id']
            flash("Generation started...")

            history_entry = _wait_for_history(prompt_id)
            if history_entry is None:
                flash("Generation timed out.")
            else:
                image_info = _first_output_image(history_entry)
                if image_info is None:
                    # The prompt finished, so further polling cannot help.
                    flash("Generation finished without producing an image.")
                else:
                    relative_path = _save_generated_image(character.slug, image_info)
                    return render_template(
                        'generator.html',
                        characters=characters,
                        checkpoints=checkpoints,
                        generated_image=relative_path,
                        selected_char=char_slug,
                        selected_ckpt=checkpoint,
                    )
        except Exception as e:
            print(f"Generator error: {e}")
            flash(f"Error: {str(e)}")

    return render_template('generator.html', characters=characters, checkpoints=checkpoints)


@app.route('/character/<slug>')
def detail(slug):
    """Render a character's detail page with its session-stored state."""
    character = Character.query.filter_by(slug=slug).first_or_404()

    # Load state from session
    preferences = session.get(f'prefs_{slug}')
    preview_image = session.get(f'preview_{slug}')

    return render_template(
        'detail.html',
        character=character,
        preferences=preferences,
        preview_image=preview_image,
    )


@app.route('/character/<slug>/upload', methods=['POST'])
def upload_image(slug):
    """Store an uploaded cover image for the character and record its path."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    # This route only accepts POST, so every outcome redirects to the detail
    # page rather than back to this URL.
    detail_url = url_for('detail', slug=slug)

    if 'image' not in request.files:
        flash('No file part')
        return redirect(detail_url)

    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(detail_url)

    if not allowed_file(file.filename):
        flash('File type not allowed')
        return redirect(detail_url)

    filename = secure_filename(file.filename)
    if not allowed_file(filename):
        # Sanitizing can strip the whole stem (e.g. a non-ASCII name), which
        # leaves no usable extension; fall back to a generated name.
        extension = file.filename.rsplit('.', 1)[1].lower()
        filename = f"upload_{int(time.time())}.{extension}"

    # Create character subfolder
    char_folder = os.path.join(app.config['UPLOAD_FOLDER'], slug)
    os.makedirs(char_folder, exist_ok=True)
    file.save(os.path.join(char_folder, filename))

    # Store relative path in DB
    character.image_path = f"{slug}/{filename}"
    db.session.commit()
    flash('Image uploaded successfully!')

    return redirect(detail_url)


@app.route('/character/<slug>/finalize_generation/<prompt_id>', methods=['POST'])
def finalize_generation(slug, prompt_id):
    """Save the image of a finished prompt as a preview or as the cover.

    Returns a JSON-serializable dict; errors carry an 'error' key with a
    404 or 500 status.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()
    action = request.form.get('action', 'preview')

    try:
        history = get_history(prompt_id)
        if prompt_id not in history:
            return {'error': 'History not found'}, 404

        image_info = _first_output_image(history[prompt_id])
        if image_info is None:
            return {'error': 'No image found in output'}, 404

        relative_path = _save_generated_image(slug, image_info)
        saved_path = os.path.join(app.config['UPLOAD_FOLDER'], relative_path)
        print(f"Image saved to: {os.path.abspath(saved_path)}")

        # Handle actions
        if action == 'replace':
            character.image_path = relative_path
            db.session.commit()
            flash('Cover image updated!')
        else:
            # Preview mode
            session[f'preview_{slug}'] = relative_path

        return {
            'success': True,
            'image_url': url_for('static', filename=f'uploads/{relative_path}'),
        }
    except Exception as e:
        print(f"Finalize error: {e}")
        return {'error': str(e)}, 500


def _prepare_workflow(workflow, character, prompts, checkpoint=None, custom_negative=None):
    """Fill a workflow template with prompts, checkpoint, LoRA and a seed.

    Args:
        workflow: Parsed workflow dict; modified in place and returned.
            Nodes "3", "4", "6" and "7" must exist; "11", "13", "14", "15"
            and "16" are used only when present.
        character: Character whose `data['lora']` supplies the LoRA settings.
        prompts: Dict with "main", "face" and "hand" strings.
        checkpoint: Optional checkpoint name written to node "4".
        custom_negative: Optional text appended to node "7"'s prompt.

    Returns:
        The same *workflow* dict.
    """
    # 1. Update prompts using replacement to preserve embeddings
    positive_inputs = workflow["6"]["inputs"]
    positive_inputs["text"] = positive_inputs["text"].replace("{{POSITIVE_PROMPT}}", prompts["main"])

    if custom_negative:
        workflow["7"]["inputs"]["text"] = f"{workflow['7']['inputs']['text']}, {custom_negative}"

    if "14" in workflow:
        face_inputs = workflow["14"]["inputs"]
        face_inputs["text"] = face_inputs["text"].replace("{{FACE_PROMPT}}", prompts["face"])

    if "15" in workflow:
        hand_inputs = workflow["15"]["inputs"]
        hand_inputs["text"] = hand_inputs["text"].replace("{{HAND_PROMPT}}", prompts["hand"])

    print("--- DEBUG: COMFYUI PROMPTS ---")
    print(f"Main Positive (6): {workflow['6']['inputs']['text']}")
    print(f"Main Negative (7): {workflow['7']['inputs']['text']}")
    if "14" in workflow:
        print(f"Face Detailer (14): {workflow['14']['inputs']['text']}")
    if "15" in workflow:
        print(f"Hand Detailer (15): {workflow['15']['inputs']['text']}")
    print("-------------------------------")

    # 2. Update Checkpoint
    if checkpoint:
        workflow["4"]["inputs"]["ckpt_name"] = checkpoint

    # 3. Handle LoRA
    lora_data = character.data.get('lora') or {}
    lora_name = lora_data.get('lora_name')

    model_source = ["4", 0]
    clip_source = ["4", 1]

    if lora_name and "16" in workflow:
        lora_weight = lora_data.get('lora_weight', 1.0)
        lora_inputs = workflow["16"]["inputs"]
        lora_inputs["lora_name"] = lora_name
        lora_inputs["strength_model"] = lora_weight
        lora_inputs["strength_clip"] = lora_weight
        model_source = ["16", 0]
        clip_source = ["16", 1]

    # Apply connections to all model/clip consumers. Optional nodes are
    # skipped when absent, matching the guards used for prompts and seeds.
    for node_id in _REQUIRED_MODEL_NODES:
        workflow[node_id]["inputs"]["model"] = model_source
    for node_id in _OPTIONAL_MODEL_NODES:
        if node_id in workflow:
            workflow[node_id]["inputs"]["model"] = model_source

    for node_id in _REQUIRED_CLIP_NODES:
        workflow[node_id]["inputs"]["clip"] = clip_source
    for node_id in _OPTIONAL_CLIP_NODES:
        if node_id in workflow:
            workflow[node_id]["inputs"]["clip"] = clip_source

    # 4. Randomize seeds (one shared seed for the sampler and detailers)
    gen_seed = random.randint(1, 10**15)
    workflow["3"]["inputs"]["seed"] = gen_seed
    for node_id in ("11", "13"):
        if node_id in workflow:
            workflow[node_id]["inputs"]["seed"] = gen_seed

    return workflow


def _queue_generation(character, action='preview', selected_fields=None, client_id=None):
    """Load the workflow, fill it for *character* and queue it on ComfyUI.

    *action* is accepted for caller compatibility and is not used here.

    Returns:
        The parsed JSON response of the queue request.
    """
    # 1. Load workflow
    workflow = _load_workflow()

    # 2. Build prompts
    prompts = build_prompt(character.data, selected_fields, character.default_fields)

    # 3. Prepare workflow
    workflow = _prepare_workflow(workflow, character, prompts)

    return queue_prompt(workflow, client_id=client_id)


@app.route('/get_missing_characters')
def get_missing_characters():
    """Return slug and name of every character without a cover image."""
    missing = _missing_cover_query().all()
    return {'missing': [{'slug': c.slug, 'name': c.name} for c in missing]}


@app.route('/clear_all_covers', methods=['POST'])
def clear_all_covers():
    """Reset image_path to None on every character."""
    for char in Character.query.all():
        char.image_path = None
    db.session.commit()
    return {'success': True}


@app.route('/generate_missing', methods=['POST'])
def generate_missing():
    """Generate a cover for each character lacking one, one after another."""
    missing = _missing_cover_query().all()
    if not missing:
        flash("No characters missing cover images.")
        return redirect(url_for('index'))

    success_count = 0
    for character in missing:
        try:
            print(f"Batch generating for: {character.name}")
            prompt_response = _queue_generation(character, action='replace')
            if 'prompt_id' not in prompt_response:
                raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}")

            # Simple synchronous wait for each
            history_entry = _wait_for_history(prompt_response['prompt_id'])
            if history_entry is None:
                continue

            image_info = _first_output_image(history_entry)
            if image_info is None:
                continue

            character.image_path = _save_generated_image(character.slug, image_info)
            db.session.commit()
            success_count += 1
        except Exception as e:
            # Keep going with the remaining characters; discard any
            # half-applied change so the session stays usable.
            db.session.rollback()
            print(f"Error generating for {character.name}: {e}")

    flash(f"Batch generation complete. Generated {success_count} images.")
    return redirect(url_for('index'))


@app.route('/check_status/<prompt_id>')
def check_status(prompt_id):
    """Report whether *prompt_id* is present in ComfyUI's history yet."""
    try:
        history = get_history(prompt_id)
        if prompt_id in history:
            return {'status': 'finished'}
        return {'status': 'pending'}
    except Exception:
        return {'status': 'error'}, 500


@app.route('/character/<slug>/generate', methods=['POST'])
def generate_image(slug):
    """Queue a generation for the character using the submitted field selection.

    Responds with JSON for requests carrying the header
    `X-Requested-With: XMLHttpRequest`, otherwise redirects to the detail page.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()
    is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    try:
        # Get action type
        action = request.form.get('action', 'preview')
        client_id = request.form.get('client_id')

        # Get selected fields
        selected_fields = request.form.getlist('include_field')

        # Save preferences
        session[f'prefs_{slug}'] = selected_fields

        # Queue generation using helper
        prompt_response = _queue_generation(character, action, selected_fields, client_id=client_id)

        if 'prompt_id' not in prompt_response:
            raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}")

        prompt_id = prompt_response['prompt_id']

        # Return JSON if AJAX request
        if is_ajax:
            return {'status': 'queued', 'prompt_id': prompt_id}

        return redirect(url_for('detail', slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if is_ajax:
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('detail', slug=slug))


@app.route('/character/<slug>/save_defaults', methods=['POST'])
def save_defaults(slug):
    """Persist the submitted field selection as the character's defaults."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    selected_fields = request.form.getlist('include_field')
    character.default_fields = selected_fields
    db.session.commit()
    flash('Default prompt selection saved for this character!')
    return redirect(url_for('detail', slug=slug))


if __name__ == '__main__':
    with app.app_context():
        os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
        db.create_all()
        sync_characters()
    app.run(debug=True, port=5000)