Files
character-browser/app.py
Aodhan Collins ae7ba961c1 Add danbooru-mcp auto-start, git sync, status API endpoints, navbar status indicators, and LLM format retry
- app.py: add subprocess import; add _ensure_mcp_repo() to clone/pull
  danbooru-mcp from https://git.liveaodh.com/aodhan/danbooru-mcp into
  tools/danbooru-mcp/ at startup; add ensure_mcp_server_running() which
  calls _ensure_mcp_repo() then starts the Docker container if not running;
  add GET /api/status/comfyui and GET /api/status/mcp health endpoints;
  fix call_llm() to retry up to 3 times on unexpected response format
  (KeyError/IndexError), logging the raw response and prompting the LLM
  to respond with valid JSON before each retry
- templates/layout.html: add ComfyUI and MCP status dot indicators to
  navbar; add polling JS that checks both endpoints on load and every 30s
- static/style.css: add .service-status, .status-dot, .status-ok,
  .status-error, .status-checking styles and status-pulse keyframe animation
- .gitignore: add tools/ to exclude the cloned danbooru-mcp repo
2026-03-03 00:57:27 +00:00

6801 lines
280 KiB
Python

import os
import json
import time
import re
import requests
import random
import asyncio
import subprocess
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask_session import Session
from werkzeug.utils import secure_filename
from models import db, Character, Settings, Outfit, Action, Style, Detailer, Scene, Checkpoint, Look
# --- Flask application setup & configuration --------------------------------
app = Flask(__name__)
# SQLite database next to the app; no modification tracking (saves overhead).
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = 'static/uploads'
# NOTE(review): hard-coded dev secret key — fine for local use, must be
# replaced (e.g. from an env var) before any real deployment.
app.config['SECRET_KEY'] = 'dev-key-123'
# JSON data directories, one per entity type synced into the database.
app.config['CHARACTERS_DIR'] = 'data/characters'
app.config['CLOTHING_DIR'] = 'data/clothing'
app.config['ACTIONS_DIR'] = 'data/actions'
app.config['STYLES_DIR'] = 'data/styles'
app.config['SCENES_DIR'] = 'data/scenes'
app.config['DETAILERS_DIR'] = 'data/detailers'
app.config['CHECKPOINTS_DIR'] = 'data/checkpoints'
app.config['LOOKS_DIR'] = 'data/looks'
# Local ComfyUI instance used for image generation.
app.config['COMFYUI_URL'] = 'http://127.0.0.1:8188'
# Absolute paths to model/LoRA storage on the mounted drive.
app.config['ILLUSTRIOUS_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Illustrious/'
app.config['NOOB_MODELS_DIR'] = '/mnt/alexander/AITools/Image Models/Stable-diffusion/Noob/'
app.config['LORA_DIR'] = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Looks/'
# Server-side session configuration to avoid cookie size limits
app.config['SESSION_TYPE'] = 'filesystem'
app.config['SESSION_FILE_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], '../flask_session')
app.config['SESSION_PERMANENT'] = False
db.init_app(app)
Session(app)
# Path to the danbooru-mcp docker-compose project, relative to this file.
MCP_TOOLS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tools')
MCP_COMPOSE_DIR = os.path.join(MCP_TOOLS_DIR, 'danbooru-mcp')
MCP_REPO_URL = 'https://git.liveaodh.com/aodhan/danbooru-mcp'
def _ensure_mcp_repo():
    """Clone or update the danbooru-mcp source repository inside tools/.

    - If ``tools/danbooru-mcp/`` does not exist, clone from MCP_REPO_URL.
    - If it already exists, run ``git pull`` to fetch the latest changes.

    Errors are non-fatal: every failure mode is caught and reported as a
    warning so the app still starts without git or network access.
    """
    os.makedirs(MCP_TOOLS_DIR, exist_ok=True)
    try:
        if os.path.isdir(MCP_COMPOSE_DIR):
            # Existing checkout: just fast-forward it.
            print('Updating danbooru-mcp via git pull …')
            subprocess.run(
                ['git', 'pull'],
                cwd=MCP_COMPOSE_DIR,
                timeout=60, check=True,
            )
            print('danbooru-mcp updated.')
        else:
            # First run: clone the repository into tools/danbooru-mcp.
            print(f'Cloning danbooru-mcp from {MCP_REPO_URL}')
            subprocess.run(
                ['git', 'clone', MCP_REPO_URL, MCP_COMPOSE_DIR],
                timeout=120, check=True,
            )
            print('danbooru-mcp cloned successfully.')
    except FileNotFoundError:
        print('WARNING: git not found on PATH — danbooru-mcp repo will not be cloned/updated.')
    except subprocess.CalledProcessError as e:
        print(f'WARNING: git operation failed for danbooru-mcp: {e}')
    except subprocess.TimeoutExpired:
        print('WARNING: git timed out while cloning/updating danbooru-mcp.')
    except Exception as e:
        print(f'WARNING: Could not clone/update danbooru-mcp repo: {e}')
def ensure_mcp_server_running():
    """Ensure the danbooru-mcp repo is present/up-to-date, then start the
    Docker container if it is not already running.

    Uses ``docker compose up -d`` so the image is built automatically on first
    run. Errors are non-fatal — the app will still start even if Docker is
    unavailable.
    """
    _ensure_mcp_repo()
    try:
        result = subprocess.run(
            ['docker', 'ps', '--filter', 'name=danbooru-mcp', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=10,
        )
        # Fix: only trust the listing when `docker ps` itself succeeded;
        # previously a failing docker daemon with matching stderr noise could
        # not be distinguished from a running container.
        if result.returncode == 0 and 'danbooru-mcp' in result.stdout:
            print('danbooru-mcp container already running.')
            return
        # Container not running — start it via docker compose
        print('Starting danbooru-mcp container via docker compose …')
        compose = subprocess.run(
            ['docker', 'compose', 'up', '-d'],
            cwd=MCP_COMPOSE_DIR,
            timeout=120,
        )
        # Fix: previously we printed success unconditionally; report failure
        # when compose exits non-zero so the log reflects reality.
        if compose.returncode == 0:
            print('danbooru-mcp container started.')
        else:
            print(f'WARNING: docker compose exited with code {compose.returncode} while starting danbooru-mcp.')
    except FileNotFoundError:
        print('WARNING: docker not found on PATH — danbooru-mcp will not be started automatically.')
    except subprocess.TimeoutExpired:
        print('WARNING: docker timed out while starting danbooru-mcp.')
    except Exception as e:
        print(f'WARNING: Could not ensure danbooru-mcp is running: {e}')
@app.context_processor
def inject_comfyui_ws_url():
    """Expose COMFYUI_WS_URL to all templates.

    Loopback hosts in the configured URL are rewritten to the requesting
    host so remote browsers open their WebSocket against the right machine,
    and the http(s) scheme is converted to ws(s).
    """
    base = app.config.get('COMFYUI_URL', 'http://127.0.0.1:8188')
    if '127.0.0.1' in base or 'localhost' in base:
        # Substitute the host the browser actually used for this request.
        request_host = request.host.split(':')[0]
        base = base.replace('127.0.0.1', request_host).replace('localhost', request_host)
    # Convert http/https to ws/wss
    ws_base = base.replace('http://', 'ws://').replace('https://', 'wss://')
    return dict(COMFYUI_WS_URL=f"{ws_base}/ws")
@app.context_processor
def inject_default_checkpoint():
    """Expose every checkpoint plus the session's default path to templates."""
    from models import Checkpoint
    return dict(
        all_checkpoints=Checkpoint.query.order_by(Checkpoint.name).all(),
        default_checkpoint_path=session.get('default_checkpoint', ''),
    )
@app.route('/set_default_checkpoint', methods=['POST'])
def set_default_checkpoint():
    """Persist the chosen default checkpoint path in the user's session."""
    chosen = request.form.get('checkpoint_path', '')
    session['default_checkpoint'] = chosen
    return {'status': 'ok'}
@app.route('/api/status/comfyui')
def api_status_comfyui():
    """Return whether ComfyUI is reachable."""
    base = app.config.get('COMFYUI_URL', 'http://127.0.0.1:8188')
    reachable = False
    try:
        # A cheap read-only endpoint; any 2xx/3xx counts as healthy.
        reachable = requests.get(f'{base}/system_stats', timeout=3).ok
    except Exception:
        # Connection refused / timeout / DNS — all mean "not reachable".
        reachable = False
    return {'status': 'ok'} if reachable else {'status': 'error'}
@app.route('/api/status/mcp')
def api_status_mcp():
    """Return whether the danbooru-mcp Docker container is running."""
    running = False
    try:
        listing = subprocess.run(
            ['docker', 'ps', '--filter', 'name=danbooru-mcp', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=5,
        )
        running = 'danbooru-mcp' in listing.stdout
    except Exception:
        # docker missing or timing out is reported as an error status.
        pass
    return {'status': 'ok'} if running else {'status': 'error'}
# Image file extensions accepted by allowed_file() for uploaded cover images.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
def get_available_loras():
    """Return sorted Looks-LoRA identifiers found in app.config['LORA_DIR'].

    Entries are prefixed "Illustrious/Looks/" to match the format stored in
    character JSON files. Missing directory yields an empty list.
    """
    lora_dir = app.config['LORA_DIR']
    if not os.path.exists(lora_dir):
        return []
    return sorted(
        # Using the format seen in character JSONs
        f"Illustrious/Looks/{entry}"
        for entry in os.listdir(lora_dir)
        if entry.endswith('.safetensors')
    )
def get_available_clothing_loras():
    """Get LoRAs from the Clothing directory for outfit LoRAs.

    Returns sorted "Illustrious/Clothing/<file>" identifiers; an absent
    directory yields an empty list.
    """
    base_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Clothing/'
    if not os.path.exists(base_dir):
        return []
    return sorted(
        f"Illustrious/Clothing/{entry}"
        for entry in os.listdir(base_dir)
        if entry.endswith('.safetensors')
    )
def get_available_action_loras():
    """Get LoRAs from the Poses directory for action LoRAs.

    Returns sorted "Illustrious/Poses/<file>" identifiers; an absent
    directory yields an empty list.
    """
    base_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Poses/'
    if not os.path.exists(base_dir):
        return []
    return sorted(
        f"Illustrious/Poses/{entry}"
        for entry in os.listdir(base_dir)
        if entry.endswith('.safetensors')
    )
def get_available_style_loras():
    """Get LoRAs from the Styles directory for style LoRAs.

    Returns sorted "Illustrious/Styles/<file>" identifiers; an absent
    directory yields an empty list.
    """
    base_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Styles/'
    if not os.path.exists(base_dir):
        return []
    return sorted(
        f"Illustrious/Styles/{entry}"
        for entry in os.listdir(base_dir)
        if entry.endswith('.safetensors')
    )
def get_available_detailer_loras():
    """Get LoRAs from the Detailers directory for detailer LoRAs.

    Returns sorted "Illustrious/Detailers/<file>" identifiers; an absent
    directory yields an empty list.
    """
    base_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Detailers/'
    if not os.path.exists(base_dir):
        return []
    return sorted(
        f"Illustrious/Detailers/{entry}"
        for entry in os.listdir(base_dir)
        if entry.endswith('.safetensors')
    )
def get_available_scene_loras():
    """Get LoRAs from the Backgrounds directory for scene LoRAs.

    Returns sorted "Illustrious/Backgrounds/<file>" identifiers; an absent
    directory yields an empty list.
    """
    base_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Backgrounds/'
    if not os.path.exists(base_dir):
        return []
    return sorted(
        f"Illustrious/Backgrounds/{entry}"
        for entry in os.listdir(base_dir)
        if entry.endswith('.safetensors')
    )
def get_available_checkpoints():
    """Scan the Illustrious and Noob model folders for checkpoint files.

    Returns sorted "<Family>/<file>" identifiers for every .safetensors or
    .ckpt file found; missing directories are skipped.
    """
    found = []
    scan_targets = (
        (app.config['ILLUSTRIOUS_MODELS_DIR'], 'Illustrious'),
        (app.config['NOOB_MODELS_DIR'], 'Noob'),
    )
    for directory, family in scan_targets:
        if not os.path.exists(directory):
            continue
        for entry in os.listdir(directory):
            if entry.endswith('.safetensors') or entry.endswith('.ckpt'):
                found.append(f"{family}/{entry}")
    return sorted(found)
def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
def parse_orientation(orientation_str):
    """Translate an orientation string (e.g. "MF", "FF") into booru tags.

    Counts 'M'/'F' characters case-insensitively and emits gender-count tags
    (1boy/2girls/...) followed by a relationship tag: solo for one
    participant, otherwise hetero / yuri / yaoi depending on the mix.
    Falsy input yields an empty list.
    """
    if not orientation_str:
        return []
    upper = orientation_str.upper()
    males = upper.count('M')
    females = upper.count('F')
    tags = []
    # Gender-count tags first.
    if males == 1:
        tags.append("1boy")
    elif males > 1:
        tags.append(f"{males}boys")
    if females == 1:
        tags.append("1girl")
    elif females > 1:
        tags.append(f"{females}girls")
    # Then the relationship/group-type tag.
    total = males + females
    if total == 1:
        tags.append("solo")
    elif total > 1:
        if males and females:
            tags.append("hetero")
        elif females > 1:
            tags.append("yuri")
        elif males > 1:
            tags.append("yaoi")
    return tags
def _dedup_tags(prompt_str):
"""Remove duplicate tags from a comma-separated prompt string, preserving first-occurrence order."""
seen = set()
result = []
for tag in prompt_str.split(','):
t = tag.strip()
if t and t.lower() not in seen:
seen.add(t.lower())
result.append(t)
return ', '.join(result)
def _cross_dedup_prompts(positive, negative):
"""Remove tags shared between positive and negative prompts.
Repeatedly strips the first occurrence from each side until the tag exists
on only one side. Equal counts cancel out completely; any excess on one side
retains the remainder, allowing deliberate overrides (e.g. adding a tag twice
in the positive while it appears once in the negative leaves one copy positive).
"""
def parse_tags(s):
return [t.strip() for t in s.split(',') if t.strip()]
pos_tags = parse_tags(positive)
neg_tags = parse_tags(negative)
shared = {t.lower() for t in pos_tags} & {t.lower() for t in neg_tags}
for tag_lower in shared:
while (
any(t.lower() == tag_lower for t in pos_tags) and
any(t.lower() == tag_lower for t in neg_tags)
):
pos_tags.pop(next(i for i, t in enumerate(pos_tags) if t.lower() == tag_lower))
neg_tags.pop(next(i for i, t in enumerate(neg_tags) if t.lower() == tag_lower))
return ', '.join(pos_tags), ', '.join(neg_tags)
def _resolve_lora_weight(lora_data, override=None):
"""Return effective LoRA weight, randomising between min/max when they differ.
If *override* is provided it takes absolute precedence (used by the Strengths
Gallery to pin a specific value for each step).
"""
if override is not None:
return float(override)
weight = float(lora_data.get('lora_weight', 1.0))
min_w = lora_data.get('lora_weight_min')
max_w = lora_data.get('lora_weight_max')
if min_w is not None and max_w is not None:
min_w, max_w = float(min_w), float(max_w)
if min_w != max_w:
weight = random.uniform(min(min_w, max_w), max(min_w, max_w))
return weight
def build_prompt(data, selected_fields=None, default_fields=None, active_outfit='default'):
    """Assemble the main/face/hand prompt strings from a character data dict.

    Parameters:
        data: character JSON dict (keys used: identity, wardrobe, defaults,
            action, style, participants, styles, tags, lora, character_id).
        selected_fields: optional "section::key" strings chosen in the form;
            when non-empty this is the sole selection source.
        default_fields: fallback "section::key" selection (DB defaults).
        active_outfit: which named outfit to read from a nested wardrobe.

    Returns a dict with deduplicated, comma-joined "main", "face" and
    "hand" prompt strings. Ordering of appended parts is significant for
    the resulting prompt and must be preserved.
    """
    def is_selected(section, key):
        # Priority:
        # 1. Manual selection from form (if list is not empty)
        # 2. Database defaults (if they exist)
        # 3. Select all (default behavior)
        if selected_fields:
            return f"{section}::{key}" in selected_fields
        if default_fields:
            return f"{section}::{key}" in default_fields
        return True
    identity = data.get('identity', {})
    # Get wardrobe - handle both new nested format and legacy flat format
    wardrobe_data = data.get('wardrobe', {})
    if 'default' in wardrobe_data and isinstance(wardrobe_data.get('default'), dict):
        # New nested format - get active outfit
        wardrobe = wardrobe_data.get(active_outfit or 'default', wardrobe_data.get('default', {}))
    else:
        # Legacy flat format
        wardrobe = wardrobe_data
    defaults = data.get('defaults', {})
    action_data = data.get('action', {})
    style_data = data.get('style', {})
    participants = data.get('participants', {})
    # Pre-calculate Hand/Glove priority
    # Priority: wardrobe gloves > wardrobe hands (outfit) > identity hands (character)
    hand_val = ""
    if wardrobe.get('gloves') and is_selected('wardrobe', 'gloves'):
        hand_val = wardrobe.get('gloves')
    elif wardrobe.get('hands') and is_selected('wardrobe', 'hands'):
        hand_val = wardrobe.get('hands')
    elif identity.get('hands') and is_selected('identity', 'hands'):
        hand_val = identity.get('hands')
    # 1. Main Prompt
    parts = []
    # Handle participants logic
    if participants:
        # NOTE: solo_focus is stored as the string 'true', not a boolean.
        if participants.get('solo_focus') == 'true':
            parts.append('(solo focus:1.2)')
        orientation = participants.get('orientation', '')
        if orientation:
            parts.extend(parse_orientation(orientation))
    else:
        # Default behavior
        parts.append("(solo:1.2)")
    # Use character_id (underscores to spaces) for tags compatibility
    char_tag = data.get('character_id', '').replace('_', ' ')
    if char_tag and is_selected('special', 'name'):
        parts.append(char_tag)
    for key in ['base_specs', 'hair', 'eyes', 'extra']:
        val = identity.get(key)
        if val and is_selected('identity', key):
            # Filter out conflicting tags if participants data is present
            if participants and key == 'base_specs':
                # Remove 1girl, 1boy, solo, etc.
                val = re.sub(r'\b(1girl|1boy|solo)\b', '', val).replace(', ,', ',').strip(', ')
            parts.append(val)
    # Add defaults (expression, pose, scene)
    for key in ['expression', 'pose', 'scene']:
        val = defaults.get(key)
        if val and is_selected('defaults', key):
            parts.append(val)
    # Add hand priority value to main prompt
    if hand_val:
        parts.append(hand_val)
    for key in ['full_body', 'top', 'bottom', 'headwear', 'legwear', 'footwear', 'accessories']:
        val = wardrobe.get(key)
        if val and is_selected('wardrobe', key):
            parts.append(val)
    # Standard character styles
    char_aesthetic = data.get('styles', {}).get('aesthetic')
    if char_aesthetic and is_selected('styles', 'aesthetic'):
        parts.append(f"{char_aesthetic} style")
    # New Styles Gallery logic
    if style_data.get('artist_name') and is_selected('style', 'artist_name'):
        parts.append(f"by {style_data['artist_name']}")
    if style_data.get('artistic_style') and is_selected('style', 'artistic_style'):
        parts.append(style_data['artistic_style'])
    tags = data.get('tags', [])
    if tags and is_selected('special', 'tags'):
        parts.extend(tags)
    lora = data.get('lora', {})
    if lora.get('lora_triggers') and is_selected('lora', 'lora_triggers'):
        parts.append(lora.get('lora_triggers'))
    # 2. Face Prompt: Tag, Eyes, Expression, Headwear, Action details
    face_parts = []
    if char_tag and is_selected('special', 'name'): face_parts.append(char_tag)
    if identity.get('eyes') and is_selected('identity', 'eyes'): face_parts.append(identity.get('eyes'))
    if defaults.get('expression') and is_selected('defaults', 'expression'): face_parts.append(defaults.get('expression'))
    if wardrobe.get('headwear') and is_selected('wardrobe', 'headwear'): face_parts.append(wardrobe.get('headwear'))
    # Add specific Action expression details if available
    if action_data.get('head') and is_selected('action', 'head'): face_parts.append(action_data.get('head'))
    if action_data.get('eyes') and is_selected('action', 'eyes'): face_parts.append(action_data.get('eyes'))
    # 3. Hand Prompt: Hand value (Gloves or Hands), Action details
    hand_parts = [hand_val] if hand_val else []
    if action_data.get('arms') and is_selected('action', 'arms'): hand_parts.append(action_data.get('arms'))
    if action_data.get('hands') and is_selected('action', 'hands'): hand_parts.append(action_data.get('hands'))
    return {
        "main": _dedup_tags(", ".join(parts)),
        "face": _dedup_tags(", ".join(face_parts)),
        "hand": _dedup_tags(", ".join(hand_parts))
    }
def queue_prompt(prompt_workflow, client_id=None):
    """POST a workflow to ComfyUI's /prompt endpoint and return its JSON reply."""
    payload = {"prompt": prompt_workflow}
    if client_id:
        payload["client_id"] = client_id
    encoded = json.dumps(payload).encode('utf-8')
    return requests.post(f"{app.config['COMFYUI_URL']}/prompt", data=encoded).json()
def get_history(prompt_id):
    """Fetch the generation history for *prompt_id* from ComfyUI as JSON."""
    return requests.get(f"{app.config['COMFYUI_URL']}/history/{prompt_id}").json()
def get_image(filename, subfolder, folder_type):
    """Download raw image bytes from ComfyUI's /view endpoint."""
    params = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    return requests.get(f"{app.config['COMFYUI_URL']}/view", params=params).content
from sqlalchemy.orm.attributes import flag_modified
def sync_characters():
    """Sync character JSON files in CHARACTERS_DIR into the Character table.

    Creates or updates one row per JSON file (keyed by character_id),
    clears stale cover-image paths, and deletes rows whose files are gone.
    Commits once at the end.
    """
    if not os.path.exists(app.config['CHARACTERS_DIR']):
        return
    current_ids = []
    for filename in os.listdir(app.config['CHARACTERS_DIR']):
        if filename.endswith('.json'):
            file_path = os.path.join(app.config['CHARACTERS_DIR'], filename)
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)
                char_id = data.get('character_id')
                if not char_id:
                    continue
                current_ids.append(char_id)
                # Generate URL-safe slug: remove special characters from character_id
                slug = re.sub(r'[^a-zA-Z0-9_]', '', char_id)
                # Check if character already exists
                character = Character.query.filter_by(character_id=char_id).first()
                name = data.get('character_name', char_id.replace('_', ' ').title())
                if character:
                    character.data = data
                    character.name = name
                    character.slug = slug
                    character.filename = filename
                    # Check if cover image still exists
                    if character.image_path:
                        full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], character.image_path)
                        if not os.path.exists(full_img_path):
                            print(f"Image missing for {character.name}, clearing path.")
                            character.image_path = None
                    # Explicitly tell SQLAlchemy the JSON field was modified
                    flag_modified(character, "data")
                else:
                    new_char = Character(
                        character_id=char_id,
                        slug=slug,
                        filename=filename,
                        name=name,
                        data=data
                    )
                    db.session.add(new_char)
            except Exception as e:
                # Fix: report which file failed instead of "(unknown)".
                print(f"Error importing {filename}: {e}")
    # Remove characters that are no longer in the folder
    all_characters = Character.query.all()
    for char in all_characters:
        if char.character_id not in current_ids:
            db.session.delete(char)
    db.session.commit()
def sync_outfits():
    """Sync outfit JSON files in CLOTHING_DIR into the Outfit table.

    Creates or updates one row per JSON file (keyed by outfit_id, falling
    back to the filename stem), clears stale cover-image paths, and deletes
    rows whose files are gone. Commits once at the end.
    """
    if not os.path.exists(app.config['CLOTHING_DIR']):
        return
    current_ids = []
    for filename in os.listdir(app.config['CLOTHING_DIR']):
        if filename.endswith('.json'):
            file_path = os.path.join(app.config['CLOTHING_DIR'], filename)
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)
                outfit_id = data.get('outfit_id') or filename.replace('.json', '')
                current_ids.append(outfit_id)
                # Generate URL-safe slug: remove special characters from outfit_id
                slug = re.sub(r'[^a-zA-Z0-9_]', '', outfit_id)
                # Check if outfit already exists
                outfit = Outfit.query.filter_by(outfit_id=outfit_id).first()
                name = data.get('outfit_name', outfit_id.replace('_', ' ').title())
                if outfit:
                    outfit.data = data
                    outfit.name = name
                    outfit.slug = slug
                    outfit.filename = filename
                    # Check if cover image still exists
                    if outfit.image_path:
                        full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], outfit.image_path)
                        if not os.path.exists(full_img_path):
                            print(f"Image missing for {outfit.name}, clearing path.")
                            outfit.image_path = None
                    # Explicitly tell SQLAlchemy the JSON field was modified
                    flag_modified(outfit, "data")
                else:
                    new_outfit = Outfit(
                        outfit_id=outfit_id,
                        slug=slug,
                        filename=filename,
                        name=name,
                        data=data
                    )
                    db.session.add(new_outfit)
            except Exception as e:
                # Fix: report which file failed instead of "(unknown)".
                print(f"Error importing outfit {filename}: {e}")
    # Remove outfits that are no longer in the folder
    all_outfits = Outfit.query.all()
    for outfit in all_outfits:
        if outfit.outfit_id not in current_ids:
            db.session.delete(outfit)
    db.session.commit()
def sync_looks():
    """Sync look JSON files in LOOKS_DIR into the Look table.

    Like the other sync_* helpers, but looks also carry an optional
    character_id linking them to a character. Commits once at the end.
    """
    if not os.path.exists(app.config['LOOKS_DIR']):
        return
    current_ids = []
    for filename in os.listdir(app.config['LOOKS_DIR']):
        if filename.endswith('.json'):
            file_path = os.path.join(app.config['LOOKS_DIR'], filename)
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)
                look_id = data.get('look_id') or filename.replace('.json', '')
                current_ids.append(look_id)
                # URL-safe slug derived from the id.
                slug = re.sub(r'[^a-zA-Z0-9_]', '', look_id)
                look = Look.query.filter_by(look_id=look_id).first()
                name = data.get('look_name', look_id.replace('_', ' ').title())
                character_id = data.get('character_id', None)
                if look:
                    look.data = data
                    look.name = name
                    look.slug = slug
                    look.filename = filename
                    look.character_id = character_id
                    # Clear stale cover-image references.
                    if look.image_path:
                        full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], look.image_path)
                        if not os.path.exists(full_img_path):
                            look.image_path = None
                    # Explicitly tell SQLAlchemy the JSON field was modified.
                    flag_modified(look, "data")
                else:
                    new_look = Look(
                        look_id=look_id,
                        slug=slug,
                        filename=filename,
                        name=name,
                        character_id=character_id,
                        data=data
                    )
                    db.session.add(new_look)
            except Exception as e:
                # Fix: report which file failed instead of "(unknown)".
                print(f"Error importing look {filename}: {e}")
    all_looks = Look.query.all()
    for look in all_looks:
        if look.look_id not in current_ids:
            db.session.delete(look)
    db.session.commit()
def sync_actions():
    """Sync action JSON files in ACTIONS_DIR into the Action table.

    Creates or updates one row per JSON file (keyed by action_id, falling
    back to the filename stem), clears stale cover-image paths, and deletes
    rows whose files are gone. Commits once at the end.
    """
    if not os.path.exists(app.config['ACTIONS_DIR']):
        return
    current_ids = []
    for filename in os.listdir(app.config['ACTIONS_DIR']):
        if filename.endswith('.json'):
            file_path = os.path.join(app.config['ACTIONS_DIR'], filename)
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)
                action_id = data.get('action_id') or filename.replace('.json', '')
                current_ids.append(action_id)
                # Generate URL-safe slug
                slug = re.sub(r'[^a-zA-Z0-9_]', '', action_id)
                # Check if action already exists
                action = Action.query.filter_by(action_id=action_id).first()
                name = data.get('action_name', action_id.replace('_', ' ').title())
                if action:
                    action.data = data
                    action.name = name
                    action.slug = slug
                    action.filename = filename
                    # Check if cover image still exists
                    if action.image_path:
                        full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], action.image_path)
                        if not os.path.exists(full_img_path):
                            print(f"Image missing for {action.name}, clearing path.")
                            action.image_path = None
                    flag_modified(action, "data")
                else:
                    new_action = Action(
                        action_id=action_id,
                        slug=slug,
                        filename=filename,
                        name=name,
                        data=data
                    )
                    db.session.add(new_action)
            except Exception as e:
                # Fix: report which file failed instead of "(unknown)".
                print(f"Error importing action {filename}: {e}")
    # Remove actions that are no longer in the folder
    all_actions = Action.query.all()
    for action in all_actions:
        if action.action_id not in current_ids:
            db.session.delete(action)
    db.session.commit()
def sync_styles():
    """Sync style JSON files in STYLES_DIR into the Style table.

    Creates or updates one row per JSON file (keyed by style_id, falling
    back to the filename stem), clears stale cover-image paths, and deletes
    rows whose files are gone. Commits once at the end.
    """
    if not os.path.exists(app.config['STYLES_DIR']):
        return
    current_ids = []
    for filename in os.listdir(app.config['STYLES_DIR']):
        if filename.endswith('.json'):
            file_path = os.path.join(app.config['STYLES_DIR'], filename)
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)
                style_id = data.get('style_id') or filename.replace('.json', '')
                current_ids.append(style_id)
                # Generate URL-safe slug
                slug = re.sub(r'[^a-zA-Z0-9_]', '', style_id)
                # Check if style already exists
                style = Style.query.filter_by(style_id=style_id).first()
                name = data.get('style_name', style_id.replace('_', ' ').title())
                if style:
                    style.data = data
                    style.name = name
                    style.slug = slug
                    style.filename = filename
                    # Check if cover image still exists
                    if style.image_path:
                        full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], style.image_path)
                        if not os.path.exists(full_img_path):
                            print(f"Image missing for {style.name}, clearing path.")
                            style.image_path = None
                    flag_modified(style, "data")
                else:
                    new_style = Style(
                        style_id=style_id,
                        slug=slug,
                        filename=filename,
                        name=name,
                        data=data
                    )
                    db.session.add(new_style)
            except Exception as e:
                # Fix: report which file failed instead of "(unknown)".
                print(f"Error importing style {filename}: {e}")
    # Remove styles that are no longer in the folder
    all_styles = Style.query.all()
    for style in all_styles:
        if style.style_id not in current_ids:
            db.session.delete(style)
    db.session.commit()
def sync_detailers():
    """Sync detailer JSON files in DETAILERS_DIR into the Detailer table.

    Creates or updates one row per JSON file (keyed by detailer_id, falling
    back to the filename stem), clears stale cover-image paths, and deletes
    rows whose files are gone. Commits once at the end.
    """
    if not os.path.exists(app.config['DETAILERS_DIR']):
        return
    current_ids = []
    for filename in os.listdir(app.config['DETAILERS_DIR']):
        if filename.endswith('.json'):
            file_path = os.path.join(app.config['DETAILERS_DIR'], filename)
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)
                detailer_id = data.get('detailer_id') or filename.replace('.json', '')
                current_ids.append(detailer_id)
                # Generate URL-safe slug
                slug = re.sub(r'[^a-zA-Z0-9_]', '', detailer_id)
                # Check if detailer already exists
                detailer = Detailer.query.filter_by(detailer_id=detailer_id).first()
                name = data.get('detailer_name', detailer_id.replace('_', ' ').title())
                if detailer:
                    detailer.data = data
                    detailer.name = name
                    detailer.slug = slug
                    detailer.filename = filename
                    # Check if cover image still exists
                    if detailer.image_path:
                        full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], detailer.image_path)
                        if not os.path.exists(full_img_path):
                            print(f"Image missing for {detailer.name}, clearing path.")
                            detailer.image_path = None
                    flag_modified(detailer, "data")
                else:
                    new_detailer = Detailer(
                        detailer_id=detailer_id,
                        slug=slug,
                        filename=filename,
                        name=name,
                        data=data
                    )
                    db.session.add(new_detailer)
            except Exception as e:
                # Fix: report which file failed instead of "(unknown)".
                print(f"Error importing detailer {filename}: {e}")
    # Remove detailers that are no longer in the folder
    all_detailers = Detailer.query.all()
    for detailer in all_detailers:
        if detailer.detailer_id not in current_ids:
            db.session.delete(detailer)
    db.session.commit()
