Files
character-browser/app.py
2026-02-19 00:40:29 +00:00

963 lines
38 KiB
Python

import os
import json
import time
import re
import requests
import random
from flask import Flask, render_template, request, redirect, url_for, flash, session
from werkzeug.utils import secure_filename
from models import db, Character, Settings
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = 'static/uploads'
app.config['SECRET_KEY'] = 'dev-key-123'
app.config['CHARACTERS_DIR'] = 'characters'
app.config['COMFYUI_URL'] = 'http://127.0.0.1:8188'
app.config['ILLUSTRIOUS_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Illustrious/'
app.config['NOOB_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Noob/'
app.config['LORA_DIR'] = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Looks/'
db.init_app(app)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
def get_available_loras():
loras = []
if os.path.exists(app.config['LORA_DIR']):
for f in os.listdir(app.config['LORA_DIR']):
if f.endswith('.safetensors'):
# Using the format seen in character JSONs
loras.append(f"Illustrious/Looks/{f}")
return sorted(loras)
def get_available_checkpoints():
checkpoints = []
# Scan Illustrious
if os.path.exists(app.config['ILLUSTRIOUS_MODELS_DIR']):
for f in os.listdir(app.config['ILLUSTRIOUS_MODELS_DIR']):
if f.endswith('.safetensors') or f.endswith('.ckpt'):
checkpoints.append(f"Illustrious/{f}")
# Scan Noob
if os.path.exists(app.config['NOOB_MODELS_DIR']):
for f in os.listdir(app.config['NOOB_MODELS_DIR']):
if f.endswith('.safetensors') or f.endswith('.ckpt'):
checkpoints.append(f"Noob/{f}")
return sorted(checkpoints)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def build_prompt(data, selected_fields=None, default_fields=None, active_outfit='default'):
def is_selected(section, key):
# Priority:
# 1. Manual selection from form (if list is not empty)
# 2. Database defaults (if they exist)
# 3. Select all (default behavior)
if selected_fields:
return f"{section}::{key}" in selected_fields
if default_fields:
return f"{section}::{key}" in default_fields
return True
identity = data.get('identity', {})
# Get wardrobe - handle both new nested format and legacy flat format
wardrobe_data = data.get('wardrobe', {})
if 'default' in wardrobe_data and isinstance(wardrobe_data.get('default'), dict):
# New nested format - get active outfit
wardrobe = wardrobe_data.get(active_outfit or 'default', wardrobe_data.get('default', {}))
else:
# Legacy flat format
wardrobe = wardrobe_data
defaults = data.get('defaults', {})
# Pre-calculate Hand/Glove priority
hand_val = ""
if wardrobe.get('gloves') and is_selected('wardrobe', 'gloves'):
hand_val = wardrobe.get('gloves')
elif identity.get('hands') and is_selected('identity', 'hands'):
hand_val = identity.get('hands')
# 1. Main Prompt
parts = ["(solo:1.2)"]
# Use character_id (underscores to spaces) for tags compatibility
char_tag = data.get('character_id', '').replace('_', ' ')
if char_tag and is_selected('special', 'name'):
parts.append(char_tag)
for key in ['base_specs', 'hair', 'eyes', 'extra']:
val = identity.get(key)
if val and is_selected('identity', key):
parts.append(val)
# Add defaults (expression, pose, scene)
for key in ['expression', 'pose', 'scene']:
val = defaults.get(key)
if val and is_selected('defaults', key):
parts.append(val)
# Add hand priority value to main prompt
if hand_val:
parts.append(hand_val)
for key in ['top', 'headwear', 'legwear', 'footwear', 'accessories']:
val = wardrobe.get(key)
if val and is_selected('wardrobe', key):
parts.append(val)
style = data.get('styles', {}).get('aesthetic')
if style and is_selected('styles', 'aesthetic'):
parts.append(f"{style} style")
tags = data.get('tags', [])
if tags and is_selected('special', 'tags'):
parts.extend(tags)
lora = data.get('lora', {})
if lora.get('lora_triggers') and is_selected('lora', 'lora_triggers'):
parts.append(lora.get('lora_triggers'))
# 2. Face Prompt: Tag, Eyes, Expression
face_parts = []
if char_tag and is_selected('special', 'name'): face_parts.append(char_tag)
if identity.get('eyes') and is_selected('identity', 'eyes'): face_parts.append(identity.get('eyes'))
if defaults.get('expression') and is_selected('defaults', 'expression'): face_parts.append(defaults.get('expression'))
# 3. Hand Prompt: Hand value (Gloves or Hands)
hand_parts = [hand_val] if hand_val else []
return {
"main": ", ".join(parts),
"face": ", ".join(face_parts),
"hand": ", ".join(hand_parts)
}
def queue_prompt(prompt_workflow, client_id=None):
p = {"prompt": prompt_workflow}
if client_id:
p["client_id"] = client_id
data = json.dumps(p).encode('utf-8')
response = requests.post(f"{app.config['COMFYUI_URL']}/prompt", data=data)
return response.json()
def get_history(prompt_id):
response = requests.get(f"{app.config['COMFYUI_URL']}/history/{prompt_id}")
return response.json()
def get_image(filename, subfolder, folder_type):
data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
response = requests.get(f"{app.config['COMFYUI_URL']}/view", params=data)
return response.content
from sqlalchemy.orm.attributes import flag_modified
def sync_characters():
if not os.path.exists(app.config['CHARACTERS_DIR']):
return
current_ids = []
for filename in os.listdir(app.config['CHARACTERS_DIR']):
if filename.endswith('.json'):
file_path = os.path.join(app.config['CHARACTERS_DIR'], filename)
try:
with open(file_path, 'r') as f:
data = json.load(f)
char_id = data.get('character_id')
if not char_id:
continue
current_ids.append(char_id)
# Generate URL-safe slug: remove special characters from character_id
slug = re.sub(r'[^a-zA-Z0-9_]', '', char_id)
# Check if character already exists
character = Character.query.filter_by(character_id=char_id).first()
name = data.get('character_name', char_id.replace('_', ' ').title())
if character:
character.data = data
character.name = name
character.slug = slug
character.filename = filename
# Check if cover image still exists
if character.image_path:
full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], character.image_path)
if not os.path.exists(full_img_path):
print(f"Image missing for {character.name}, clearing path.")
character.image_path = None
# Explicitly tell SQLAlchemy the JSON field was modified
flag_modified(character, "data")
else:
new_char = Character(
character_id=char_id,
slug=slug,
filename=filename,
name=name,
data=data
)
db.session.add(new_char)
except Exception as e:
print(f"Error importing {filename}: {e}")
# Remove characters that are no longer in the folder
all_characters = Character.query.all()
for char in all_characters:
if char.character_id not in current_ids:
db.session.delete(char)
db.session.commit()
def call_llm(prompt, system_prompt="You are a creative assistant."):
settings = Settings.query.first()
if not settings or not settings.openrouter_api_key:
raise ValueError("OpenRouter API Key not configured. Please configure it in Settings.")
headers = {
"Authorization": f"Bearer {settings.openrouter_api_key}",
"Content-Type": "application/json"
}
data = {
"model": settings.openrouter_model or 'google/gemini-2.0-flash-001',
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
]
}
try:
response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data)
response.raise_for_status()
result = response.json()
return result['choices'][0]['message']['content']
except requests.exceptions.RequestException as e:
raise RuntimeError(f"LLM API request failed: {str(e)}") from e
except (KeyError, IndexError) as e:
raise RuntimeError(f"Unexpected LLM response format: {str(e)}") from e
@app.route('/get_openrouter_models', methods=['POST'])
def get_openrouter_models():
api_key = request.form.get('api_key')
if not api_key:
return {'error': 'API key is required'}, 400
headers = {"Authorization": f"Bearer {api_key}"}
try:
response = requests.get("https://openrouter.ai/api/v1/models", headers=headers)
response.raise_for_status()
models = response.json().get('data', [])
# Return simplified list of models
return {'models': [{'id': m['id'], 'name': m.get('name', m['id'])} for m in models]}
except Exception as e:
return {'error': str(e)}, 500
@app.route('/settings', methods=['GET', 'POST'])
def settings():
settings = Settings.query.first()
if not settings:
settings = Settings()
db.session.add(settings)
db.session.commit()
if request.method == 'POST':
settings.openrouter_api_key = request.form.get('api_key')
settings.openrouter_model = request.form.get('model')
db.session.commit()
flash('Settings updated successfully!')
return redirect(url_for('settings'))
return render_template('settings.html', settings=settings)
@app.route('/')
def index():
characters = Character.query.order_by(Character.name).all()
return render_template('index.html', characters=characters)
@app.route('/rescan', methods=['POST'])
def rescan():
sync_characters()
flash('Database synced with character files.')
return redirect(url_for('index'))
@app.route('/generator', methods=['GET', 'POST'])
def generator():
characters = Character.query.order_by(Character.name).all()
checkpoints = get_available_checkpoints()
if not checkpoints:
checkpoints = ["Noob/oneObsession_v19Atypical.safetensors"]
if request.method == 'POST':
char_slug = request.form.get('character')
checkpoint = request.form.get('checkpoint')
custom_positive = request.form.get('positive_prompt', '')
custom_negative = request.form.get('negative_prompt', '')
character = Character.query.filter_by(slug=char_slug).first_or_404()
try:
with open('comfy_workflow.json', 'r') as f:
workflow = json.load(f)
# Build base prompts from character defaults
prompts = build_prompt(character.data, default_fields=character.default_fields)
# Append custom additions to the "main" prompt
if custom_positive:
prompts["main"] = f"{prompts['main']}, {custom_positive}"
# Prepare workflow with custom checkpoint and negative prompt
workflow = _prepare_workflow(workflow, character, prompts, checkpoint, custom_negative)
print(f"Queueing generator prompt for {character.character_id}")
prompt_response = queue_prompt(workflow)
if 'prompt_id' not in prompt_response:
raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}")
prompt_id = prompt_response['prompt_id']
flash("Generation started...")
max_retries = 120
while max_retries > 0:
history = get_history(prompt_id)
if prompt_id in history:
outputs = history[prompt_id]['outputs']
for node_id in outputs:
if 'images' in outputs[node_id]:
image_info = outputs[node_id]['images'][0]
image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
char_folder = os.path.join(app.config['UPLOAD_FOLDER'], character.slug)
os.makedirs(char_folder, exist_ok=True)
filename = f"gen_{int(time.time())}.png"
file_path = os.path.join(char_folder, filename)
with open(file_path, 'wb') as f:
f.write(image_data)
relative_path = f"{character.slug}/{filename}"
return render_template('generator.html', characters=characters, checkpoints=checkpoints,
generated_image=relative_path, selected_char=char_slug, selected_ckpt=checkpoint)
time.sleep(2)
max_retries -= 1
flash("Generation timed out.")
except Exception as e:
print(f"Generator error: {e}")
flash(f"Error: {str(e)}")
return render_template('generator.html', characters=characters, checkpoints=checkpoints)
@app.route('/character/<path:slug>')
def detail(slug):
character = Character.query.filter_by(slug=slug).first_or_404()
# Load state from session
preferences = session.get(f'prefs_{slug}')
preview_image = session.get(f'preview_{slug}')
return render_template('detail.html', character=character, preferences=preferences, preview_image=preview_image)
@app.route('/create', methods=['GET', 'POST'])
def create_character():
if request.method == 'POST':
name = request.form.get('name')
slug = request.form.get('filename')
prompt = request.form.get('prompt')
# Validate slug
safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug)
if not safe_slug:
flash("Invalid filename.")
return redirect(request.url)
# Check if exists
if os.path.exists(os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json")):
flash("Character with this filename already exists.")
return redirect(request.url)
# Generate JSON with LLM
system_prompt = """You are a JSON generator. output ONLY valid JSON matching this exact structure. Do not wrap in markdown blocks.
Structure:
{
"character_id": "WILL_BE_REPLACED",
"character_name": "WILL_BE_REPLACED",
"identity": {
"base_specs": "string (e.g. 1girl, build, skin)",
"hair": "string",
"eyes": "string",
"hands": "string",
"arms": "string",
"torso": "string",
"pelvis": "string",
"legs": "string",
"feet": "string",
"extra": "string"
},
"defaults": {
"expression": "",
"pose": "",
"scene": ""
},
"wardrobe": {
"headwear": "string",
"top": "string",
"legwear": "string",
"footwear": "string",
"hands": "string",
"accessories": "string"
},
"styles": {
"aesthetic": "string",
"primary_color": "string",
"secondary_color": "string",
"tertiary_color": "string"
},
"lora": {
"lora_name": "",
"lora_weight": 1.0,
"lora_triggers": ""
},
"tags": ["string", "string"]
}
Fill the fields based on the user's description. Use Danbooru-style tags for the values (e.g. 'long hair', 'blue eyes'). Keep values concise. Leave defaults fields empty."""
try:
llm_response = call_llm(f"Create a character profile for '{name}' based on this description: {prompt}", system_prompt)
# Clean response (remove markdown if present)
clean_json = llm_response.replace('```json', '').replace('```', '').strip()
char_data = json.loads(clean_json)
# Enforce IDs
char_data['character_id'] = safe_slug
char_data['character_name'] = name
# Save file
file_path = os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json")
with open(file_path, 'w') as f:
json.dump(char_data, f, indent=2)
# Add to DB
new_char = Character(
character_id=safe_slug,
slug=safe_slug,
filename=f"{safe_slug}.json",
name=name,
data=char_data
)
db.session.add(new_char)
db.session.commit()
flash('Character created successfully!')
return redirect(url_for('detail', slug=safe_slug))
except Exception as e:
print(f"LLM/Save error: {e}")
flash(f"Failed to create character: {e}")
return redirect(request.url)
return render_template('create.html')
@app.route('/character/<path:slug>/edit', methods=['GET', 'POST'])
def edit_character(slug):
character = Character.query.filter_by(slug=slug).first_or_404()
loras = get_available_loras()
if request.method == 'POST':
try:
# 1. Update basic fields
character.name = request.form.get('character_name')
# 2. Rebuild the data dictionary
new_data = character.data.copy()
new_data['character_name'] = character.name
# Update nested sections (non-wardrobe)
for section in ['identity', 'defaults', 'styles', 'lora']:
if section in new_data:
for key in new_data[section]:
form_key = f"{section}_{key}"
if form_key in request.form:
val = request.form.get(form_key)
# Handle numeric weight
if key == 'lora_weight':
try: val = float(val)
except: val = 1.0
new_data[section][key] = val
# Handle wardrobe - support both nested and flat formats
wardrobe = new_data.get('wardrobe', {})
if 'default' in wardrobe and isinstance(wardrobe.get('default'), dict):
# New nested format - update each outfit
for outfit_name in wardrobe.keys():
for key in wardrobe[outfit_name].keys():
form_key = f"wardrobe_{outfit_name}_{key}"
if form_key in request.form:
wardrobe[outfit_name][key] = request.form.get(form_key)
new_data['wardrobe'] = wardrobe
else:
# Legacy flat format
if 'wardrobe' in new_data:
for key in new_data['wardrobe'].keys():
form_key = f"wardrobe_{key}"
if form_key in request.form:
new_data['wardrobe'][key] = request.form.get(form_key)
# Update Tags (comma separated string to list)
tags_raw = request.form.get('tags', '')
new_data['tags'] = [t.strip() for f in tags_raw.split(',') for t in [f.strip()] if t]
character.data = new_data
flag_modified(character, "data")
# 3. Write back to JSON file
# Use the filename we stored during sync, or fallback to a sanitized ID
char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file)
with open(file_path, 'w') as f:
json.dump(new_data, f, indent=2)
db.session.commit()
flash('Character profile updated successfully!')
return redirect(url_for('detail', slug=slug))
except Exception as e:
print(f"Edit error: {e}")
flash(f"Error saving changes: {str(e)}")
return render_template('edit.html', character=character, loras=loras)
@app.route('/character/<path:slug>/outfit/switch', methods=['POST'])
def switch_outfit(slug):
"""Switch the active outfit for a character."""
character = Character.query.filter_by(slug=slug).first_or_404()
outfit_name = request.form.get('outfit', 'default')
# Validate outfit exists
available_outfits = character.get_available_outfits()
if outfit_name in available_outfits:
character.active_outfit = outfit_name
db.session.commit()
flash(f'Switched to "{outfit_name}" outfit.')
else:
flash(f'Outfit "{outfit_name}" not found.', 'error')
return redirect(url_for('detail', slug=slug))
@app.route('/character/<path:slug>/outfit/add', methods=['POST'])
def add_outfit(slug):
"""Add a new outfit to a character."""
character = Character.query.filter_by(slug=slug).first_or_404()
outfit_name = request.form.get('outfit_name', '').strip()
if not outfit_name:
flash('Outfit name cannot be empty.', 'error')
return redirect(url_for('edit_character', slug=slug))
# Sanitize outfit name for use as key
safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', outfit_name.lower())
# Get wardrobe data
wardrobe = character.data.get('wardrobe', {})
# Ensure wardrobe is in new nested format
if 'default' not in wardrobe or not isinstance(wardrobe.get('default'), dict):
# Convert legacy format
wardrobe = {'default': wardrobe}
# Check if outfit already exists
if safe_name in wardrobe:
flash(f'Outfit "{safe_name}" already exists.', 'error')
return redirect(url_for('edit_character', slug=slug))
# Create new outfit (copy from default as template)
default_outfit = wardrobe.get('default', {
'headwear': '', 'top': '', 'legwear': '',
'footwear': '', 'hands': '', 'accessories': ''
})
wardrobe[safe_name] = default_outfit.copy()
# Update character data
character.data['wardrobe'] = wardrobe
flag_modified(character, 'data')
# Save to JSON file
char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file)
with open(file_path, 'w') as f:
json.dump(character.data, f, indent=2)
db.session.commit()
flash(f'Added new outfit "{safe_name}".')
return redirect(url_for('edit_character', slug=slug))
@app.route('/character/<path:slug>/outfit/delete', methods=['POST'])
def delete_outfit(slug):
"""Delete an outfit from a character."""
character = Character.query.filter_by(slug=slug).first_or_404()
outfit_name = request.form.get('outfit', '')
wardrobe = character.data.get('wardrobe', {})
# Cannot delete default
if outfit_name == 'default':
flash('Cannot delete the default outfit.', 'error')
return redirect(url_for('edit_character', slug=slug))
if outfit_name not in wardrobe:
flash(f'Outfit "{outfit_name}" not found.', 'error')
return redirect(url_for('edit_character', slug=slug))
# Delete outfit
del wardrobe[outfit_name]
character.data['wardrobe'] = wardrobe
flag_modified(character, 'data')
# Switch active outfit if deleted was active
if character.active_outfit == outfit_name:
character.active_outfit = 'default'
# Save to JSON file
char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file)
with open(file_path, 'w') as f:
json.dump(character.data, f, indent=2)
db.session.commit()
flash(f'Deleted outfit "{outfit_name}".')
return redirect(url_for('edit_character', slug=slug))
@app.route('/character/<path:slug>/outfit/rename', methods=['POST'])
def rename_outfit(slug):
"""Rename an outfit."""
character = Character.query.filter_by(slug=slug).first_or_404()
old_name = request.form.get('old_name', '')
new_name = request.form.get('new_name', '').strip()
if not new_name:
flash('New name cannot be empty.', 'error')
return redirect(url_for('edit_character', slug=slug))
# Sanitize new name
safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', new_name.lower())
wardrobe = character.data.get('wardrobe', {})
if old_name not in wardrobe:
flash(f'Outfit "{old_name}" not found.', 'error')
return redirect(url_for('edit_character', slug=slug))
if safe_name in wardrobe and safe_name != old_name:
flash(f'Outfit "{safe_name}" already exists.', 'error')
return redirect(url_for('edit_character', slug=slug))
# Rename (copy to new key, delete old)
wardrobe[safe_name] = wardrobe.pop(old_name)
character.data['wardrobe'] = wardrobe
flag_modified(character, 'data')
# Update active outfit if renamed was active
if character.active_outfit == old_name:
character.active_outfit = safe_name
# Save to JSON file
char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file)
with open(file_path, 'w') as f:
json.dump(character.data, f, indent=2)
db.session.commit()
flash(f'Renamed outfit "{old_name}" to "{safe_name}".')
return redirect(url_for('edit_character', slug=slug))
@app.route('/character/<path:slug>/upload', methods=['POST'])
def upload_image(slug):
character = Character.query.filter_by(slug=slug).first_or_404()
if 'image' not in request.files:
flash('No file part')
return redirect(request.url)
file = request.files['image']
if file.filename == '':
flash('No selected file')
return redirect(request.url)
if file and allowed_file(file.filename):
# Create character subfolder
char_folder = os.path.join(app.config['UPLOAD_FOLDER'], slug)
os.makedirs(char_folder, exist_ok=True)
filename = secure_filename(file.filename)
file_path = os.path.join(char_folder, filename)
file.save(file_path)
# Store relative path in DB
character.image_path = f"{slug}/{filename}"
db.session.commit()
flash('Image uploaded successfully!')
return redirect(url_for('detail', slug=slug))
@app.route('/character/<path:slug>/finalize_generation/<prompt_id>', methods=['POST'])
def finalize_generation(slug, prompt_id):
character = Character.query.filter_by(slug=slug).first_or_404()
action = request.form.get('action', 'preview')
try:
history = get_history(prompt_id)
if prompt_id not in history:
return {'error': 'History not found'}, 404
outputs = history[prompt_id]['outputs']
for node_id in outputs:
if 'images' in outputs[node_id]:
image_info = outputs[node_id]['images'][0]
image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
# Create character subfolder
char_folder = os.path.join(app.config['UPLOAD_FOLDER'], slug)
os.makedirs(char_folder, exist_ok=True)
filename = f"gen_{int(time.time())}.png"
file_path = os.path.join(char_folder, filename)
with open(file_path, 'wb') as f:
f.write(image_data)
print(f"Image saved to: {os.path.abspath(file_path)}")
# Handle actions
relative_path = f"{slug}/{filename}"
if action == 'replace':
character.image_path = relative_path
db.session.commit()
flash('Cover image updated!')
else:
# Preview mode
session[f'preview_{slug}'] = relative_path
return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')}
return {'error': 'No image found in output'}, 404
except Exception as e:
print(f"Finalize error: {e}")
return {'error': str(e)}, 500
def _prepare_workflow(workflow, character, prompts, checkpoint=None, custom_negative=None):
# 1. Update prompts using replacement to preserve embeddings
workflow["6"]["inputs"]["text"] = workflow["6"]["inputs"]["text"].replace("{{POSITIVE_PROMPT}}", prompts["main"])
if custom_negative:
workflow["7"]["inputs"]["text"] = f"{workflow['7']['inputs']['text']}, {custom_negative}"
if "14" in workflow:
workflow["14"]["inputs"]["text"] = workflow["14"]["inputs"]["text"].replace("{{FACE_PROMPT}}", prompts["face"])
if "15" in workflow:
workflow["15"]["inputs"]["text"] = workflow["15"]["inputs"]["text"].replace("{{HAND_PROMPT}}", prompts["hand"])
print("--- DEBUG: COMFYUI PROMPTS ---")
print(f"Main Positive (6): {workflow['6']['inputs']['text']}")
print(f"Main Negative (7): {workflow['7']['inputs']['text']}")
if "14" in workflow:
print(f"Face Detailer (14): {workflow['14']['inputs']['text']}")
if "15" in workflow:
print(f"Hand Detailer (15): {workflow['15']['inputs']['text']}")
print("-------------------------------")
# 2. Update Checkpoint
if checkpoint:
workflow["4"]["inputs"]["ckpt_name"] = checkpoint
# 3. Handle LoRA
lora_data = character.data.get('lora', {})
lora_name = lora_data.get('lora_name')
model_source = ["4", 0]
clip_source = ["4", 1]
if lora_name and "16" in workflow:
workflow["16"]["inputs"]["lora_name"] = lora_name
workflow["16"]["inputs"]["strength_model"] = lora_data.get('lora_weight', 1.0)
workflow["16"]["inputs"]["strength_clip"] = lora_data.get('lora_weight', 1.0)
model_source = ["16", 0]
clip_source = ["16", 1]
# Apply connections to all model/clip consumers
workflow["3"]["inputs"]["model"] = model_source
workflow["11"]["inputs"]["model"] = model_source
workflow["13"]["inputs"]["model"] = model_source
workflow["6"]["inputs"]["clip"] = clip_source
workflow["7"]["inputs"]["clip"] = clip_source
workflow["11"]["inputs"]["clip"] = clip_source
workflow["13"]["inputs"]["clip"] = clip_source
workflow["14"]["inputs"]["clip"] = clip_source
workflow["15"]["inputs"]["clip"] = clip_source
# 4. Randomize seeds
gen_seed = random.randint(1, 10**15)
workflow["3"]["inputs"]["seed"] = gen_seed
if "11" in workflow: workflow["11"]["inputs"]["seed"] = gen_seed
if "13" in workflow: workflow["13"]["inputs"]["seed"] = gen_seed
return workflow
def _queue_generation(character, action='preview', selected_fields=None, client_id=None):
# 1. Load workflow
with open('comfy_workflow.json', 'r') as f:
workflow = json.load(f)
# 2. Build prompts with active outfit
prompts = build_prompt(character.data, selected_fields, character.default_fields, character.active_outfit)
# 3. Prepare workflow
workflow = _prepare_workflow(workflow, character, prompts)
return queue_prompt(workflow, client_id=client_id)
@app.route('/get_missing_characters')
def get_missing_characters():
missing = Character.query.filter((Character.image_path == None) | (Character.image_path == '')).all()
return {'missing': [{'slug': c.slug, 'name': c.name} for c in missing]}
@app.route('/clear_all_covers', methods=['POST'])
def clear_all_covers():
characters = Character.query.all()
for char in characters:
char.image_path = None
db.session.commit()
return {'success': True}
@app.route('/generate_missing', methods=['POST'])
def generate_missing():
missing = Character.query.filter((Character.image_path == None) | (Character.image_path == '')).all()
if not missing:
flash("No characters missing cover images.")
return redirect(url_for('index'))
success_count = 0
for character in missing:
try:
print(f"Batch generating for: {character.name}")
prompt_response = _queue_generation(character, action='replace')
prompt_id = prompt_response['prompt_id']
# Simple synchronous wait for each
max_retries = 120
while max_retries > 0:
history = get_history(prompt_id)
if prompt_id in history:
outputs = history[prompt_id]['outputs']
for node_id in outputs:
if 'images' in outputs[node_id]:
image_info = outputs[node_id]['images'][0]
image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
char_folder = os.path.join(app.config['UPLOAD_FOLDER'], character.slug)
os.makedirs(char_folder, exist_ok=True)
filename = f"gen_{int(time.time())}.png"
file_path = os.path.join(char_folder, filename)
with open(file_path, 'wb') as f:
f.write(image_data)
character.image_path = f"{character.slug}/{filename}"
db.session.commit()
success_count += 1
break
break
time.sleep(2)
max_retries -= 1
except Exception as e:
print(f"Error generating for {character.name}: {e}")
flash(f"Batch generation complete. Generated {success_count} images.")
return redirect(url_for('index'))
@app.route('/check_status/<prompt_id>')
def check_status(prompt_id):
try:
history = get_history(prompt_id)
if prompt_id in history:
return {'status': 'finished'}
return {'status': 'pending'}
except Exception:
return {'status': 'error'}, 500
@app.route('/character/<path:slug>/generate', methods=['POST'])
def generate_image(slug):
character = Character.query.filter_by(slug=slug).first_or_404()
try:
# Get action type
action = request.form.get('action', 'preview')
client_id = request.form.get('client_id')
# Get selected fields
selected_fields = request.form.getlist('include_field')
# Save preferences
session[f'prefs_{slug}'] = selected_fields
# Queue generation using helper
prompt_response = _queue_generation(character, action, selected_fields, client_id=client_id)
if 'prompt_id' not in prompt_response:
raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}")
prompt_id = prompt_response['prompt_id']
# Return JSON if AJAX request
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return {'status': 'queued', 'prompt_id': prompt_id}
return redirect(url_for('detail', slug=slug))
except Exception as e:
print(f"Generation error: {e}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return {'error': str(e)}, 500
flash(f"Error during generation: {str(e)}")
return redirect(url_for('detail', slug=slug))
@app.route('/character/<path:slug>/save_defaults', methods=['POST'])
def save_defaults(slug):
character = Character.query.filter_by(slug=slug).first_or_404()
selected_fields = request.form.getlist('include_field')
character.default_fields = selected_fields
db.session.commit()
flash('Default prompt selection saved for this character!')
return redirect(url_for('detail', slug=slug))
if __name__ == '__main__':
with app.app_context():
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
db.create_all()
# Migration: Add active_outfit column if it doesn't exist
try:
from sqlalchemy import text
db.session.execute(text('ALTER TABLE character ADD COLUMN active_outfit VARCHAR(100) DEFAULT \'default\''))
db.session.commit()
print("Added active_outfit column to character table")
except Exception as e:
if 'duplicate column name' in str(e).lower() or 'already exists' in str(e).lower():
print("active_outfit column already exists")
else:
print(f"Migration note: {e}")
sync_characters()
app.run(debug=True, port=5000)