def sync_scenes():
    """Sync scene JSON files in SCENES_DIR into the Scene table.

    Creates or updates one row per JSON file (keyed by scene_id, falling
    back to the filename stem), clears stale cover-image paths, and deletes
    rows whose files are gone. Commits once at the end.
    """
    if not os.path.exists(app.config['SCENES_DIR']):
        return
    current_ids = []
    for filename in os.listdir(app.config['SCENES_DIR']):
        if filename.endswith('.json'):
            file_path = os.path.join(app.config['SCENES_DIR'], filename)
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)
                scene_id = data.get('scene_id') or filename.replace('.json', '')
                current_ids.append(scene_id)
                # Generate URL-safe slug
                slug = re.sub(r'[^a-zA-Z0-9_]', '', scene_id)
                # Check if scene already exists
                scene = Scene.query.filter_by(scene_id=scene_id).first()
                name = data.get('scene_name', scene_id.replace('_', ' ').title())
                if scene:
                    scene.data = data
                    scene.name = name
                    scene.slug = slug
                    scene.filename = filename
                    # Check if cover image still exists
                    if scene.image_path:
                        full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], scene.image_path)
                        if not os.path.exists(full_img_path):
                            print(f"Image missing for {scene.name}, clearing path.")
                            scene.image_path = None
                    flag_modified(scene, "data")
                else:
                    new_scene = Scene(
                        scene_id=scene_id,
                        slug=slug,
                        filename=filename,
                        name=name,
                        data=data
                    )
                    db.session.add(new_scene)
            except Exception as e:
                # Fix: report which file failed instead of "(unknown)".
                print(f"Error importing scene {filename}: {e}")
    # Remove scenes that are no longer in the folder
    all_scenes = Scene.query.all()
    for scene in all_scenes:
        if scene.scene_id not in current_ids:
            db.session.delete(scene)
    db.session.commit()
def _default_checkpoint_data(checkpoint_path, filename):
"""Return template-default data for a checkpoint with no JSON file."""
name_base = filename.rsplit('.', 1)[0]
return {
"checkpoint_path": checkpoint_path,
"checkpoint_name": filename,
"base_positive": "anime",
"base_negative": "text, logo",
"steps": 25,
"cfg": 5,
"sampler_name": "euler_ancestral",
"vae": "integrated"
}
def sync_checkpoints():
    """Sync checkpoint model files into the Checkpoint table.

    Scans the Illustrious and Noob model folders for .safetensors/.ckpt
    files, merges in optional per-checkpoint JSON metadata from
    CHECKPOINTS_DIR (keyed by checkpoint_path), and deletes rows whose
    model files are gone. Commits once at the end.
    """
    checkpoints_dir = app.config.get('CHECKPOINTS_DIR', 'data/checkpoints')
    os.makedirs(checkpoints_dir, exist_ok=True)
    # Load all JSON data files keyed by checkpoint_path
    json_data_by_path = {}
    for filename in os.listdir(checkpoints_dir):
        # NOTE(review): a name ending in '.json' can never also end in
        # '.template', so the second test is redundant — possibly meant to
        # exclude '*.template.json'; confirm intent before changing.
        if filename.endswith('.json') and not filename.endswith('.template'):
            file_path = os.path.join(checkpoints_dir, filename)
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)
                ckpt_path = data.get('checkpoint_path')
                if ckpt_path:
                    json_data_by_path[ckpt_path] = data
            except Exception as e:
                # Fix: report which JSON file failed instead of "(unknown)".
                print(f"Error reading checkpoint JSON {filename}: {e}")
    current_ids = []
    dirs = [
        (app.config.get('ILLUSTRIOUS_MODELS_DIR', ''), 'Illustrious'),
        (app.config.get('NOOB_MODELS_DIR', ''), 'Noob'),
    ]
    for dirpath, family in dirs:
        if not dirpath or not os.path.exists(dirpath):
            continue
        for f in sorted(os.listdir(dirpath)):
            if not (f.endswith('.safetensors') or f.endswith('.ckpt')):
                continue
            checkpoint_path = f"{family}/{f}"
            checkpoint_id = checkpoint_path
            # URL-safe slug derived from the path (extension stripped).
            slug = re.sub(r'[^a-zA-Z0-9_]', '_', checkpoint_path.rsplit('.', 1)[0]).lower().strip('_')
            name_base = f.rsplit('.', 1)[0]
            friendly_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).strip().title()
            current_ids.append(checkpoint_id)
            # JSON metadata wins; otherwise fall back to template defaults.
            data = json_data_by_path.get(checkpoint_path,
                                         _default_checkpoint_data(checkpoint_path, f))
            display_name = data.get('checkpoint_name', f).rsplit('.', 1)[0]
            display_name = re.sub(r'[^a-zA-Z0-9]+', ' ', display_name).strip().title() or friendly_name
            ckpt = Checkpoint.query.filter_by(checkpoint_id=checkpoint_id).first()
            if ckpt:
                ckpt.name = display_name
                ckpt.slug = slug
                ckpt.checkpoint_path = checkpoint_path
                ckpt.data = data
                flag_modified(ckpt, "data")
                # Clear stale cover-image references.
                if ckpt.image_path:
                    full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], ckpt.image_path)
                    if not os.path.exists(full_img_path):
                        ckpt.image_path = None
            else:
                db.session.add(Checkpoint(
                    checkpoint_id=checkpoint_id,
                    slug=slug,
                    name=display_name,
                    checkpoint_path=checkpoint_path,
                    data=data,
                ))
    # Remove checkpoints whose model files are gone.
    all_ckpts = Checkpoint.query.all()
    for ckpt in all_ckpts:
        if ckpt.checkpoint_id not in current_ids:
            db.session.delete(ckpt)
    db.session.commit()
# OpenAI-style function-calling schemas for the danbooru-mcp tools. These are
# attached to LLM chat-completion requests (as "tools") so the model can
# search, validate, and autocomplete Danbooru tags via call_mcp_tool().
DANBOORU_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "search_tags",
            "description": "Prefix/full-text search for Danbooru tags. Returns rich tag objects ordered by relevance.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Search string. Trailing * added automatically."},
                    "limit": {"type": "integer", "description": "Max results (1-200)", "default": 20},
                    "category": {"type": "string", "enum": ["general", "artist", "copyright", "character", "meta"], "description": "Optional category filter."}
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "validate_tags",
            "description": "Exact-match validation for a list of tags. Splits into valid, deprecated, and invalid.",
            "parameters": {
                "type": "object",
                "properties": {
                    "tags": {"type": "array", "items": {"type": "string"}, "description": "Tags to validate."}
                },
                "required": ["tags"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "suggest_tags",
            "description": "Autocomplete-style suggestions for a partial or approximate tag. Sorted by post count.",
            "parameters": {
                "type": "object",
                "properties": {
                    "partial": {"type": "string", "description": "Partial tag or rough approximation."},
                    "limit": {"type": "integer", "description": "Max suggestions (1-50)", "default": 10},
                    "category": {"type": "string", "enum": ["general", "artist", "copyright", "character", "meta"], "description": "Optional category filter."}
                },
                "required": ["partial"]
            }
        }
    }
]
async def _run_mcp_tool(name, arguments):
    """Run one MCP tool call against a throwaway danbooru-mcp container.

    Spawns the container over stdio, initializes an MCP session, invokes
    *name* with *arguments*, and returns the text of the first content item
    in the result.
    """
    params = StdioServerParameters(
        command="docker",
        args=["run", "--rm", "-i", "danbooru-mcp:latest"],
    )
    async with stdio_client(params) as (reader, writer):
        async with ClientSession(reader, writer) as mcp_session:
            await mcp_session.initialize()
            outcome = await mcp_session.call_tool(name, arguments)
            return outcome.content[0].text
def call_mcp_tool(name, arguments):
    """Synchronous wrapper around _run_mcp_tool.

    On any failure the error is logged and returned to the caller as a JSON
    string of the form '{"error": "..."}' instead of raising.
    """
    try:
        result = asyncio.run(_run_mcp_tool(name, arguments))
    except Exception as e:
        print(f"MCP Tool Error: {e}")
        return json.dumps({"error": str(e)})
    return result
def load_prompt(filename):
    """Load an LLM prompt template from the data/prompts directory.

    Args:
        filename: Bare file name inside data/prompts.

    Returns:
        The file contents as a string, or None when the file does not exist.
    """
    path = os.path.join('data/prompts', filename)
    if not os.path.exists(path):
        return None
    # Explicit UTF-8: the default locale encoding differs per platform
    # (notably Windows), which would mangle non-ASCII prompt text.
    with open(path, 'r', encoding='utf-8') as f:
        return f.read()
def call_llm(prompt, system_prompt="You are a creative assistant."):
    """Send a chat request to the configured LLM provider and return its reply.

    Supports OpenRouter or a local OpenAI-compatible server (Ollama/LMStudio),
    drives a tool-calling loop over the Danbooru MCP tools, and retries when
    the provider returns a response that is not OpenAI-shaped.

    Args:
        prompt: The user message content.
        system_prompt: The system message content.

    Returns:
        The assistant's final text content.

    Raises:
        ValueError: When required settings (provider key/URL/model) are missing.
        RuntimeError: When the HTTP request fails, the response format is still
            invalid after the retry budget is spent, or the tool-calling loop
            exceeds its maximum number of turns.
    """
    settings = Settings.query.first()
    if not settings:
        raise ValueError("Settings not configured.")
    is_local = settings.llm_provider != 'openrouter'
    if not is_local:
        # OpenRouter: hosted OpenAI-compatible endpoint, needs an API key.
        if not settings.openrouter_api_key:
            raise ValueError("OpenRouter API Key not configured. Please configure it in Settings.")
        url = "https://openrouter.ai/api/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {settings.openrouter_api_key}",
            "Content-Type": "application/json",
            # OpenRouter attribution headers (referer + app title).
            "HTTP-Referer": request.url_root,
            "X-Title": "Character Browser"
        }
        model = settings.openrouter_model or 'google/gemini-2.0-flash-001'
    else:
        # Local provider (Ollama or LMStudio)
        if not settings.local_base_url:
            raise ValueError(f"{settings.llm_provider.title()} Base URL not configured.")
        url = f"{settings.local_base_url.rstrip('/')}/chat/completions"
        headers = {"Content-Type": "application/json"}
        model = settings.local_model
        if not model:
            raise ValueError(f"No local model selected for {settings.llm_provider.title()}. Please select one in Settings.")
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt}
    ]
    max_turns = 10  # budget for tool-call round trips
    use_tools = True
    format_retries = 3  # retries allowed for unexpected response format
    while max_turns > 0:
        max_turns -= 1
        data = {
            "model": model,
            "messages": messages,
        }
        # Only add tools if supported/requested
        if use_tools:
            data["tools"] = DANBOORU_TOOLS
            data["tool_choice"] = "auto"
        try:
            response = requests.post(url, headers=headers, json=data)
            # If 400 Bad Request and we were using tools, try once without tools
            # (some local servers reject the "tools" field outright).
            if response.status_code == 400 and use_tools:
                print(f"LLM Provider {settings.llm_provider} rejected tools. Retrying without tool calling...")
                use_tools = False
                max_turns += 1  # Reset turn for the retry
                continue
            response.raise_for_status()
            result = response.json()
            # Validate expected OpenAI-compatible response shape
            if 'choices' not in result or not result['choices']:
                raise KeyError('choices')
            message = result['choices'][0].get('message')
            if message is None:
                raise KeyError('message')
            if message.get('tool_calls'):
                # Execute each requested MCP tool and feed the results back,
                # then loop for the model's next turn.
                messages.append(message)
                for tool_call in message['tool_calls']:
                    name = tool_call['function']['name']
                    args = json.loads(tool_call['function']['arguments'])
                    print(f"Executing MCP tool: {name}({args})")
                    tool_result = call_mcp_tool(name, args)
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call['id'],
                        "name": name,
                        "content": tool_result
                    })
                continue
            return message['content']
        except requests.exceptions.RequestException as e:
            error_body = ""
            # Best effort: include the response body in the error if available.
            try: error_body = f" - Body: {response.text}"
            except: pass
            raise RuntimeError(f"LLM API request failed: {str(e)}{error_body}") from e
        except (KeyError, IndexError) as e:
            # Log the raw response to help diagnose the issue
            raw = ""
            try: raw = response.text[:500]
            except: pass
            print(f"Unexpected LLM response format (key={e}). Raw response: {raw}")
            if format_retries > 0:
                format_retries -= 1
                max_turns += 1  # don't burn a turn on a format error
                # Ask the model to try again with the correct format
                messages.append({
                    "role": "user",
                    "content": (
                        "Your previous response was not in the expected format. "
                        "Please respond with valid JSON only, exactly as specified in the system prompt. "
                        "Do not include any explanation or markdown — only the raw JSON object."
                    )
                })
                print(f"Retrying after format error ({format_retries} retries left)…")
                continue
            raise RuntimeError(f"Unexpected LLM response format after retries: {str(e)}") from e
    raise RuntimeError("LLM tool calling loop exceeded maximum turns")
@app.route('/get_openrouter_models', methods=['POST'])
def get_openrouter_models():
    """Fetch the OpenRouter model catalogue using a user-supplied API key.

    Returns {'models': [{'id', 'name'}, ...]} on success, or an error payload
    with the appropriate HTTP status.
    """
    api_key = request.form.get('api_key')
    if not api_key:
        return {'error': 'API key is required'}, 400
    headers = {"Authorization": f"Bearer {api_key}"}
    try:
        # Bounded timeout so a stalled upstream cannot hang the request worker.
        response = requests.get("https://openrouter.ai/api/v1/models", headers=headers, timeout=15)
        response.raise_for_status()
        models = response.json().get('data', [])
        # Return simplified list of models for the settings UI dropdown.
        return {'models': [{'id': m['id'], 'name': m.get('name', m['id'])} for m in models]}
    except Exception as e:
        return {'error': str(e)}, 500
@app.route('/get_local_models', methods=['POST'])
def get_local_models():
    """List models from a local OpenAI-compatible server (Ollama/LMStudio).

    Returns {'models': [{'id', 'name'}, ...]} on success, or an error payload
    with the appropriate HTTP status.
    """
    base_url = request.form.get('base_url')
    if not base_url:
        return {'error': 'Base URL is required'}, 400
    try:
        # Bounded timeout so an unreachable local server cannot hang the worker.
        response = requests.get(f"{base_url.rstrip('/')}/models", timeout=15)
        response.raise_for_status()
        models = response.json().get('data', [])
        # Ollama/LMStudio often follow the same structure as OpenAI
        return {'models': [{'id': m['id'], 'name': m.get('name', m['id'])} for m in models]}
    except Exception as e:
        return {'error': str(e)}, 500
@app.route('/settings', methods=['GET', 'POST'])
def settings():
    """Display and persist the application settings (LLM provider config)."""
    settings = Settings.query.first()
    if settings is None:
        # First run: create the singleton settings row.
        settings = Settings()
        db.session.add(settings)
        db.session.commit()
    if request.method != 'POST':
        return render_template('settings.html', settings=settings)
    settings.llm_provider = request.form.get('llm_provider', 'openrouter')
    settings.openrouter_api_key = request.form.get('api_key')
    settings.openrouter_model = request.form.get('model')
    settings.local_base_url = request.form.get('local_base_url')
    settings.local_model = request.form.get('local_model')
    db.session.commit()
    flash('Settings updated successfully!')
    return redirect(url_for('settings'))
@app.route('/')
def index():
    """Home page: every character, sorted alphabetically by name."""
    all_characters = Character.query.order_by(Character.name).all()
    return render_template('index.html', characters=all_characters)
@app.route('/rescan', methods=['POST'])
def rescan():
    """Re-sync the database with the character JSON files on disk."""
    sync_characters()
    flash('Database synced with character files.')
    return redirect(url_for('index'))
def _append_lora_triggers(data, parts):
    """Append an item's LoRA trigger words to parts when present."""
    lora = data.get('lora', {})
    if lora.get('lora_triggers'):
        parts.append(lora['lora_triggers'])

def build_extras_prompt(actions, outfits, scenes, styles, detailers):
    """Combine positive prompt text from all selected category items.

    Fragment order matches the per-category layout: actions (triggers, tags,
    action fields), outfits (wardrobe fields, triggers, tags), scenes (scene
    fields, triggers, tags), styles (artist, style, triggers), detailers
    (prompt, triggers). Empty fragments are dropped from the joined result.
    """
    parts = []
    for action in actions:
        data = action.data
        _append_lora_triggers(data, parts)
        parts.extend(data.get('tags', []))
        for key in ['full_body', 'additional']:
            val = data.get('action', {}).get(key)
            if val:
                parts.append(val)
    for outfit in outfits:
        data = outfit.data
        wardrobe = data.get('wardrobe', {})
        for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'accessories']:
            val = wardrobe.get(key)
            if val:
                parts.append(val)
        _append_lora_triggers(data, parts)
        parts.extend(data.get('tags', []))
    for scene in scenes:
        data = scene.data
        scene_fields = data.get('scene', {})
        for key in ['background', 'foreground', 'lighting']:
            val = scene_fields.get(key)
            if val:
                parts.append(val)
        _append_lora_triggers(data, parts)
        parts.extend(data.get('tags', []))
    for style in styles:
        data = style.data
        style_fields = data.get('style', {})
        if style_fields.get('artist_name'):
            parts.append(f"by {style_fields['artist_name']}")
        if style_fields.get('artistic_style'):
            parts.append(style_fields['artistic_style'])
        _append_lora_triggers(data, parts)
    for detailer in detailers:
        data = detailer.data
        if data.get('prompt'):
            parts.append(data['prompt'])
        _append_lora_triggers(data, parts)
    return ", ".join(p for p in parts if p)
@app.route('/generator', methods=['GET', 'POST'])
def generator():
    """Image generator page.

    GET renders the form. POST builds the combined prompt from the selected
    character plus optional action/outfit/scene/style/detailer items, queues
    the ComfyUI workflow, and either returns the prompt id immediately (AJAX)
    or polls synchronously until an image is produced or the poll times out.
    """
    characters = Character.query.order_by(Character.name).all()
    checkpoints = get_available_checkpoints()
    actions = Action.query.order_by(Action.name).all()
    outfits = Outfit.query.order_by(Outfit.name).all()
    scenes = Scene.query.order_by(Scene.name).all()
    styles = Style.query.order_by(Style.name).all()
    detailers = Detailer.query.order_by(Detailer.name).all()
    if not checkpoints:
        # Fallback so the form is still usable when ComfyUI reports nothing.
        checkpoints = ["Noob/oneObsession_v19Atypical.safetensors"]
    if request.method == 'POST':
        char_slug = request.form.get('character')
        checkpoint = request.form.get('checkpoint')
        custom_positive = request.form.get('positive_prompt', '')
        custom_negative = request.form.get('negative_prompt', '')
        client_id = request.form.get('client_id')
        action_slugs = request.form.getlist('action_slugs')
        outfit_slugs = request.form.getlist('outfit_slugs')
        scene_slugs = request.form.getlist('scene_slugs')
        style_slugs = request.form.getlist('style_slugs')
        detailer_slugs = request.form.getlist('detailer_slugs')
        override_prompt = request.form.get('override_prompt', '').strip()
        width = request.form.get('width') or 1024
        height = request.form.get('height') or 1024
        character = Character.query.filter_by(slug=char_slug).first_or_404()
        sel_actions = Action.query.filter(Action.slug.in_(action_slugs)).all() if action_slugs else []
        sel_outfits = Outfit.query.filter(Outfit.slug.in_(outfit_slugs)).all() if outfit_slugs else []
        sel_scenes = Scene.query.filter(Scene.slug.in_(scene_slugs)).all() if scene_slugs else []
        sel_styles = Style.query.filter(Style.slug.in_(style_slugs)).all() if style_slugs else []
        sel_detailers = Detailer.query.filter(Detailer.slug.in_(detailer_slugs)).all() if detailer_slugs else []
        try:
            with open('comfy_workflow.json', 'r') as f:
                workflow = json.load(f)
            # Build base prompts from character defaults
            prompts = build_prompt(character.data, default_fields=character.default_fields)
            if override_prompt:
                # Full manual override replaces the assembled positive prompt.
                prompts["main"] = override_prompt
            else:
                extras = build_extras_prompt(sel_actions, sel_outfits, sel_scenes, sel_styles, sel_detailers)
                combined = prompts["main"]
                if extras:
                    combined = f"{combined}, {extras}"
                if custom_positive:
                    combined = f"{combined}, {custom_positive}"
                prompts["main"] = combined
            # Prepare workflow - first selected item per category supplies its LoRA slot
            ckpt_obj = Checkpoint.query.filter_by(checkpoint_path=checkpoint).first() if checkpoint else None
            workflow = _prepare_workflow(
                workflow, character, prompts, checkpoint, custom_negative,
                outfit=sel_outfits[0] if sel_outfits else None,
                action=sel_actions[0] if sel_actions else None,
                style=sel_styles[0] if sel_styles else None,
                detailer=sel_detailers[0] if sel_detailers else None,
                scene=sel_scenes[0] if sel_scenes else None,
                width=width,
                height=height,
                checkpoint_data=ckpt_obj.data if ckpt_obj else None,
            )
            print(f"Queueing generator prompt for {character.character_id}")
            prompt_response = queue_prompt(workflow, client_id=client_id)
            if 'prompt_id' not in prompt_response:
                raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}")
            prompt_id = prompt_response['prompt_id']
            # AJAX callers poll the finalize endpoint themselves.
            if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                return {'status': 'queued', 'prompt_id': prompt_id}
            flash("Generation started...")
            max_retries = 120  # ~4 minutes at one poll every 2 seconds
            while max_retries > 0:
                history = get_history(prompt_id)
                if prompt_id in history:
                    outputs = history[prompt_id]['outputs']
                    for node_id in outputs:
                        if 'images' in outputs[node_id]:
                            image_info = outputs[node_id]['images'][0]
                            image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                            char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{character.slug}")
                            os.makedirs(char_folder, exist_ok=True)
                            filename = f"gen_{int(time.time())}.png"
                            file_path = os.path.join(char_folder, filename)
                            with open(file_path, 'wb') as f:
                                f.write(image_data)
                            # Fix: reference the file that was just written
                            # (previously held a broken placeholder path).
                            relative_path = f"characters/{character.slug}/{filename}"
                            return render_template('generator.html',
                                                   characters=characters, checkpoints=checkpoints,
                                                   actions=actions, outfits=outfits, scenes=scenes,
                                                   styles=styles, detailers=detailers,
                                                   generated_image=relative_path, selected_char=char_slug, selected_ckpt=checkpoint)
                time.sleep(2)
                max_retries -= 1
            flash("Generation timed out.")
        except Exception as e:
            print(f"Generator error: {e}")
            if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                return {'error': str(e)}, 500
            flash(f"Error: {str(e)}")
    return render_template('generator.html', characters=characters, checkpoints=checkpoints,
                           actions=actions, outfits=outfits, scenes=scenes,
                           styles=styles, detailers=detailers)
@app.route('/generator/finalize/<path:slug>/<prompt_id>', methods=['POST'])
def finalize_generator(slug, prompt_id):
    """Fetch a finished ComfyUI job's image, save it under the character's
    upload folder, and return its static URL as JSON."""
    # 404s early when the slug is unknown; the row itself is not otherwise used.
    character = Character.query.filter_by(slug=slug).first_or_404()
    try:
        history = get_history(prompt_id)
        if prompt_id not in history:
            return {'error': 'History not found'}, 404
        outputs = history[prompt_id]['outputs']
        for node_id in outputs:
            if 'images' in outputs[node_id]:
                image_info = outputs[node_id]['images'][0]
                image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{slug}")
                os.makedirs(char_folder, exist_ok=True)
                filename = f"gen_{int(time.time())}.png"
                file_path = os.path.join(char_folder, filename)
                with open(file_path, 'wb') as f:
                    f.write(image_data)
                # Fix: the URL must point at the file just saved
                # (previously held a broken placeholder path).
                relative_path = f"characters/{slug}/{filename}"
                return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')}
        return {'error': 'No image found in output'}, 404
    except Exception as e:
        print(f"Finalize error: {e}")
        return {'error': str(e)}, 500
@app.route('/generator/preview_prompt', methods=['POST'])
def generator_preview_prompt():
    """Return the fully combined positive prompt for the current selection."""
    char_slug = request.form.get('character')
    if not char_slug:
        return {'error': 'No character selected'}, 400
    character = Character.query.filter_by(slug=char_slug).first()
    if character is None:
        return {'error': 'Character not found'}, 404

    def _selected(model, field):
        # Fetch the rows matching the posted slug list for one category.
        slugs = request.form.getlist(field)
        return model.query.filter(model.slug.in_(slugs)).all() if slugs else []

    sel_actions = _selected(Action, 'action_slugs')
    sel_outfits = _selected(Outfit, 'outfit_slugs')
    sel_scenes = _selected(Scene, 'scene_slugs')
    sel_styles = _selected(Style, 'style_slugs')
    sel_detailers = _selected(Detailer, 'detailer_slugs')
    custom_positive = request.form.get('positive_prompt', '')
    prompts = build_prompt(character.data, default_fields=character.default_fields)
    extras = build_extras_prompt(sel_actions, sel_outfits, sel_scenes, sel_styles, sel_detailers)
    pieces = [prompts["main"]]
    if extras:
        pieces.append(extras)
    if custom_positive:
        pieces.append(custom_positive)
    return {'prompt': ", ".join(pieces)}
@app.route('/character/<path:slug>')
def detail(slug):
    """Character detail page, including any session-stored prefs/preview."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    # Per-character UI state lives in the session under slug-keyed entries.
    prefs = session.get(f'prefs_{slug}')
    preview = session.get(f'preview_{slug}')
    return render_template('detail.html', character=character, preferences=prefs, preview_image=preview)
@app.route('/create', methods=['GET', 'POST'])
def create_character():
    """Create a new character from the web form.

    GET renders the creation form. POST either asks the configured LLM to
    generate a full character profile (when the "use AI" box is checked) or
    builds a blank template, then writes the profile JSON to CHARACTERS_DIR
    and inserts the matching Character row.
    """
    if request.method == 'POST':
        name = request.form.get('name')
        slug = request.form.get('filename', '').strip()
        prompt = request.form.get('prompt', '')
        use_llm = request.form.get('use_llm') == 'on'
        # Auto-generate slug from name if not provided
        if not slug:
            slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')
        # Validate slug (alphanumerics and underscores only)
        safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug)
        if not safe_slug:
            safe_slug = 'character'
        # Find available filename (increment if exists)
        base_slug = safe_slug
        counter = 1
        while os.path.exists(os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json")):
            safe_slug = f"{base_slug}_{counter}"
            counter += 1
        # Check if LLM generation is requested
        if use_llm:
            if not prompt:
                flash("Description is required when AI generation is enabled.")
                return redirect(request.url)
            # Generate JSON with LLM
            system_prompt = load_prompt('character_system.txt')
            if not system_prompt:
                flash("System prompt file not found.")
                return redirect(request.url)
            try:
                llm_response = call_llm(f"Create a character profile for '{name}' based on this description: {prompt}", system_prompt)
                # Clean response (remove markdown code fences if present)
                clean_json = llm_response.replace('```json', '').replace('```', '').strip()
                char_data = json.loads(clean_json)
                # Enforce IDs so the profile always matches the chosen slug/name
                char_data['character_id'] = safe_slug
                char_data['character_name'] = name
            except Exception as e:
                print(f"LLM error: {e}")
                flash(f"Failed to generate character profile: {e}")
                return redirect(request.url)
        else:
            # Create blank character template
            char_data = {
                "character_id": safe_slug,
                "character_name": name,
                "identity": {
                    "base_specs": "",
                    "hair": "",
                    "eyes": "",
                    "hands": "",
                    "arms": "",
                    "torso": "",
                    "pelvis": "",
                    "legs": "",
                    "feet": "",
                    "extra": ""
                },
                "defaults": {
                    "expression": "",
                    "pose": "",
                    "scene": ""
                },
                "wardrobe": {
                    "full_body": "",
                    "headwear": "",
                    "top": "",
                    "bottom": "",
                    "legwear": "",
                    "footwear": "",
                    "hands": "",
                    "accessories": ""
                },
                "styles": {
                    "aesthetic": "",
                    "primary_color": "",
                    "secondary_color": "",
                    "tertiary_color": ""
                },
                "lora": {
                    "lora_name": "",
                    "lora_weight": 1.0,
                    "lora_triggers": ""
                },
                "tags": []
            }
        try:
            # Save file
            file_path = os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json")
            with open(file_path, 'w') as f:
                json.dump(char_data, f, indent=2)
            # Add to DB
            new_char = Character(
                character_id=safe_slug,
                slug=safe_slug,
                filename=f"{safe_slug}.json",
                name=name,
                data=char_data
            )
            db.session.add(new_char)
            db.session.commit()
            flash('Character created successfully!')
            return redirect(url_for('detail', slug=safe_slug))
        except Exception as e:
            print(f"Save error: {e}")
            flash(f"Failed to create character: {e}")
            return redirect(request.url)
    return render_template('create.html')
@app.route('/character/<path:slug>/edit', methods=['GET', 'POST'])
def edit_character(slug):
    """Edit a character's profile.

    GET renders the edit form. POST rebuilds the character's JSON data from
    the submitted fields, writes it back to the on-disk JSON file, and
    persists the DB row.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()
    loras = get_available_loras()
    char_looks = Look.query.filter_by(character_id=character.character_id).order_by(Look.name).all()
    if request.method == 'POST':
        try:
            # 1. Update basic fields
            character.name = request.form.get('character_name')
            # 2. Rebuild the data dictionary
            new_data = character.data.copy()
            new_data['character_name'] = character.name
            # Update nested sections (non-wardrobe)
            for section in ['identity', 'defaults', 'styles', 'lora']:
                if section in new_data:
                    for key in new_data[section]:
                        form_key = f"{section}_{key}"
                        if form_key in request.form:
                            val = request.form.get(form_key)
                            # Numeric weight: fall back to 1.0 on bad input.
                            # (Narrowed from a bare except: only conversion
                            # errors should be swallowed here.)
                            if key == 'lora_weight':
                                try:
                                    val = float(val)
                                except (TypeError, ValueError):
                                    val = 1.0
                            new_data[section][key] = val
            # LoRA weight randomization bounds (new fields not present in existing JSON)
            for bound in ['lora_weight_min', 'lora_weight_max']:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        pass  # non-numeric input: keep the previous value
                else:
                    # A blank field clears the bound entirely.
                    new_data.setdefault('lora', {}).pop(bound, None)
            # Handle wardrobe - support both nested and flat formats
            wardrobe = new_data.get('wardrobe', {})
            if 'default' in wardrobe and isinstance(wardrobe.get('default'), dict):
                # New nested format - update each outfit
                for outfit_name in wardrobe.keys():
                    for key in wardrobe[outfit_name].keys():
                        form_key = f"wardrobe_{outfit_name}_{key}"
                        if form_key in request.form:
                            wardrobe[outfit_name][key] = request.form.get(form_key)
                new_data['wardrobe'] = wardrobe
            else:
                # Legacy flat format
                if 'wardrobe' in new_data:
                    for key in new_data['wardrobe'].keys():
                        form_key = f"wardrobe_{key}"
                        if form_key in request.form:
                            new_data['wardrobe'][key] = request.form.get(form_key)
            # Update Tags (comma separated string to list, dropping empties)
            tags_raw = request.form.get('tags', '')
            new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()]
            character.data = new_data
            flag_modified(character, "data")
            # 3. Write back to JSON file
            # Use the filename we stored during sync, or fallback to a sanitized ID
            char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
            file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)
            db.session.commit()
            flash('Character profile updated successfully!')
            return redirect(url_for('detail', slug=slug))
        except Exception as e:
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")
    return render_template('edit.html', character=character, loras=loras, char_looks=char_looks)
@app.route('/character/<path:slug>/outfit/switch', methods=['POST'])
def switch_outfit(slug):
    """Switch the active outfit for a character."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    outfit_name = request.form.get('outfit', 'default')
    # Only accept outfits the character actually defines.
    if outfit_name not in character.get_available_outfits():
        flash(f'Outfit "{outfit_name}" not found.', 'error')
    else:
        character.active_outfit = outfit_name
        db.session.commit()
        flash(f'Switched to "{outfit_name}" outfit.')
    return redirect(url_for('detail', slug=slug))
@app.route('/character/<path:slug>/outfit/add', methods=['POST'])
def add_outfit(slug):
    """Add a new outfit to a character, cloned from its default outfit."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    outfit_name = request.form.get('outfit_name', '').strip()
    if not outfit_name:
        flash('Outfit name cannot be empty.', 'error')
        return redirect(url_for('edit_character', slug=slug))
    # Sanitize outfit name for use as a dictionary key
    safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', outfit_name.lower())
    wardrobe = character.data.get('wardrobe', {})
    # Ensure wardrobe is in the new nested format
    if 'default' not in wardrobe or not isinstance(wardrobe.get('default'), dict):
        # Convert legacy flat format: the old fields become the default outfit
        wardrobe = {'default': wardrobe}
    # Check if outfit already exists
    if safe_name in wardrobe:
        flash(f'Outfit "{safe_name}" already exists.', 'error')
        return redirect(url_for('edit_character', slug=slug))
    # Create new outfit (copy from default as template).
    # Fix: the fallback template now covers the full wardrobe schema used
    # elsewhere in the app ('full_body' and 'bottom' were missing).
    default_outfit = wardrobe.get('default', {
        'full_body': '', 'headwear': '', 'top': '', 'bottom': '',
        'legwear': '', 'footwear': '', 'hands': '', 'accessories': ''
    })
    wardrobe[safe_name] = default_outfit.copy()
    # Update character data
    character.data['wardrobe'] = wardrobe
    flag_modified(character, 'data')
    # Save to JSON file
    char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
    file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file)
    with open(file_path, 'w') as f:
        json.dump(character.data, f, indent=2)
    db.session.commit()
    flash(f'Added new outfit "{safe_name}".')
    return redirect(url_for('edit_character', slug=slug))
@app.route('/character/<path:slug>/outfit/delete', methods=['POST'])
def delete_outfit(slug):
    """Delete an outfit from a character."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    outfit_name = request.form.get('outfit', '')
    wardrobe = character.data.get('wardrobe', {})
    # The default outfit is protected from deletion.
    if outfit_name == 'default':
        flash('Cannot delete the default outfit.', 'error')
        return redirect(url_for('edit_character', slug=slug))
    if outfit_name not in wardrobe:
        flash(f'Outfit "{outfit_name}" not found.', 'error')
        return redirect(url_for('edit_character', slug=slug))
    # Remove the outfit and persist the change.
    wardrobe.pop(outfit_name)
    character.data['wardrobe'] = wardrobe
    flag_modified(character, 'data')
    # Fall back to the default outfit if the removed one was active.
    if character.active_outfit == outfit_name:
        character.active_outfit = 'default'
    json_name = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
    target_path = os.path.join(app.config['CHARACTERS_DIR'], json_name)
    with open(target_path, 'w') as f:
        json.dump(character.data, f, indent=2)
    db.session.commit()
    flash(f'Deleted outfit "{outfit_name}".')
    return redirect(url_for('edit_character', slug=slug))
@app.route('/character/<path:slug>/outfit/rename', methods=['POST'])
def rename_outfit(slug):
    """Rename an outfit."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    old_name = request.form.get('old_name', '')
    new_name = request.form.get('new_name', '').strip()
    if not new_name:
        flash('New name cannot be empty.', 'error')
        return redirect(url_for('edit_character', slug=slug))
    # Sanitize the new name into a safe dictionary key.
    safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', new_name.lower())
    wardrobe = character.data.get('wardrobe', {})
    if old_name not in wardrobe:
        flash(f'Outfit "{old_name}" not found.', 'error')
        return redirect(url_for('edit_character', slug=slug))
    if safe_name != old_name and safe_name in wardrobe:
        flash(f'Outfit "{safe_name}" already exists.', 'error')
        return redirect(url_for('edit_character', slug=slug))
    # Move the entry under its new key (copy then drop the old one).
    wardrobe[safe_name] = wardrobe.pop(old_name)
    character.data['wardrobe'] = wardrobe
    flag_modified(character, 'data')
    # Keep the active-outfit pointer in sync with the rename.
    if character.active_outfit == old_name:
        character.active_outfit = safe_name
    json_name = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
    target_path = os.path.join(app.config['CHARACTERS_DIR'], json_name)
    with open(target_path, 'w') as f:
        json.dump(character.data, f, indent=2)
    db.session.commit()
    flash(f'Renamed outfit "{old_name}" to "{safe_name}".')
    return redirect(url_for('edit_character', slug=slug))
@app.route('/character/<path:slug>/upload', methods=['POST'])
def upload_image(slug):
    """Upload a cover image for a character and store its relative path."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    if 'image' not in request.files:
        flash('No file part')
        return redirect(request.url)
    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(request.url)
    if file and allowed_file(file.filename):
        # Create character subfolder
        char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{slug}")
        os.makedirs(char_folder, exist_ok=True)
        filename = secure_filename(file.filename)
        file_path = os.path.join(char_folder, filename)
        file.save(file_path)
        # Fix: store the sanitized saved filename in the DB
        # (previously held a broken placeholder path).
        character.image_path = f"characters/{slug}/{filename}"
        db.session.commit()
        flash('Image uploaded successfully!')
    return redirect(url_for('detail', slug=slug))
@app.route('/character/<path:slug>/finalize_generation/<prompt_id>', methods=['POST'])
def finalize_generation(slug, prompt_id):
    """Save a finished ComfyUI image as the character's session preview.

    When action == 'replace' the character's cover image is also updated
    immediately. Returns the saved image URL as JSON.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()
    action = request.form.get('action', 'preview')
    try:
        history = get_history(prompt_id)
        if prompt_id not in history:
            return {'error': 'History not found'}, 404
        outputs = history[prompt_id]['outputs']
        for node_id in outputs:
            if 'images' in outputs[node_id]:
                image_info = outputs[node_id]['images'][0]
                image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                # Create character subfolder
                char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{slug}")
                os.makedirs(char_folder, exist_ok=True)
                filename = f"gen_{int(time.time())}.png"
                file_path = os.path.join(char_folder, filename)
                with open(file_path, 'wb') as f:
                    f.write(image_data)
                print(f"Image saved to: {os.path.abspath(file_path)}")
                # Fix: reference the file that was just written
                # (previously held a broken placeholder path).
                relative_path = f"characters/{slug}/{filename}"
                session[f'preview_{slug}'] = relative_path
                session.modified = True  # Ensure session is saved for JSON response
                # If action is 'replace', also update the character's cover image immediately
                if action == 'replace':
                    character.image_path = relative_path
                    db.session.commit()
                return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')}
        return {'error': 'No image found in output'}, 404
    except Exception as e:
        print(f"Finalize error: {e}")
        return {'error': str(e)}, 500
@app.route('/character/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_cover_from_preview(slug):
    """Promote the session preview image to the character's cover image."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    preview_path = session.get(f'preview_{slug}')
    if not preview_path:
        flash('No preview image available', 'error')
    else:
        character.image_path = preview_path
        db.session.commit()
        flash('Cover image updated from preview!')
    return redirect(url_for('detail', slug=slug))
def _log_workflow_prompts(label, workflow):
"""Print the final assembled ComfyUI prompts in a consistent, readable block."""
sep = "=" * 72
print(f"\n{sep}")
print(f" WORKFLOW PROMPTS [{label}]")
print(sep)
print(f" Checkpoint : {workflow['4']['inputs'].get('ckpt_name', '(not set)')}")
print(f" Seed : {workflow['3']['inputs'].get('seed', '(not set)')}")
print(f" Resolution : {workflow['5']['inputs'].get('width', '?')} x {workflow['5']['inputs'].get('height', '?')}")
print(f" Sampler : {workflow['3']['inputs'].get('sampler_name', '?')} / {workflow['3']['inputs'].get('scheduler', '?')} steps={workflow['3']['inputs'].get('steps', '?')} cfg={workflow['3']['inputs'].get('cfg', '?')}")
# LoRA chain summary
active_loras = []
for node_id, label_str in [("16", "char/look"), ("17", "outfit"), ("18", "action"), ("19", "style/detail/scene")]:
if node_id in workflow:
name = workflow[node_id]["inputs"].get("lora_name", "")
if name:
w = workflow[node_id]["inputs"].get("strength_model", "?")
active_loras.append(f"{label_str}:{name.split('/')[-1]}@{w:.3f}" if isinstance(w, float) else f"{label_str}:{name.split('/')[-1]}@{w}")
print(f" LoRAs : {' | '.join(active_loras) if active_loras else '(none)'}")
print(f" [+] Positive : {workflow['6']['inputs'].get('text', '')}")
print(f" [-] Negative : {workflow['7']['inputs'].get('text', '')}")
face_text = workflow.get('14', {}).get('inputs', {}).get('text', '')
hand_text = workflow.get('15', {}).get('inputs', {}).get('text', '')
if face_text:
print(f" [F] Face : {face_text}")
if hand_text:
print(f" [H] Hand : {hand_text}")
print(f"{sep}\n")
def _prepare_workflow(workflow, character, prompts, checkpoint=None, custom_negative=None, outfit=None, action=None, style=None, detailer=None, scene=None, width=None, height=None, checkpoint_data=None, look=None, fixed_seed=None):
    """Mutate a loaded ComfyUI workflow dict in place and return it.

    Applies, in order: prompt-template substitution, checkpoint selection,
    LoRA chaining (nodes 16-19), seed assignment, image dimensions,
    checkpoint-specific settings, sampler sync to the adetailer nodes, and
    positive/negative cross-deduplication. Node ids are hard-coded to the
    fixed layout of comfy_workflow.json (3=KSampler, 4=checkpoint loader,
    5=latent size, 6/7=positive/negative prompts, 11/13=adetailer samplers,
    14/15=face/hand prompts, 16-19=LoRA loaders).

    Args:
        workflow: workflow dict keyed by node-id strings (mutated in place).
        character: Character row or None; its data['lora'] drives node 16.
        prompts: dict with 'main', 'face' and 'hand' prompt strings.
        checkpoint: optional checkpoint filename for node 4.
        custom_negative: extra text appended to the negative prompt.
        outfit, action, style, detailer, scene: optional rows whose
            data['lora'] feed nodes 17, 18 and 19 respectively.
        width, height: optional latent dimensions for node 5.
        checkpoint_data: per-checkpoint settings dict forwarded to
            _apply_checkpoint_settings.
        look: optional Look row; overrides the character LoRA and may
            prepend its own negative text.
        fixed_seed: reuse this seed instead of randomizing (reproducible
            batches such as the Strengths Gallery).

    Returns:
        The same workflow dict, fully prepared for queue_prompt().
    """
    # 1. Update prompts using replacement to preserve embeddings
    workflow["6"]["inputs"]["text"] = workflow["6"]["inputs"]["text"].replace("{{POSITIVE_PROMPT}}", prompts["main"])
    if custom_negative:
        workflow["7"]["inputs"]["text"] = f"{workflow['7']['inputs']['text']}, {custom_negative}"
    if "14" in workflow:
        workflow["14"]["inputs"]["text"] = workflow["14"]["inputs"]["text"].replace("{{FACE_PROMPT}}", prompts["face"])
    if "15" in workflow:
        workflow["15"]["inputs"]["text"] = workflow["15"]["inputs"]["text"].replace("{{HAND_PROMPT}}", prompts["hand"])
    # 2. Update Checkpoint
    if checkpoint:
        workflow["4"]["inputs"]["ckpt_name"] = checkpoint
    # 3. Handle LoRAs - Node 16 for character, Node 17 for outfit, Node 18 for action, Node 19 for style/detailer
    # Start with direct checkpoint connections
    model_source = ["4", 0]
    clip_source = ["4", 1]
    # Look negative prompt (applied before character LoRA)
    if look:
        look_negative = look.data.get('negative', '')
        if look_negative:
            workflow["7"]["inputs"]["text"] = f"{look_negative}, {workflow['7']['inputs']['text']}"
    # Character LoRA (Node 16) — look LoRA overrides character LoRA when present
    if look:
        char_lora_data = look.data.get('lora', {})
    else:
        char_lora_data = character.data.get('lora', {}) if character else {}
    char_lora_name = char_lora_data.get('lora_name')
    if char_lora_name and "16" in workflow:
        _w16 = _resolve_lora_weight(char_lora_data)
        workflow["16"]["inputs"]["lora_name"] = char_lora_name
        workflow["16"]["inputs"]["strength_model"] = _w16
        workflow["16"]["inputs"]["strength_clip"] = _w16
        workflow["16"]["inputs"]["model"] = ["4", 0]  # From checkpoint
        workflow["16"]["inputs"]["clip"] = ["4", 1]  # From checkpoint
        model_source = ["16", 0]
        clip_source = ["16", 1]
        print(f"Character LoRA: {char_lora_name} @ {_w16}")
    # Outfit LoRA (Node 17) - chains from character LoRA or checkpoint
    outfit_lora_data = outfit.data.get('lora', {}) if outfit else {}
    outfit_lora_name = outfit_lora_data.get('lora_name')
    if outfit_lora_name and "17" in workflow:
        # Merge order makes 0.8 the fallback weight when the outfit has none.
        _w17 = _resolve_lora_weight({**{'lora_weight': 0.8}, **outfit_lora_data})
        workflow["17"]["inputs"]["lora_name"] = outfit_lora_name
        workflow["17"]["inputs"]["strength_model"] = _w17
        workflow["17"]["inputs"]["strength_clip"] = _w17
        # Chain from character LoRA (node 16) or checkpoint (node 4)
        workflow["17"]["inputs"]["model"] = model_source
        workflow["17"]["inputs"]["clip"] = clip_source
        model_source = ["17", 0]
        clip_source = ["17", 1]
        print(f"Outfit LoRA: {outfit_lora_name} @ {_w17}")
    # Action LoRA (Node 18) - chains from previous LoRA or checkpoint
    action_lora_data = action.data.get('lora', {}) if action else {}
    action_lora_name = action_lora_data.get('lora_name')
    if action_lora_name and "18" in workflow:
        _w18 = _resolve_lora_weight(action_lora_data)
        workflow["18"]["inputs"]["lora_name"] = action_lora_name
        workflow["18"]["inputs"]["strength_model"] = _w18
        workflow["18"]["inputs"]["strength_clip"] = _w18
        # Chain from previous source
        workflow["18"]["inputs"]["model"] = model_source
        workflow["18"]["inputs"]["clip"] = clip_source
        model_source = ["18", 0]
        clip_source = ["18", 1]
        print(f"Action LoRA: {action_lora_name} @ {_w18}")
    # Style/Detailer/Scene LoRA (Node 19) - chains from previous LoRA or checkpoint
    # Priority: Style > Detailer > Scene (Scene LoRAs are rare but supported)
    target_obj = style or detailer or scene
    style_lora_data = target_obj.data.get('lora', {}) if target_obj else {}
    style_lora_name = style_lora_data.get('lora_name')
    if style_lora_name and "19" in workflow:
        _w19 = _resolve_lora_weight(style_lora_data)
        workflow["19"]["inputs"]["lora_name"] = style_lora_name
        workflow["19"]["inputs"]["strength_model"] = _w19
        workflow["19"]["inputs"]["strength_clip"] = _w19
        # Chain from previous source
        workflow["19"]["inputs"]["model"] = model_source
        workflow["19"]["inputs"]["clip"] = clip_source
        model_source = ["19", 0]
        clip_source = ["19", 1]
        print(f"Style/Detailer LoRA: {style_lora_name} @ {_w19}")
    # Apply connections to all model/clip consumers
    workflow["3"]["inputs"]["model"] = model_source
    workflow["11"]["inputs"]["model"] = model_source
    workflow["13"]["inputs"]["model"] = model_source
    workflow["6"]["inputs"]["clip"] = clip_source
    workflow["7"]["inputs"]["clip"] = clip_source
    workflow["11"]["inputs"]["clip"] = clip_source
    workflow["13"]["inputs"]["clip"] = clip_source
    workflow["14"]["inputs"]["clip"] = clip_source
    workflow["15"]["inputs"]["clip"] = clip_source
    # 4. Randomize seeds (or use a fixed seed for reproducible batches like Strengths Gallery)
    gen_seed = fixed_seed if fixed_seed is not None else random.randint(1, 10**15)
    workflow["3"]["inputs"]["seed"] = gen_seed
    if "11" in workflow: workflow["11"]["inputs"]["seed"] = gen_seed
    if "13" in workflow: workflow["13"]["inputs"]["seed"] = gen_seed
    # 5. Set image dimensions
    if "5" in workflow:
        if width:
            workflow["5"]["inputs"]["width"] = int(width)
        if height:
            workflow["5"]["inputs"]["height"] = int(height)
    # 6. Apply checkpoint-specific settings (steps, cfg, sampler, base prompts, VAE)
    if checkpoint_data:
        workflow = _apply_checkpoint_settings(workflow, checkpoint_data)
    # 7. Sync sampler/scheduler from main KSampler to adetailer nodes
    sampler_name = workflow["3"]["inputs"].get("sampler_name")
    scheduler = workflow["3"]["inputs"].get("scheduler")
    for node_id in ["11", "13"]:
        if node_id in workflow:
            if sampler_name:
                workflow[node_id]["inputs"]["sampler_name"] = sampler_name
            if scheduler:
                workflow[node_id]["inputs"]["scheduler"] = scheduler
    # 8. Cross-deduplicate: remove tags shared between positive and negative
    pos_text, neg_text = _cross_dedup_prompts(
        workflow["6"]["inputs"]["text"],
        workflow["7"]["inputs"]["text"]
    )
    workflow["6"]["inputs"]["text"] = pos_text
    workflow["7"]["inputs"]["text"] = neg_text
    # 9. Final prompt debug — logged after all transformations are complete
    _log_workflow_prompts("_prepare_workflow", workflow)
    return workflow
def _get_default_checkpoint():
    """Return (checkpoint_path, checkpoint_data) from the session default, or (None, None)."""
    selected_path = session.get('default_checkpoint')
    if selected_path:
        record = Checkpoint.query.filter_by(checkpoint_path=selected_path).first()
        if record:
            return record.checkpoint_path, record.data or {}
    return None, None
def _queue_generation(character, action='preview', selected_fields=None, client_id=None):
    """Load the base workflow, build prompts for *character*, and queue it on ComfyUI."""
    # Base template shipped alongside the app.
    with open('comfy_workflow.json', 'r') as fh:
        base_workflow = json.load(fh)
    # Prompt assembly honours the character's active outfit and field defaults.
    prompts = build_prompt(character.data, selected_fields, character.default_fields, character.active_outfit)
    ckpt_path, ckpt_data = _get_default_checkpoint()
    prepared = _prepare_workflow(base_workflow, character, prompts, checkpoint=ckpt_path, checkpoint_data=ckpt_data)
    return queue_prompt(prepared, client_id=client_id)
@app.route('/get_missing_characters')
def get_missing_characters():
    """Report characters with no cover image as {'missing': [{slug, name}, ...]}."""
    needing_cover = Character.query.filter((Character.image_path == None) | (Character.image_path == '')).all()
    payload = [{'slug': entry.slug, 'name': entry.name} for entry in needing_cover]
    return {'missing': payload}
@app.route('/clear_all_covers', methods=['POST'])
def clear_all_covers():
    """Remove the cover image reference from every character."""
    for entry in Character.query.all():
        entry.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/generate_missing', methods=['POST'])
def generate_missing():
    """Batch-generate cover images for every character that lacks one.

    Processes characters alphabetically, queueing one ComfyUI generation at
    a time and polling synchronously (2s interval, up to 120 tries) until
    the image is ready, then saves it as the character's cover.

    Fix: image_path previously stored a literal placeholder instead of the
    generated filename; it now records the actual saved file.
    """
    # Query fresh from database for each check to avoid stale session issues
    def get_missing_count():
        return Character.query.filter((Character.image_path == None) | (Character.image_path == '')).count()
    if get_missing_count() == 0:
        flash("No characters missing cover images.")
        return redirect(url_for('index'))
    success_count = 0
    processed = 0
    # Keep generating until no more missing
    while get_missing_count() > 0:
        # Get the next character in alphabetical order
        character = Character.query.filter(
            (Character.image_path == None) | (Character.image_path == '')
        ).order_by(Character.name).first()
        if not character:
            break
        character_slug = character.slug
        character_name = character.name
        processed += 1
        try:
            print(f"Batch generating for: {character_name}")
            prompt_response = _queue_generation(character, action='replace')
            prompt_id = prompt_response['prompt_id']
            # Simple synchronous wait for each
            max_retries = 120
            while max_retries > 0:
                history = get_history(prompt_id)
                if prompt_id in history:
                    outputs = history[prompt_id]['outputs']
                    for node_id in outputs:
                        if 'images' in outputs[node_id]:
                            image_info = outputs[node_id]['images'][0]
                            image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                            char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{character_slug}")
                            os.makedirs(char_folder, exist_ok=True)
                            filename = f"gen_{int(time.time())}.png"
                            file_path = os.path.join(char_folder, filename)
                            with open(file_path, 'wb') as f:
                                f.write(image_data)
                            # Re-query the character to ensure it's attached to the session
                            character_to_update = Character.query.filter_by(slug=character_slug).first()
                            if character_to_update:
                                # BUGFIX: store the actual generated filename in the path
                                character_to_update.image_path = f"characters/{character_slug}/{filename}"
                                db.session.commit()
                                print(f"Saved cover for {character_name}: {character_to_update.image_path}")
                                success_count += 1
                            break
                    break
                time.sleep(2)
                max_retries -= 1
        except Exception as e:
            print(f"Error generating for {character_name}: {e}")
            db.session.rollback()  # Rollback on error to ensure clean state
    flash(f"Batch generation complete. Generated {success_count} images.")
    return redirect(url_for('index'))
@app.route('/check_status/<prompt_id>')
def check_status(prompt_id):
    """Poll ComfyUI history for *prompt_id*: finished, pending, or error."""
    try:
        finished = prompt_id in get_history(prompt_id)
    except Exception:
        return {'status': 'error'}, 500
    return {'status': 'finished' if finished else 'pending'}
@app.route('/character/<path:slug>/generate', methods=['POST'])
def generate_image(slug):
    """Queue a ComfyUI generation for a character and remember field preferences."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    wants_json = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
    try:
        gen_action = request.form.get('action', 'preview')
        client_id = request.form.get('client_id')
        chosen_fields = request.form.getlist('include_field')
        # Persist the user's field selection for next time.
        session[f'prefs_{slug}'] = chosen_fields
        response = _queue_generation(character, gen_action, chosen_fields, client_id=client_id)
        if 'prompt_id' not in response:
            raise Exception(f"ComfyUI failed: {response.get('error', 'Unknown error')}")
        if wants_json:
            return {'status': 'queued', 'prompt_id': response['prompt_id']}
        return redirect(url_for('detail', slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if wants_json:
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('detail', slug=slug))
@app.route('/character/<path:slug>/save_defaults', methods=['POST'])
def save_defaults(slug):
    """Persist the checked prompt fields as this character's defaults."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    character.default_fields = request.form.getlist('include_field')
    db.session.commit()
    flash('Default prompt selection saved for this character!')
    return redirect(url_for('detail', slug=slug))
@app.route('/get_missing_outfits')
def get_missing_outfits():
    """Report outfits with no cover image as {'missing': [{slug, name}, ...]}."""
    needing_cover = Outfit.query.filter((Outfit.image_path == None) | (Outfit.image_path == '')).all()
    payload = [{'slug': entry.slug, 'name': entry.name} for entry in needing_cover]
    return {'missing': payload}
@app.route('/clear_all_outfit_covers', methods=['POST'])
def clear_all_outfit_covers():
    """Remove the cover image reference from every outfit."""
    for entry in Outfit.query.all():
        entry.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/get_missing_actions')
def get_missing_actions():
    """Report actions with no cover image as {'missing': [{slug, name}, ...]}."""
    needing_cover = Action.query.filter((Action.image_path == None) | (Action.image_path == '')).all()
    payload = [{'slug': entry.slug, 'name': entry.name} for entry in needing_cover]
    return {'missing': payload}
@app.route('/clear_all_action_covers', methods=['POST'])
def clear_all_action_covers():
    """Remove the cover image reference from every action."""
    for entry in Action.query.all():
        entry.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/get_missing_scenes')
def get_missing_scenes():
    """Report scenes with no cover image as {'missing': [{slug, name}, ...]}."""
    needing_cover = Scene.query.filter((Scene.image_path == None) | (Scene.image_path == '')).all()
    payload = [{'slug': entry.slug, 'name': entry.name} for entry in needing_cover]
    return {'missing': payload}
@app.route('/clear_all_scene_covers', methods=['POST'])
def clear_all_scene_covers():
    """Remove the cover image reference from every scene."""
    for entry in Scene.query.all():
        entry.image_path = None
    db.session.commit()
    return {'success': True}
# ============ OUTFIT ROUTES ============
@app.route('/outfits')
def outfits_index():
    """Render the outfit gallery, alphabetized by name."""
    all_outfits = Outfit.query.order_by(Outfit.name).all()
    return render_template('outfits/index.html', outfits=all_outfits)
@app.route('/outfits/rescan', methods=['POST'])
def rescan_outfits():
    """Re-sync the outfit table with the JSON files on disk."""
    sync_outfits()
    flash('Database synced with outfit files.')
    return redirect(url_for('outfits_index'))
@app.route('/outfits/bulk_create', methods=['POST'])
def bulk_create_outfits_from_loras():
    """Scan the clothing LoRA directory and create an outfit JSON per LoRA.

    For each .safetensors file, asks the LLM (via outfit_system.txt) to draft
    an outfit profile — optionally seeded with text scraped from a sibling
    .html description file — then writes the JSON into CLOTHING_DIR and syncs
    the database. Existing entries are skipped unless overwrite=true.

    Fix: the LLM prompt, lora_name, and error log previously contained a
    literal placeholder instead of the LoRA filename; all three now use the
    actual filename.
    """
    clothing_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Clothing/'
    if not os.path.exists(clothing_lora_dir):
        flash('Clothing LoRA directory not found.', 'error')
        return redirect(url_for('outfits_index'))
    overwrite = request.form.get('overwrite') == 'true'
    created_count = 0
    skipped_count = 0
    overwritten_count = 0
    system_prompt = load_prompt('outfit_system.txt')
    if not system_prompt:
        flash('Outfit system prompt file not found.', 'error')
        return redirect(url_for('outfits_index'))
    for filename in os.listdir(clothing_lora_dir):
        if not filename.endswith('.safetensors'):
            continue
        name_base = filename.rsplit('.', 1)[0]
        outfit_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
        outfit_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()
        json_filename = f"{outfit_id}.json"
        json_path = os.path.join(app.config['CLOTHING_DIR'], json_filename)
        is_existing = os.path.exists(json_path)
        if is_existing and not overwrite:
            skipped_count += 1
            continue
        # Pull descriptive text from a sibling HTML file, if one exists.
        html_filename = f"{name_base}.html"
        html_path = os.path.join(clothing_lora_dir, html_filename)
        html_content = ""
        if os.path.exists(html_path):
            try:
                with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                    html_raw = hf.read()
                # Strip scripts, styles and images, then all remaining tags.
                clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
                clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)
                clean_html = re.sub(r'<img[^>]*>', '', clean_html)
                clean_html = re.sub(r'<[^>]+>', ' ', clean_html)
                html_content = ' '.join(clean_html.split())
            except Exception as e:
                print(f"Error reading HTML {html_filename}: {e}")
        try:
            print(f"Asking LLM to describe outfit: {outfit_name}")
            # BUGFIX: include the actual LoRA filename in the LLM prompt.
            prompt = f"Create an outfit profile for a clothing LoRA based on the filename: '{filename}'"
            if html_content:
                prompt += f"\n\nHere is descriptive text extracted from an associated HTML file:\n###\n{html_content[:3000]}\n###"
            llm_response = call_llm(prompt, system_prompt)
            clean_json = llm_response.replace('```json', '').replace('```', '').strip()
            outfit_data = json.loads(clean_json)
            outfit_data['outfit_id'] = outfit_id
            outfit_data['outfit_name'] = outfit_name
            if 'lora' not in outfit_data:
                outfit_data['lora'] = {}
            # BUGFIX: point the LoRA entry at the real file, not a placeholder.
            outfit_data['lora']['lora_name'] = f"Illustrious/Clothing/{filename}"
            if not outfit_data['lora'].get('lora_triggers'):
                outfit_data['lora']['lora_triggers'] = name_base
            if outfit_data['lora'].get('lora_weight') is None:
                outfit_data['lora']['lora_weight'] = 0.8
            if outfit_data['lora'].get('lora_weight_min') is None:
                outfit_data['lora']['lora_weight_min'] = 0.7
            if outfit_data['lora'].get('lora_weight_max') is None:
                outfit_data['lora']['lora_weight_max'] = 1.0
            os.makedirs(app.config['CLOTHING_DIR'], exist_ok=True)
            with open(json_path, 'w') as f:
                json.dump(outfit_data, f, indent=2)
            if is_existing:
                overwritten_count += 1
            else:
                created_count += 1
            # Gentle pacing between LLM calls.
            time.sleep(0.5)
        except Exception as e:
            print(f"Error creating outfit for {filename}: {e}")
    if created_count > 0 or overwritten_count > 0:
        sync_outfits()
        msg = f'Successfully processed outfits: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No outfits created or overwritten. {skipped_count} existing entries found.')
    return redirect(url_for('outfits_index'))
@app.route('/outfit/<path:slug>')
def outfit_detail(slug):
    """Show an outfit with its session state and any saved preview images."""
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()
    # Per-outfit state remembered from earlier visits.
    preferences = session.get(f'prefs_outfit_{slug}')
    preview_image = session.get(f'preview_outfit_{slug}')
    selected_character = session.get(f'char_outfit_{slug}')
    # Collect previously generated previews, newest first.
    preview_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"outfits/{slug}")
    existing_previews = []
    if os.path.isdir(preview_dir):
        image_names = [n for n in os.listdir(preview_dir) if n.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))]
        for n in sorted(image_names, reverse=True):
            existing_previews.append(f"outfits/{slug}/{n}")
    return render_template('outfits/detail.html', outfit=outfit, characters=characters,
                           preferences=preferences, preview_image=preview_image,
                           selected_character=selected_character, existing_previews=existing_previews)
@app.route('/outfit/<path:slug>/edit', methods=['GET', 'POST'])
def edit_outfit(slug):
    """Edit an outfit: name, id, wardrobe fields, LoRA settings, and tags.

    On POST the updated data is written both to the database row and back to
    the outfit's JSON file in CLOTHING_DIR so the two stay in sync.

    Fixes: the lora_weight parser used a bare ``except`` (now limited to
    conversion errors), and the tag parsing used a convoluted double
    comprehension that shadowed ``f`` (now a single equivalent pass).
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    loras = get_available_clothing_loras()  # Use clothing LoRAs for outfits
    if request.method == 'POST':
        try:
            # 1. Update basic fields
            outfit.name = request.form.get('outfit_name')
            # 2. Rebuild the data dictionary
            new_data = outfit.data.copy()
            new_data['outfit_name'] = outfit.name
            # Update outfit_id if provided
            new_data['outfit_id'] = request.form.get('outfit_id', outfit.outfit_id)
            # Update wardrobe section
            if 'wardrobe' in new_data:
                for key in new_data['wardrobe'].keys():
                    form_key = f"wardrobe_{key}"
                    if form_key in request.form:
                        new_data['wardrobe'][key] = request.form.get(form_key)
            # Update lora section
            if 'lora' in new_data:
                for key in new_data['lora'].keys():
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            # Only conversion failures fall back to the default.
                            try:
                                val = float(val)
                            except (TypeError, ValueError):
                                val = 0.8
                        new_data['lora'][key] = val
            # LoRA weight randomization bounds
            for bound in ['lora_weight_min', 'lora_weight_max']:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        pass
                else:
                    new_data.setdefault('lora', {}).pop(bound, None)
            # Update Tags (comma separated string to list)
            tags_raw = request.form.get('tags', '')
            new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()]
            outfit.data = new_data
            flag_modified(outfit, "data")
            # 3. Write back to JSON file
            outfit_file = outfit.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', outfit.outfit_id)}.json"
            file_path = os.path.join(app.config['CLOTHING_DIR'], outfit_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)
            db.session.commit()
            flash('Outfit profile updated successfully!')
            return redirect(url_for('outfit_detail', slug=slug))
        except Exception as e:
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")
    return render_template('outfits/edit.html', outfit=outfit, loras=loras)
@app.route('/outfit/<path:slug>/upload', methods=['POST'])
def upload_outfit_image(slug):
    """Upload a cover image for an outfit and record its relative path.

    Fix: image_path previously stored a literal placeholder instead of the
    sanitized filename; it now records the actual saved file.
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    if 'image' not in request.files:
        flash('No file part')
        return redirect(request.url)
    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(request.url)
    if file and allowed_file(file.filename):
        # Create outfit subfolder
        outfit_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"outfits/{slug}")
        os.makedirs(outfit_folder, exist_ok=True)
        filename = secure_filename(file.filename)
        file_path = os.path.join(outfit_folder, filename)
        file.save(file_path)
        # BUGFIX: store the sanitized filename in the DB path
        outfit.image_path = f"outfits/{slug}/{filename}"
        db.session.commit()
        flash('Image uploaded successfully!')
    return redirect(url_for('outfit_detail', slug=slug))
@app.route('/outfit/<path:slug>/generate', methods=['POST'])
def generate_outfit_image(slug):
    """Queue a ComfyUI preview/cover generation for an outfit.

    Optionally pairs the outfit with a character ('__random__' picks one at
    random); the character contributes identity/defaults/styles while the
    outfit contributes wardrobe, LoRA, and tags. Outfit previews always get
    a simple background, tinted with the character's primary_color when one
    is set. Returns {'status': 'queued', 'prompt_id': ...} for AJAX callers,
    otherwise redirects back to the outfit detail page; errors yield a 500
    JSON body or a flashed message respectively.
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    try:
        # Get action type
        action = request.form.get('action', 'preview')
        client_id = request.form.get('client_id')
        # Get selected fields
        selected_fields = request.form.getlist('include_field')
        # Get selected character (if any)
        character_slug = request.form.get('character_slug', '')
        character = None
        # Handle random character selection
        if character_slug == '__random__':
            all_characters = Character.query.all()
            if all_characters:
                character = random.choice(all_characters)
                character_slug = character.slug
        elif character_slug:
            character = Character.query.filter_by(slug=character_slug).first()
        # Save preferences
        session[f'prefs_outfit_{slug}'] = selected_fields
        session[f'char_outfit_{slug}'] = character_slug
        # Build combined data for prompt building
        if character:
            # Combine character identity/defaults with outfit wardrobe
            combined_data = {
                'character_id': character.character_id,
                'identity': character.data.get('identity', {}),
                'defaults': character.data.get('defaults', {}),
                'wardrobe': outfit.data.get('wardrobe', {}),  # Use outfit's wardrobe
                'styles': character.data.get('styles', {}),  # Use character's styles
                'lora': outfit.data.get('lora', {}),  # Use outfit's lora
                'tags': outfit.data.get('tags', [])
            }
            # When character is selected, merge character identity fields into selected_fields
            # so they are included in the prompt
            if selected_fields:
                # Add character identity fields to selection if not already present
                for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']:
                    if character.data.get('identity', {}).get(key):
                        field_key = f'identity::{key}'
                        if field_key not in selected_fields:
                            selected_fields.append(field_key)
                # Add expression and pose, but NOT scene (outfit previews use simple background)
                for key in ['expression', 'pose']:
                    if character.data.get('defaults', {}).get(key):
                        field_key = f'defaults::{key}'
                        if field_key not in selected_fields:
                            selected_fields.append(field_key)
                # Always include character name
                if 'special::name' not in selected_fields:
                    selected_fields.append('special::name')
            else:
                # No explicit field selection (e.g. batch generation) — build a selection
                # that includes identity + wardrobe + name + lora triggers, but NOT character
                # defaults (expression, pose, scene), so outfit covers stay generic.
                for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']:
                    if character.data.get('identity', {}).get(key):
                        selected_fields.append(f'identity::{key}')
                outfit_wardrobe = outfit.data.get('wardrobe', {})
                for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']:
                    if outfit_wardrobe.get(key):
                        selected_fields.append(f'wardrobe::{key}')
                selected_fields.append('special::name')
                if outfit.data.get('lora', {}).get('lora_triggers'):
                    selected_fields.append('lora::lora_triggers')
            default_fields = character.default_fields
        else:
            # Outfit only - no character
            combined_data = {
                'character_id': outfit.outfit_id,
                'wardrobe': outfit.data.get('wardrobe', {}),
                'lora': outfit.data.get('lora', {}),
                'tags': outfit.data.get('tags', [])
            }
            default_fields = outfit.default_fields
        # Queue generation
        with open('comfy_workflow.json', 'r') as f:
            workflow = json.load(f)
        # Build prompts for combined data
        prompts = build_prompt(combined_data, selected_fields, default_fields)
        # Add colored simple background to the main prompt for outfit previews
        # Use character's primary_color if available
        if character:
            primary_color = character.data.get('styles', {}).get('primary_color', '')
            if primary_color:
                prompts["main"] = f"{prompts['main']}, {primary_color} simple background"
            else:
                prompts["main"] = f"{prompts['main']}, simple background"
        else:
            prompts["main"] = f"{prompts['main']}, simple background"
        # Prepare workflow - pass both character and outfit for dual LoRA support
        ckpt_path, ckpt_data = _get_default_checkpoint()
        workflow = _prepare_workflow(workflow, character, prompts, outfit=outfit, checkpoint=ckpt_path, checkpoint_data=ckpt_data)
        prompt_response = queue_prompt(workflow, client_id=client_id)
        if 'prompt_id' not in prompt_response:
            raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}")
        prompt_id = prompt_response['prompt_id']
        # Return JSON if AJAX request
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'status': 'queued', 'prompt_id': prompt_id}
        return redirect(url_for('outfit_detail', slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('outfit_detail', slug=slug))
@app.route('/outfit/<path:slug>/finalize_generation/<prompt_id>', methods=['POST'])
def finalize_outfit_generation(slug, prompt_id):
    """Fetch a finished ComfyUI result and store it as an outfit preview.

    Saves the first image found in the prompt's outputs under
    static/uploads/outfits/<slug>/ and records it in the session as the
    current preview. When action == 'replace' the outfit's cover image is
    updated immediately as well.

    Fix: the preview path previously stored a literal placeholder instead of
    the generated filename; it now references the saved file.
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    action = request.form.get('action', 'preview')
    try:
        history = get_history(prompt_id)
        if prompt_id not in history:
            return {'error': 'History not found'}, 404
        outputs = history[prompt_id]['outputs']
        for node_id in outputs:
            if 'images' in outputs[node_id]:
                image_info = outputs[node_id]['images'][0]
                image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                # Create outfit subfolder
                outfit_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"outfits/{slug}")
                os.makedirs(outfit_folder, exist_ok=True)
                filename = f"gen_{int(time.time())}.png"
                file_path = os.path.join(outfit_folder, filename)
                with open(file_path, 'wb') as f:
                    f.write(image_data)
                print(f"Image saved to: {os.path.abspath(file_path)}")
                # BUGFIX: preview path must reference the saved file's name.
                relative_path = f"outfits/{slug}/{filename}"
                session[f'preview_outfit_{slug}'] = relative_path
                session.modified = True  # Ensure session is saved for JSON response
                # If action is 'replace', also update the outfit's cover image immediately
                if action == 'replace':
                    outfit.image_path = relative_path
                    db.session.commit()
                return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')}
        return {'error': 'No image found in output'}, 404
    except Exception as e:
        print(f"Finalize error: {e}")
        return {'error': str(e)}, 500
@app.route('/outfit/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_outfit_cover_from_preview(slug):
    """Promote the session-stored preview image to the outfit's cover."""
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    stored_preview = session.get(f'preview_outfit_{slug}')
    if not stored_preview:
        flash('No preview image available', 'error')
    else:
        outfit.image_path = stored_preview
        db.session.commit()
        flash('Cover image updated from preview!')
    return redirect(url_for('outfit_detail', slug=slug))
@app.route('/outfit/create', methods=['GET', 'POST'])
def create_outfit():
    """Create a new outfit, either blank or drafted by the LLM.

    POST form fields:
        name: display name (required — a missing name previously crashed
            slug derivation with an AttributeError; now flashes an error).
        filename: optional slug; derived from name when omitted.
        prompt: free-text description fed to the LLM when use_llm is on.
        use_llm: 'on' to have the LLM draft wardrobe/lora/tags sections.

    The default wardrobe/lora sections were previously duplicated between
    the LLM and blank paths; they are now defined once.
    """
    # Shared default sections so the blank and LLM paths stay consistent.
    blank_wardrobe = {
        "full_body": "",
        "headwear": "",
        "top": "",
        "bottom": "",
        "legwear": "",
        "footwear": "",
        "hands": "",
        "accessories": ""
    }
    blank_lora = {
        "lora_name": "",
        "lora_weight": 0.8,
        "lora_triggers": ""
    }
    if request.method == 'POST':
        name = request.form.get('name')
        slug = request.form.get('filename', '').strip()
        prompt = request.form.get('prompt', '')
        use_llm = request.form.get('use_llm') == 'on'
        # Robustness: a missing name would crash slug derivation below.
        if not name:
            flash("Name is required.")
            return redirect(request.url)
        # Auto-generate slug from name if not provided
        if not slug:
            slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')
        # Validate slug
        safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug)
        if not safe_slug:
            safe_slug = 'outfit'
        # Find available filename (increment if exists)
        base_slug = safe_slug
        counter = 1
        while os.path.exists(os.path.join(app.config['CLOTHING_DIR'], f"{safe_slug}.json")):
            safe_slug = f"{base_slug}_{counter}"
            counter += 1
        # Check if LLM generation is requested
        if use_llm:
            if not prompt:
                flash("Description is required when AI generation is enabled.")
                return redirect(request.url)
            # Generate JSON with LLM
            system_prompt = load_prompt('outfit_system.txt')
            if not system_prompt:
                flash("System prompt file not found.")
                return redirect(request.url)
            try:
                llm_response = call_llm(f"Create an outfit profile for '{name}' based on this description: {prompt}", system_prompt)
                # Clean response (remove markdown if present)
                clean_json = llm_response.replace('```json', '').replace('```', '').strip()
                outfit_data = json.loads(clean_json)
                # Enforce IDs
                outfit_data['outfit_id'] = safe_slug
                outfit_data['outfit_name'] = name
                # Ensure required sections exist
                outfit_data.setdefault('wardrobe', dict(blank_wardrobe))
                outfit_data.setdefault('lora', dict(blank_lora))
                outfit_data.setdefault('tags', [])
            except Exception as e:
                print(f"LLM error: {e}")
                flash(f"Failed to generate outfit profile: {e}")
                return redirect(request.url)
        else:
            # Create blank outfit template
            outfit_data = {
                "outfit_id": safe_slug,
                "outfit_name": name,
                "wardrobe": dict(blank_wardrobe),
                "lora": dict(blank_lora),
                "tags": []
            }
        try:
            # Save file
            file_path = os.path.join(app.config['CLOTHING_DIR'], f"{safe_slug}.json")
            with open(file_path, 'w') as f:
                json.dump(outfit_data, f, indent=2)
            # Add to DB
            new_outfit = Outfit(
                outfit_id=safe_slug,
                slug=safe_slug,
                filename=f"{safe_slug}.json",
                name=name,
                data=outfit_data
            )
            db.session.add(new_outfit)
            db.session.commit()
            flash('Outfit created successfully!')
            return redirect(url_for('outfit_detail', slug=safe_slug))
        except Exception as e:
            print(f"Save error: {e}")
            flash(f"Failed to create outfit: {e}")
            return redirect(request.url)
    return render_template('outfits/create.html')
@app.route('/outfit/<path:slug>/save_defaults', methods=['POST'])
def save_outfit_defaults(slug):
    """Persist the checked prompt fields as this outfit's defaults."""
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    outfit.default_fields = request.form.getlist('include_field')
    db.session.commit()
    flash('Default prompt selection saved for this outfit!')
    return redirect(url_for('outfit_detail', slug=slug))
@app.route('/outfit/<path:slug>/clone', methods=['POST'])
def clone_outfit(slug):
    """Duplicate an outfit under the next free numbered id (e.g. name_02).

    Fix: removed a redundant function-level ``import re`` — ``re`` is
    already imported at module level.
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    base_id = outfit.outfit_id
    # Extract base name without number suffix
    match = re.match(r'^(.+?)_(\d+)$', base_id)
    if match:
        base_name = match.group(1)
        current_num = int(match.group(2))
    else:
        base_name = base_id
        current_num = 1
    # Find next available number by probing the filesystem for a free slot
    next_num = current_num + 1
    while True:
        new_id = f"{base_name}_{next_num:02d}"
        new_filename = f"{new_id}.json"
        new_path = os.path.join(app.config['CLOTHING_DIR'], new_filename)
        if not os.path.exists(new_path):
            break
        next_num += 1
    # Create new outfit data (copy of original)
    new_data = outfit.data.copy()
    new_data['outfit_id'] = new_id
    new_data['outfit_name'] = f"{outfit.name} (Copy)"
    # Save the new JSON file
    with open(new_path, 'w') as f:
        json.dump(new_data, f, indent=2)
    # Create new outfit in database
    new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id)
    new_outfit = Outfit(
        outfit_id=new_id,
        slug=new_slug,
        filename=new_filename,
        name=new_data['outfit_name'],
        data=new_data
    )
    db.session.add(new_outfit)
    db.session.commit()
    flash(f'Outfit cloned as "{new_id}"!')
    return redirect(url_for('outfit_detail', slug=new_slug))
@app.route('/outfit/<path:slug>/save_json', methods=['POST'])
def save_outfit_json(slug):
    """Replace an outfit's raw JSON data from the form and persist it."""
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    try:
        parsed = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400
    outfit.data = parsed
    flag_modified(outfit, 'data')
    db.session.commit()
    # Mirror the change into the backing JSON file when one is known.
    if outfit.filename:
        target = os.path.join(app.config['CLOTHING_DIR'], outfit.filename)
        with open(target, 'w') as fh:
            json.dump(parsed, fh, indent=2)
    return {'success': True}
# ============ ACTION ROUTES ============
@app.route('/actions')
def actions_index():
    """Render the action list, sorted alphabetically by name."""
    all_actions = Action.query.order_by(Action.name).all()
    return render_template('actions/index.html', actions=all_actions)
@app.route('/actions/rescan', methods=['POST'])
def rescan_actions():
    """Re-sync the Action table from the JSON files on disk."""
    sync_actions()
    flash('Database synced with action files.')
    return redirect(url_for('actions_index'))
@app.route('/action/<path:slug>')
def action_detail(slug):
    """Show one action with its per-action session state and prior previews."""
    action = Action.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()
    # Per-action UI state stashed in the session by the generate routes.
    prefs = session.get(f'prefs_action_{slug}')
    preview = session.get(f'preview_action_{slug}')
    chosen_char = session.get(f'char_action_{slug}')
    # Collect previously generated preview images, newest first.
    previews_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}")
    previews = []
    if os.path.isdir(previews_dir):
        image_exts = ('.png', '.jpg', '.jpeg', '.webp')
        names = [n for n in os.listdir(previews_dir) if n.lower().endswith(image_exts)]
        previews = [f"actions/{slug}/{n}" for n in sorted(names, reverse=True)]
    return render_template('actions/detail.html', action=action, characters=characters,
                           preferences=prefs, preview_image=preview,
                           selected_character=chosen_char, existing_previews=previews)
@app.route('/action/<path:slug>/edit', methods=['GET', 'POST'])
def edit_action(slug):
    """Edit an action profile.

    GET renders the edit form; POST updates the DB row and rewrites the
    backing JSON file so disk and database stay in sync. On save errors
    the form is re-rendered with a flash message.
    """
    action = Action.query.filter_by(slug=slug).first_or_404()
    loras = get_available_action_loras()
    if request.method == 'POST':
        try:
            # 1. Update basic fields
            action.name = request.form.get('action_name')
            # 2. Rebuild the data dictionary
            new_data = action.data.copy()
            new_data['action_name'] = action.name
            # Update action_id if provided
            new_data['action_id'] = request.form.get('action_id', action.action_id)
            # Update action section — only keys actually present in the form.
            if 'action' in new_data:
                for key in new_data['action'].keys():
                    form_key = f"action_{key}"
                    if form_key in request.form:
                        new_data['action'][key] = request.form.get(form_key)
            # Update lora section; lora_weight is coerced to float.
            if 'lora' in new_data:
                for key in new_data['lora'].keys():
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            # Narrowed from a bare `except:` — only coercion
                            # failures fall back to the default weight.
                            try:
                                val = float(val)
                            except (TypeError, ValueError):
                                val = 1.0
                        new_data['lora'][key] = val
            # LoRA weight randomization bounds: set when given, drop when blank.
            for bound in ['lora_weight_min', 'lora_weight_max']:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        pass  # keep the previous bound on malformed input
                else:
                    new_data.setdefault('lora', {}).pop(bound, None)
            # Tags: comma-separated string -> list of non-empty trimmed tags.
            tags_raw = request.form.get('tags', '')
            new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()]
            action.data = new_data
            flag_modified(action, "data")
            # 3. Write back to JSON file
            action_file = action.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', action.action_id)}.json"
            file_path = os.path.join(app.config['ACTIONS_DIR'], action_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)
            db.session.commit()
            flash('Action profile updated successfully!')
            return redirect(url_for('action_detail', slug=slug))
        except Exception as e:
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")
    return render_template('actions/edit.html', action=action, loras=loras)
@app.route('/action/<path:slug>/upload', methods=['POST'])
def upload_action_image(slug):
    """Upload a cover image for an action and record its relative path.

    Bug fix: the stored image_path previously contained a literal
    "(unknown)" placeholder instead of the saved filename.
    """
    action = Action.query.filter_by(slug=slug).first_or_404()
    if 'image' not in request.files:
        flash('No file part')
        return redirect(request.url)
    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(request.url)
    if file and allowed_file(file.filename):
        # Store uploads in a per-action subfolder.
        action_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}")
        os.makedirs(action_folder, exist_ok=True)
        filename = secure_filename(file.filename)
        file_path = os.path.join(action_folder, filename)
        file.save(file_path)
        # Store the relative path of the *actual* saved file in the DB.
        action.image_path = f"actions/{slug}/{filename}"
        db.session.commit()
        flash('Image uploaded successfully!')
    return redirect(url_for('action_detail', slug=slug))
@app.route('/action/<path:slug>/generate', methods=['POST'])
def generate_action_image(slug):
    """Queue a ComfyUI generation previewing this action.

    Combines the action's pose/expression data with an optionally selected
    (or random) character, builds the prompt field selection, optionally
    injects extra characters when the action's participant orientation
    contains more than one 'F', appends a simple-background tag, and
    queues the prepared workflow. Returns a JSON status for AJAX requests,
    otherwise redirects back to the action detail page.
    """
    action_obj = Action.query.filter_by(slug=slug).first_or_404()
    try:
        # Get action type
        action_type = request.form.get('action', 'preview')  # NOTE(review): read but unused below — confirm intent
        client_id = request.form.get('client_id')
        # Get selected fields
        selected_fields = request.form.getlist('include_field')
        # Get selected character (if any)
        character_slug = request.form.get('character_slug', '')
        character = None
        # Handle random character selection
        if character_slug == '__random__':
            all_characters = Character.query.all()
            if all_characters:
                character = random.choice(all_characters)
                character_slug = character.slug
        elif character_slug:
            character = Character.query.filter_by(slug=character_slug).first()
        # Save preferences (so the detail page restores the last selection)
        session[f'char_action_{slug}'] = character_slug
        session[f'prefs_action_{slug}'] = selected_fields
        # Build combined data for prompt building
        if character:
            # Combine character identity/wardrobe with action details
            # Action details replace character's 'defaults' (pose, etc.)
            combined_data = character.data.copy()
            # Update 'defaults' with action details
            action_data = action_obj.data.get('action', {})
            combined_data['action'] = action_data # Ensure action section is present for routing
            combined_data['participants'] = action_obj.data.get('participants', {}) # Add participants
            # Aggregate pose-related fields into 'pose'
            pose_fields = ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet']
            pose_parts = [action_data.get(k) for k in pose_fields if action_data.get(k)]
            # Aggregate expression-related fields into 'expression'
            expression_parts = [action_data.get(k) for k in ['head', 'eyes'] if action_data.get(k)]
            combined_data['defaults'] = {
                'pose': ", ".join(pose_parts),
                'expression': ", ".join(expression_parts),
                'scene': action_data.get('additional', '')
            }
            # Merge lora triggers if present
            action_lora = action_obj.data.get('lora', {})
            if action_lora.get('lora_triggers'):
                if 'lora' not in combined_data: combined_data['lora'] = {}
                combined_data['lora']['lora_triggers'] = f"{combined_data['lora'].get('lora_triggers', '')}, {action_lora['lora_triggers']}"
            # Merge tags (deduplicated; set() does not preserve order)
            combined_data['tags'] = list(set(combined_data.get('tags', []) + action_obj.data.get('tags', [])))
            # Use action's defaults if no manual selection
            if not selected_fields:
                selected_fields = list(action_obj.default_fields) if action_obj.default_fields else []
            # Auto-include essential character fields if a character is selected
            if selected_fields:
                # Add character identity fields to selection if not already present
                for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']:
                    if character.data.get('identity', {}).get(key):
                        field_key = f'identity::{key}'
                        if field_key not in selected_fields:
                            selected_fields.append(field_key)
                # Always include character name
                if 'special::name' not in selected_fields:
                    selected_fields.append('special::name')
                # Add active wardrobe fields
                wardrobe = character.get_active_wardrobe()
                for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']:
                    if wardrobe.get(key):
                        field_key = f'wardrobe::{key}'
                        if field_key not in selected_fields:
                            selected_fields.append(field_key)
            else:
                # Fallback to sensible defaults if still empty (no checkboxes and no action defaults)
                selected_fields = ['special::name', 'defaults::pose', 'defaults::expression']
                # Add identity fields
                for key in ['base_specs', 'hair', 'eyes']:
                    if character.data.get('identity', {}).get(key):
                        selected_fields.append(f'identity::{key}')
                # Add wardrobe fields
                wardrobe = character.get_active_wardrobe()
                for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']:
                    if wardrobe.get(key):
                        selected_fields.append(f'wardrobe::{key}')
            default_fields = action_obj.default_fields
            active_outfit = character.active_outfit
        else:
            # Action only - no character (rarely makes sense for actions but let's handle it)
            action_data = action_obj.data.get('action', {})
            # Aggregate pose-related fields into 'pose'
            pose_fields = ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet']
            pose_parts = [action_data.get(k) for k in pose_fields if action_data.get(k)]
            # Aggregate expression-related fields into 'expression'
            expression_parts = [action_data.get(k) for k in ['head', 'eyes'] if action_data.get(k)]
            combined_data = {
                'character_id': action_obj.action_id,
                'defaults': {
                    'pose': ", ".join(pose_parts),
                    'expression': ", ".join(expression_parts),
                    'scene': action_data.get('additional', '')
                },
                'lora': action_obj.data.get('lora', {}),
                'tags': action_obj.data.get('tags', [])
            }
            if not selected_fields:
                selected_fields = ['defaults::pose', 'defaults::expression', 'defaults::scene', 'lora::lora_triggers', 'special::tags']
            default_fields = action_obj.default_fields
            active_outfit = 'default'
        # Queue generation
        with open('comfy_workflow.json', 'r') as f:
            workflow = json.load(f)
        # Build prompts for combined data
        prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit)
        # Handle multiple female characters: orientation like "FFM" implies
        # extra 'F' participants beyond the primary character.
        participants = action_obj.data.get('participants', {})
        orientation = participants.get('orientation', '')
        f_count = orientation.upper().count('F')
        if f_count > 1:
            # We need f_count - 1 additional characters
            num_extras = f_count - 1
            # Get all characters excluding the current one
            query = Character.query
            if character:
                query = query.filter(Character.id != character.id)
            all_others = query.all()
            if len(all_others) >= num_extras:
                extras = random.sample(all_others, num_extras)
                for extra_char in extras:
                    extra_parts = []
                    # Identity
                    ident = extra_char.data.get('identity', {})
                    for key in ['base_specs', 'hair', 'eyes', 'extra']:
                        val = ident.get(key)
                        if val:
                            # Remove 1girl/solo (single-subject tags conflict with multi-character prompts)
                            val = re.sub(r'\b(1girl|1boy|solo)\b', '', val).replace(', ,', ',').strip(', ')
                            extra_parts.append(val)
                    # Wardrobe (active outfit)
                    wardrobe = extra_char.get_active_wardrobe()
                    for key in ['top', 'headwear', 'legwear', 'footwear', 'accessories']:
                        val = wardrobe.get(key)
                        if val:
                            extra_parts.append(val)
                    # Append to main prompt
                    if extra_parts:
                        prompts["main"] += ", " + ", ".join(extra_parts)
                        print(f"Added extra character: {extra_char.name}")
        # Add colored simple background to the main prompt for action previews
        if character:
            primary_color = character.data.get('styles', {}).get('primary_color', '')
            if primary_color:
                prompts["main"] = f"{prompts['main']}, {primary_color} simple background"
            else:
                prompts["main"] = f"{prompts['main']}, simple background"
        else:
            prompts["main"] = f"{prompts['main']}, simple background"
        # Prepare workflow
        ckpt_path, ckpt_data = _get_default_checkpoint()
        workflow = _prepare_workflow(workflow, character, prompts, action=action_obj, checkpoint=ckpt_path, checkpoint_data=ckpt_data)
        prompt_response = queue_prompt(workflow, client_id=client_id)
        if 'prompt_id' not in prompt_response:
            raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}")
        prompt_id = prompt_response['prompt_id']
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'status': 'queued', 'prompt_id': prompt_id}
        return redirect(url_for('action_detail', slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('action_detail', slug=slug))
@app.route('/action/<path:slug>/finalize_generation/<prompt_id>', methods=['POST'])
def finalize_action_generation(slug, prompt_id):
    """Fetch a finished ComfyUI job's image and store it as the action preview.

    Saves the first image found in the job's outputs to the action's upload
    folder, records it as the session preview, and — when the posted action
    is 'replace' — also makes it the cover image.

    Bug fix: the stored relative path previously contained a literal
    "(unknown)" placeholder instead of the saved filename.
    """
    action_obj = Action.query.filter_by(slug=slug).first_or_404()
    action = request.form.get('action', 'preview')
    try:
        history = get_history(prompt_id)
        if prompt_id not in history:
            return {'error': 'History not found'}, 404
        outputs = history[prompt_id]['outputs']
        for node_id in outputs:
            if 'images' in outputs[node_id]:
                image_info = outputs[node_id]['images'][0]
                image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                # Create action subfolder
                action_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}")
                os.makedirs(action_folder, exist_ok=True)
                filename = f"gen_{int(time.time())}.png"
                file_path = os.path.join(action_folder, filename)
                with open(file_path, 'wb') as f:
                    f.write(image_data)
                # Always save as preview — reference the file we just wrote.
                relative_path = f"actions/{slug}/{filename}"
                session[f'preview_action_{slug}'] = relative_path
                session.modified = True  # Ensure session is saved for JSON response
                # If action is 'replace', also update the action's cover image immediately
                if action == 'replace':
                    action_obj.image_path = relative_path
                    db.session.commit()
                return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')}
        return {'error': 'No image found in output'}, 404
    except Exception as e:
        print(f"Finalize error: {e}")
        return {'error': str(e)}, 500
@app.route('/action/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_action_cover_from_preview(slug):
    """Promote the session preview image to the action's cover image."""
    action = Action.query.filter_by(slug=slug).first_or_404()
    preview_path = session.get(f'preview_action_{slug}')
    if not preview_path:
        flash('No preview image available', 'error')
    else:
        action.image_path = preview_path
        db.session.commit()
        flash('Cover image updated from preview!')
    return redirect(url_for('action_detail', slug=slug))
@app.route('/action/<path:slug>/save_defaults', methods=['POST'])
def save_action_defaults(slug):
    """Persist the checked prompt fields as this action's default selection."""
    action = Action.query.filter_by(slug=slug).first_or_404()
    action.default_fields = request.form.getlist('include_field')
    db.session.commit()
    flash('Default prompt selection saved for this action!')
    return redirect(url_for('action_detail', slug=slug))
@app.route('/actions/bulk_create', methods=['POST'])
def bulk_create_actions_from_loras():
    """Create action profiles for every pose LoRA file via the LLM.

    Scans the pose LoRA directory for .safetensors files, asks the LLM to
    describe each one (using text scraped from a sibling .html page when
    present), and writes one action JSON per LoRA. Existing JSON files are
    skipped unless the 'overwrite' form flag is 'true'.

    Bug fix: the LLM prompt, the stored lora_name, and the error log all
    previously contained a literal "(unknown)" placeholder instead of the
    LoRA's actual filename.
    """
    actions_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Poses/'
    if not os.path.exists(actions_lora_dir):
        flash('Actions LoRA directory not found.', 'error')
        return redirect(url_for('actions_index'))
    overwrite = request.form.get('overwrite') == 'true'
    created_count = 0
    skipped_count = 0
    overwritten_count = 0
    system_prompt = load_prompt('action_system.txt')
    if not system_prompt:
        flash('Action system prompt file not found.', 'error')
        return redirect(url_for('actions_index'))
    for filename in os.listdir(actions_lora_dir):
        if not filename.endswith('.safetensors'):
            continue
        name_base = filename.rsplit('.', 1)[0]
        action_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
        action_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()
        json_filename = f"{action_id}.json"
        json_path = os.path.join(app.config['ACTIONS_DIR'], json_filename)
        is_existing = os.path.exists(json_path)
        if is_existing and not overwrite:
            skipped_count += 1
            continue
        # Pull descriptive text from an adjacent HTML page (if any) to give
        # the LLM more context than the bare filename.
        html_filename = f"{name_base}.html"
        html_path = os.path.join(actions_lora_dir, html_filename)
        html_content = ""
        if os.path.exists(html_path):
            try:
                with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                    html_raw = hf.read()
                # Strip HTML tags but keep text content for LLM context
                clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
                clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)
                clean_html = re.sub(r'<img[^>]*>', '', clean_html)
                clean_html = re.sub(r'<[^>]+>', ' ', clean_html)
                html_content = ' '.join(clean_html.split())
            except Exception as e:
                print(f"Error reading HTML {html_filename}: {e}")
        try:
            print(f"Asking LLM to describe action: {action_name}")
            # Interpolate the real LoRA filename into the prompt.
            prompt = f"Describe an action/pose for an AI image generation model based on the LoRA filename: '{filename}'"
            if html_content:
                prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###"
            llm_response = call_llm(prompt, system_prompt)
            # Clean response (strip markdown code fences)
            clean_json = llm_response.replace('```json', '').replace('```', '').strip()
            action_data = json.loads(clean_json)
            # Enforce system values while preserving LLM-extracted metadata
            action_data['action_id'] = action_id
            action_data['action_name'] = action_name
            # Update lora dict safely; point lora_name at the actual file.
            if 'lora' not in action_data:
                action_data['lora'] = {}
            action_data['lora']['lora_name'] = f"Illustrious/Poses/{filename}"
            # Fallbacks if LLM failed to extract metadata
            if not action_data['lora'].get('lora_triggers'):
                action_data['lora']['lora_triggers'] = name_base
            if action_data['lora'].get('lora_weight') is None:
                action_data['lora']['lora_weight'] = 1.0
            if action_data['lora'].get('lora_weight_min') is None:
                action_data['lora']['lora_weight_min'] = 0.7
            if action_data['lora'].get('lora_weight_max') is None:
                action_data['lora']['lora_weight_max'] = 1.0
            with open(json_path, 'w') as f:
                json.dump(action_data, f, indent=2)
            if is_existing:
                overwritten_count += 1
            else:
                created_count += 1
            # Small delay to avoid API rate limits if many files
            time.sleep(0.5)
        except Exception as e:
            print(f"Error creating action for {filename}: {e}")
    if created_count > 0 or overwritten_count > 0:
        sync_actions()
        msg = f'Successfully processed actions: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No actions created or overwritten. {skipped_count} existing actions found.')
    return redirect(url_for('actions_index'))
@app.route('/action/create', methods=['GET', 'POST'])
def create_action():
    """Create a new action, either LLM-generated from a description or blank."""
    if request.method != 'POST':
        return render_template('actions/create.html')
    name = request.form.get('name')
    slug = request.form.get('filename', '').strip()
    prompt = request.form.get('prompt', '')
    use_llm = request.form.get('use_llm') == 'on'
    # Derive a filesystem-safe slug, falling back to the name when blank.
    if not slug:
        slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')
    safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) or 'action'
    # De-duplicate against existing JSON files by appending a counter.
    base_slug = safe_slug
    counter = 1
    while os.path.exists(os.path.join(app.config['ACTIONS_DIR'], f"{safe_slug}.json")):
        safe_slug = f"{base_slug}_{counter}"
        counter += 1
    if use_llm:
        if not prompt:
            flash("Description is required when AI generation is enabled.")
            return redirect(request.url)
        system_prompt = load_prompt('action_system.txt')
        if not system_prompt:
            flash("Action system prompt file not found.")
            return redirect(request.url)
        try:
            llm_response = call_llm(f"Create an action profile for '{name}' based on this description: {prompt}", system_prompt)
            clean_json = llm_response.replace('```json', '').replace('```', '').strip()
            action_data = json.loads(clean_json)
            action_data['action_id'] = safe_slug
            action_data['action_name'] = name
        except Exception as e:
            print(f"LLM error: {e}")
            flash(f"Failed to generate action profile: {e}")
            return redirect(request.url)
    else:
        # Empty template profile for manual editing.
        action_data = {
            "action_id": safe_slug,
            "action_name": name,
            "action": {
                "full_body": "", "head": "", "eyes": "", "arms": "", "hands": "",
                "torso": "", "pelvis": "", "legs": "", "feet": "", "additional": ""
            },
            "lora": {"lora_name": "", "lora_weight": 1.0, "lora_triggers": ""},
            "tags": []
        }
    try:
        # Persist the JSON file, then mirror it into the database.
        with open(os.path.join(app.config['ACTIONS_DIR'], f"{safe_slug}.json"), 'w') as f:
            json.dump(action_data, f, indent=2)
        db.session.add(Action(
            action_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json",
            name=name, data=action_data
        ))
        db.session.commit()
        flash('Action created successfully!')
        return redirect(url_for('action_detail', slug=safe_slug))
    except Exception as e:
        print(f"Save error: {e}")
        flash(f"Failed to create action: {e}")
        return redirect(request.url)
@app.route('/action/<path:slug>/clone', methods=['POST'])
def clone_action(slug):
    """Clone an action: copy its JSON to a new numbered file and DB row.

    The clone id appends (or bumps) a zero-padded numeric suffix, taking
    the first number whose JSON file does not already exist on disk.
    """
    action = Action.query.filter_by(slug=slug).first_or_404()
    base_id = action.action_id
    # Split an existing "_NN" suffix so repeated clones keep counting up.
    # (Redundant local `import re` removed; `re` is imported at module level.)
    match = re.match(r'^(.+?)_(\d+)$', base_id)
    if match:
        base_name = match.group(1)
        current_num = int(match.group(2))
    else:
        base_name = base_id
        current_num = 1
    # Probe for the first free "<base>_NN.json" filename.
    next_num = current_num + 1
    while True:
        new_id = f"{base_name}_{next_num:02d}"
        new_filename = f"{new_id}.json"
        new_path = os.path.join(app.config['ACTIONS_DIR'], new_filename)
        if not os.path.exists(new_path):
            break
        next_num += 1
    # Shallow copy suffices: top-level keys are replaced, not mutated.
    new_data = action.data.copy()
    new_data['action_id'] = new_id
    new_data['action_name'] = f"{action.name} (Copy)"
    with open(new_path, 'w') as f:
        json.dump(new_data, f, indent=2)
    new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id)
    new_action = Action(
        action_id=new_id, slug=new_slug, filename=new_filename,
        name=new_data['action_name'], data=new_data
    )
    db.session.add(new_action)
    db.session.commit()
    flash(f'Action cloned as "{new_id}"!')
    return redirect(url_for('action_detail', slug=new_slug))
@app.route('/action/<path:slug>/save_json', methods=['POST'])
def save_action_json(slug):
    """Replace an action's raw JSON data from the edit form and persist it."""
    action = Action.query.filter_by(slug=slug).first_or_404()
    raw = request.form.get('json_data', '')
    try:
        parsed = json.loads(raw)
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400
    action.data = parsed
    flag_modified(action, 'data')
    db.session.commit()
    # Mirror the change to the backing JSON file when one exists.
    if action.filename:
        target = os.path.join(app.config['ACTIONS_DIR'], action.filename)
        with open(target, 'w') as f:
            json.dump(parsed, f, indent=2)
    return {'success': True}
# ============ STYLE ROUTES ============
@app.route('/styles')
def styles_index():
    """Render the style list, sorted alphabetically by name."""
    all_styles = Style.query.order_by(Style.name).all()
    return render_template('styles/index.html', styles=all_styles)
@app.route('/styles/rescan', methods=['POST'])
def rescan_styles():
    """Re-sync the Style table from the JSON files on disk."""
    sync_styles()
    flash('Database synced with style files.')
    return redirect(url_for('styles_index'))
@app.route('/style/<path:slug>')
def style_detail(slug):
    """Show one style with its per-style session state and prior previews."""
    style = Style.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()
    # Per-style UI state stashed in the session by the generate routes.
    prefs = session.get(f'prefs_style_{slug}')
    preview = session.get(f'preview_style_{slug}')
    chosen_char = session.get(f'char_style_{slug}')
    # Collect previously generated preview images, newest first.
    previews_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}")
    previews = []
    if os.path.isdir(previews_dir):
        image_exts = ('.png', '.jpg', '.jpeg', '.webp')
        names = [n for n in os.listdir(previews_dir) if n.lower().endswith(image_exts)]
        previews = [f"styles/{slug}/{n}" for n in sorted(names, reverse=True)]
    return render_template('styles/detail.html', style=style, characters=characters,
                           preferences=prefs, preview_image=preview,
                           selected_character=chosen_char, existing_previews=previews)
@app.route('/style/<path:slug>/edit', methods=['GET', 'POST'])
def edit_style(slug):
    """Edit a style.

    GET renders the edit form; POST updates the DB row and rewrites the
    backing JSON file so disk and database stay in sync. On save errors
    the form is re-rendered with a flash message.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    loras = get_available_style_loras()
    if request.method == 'POST':
        try:
            # 1. Update basic fields
            style.name = request.form.get('style_name')
            # 2. Rebuild the data dictionary
            new_data = style.data.copy()
            new_data['style_name'] = style.name
            # Update style section — only keys actually present in the form.
            if 'style' in new_data:
                for key in new_data['style'].keys():
                    form_key = f"style_{key}"
                    if form_key in request.form:
                        new_data['style'][key] = request.form.get(form_key)
            # Update lora section; lora_weight is coerced to float.
            if 'lora' in new_data:
                for key in new_data['lora'].keys():
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            # Narrowed from a bare `except:` — only coercion
                            # failures fall back to the default weight.
                            try:
                                val = float(val)
                            except (TypeError, ValueError):
                                val = 1.0
                        new_data['lora'][key] = val
            # LoRA weight randomization bounds: set when given, drop when blank.
            for bound in ['lora_weight_min', 'lora_weight_max']:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        pass  # keep the previous bound on malformed input
                else:
                    new_data.setdefault('lora', {}).pop(bound, None)
            style.data = new_data
            flag_modified(style, "data")
            # 3. Write back to JSON file
            style_file = style.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', style.style_id)}.json"
            file_path = os.path.join(app.config['STYLES_DIR'], style_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)
            db.session.commit()
            flash('Style updated successfully!')
            return redirect(url_for('style_detail', slug=slug))
        except Exception as e:
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")
    return render_template('styles/edit.html', style=style, loras=loras)
@app.route('/style/<path:slug>/upload', methods=['POST'])
def upload_style_image(slug):
    """Upload a cover image for a style and record its relative path.

    Bug fix: the stored image_path previously contained a literal
    "(unknown)" placeholder instead of the saved filename.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    if 'image' not in request.files:
        flash('No file part')
        return redirect(request.url)
    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(request.url)
    if file and allowed_file(file.filename):
        # Store uploads in a per-style subfolder.
        style_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}")
        os.makedirs(style_folder, exist_ok=True)
        filename = secure_filename(file.filename)
        file_path = os.path.join(style_folder, filename)
        file.save(file_path)
        # Store the relative path of the *actual* saved file in the DB.
        style.image_path = f"styles/{slug}/{filename}"
        db.session.commit()
        flash('Image uploaded successfully!')
    return redirect(url_for('style_detail', slug=slug))
def _queue_style_generation(style_obj, character=None, selected_fields=None, client_id=None):
    """Build the prompt for a style preview and queue it on ComfyUI.

    When *character* is given, its identity/wardrobe fields are merged into
    *selected_fields* (the list passed by the caller is mutated in place
    when provided); otherwise a minimal style-only data dict is used. A
    simple-background tag (colored with the character's primary color when
    available) is appended to the main prompt. Returns the raw
    queue_prompt() response dict.
    """
    if character:
        combined_data = character.data.copy()
        combined_data['character_id'] = character.character_id
        combined_data['style'] = style_obj.data.get('style', {})
        # Merge style lora triggers if present
        style_lora = style_obj.data.get('lora', {})
        if style_lora.get('lora_triggers'):
            if 'lora' not in combined_data: combined_data['lora'] = {}
            combined_data['lora']['lora_triggers'] = f"{combined_data['lora'].get('lora_triggers', '')}, {style_lora['lora_triggers']}"
        # Merge character identity and wardrobe fields into selected_fields
        if selected_fields:
            # Add character identity fields to selection if not already present
            for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']:
                if character.data.get('identity', {}).get(key):
                    field_key = f'identity::{key}'
                    if field_key not in selected_fields:
                        selected_fields.append(field_key)
            # Always include character name
            if 'special::name' not in selected_fields:
                selected_fields.append('special::name')
            # Add active wardrobe fields
            wardrobe = character.get_active_wardrobe()
            for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']:
                if wardrobe.get(key):
                    field_key = f'wardrobe::{key}'
                    if field_key not in selected_fields:
                        selected_fields.append(field_key)
        else:
            # Auto-include essential character fields
            selected_fields = []
            for key in ['base_specs', 'hair', 'eyes']:
                if character.data.get('identity', {}).get(key):
                    selected_fields.append(f'identity::{key}')
            selected_fields.append('special::name')
            # Add active wardrobe
            wardrobe = character.get_active_wardrobe()
            for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']:
                if wardrobe.get(key):
                    selected_fields.append(f'wardrobe::{key}')
            # Add style fields
            selected_fields.extend(['style::artist_name', 'style::artistic_style', 'lora::lora_triggers'])
        default_fields = style_obj.default_fields
        active_outfit = character.active_outfit
    else:
        # No character: build a minimal style-only data dict.
        combined_data = {
            'character_id': style_obj.style_id,
            'style': style_obj.data.get('style', {}),
            'lora': style_obj.data.get('lora', {}),
            'tags': style_obj.data.get('tags', [])
        }
        if not selected_fields:
            selected_fields = ['style::artist_name', 'style::artistic_style', 'lora::lora_triggers']
        default_fields = style_obj.default_fields
        active_outfit = 'default'
    with open('comfy_workflow.json', 'r') as f:
        workflow = json.load(f)
    prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit)
    # Append a simple background, tinted with the character's primary color
    # when one is configured.
    if character:
        primary_color = character.data.get('styles', {}).get('primary_color', '')
        if primary_color:
            prompts["main"] = f"{prompts['main']}, {primary_color} simple background"
        else:
            prompts["main"] = f"{prompts['main']}, simple background"
    else:
        prompts["main"] = f"{prompts['main']}, simple background"
    ckpt_path, ckpt_data = _get_default_checkpoint()
    workflow = _prepare_workflow(workflow, character, prompts, style=style_obj, checkpoint=ckpt_path, checkpoint_data=ckpt_data)
    return queue_prompt(workflow, client_id=client_id)
@app.route('/style/<path:slug>/generate', methods=['POST'])
def generate_style_image(slug):
    """Queue a ComfyUI preview for a style, optionally paired with a character."""
    style_obj = Style.query.filter_by(slug=slug).first_or_404()
    try:
        action = request.form.get('action', 'preview')
        client_id = request.form.get('client_id')
        selected_fields = request.form.getlist('include_field')
        character_slug = request.form.get('character_slug', '')
        character = None
        # '__random__' picks any character; otherwise look up the chosen slug.
        if character_slug == '__random__':
            pool = Character.query.all()
            if pool:
                character = random.choice(pool)
                character_slug = character.slug
        elif character_slug:
            character = Character.query.filter_by(slug=character_slug).first()
        # Remember the user's choices for this style page.
        session[f'char_style_{slug}'] = character_slug
        session[f'prefs_style_{slug}'] = selected_fields
        # Delegate prompt building and queueing to the shared helper.
        response = _queue_style_generation(style_obj, character, selected_fields, client_id=client_id)
        if 'prompt_id' not in response:
            raise Exception(f"ComfyUI failed: {response.get('error', 'Unknown error')}")
        prompt_id = response['prompt_id']
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'status': 'queued', 'prompt_id': prompt_id}
        return redirect(url_for('style_detail', slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('style_detail', slug=slug))
@app.route('/style/<path:slug>/finalize_generation/<prompt_id>', methods=['POST'])
def finalize_style_generation(slug, prompt_id):
    """Fetch a finished ComfyUI job's image and store it as the style preview.

    Saves the first image found in the job's outputs to the style's upload
    folder, records it as the session preview, and — when the posted action
    is 'replace' — also makes it the cover image.

    Bug fix: the stored relative path previously contained a literal
    "(unknown)" placeholder instead of the saved filename.
    """
    style_obj = Style.query.filter_by(slug=slug).first_or_404()
    action = request.form.get('action', 'preview')
    try:
        history = get_history(prompt_id)
        if prompt_id not in history:
            return {'error': 'History not found'}, 404
        outputs = history[prompt_id]['outputs']
        for node_id in outputs:
            if 'images' in outputs[node_id]:
                image_info = outputs[node_id]['images'][0]
                image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                style_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}")
                os.makedirs(style_folder, exist_ok=True)
                filename = f"gen_{int(time.time())}.png"
                file_path = os.path.join(style_folder, filename)
                with open(file_path, 'wb') as f:
                    f.write(image_data)
                # Reference the file we just wrote, not a placeholder.
                relative_path = f"styles/{slug}/{filename}"
                session[f'preview_style_{slug}'] = relative_path
                session.modified = True  # Ensure session is saved for JSON response
                # If action is 'replace', also update the style's cover image immediately
                if action == 'replace':
                    style_obj.image_path = relative_path
                    db.session.commit()
                return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')}
        return {'error': 'No image found in output'}, 404
    except Exception as e:
        print(f"Finalize error: {e}")
        return {'error': str(e)}, 500
@app.route('/style/<path:slug>/save_defaults', methods=['POST'])
def save_style_defaults(slug):
    """Persist the checked prompt fields as this style's default selection."""
    style = Style.query.filter_by(slug=slug).first_or_404()
    style.default_fields = request.form.getlist('include_field')
    db.session.commit()
    flash('Default prompt selection saved for this style!')
    return redirect(url_for('style_detail', slug=slug))
@app.route('/style/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_style_cover_from_preview(slug):
    """Promote the session preview image to the style's cover image."""
    style = Style.query.filter_by(slug=slug).first_or_404()
    preview_path = session.get(f'preview_style_{slug}')
    if not preview_path:
        flash('No preview image available', 'error')
    else:
        style.image_path = preview_path
        db.session.commit()
        flash('Cover image updated from preview!')
    return redirect(url_for('style_detail', slug=slug))
@app.route('/get_missing_styles')
def get_missing_styles():
    """Return slug/name pairs of styles that lack a cover image."""
    no_cover = (Style.image_path == None) | (Style.image_path == '')
    rows = Style.query.filter(no_cover).all()
    return {'missing': [{'slug': row.slug, 'name': row.name} for row in rows]}
@app.route('/get_missing_detailers')
def get_missing_detailers():
    """List detailers without a cover image as JSON ({'missing': [{slug, name}]})."""
    no_cover = (Detailer.image_path == None) | (Detailer.image_path == '')
    rows = Detailer.query.filter(no_cover).all()
    return {'missing': [{'slug': row.slug, 'name': row.name} for row in rows]}
@app.route('/clear_all_detailer_covers', methods=['POST'])
def clear_all_detailer_covers():
    """Null out the cover image on every detailer record."""
    for record in Detailer.query.all():
        record.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/clear_all_style_covers', methods=['POST'])
def clear_all_style_covers():
    """Null out the cover image on every style record."""
    for record in Style.query.all():
        record.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/styles/generate_missing', methods=['POST'])
def generate_missing_styles():
    """Batch-generate cover images for every style that lacks one.

    For each missing style, a random character is picked for variety, a
    ComfyUI job is queued, and the result is polled for (~4 minutes max).
    The first output image is saved under the uploads folder and recorded
    as the style's cover. Each style is attempted at most once per request
    so a permanently failing generation cannot loop this handler forever.
    """
    def get_missing_count():
        return Style.query.filter((Style.image_path == None) | (Style.image_path == '')).count()
    if get_missing_count() == 0:
        flash("No styles missing cover images.")
        return redirect(url_for('styles_index'))
    # Get all characters once to pick from
    all_characters = Character.query.all()
    if not all_characters:
        flash("No characters available to preview styles with.", "error")
        return redirect(url_for('styles_index'))
    success_count = 0
    attempted = set()  # slugs already tried this request; guards against infinite retry
    while get_missing_count() > 0:
        style_obj = Style.query.filter((Style.image_path == None) | (Style.image_path == '')).order_by(Style.name).first()
        if not style_obj or style_obj.slug in attempted:
            break
        attempted.add(style_obj.slug)
        # Pick a random character for each style for variety
        character = random.choice(all_characters)
        style_slug = style_obj.slug
        try:
            print(f"Batch generating style: {style_obj.name} with character {character.name}")
            prompt_response = _queue_style_generation(style_obj, character=character)
            prompt_id = prompt_response['prompt_id']
            max_retries = 120  # 120 polls * 2s sleep = ~4 minutes per style
            while max_retries > 0:
                history = get_history(prompt_id)
                if prompt_id in history:
                    outputs = history[prompt_id]['outputs']
                    for node_id in outputs:
                        if 'images' in outputs[node_id]:
                            image_info = outputs[node_id]['images'][0]
                            image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                            style_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{style_slug}")
                            os.makedirs(style_folder, exist_ok=True)
                            filename = f"gen_{int(time.time())}.png"
                            file_path = os.path.join(style_folder, filename)
                            with open(file_path, 'wb') as f:
                                f.write(image_data)
                            style_to_update = Style.query.filter_by(slug=style_slug).first()
                            if style_to_update:
                                # Path relative to the uploads folder (was a broken "(unknown)" literal).
                                style_to_update.image_path = f"styles/{style_slug}/{filename}"
                                db.session.commit()
                                success_count += 1
                            break
                    break
                time.sleep(2)
                max_retries -= 1
        except Exception as e:
            print(f"Error generating for style {style_obj.name}: {e}")
            db.session.rollback()
    flash(f"Batch style generation complete. Generated {success_count} images.")
    return redirect(url_for('styles_index'))
@app.route('/styles/bulk_create', methods=['POST'])
def bulk_create_styles_from_loras():
    """Create style JSON definitions for every .safetensors LoRA in the styles dir.

    For each LoRA file, optional metadata is scraped from a sibling .html file,
    an LLM is asked to describe the style, and the resulting JSON is written to
    STYLES_DIR (system-controlled fields like lora_name are enforced afterwards).
    Existing definitions are skipped unless the 'overwrite' form flag is 'true'.
    """
    styles_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Styles/'
    if not os.path.exists(styles_lora_dir):
        flash('Styles LoRA directory not found.', 'error')
        return redirect(url_for('styles_index'))
    overwrite = request.form.get('overwrite') == 'true'
    created_count = 0
    skipped_count = 0
    overwritten_count = 0
    system_prompt = load_prompt('style_system.txt')
    if not system_prompt:
        flash('Style system prompt file not found.', 'error')
        return redirect(url_for('styles_index'))
    for filename in os.listdir(styles_lora_dir):
        if filename.endswith('.safetensors'):
            name_base = filename.rsplit('.', 1)[0]
            style_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
            style_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()
            json_filename = f"{style_id}.json"
            json_path = os.path.join(app.config['STYLES_DIR'], json_filename)
            is_existing = os.path.exists(json_path)
            if is_existing and not overwrite:
                skipped_count += 1
                continue
            # Pull descriptive text out of a sibling HTML page, if one exists.
            html_filename = f"{name_base}.html"
            html_path = os.path.join(styles_lora_dir, html_filename)
            html_content = ""
            if os.path.exists(html_path):
                try:
                    with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                        html_raw = hf.read()
                    # Strip scripts/styles/images/tags; keep plain text for LLM context.
                    clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
                    clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)
                    clean_html = re.sub(r'<img[^>]*>', '', clean_html)
                    clean_html = re.sub(r'<[^>]+>', ' ', clean_html)
                    html_content = ' '.join(clean_html.split())
                except Exception as e:
                    print(f"Error reading HTML {html_filename}: {e}")
            try:
                print(f"Asking LLM to describe style: {style_name}")
                # Fixed: the filename placeholder had been corrupted to a literal "(unknown)".
                prompt = f"Describe an art style or artist LoRA for AI image generation based on the filename: '{filename}'"
                if html_content:
                    prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###"
                llm_response = call_llm(prompt, system_prompt)
                clean_json = llm_response.replace('```json', '').replace('```', '').strip()
                style_data = json.loads(clean_json)
                # Enforce system values while preserving LLM-extracted metadata.
                style_data['style_id'] = style_id
                style_data['style_name'] = style_name
                if 'lora' not in style_data: style_data['lora'] = {}
                style_data['lora']['lora_name'] = f"Illustrious/Styles/{filename}"
                if not style_data['lora'].get('lora_triggers'):
                    style_data['lora']['lora_triggers'] = name_base
                if style_data['lora'].get('lora_weight') is None:
                    style_data['lora']['lora_weight'] = 1.0
                if style_data['lora'].get('lora_weight_min') is None:
                    style_data['lora']['lora_weight_min'] = 0.7
                if style_data['lora'].get('lora_weight_max') is None:
                    style_data['lora']['lora_weight_max'] = 1.0
                with open(json_path, 'w') as f:
                    json.dump(style_data, f, indent=2)
                if is_existing:
                    overwritten_count += 1
                else:
                    created_count += 1
                # Small delay to avoid API rate limits if many files.
                time.sleep(0.5)
            except Exception as e:
                print(f"Error creating style for {filename}: {e}")
    if created_count > 0 or overwritten_count > 0:
        sync_styles()
        msg = f'Successfully processed styles: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No styles created or overwritten. {skipped_count} existing styles found.')
    return redirect(url_for('styles_index'))
@app.route('/style/create', methods=['GET', 'POST'])
def create_style():
    """Create a new style: POST writes a skeleton JSON file plus a DB row."""
    if request.method != 'POST':
        return render_template('styles/create.html')
    name = request.form.get('name')
    requested = request.form.get('filename', '').strip()
    if not requested:
        requested = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')
    safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', requested) or 'style'
    # Append a numeric suffix until the target filename is free.
    base_slug, counter = safe_slug, 1
    while os.path.exists(os.path.join(app.config['STYLES_DIR'], f"{safe_slug}.json")):
        safe_slug = f"{base_slug}_{counter}"
        counter += 1
    style_data = {
        "style_id": safe_slug,
        "style_name": name,
        "style": {
            "artist_name": "",
            "artistic_style": ""
        },
        "lora": {
            "lora_name": "",
            "lora_weight": 1.0,
            "lora_triggers": ""
        }
    }
    try:
        target_path = os.path.join(app.config['STYLES_DIR'], f"{safe_slug}.json")
        with open(target_path, 'w') as f:
            json.dump(style_data, f, indent=2)
        record = Style(
            style_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json",
            name=name, data=style_data
        )
        db.session.add(record)
        db.session.commit()
        flash('Style created successfully!')
        return redirect(url_for('style_detail', slug=safe_slug))
    except Exception as e:
        print(f"Save error: {e}")
        flash(f"Failed to create style: {e}")
        return redirect(request.url)
@app.route('/style/<path:slug>/clone', methods=['POST'])
def clone_style(slug):
    """Clone a style under the next free numbered id (e.g. foo -> foo_02).

    Copies the style's JSON payload with a fresh id/name, writes it to a new
    file in STYLES_DIR, creates a matching DB row, and redirects to the clone.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    base_id = style.style_id
    # `re` is already imported at module level; the redundant local import was removed.
    match = re.match(r'^(.+?)_(\d+)$', base_id)
    if match:
        base_name = match.group(1)
        current_num = int(match.group(2))
    else:
        base_name = base_id
        current_num = 1
    next_num = current_num + 1
    # Probe for the first unused "<base>_<NN>.json" filename.
    while True:
        new_id = f"{base_name}_{next_num:02d}"
        new_filename = f"{new_id}.json"
        new_path = os.path.join(app.config['STYLES_DIR'], new_filename)
        if not os.path.exists(new_path):
            break
        next_num += 1
    # Shallow copy is sufficient: only top-level keys are replaced below.
    new_data = style.data.copy()
    new_data['style_id'] = new_id
    new_data['style_name'] = f"{style.name} (Copy)"
    with open(new_path, 'w') as f:
        json.dump(new_data, f, indent=2)
    new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id)
    new_style = Style(
        style_id=new_id, slug=new_slug, filename=new_filename,
        name=new_data['style_name'], data=new_data
    )
    db.session.add(new_style)
    db.session.commit()
    flash(f'Style cloned as "{new_id}"!')
    return redirect(url_for('style_detail', slug=new_slug))
@app.route('/style/<path:slug>/save_json', methods=['POST'])
def save_style_json(slug):
    """Replace a style's full JSON payload from the raw editor form field."""
    style = Style.query.filter_by(slug=slug).first_or_404()
    raw = request.form.get('json_data', '')
    try:
        parsed = json.loads(raw)
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400
    style.data = parsed
    flag_modified(style, 'data')
    db.session.commit()
    # Mirror the change to the on-disk JSON file when one is tracked.
    if style.filename:
        target = os.path.join(app.config['STYLES_DIR'], style.filename)
        with open(target, 'w') as f:
            json.dump(parsed, f, indent=2)
    return {'success': True}
# ============ SCENE ROUTES ============
@app.route('/scenes')
def scenes_index():
    """Render the scene listing page, ordered alphabetically by name."""
    all_scenes = Scene.query.order_by(Scene.name).all()
    return render_template('scenes/index.html', scenes=all_scenes)
@app.route('/scenes/rescan', methods=['POST'])
def rescan_scenes():
    """Re-sync the scene DB table from the JSON files on disk."""
    sync_scenes()
    flash('Database synced with scene files.')
    return redirect(url_for('scenes_index'))
@app.route('/scene/<path:slug>')
def scene_detail(slug):
    """Render the scene detail page with saved session state and past previews."""
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()
    # Restore per-scene UI state stashed in the session.
    preferences = session.get(f'prefs_scene_{slug}')
    preview_image = session.get(f'preview_scene_{slug}')
    selected_character = session.get(f'char_scene_{slug}')
    # Collect previously generated preview images, newest first.
    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"scenes/{slug}")
    existing_previews = []
    if os.path.isdir(upload_dir):
        image_exts = ('.png', '.jpg', '.jpeg', '.webp')
        names = [n for n in os.listdir(upload_dir) if n.lower().endswith(image_exts)]
        existing_previews = [f"scenes/{slug}/{n}" for n in sorted(names, reverse=True)]
    return render_template('scenes/detail.html', scene=scene, characters=characters,
                           preferences=preferences, preview_image=preview_image,
                           selected_character=selected_character, existing_previews=existing_previews)
@app.route('/scene/<path:slug>/edit', methods=['GET', 'POST'])
def edit_scene(slug):
    """Edit a scene's name, scene fields, LoRA settings, and tags.

    POST rebuilds the scene's JSON data from the submitted form, writes it
    back to the scene's file in SCENES_DIR, and commits the DB row. GET (or
    a failed POST) renders the edit form.
    """
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    loras = get_available_scene_loras()
    if request.method == 'POST':
        try:
            # 1. Update basic fields
            scene.name = request.form.get('scene_name')
            # 2. Rebuild the data dictionary
            new_data = scene.data.copy()
            new_data['scene_name'] = scene.name
            # Update scene section
            if 'scene' in new_data:
                for key in new_data['scene'].keys():
                    form_key = f"scene_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        # Handle list for furniture/colors if they were originally lists
                        if key in ['furniture', 'colors'] and isinstance(new_data['scene'][key], list):
                            val = [v.strip() for v in val.split(',') if v.strip()]
                        new_data['scene'][key] = val
            # Update lora section
            if 'lora' in new_data:
                for key in new_data['lora'].keys():
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            # Narrowed from a bare except: only coercion failures fall back.
                            try: val = float(val)
                            except (TypeError, ValueError): val = 1.0
                        new_data['lora'][key] = val
            # LoRA weight randomization bounds
            for bound in ['lora_weight_min', 'lora_weight_max']:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        pass
                else:
                    new_data.setdefault('lora', {}).pop(bound, None)
            # Update Tags (comma separated string to list)
            tags_raw = request.form.get('tags', '')
            new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()]
            scene.data = new_data
            flag_modified(scene, "data")
            # 3. Write back to JSON file
            scene_file = scene.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', scene.scene_id)}.json"
            file_path = os.path.join(app.config['SCENES_DIR'], scene_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)
            db.session.commit()
            flash('Scene updated successfully!')
            return redirect(url_for('scene_detail', slug=slug))
        except Exception as e:
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")
    return render_template('scenes/edit.html', scene=scene, loras=loras)
@app.route('/scene/<path:slug>/upload', methods=['POST'])
def upload_scene_image(slug):
    """Handle a cover-image upload for a scene and record its relative path."""
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    if 'image' not in request.files:
        flash('No file part')
        return redirect(request.url)
    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(request.url)
    if file and allowed_file(file.filename):
        # Create scene subfolder
        scene_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"scenes/{slug}")
        os.makedirs(scene_folder, exist_ok=True)
        filename = secure_filename(file.filename)
        file_path = os.path.join(scene_folder, filename)
        file.save(file_path)
        # Store path relative to the uploads folder (was a broken "(unknown)" literal).
        scene.image_path = f"scenes/{slug}/{filename}"
        db.session.commit()
        flash('Image uploaded successfully!')
    return redirect(url_for('scene_detail', slug=slug))
def _queue_scene_generation(scene_obj, character=None, selected_fields=None, client_id=None):
    """Build a combined prompt payload for a scene and queue it with ComfyUI.

    When a character is given, the scene's tags and LoRA triggers are merged
    into a copy of the character's data and essential identity/wardrobe fields
    are forced into the selection; otherwise a minimal scene-only payload is
    built. Returns the response from queue_prompt (expected to contain
    'prompt_id' — see callers).

    NOTE(review): selected_fields is mutated in place when provided — callers
    appear to rely on persisting it to the session afterwards; confirm before
    changing to a copy.
    """
    if character:
        combined_data = character.data.copy()
        combined_data['character_id'] = character.character_id
        # Update character's 'defaults' with scene details
        scene_data = scene_obj.data.get('scene', {})
        # Build scene tag string
        scene_tags = []
        for key in ['background', 'foreground', 'furniture', 'colors', 'lighting', 'theme']:
            val = scene_data.get(key)
            if val:
                if isinstance(val, list):
                    scene_tags.extend(val)
                else:
                    scene_tags.append(val)
        combined_data['defaults']['scene'] = ", ".join(scene_tags)
        # Merge scene lora triggers if present
        scene_lora = scene_obj.data.get('lora', {})
        if scene_lora.get('lora_triggers'):
            if 'lora' not in combined_data: combined_data['lora'] = {}
            combined_data['lora']['lora_triggers'] = f"{combined_data['lora'].get('lora_triggers', '')}, {scene_lora['lora_triggers']}"
        # Merge character identity and wardrobe fields into selected_fields
        if selected_fields:
            # Add character identity fields to selection if not already present
            for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']:
                if character.data.get('identity', {}).get(key):
                    field_key = f'identity::{key}'
                    if field_key not in selected_fields:
                        selected_fields.append(field_key)
            # Always include character name
            if 'special::name' not in selected_fields:
                selected_fields.append('special::name')
            # Add active wardrobe fields
            wardrobe = character.get_active_wardrobe()
            for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']:
                if wardrobe.get(key):
                    field_key = f'wardrobe::{key}'
                    if field_key not in selected_fields:
                        selected_fields.append(field_key)
        else:
            # Auto-include essential character fields
            selected_fields = []
            for key in ['base_specs', 'hair', 'eyes']:
                if character.data.get('identity', {}).get(key):
                    selected_fields.append(f'identity::{key}')
            selected_fields.append('special::name')
            # Add active wardrobe
            wardrobe = character.get_active_wardrobe()
            for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']:
                if wardrobe.get(key):
                    selected_fields.append(f'wardrobe::{key}')
            # Add scene fields
            selected_fields.extend(['defaults::scene', 'lora::lora_triggers'])
        default_fields = scene_obj.default_fields
        active_outfit = character.active_outfit
    else:
        # Scene only - no character
        scene_data = scene_obj.data.get('scene', {})
        scene_tags = []
        for key in ['background', 'foreground', 'furniture', 'colors', 'lighting', 'theme']:
            val = scene_data.get(key)
            if val:
                if isinstance(val, list): scene_tags.extend(val)
                else: scene_tags.append(val)
        # Minimal payload shaped like character data so build_prompt can consume it.
        combined_data = {
            'character_id': scene_obj.scene_id,
            'defaults': {
                'scene': ", ".join(scene_tags)
            },
            'lora': scene_obj.data.get('lora', {}),
            'tags': scene_obj.data.get('tags', [])
        }
        if not selected_fields:
            selected_fields = ['defaults::scene', 'lora::lora_triggers']
        default_fields = scene_obj.default_fields
        active_outfit = 'default'
    with open('comfy_workflow.json', 'r') as f:
        workflow = json.load(f)
    prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit)
    # For scene generation, we want to ensure Node 20 is handled in _prepare_workflow
    ckpt_path, ckpt_data = _get_default_checkpoint()
    workflow = _prepare_workflow(workflow, character, prompts, scene=scene_obj, checkpoint=ckpt_path, checkpoint_data=ckpt_data)
    return queue_prompt(workflow, client_id=client_id)
@app.route('/scene/<path:slug>/generate', methods=['POST'])
def generate_scene_image(slug):
    """Queue a ComfyUI generation for this scene, optionally with a character."""
    scene_obj = Scene.query.filter_by(slug=slug).first_or_404()
    try:
        action = request.form.get('action', 'preview')
        client_id = request.form.get('client_id')
        selected_fields = request.form.getlist('include_field')
        # Resolve the character: explicit slug, the random sentinel, or none.
        character_slug = request.form.get('character_slug', '')
        character = None
        if character_slug == '__random__':
            all_characters = Character.query.all()
            if all_characters:
                character = random.choice(all_characters)
                character_slug = character.slug
        elif character_slug:
            character = Character.query.filter_by(slug=character_slug).first()
        # Remember the user's choices for the next visit.
        session[f'char_scene_{slug}'] = character_slug
        session[f'prefs_scene_{slug}'] = selected_fields
        # Hand off to the shared queueing helper.
        prompt_response = _queue_scene_generation(scene_obj, character, selected_fields, client_id=client_id)
        if 'prompt_id' not in prompt_response:
            raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}")
        prompt_id = prompt_response['prompt_id']
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'status': 'queued', 'prompt_id': prompt_id}
        return redirect(url_for('scene_detail', slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('scene_detail', slug=slug))
@app.route('/scene/<path:slug>/finalize_generation/<prompt_id>', methods=['POST'])
def finalize_scene_generation(slug, prompt_id):
    """Fetch a completed ComfyUI job's image, save it, and update session/cover.

    Returns JSON {'success': True, 'image_url': ...} on success, or an error
    payload with a 404/500 status code.
    """
    scene_obj = Scene.query.filter_by(slug=slug).first_or_404()
    action = request.form.get('action', 'preview')
    try:
        history = get_history(prompt_id)
        if prompt_id not in history:
            return {'error': 'History not found'}, 404
        outputs = history[prompt_id]['outputs']
        for node_id in outputs:
            if 'images' in outputs[node_id]:
                image_info = outputs[node_id]['images'][0]
                image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                scene_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"scenes/{slug}")
                os.makedirs(scene_folder, exist_ok=True)
                filename = f"gen_{int(time.time())}.png"
                file_path = os.path.join(scene_folder, filename)
                with open(file_path, 'wb') as f:
                    f.write(image_data)
                # Path relative to the uploads folder (was a broken "(unknown)" literal).
                relative_path = f"scenes/{slug}/{filename}"
                session[f'preview_scene_{slug}'] = relative_path
                session.modified = True  # Ensure session is saved for JSON response
                # If action is 'replace', also update the scene's cover image immediately
                if action == 'replace':
                    scene_obj.image_path = relative_path
                    db.session.commit()
                return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')}
        return {'error': 'No image found in output'}, 404
    except Exception as e:
        print(f"Finalize error: {e}")
        return {'error': str(e)}, 500
@app.route('/scene/<path:slug>/save_defaults', methods=['POST'])
def save_scene_defaults(slug):
    """Persist the checked prompt fields as this scene's default selection."""
    scene_obj = Scene.query.filter_by(slug=slug).first_or_404()
    chosen = request.form.getlist('include_field')
    scene_obj.default_fields = chosen
    db.session.commit()
    flash('Default prompt selection saved for this scene!')
    return redirect(url_for('scene_detail', slug=slug))
@app.route('/scene/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_scene_cover_from_preview(slug):
    """Promote the session-stored preview image to this scene's cover image."""
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    preview = session.get(f'preview_scene_{slug}')
    if not preview:
        flash('No preview image available', 'error')
    else:
        scene.image_path = preview
        db.session.commit()
        flash('Cover image updated from preview!')
    return redirect(url_for('scene_detail', slug=slug))
@app.route('/scenes/bulk_create', methods=['POST'])
def bulk_create_scenes_from_loras():
    """Create scene JSON definitions for every .safetensors LoRA in the backgrounds dir.

    For each LoRA file, optional metadata is scraped from a sibling .html file,
    an LLM is asked to describe the scene, and the resulting JSON is written to
    SCENES_DIR (system-controlled fields like lora_name are enforced afterwards).
    Existing definitions are skipped unless the 'overwrite' form flag is 'true'.
    """
    backgrounds_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Backgrounds/'
    if not os.path.exists(backgrounds_lora_dir):
        flash('Backgrounds LoRA directory not found.', 'error')
        return redirect(url_for('scenes_index'))
    overwrite = request.form.get('overwrite') == 'true'
    created_count = 0
    skipped_count = 0
    overwritten_count = 0
    system_prompt = load_prompt('scene_system.txt')
    if not system_prompt:
        flash('Scene system prompt file not found.', 'error')
        return redirect(url_for('scenes_index'))
    for filename in os.listdir(backgrounds_lora_dir):
        if filename.endswith('.safetensors'):
            name_base = filename.rsplit('.', 1)[0]
            scene_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
            scene_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()
            json_filename = f"{scene_id}.json"
            json_path = os.path.join(app.config['SCENES_DIR'], json_filename)
            is_existing = os.path.exists(json_path)
            if is_existing and not overwrite:
                skipped_count += 1
                continue
            html_filename = f"{name_base}.html"
            html_path = os.path.join(backgrounds_lora_dir, html_filename)
            html_content = ""
            if os.path.exists(html_path):
                try:
                    with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                        html_raw = hf.read()
                    # Strip HTML tags but keep text content for LLM context
                    clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
                    clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)
                    clean_html = re.sub(r'<img[^>]*>', '', clean_html)
                    clean_html = re.sub(r'<[^>]+>', ' ', clean_html)
                    html_content = ' '.join(clean_html.split())
                except Exception as e:
                    print(f"Error reading HTML {html_filename}: {e}")
            try:
                print(f"Asking LLM to describe scene: {scene_name}")
                # Fixed: the filename placeholder had been corrupted to a literal "(unknown)".
                prompt = f"Describe a scene for an AI image generation model based on the LoRA filename: '{filename}'"
                if html_content:
                    prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###"
                llm_response = call_llm(prompt, system_prompt)
                # Clean response
                clean_json = llm_response.replace('```json', '').replace('```', '').strip()
                scene_data = json.loads(clean_json)
                # Enforce system values while preserving LLM-extracted metadata
                scene_data['scene_id'] = scene_id
                scene_data['scene_name'] = scene_name
                if 'lora' not in scene_data: scene_data['lora'] = {}
                scene_data['lora']['lora_name'] = f"Illustrious/Backgrounds/{filename}"
                if not scene_data['lora'].get('lora_triggers'):
                    scene_data['lora']['lora_triggers'] = name_base
                if scene_data['lora'].get('lora_weight') is None:
                    scene_data['lora']['lora_weight'] = 1.0
                if scene_data['lora'].get('lora_weight_min') is None:
                    scene_data['lora']['lora_weight_min'] = 0.7
                if scene_data['lora'].get('lora_weight_max') is None:
                    scene_data['lora']['lora_weight_max'] = 1.0
                with open(json_path, 'w') as f:
                    json.dump(scene_data, f, indent=2)
                if is_existing:
                    overwritten_count += 1
                else:
                    created_count += 1
                # Small delay to avoid API rate limits if many files
                time.sleep(0.5)
            except Exception as e:
                print(f"Error creating scene for {filename}: {e}")
    if created_count > 0 or overwritten_count > 0:
        sync_scenes()
        msg = f'Successfully processed scenes: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No scenes created or overwritten. {skipped_count} existing scenes found.')
    return redirect(url_for('scenes_index'))
@app.route('/scene/create', methods=['GET', 'POST'])
def create_scene():
    """Create a new scene: POST writes a skeleton JSON file plus a DB row."""
    if request.method != 'POST':
        return render_template('scenes/create.html')
    name = request.form.get('name')
    requested = request.form.get('filename', '').strip()
    if not requested:
        requested = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')
    safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', requested) or 'scene'
    # Append a numeric suffix until the target filename is free.
    base_slug, counter = safe_slug, 1
    while os.path.exists(os.path.join(app.config['SCENES_DIR'], f"{safe_slug}.json")):
        safe_slug = f"{base_slug}_{counter}"
        counter += 1
    scene_data = {
        "scene_id": safe_slug,
        "scene_name": name,
        "scene": {
            "background": "",
            "foreground": "",
            "furniture": [],
            "colors": [],
            "lighting": "",
            "theme": ""
        },
        "lora": {
            "lora_name": "",
            "lora_weight": 1.0,
            "lora_triggers": ""
        }
    }
    try:
        target_path = os.path.join(app.config['SCENES_DIR'], f"{safe_slug}.json")
        with open(target_path, 'w') as f:
            json.dump(scene_data, f, indent=2)
        record = Scene(
            scene_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json",
            name=name, data=scene_data
        )
        db.session.add(record)
        db.session.commit()
        flash('Scene created successfully!')
        return redirect(url_for('scene_detail', slug=safe_slug))
    except Exception as e:
        print(f"Save error: {e}")
        flash(f"Failed to create scene: {e}")
        return redirect(request.url)
@app.route('/scene/<path:slug>/clone', methods=['POST'])
def clone_scene(slug):
    """Clone a scene under the next free numbered id (e.g. foo -> foo_02).

    Copies the scene's JSON payload with a fresh id/name, writes it to a new
    file in SCENES_DIR, creates a matching DB row, and redirects to the clone.
    """
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    base_id = scene.scene_id
    # `re` is already imported at module level; the redundant local import was removed.
    match = re.match(r'^(.+?)_(\d+)$', base_id)
    if match:
        base_name = match.group(1)
        current_num = int(match.group(2))
    else:
        base_name = base_id
        current_num = 1
    next_num = current_num + 1
    # Probe for the first unused "<base>_<NN>.json" filename.
    while True:
        new_id = f"{base_name}_{next_num:02d}"
        new_filename = f"{new_id}.json"
        new_path = os.path.join(app.config['SCENES_DIR'], new_filename)
        if not os.path.exists(new_path):
            break
        next_num += 1
    # Shallow copy is sufficient: only top-level keys are replaced below.
    new_data = scene.data.copy()
    new_data['scene_id'] = new_id
    new_data['scene_name'] = f"{scene.name} (Copy)"
    with open(new_path, 'w') as f:
        json.dump(new_data, f, indent=2)
    new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id)
    new_scene = Scene(
        scene_id=new_id, slug=new_slug, filename=new_filename,
        name=new_data['scene_name'], data=new_data
    )
    db.session.add(new_scene)
    db.session.commit()
    flash(f'Scene cloned as "{new_id}"!')
    return redirect(url_for('scene_detail', slug=new_slug))
@app.route('/scene/<path:slug>/save_json', methods=['POST'])
def save_scene_json(slug):
    """Replace a scene's full JSON payload from the raw editor form field."""
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    raw = request.form.get('json_data', '')
    try:
        parsed = json.loads(raw)
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400
    scene.data = parsed
    flag_modified(scene, 'data')
    db.session.commit()
    # Mirror the change to the on-disk JSON file when one is tracked.
    if scene.filename:
        target = os.path.join(app.config['SCENES_DIR'], scene.filename)
        with open(target, 'w') as f:
            json.dump(parsed, f, indent=2)
    return {'success': True}
# ============ DETAILER ROUTES ============
@app.route('/detailers')
def detailers_index():
    """Render the detailer listing page, ordered alphabetically by name."""
    all_detailers = Detailer.query.order_by(Detailer.name).all()
    return render_template('detailers/index.html', detailers=all_detailers)
@app.route('/detailers/rescan', methods=['POST'])
def rescan_detailers():
    """Re-sync the detailer DB table from the JSON files on disk."""
    sync_detailers()
    flash('Database synced with detailer files.')
    return redirect(url_for('detailers_index'))
@app.route('/detailer/<path:slug>')
def detailer_detail(slug):
    """Render the detailer detail page with saved session state and past previews."""
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()
    actions = Action.query.order_by(Action.name).all()
    # Restore per-detailer UI state stashed in the session.
    preferences = session.get(f'prefs_detailer_{slug}')
    preview_image = session.get(f'preview_detailer_{slug}')
    selected_character = session.get(f'char_detailer_{slug}')
    selected_action = session.get(f'action_detailer_{slug}')
    extra_positive = session.get(f'extra_pos_detailer_{slug}', '')
    extra_negative = session.get(f'extra_neg_detailer_{slug}', '')
    # Collect previously generated preview images, newest first.
    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"detailers/{slug}")
    existing_previews = []
    if os.path.isdir(upload_dir):
        image_exts = ('.png', '.jpg', '.jpeg', '.webp')
        names = [n for n in os.listdir(upload_dir) if n.lower().endswith(image_exts)]
        existing_previews = [f"detailers/{slug}/{n}" for n in sorted(names, reverse=True)]
    return render_template('detailers/detail.html', detailer=detailer, characters=characters,
                           actions=actions, preferences=preferences, preview_image=preview_image,
                           selected_character=selected_character, selected_action=selected_action,
                           extra_positive=extra_positive, extra_negative=extra_negative,
                           existing_previews=existing_previews)
@app.route('/detailer/<path:slug>/edit', methods=['GET', 'POST'])
def edit_detailer(slug):
    """Edit a detailer's name, prompt, LoRA settings, and tags.

    POST rebuilds the detailer's JSON data from the submitted form, writes it
    back to the detailer's file in DETAILERS_DIR, and commits the DB row. GET
    (or a failed POST) renders the edit form.
    """
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    loras = get_available_detailer_loras()
    if request.method == 'POST':
        try:
            # 1. Update basic fields
            detailer.name = request.form.get('detailer_name')
            # 2. Rebuild the data dictionary
            new_data = detailer.data.copy()
            new_data['detailer_name'] = detailer.name
            # Update prompt (stored as a plain string)
            new_data['prompt'] = request.form.get('detailer_prompt', '')
            # Update lora section
            if 'lora' in new_data:
                for key in new_data['lora'].keys():
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            # Narrowed from a bare except: only coercion failures fall back.
                            try: val = float(val)
                            except (TypeError, ValueError): val = 1.0
                        new_data['lora'][key] = val
            # LoRA weight randomization bounds
            for bound in ['lora_weight_min', 'lora_weight_max']:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        pass
                else:
                    new_data.setdefault('lora', {}).pop(bound, None)
            # Update Tags (comma separated string to list)
            tags_raw = request.form.get('tags', '')
            new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()]
            detailer.data = new_data
            flag_modified(detailer, "data")
            # 3. Write back to JSON file
            detailer_file = detailer.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', detailer.detailer_id)}.json"
            file_path = os.path.join(app.config['DETAILERS_DIR'], detailer_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)
            db.session.commit()
            flash('Detailer updated successfully!')
            return redirect(url_for('detailer_detail', slug=slug))
        except Exception as e:
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")
    return render_template('detailers/edit.html', detailer=detailer, loras=loras)
@app.route('/detailer/<path:slug>/upload', methods=['POST'])
def upload_detailer_image(slug):
    """Handle a cover-image upload for a detailer and record its relative path."""
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    if 'image' not in request.files:
        flash('No file part')
        return redirect(request.url)
    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(request.url)
    if file and allowed_file(file.filename):
        # Create detailer subfolder
        detailer_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"detailers/{slug}")
        os.makedirs(detailer_folder, exist_ok=True)
        filename = secure_filename(file.filename)
        file_path = os.path.join(detailer_folder, filename)
        file.save(file_path)
        # Store path relative to the uploads folder (was a broken "(unknown)" literal).
        detailer.image_path = f"detailers/{slug}/{filename}"
        db.session.commit()
        flash('Image uploaded successfully!')
    return redirect(url_for('detailer_detail', slug=slug))
def _queue_detailer_generation(detailer_obj, character=None, selected_fields=None, client_id=None, action=None, extra_positive=None, extra_negative=None):
    """Build and queue a ComfyUI workflow that previews a detailer.

    When a character is supplied, the detailer's prompt and LoRA triggers are
    merged into a copy of the character's data, and the character's identity
    and active-wardrobe fields are folded into ``selected_fields`` (mutated
    in place when a list is passed).  Without a character, a minimal data
    dict is built from the detailer alone.

    Returns the dict from ``queue_prompt`` (normally containing 'prompt_id').
    """
    if character:
        combined_data = character.data.copy()
        combined_data['character_id'] = character.character_id
        # Merge detailer prompt into character's tags
        detailer_prompt = detailer_obj.data.get('prompt', '')
        if detailer_prompt:
            if 'tags' not in combined_data: combined_data['tags'] = []
            combined_data['tags'].append(detailer_prompt)
        # Merge detailer lora triggers if present
        detailer_lora = detailer_obj.data.get('lora', {})
        if detailer_lora.get('lora_triggers'):
            if 'lora' not in combined_data: combined_data['lora'] = {}
            combined_data['lora']['lora_triggers'] = f"{combined_data['lora'].get('lora_triggers', '')}, {detailer_lora['lora_triggers']}"
        # Merge character identity and wardrobe fields into selected_fields
        if selected_fields:
            # Add character identity fields to selection if not already present
            for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']:
                if character.data.get('identity', {}).get(key):
                    field_key = f'identity::{key}'
                    if field_key not in selected_fields:
                        selected_fields.append(field_key)
            # Always include character name
            if 'special::name' not in selected_fields:
                selected_fields.append('special::name')
            # Add active wardrobe fields
            wardrobe = character.get_active_wardrobe()
            for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']:
                if wardrobe.get(key):
                    field_key = f'wardrobe::{key}'
                    if field_key not in selected_fields:
                        selected_fields.append(field_key)
        else:
            # Auto-include essential character fields (narrower set than the
            # explicit-selection branch above: only base_specs/hair/eyes)
            selected_fields = []
            for key in ['base_specs', 'hair', 'eyes']:
                if character.data.get('identity', {}).get(key):
                    selected_fields.append(f'identity::{key}')
            selected_fields.append('special::name')
            # Add active wardrobe
            wardrobe = character.get_active_wardrobe()
            for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']:
                if wardrobe.get(key):
                    selected_fields.append(f'wardrobe::{key}')
            # Add detailer fields
            selected_fields.extend(['special::tags', 'lora::lora_triggers'])
        default_fields = detailer_obj.default_fields
        active_outfit = character.active_outfit
    else:
        # Detailer only - no character
        detailer_prompt = detailer_obj.data.get('prompt', '')
        detailer_tags = [detailer_prompt] if detailer_prompt else []
        combined_data = {
            'character_id': detailer_obj.detailer_id,
            'tags': detailer_tags,
            'lora': detailer_obj.data.get('lora', {}),
        }
        if not selected_fields:
            selected_fields = ['special::tags', 'lora::lora_triggers']
        default_fields = detailer_obj.default_fields
        active_outfit = 'default'
    with open('comfy_workflow.json', 'r') as f:
        workflow = json.load(f)
    prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit)
    # Add colored simple background to the main prompt for detailer previews
    if character:
        primary_color = character.data.get('styles', {}).get('primary_color', '')
        if primary_color:
            prompts["main"] = f"{prompts['main']}, {primary_color} simple background"
        else:
            prompts["main"] = f"{prompts['main']}, simple background"
    else:
        prompts["main"] = f"{prompts['main']}, simple background"
    if extra_positive:
        prompts["main"] = f"{prompts['main']}, {extra_positive}"
    ckpt_path, ckpt_data = _get_default_checkpoint()
    workflow = _prepare_workflow(workflow, character, prompts, detailer=detailer_obj, action=action, custom_negative=extra_negative or None, checkpoint=ckpt_path, checkpoint_data=ckpt_data)
    return queue_prompt(workflow, client_id=client_id)
@app.route('/detailer/<path:slug>/generate', methods=['POST'])
def generate_detailer_image(slug):
    """Queue a ComfyUI generation for a detailer.

    Reads optional character/action selections and extra prompt text from
    the form, stores the choices in the session for the next page load, and
    queues the workflow.  Returns JSON ({'status','prompt_id'}) for AJAX
    callers, otherwise redirects back to the detail page.
    """
    detailer_obj = Detailer.query.filter_by(slug=slug).first_or_404()
    try:
        client_id = request.form.get('client_id')
        # Get selected fields
        selected_fields = request.form.getlist('include_field')
        # Get selected character (if any)
        character_slug = request.form.get('character_slug', '')
        character = None
        if character_slug == '__random__':
            all_characters = Character.query.all()
            if all_characters:
                character = random.choice(all_characters)
                character_slug = character.slug
        elif character_slug:
            character = Character.query.filter_by(slug=character_slug).first()
        # Selected Action record (bug fix: this previously reassigned the
        # `action` local that held the form's preview/replace mode string;
        # the mode is only consumed by the finalize route, so it is dropped
        # here and the ORM object gets its own name)
        action_slug = request.form.get('action_slug', '')
        action_obj = Action.query.filter_by(slug=action_slug).first() if action_slug else None
        # Get additional prompts
        extra_positive = request.form.get('extra_positive', '').strip()
        extra_negative = request.form.get('extra_negative', '').strip()
        # Save preferences
        session[f'char_detailer_{slug}'] = character_slug
        session[f'action_detailer_{slug}'] = action_slug
        session[f'extra_pos_detailer_{slug}'] = extra_positive
        session[f'extra_neg_detailer_{slug}'] = extra_negative
        session[f'prefs_detailer_{slug}'] = selected_fields
        # Queue generation using helper
        prompt_response = _queue_detailer_generation(
            detailer_obj, character, selected_fields, client_id=client_id,
            action=action_obj, extra_positive=extra_positive, extra_negative=extra_negative)
        if 'prompt_id' not in prompt_response:
            raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}")
        prompt_id = prompt_response['prompt_id']
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'status': 'queued', 'prompt_id': prompt_id}
        return redirect(url_for('detailer_detail', slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('detailer_detail', slug=slug))
@app.route('/detailer/<path:slug>/finalize_generation/<prompt_id>', methods=['POST'])
def finalize_detailer_generation(slug, prompt_id):
    """Fetch a finished ComfyUI render and store it as this detailer's preview.

    Pulls the first image from the prompt's history, writes it under
    static/uploads/detailers/<slug>/, records it as the session preview and,
    when action == 'replace', immediately promotes it to the cover image.
    Returns JSON with the image URL, or an error payload.
    """
    detailer_obj = Detailer.query.filter_by(slug=slug).first_or_404()
    action = request.form.get('action', 'preview')
    try:
        history = get_history(prompt_id)
        if prompt_id not in history:
            return {'error': 'History not found'}, 404
        outputs = history[prompt_id]['outputs']
        for node_id in outputs:
            if 'images' in outputs[node_id]:
                image_info = outputs[node_id]['images'][0]
                image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                detailer_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"detailers/{slug}")
                os.makedirs(detailer_folder, exist_ok=True)
                filename = f"gen_{int(time.time())}.png"
                file_path = os.path.join(detailer_folder, filename)
                with open(file_path, 'wb') as f:
                    f.write(image_data)
                # Bug fix: store the generated filename in the relative path
                # (previously a literal "(unknown)" placeholder)
                relative_path = f"detailers/{slug}/{filename}"
                session[f'preview_detailer_{slug}'] = relative_path
                session.modified = True  # Ensure session is saved for JSON response
                # If action is 'replace', also update the detailer's cover image immediately
                if action == 'replace':
                    detailer_obj.image_path = relative_path
                    db.session.commit()
                return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')}
        return {'error': 'No image found in output'}, 404
    except Exception as e:
        print(f"Finalize error: {e}")
        return {'error': str(e)}, 500
@app.route('/detailer/<path:slug>/save_defaults', methods=['POST'])
def save_detailer_defaults(slug):
    """Persist the currently checked prompt fields as this detailer's defaults."""
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    detailer.default_fields = request.form.getlist('include_field')
    db.session.commit()
    flash('Default prompt selection saved for this detailer!')
    return redirect(url_for('detailer_detail', slug=slug))
@app.route('/detailer/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_detailer_cover_from_preview(slug):
    """Promote the session's last preview image to the detailer's cover image."""
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    preview_path = session.get(f'preview_detailer_{slug}')
    # Guard clause: nothing to promote if no preview was generated this session
    if not preview_path:
        flash('No preview image available', 'error')
        return redirect(url_for('detailer_detail', slug=slug))
    detailer.image_path = preview_path
    db.session.commit()
    flash('Cover image updated from preview!')
    return redirect(url_for('detailer_detail', slug=slug))
@app.route('/detailer/<path:slug>/save_json', methods=['POST'])
def save_detailer_json(slug):
    """Replace a detailer's data wholesale from raw JSON posted by the editor."""
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    try:
        parsed = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400
    detailer.data = parsed
    flag_modified(detailer, 'data')
    db.session.commit()
    # Mirror the change to the on-disk JSON file when one is linked
    if detailer.filename:
        target = os.path.join(app.config['DETAILERS_DIR'], detailer.filename)
        with open(target, 'w') as f:
            json.dump(parsed, f, indent=2)
    return {'success': True}
@app.route('/detailers/bulk_create', methods=['POST'])
def bulk_create_detailers_from_loras():
    """Scan the detailer LoRA directory and create a JSON entry per model.

    For each *.safetensors file, asks the LLM (seeded with description text
    scraped from a sibling .html page, if present) to describe the LoRA and
    writes the resulting JSON into DETAILERS_DIR, then syncs the database.

    Bug fix: the LLM prompt, the stored lora_name, and the error log now
    interpolate the actual model filename — previously these f-strings
    contained a literal "(unknown)" placeholder.
    """
    detailers_lora_dir = '/mnt/alexander/AITools/Image Models/lora/Illustrious/Detailers/'
    if not os.path.exists(detailers_lora_dir):
        flash('Detailers LoRA directory not found.', 'error')
        return redirect(url_for('detailers_index'))
    overwrite = request.form.get('overwrite') == 'true'
    created_count = 0
    skipped_count = 0
    overwritten_count = 0
    system_prompt = load_prompt('detailer_system.txt')
    if not system_prompt:
        flash('Detailer system prompt file not found.', 'error')
        return redirect(url_for('detailers_index'))
    for filename in os.listdir(detailers_lora_dir):
        if filename.endswith('.safetensors'):
            name_base = filename.rsplit('.', 1)[0]
            detailer_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
            detailer_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()
            json_filename = f"{detailer_id}.json"
            json_path = os.path.join(app.config['DETAILERS_DIR'], json_filename)
            is_existing = os.path.exists(json_path)
            if is_existing and not overwrite:
                skipped_count += 1
                continue
            # Pull descriptive text from an adjacent .html page, if present
            html_filename = f"{name_base}.html"
            html_path = os.path.join(detailers_lora_dir, html_filename)
            html_content = ""
            if os.path.exists(html_path):
                try:
                    with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                        html_raw = hf.read()
                    # Strip scripts/styles/images, then all remaining tags
                    clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
                    clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)
                    clean_html = re.sub(r'<img[^>]*>', '', clean_html)
                    clean_html = re.sub(r'<[^>]+>', ' ', clean_html)
                    html_content = ' '.join(clean_html.split())
                except Exception as e:
                    print(f"Error reading HTML {html_filename}: {e}")
            try:
                print(f"Asking LLM to describe detailer: {detailer_name}")
                # Fixed: interpolate the real model filename into the prompt
                prompt = f"Describe a detailer LoRA for AI image generation based on the filename: '{filename}'"
                if html_content:
                    prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###"
                llm_response = call_llm(prompt, system_prompt)
                clean_json = llm_response.replace('```json', '').replace('```', '').strip()
                detailer_data = json.loads(clean_json)
                detailer_data['detailer_id'] = detailer_id
                detailer_data['detailer_name'] = detailer_name
                if 'lora' not in detailer_data: detailer_data['lora'] = {}
                # Fixed: lora_name now points at the actual model file
                detailer_data['lora']['lora_name'] = f"Illustrious/Detailers/{filename}"
                if not detailer_data['lora'].get('lora_triggers'):
                    detailer_data['lora']['lora_triggers'] = name_base
                if detailer_data['lora'].get('lora_weight') is None:
                    detailer_data['lora']['lora_weight'] = 1.0
                if detailer_data['lora'].get('lora_weight_min') is None:
                    detailer_data['lora']['lora_weight_min'] = 0.7
                if detailer_data['lora'].get('lora_weight_max') is None:
                    detailer_data['lora']['lora_weight_max'] = 1.0
                with open(json_path, 'w') as f:
                    json.dump(detailer_data, f, indent=2)
                if is_existing:
                    overwritten_count += 1
                else:
                    created_count += 1
                # Small delay to avoid API rate limits if many files
                time.sleep(0.5)
            except Exception as e:
                print(f"Error creating detailer for {filename}: {e}")
    if created_count > 0 or overwritten_count > 0:
        sync_detailers()
        msg = f'Successfully processed detailers: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No new detailers created or overwritten. {skipped_count} existing detailers found.')
    return redirect(url_for('detailers_index'))
@app.route('/detailer/create', methods=['GET', 'POST'])
def create_detailer():
    """Create a new detailer: render the form on GET, persist on POST.

    The slug comes from the optional 'filename' field, falling back to a
    sanitized version of the name, and is de-duplicated against JSON files
    already present in DETAILERS_DIR.
    """
    if request.method != 'POST':
        return render_template('detailers/create.html')
    name = request.form.get('name')
    slug = request.form.get('filename', '').strip()
    if not slug:
        slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')
    safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) or 'detailer'
    # Append _1, _2, ... until the JSON filename is unique on disk
    candidate = safe_slug
    suffix = 1
    while os.path.exists(os.path.join(app.config['DETAILERS_DIR'], f"{candidate}.json")):
        candidate = f"{safe_slug}_{suffix}"
        suffix += 1
    safe_slug = candidate
    detailer_data = {
        "detailer_id": safe_slug,
        "detailer_name": name,
        "prompt": "",
        "lora": {
            "lora_name": "",
            "lora_weight": 1.0,
            "lora_triggers": ""
        }
    }
    try:
        with open(os.path.join(app.config['DETAILERS_DIR'], f"{safe_slug}.json"), 'w') as f:
            json.dump(detailer_data, f, indent=2)
        record = Detailer(
            detailer_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json",
            name=name, data=detailer_data
        )
        db.session.add(record)
        db.session.commit()
        flash('Detailer created successfully!')
        return redirect(url_for('detailer_detail', slug=safe_slug))
    except Exception as e:
        print(f"Save error: {e}")
        flash(f"Failed to create detailer: {e}")
        return redirect(request.url)
# ---------------------------------------------------------------------------
# Checkpoints
# ---------------------------------------------------------------------------
@app.route('/checkpoints')
def checkpoints_index():
    """List every checkpoint, ordered by display name."""
    return render_template('checkpoints/index.html',
                           checkpoints=Checkpoint.query.order_by(Checkpoint.name).all())
@app.route('/checkpoints/rescan', methods=['POST'])
def rescan_checkpoints():
    """Re-scan the checkpoint directories and sync the database, then redirect."""
    sync_checkpoints()
    flash('Checkpoint list synced from disk.')
    return redirect(url_for('checkpoints_index'))
@app.route('/checkpoint/<path:slug>')
def checkpoint_detail(slug):
    """Show a checkpoint's detail page, including any stored preview images."""
    ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
    # Collect previously generated/uploaded preview images, newest first
    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"checkpoints/{slug}")
    existing_previews = []
    if os.path.isdir(upload_dir):
        image_names = [n for n in os.listdir(upload_dir)
                       if n.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))]
        for n in sorted(image_names, reverse=True):
            existing_previews.append(f"checkpoints/{slug}/{n}")
    return render_template(
        'checkpoints/detail.html',
        ckpt=ckpt,
        characters=Character.query.order_by(Character.name).all(),
        preview_image=session.get(f'preview_checkpoint_{slug}'),
        selected_character=session.get(f'char_checkpoint_{slug}'),
        existing_previews=existing_previews)
@app.route('/checkpoint/<path:slug>/upload', methods=['POST'])
def upload_checkpoint_image(slug):
    """Handle a manual cover-image upload for a checkpoint."""
    ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
    if 'image' not in request.files:
        flash('No file part')
        return redirect(url_for('checkpoint_detail', slug=slug))
    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(url_for('checkpoint_detail', slug=slug))
    if file and allowed_file(file.filename):
        folder = os.path.join(app.config['UPLOAD_FOLDER'], f"checkpoints/{slug}")
        os.makedirs(folder, exist_ok=True)
        filename = secure_filename(file.filename)
        file.save(os.path.join(folder, filename))
        # Bug fix: store the sanitized upload filename in the relative path
        # (previously a literal "(unknown)" placeholder)
        ckpt.image_path = f"checkpoints/{slug}/{filename}"
        db.session.commit()
        flash('Image uploaded successfully!')
    return redirect(url_for('checkpoint_detail', slug=slug))
def _apply_checkpoint_settings(workflow, ckpt_data):
"""Apply checkpoint-specific sampler/prompt/VAE settings to the workflow."""
steps = ckpt_data.get('steps')
cfg = ckpt_data.get('cfg')
sampler_name = ckpt_data.get('sampler_name')
scheduler = ckpt_data.get('scheduler')
base_positive = ckpt_data.get('base_positive', '')
base_negative = ckpt_data.get('base_negative', '')
vae = ckpt_data.get('vae', 'integrated')
# KSampler (node 3)
if steps and '3' in workflow:
workflow['3']['inputs']['steps'] = int(steps)
if cfg and '3' in workflow:
workflow['3']['inputs']['cfg'] = float(cfg)
if sampler_name and '3' in workflow:
workflow['3']['inputs']['sampler_name'] = sampler_name
if scheduler and '3' in workflow:
workflow['3']['inputs']['scheduler'] = scheduler
# Face/hand detailers (nodes 11, 13)
for node_id in ['11', '13']:
if node_id in workflow:
if steps:
workflow[node_id]['inputs']['steps'] = int(steps)
if cfg:
workflow[node_id]['inputs']['cfg'] = float(cfg)
if sampler_name:
workflow[node_id]['inputs']['sampler_name'] = sampler_name
if scheduler:
workflow[node_id]['inputs']['scheduler'] = scheduler
# Prepend base_positive to positive prompts (main + face/hand detailers)
if base_positive:
for node_id in ['6', '14', '15']:
if node_id in workflow:
workflow[node_id]['inputs']['text'] = f"{base_positive}, {workflow[node_id]['inputs']['text']}"
# Append base_negative to negative prompt (shared by main + detailers via node 7)
if base_negative and '7' in workflow:
workflow['7']['inputs']['text'] = f"{workflow['7']['inputs']['text']}, {base_negative}"
# VAE: if not integrated, inject a VAELoader node and rewire
if vae and vae != 'integrated':
workflow['21'] = {
'inputs': {'vae_name': vae},
'class_type': 'VAELoader'
}
if '8' in workflow:
workflow['8']['inputs']['vae'] = ['21', 0]
for node_id in ['11', '13']:
if node_id in workflow:
workflow[node_id]['inputs']['vae'] = ['21', 0]
return workflow
def _queue_checkpoint_generation(ckpt_obj, character=None, client_id=None):
    """Queue a ComfyUI preview render for a checkpoint.

    Uses a trimmed-down field selection from the character when one is
    supplied (core identity + main outfit pieces), otherwise a generic
    quality prompt.  Returns the dict from ``queue_prompt``.
    """
    with open('comfy_workflow.json', 'r') as f:
        workflow = json.load(f)
    if character:
        combined_data = character.data.copy()
        combined_data['character_id'] = character.character_id
        # Minimal selection: core identity fields plus name
        selected_fields = []
        for key in ['base_specs', 'hair', 'eyes']:
            if character.data.get('identity', {}).get(key):
                selected_fields.append(f'identity::{key}')
        selected_fields.append('special::name')
        # Main outfit pieces from the active wardrobe only
        wardrobe = character.get_active_wardrobe()
        for key in ['full_body', 'top', 'bottom']:
            if wardrobe.get(key):
                selected_fields.append(f'wardrobe::{key}')
        prompts = build_prompt(combined_data, selected_fields, None, active_outfit=character.active_outfit)
        # Simple background, tinted with the character's primary color if set
        primary_color = character.data.get('styles', {}).get('primary_color', '')
        prompts["main"] = f"{prompts['main']}, {primary_color + ' ' if primary_color else ''}simple background"
    else:
        # No character: generic single-subject quality prompt
        prompts = {
            "main": "masterpiece, best quality, 1girl, solo, simple background, looking at viewer",
            "face": "masterpiece, best quality",
            "hand": "masterpiece, best quality",
        }
    workflow = _prepare_workflow(workflow, character, prompts, checkpoint=ckpt_obj.checkpoint_path,
                                 checkpoint_data=ckpt_obj.data or {})
    return queue_prompt(workflow, client_id=client_id)
@app.route('/checkpoint/<path:slug>/generate', methods=['POST'])
def generate_checkpoint_image(slug):
    """Queue a preview render for a checkpoint, optionally using a character."""
    ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
    try:
        client_id = request.form.get('client_id')
        chosen_slug = request.form.get('character_slug', '')
        character = None
        if chosen_slug == '__random__':
            # Pick any character at random, if there are any
            pool = Character.query.all()
            if pool:
                character = random.choice(pool)
                chosen_slug = character.slug
        elif chosen_slug:
            character = Character.query.filter_by(slug=chosen_slug).first()
        session[f'char_checkpoint_{slug}'] = chosen_slug
        response = _queue_checkpoint_generation(ckpt, character, client_id=client_id)
        if 'prompt_id' not in response:
            raise Exception(f"ComfyUI failed: {response.get('error', 'Unknown error')}")
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'status': 'queued', 'prompt_id': response['prompt_id']}
        return redirect(url_for('checkpoint_detail', slug=slug))
    except Exception as e:
        print(f"Checkpoint generation error: {e}")
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('checkpoint_detail', slug=slug))
@app.route('/checkpoint/<path:slug>/finalize_generation/<prompt_id>', methods=['POST'])
def finalize_checkpoint_generation(slug, prompt_id):
    """Fetch a finished ComfyUI render and store it as this checkpoint's preview.

    Saves the first output image under static/uploads/checkpoints/<slug>/,
    records it as the session preview and, when action == 'replace',
    promotes it to the cover image.  Returns JSON with the image URL.
    """
    ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
    action = request.form.get('action', 'preview')
    try:
        history = get_history(prompt_id)
        if prompt_id not in history:
            return {'error': 'History not found'}, 404
        outputs = history[prompt_id]['outputs']
        for node_id in outputs:
            if 'images' in outputs[node_id]:
                image_info = outputs[node_id]['images'][0]
                image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                folder = os.path.join(app.config['UPLOAD_FOLDER'], f"checkpoints/{slug}")
                os.makedirs(folder, exist_ok=True)
                filename = f"gen_{int(time.time())}.png"
                with open(os.path.join(folder, filename), 'wb') as f:
                    f.write(image_data)
                # Bug fix: store the generated filename in the relative path
                # (previously a literal "(unknown)" placeholder)
                relative_path = f"checkpoints/{slug}/{filename}"
                session[f'preview_checkpoint_{slug}'] = relative_path
                session.modified = True
                if action == 'replace':
                    ckpt.image_path = relative_path
                    db.session.commit()
                return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')}
        return {'error': 'No image found in output'}, 404
    except Exception as e:
        print(f"Finalize checkpoint error: {e}")
        return {'error': str(e)}, 500
@app.route('/checkpoint/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_checkpoint_cover_from_preview(slug):
    """Promote the session's last preview image to the checkpoint's cover image."""
    ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
    preview_path = session.get(f'preview_checkpoint_{slug}')
    # Guard clause: nothing to promote without a session preview
    if not preview_path:
        flash('No preview image available', 'error')
        return redirect(url_for('checkpoint_detail', slug=slug))
    ckpt.image_path = preview_path
    db.session.commit()
    flash('Cover image updated from preview!')
    return redirect(url_for('checkpoint_detail', slug=slug))
@app.route('/checkpoint/<path:slug>/save_json', methods=['POST'])
def save_checkpoint_json(slug):
    """Overwrite a checkpoint's data from raw JSON posted by the editor."""
    ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
    try:
        parsed = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400
    ckpt.data = parsed
    flag_modified(ckpt, 'data')
    db.session.commit()
    # Mirror the change to the per-checkpoint JSON file on disk
    checkpoints_dir = app.config.get('CHECKPOINTS_DIR', 'data/checkpoints')
    with open(os.path.join(checkpoints_dir, f'{ckpt.slug}.json'), 'w') as f:
        json.dump(parsed, f, indent=2)
    return {'success': True}
@app.route('/get_missing_checkpoints')
def get_missing_checkpoints():
    """Return slug/name pairs for checkpoints that have no cover image yet."""
    no_cover = Checkpoint.query.filter(
        (Checkpoint.image_path == None) | (Checkpoint.image_path == '')).all()
    return {'missing': [{'slug': entry.slug, 'name': entry.name} for entry in no_cover]}
@app.route('/clear_all_checkpoint_covers', methods=['POST'])
def clear_all_checkpoint_covers():
    """Blank the cover image on every checkpoint row, then commit once."""
    all_checkpoints = Checkpoint.query.all()
    for entry in all_checkpoints:
        entry.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/checkpoints/bulk_create', methods=['POST'])
def bulk_create_checkpoints():
    """Create checkpoint JSON metadata for every model file on disk.

    Walks the Illustrious/Noob model directories, optionally asks the LLM to
    describe each model using text scraped from a sibling .html page, and
    writes one JSON file per model into CHECKPOINTS_DIR.

    Bug fix: the derived checkpoint_path, the LLM prompt, and the log
    messages now interpolate the actual model filename — previously these
    f-strings contained a literal "(unknown)" placeholder, which also broke
    the generated checkpoint_path/safe_id.
    """
    checkpoints_dir = app.config.get('CHECKPOINTS_DIR', 'data/checkpoints')
    os.makedirs(checkpoints_dir, exist_ok=True)
    overwrite = request.form.get('overwrite') == 'true'
    created_count = 0
    skipped_count = 0
    overwritten_count = 0
    system_prompt = load_prompt('checkpoint_system.txt')
    if not system_prompt:
        flash('Checkpoint system prompt file not found.', 'error')
        return redirect(url_for('checkpoints_index'))
    dirs = [
        (app.config.get('ILLUSTRIOUS_MODELS_DIR', ''), 'Illustrious'),
        (app.config.get('NOOB_MODELS_DIR', ''), 'Noob'),
    ]
    for dirpath, family in dirs:
        if not dirpath or not os.path.exists(dirpath):
            continue
        for filename in sorted(os.listdir(dirpath)):
            if not (filename.endswith('.safetensors') or filename.endswith('.ckpt')):
                continue
            # Fixed: build the family-relative path from the real filename
            checkpoint_path = f"{family}/{filename}"
            name_base = filename.rsplit('.', 1)[0]
            safe_id = re.sub(r'[^a-zA-Z0-9_]', '_', checkpoint_path.rsplit('.', 1)[0]).lower().strip('_')
            json_filename = f"{safe_id}.json"
            json_path = os.path.join(checkpoints_dir, json_filename)
            is_existing = os.path.exists(json_path)
            if is_existing and not overwrite:
                skipped_count += 1
                continue
            # Look for a matching HTML file alongside the model file
            html_path = os.path.join(dirpath, f"{name_base}.html")
            html_content = ""
            if os.path.exists(html_path):
                try:
                    with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                        html_raw = hf.read()
                    # Strip scripts/styles/images, then all remaining tags
                    clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
                    clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)
                    clean_html = re.sub(r'<img[^>]*>', '', clean_html)
                    clean_html = re.sub(r'<[^>]+>', ' ', clean_html)
                    html_content = ' '.join(clean_html.split())
                except Exception as e:
                    print(f"Error reading HTML for {filename}: {e}")
            defaults = _default_checkpoint_data(checkpoint_path, filename)
            if html_content:
                try:
                    print(f"Asking LLM to describe checkpoint: {filename}")
                    prompt = (
                        f"Generate checkpoint metadata JSON for the model file: '{filename}' "
                        f"(checkpoint_path: '{checkpoint_path}').\n\n"
                        f"Here is descriptive text extracted from an associated HTML file:\n###\n{html_content[:3000]}\n###"
                    )
                    llm_response = call_llm(prompt, system_prompt)
                    clean_json = llm_response.replace('```json', '').replace('```', '').strip()
                    ckpt_data = json.loads(clean_json)
                    # Enforce fixed fields
                    ckpt_data['checkpoint_path'] = checkpoint_path
                    ckpt_data['checkpoint_name'] = filename
                    # Fill missing fields with defaults
                    for key, val in defaults.items():
                        if key not in ckpt_data or ckpt_data[key] is None:
                            ckpt_data[key] = val
                    # Small delay to avoid API rate limits if many files
                    time.sleep(0.5)
                except Exception as e:
                    print(f"LLM error for {filename}: {e}. Using defaults.")
                    ckpt_data = defaults
            else:
                ckpt_data = defaults
            try:
                with open(json_path, 'w') as f:
                    json.dump(ckpt_data, f, indent=2)
                if is_existing:
                    overwritten_count += 1
                else:
                    created_count += 1
            except Exception as e:
                print(f"Error saving JSON for {filename}: {e}")
    if created_count > 0 or overwritten_count > 0:
        sync_checkpoints()
        msg = f'Successfully processed checkpoints: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No checkpoints created or overwritten. {skipped_count} existing entries found.')
    return redirect(url_for('checkpoints_index'))
# ============ LOOK ROUTES ============
@app.route('/looks')
def looks_index():
    """List every look, ordered by name."""
    return render_template('looks/index.html', looks=Look.query.order_by(Look.name).all())
@app.route('/looks/rescan', methods=['POST'])
def rescan_looks():
    """Re-sync the Look table from the JSON files on disk, then redirect."""
    sync_looks()
    flash('Database synced with look files.')
    return redirect(url_for('looks_index'))
@app.route('/look/<path:slug>')
def look_detail(slug):
    """Show a look's detail page; pre-selects its linked character if set."""
    look = Look.query.filter_by(slug=slug).first_or_404()
    return render_template(
        'looks/detail.html',
        look=look,
        characters=Character.query.order_by(Character.name).all(),
        preferences=session.get(f'prefs_look_{slug}'),
        preview_image=session.get(f'preview_look_{slug}'),
        # Fall back to the look's own linked character when no session choice
        selected_character=session.get(f'char_look_{slug}', look.character_id or ''))
@app.route('/look/<path:slug>/edit', methods=['GET', 'POST'])
def edit_look(slug):
    """Edit a look's name, linked character, prompts, LoRA config and tags.

    Robustness fix (consistent with the detailer editor): a malformed LoRA
    weight in the form now falls back to 1.0 instead of raising ValueError
    and returning a 500.
    """
    look = Look.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()
    loras = get_available_loras()
    if request.method == 'POST':
        look.name = request.form.get('look_name', look.name)
        character_id = request.form.get('character_id', '')
        look.character_id = character_id if character_id else None
        new_data = look.data.copy()
        new_data['look_name'] = look.name
        new_data['character_id'] = look.character_id
        new_data['positive'] = request.form.get('positive', '')
        new_data['negative'] = request.form.get('negative', '')
        lora_name = request.form.get('lora_lora_name', '')
        # Guarded parse: bad input (e.g. "abc") falls back to 1.0
        try:
            lora_weight = float(request.form.get('lora_lora_weight', 1.0) or 1.0)
        except ValueError:
            lora_weight = 1.0
        lora_triggers = request.form.get('lora_lora_triggers', '')
        new_data['lora'] = {'lora_name': lora_name, 'lora_weight': lora_weight, 'lora_triggers': lora_triggers}
        # Optional randomization bounds: only stored when a valid float is given
        for bound in ['lora_weight_min', 'lora_weight_max']:
            val_str = request.form.get(f'lora_{bound}', '').strip()
            if val_str:
                try:
                    new_data['lora'][bound] = float(val_str)
                except ValueError:
                    pass
        tags_raw = request.form.get('tags', '')
        new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()]
        look.data = new_data
        flag_modified(look, 'data')
        db.session.commit()
        # Mirror the change to the on-disk JSON file when one is linked
        if look.filename:
            file_path = os.path.join(app.config['LOOKS_DIR'], look.filename)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)
        flash(f'Look "{look.name}" updated!')
        return redirect(url_for('look_detail', slug=look.slug))
    return render_template('looks/edit.html', look=look, characters=characters, loras=loras)
@app.route('/look/<path:slug>/upload', methods=['POST'])
def upload_look_image(slug):
    """Handle a manual cover-image upload for a look."""
    look = Look.query.filter_by(slug=slug).first_or_404()
    if 'image' not in request.files:
        flash('No file selected')
        return redirect(url_for('look_detail', slug=slug))
    file = request.files['image']
    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        look_folder = os.path.join(app.config['UPLOAD_FOLDER'], f'looks/{slug}')
        os.makedirs(look_folder, exist_ok=True)
        file_path = os.path.join(look_folder, filename)
        file.save(file_path)
        # Bug fix: store the sanitized upload filename in the relative path
        # (previously a literal "(unknown)" placeholder)
        look.image_path = f'looks/{slug}/{filename}'
        db.session.commit()
    return redirect(url_for('look_detail', slug=slug))
@app.route('/look/<path:slug>/generate', methods=['POST'])
def generate_look_image(slug):
    """Queue a ComfyUI render for a look, optionally merged with a character.

    With a character selected, the character's identity/defaults are merged
    with the look's LoRA, tags and positive prompt; otherwise the look is
    rendered self-contained from its own triggers and positive text.
    Returns JSON ({'status','prompt_id'}) for AJAX, otherwise redirects.
    """
    look = Look.query.filter_by(slug=slug).first_or_404()
    try:
        # NOTE(review): 'action' is read but not used in this route — the
        # preview/replace mode appears to be consumed by the finalize route.
        action = request.form.get('action', 'preview')
        client_id = request.form.get('client_id')
        selected_fields = request.form.getlist('include_field')
        character_slug = request.form.get('character_slug', '')
        character = None
        # Only load a character when the user explicitly selects one
        if character_slug == '__random__':
            all_characters = Character.query.all()
            if all_characters:
                character = random.choice(all_characters)
                character_slug = character.slug
        elif character_slug:
            character = Character.query.filter_by(slug=character_slug).first()
            if not character:
                # The form value may be a character_id rather than a slug
                character = Character.query.filter_by(character_id=character_slug).first()
        # No fallback to look.character_id — looks are self-contained
        session[f'prefs_look_{slug}'] = selected_fields
        session[f'char_look_{slug}'] = character_slug
        lora_triggers = look.data.get('lora', {}).get('lora_triggers', '')
        look_positive = look.data.get('positive', '')
        with open('comfy_workflow.json', 'r') as f:
            workflow = json.load(f)
        if character:
            # Merge character identity with look LoRA and positive prompt
            combined_data = {
                'character_id': character.character_id,
                'identity': character.data.get('identity', {}),
                'defaults': character.data.get('defaults', {}),
                'wardrobe': character.data.get('wardrobe', {}).get(character.active_outfit or 'default',
                                                                   character.data.get('wardrobe', {}).get('default', {})),
                'styles': character.data.get('styles', {}),
                'lora': look.data.get('lora', {}),
                'tags': look.data.get('tags', [])
            }
            # Force-include all populated identity fields
            for key in ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']:
                if character.data.get('identity', {}).get(key):
                    field_key = f'identity::{key}'
                    if field_key not in selected_fields:
                        selected_fields.append(field_key)
            # Include the character's default expression/pose when set
            for key in ['expression', 'pose']:
                if character.data.get('defaults', {}).get(key):
                    field_key = f'defaults::{key}'
                    if field_key not in selected_fields:
                        selected_fields.append(field_key)
            if 'special::name' not in selected_fields:
                selected_fields.append('special::name')
            prompts = build_prompt(combined_data, selected_fields, character.default_fields)
            # Append look-specific triggers and positive
            extra = ', '.join(filter(None, [lora_triggers, look_positive]))
            if extra:
                prompts['main'] = _dedup_tags(f"{prompts['main']}, {extra}" if prompts['main'] else extra)
            primary_color = character.data.get('styles', {}).get('primary_color', '')
            bg = f"{primary_color} simple background" if primary_color else "simple background"
        else:
            # Look is self-contained: build prompt from its own positive and triggers only
            main = _dedup_tags(', '.join(filter(None, ['(solo:1.2)', lora_triggers, look_positive])))
            prompts = {'main': main, 'face': '', 'hand': ''}
            bg = "simple background"
        prompts['main'] = _dedup_tags(f"{prompts['main']}, {bg}" if prompts['main'] else bg)
        ckpt_path, ckpt_data = _get_default_checkpoint()
        workflow = _prepare_workflow(workflow, character, prompts, checkpoint=ckpt_path,
                                     checkpoint_data=ckpt_data, look=look)
        prompt_response = queue_prompt(workflow, client_id=client_id)
        if 'prompt_id' not in prompt_response:
            raise Exception(f"ComfyUI failed: {prompt_response.get('error', 'Unknown error')}")
        prompt_id = prompt_response['prompt_id']
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'status': 'queued', 'prompt_id': prompt_id}
        return redirect(url_for('look_detail', slug=slug))
    except Exception as e:
        print(f"Look generation error: {e}")
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('look_detail', slug=slug))
@app.route('/look/<path:slug>/finalize_generation/<prompt_id>', methods=['POST'])
def finalize_look_generation(slug, prompt_id):
    """Collect a finished ComfyUI job's image and store it for a look.

    Downloads the first image from the job's output history, saves it under
    static/uploads/looks/<slug>/, and records it as the session preview.
    Form field 'action': 'preview' (default) only stores the preview;
    'replace' additionally persists it as the look's cover image.
    Returns a JSON dict with the saved image URL, or an error + HTTP status.
    """
    look = Look.query.filter_by(slug=slug).first_or_404()
    action = request.form.get('action', 'preview')
    try:
        history = get_history(prompt_id)
        if prompt_id not in history:
            return {'error': 'History not found'}, 404
        outputs = history[prompt_id]['outputs']
        for node_id in outputs:
            if 'images' in outputs[node_id]:
                # Only the first image of the first image-producing node is kept.
                image_info = outputs[node_id]['images'][0]
                image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                look_folder = os.path.join(app.config['UPLOAD_FOLDER'], f'looks/{slug}')
                os.makedirs(look_folder, exist_ok=True)
                filename = f"gen_{int(time.time())}.png"
                file_path = os.path.join(look_folder, filename)
                with open(file_path, 'wb') as f:
                    f.write(image_data)
                # Bug fix: the relative path must reference the file that was
                # actually written, not a literal placeholder.
                relative_path = f'looks/{slug}/{filename}'
                session[f'preview_look_{slug}'] = relative_path
                session.modified = True
                if action == 'replace':
                    look.image_path = relative_path
                    db.session.commit()
                return {'success': True, 'image_url': url_for('static', filename=f'uploads/{relative_path}')}
        return {'error': 'No image found in output'}, 404
    except Exception as e:
        print(f"Finalize look error: {e}")
        return {'error': str(e)}, 500
@app.route('/look/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_look_cover_from_preview(slug):
    """Promote the session-stored preview image to the look's cover image."""
    look = Look.query.filter_by(slug=slug).first_or_404()
    preview_path = session.get(f'preview_look_{slug}')
    if not preview_path:
        flash('No preview image available', 'error')
        return redirect(url_for('look_detail', slug=slug))
    look.image_path = preview_path
    db.session.commit()
    flash('Cover image updated from preview!')
    return redirect(url_for('look_detail', slug=slug))
@app.route('/look/<path:slug>/save_defaults', methods=['POST'])
def save_look_defaults(slug):
    """Persist the checked prompt-field selection as the look's defaults."""
    target = Look.query.filter_by(slug=slug).first_or_404()
    target.default_fields = request.form.getlist('include_field')
    db.session.commit()
    flash('Default prompt selection saved!')
    return redirect(url_for('look_detail', slug=slug))
@app.route('/look/<path:slug>/save_json', methods=['POST'])
def save_look_json(slug):
    """Replace a look's JSON data from a raw form payload and mirror it to disk."""
    look = Look.query.filter_by(slug=slug).first_or_404()
    raw = request.form.get('json_data', '')
    try:
        new_data = json.loads(raw)
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400
    look.data = new_data
    # The payload may re-link the look to a different character.
    look.character_id = new_data.get('character_id', look.character_id)
    flag_modified(look, 'data')
    db.session.commit()
    # Keep the on-disk JSON file in sync when one exists.
    if look.filename:
        with open(os.path.join(app.config['LOOKS_DIR'], look.filename), 'w') as fh:
            json.dump(new_data, fh, indent=2)
    return {'success': True}
@app.route('/look/create', methods=['GET', 'POST'])
def create_look():
    """Render the look-creation form (GET) or create a new look (POST).

    On POST, writes the look's JSON file into LOOKS_DIR and inserts a
    matching Look row, then redirects to the new look's detail page.
    """
    characters = Character.query.order_by(Character.name).all()
    loras = get_available_loras()
    if request.method != 'POST':
        return render_template('looks/create.html', characters=characters, loras=loras)
    name = request.form.get('name', '').strip()
    look_id = re.sub(r'[^a-zA-Z0-9_]', '_', name.lower().replace(' ', '_'))
    filename = f'{look_id}.json'
    file_path = os.path.join(app.config['LOOKS_DIR'], filename)
    character_id = request.form.get('character_id', '') or None
    data = {
        'look_id': look_id,
        'look_name': name,
        'character_id': character_id,
        'positive': request.form.get('positive', ''),
        'negative': request.form.get('negative', ''),
        'lora': {
            'lora_name': request.form.get('lora_lora_name', ''),
            'lora_weight': float(request.form.get('lora_lora_weight', 1.0) or 1.0),
            'lora_triggers': request.form.get('lora_lora_triggers', ''),
        },
        'tags': [t.strip() for t in request.form.get('tags', '').split(',') if t.strip()],
    }
    os.makedirs(app.config['LOOKS_DIR'], exist_ok=True)
    with open(file_path, 'w') as fh:
        json.dump(data, fh, indent=2)
    slug = re.sub(r'[^a-zA-Z0-9_]', '', look_id)
    db.session.add(Look(look_id=look_id, slug=slug, filename=filename, name=name,
                        character_id=character_id, data=data))
    db.session.commit()
    flash(f'Look "{name}" created!')
    return redirect(url_for('look_detail', slug=slug))
@app.route('/get_missing_looks')
def get_missing_looks():
    """List looks that have no cover image yet (NULL or empty path)."""
    no_cover = (Look.image_path == None) | (Look.image_path == '')
    rows = Look.query.filter(no_cover).all()
    return {'missing': [{'slug': lk.slug, 'name': lk.name} for lk in rows]}
@app.route('/clear_all_look_covers', methods=['POST'])
def clear_all_look_covers():
    """Reset the cover image on every look in a single commit."""
    for lk in Look.query.all():
        lk.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/looks/bulk_create', methods=['POST'])
def bulk_create_looks_from_loras():
    """Create a Look JSON profile for every LoRA file in LORA_DIR via the LLM.

    For each '*.safetensors' file, the LLM (seeded with any descriptive text
    from a matching '.html' sidecar) drafts a look profile; deterministic
    fields (ids, LoRA path, trigger/weight defaults) are then enforced and
    the result is written as '<look_id>.json' into LOOKS_DIR. Existing
    profiles are skipped unless the 'overwrite' form flag is 'true'.
    Redirects back to the looks index with a summary flash message.
    """
    lora_dir = app.config['LORA_DIR']
    if not os.path.exists(lora_dir):
        flash('Looks LoRA directory not found.', 'error')
        return redirect(url_for('looks_index'))
    overwrite = request.form.get('overwrite') == 'true'
    created_count = 0
    skipped_count = 0
    overwritten_count = 0
    system_prompt = load_prompt('look_system.txt')
    if not system_prompt:
        flash('Look system prompt file not found.', 'error')
        return redirect(url_for('looks_index'))
    for filename in os.listdir(lora_dir):
        if not filename.endswith('.safetensors'):
            continue
        name_base = filename.rsplit('.', 1)[0]
        look_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
        look_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()
        json_filename = f"{look_id}.json"
        json_path = os.path.join(app.config['LOOKS_DIR'], json_filename)
        is_existing = os.path.exists(json_path)
        if is_existing and not overwrite:
            skipped_count += 1
            continue
        # Optional sidecar HTML next to the LoRA often carries a description.
        html_filename = f"{name_base}.html"
        html_path = os.path.join(lora_dir, html_filename)
        html_content = ""
        if os.path.exists(html_path):
            try:
                with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                    html_raw = hf.read()
                # Strip scripts, styles, images, then all remaining tags.
                clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
                clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)
                clean_html = re.sub(r'<img[^>]*>', '', clean_html)
                clean_html = re.sub(r'<[^>]+>', ' ', clean_html)
                html_content = ' '.join(clean_html.split())
            except Exception as e:
                print(f"Error reading HTML {html_filename}: {e}")
        try:
            print(f"Asking LLM to describe look: {look_name}")
            # Bug fix: embed the actual LoRA filename in the LLM prompt
            # instead of the literal '(unknown)' placeholder.
            prompt = f"Create a look profile for a character appearance LoRA based on the filename: '{filename}'"
            if html_content:
                prompt += f"\n\nHere is descriptive text extracted from an associated HTML file:\n###\n{html_content[:3000]}\n###"
            llm_response = call_llm(prompt, system_prompt)
            clean_json = llm_response.replace('```json', '').replace('```', '').strip()
            look_data = json.loads(clean_json)
            # Deterministic fields always override whatever the LLM produced.
            look_data['look_id'] = look_id
            look_data['look_name'] = look_name
            if 'lora' not in look_data:
                look_data['lora'] = {}
            # Bug fix: point at the real LoRA file, not a placeholder.
            look_data['lora']['lora_name'] = f"Illustrious/Looks/{filename}"
            if not look_data['lora'].get('lora_triggers'):
                look_data['lora']['lora_triggers'] = name_base
            if look_data['lora'].get('lora_weight') is None:
                look_data['lora']['lora_weight'] = 0.8
            if look_data['lora'].get('lora_weight_min') is None:
                look_data['lora']['lora_weight_min'] = 0.7
            if look_data['lora'].get('lora_weight_max') is None:
                look_data['lora']['lora_weight_max'] = 1.0
            os.makedirs(app.config['LOOKS_DIR'], exist_ok=True)
            with open(json_path, 'w') as f:
                json.dump(look_data, f, indent=2)
            if is_existing:
                overwritten_count += 1
            else:
                created_count += 1
            time.sleep(0.5)  # brief pause between LLM calls
        except Exception as e:
            # Bug fix: log the actual filename that failed.
            print(f"Error creating look for {filename}: {e}")
    if created_count > 0 or overwritten_count > 0:
        sync_looks()
        msg = f'Successfully processed looks: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No looks created or overwritten. {skipped_count} existing entries found.')
    return redirect(url_for('looks_index'))
# ---------------------------------------------------------------------------
# Gallery
# ---------------------------------------------------------------------------
# Upload sub-folders scanned by the gallery pages, in display order.
GALLERY_CATEGORIES = ['characters', 'actions', 'outfits', 'scenes', 'styles', 'detailers', 'checkpoints']
# Maps each gallery category name to its SQLAlchemy model class
# (used for slug → display-name lookups and filter dropdowns).
_MODEL_MAP = {
    'characters': Character,
    'actions': Action,
    'outfits': Outfit,
    'scenes': Scene,
    'styles': Style,
    'detailers': Detailer,
    'checkpoints': Checkpoint,
}
def _scan_gallery_images(category_filter='all', slug_filter=''):
    """Return sorted list of image dicts from the uploads directory.

    category_filter -- 'all' or one of GALLERY_CATEGORIES
    slug_filter     -- restrict to a single item slug when non-empty

    Each dict carries path (relative to the uploads folder), category, slug,
    filename and timestamp. Images are sorted newest-first by the epoch
    embedded in their 'gen_<epoch>.png' names (0 when the name doesn't match).
    Unreadable directories are silently skipped.
    """
    upload_folder = app.config['UPLOAD_FOLDER']
    images = []
    cats = GALLERY_CATEGORIES if category_filter == 'all' else [category_filter]
    for cat in cats:
        cat_folder = os.path.join(upload_folder, cat)
        if not os.path.isdir(cat_folder):
            continue
        try:
            slugs = os.listdir(cat_folder)
        except OSError:
            continue
        for item_slug in slugs:
            if slug_filter and slug_filter != item_slug:
                continue
            item_folder = os.path.join(cat_folder, item_slug)
            if not os.path.isdir(item_folder):
                continue
            try:
                files = os.listdir(item_folder)
            except OSError:
                continue
            for filename in files:
                if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
                    continue
                try:
                    ts = int(filename.replace('gen_', '').rsplit('.', 1)[0])
                except ValueError:
                    ts = 0
                images.append({
                    # Bug fix: the stored path must include the real filename
                    # (it previously embedded a literal '(unknown)' placeholder,
                    # breaking gallery links, metadata lookups and deletes).
                    'path': f"{cat}/{item_slug}/{filename}",
                    'category': cat,
                    'slug': item_slug,
                    'filename': filename,
                    'timestamp': ts,
                })
    images.sort(key=lambda x: x['timestamp'], reverse=True)
    return images
def _enrich_with_names(images):
    """Add item_name field to each image dict, querying DB once per category."""
    wanted = {}
    for entry in images:
        wanted.setdefault(entry['category'], set()).add(entry['slug'])
    names = {}
    for cat, slugs in wanted.items():
        model = _MODEL_MAP.get(cat)
        if model is None:
            continue
        rows = model.query.filter(model.slug.in_(slugs)).with_entities(model.slug, model.name).all()
        names.update({(cat, s): n for s, n in rows})
    for entry in images:
        # Fall back to the slug when the DB has no matching record.
        entry['item_name'] = names.get((entry['category'], entry['slug']), entry['slug'])
    return images
@app.route('/gallery')
def gallery():
    """Render the paginated gallery with category/slug/sort/per-page filters."""
    category = request.args.get('category', 'all')
    slug = request.args.get('slug', '')
    sort = request.args.get('sort', 'newest')
    page = max(1, int(request.args.get('page', 1)))
    per_page = int(request.args.get('per_page', 48))
    if per_page not in (24, 48, 96):
        per_page = 48
    images = _scan_gallery_images(category, slug)
    if sort == 'oldest':
        images.reverse()
    total = len(images)
    total_pages = max(1, (total + per_page - 1) // per_page)
    page = min(page, total_pages)  # clamp out-of-range page requests
    start = (page - 1) * per_page
    page_images = images[start:start + per_page]
    _enrich_with_names(page_images)
    slug_options = []
    if category != 'all':
        Model = _MODEL_MAP.get(category)
        if Model:
            rows = Model.query.order_by(Model.name).with_entities(Model.slug, Model.name).all()
            slug_options = [(r.slug, r.name) for r in rows]
    return render_template(
        'gallery.html',
        images=page_images,
        page=page,
        per_page=per_page,
        total=total,
        total_pages=total_pages,
        category=category,
        slug=slug,
        sort=sort,
        categories=GALLERY_CATEGORIES,
        slug_options=slug_options,
    )
def _parse_comfy_png_metadata(image_path):
    """Read ComfyUI generation metadata from a PNG's tEXt 'prompt' chunk.
    Returns a dict with keys: positive, negative, checkpoint, loras,
    seed, steps, cfg, sampler, scheduler. Any missing field is None/[].
    Never raises: any read/parse failure returns the default dict.
    """
    # Local import so PIL is only required when this endpoint is actually used.
    from PIL import Image as PilImage
    result = {
        'positive': None,
        'negative': None,
        'checkpoint': None,
        'loras': [],  # list of {name, strength}
        'seed': None,
        'steps': None,
        'cfg': None,
        'sampler': None,
        'scheduler': None,
    }
    try:
        with PilImage.open(image_path) as im:
            # ComfyUI stores the full workflow graph as JSON under 'prompt'.
            raw = im.info.get('prompt')
            if not raw:
                return result
            nodes = json.loads(raw)
    except Exception:
        # Unreadable file or malformed JSON — return the empty template.
        return result
    # First pass: walk every node, picking fields by class_type.
    # NOTE: CLIPTextEncode positive/negative assignment relies on node
    # iteration order (first seen = positive, second = negative).
    for node in nodes.values():
        ct = node.get('class_type', '')
        inp = node.get('inputs', {})
        if ct == 'KSampler':
            result['seed'] = inp.get('seed')
            result['steps'] = inp.get('steps')
            result['cfg'] = inp.get('cfg')
            result['sampler'] = inp.get('sampler_name')
            result['scheduler'] = inp.get('scheduler')
        elif ct == 'CheckpointLoaderSimple':
            result['checkpoint'] = inp.get('ckpt_name')
        elif ct == 'CLIPTextEncode':
            # Identify positive vs negative by which KSampler input they connect to.
            # Simpler heuristic: node "6" = positive, node "7" = negative (our fixed workflow).
            # But to be robust, we check both via node graph references where possible.
            # Fallback: first CLIPTextEncode = positive, second = negative.
            text = inp.get('text', '')
            if result['positive'] is None:
                result['positive'] = text
            elif result['negative'] is None:
                result['negative'] = text
        elif ct == 'LoraLoader':
            name = inp.get('lora_name', '')
            if name:
                result['loras'].append({
                    'name': name,
                    'strength': inp.get('strength_model', 1.0),
                })
    # Re-parse with fixed node IDs from the known workflow (more reliable);
    # overrides the first-pass heuristic when nodes '6'/'7' are present.
    try:
        if '6' in nodes:
            result['positive'] = nodes['6']['inputs'].get('text', result['positive'])
        if '7' in nodes:
            result['negative'] = nodes['7']['inputs'].get('text', result['negative'])
    except Exception:
        pass
    return result
@app.route('/gallery/prompt-data')
def gallery_prompt_data():
    """Return generation metadata for a specific image by reading its PNG tEXt chunk."""
    rel_path = request.args.get('path', '')
    if not rel_path:
        return {'error': 'path parameter required'}, 400
    # Reject any path that resolves outside the uploads directory.
    uploads_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
    target = os.path.abspath(os.path.join(uploads_root, rel_path))
    if not target.startswith(uploads_root + os.sep):
        return {'error': 'Invalid path'}, 400
    if not os.path.isfile(target):
        return {'error': 'File not found'}, 404
    meta = _parse_comfy_png_metadata(target)
    meta['path'] = rel_path
    return meta
@app.route('/gallery/delete', methods=['POST'])
def gallery_delete():
    """Delete a generated image from the gallery. Only the image file is removed."""
    payload = request.get_json(silent=True) or {}
    rel_path = payload.get('path', '')
    if not rel_path:
        return {'error': 'path required'}, 400
    # Gallery paths are always exactly "category/slug/filename".
    if rel_path.count('/') != 2:
        return {'error': 'invalid path format'}, 400
    # Reject any path that resolves outside the uploads directory.
    uploads_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
    target = os.path.abspath(os.path.join(uploads_root, rel_path))
    if not target.startswith(uploads_root + os.sep):
        return {'error': 'Invalid path'}, 400
    if os.path.isfile(target):
        os.remove(target)
    return {'status': 'ok'}
@app.route('/resource/<category>/<slug>/delete', methods=['POST'])
def resource_delete(category, slug):
    """Delete a resource item from a category gallery.
    soft: removes JSON data file + DB record; LoRA/checkpoint file kept on disk.
    hard: removes JSON data file + LoRA/checkpoint safetensors + DB record.

    JSON body: {"mode": "soft"|"hard"} (defaults to soft).
    Returns {'status': 'ok', 'deleted': [...]} listing what was removed
    ('json', 'lora'/'checkpoint', 'db'). Ordering is deliberate: the asset
    path is resolved from the JSON *before* that JSON is deleted, and the
    DB record is removed last.
    """
    # Category → model class for the DB-record removal step.
    _RESOURCE_MODEL_MAP = {
        'looks': Look,
        'styles': Style,
        'actions': Action,
        'outfits': Outfit,
        'scenes': Scene,
        'detailers': Detailer,
        'checkpoints': Checkpoint,
    }
    # Category → on-disk directory holding the item's JSON data file.
    _RESOURCE_DATA_DIRS = {
        'looks': app.config['LOOKS_DIR'],
        'styles': app.config['STYLES_DIR'],
        'actions': app.config['ACTIONS_DIR'],
        'outfits': app.config['CLOTHING_DIR'],
        'scenes': app.config['SCENES_DIR'],
        'detailers': app.config['DETAILERS_DIR'],
        'checkpoints': app.config['CHECKPOINTS_DIR'],
    }
    # Root directory under which relative lora_name paths are resolved.
    _LORA_BASE = '/mnt/alexander/AITools/Image Models/lora/'
    if category not in _RESOURCE_MODEL_MAP:
        return {'error': 'unknown category'}, 400
    req = request.get_json(silent=True) or {}
    mode = req.get('mode', 'soft')
    data_dir = _RESOURCE_DATA_DIRS[category]
    json_path = os.path.join(data_dir, f'{slug}.json')
    deleted = []
    asset_abs = None
    # Resolve asset path before deleting JSON (hard only)
    if mode == 'hard' and os.path.isfile(json_path):
        try:
            with open(json_path) as f:
                item_data = json.load(f)
            if category == 'checkpoints':
                # Checkpoint paths are prefixed with their model family,
                # each of which maps to a different base directory.
                ckpt_rel = item_data.get('checkpoint_path', '')
                if ckpt_rel.startswith('Illustrious/'):
                    asset_abs = os.path.join(app.config['ILLUSTRIOUS_MODELS_DIR'],
                                             ckpt_rel[len('Illustrious/'):])
                elif ckpt_rel.startswith('Noob/'):
                    asset_abs = os.path.join(app.config['NOOB_MODELS_DIR'],
                                             ckpt_rel[len('Noob/'):])
            else:
                lora_name = item_data.get('lora', {}).get('lora_name', '')
                if lora_name:
                    asset_abs = os.path.join(_LORA_BASE, lora_name)
        except Exception:
            # Best-effort: unreadable JSON means we skip the asset delete.
            pass
    # Delete JSON
    if os.path.isfile(json_path):
        os.remove(json_path)
        deleted.append('json')
    # Delete LoRA/checkpoint file (hard only)
    if mode == 'hard' and asset_abs and os.path.isfile(asset_abs):
        os.remove(asset_abs)
        deleted.append('lora' if category != 'checkpoints' else 'checkpoint')
    # Remove DB record
    Model = _RESOURCE_MODEL_MAP[category]
    rec = Model.query.filter_by(slug=slug).first()
    if rec:
        db.session.delete(rec)
        db.session.commit()
        deleted.append('db')
    return {'status': 'ok', 'deleted': deleted}
# ---------------------------------------------------------------------------
# Strengths Gallery
# ---------------------------------------------------------------------------
# Entity categories that support the Strengths Gallery, mapped to their
# SQLAlchemy model classes.
_STRENGTHS_MODEL_MAP = {
    'characters': Character,
    'looks': Look,
    'outfits': Outfit,
    'actions': Action,
    'styles': Style,
    'scenes': Scene,
    'detailers': Detailer,
}
# Which ComfyUI LoRA node each category occupies
# (node IDs match comfy_workflow.json; several categories share node '19').
_CATEGORY_LORA_NODES = {
    'characters': '16',
    'looks': '16',
    'outfits': '17',
    'actions': '18',
    'styles': '19',
    'scenes': '19',
    'detailers': '19',
}
def _build_strengths_prompts(category, entity, character, action=None, extra_positive=''):
    """Build main/face/hand prompt strings for the Strengths Gallery.
    Only includes prompt *content* from the entity and (optionally) the
    character. LoRA triggers from other nodes are intentionally excluded
    so the result reflects only the swept LoRA's contribution.

    category       — one of _STRENGTHS_MODEL_MAP's keys
    entity         — model instance whose LoRA strength is being swept
    character      — optional Character providing base identity tags
    action         — optional Action model object (used for detailer category)
    extra_positive — additional free-text to append to the main prompt

    Returns a dict with 'main', 'face' and 'hand' prompt strings.
    NOTE: tag concatenation order is deliberate and affects generation.
    """
    if category == 'characters':
        # The entity IS the character — build its full prompt normally
        return build_prompt(entity.data, [], entity.default_fields)
    if category == 'looks':
        # Start with linked character data, prepend Look positive tags
        base = build_prompt(character.data, [], character.default_fields) if character else {'main': '', 'face': '', 'hand': ''}
        look_pos = entity.data.get('positive', '')
        look_triggers = entity.data.get('lora', {}).get('lora_triggers', '')
        prefix_parts = [p for p in [look_triggers, look_pos] if p]
        prefix = ', '.join(prefix_parts)
        if prefix:
            base['main'] = f"{prefix}, {base['main']}" if base['main'] else prefix
        return base
    if category == 'outfits':
        # Outfit prompts combine character identity, wardrobe pieces and tags.
        wardrobe = entity.data.get('wardrobe', {})
        outfit_triggers = entity.data.get('lora', {}).get('lora_triggers', '')
        tags = entity.data.get('tags', [])
        wardrobe_parts = [v for v in wardrobe.values() if isinstance(v, str) and v]
        char_parts = []
        face_parts = []
        hand_parts = []
        if character:
            identity = character.data.get('identity', {})
            defaults = character.data.get('defaults', {})
            char_parts = [v for v in [identity.get('base_specs'), identity.get('hair'),
                                      identity.get('eyes'), defaults.get('expression')] if v]
            face_parts = [v for v in [identity.get('hair'), identity.get('eyes'),
                                      defaults.get('expression')] if v]
            hand_parts = [v for v in [wardrobe.get('hands'), wardrobe.get('gloves')] if v]
        main_parts = ([outfit_triggers] if outfit_triggers else []) + char_parts + wardrobe_parts + tags
        return {
            'main': _dedup_tags(', '.join(p for p in main_parts if p)),
            'face': _dedup_tags(', '.join(face_parts)),
            'hand': _dedup_tags(', '.join(hand_parts)),
        }
    if category == 'actions':
        # Action prompts split pose fields (body) from expression fields (face).
        action_data = entity.data.get('action', {})
        action_triggers = entity.data.get('lora', {}).get('lora_triggers', '')
        tags = entity.data.get('tags', [])
        pose_fields = ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet', 'additional']
        pose_parts = [action_data.get(k, '') for k in pose_fields if action_data.get(k)]
        expr_parts = [action_data.get(k, '') for k in ['head', 'eyes'] if action_data.get(k)]
        char_parts = []
        face_parts = list(expr_parts)
        hand_parts = [action_data.get('hands', '')] if action_data.get('hands') else []
        if character:
            identity = character.data.get('identity', {})
            char_parts = [v for v in [identity.get('base_specs'), identity.get('hair'),
                                      identity.get('eyes')] if v]
            face_parts = [v for v in [identity.get('hair'), identity.get('eyes')] + expr_parts if v]
        main_parts = ([action_triggers] if action_triggers else []) + char_parts + pose_parts + tags
        return {
            'main': _dedup_tags(', '.join(p for p in main_parts if p)),
            'face': _dedup_tags(', '.join(face_parts)),
            'hand': _dedup_tags(', '.join(hand_parts)),
        }
    # styles / scenes / detailers — character prompt + entity tags/triggers
    entity_triggers = entity.data.get('lora', {}).get('lora_triggers', '')
    tags = entity.data.get('tags', [])
    if category == 'styles':
        sdata = entity.data.get('style', {})
        artist = f"by {sdata['artist_name']}" if sdata.get('artist_name') else ''
        style_tags = sdata.get('artistic_style', '')
        entity_parts = [p for p in [entity_triggers, artist, style_tags] + tags if p]
    elif category == 'scenes':
        sdata = entity.data.get('scene', {})
        scene_parts = [v for v in sdata.values() if isinstance(v, str) and v]
        entity_parts = [p for p in [entity_triggers] + scene_parts + tags if p]
    else:  # detailers
        det_prompt = entity.data.get('prompt', '')
        entity_parts = [p for p in [entity_triggers, det_prompt] + tags if p]
    base = build_prompt(character.data, [], character.default_fields) if character else {'main': '', 'face': '', 'hand': ''}
    entity_str = ', '.join(entity_parts)
    if entity_str:
        base['main'] = f"{base['main']}, {entity_str}" if base['main'] else entity_str
    # Incorporate action prompt fields (for detailer category)
    if action is not None:
        action_data = action.data.get('action', {})
        action_parts = [action_data.get(k, '') for k in
                        ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet', 'additional', 'head', 'eyes']
                        if action_data.get(k)]
        action_str = ', '.join(action_parts)
        if action_str:
            base['main'] = f"{base['main']}, {action_str}" if base['main'] else action_str
    # Append any extra positive text
    if extra_positive:
        base['main'] = f"{base['main']}, {extra_positive}" if base['main'] else extra_positive
    return base
def _prepare_strengths_workflow(workflow, category, entity, character, prompts,
                                checkpoint, ckpt_data, strength_value, fixed_seed,
                                custom_negative=''):
    """Wire a ComfyUI workflow with ONLY the entity's LoRA active at a specific strength.
    All other LoRA nodes are bypassed. A fixed seed ensures every step in the
    Strengths Gallery sweep produces a comparably composed image.

    workflow        — parsed comfy_workflow.json dict (mutated and returned)
    strength_value  — LoRA model/clip strength applied to the active node
    fixed_seed      — seed injected into all sampler nodes
    custom_negative — extra negative text prepended to node 7

    NOTE: steps run in a deliberate order; model_source/clip_source thread
    through the LoRA chain and must be wired before consumers.
    """
    active_node = _CATEGORY_LORA_NODES.get(category, '16')
    entity_lora = entity.data.get('lora', {})
    entity_lora_name = entity_lora.get('lora_name', '')
    # 1. Set checkpoint
    if checkpoint and '4' in workflow:
        workflow['4']['inputs']['ckpt_name'] = checkpoint
    # 2. Default resolution
    if '5' in workflow:
        workflow['5']['inputs']['width'] = 1024
        workflow['5']['inputs']['height'] = 1024
    # 3. Inject prompts (templates carry {{...}} placeholders)
    if '6' in workflow:
        workflow['6']['inputs']['text'] = workflow['6']['inputs']['text'].replace(
            '{{POSITIVE_PROMPT}}', prompts.get('main', ''))
    if '14' in workflow:
        workflow['14']['inputs']['text'] = workflow['14']['inputs']['text'].replace(
            '{{FACE_PROMPT}}', prompts.get('face', ''))
    if '15' in workflow:
        workflow['15']['inputs']['text'] = workflow['15']['inputs']['text'].replace(
            '{{HAND_PROMPT}}', prompts.get('hand', ''))
    # For looks, prepend the look's negative to node 7
    if category == 'looks':
        look_neg = entity.data.get('negative', '')
        if look_neg and '7' in workflow:
            workflow['7']['inputs']['text'] = f"{look_neg}, {workflow['7']['inputs']['text']}"
    # Prepend any custom negative (e.g. extra_neg from detailer session)
    if custom_negative and '7' in workflow:
        workflow['7']['inputs']['text'] = f"{custom_negative}, {workflow['7']['inputs']['text']}"
    # 4. Wire LoRA chain — only activate the entity's node; skip all others
    model_source = ['4', 0]
    clip_source = ['4', 1]
    for node_id in ['16', '17', '18', '19']:
        if node_id not in workflow:
            continue
        if node_id == active_node and entity_lora_name:
            workflow[node_id]['inputs']['lora_name'] = entity_lora_name
            workflow[node_id]['inputs']['strength_model'] = float(strength_value)
            workflow[node_id]['inputs']['strength_clip'] = float(strength_value)
            workflow[node_id]['inputs']['model'] = list(model_source)
            workflow[node_id]['inputs']['clip'] = list(clip_source)
            model_source = [node_id, 0]
            clip_source = [node_id, 1]
        # else: skip — model_source/clip_source pass through unchanged
    # 5. Wire all consumers to the final model/clip source
    for consumer, needs_model, needs_clip in [
        ('3', True, False),
        ('6', False, True),
        ('7', False, True),
        ('11', True, True),
        ('13', True, True),
        ('14', False, True),
        ('15', False, True),
    ]:
        if consumer in workflow:
            if needs_model:
                workflow[consumer]['inputs']['model'] = list(model_source)
            if needs_clip:
                workflow[consumer]['inputs']['clip'] = list(clip_source)
    # 6. Fixed seed for all samplers
    for seed_node in ['3', '11', '13']:
        if seed_node in workflow:
            workflow[seed_node]['inputs']['seed'] = int(fixed_seed)
    # 7. Apply checkpoint-specific settings (steps, cfg, sampler, base prompts, VAE)
    if ckpt_data:
        workflow = _apply_checkpoint_settings(workflow, ckpt_data)
    # 8. Sync sampler/scheduler to detailer nodes
    sampler_name = workflow['3']['inputs'].get('sampler_name')
    scheduler = workflow['3']['inputs'].get('scheduler')
    for node_id in ['11', '13']:
        if node_id in workflow:
            if sampler_name:
                workflow[node_id]['inputs']['sampler_name'] = sampler_name
            if scheduler:
                workflow[node_id]['inputs']['scheduler'] = scheduler
    # 9. Cross-dedup prompts (remove tags appearing in both positive and negative)
    pos_text, neg_text = _cross_dedup_prompts(
        workflow['6']['inputs']['text'],
        workflow['7']['inputs']['text']
    )
    workflow['6']['inputs']['text'] = pos_text
    workflow['7']['inputs']['text'] = neg_text
    _log_workflow_prompts(f"_prepare_strengths_workflow [node={active_node} lora={entity_lora_name} @ {strength_value} seed={fixed_seed}]", workflow)
    return workflow
@app.route('/strengths/<category>/<path:slug>/generate', methods=['POST'])
def strengths_generate(category, slug):
    """Queue one Strengths-Gallery render of an entity's LoRA at a given weight."""
    if category not in _STRENGTHS_MODEL_MAP:
        return {'error': 'unknown category'}, 400
    Model = _STRENGTHS_MODEL_MAP[category]
    entity = Model.query.filter_by(slug=slug).first_or_404()
    try:
        strength_value = float(request.form.get('strength_value', 1.0))
        fixed_seed = int(request.form.get('seed', random.randint(1, 10**15)))
        client_id = request.form.get('client_id', '')
        # Character resolution: the POST body (current page dropdown) wins,
        # then the session. Session keys use the *singular* category name
        # (char_outfit_, char_action_, …) while the URL uses the plural.
        _singular = {
            'outfits': 'outfit', 'actions': 'action', 'styles': 'style',
            'scenes': 'scene', 'detailers': 'detailer', 'looks': 'look',
        }
        session_prefix = _singular.get(category, category)
        char_slug = (request.form.get('character_slug') or
                     session.get(f'char_{session_prefix}_{slug}'))
        if category == 'characters':
            character = entity  # the swept entity IS a character
        elif char_slug == '__random__':
            character = Character.query.order_by(db.func.random()).first()
        elif char_slug:
            character = Character.query.filter_by(slug=char_slug).first()
        else:
            character = None
        print(f"[Strengths] char_slug={char_slug!r} → character={character.slug if character else 'none'}")
        # Detailers may carry extra context (action + free-text prompts) in session.
        action_obj = None
        extra_positive = ''
        extra_negative = ''
        if category == 'detailers':
            action_slug = session.get(f'action_detailer_{slug}')
            if action_slug:
                action_obj = Action.query.filter_by(slug=action_slug).first()
            extra_positive = session.get(f'extra_pos_detailer_{slug}', '')
            extra_negative = session.get(f'extra_neg_detailer_{slug}', '')
            print(f"[Strengths] detailer session — char={char_slug}, action={action_slug}, extra_pos={bool(extra_positive)}, extra_neg={bool(extra_negative)}")
        prompts = _build_strengths_prompts(category, entity, character,
                                           action=action_obj, extra_positive=extra_positive)
        checkpoint, ckpt_data = _get_default_checkpoint()
        with open(os.path.join(os.path.dirname(__file__), 'comfy_workflow.json'), 'r') as fh:
            workflow = json.load(fh)
        workflow = _prepare_strengths_workflow(
            workflow, category, entity, character, prompts,
            checkpoint, ckpt_data, strength_value, fixed_seed,
            custom_negative=extra_negative
        )
        queued = queue_prompt(workflow, client_id)
        return {'status': 'queued', 'prompt_id': queued.get('prompt_id', '')}
    except Exception as e:
        print(f"[Strengths] generate error: {e}")
        return {'error': str(e)}, 500
@app.route('/strengths/<category>/<path:slug>/finalize/<prompt_id>', methods=['POST'])
def strengths_finalize(category, slug, prompt_id):
    """Fetch a finished strengths render and save it under the entity's strengths folder."""
    if category not in _STRENGTHS_MODEL_MAP:
        return {'error': 'unknown category'}, 400
    strength_value = request.form.get('strength_value', '0.0')
    seed = request.form.get('seed', '0')
    try:
        history = get_history(prompt_id)
        if prompt_id not in history:
            return {'error': 'prompt not found in history'}, 404
        outputs = history[prompt_id].get('outputs', {})
        # Take the first image produced by any output node.
        img_data = None
        for node_output in outputs.values():
            node_images = node_output.get('images', [])
            if node_images:
                first = node_images[0]
                img_data = get_image(first['filename'], first.get('subfolder', ''), first.get('type', 'output'))
                break
        if not img_data:
            return {'error': 'no image in output'}, 500
        # Encode the strength as a two-decimal token with '.' → '_' so it is
        # filesystem-safe and parseable by strengths_list.
        strength_str = f"{float(strength_value):.2f}".replace('.', '_')
        upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], category, slug, 'strengths')
        os.makedirs(upload_dir, exist_ok=True)
        out_filename = f"strength_{strength_str}_seed_{seed}.png"
        with open(os.path.join(upload_dir, out_filename), 'wb') as fh:
            fh.write(img_data)
        relative = f"{category}/{slug}/strengths/{out_filename}"
        return {'success': True, 'image_url': f"/static/uploads/{relative}", 'strength_value': strength_value}
    except Exception as e:
        print(f"[Strengths] finalize error: {e}")
        return {'error': str(e)}, 500
@app.route('/strengths/<category>/<path:slug>/list')
def strengths_list(category, slug):
    """List saved strengths-sweep images for an entity, sorted by filename."""
    strengths_dir = os.path.join(app.config['UPLOAD_FOLDER'], category, slug, 'strengths')
    if not os.path.isdir(strengths_dir):
        return {'images': []}
    results = []
    for fname in sorted(os.listdir(strengths_dir)):
        if not fname.endswith('.png'):
            continue
        # "strength_0_50_seed_12345.png" → display value "0.50"
        try:
            raw = fname.replace('strength_', '').split('_seed_')[0]
            display = raw.replace('_', '.')
        except Exception:
            display = fname
        results.append({
            'url': f"/static/uploads/{category}/{slug}/strengths/{fname}",
            'strength': display,
            'filename': fname,
        })
    return {'images': results}
@app.route('/strengths/<category>/<path:slug>/clear', methods=['POST'])
def strengths_clear(category, slug):
    """Remove every saved strengths-sweep image for an entity."""
    strengths_dir = os.path.join(app.config['UPLOAD_FOLDER'], category, slug, 'strengths')
    if os.path.isdir(strengths_dir):
        for entry in os.listdir(strengths_dir):
            full = os.path.join(strengths_dir, entry)
            if os.path.isfile(full):
                os.remove(full)
    return {'success': True}
# Flask config keys naming the on-disk JSON data directory for each
# Strengths-Gallery category (used by strengths_save_range to write back).
_STRENGTHS_DATA_DIRS = {
    'characters': 'CHARACTERS_DIR',
    'looks': 'LOOKS_DIR',
    'outfits': 'CLOTHING_DIR',
    'actions': 'ACTIONS_DIR',
    'styles': 'STYLES_DIR',
    'scenes': 'SCENES_DIR',
    'detailers': 'DETAILERS_DIR',
}
@app.route('/strengths/<category>/<path:slug>/save_range', methods=['POST'])
def strengths_save_range(category, slug):
    """Save lora_weight_min / lora_weight_max from the Strengths Gallery back to the entity JSON + DB."""
    if category not in _STRENGTHS_MODEL_MAP or category not in _STRENGTHS_DATA_DIRS:
        return {'error': 'unknown category'}, 400
    try:
        min_w = float(request.form.get('min_weight', ''))
        max_w = float(request.form.get('max_weight', ''))
    except (ValueError, TypeError):
        return {'error': 'invalid weight values'}, 400
    # Normalise so min <= max regardless of submission order.
    if min_w > max_w:
        min_w, max_w = max_w, min_w
    Model = _STRENGTHS_MODEL_MAP[category]
    entity = Model.query.filter_by(slug=slug).first_or_404()
    # Mutate a shallow copy so SQLAlchemy sees a fresh assignment.
    data = dict(entity.data)
    lora_section = data.get('lora')
    if not isinstance(lora_section, dict):
        return {'error': 'entity has no lora section'}, 400
    lora_section['lora_weight_min'] = min_w
    lora_section['lora_weight_max'] = max_w
    entity.data = data
    flag_modified(entity, 'data')
    # Mirror the change into the on-disk JSON file when it exists.
    data_dir = app.config[_STRENGTHS_DATA_DIRS[category]]
    filename = getattr(entity, 'filename', None) or f"{slug}.json"
    file_path = os.path.join(data_dir, filename)
    if os.path.exists(file_path):
        with open(file_path, 'w', encoding='utf-8') as fh:
            json.dump(data, fh, indent=2, ensure_ascii=False)
            fh.write('\n')
    db.session.commit()
    return {'success': True, 'lora_weight_min': min_w, 'lora_weight_max': max_w}
if __name__ == '__main__':
    # Start (or verify) the danbooru-mcp Docker container before serving.
    ensure_mcp_server_running()
    with app.app_context():
        os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
        db.create_all()

        from sqlalchemy import text

        def _run_migration(ddl, added_msg, exists_msg, note_prefix):
            """Best-effort ALTER TABLE migration.

            Executes *ddl* and commits; a 'column already exists' error prints
            *exists_msg* (if given), any other error prints it under
            *note_prefix*. The session is rolled back on failure so later
            statements don't hit a pending-rollback state.
            """
            try:
                db.session.execute(text(ddl))
                db.session.commit()
                print(added_msg)
            except Exception as e:
                db.session.rollback()  # keep the session usable for the next migration
                if 'duplicate column name' in str(e).lower() or 'already exists' in str(e).lower():
                    if exists_msg:
                        print(exists_msg)
                else:
                    print(f"{note_prefix}: {e}")

        # Migration: Add active_outfit column if it doesn't exist
        _run_migration(
            'ALTER TABLE character ADD COLUMN active_outfit VARCHAR(100) DEFAULT \'default\'',
            "Added active_outfit column to character table",
            "active_outfit column already exists",
            "Migration note",
        )
        # Migration: Add default_fields column to action table if it doesn't exist
        _run_migration(
            'ALTER TABLE action ADD COLUMN default_fields JSON',
            "Added default_fields column to action table",
            "default_fields column already exists in action table",
            "Migration action note",
        )
        # Migration: Add new columns to settings table
        columns_to_add = [
            ('llm_provider', "VARCHAR(50) DEFAULT 'openrouter'"),
            ('local_base_url', "VARCHAR(255)"),
            ('local_model', "VARCHAR(100)"),
        ]
        for col_name, col_type in columns_to_add:
            _run_migration(
                f'ALTER TABLE settings ADD COLUMN {col_name} {col_type}',
                f"Added {col_name} column to settings table",
                None,  # existing columns are ignored silently here
                f"Migration settings note ({col_name})",
            )
        # Ensure settings exist
        if not Settings.query.first():
            db.session.add(Settings())
            db.session.commit()
            print("Created default settings")
        sync_characters()
        sync_outfits()
        sync_actions()
        # Migration: Add data column to checkpoint table
        _run_migration(
            'ALTER TABLE checkpoint ADD COLUMN data JSON',
            "Added data column to checkpoint table",
            "data column already exists in checkpoint table",
            "Migration checkpoint note",
        )
        sync_styles()
        sync_detailers()
        sync_scenes()
        sync_looks()
        sync_checkpoints()
        app.run(debug=True, host='0.0.0.0', port=5000)