Files
character-browser/app.py
Aodhan Collins da55b0889b Add Docker support and refactor prompt/queue internals
- Add Dockerfile, docker-compose.yml, .dockerignore for containerised deployment
- Extract _resolve_character(), _ensure_character_fields(), _append_background() helpers to eliminate repeated inline character-field injection and background-tag patterns across all secondary-category generate routes
- Add _IDENTITY_KEYS / _WARDROBE_KEYS constants
- Fix build_extras_prompt() bug: detailer prompt (a list) was being appended as a single item instead of extended
- Replace all per-route _finalize closures with _make_finalize() factory, reducing duplication across 10 generate routes
- Add _prune_job_history() called each worker loop iteration to prevent unbounded memory growth
- Remove 10 orphaned legacy finalize_generation HTTP routes and check_status route (superseded by job queue API since job-queue branch)
- Remove one-time migration scripts (migrate_actions, migrate_detailers, migrate_lora_weight_range, migrate_wardrobe)
- Update CLAUDE.md and README.md to document new helpers, queue architecture, and Docker deployment

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-05 16:46:36 +00:00

6428 lines
260 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import time
import re
import requests
import random
import asyncio
import subprocess
import threading
import uuid
from collections import deque
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask_session import Session
from werkzeug.utils import secure_filename
from models import db, Character, Settings, Outfit, Action, Style, Detailer, Scene, Checkpoint, Look
# ---------------------------------------------------------------------------
# Application setup
# ---------------------------------------------------------------------------
app = Flask(__name__)

# Database
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Generated images and uploads live under this folder
app.config['UPLOAD_FOLDER'] = 'static/uploads'

# Flask secret key. Taken from the environment when provided (same pattern as
# COMFYUI_URL below) so deployments do not have to ship the key that is
# committed to source control; falls back to the previous development value.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-key-123')

# Directories holding the JSON definitions for each gallery category
app.config['CHARACTERS_DIR'] = 'data/characters'
app.config['CLOTHING_DIR'] = 'data/clothing'
app.config['ACTIONS_DIR'] = 'data/actions'
app.config['STYLES_DIR'] = 'data/styles'
app.config['SCENES_DIR'] = 'data/scenes'
app.config['DETAILERS_DIR'] = 'data/detailers'
app.config['CHECKPOINTS_DIR'] = 'data/checkpoints'
app.config['LOOKS_DIR'] = 'data/looks'

# ComfyUI endpoint and model locations
app.config['COMFYUI_URL'] = os.environ.get('COMFYUI_URL', 'http://127.0.0.1:8188')
app.config['ILLUSTRIOUS_MODELS_DIR'] = '/ImageModels/Stable-diffusion/Illustrious/'
app.config['NOOB_MODELS_DIR'] = '/ImageModels/Stable-diffusion/Noob/'
app.config['LORA_DIR'] = '/ImageModels/lora/Illustrious/Looks/'

# Server-side session configuration to avoid cookie size limits
app.config['SESSION_TYPE'] = 'filesystem'
app.config['SESSION_FILE_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], '../flask_session')
app.config['SESSION_PERMANENT'] = False

db.init_app(app)
Session(app)
# ---------------------------------------------------------------------------
# Generation Job Queue
# ---------------------------------------------------------------------------
# Each job is a dict:
#   id              — unique UUID string
#   label           — human-readable description (e.g. "Tifa Lockhart preview")
#   status          — 'pending' | 'processing' | 'done' | 'failed' | 'paused' | 'removed'
#   workflow        — the fully-prepared ComfyUI workflow dict
#   finalize_fn     — callable(comfy_prompt_id, job) that saves the image; called after ComfyUI finishes
#   error           — error message string (when status == 'failed')
#   result          — dict with image_url etc. (set by finalize_fn on success)
#   created_at      — unix timestamp
#   comfy_prompt_id — the prompt_id returned by ComfyUI (set when processing starts)
#
# _job_queue_lock guards every read/write of _job_queue, _job_history and the
# mutable fields of the job dicts they contain.
_job_queue_lock = threading.Lock()
_job_queue = deque() # ordered list of job dicts (pending + paused + processing)
_job_history = {} # job_id -> job dict for status lookup; finished entries are purged by _prune_job_history()
_queue_worker_event = threading.Event() # signals worker that a new (or resumed) job is available
def _enqueue_job(label, workflow, finalize_fn):
    """Create a pending generation job, register it and wake the worker.

    label       — human-readable description of the job
    workflow    — prepared ComfyUI workflow dict
    finalize_fn — callable(comfy_prompt_id, job) invoked once ComfyUI finishes

    Returns the newly created job dict.
    """
    job_id = str(uuid.uuid4())
    new_job = dict(
        id=job_id,
        label=label,
        status='pending',
        workflow=workflow,
        finalize_fn=finalize_fn,
        error=None,
        result=None,
        created_at=time.time(),
        comfy_prompt_id=None,
    )
    with _job_queue_lock:
        # Active queue for the worker, history for later status lookups
        _job_queue.append(new_job)
        _job_history[job_id] = new_job
    _queue_worker_event.set()
    return new_job
def _claim_next_pending_job():
    """Atomically pick the first pending job and mark it 'processing'.

    Selecting and marking happen under a single lock hold so that a
    pause/remove request cannot slip in between the two steps and leave the
    worker running a job the user has just paused or removed.

    Returns the claimed job dict, or None when nothing is pending.
    """
    with _job_queue_lock:
        for candidate in _job_queue:
            if candidate['status'] == 'pending':
                candidate['status'] = 'processing'
                return candidate
    return None


def _execute_queue_job(job):
    """Submit one job to ComfyUI, wait for completion and run its finalize callback.

    Raises Exception when ComfyUI rejects the workflow or the generation does
    not appear in the history within the polling window; errors raised by the
    finalize callback propagate unchanged.
    """
    with app.app_context():
        # Send workflow to ComfyUI
        prompt_response = queue_prompt(job['workflow'])
        if 'prompt_id' not in prompt_response:
            raise Exception(f"ComfyUI rejected job: {prompt_response.get('error', 'unknown error')}")
        comfy_id = prompt_response['prompt_id']
        with _job_queue_lock:
            job['comfy_prompt_id'] = comfy_id

        # Poll until done (max ~10 minutes: 300 polls, 2 seconds apart)
        for _ in range(300):
            if comfy_id in get_history(comfy_id):
                break
            time.sleep(2)
        else:
            raise Exception("ComfyUI generation timed out")

        # Run the finalize callback (saves image to disk / DB).
        # The job is passed so the callback can store its result on it.
        job['finalize_fn'](comfy_id, job)


def _queue_worker():
    """Background thread: processes jobs from _job_queue sequentially.

    Sleeps on _queue_worker_event, then drains every pending job in queue
    order. Each job ends up 'done' or 'failed' (with the error message
    recorded), is removed from the active queue, and stale history entries are
    pruned after every job.
    """
    while True:
        _queue_worker_event.wait()
        _queue_worker_event.clear()
        while True:
            job = _claim_next_pending_job()
            if job is None:
                break  # No pending jobs — go back to waiting

            try:
                _execute_queue_job(job)
                with _job_queue_lock:
                    job['status'] = 'done'
            except Exception as e:
                print(f"[Queue] Job {job['id']} failed: {e}")
                with _job_queue_lock:
                    job['status'] = 'failed'
                    job['error'] = str(e)

            # Remove completed/failed jobs from the active queue (keep in history)
            with _job_queue_lock:
                try:
                    _job_queue.remove(job)
                except ValueError:
                    pass  # Already removed (e.g. by user)

            # Periodically purge old finished jobs from history to avoid unbounded growth
            _prune_job_history()
# Start the background worker thread at import time.
# It is a daemon thread, so it does not block interpreter shutdown.
_worker_thread = threading.Thread(target=_queue_worker, daemon=True, name='queue-worker')
_worker_thread.start()
def _make_finalize(category, slug, db_model_class=None, action=None):
    """Build the finalize callback used by standard queue jobs.

    category       — upload sub-directory name (e.g. 'characters', 'outfits')
    slug           — entity slug used for the upload folder name
    db_model_class — SQLAlchemy model class whose cover image is updated; None = skip
    action         — 'replace' → update DB; None → always update; anything else → skip

    The returned callable has the signature (comfy_prompt_id, job). It saves
    the first image of the first output node that has one and stores the
    resulting URL/path on job['result'].
    """
    def _finalize(comfy_prompt_id, job):
        outputs = get_history(comfy_prompt_id)[comfy_prompt_id]['outputs']

        # Only the first node that reports images is used
        image_node = next((node for node in outputs.values() if 'images' in node), None)
        if image_node is None:
            return

        first_image = image_node['images'][0]
        image_bytes = get_image(first_image['filename'], first_image['subfolder'], first_image['type'])

        target_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"{category}/{slug}")
        os.makedirs(target_dir, exist_ok=True)
        filename = f"gen_{int(time.time())}.png"
        with open(os.path.join(target_dir, filename), 'wb') as out_file:
            out_file.write(image_bytes)

        relative_path = f"{category}/{slug}/{filename}"
        job['result'] = {
            'image_url': f'/static/uploads/{relative_path}',
            'relative_path': relative_path,
        }

        # Optionally promote the new image to the entity's cover image
        if db_model_class and (action is None or action == 'replace'):
            record = db_model_class.query.filter_by(slug=slug).first()
            if record:
                record.image_path = relative_path
                db.session.commit()

    return _finalize
def _prune_job_history(max_age_seconds=3600):
    """Drop finished jobs (done / failed / removed) created more than max_age_seconds ago."""
    finished_states = ('done', 'failed', 'removed')
    oldest_allowed = time.time() - max_age_seconds
    with _job_queue_lock:
        # Collect first, delete afterwards: the dict must not change size while iterated
        expired_ids = []
        for job_id, job in _job_history.items():
            if job['status'] in finished_states and job['created_at'] < oldest_allowed:
                expired_ids.append(job_id)
        for job_id in expired_ids:
            del _job_history[job_id]
# ---------------------------------------------------------------------------
# Queue API routes
# ---------------------------------------------------------------------------
@app.route('/api/queue')
def api_queue_list():
    """Return the jobs currently in the active queue, plus their count, as JSON."""
    exposed_fields = ('id', 'label', 'status', 'error', 'created_at')
    with _job_queue_lock:
        snapshot = [
            {field: entry[field] for field in exposed_fields}
            for entry in _job_queue
        ]
    return {'jobs': snapshot, 'count': len(snapshot)}
@app.route('/api/queue/count')
def api_queue_count():
    """Return how many queued jobs are still active (pending, processing or paused)."""
    active_states = ('pending', 'processing', 'paused')
    with _job_queue_lock:
        active = len([entry for entry in _job_queue if entry['status'] in active_states])
    return {'count': active}
@app.route('/api/queue/<job_id>/remove', methods=['POST'])
def api_queue_remove(job_id):
    """Take a job out of the queue and mark it 'removed'.

    Responds 404 for an unknown job id and 400 when the job is currently
    being processed.
    """
    with _job_queue_lock:
        target = _job_history.get(job_id)
        if not target:
            return {'error': 'Job not found'}, 404
        if target['status'] == 'processing':
            return {'error': 'Cannot remove a job that is currently processing'}, 400

        try:
            _job_queue.remove(target)
        except ValueError:
            # Not in the active queue any more; still flag it below
            pass
        target['status'] = 'removed'
    return {'status': 'ok'}
@app.route('/api/queue/<job_id>/pause', methods=['POST'])
def api_queue_pause(job_id):
    """Flip a job between 'pending' and 'paused'.

    Resuming a job signals the worker. Responds 404 for an unknown job id and
    400 when the job is in any other state.
    """
    toggled_state = {'pending': 'paused', 'paused': 'pending'}
    with _job_queue_lock:
        job = _job_history.get(job_id)
        if not job:
            return {'error': 'Job not found'}, 404

        previous = job['status']
        if previous not in toggled_state:
            return {'error': f'Cannot pause/resume job with status {job["status"]}'}, 400

        job['status'] = toggled_state[previous]
        if previous == 'paused':
            # The job is runnable again — wake the worker
            _queue_worker_event.set()
    return {'status': 'ok', 'new_status': job['status']}
@app.route('/api/queue/<job_id>/status')
def api_queue_job_status(job_id):
    """Report id, label, status, error and ComfyUI prompt id for one job.

    The 'result' entry is included only once the job has a non-empty result.
    Responds 404 for an unknown job id.
    """
    reported_fields = ('id', 'label', 'status', 'error', 'comfy_prompt_id')
    with _job_queue_lock:
        job = _job_history.get(job_id)
        if not job:
            return {'error': 'Job not found'}, 404

        payload = {field: job[field] for field in reported_fields}
        outcome = job.get('result')
        if outcome:
            payload['result'] = outcome
    return payload
# Locations of the danbooru-mcp docker-compose project, resolved relative to this file.
# MCP_TOOLS_DIR   — the tools/ directory next to this file
# MCP_COMPOSE_DIR — tools/danbooru-mcp, the repo checkout (used as cwd for git pull / docker compose)
# MCP_REPO_URL    — remote that _ensure_mcp_repo() clones from
MCP_TOOLS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tools')
MCP_COMPOSE_DIR = os.path.join(MCP_TOOLS_DIR, 'danbooru-mcp')
MCP_REPO_URL = 'https://git.liveaodh.com/aodhan/danbooru-mcp'
def _ensure_mcp_repo():
    """Make sure a danbooru-mcp checkout exists under tools/ and is current.

    - When ``tools/danbooru-mcp/`` is already a directory, ``git pull`` is run in it.
    - Otherwise the repository is cloned from MCP_REPO_URL.

    Failures are reported on stdout and never raised.
    """
    os.makedirs(MCP_TOOLS_DIR, exist_ok=True)
    try:
        if os.path.isdir(MCP_COMPOSE_DIR):
            print('Updating danbooru-mcp via git pull …')
            subprocess.run(['git', 'pull'], cwd=MCP_COMPOSE_DIR, timeout=60, check=True)
            print('danbooru-mcp updated.')
        else:
            print(f'Cloning danbooru-mcp from {MCP_REPO_URL}')
            subprocess.run(['git', 'clone', MCP_REPO_URL, MCP_COMPOSE_DIR], timeout=120, check=True)
            print('danbooru-mcp cloned successfully.')
    except FileNotFoundError:
        print('WARNING: git not found on PATH — danbooru-mcp repo will not be cloned/updated.')
    except subprocess.TimeoutExpired:
        print('WARNING: git timed out while cloning/updating danbooru-mcp.')
    except subprocess.CalledProcessError as e:
        print(f'WARNING: git operation failed for danbooru-mcp: {e}')
    except Exception as e:
        print(f'WARNING: Could not clone/update danbooru-mcp repo: {e}')
def ensure_mcp_server_running():
    """Ensure the danbooru-mcp repo is present/up-to-date, then start the
    Docker container if it is not already running.

    Uses ``docker compose up -d`` so the image is built automatically on first
    run. Errors are non-fatal — the app will still start even if Docker is
    unavailable. A non-zero exit status from ``docker compose`` is reported as
    a warning instead of being announced as a successful start.

    Skipped when ``SKIP_MCP_AUTOSTART=true`` (set by docker-compose, where the
    danbooru-mcp service is managed by compose instead).
    """
    if os.environ.get('SKIP_MCP_AUTOSTART', '').lower() == 'true':
        print('SKIP_MCP_AUTOSTART set — skipping danbooru-mcp auto-start.')
        return

    _ensure_mcp_repo()
    try:
        result = subprocess.run(
            ['docker', 'ps', '--filter', 'name=danbooru-mcp', '--format', '{{.Names}}'],
            capture_output=True, text=True, timeout=10,
        )
        if 'danbooru-mcp' in result.stdout:
            print('danbooru-mcp container already running.')
            return

        # Container not running — start it via docker compose
        print('Starting danbooru-mcp container via docker compose …')
        started = subprocess.run(
            ['docker', 'compose', 'up', '-d'],
            cwd=MCP_COMPOSE_DIR,
            timeout=120,
        )
        if started.returncode == 0:
            print('danbooru-mcp container started.')
        else:
            # Previously the success message was printed regardless of the exit status
            print(f'WARNING: docker compose exited with code {started.returncode} — danbooru-mcp may not be running.')
    except FileNotFoundError:
        print('WARNING: docker not found on PATH — danbooru-mcp will not be started automatically.')
    except subprocess.TimeoutExpired:
        print('WARNING: docker timed out while starting danbooru-mcp.')
    except Exception as e:
        print(f'WARNING: Could not ensure danbooru-mcp is running: {e}')
@app.context_processor
def inject_comfyui_ws_url():
    """Expose COMFYUI_WS_URL (the ComfyUI WebSocket endpoint) to all templates."""
    loopback_names = ('127.0.0.1', 'localhost')
    base_url = app.config.get('COMFYUI_URL', 'http://127.0.0.1:8188')

    # A loopback URL is only valid on the server itself; substitute the host
    # the client used for this request so remote browsers reach the right machine.
    if any(name in base_url for name in loopback_names):
        client_host = request.host.split(':')[0]
        for name in loopback_names:
            base_url = base_url.replace(name, client_host)

    # Convert http/https to ws/wss
    ws_url = base_url.replace('http://', 'ws://')
    ws_url = ws_url.replace('https://', 'wss://')
    return {'COMFYUI_WS_URL': f"{ws_url}/ws"}
@app.context_processor
def inject_default_checkpoint():
    """Expose all checkpoints (ordered by name) and the session's default checkpoint path to templates."""
    from models import Checkpoint
    ordered_checkpoints = Checkpoint.query.order_by(Checkpoint.name).all()
    return {
        'all_checkpoints': ordered_checkpoints,
        'default_checkpoint_path': session.get('default_checkpoint', ''),
    }
@app.route('/set_default_checkpoint', methods=['POST'])
def set_default_checkpoint():
    """Store the posted 'checkpoint_path' (empty string when absent) as the session default."""
    chosen_path = request.form.get('checkpoint_path', '')
    session['default_checkpoint'] = chosen_path
    return {'status': 'ok'}
@app.route('/api/status/comfyui')
def api_status_comfyui():
    """Report 'ok' when ComfyUI's /system_stats answers successfully within 3 seconds, else 'error'."""
    base_url = app.config.get('COMFYUI_URL', 'http://127.0.0.1:8188')
    try:
        reachable = bool(requests.get(f'{base_url}/system_stats', timeout=3).ok)
    except Exception:
        # Any failure simply counts as "not reachable"
        reachable = False
    return {'status': 'ok' if reachable else 'error'}
@app.route('/api/status/mcp')
def api_status_mcp():
    """Report 'ok' when `docker ps` lists a container matching danbooru-mcp, else 'error'."""
    docker_ps = ['docker', 'ps', '--filter', 'name=danbooru-mcp', '--format', '{{.Names}}']
    try:
        listing = subprocess.run(docker_ps, capture_output=True, text=True, timeout=5)
        running = 'danbooru-mcp' in listing.stdout
    except Exception:
        # Docker missing, timed out, etc. — treated as "not running"
        running = False
    return {'status': 'ok' if running else 'error'}
# Lower-case file extensions (without the dot) accepted by allowed_file().
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
def get_available_loras():
    """List the .safetensors files in LORA_DIR as sorted 'Illustrious/Looks/<file>' names."""
    lora_dir = app.config['LORA_DIR']
    if not os.path.exists(lora_dir):
        return []
    # Using the format seen in character JSONs
    return sorted(
        f"Illustrious/Looks/{entry}"
        for entry in os.listdir(lora_dir)
        if entry.endswith('.safetensors')
    )
def get_available_clothing_loras():
    """List outfit LoRAs: .safetensors files in the Clothing directory, sorted, as 'Illustrious/Clothing/<file>'."""
    clothing_lora_dir = '/ImageModels/lora/Illustrious/Clothing/'
    if not os.path.exists(clothing_lora_dir):
        return []
    return sorted(
        f"Illustrious/Clothing/{entry}"
        for entry in os.listdir(clothing_lora_dir)
        if entry.endswith('.safetensors')
    )
def get_available_action_loras():
    """List action LoRAs: .safetensors files in the Poses directory, sorted, as 'Illustrious/Poses/<file>'."""
    poses_lora_dir = '/ImageModels/lora/Illustrious/Poses/'
    if not os.path.exists(poses_lora_dir):
        return []
    return sorted(
        f"Illustrious/Poses/{entry}"
        for entry in os.listdir(poses_lora_dir)
        if entry.endswith('.safetensors')
    )
def get_available_style_loras():
    """List style LoRAs: .safetensors files in the Styles directory, sorted, as 'Illustrious/Styles/<file>'."""
    styles_lora_dir = '/ImageModels/lora/Illustrious/Styles/'
    if not os.path.exists(styles_lora_dir):
        return []
    return sorted(
        f"Illustrious/Styles/{entry}"
        for entry in os.listdir(styles_lora_dir)
        if entry.endswith('.safetensors')
    )
def get_available_detailer_loras():
    """List detailer LoRAs: .safetensors files in the Detailers directory, sorted, as 'Illustrious/Detailers/<file>'."""
    detailers_lora_dir = '/ImageModels/lora/Illustrious/Detailers/'
    if not os.path.exists(detailers_lora_dir):
        return []
    return sorted(
        f"Illustrious/Detailers/{entry}"
        for entry in os.listdir(detailers_lora_dir)
        if entry.endswith('.safetensors')
    )
def get_available_scene_loras():
    """List scene LoRAs: .safetensors files in the Backgrounds directory, sorted, as 'Illustrious/Backgrounds/<file>'."""
    backgrounds_lora_dir = '/ImageModels/lora/Illustrious/Backgrounds/'
    if not os.path.exists(backgrounds_lora_dir):
        return []
    return sorted(
        f"Illustrious/Backgrounds/{entry}"
        for entry in os.listdir(backgrounds_lora_dir)
        if entry.endswith('.safetensors')
    )
def get_available_checkpoints():
    """List .safetensors/.ckpt files from the Illustrious and Noob model directories.

    Entries are returned sorted, each prefixed with its family name
    ('Illustrious/<file>' or 'Noob/<file>'). Missing directories are skipped.
    """
    model_sources = (
        ('Illustrious', 'ILLUSTRIOUS_MODELS_DIR'),
        ('Noob', 'NOOB_MODELS_DIR'),
    )
    found = []
    for family, config_key in model_sources:
        models_dir = app.config[config_key]
        if not os.path.exists(models_dir):
            continue
        found.extend(
            f"{family}/{entry}"
            for entry in os.listdir(models_dir)
            if entry.endswith(('.safetensors', '.ckpt'))
        )
    return sorted(found)
def allowed_file(filename):
    """Return True when filename has an extension listed in ALLOWED_EXTENSIONS (case-insensitive)."""
    _, dot, extension = filename.rpartition('.')
    if not dot:
        # No dot at all — no extension to check
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def parse_orientation(orientation_str):
    """Translate an orientation string such as 'MF' or 'FF' into prompt tags.

    Every 'M' counts as one male and every 'F' as one female participant
    (case-insensitive). The result holds the head-count tags ('1boy',
    '2girls', ...) followed by 'solo' for a single participant or by
    'hetero' / 'yuri' / 'yaoi' for groups. An empty value yields [].
    """
    if not orientation_str:
        return []

    normalized = orientation_str.upper()
    males = normalized.count('M')
    females = normalized.count('F')

    tags = []
    # Gender counts
    for count, single_tag, plural_suffix in ((males, "1boy", "boys"), (females, "1girl", "girls")):
        if count == 1:
            tags.append(single_tag)
        elif count > 1:
            tags.append(f"{count}{plural_suffix}")

    # Relationships/Group type
    headcount = males + females
    if headcount == 1:
        tags.append("solo")
    elif headcount > 1:
        if males > 0 and females > 0:
            tags.append("hetero")
        elif females > 1 and males == 0:
            tags.append("yuri")
        elif males > 1 and females == 0:
            tags.append("yaoi")
    return tags
def _dedup_tags(prompt_str):
"""Remove duplicate tags from a comma-separated prompt string, preserving first-occurrence order."""
seen = set()
result = []
for tag in prompt_str.split(','):
t = tag.strip()
if t and t.lower() not in seen:
seen.add(t.lower())
result.append(t)
return ', '.join(result)
def _cross_dedup_prompts(positive, negative):
"""Remove tags shared between positive and negative prompts.
Repeatedly strips the first occurrence from each side until the tag exists
on only one side. Equal counts cancel out completely; any excess on one side
retains the remainder, allowing deliberate overrides (e.g. adding a tag twice
in the positive while it appears once in the negative leaves one copy positive).
"""
def parse_tags(s):
return [t.strip() for t in s.split(',') if t.strip()]
pos_tags = parse_tags(positive)
neg_tags = parse_tags(negative)
shared = {t.lower() for t in pos_tags} & {t.lower() for t in neg_tags}
for tag_lower in shared:
while (
any(t.lower() == tag_lower for t in pos_tags) and
any(t.lower() == tag_lower for t in neg_tags)
):
pos_tags.pop(next(i for i, t in enumerate(pos_tags) if t.lower() == tag_lower))
neg_tags.pop(next(i for i, t in enumerate(neg_tags) if t.lower() == tag_lower))
return ', '.join(pos_tags), ', '.join(neg_tags)
def _resolve_lora_weight(lora_data, override=None):
"""Return effective LoRA weight, randomising between min/max when they differ.
If *override* is provided it takes absolute precedence (used by the Strengths
Gallery to pin a specific value for each step).
"""
if override is not None:
return float(override)
weight = float(lora_data.get('lora_weight', 1.0))
min_w = lora_data.get('lora_weight_min')
max_w = lora_data.get('lora_weight_max')
if min_w is not None and max_w is not None:
min_w, max_w = float(min_w), float(max_w)
if min_w != max_w:
weight = random.uniform(min(min_w, max_w), max(min_w, max_w))
return weight
# Keys checked under a character's 'identity' section and under its active
# wardrobe by _ensure_character_fields(), which injects them as
# 'identity::<key>' / 'wardrobe::<key>' field selectors.
_IDENTITY_KEYS = ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']
_WARDROBE_KEYS = ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']
def _resolve_character(character_slug):
"""Resolve a character_slug string (possibly '__random__') to a Character instance."""
if character_slug == '__random__':
return Character.query.order_by(db.func.random()).first()
if character_slug:
return Character.query.filter_by(slug=character_slug).first()
return None
def _ensure_character_fields(character, selected_fields, include_wardrobe=True, include_defaults=False):
    """Add the essential character field selectors to selected_fields (modified in place).

    Always adds 'identity::<key>' for each populated identity key and
    'special::name'. Existing entries are never duplicated.

    include_wardrobe — also add 'wardrobe::<key>' for populated keys of the active wardrobe (default True)
    include_defaults — also add defaults::expression and defaults::pose when set (for outfit/look previews)
    """
    def require(field_key):
        if field_key not in selected_fields:
            selected_fields.append(field_key)

    identity = character.data.get('identity', {})
    for key in _IDENTITY_KEYS:
        if identity.get(key):
            require(f'identity::{key}')

    if include_defaults:
        for key in ('expression', 'pose'):
            if character.data.get('defaults', {}).get(key):
                require(f'defaults::{key}')

    require('special::name')

    if include_wardrobe:
        wardrobe = character.get_active_wardrobe()
        for key in _WARDROBE_KEYS:
            if wardrobe.get(key):
                require(f'wardrobe::{key}')
def _append_background(prompts, character=None):
"""Append a (color-prefixed) simple background tag to prompts['main']."""
primary_color = character.data.get('styles', {}).get('primary_color', '') if character else ''
bg = f"{primary_color} simple background" if primary_color else "simple background"
prompts['main'] = f"{prompts['main']}, {bg}"
def build_prompt(data, selected_fields=None, default_fields=None, active_outfit='default'):
    """Assemble the main, face and hand prompt strings from an entity's data dict.

    data            — dict read for the keys 'identity', 'wardrobe', 'defaults',
                      'action', 'style', 'participants', 'character_id',
                      'styles', 'tags' and 'lora' (all optional)
    selected_fields — collection of "section::key" strings chosen by the user;
                      when non-empty, only these fields are included
    default_fields  — fallback collection of "section::key" strings used when
                      selected_fields is empty
    active_outfit   — wardrobe outfit name to use when the wardrobe is in the
                      nested format (falls back to 'default')

    Returns a dict with the keys "main", "face" and "hand", each a
    comma-separated, de-duplicated tag string.
    """
    def is_selected(section, key):
        # Priority:
        # 1. Manual selection from form (if list is not empty)
        # 2. Database defaults (if they exist)
        # 3. Select all (default behavior)
        if selected_fields:
            return f"{section}::{key}" in selected_fields
        if default_fields:
            return f"{section}::{key}" in default_fields
        return True

    identity = data.get('identity', {})

    # Get wardrobe - handle both new nested format and legacy flat format
    wardrobe_data = data.get('wardrobe', {})
    if 'default' in wardrobe_data and isinstance(wardrobe_data.get('default'), dict):
        # New nested format - get active outfit
        wardrobe = wardrobe_data.get(active_outfit or 'default', wardrobe_data.get('default', {}))
    else:
        # Legacy flat format
        wardrobe = wardrobe_data

    defaults = data.get('defaults', {})
    action_data = data.get('action', {})
    style_data = data.get('style', {})
    participants = data.get('participants', {})

    # Pre-calculate Hand/Glove priority
    # Priority: wardrobe gloves > wardrobe hands (outfit) > identity hands (character)
    hand_val = ""
    if wardrobe.get('gloves') and is_selected('wardrobe', 'gloves'):
        hand_val = wardrobe.get('gloves')
    elif wardrobe.get('hands') and is_selected('wardrobe', 'hands'):
        hand_val = wardrobe.get('hands')
    elif identity.get('hands') and is_selected('identity', 'hands'):
        hand_val = identity.get('hands')

    # 1. Main Prompt
    parts = []

    # Handle participants logic
    if participants:
        if participants.get('solo_focus') == 'true':
            parts.append('(solo focus:1.2)')
        orientation = participants.get('orientation', '')
        if orientation:
            parts.extend(parse_orientation(orientation))
    else:
        # Default behavior
        parts.append("(solo:1.2)")

    # Use character_id (underscores to spaces) for tags compatibility
    char_tag = data.get('character_id', '').replace('_', ' ')
    if char_tag and is_selected('special', 'name'):
        parts.append(char_tag)

    for key in ['base_specs', 'hair', 'eyes', 'extra']:
        val = identity.get(key)
        if val and is_selected('identity', key):
            # Filter out conflicting tags if participants data is present
            if participants and key == 'base_specs':
                # Remove 1girl, 1boy, solo, etc.
                # (the result may be empty; blank entries are dropped by _dedup_tags below)
                val = re.sub(r'\b(1girl|1boy|solo)\b', '', val).replace(', ,', ',').strip(', ')
            parts.append(val)

    # Add defaults (expression, pose, scene)
    for key in ['expression', 'pose', 'scene']:
        val = defaults.get(key)
        if val and is_selected('defaults', key):
            parts.append(val)

    # Add hand priority value to main prompt
    if hand_val:
        parts.append(hand_val)

    # Remaining wardrobe slots ('hands' / 'gloves' were handled via hand_val above)
    for key in ['full_body', 'top', 'bottom', 'headwear', 'legwear', 'footwear', 'accessories']:
        val = wardrobe.get(key)
        if val and is_selected('wardrobe', key):
            parts.append(val)

    # Standard character styles
    char_aesthetic = data.get('styles', {}).get('aesthetic')
    if char_aesthetic and is_selected('styles', 'aesthetic'):
        parts.append(f"{char_aesthetic} style")

    # New Styles Gallery logic
    if style_data.get('artist_name') and is_selected('style', 'artist_name'):
        parts.append(f"by {style_data['artist_name']}")
    if style_data.get('artistic_style') and is_selected('style', 'artistic_style'):
        parts.append(style_data['artistic_style'])

    tags = data.get('tags', [])
    if tags and is_selected('special', 'tags'):
        parts.extend(tags)

    lora = data.get('lora', {})
    if lora.get('lora_triggers') and is_selected('lora', 'lora_triggers'):
        parts.append(lora.get('lora_triggers'))

    # 2. Face Prompt: Tag, Eyes, Expression, Headwear, Action details
    face_parts = []
    if char_tag and is_selected('special', 'name'): face_parts.append(char_tag)
    if identity.get('eyes') and is_selected('identity', 'eyes'): face_parts.append(identity.get('eyes'))
    if defaults.get('expression') and is_selected('defaults', 'expression'): face_parts.append(defaults.get('expression'))
    if wardrobe.get('headwear') and is_selected('wardrobe', 'headwear'): face_parts.append(wardrobe.get('headwear'))
    # Add specific Action expression details if available
    if action_data.get('head') and is_selected('action', 'head'): face_parts.append(action_data.get('head'))
    if action_data.get('eyes') and is_selected('action', 'eyes'): face_parts.append(action_data.get('eyes'))

    # 3. Hand Prompt: Hand value (Gloves or Hands), Action details
    hand_parts = [hand_val] if hand_val else []
    if action_data.get('arms') and is_selected('action', 'arms'): hand_parts.append(action_data.get('arms'))
    if action_data.get('hands') and is_selected('action', 'hands'): hand_parts.append(action_data.get('hands'))

    # Join each list and strip duplicate/blank tags
    return {
        "main": _dedup_tags(", ".join(parts)),
        "face": _dedup_tags(", ".join(face_parts)),
        "hand": _dedup_tags(", ".join(hand_parts))
    }
def queue_prompt(prompt_workflow, client_id=None, timeout=30):
    """Submit a workflow to ComfyUI's /prompt endpoint and return the decoded JSON reply.

    prompt_workflow — ComfyUI workflow dict, sent as the "prompt" field
    client_id       — optional client id forwarded to ComfyUI when truthy
    timeout         — seconds to wait for ComfyUI before requests raises a
                      timeout error; without one a stalled connection would
                      block the caller (e.g. the queue worker) indefinitely

    The HTTP status is deliberately not checked: callers inspect the returned
    JSON for 'prompt_id' / 'error'.
    """
    payload = {"prompt": prompt_workflow}
    if client_id:
        payload["client_id"] = client_id
    body = json.dumps(payload).encode('utf-8')
    response = requests.post(f"{app.config['COMFYUI_URL']}/prompt", data=body, timeout=timeout)
    return response.json()
def get_history(prompt_id, timeout=30):
    """Fetch ComfyUI's /history/<prompt_id> entry and return the decoded JSON.

    timeout — seconds to wait for ComfyUI before requests raises a timeout
              error, so a stalled connection cannot hang the polling loop forever
    """
    response = requests.get(f"{app.config['COMFYUI_URL']}/history/{prompt_id}", timeout=timeout)
    return response.json()
def get_image(filename, subfolder, folder_type, timeout=60):
    """Download an image from ComfyUI's /view endpoint and return its raw bytes.

    filename, subfolder, folder_type — sent as the 'filename', 'subfolder' and
                                       'type' query parameters
    timeout — seconds to wait for ComfyUI before requests raises a timeout
              error, so a stalled connection cannot block the caller indefinitely
    """
    query = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    response = requests.get(f"{app.config['COMFYUI_URL']}/view", params=query, timeout=timeout)
    return response.content
from sqlalchemy.orm.attributes import flag_modified
def sync_characters():
    """Synchronise the Character table with the JSON files in CHARACTERS_DIR.

    Every ``*.json`` file with a 'character_id' is inserted or updated
    (data, name, slug, filename); a stored cover image path is cleared when the
    file it points to no longer exists. Characters whose id was not seen in
    the folder are deleted. Files that fail to import are reported and skipped.
    Does nothing when the directory does not exist.
    """
    characters_dir = app.config['CHARACTERS_DIR']
    if not os.path.exists(characters_dir):
        return

    current_ids = set()  # set: O(1) membership test in the clean-up pass below
    for filename in os.listdir(characters_dir):
        if not filename.endswith('.json'):
            continue
        file_path = os.path.join(characters_dir, filename)
        try:
            # JSON is UTF-8; do not depend on the platform's default encoding
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            char_id = data.get('character_id')
            if not char_id:
                continue
            current_ids.add(char_id)

            # Generate URL-safe slug: remove special characters from character_id
            slug = re.sub(r'[^a-zA-Z0-9_]', '', char_id)

            # Check if character already exists
            character = Character.query.filter_by(character_id=char_id).first()
            name = data.get('character_name', char_id.replace('_', ' ').title())

            if character:
                character.data = data
                character.name = name
                character.slug = slug
                character.filename = filename

                # Check if cover image still exists
                if character.image_path:
                    full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], character.image_path)
                    if not os.path.exists(full_img_path):
                        print(f"Image missing for {character.name}, clearing path.")
                        character.image_path = None

                # Explicitly tell SQLAlchemy the JSON field was modified
                flag_modified(character, "data")
            else:
                new_char = Character(
                    character_id=char_id,
                    slug=slug,
                    filename=filename,
                    name=name,
                    data=data
                )
                db.session.add(new_char)
        except Exception as e:
            print(f"Error importing {filename}: {e}")

    # Remove characters that are no longer in the folder
    for char in Character.query.all():
        if char.character_id not in current_ids:
            db.session.delete(char)

    db.session.commit()
def sync_outfits():
    """Synchronise the Outfit table with the JSON files in CLOTHING_DIR.

    Every ``*.json`` file is inserted or updated (data, name, slug, filename);
    the id comes from 'outfit_id' or, failing that, the file name without
    '.json'. A stored cover image path is cleared when the file it points to
    no longer exists. Outfits whose id was not seen in the folder are deleted.
    Files that fail to import are reported and skipped. Does nothing when the
    directory does not exist.
    """
    clothing_dir = app.config['CLOTHING_DIR']
    if not os.path.exists(clothing_dir):
        return

    current_ids = set()  # set: O(1) membership test in the clean-up pass below
    for filename in os.listdir(clothing_dir):
        if not filename.endswith('.json'):
            continue
        file_path = os.path.join(clothing_dir, filename)
        try:
            # JSON is UTF-8; do not depend on the platform's default encoding
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            outfit_id = data.get('outfit_id') or filename.replace('.json', '')
            current_ids.add(outfit_id)

            # Generate URL-safe slug: remove special characters from outfit_id
            slug = re.sub(r'[^a-zA-Z0-9_]', '', outfit_id)

            # Check if outfit already exists
            outfit = Outfit.query.filter_by(outfit_id=outfit_id).first()
            name = data.get('outfit_name', outfit_id.replace('_', ' ').title())

            if outfit:
                outfit.data = data
                outfit.name = name
                outfit.slug = slug
                outfit.filename = filename

                # Check if cover image still exists
                if outfit.image_path:
                    full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], outfit.image_path)
                    if not os.path.exists(full_img_path):
                        print(f"Image missing for {outfit.name}, clearing path.")
                        outfit.image_path = None

                # Explicitly tell SQLAlchemy the JSON field was modified
                flag_modified(outfit, "data")
            else:
                new_outfit = Outfit(
                    outfit_id=outfit_id,
                    slug=slug,
                    filename=filename,
                    name=name,
                    data=data
                )
                db.session.add(new_outfit)
        except Exception as e:
            print(f"Error importing outfit {filename}: {e}")

    # Remove outfits that are no longer in the folder
    for outfit in Outfit.query.all():
        if outfit.outfit_id not in current_ids:
            db.session.delete(outfit)

    db.session.commit()
def sync_looks():
    """Synchronise the Look table with the JSON files in LOOKS_DIR.

    Every ``*.json`` file is inserted or updated (data, name, slug, filename,
    character_id); the id comes from 'look_id' or, failing that, the file name
    without '.json'. A stored cover image path is cleared when the file it
    points to no longer exists. Looks whose id was not seen in the folder are
    deleted. Files that fail to import are reported and skipped. Does nothing
    when the directory does not exist.
    """
    looks_dir = app.config['LOOKS_DIR']
    if not os.path.exists(looks_dir):
        return

    current_ids = set()  # set: O(1) membership test in the clean-up pass below
    for filename in os.listdir(looks_dir):
        if not filename.endswith('.json'):
            continue
        file_path = os.path.join(looks_dir, filename)
        try:
            # JSON is UTF-8; do not depend on the platform's default encoding
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            look_id = data.get('look_id') or filename.replace('.json', '')
            current_ids.add(look_id)

            # Generate URL-safe slug
            slug = re.sub(r'[^a-zA-Z0-9_]', '', look_id)

            look = Look.query.filter_by(look_id=look_id).first()
            name = data.get('look_name', look_id.replace('_', ' ').title())
            character_id = data.get('character_id', None)

            if look:
                look.data = data
                look.name = name
                look.slug = slug
                look.filename = filename
                look.character_id = character_id

                # Drop the cover image reference when the file has disappeared
                if look.image_path:
                    full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], look.image_path)
                    if not os.path.exists(full_img_path):
                        look.image_path = None

                # Explicitly tell SQLAlchemy the JSON field was modified
                flag_modified(look, "data")
            else:
                new_look = Look(
                    look_id=look_id,
                    slug=slug,
                    filename=filename,
                    name=name,
                    character_id=character_id,
                    data=data
                )
                db.session.add(new_look)
        except Exception as e:
            print(f"Error importing look {filename}: {e}")

    # Remove looks that are no longer in the folder
    for look in Look.query.all():
        if look.look_id not in current_ids:
            db.session.delete(look)

    db.session.commit()
def sync_actions():
    """Synchronise the ``Action`` table with the JSON files in ``ACTIONS_DIR``.

    Every ``*.json`` file is upserted by its ``action_id`` (falling back to
    the file name with ``.json`` removed). Rows whose id was not seen during
    the scan are deleted, and all changes are committed together at the end.
    Returns immediately when the directory does not exist.
    """
    actions_dir = app.config['ACTIONS_DIR']
    if not os.path.exists(actions_dir):
        return

    # A set keeps the stale-row check below O(1) per row.
    current_ids = set()

    for filename in os.listdir(actions_dir):
        if not filename.endswith('.json'):
            continue

        file_path = os.path.join(actions_dir, filename)
        try:
            # Read as UTF-8 explicitly instead of relying on the locale default.
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            action_id = data.get('action_id') or filename.replace('.json', '')
            current_ids.add(action_id)

            # Generate URL-safe slug
            slug = re.sub(r'[^a-zA-Z0-9_]', '', action_id)

            # Check if action already exists
            action = Action.query.filter_by(action_id=action_id).first()
            name = data.get('action_name', action_id.replace('_', ' ').title())

            if action:
                action.data = data
                action.name = name
                action.slug = slug
                action.filename = filename

                # Check if cover image still exists
                if action.image_path:
                    full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], action.image_path)
                    if not os.path.exists(full_img_path):
                        print(f"Image missing for {action.name}, clearing path.")
                        action.image_path = None

                # Explicitly tell SQLAlchemy the JSON field was modified.
                flag_modified(action, "data")
            else:
                new_action = Action(
                    action_id=action_id,
                    slug=slug,
                    filename=filename,
                    name=name,
                    data=data,
                )
                db.session.add(new_action)
        except Exception as e:
            # NOTE: a file that fails to load contributes no id, so an existing
            # row for it is removed by the cleanup below.
            print(f"Error importing action {filename}: {e}")

    # Remove actions that are no longer in the folder
    for action in Action.query.all():
        if action.action_id not in current_ids:
            db.session.delete(action)

    db.session.commit()
def sync_styles():
    """Synchronise the ``Style`` table with the JSON files in ``STYLES_DIR``.

    Every ``*.json`` file is upserted by its ``style_id`` (falling back to the
    file name with ``.json`` removed). Rows whose id was not seen during the
    scan are deleted, and all changes are committed together at the end.
    Returns immediately when the directory does not exist.
    """
    styles_dir = app.config['STYLES_DIR']
    if not os.path.exists(styles_dir):
        return

    # A set keeps the stale-row check below O(1) per row.
    current_ids = set()

    for filename in os.listdir(styles_dir):
        if not filename.endswith('.json'):
            continue

        file_path = os.path.join(styles_dir, filename)
        try:
            # Read as UTF-8 explicitly instead of relying on the locale default.
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            style_id = data.get('style_id') or filename.replace('.json', '')
            current_ids.add(style_id)

            # Generate URL-safe slug
            slug = re.sub(r'[^a-zA-Z0-9_]', '', style_id)

            # Check if style already exists
            style = Style.query.filter_by(style_id=style_id).first()
            name = data.get('style_name', style_id.replace('_', ' ').title())

            if style:
                style.data = data
                style.name = name
                style.slug = slug
                style.filename = filename

                # Check if cover image still exists
                if style.image_path:
                    full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], style.image_path)
                    if not os.path.exists(full_img_path):
                        print(f"Image missing for {style.name}, clearing path.")
                        style.image_path = None

                # Explicitly tell SQLAlchemy the JSON field was modified.
                flag_modified(style, "data")
            else:
                new_style = Style(
                    style_id=style_id,
                    slug=slug,
                    filename=filename,
                    name=name,
                    data=data,
                )
                db.session.add(new_style)
        except Exception as e:
            # NOTE: a file that fails to load contributes no id, so an existing
            # row for it is removed by the cleanup below.
            print(f"Error importing style {filename}: {e}")

    # Remove styles that are no longer in the folder
    for style in Style.query.all():
        if style.style_id not in current_ids:
            db.session.delete(style)

    db.session.commit()
def sync_detailers():
    """Synchronise the ``Detailer`` table with the JSON files in ``DETAILERS_DIR``.

    Every ``*.json`` file is upserted by its ``detailer_id`` (falling back to
    the file name with ``.json`` removed). Rows whose id was not seen during
    the scan are deleted, and all changes are committed together at the end.
    Returns immediately when the directory does not exist.
    """
    detailers_dir = app.config['DETAILERS_DIR']
    if not os.path.exists(detailers_dir):
        return

    # A set keeps the stale-row check below O(1) per row.
    current_ids = set()

    for filename in os.listdir(detailers_dir):
        if not filename.endswith('.json'):
            continue

        file_path = os.path.join(detailers_dir, filename)
        try:
            # Read as UTF-8 explicitly instead of relying on the locale default.
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            detailer_id = data.get('detailer_id') or filename.replace('.json', '')
            current_ids.add(detailer_id)

            # Generate URL-safe slug
            slug = re.sub(r'[^a-zA-Z0-9_]', '', detailer_id)

            # Check if detailer already exists
            detailer = Detailer.query.filter_by(detailer_id=detailer_id).first()
            name = data.get('detailer_name', detailer_id.replace('_', ' ').title())

            if detailer:
                detailer.data = data
                detailer.name = name
                detailer.slug = slug
                detailer.filename = filename

                # Check if cover image still exists
                if detailer.image_path:
                    full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], detailer.image_path)
                    if not os.path.exists(full_img_path):
                        print(f"Image missing for {detailer.name}, clearing path.")
                        detailer.image_path = None

                # Explicitly tell SQLAlchemy the JSON field was modified.
                flag_modified(detailer, "data")
            else:
                new_detailer = Detailer(
                    detailer_id=detailer_id,
                    slug=slug,
                    filename=filename,
                    name=name,
                    data=data,
                )
                db.session.add(new_detailer)
        except Exception as e:
            # NOTE: a file that fails to load contributes no id, so an existing
            # row for it is removed by the cleanup below.
            print(f"Error importing detailer {filename}: {e}")

    # Remove detailers that are no longer in the folder
    for detailer in Detailer.query.all():
        if detailer.detailer_id not in current_ids:
            db.session.delete(detailer)

    db.session.commit()
def sync_scenes():
    """Synchronise the ``Scene`` table with the JSON files in ``SCENES_DIR``.

    Every ``*.json`` file is upserted by its ``scene_id`` (falling back to the
    file name with ``.json`` removed). Rows whose id was not seen during the
    scan are deleted, and all changes are committed together at the end.
    Returns immediately when the directory does not exist.
    """
    scenes_dir = app.config['SCENES_DIR']
    if not os.path.exists(scenes_dir):
        return

    # A set keeps the stale-row check below O(1) per row.
    current_ids = set()

    for filename in os.listdir(scenes_dir):
        if not filename.endswith('.json'):
            continue

        file_path = os.path.join(scenes_dir, filename)
        try:
            # Read as UTF-8 explicitly instead of relying on the locale default.
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            scene_id = data.get('scene_id') or filename.replace('.json', '')
            current_ids.add(scene_id)

            # Generate URL-safe slug
            slug = re.sub(r'[^a-zA-Z0-9_]', '', scene_id)

            # Check if scene already exists
            scene = Scene.query.filter_by(scene_id=scene_id).first()
            name = data.get('scene_name', scene_id.replace('_', ' ').title())

            if scene:
                scene.data = data
                scene.name = name
                scene.slug = slug
                scene.filename = filename

                # Check if cover image still exists
                if scene.image_path:
                    full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], scene.image_path)
                    if not os.path.exists(full_img_path):
                        print(f"Image missing for {scene.name}, clearing path.")
                        scene.image_path = None

                # Explicitly tell SQLAlchemy the JSON field was modified.
                flag_modified(scene, "data")
            else:
                new_scene = Scene(
                    scene_id=scene_id,
                    slug=slug,
                    filename=filename,
                    name=name,
                    data=data,
                )
                db.session.add(new_scene)
        except Exception as e:
            # NOTE: a file that fails to load contributes no id, so an existing
            # row for it is removed by the cleanup below.
            print(f"Error importing scene {filename}: {e}")

    # Remove scenes that are no longer in the folder
    for scene in Scene.query.all():
        if scene.scene_id not in current_ids:
            db.session.delete(scene)

    db.session.commit()
def _default_checkpoint_data(checkpoint_path, filename):
    """Return template-default data for a checkpoint with no JSON file.

    Args:
        checkpoint_path: Value stored under ``checkpoint_path``.
        filename: Model file name, stored unchanged under ``checkpoint_name``.

    Returns:
        A new dict on every call, so callers may mutate it freely.
    """
    return {
        "checkpoint_path": checkpoint_path,
        "checkpoint_name": filename,
        "base_positive": "anime",
        "base_negative": "text, logo",
        "steps": 25,
        "cfg": 5,
        "sampler_name": "euler_ancestral",
        "vae": "integrated",
    }
def sync_checkpoints():
    """Synchronise the ``Checkpoint`` table with the model files on disk.

    Model files (``.safetensors`` / ``.ckpt``) found in the configured
    Illustrious and Noob model directories are upserted by
    ``"<family>/<file name>"``. Per-checkpoint settings come from the JSON
    files in ``CHECKPOINTS_DIR`` (matched on their ``checkpoint_path`` value);
    models without a JSON file get ``_default_checkpoint_data``. Rows whose id
    was not seen are deleted, then everything is committed together.
    """
    checkpoints_dir = app.config.get('CHECKPOINTS_DIR', 'data/checkpoints')
    os.makedirs(checkpoints_dir, exist_ok=True)

    # Load all JSON data files keyed by checkpoint_path
    json_data_by_path = {}
    for filename in os.listdir(checkpoints_dir):
        # A name ending in '.json' can never also end in '.template', so a
        # single suffix test is sufficient to skip template files.
        if not filename.endswith('.json'):
            continue
        file_path = os.path.join(checkpoints_dir, filename)
        try:
            # Read as UTF-8 explicitly instead of relying on the locale default.
            with open(file_path, 'r', encoding='utf-8') as json_file:
                data = json.load(json_file)
            ckpt_path = data.get('checkpoint_path')
            if ckpt_path:
                json_data_by_path[ckpt_path] = data
        except Exception as e:
            print(f"Error reading checkpoint JSON {filename}: {e}")

    # A set keeps the stale-row check below O(1) per row.
    current_ids = set()
    model_dirs = [
        (app.config.get('ILLUSTRIOUS_MODELS_DIR', ''), 'Illustrious'),
        (app.config.get('NOOB_MODELS_DIR', ''), 'Noob'),
    ]

    for dirpath, family in model_dirs:
        if not dirpath or not os.path.exists(dirpath):
            continue

        for model_file in sorted(os.listdir(dirpath)):
            if not model_file.endswith(('.safetensors', '.ckpt')):
                continue

            checkpoint_path = f"{family}/{model_file}"
            checkpoint_id = checkpoint_path
            slug = re.sub(r'[^a-zA-Z0-9_]', '_', checkpoint_path.rsplit('.', 1)[0]).lower().strip('_')
            name_base = model_file.rsplit('.', 1)[0]
            friendly_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).strip().title()
            current_ids.add(checkpoint_id)

            # Only build the default payload when no JSON file matched.
            data = json_data_by_path.get(checkpoint_path)
            if data is None:
                data = _default_checkpoint_data(checkpoint_path, model_file)

            # A null or empty checkpoint_name falls back to the file name
            # instead of raising on None.rsplit().
            raw_name = data.get('checkpoint_name') or model_file
            display_name = raw_name.rsplit('.', 1)[0]
            display_name = re.sub(r'[^a-zA-Z0-9]+', ' ', display_name).strip().title() or friendly_name

            ckpt = Checkpoint.query.filter_by(checkpoint_id=checkpoint_id).first()
            if ckpt:
                ckpt.name = display_name
                ckpt.slug = slug
                ckpt.checkpoint_path = checkpoint_path
                ckpt.data = data
                # Explicitly tell SQLAlchemy the JSON field was modified.
                flag_modified(ckpt, "data")
                # Drop the cover image reference if the file has disappeared.
                if ckpt.image_path:
                    full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], ckpt.image_path)
                    if not os.path.exists(full_img_path):
                        ckpt.image_path = None
            else:
                db.session.add(Checkpoint(
                    checkpoint_id=checkpoint_id,
                    slug=slug,
                    name=display_name,
                    checkpoint_path=checkpoint_path,
                    data=data,
                ))

    # Remove checkpoints whose model file is no longer present.
    for ckpt in Checkpoint.query.all():
        if ckpt.checkpoint_id not in current_ids:
            db.session.delete(ckpt)

    db.session.commit()
# Tag categories accepted by the optional "category" filter.
_DANBOORU_TAG_CATEGORIES = ("general", "artist", "copyright", "character", "meta")


def _danbooru_category_filter():
    """Return a fresh schema entry for the optional ``category`` parameter."""
    return {
        "type": "string",
        "enum": list(_DANBOORU_TAG_CATEGORIES),
        "description": "Optional category filter.",
    }


def _danbooru_tool(name, description, properties, required):
    """Wrap one tool definition in the function-calling envelope."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


# Tool definitions sent to the LLM so it can look up Danbooru tags; the tool
# names are the ones passed on to call_mcp_tool().
DANBOORU_TOOLS = [
    _danbooru_tool(
        "search_tags",
        "Prefix/full-text search for Danbooru tags. Returns rich tag objects ordered by relevance.",
        {
            "query": {"type": "string", "description": "Search string. Trailing * added automatically."},
            "limit": {"type": "integer", "description": "Max results (1-200)", "default": 20},
            "category": _danbooru_category_filter(),
        },
        ["query"],
    ),
    _danbooru_tool(
        "validate_tags",
        "Exact-match validation for a list of tags. Splits into valid, deprecated, and invalid.",
        {
            "tags": {"type": "array", "items": {"type": "string"}, "description": "Tags to validate."},
        },
        ["tags"],
    ),
    _danbooru_tool(
        "suggest_tags",
        "Autocomplete-style suggestions for a partial or approximate tag. Sorted by post count.",
        {
            "partial": {"type": "string", "description": "Partial tag or rough approximation."},
            "limit": {"type": "integer", "description": "Max suggestions (1-50)", "default": 10},
            "category": _danbooru_category_filter(),
        },
        ["partial"],
    ),
]
async def _run_mcp_tool(name, arguments):
    """Call one tool on the Danbooru MCP server and return its text reply.

    Each call launches ``docker run --rm -i danbooru-mcp:latest`` and talks
    to it over stdio. The return value is the ``text`` of the first content
    item in the tool result.
    """
    docker_args = ["run", "--rm", "-i", "danbooru-mcp:latest"]
    launch_params = StdioServerParameters(command="docker", args=docker_args)

    async with stdio_client(launch_params) as (reader, writer):
        # Named mcp_session so it is not confused with Flask's ``session``.
        async with ClientSession(reader, writer) as mcp_session:
            await mcp_session.initialize()
            reply = await mcp_session.call_tool(name, arguments)
            first_item = reply.content[0]
            return first_item.text
def call_mcp_tool(name, arguments):
    """Run an MCP tool synchronously and return its result as a string.

    Any failure is logged and reported back as a JSON string of the form
    ``{"error": "<message>"}`` instead of being raised.
    """
    try:
        outcome = asyncio.run(_run_mcp_tool(name, arguments))
    except Exception as e:
        print(f"MCP Tool Error: {e}")
        failure = {"error": str(e)}
        return json.dumps(failure)
    return outcome
def load_prompt(filename):
    """Return the text of ``data/prompts/<filename>``, or ``None`` if absent.

    The file is opened directly rather than probed with ``os.path.exists``
    first, so there is no window between the check and the read. Content is
    decoded as UTF-8 regardless of the process locale.
    """
    path = os.path.join('data/prompts', filename)
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()
    except (FileNotFoundError, NotADirectoryError):
        return None
def call_llm(prompt, system_prompt="You are a creative assistant."):
    """Send a prompt to the configured LLM and return the reply text.

    The provider is read from the first ``Settings`` row: OpenRouter when
    ``llm_provider == 'openrouter'``, otherwise a local OpenAI-compatible
    endpoint at ``local_base_url``. The Danbooru tools are offered to the
    model; tool calls are executed through ``call_mcp_tool`` and their
    results fed back until the model produces a plain message.

    Args:
        prompt: User message content.
        system_prompt: System message content.

    Returns:
        The ``content`` of the model's final message.

    Raises:
        ValueError: Settings, API key, base URL or local model is missing.
        RuntimeError: The HTTP request failed, the response shape stayed
            invalid after the format retries, or the turn limit was reached.
    """
    settings = Settings.query.first()
    if not settings:
        raise ValueError("Settings not configured.")

    is_local = settings.llm_provider != 'openrouter'
    if not is_local:
        if not settings.openrouter_api_key:
            raise ValueError("OpenRouter API Key not configured. Please configure it in Settings.")
        url = "https://openrouter.ai/api/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {settings.openrouter_api_key}",
            "Content-Type": "application/json",
            # NOTE(review): reads Flask's request object, so this branch
            # presumably needs an active request context - confirm for callers.
            "HTTP-Referer": request.url_root,
            "X-Title": "Character Browser"
        }
        model = settings.openrouter_model or 'google/gemini-2.0-flash-001'
    else:
        # Local provider (Ollama or LMStudio)
        if not settings.local_base_url:
            raise ValueError(f"{settings.llm_provider.title()} Base URL not configured.")
        url = f"{settings.local_base_url.rstrip('/')}/chat/completions"
        headers = {"Content-Type": "application/json"}
        model = settings.local_model
        if not model:
            raise ValueError(f"No local model selected for {settings.llm_provider.title()}. Please select one in Settings.")

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt}
    ]

    max_turns = 10
    use_tools = True
    format_retries = 3  # retries allowed for unexpected response format

    while max_turns > 0:
        max_turns -= 1
        data = {
            "model": model,
            "messages": messages,
        }
        # Only add tools if supported/requested
        if use_tools:
            data["tools"] = DANBOORU_TOOLS
            data["tool_choice"] = "auto"

        # Reset every turn: if the POST itself fails, the handlers below must
        # not report the body of a response left over from an earlier turn.
        response = None
        try:
            # NOTE(review): no timeout is passed, so a stalled provider blocks
            # the caller indefinitely.
            response = requests.post(url, headers=headers, json=data)

            # If 400 Bad Request and we were using tools, try once without tools
            if response.status_code == 400 and use_tools:
                print(f"LLM Provider {settings.llm_provider} rejected tools. Retrying without tool calling...")
                use_tools = False
                max_turns += 1  # Reset turn for the retry
                continue

            response.raise_for_status()
            result = response.json()

            # Validate expected OpenAI-compatible response shape
            if 'choices' not in result or not result['choices']:
                raise KeyError('choices')
            message = result['choices'][0].get('message')
            if message is None:
                raise KeyError('message')

            if message.get('tool_calls'):
                messages.append(message)
                for tool_call in message['tool_calls']:
                    name = tool_call['function']['name']
                    args = json.loads(tool_call['function']['arguments'])
                    print(f"Executing MCP tool: {name}({args})")
                    tool_result = call_mcp_tool(name, args)
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call['id'],
                        "name": name,
                        "content": tool_result
                    })
                continue

            return message['content']
        except requests.exceptions.RequestException as e:
            error_body = f" - Body: {response.text}" if response is not None else ""
            raise RuntimeError(f"LLM API request failed: {str(e)}{error_body}") from e
        except (KeyError, IndexError) as e:
            # Log the raw response to help diagnose the issue
            raw = response.text[:500] if response is not None else ""
            print(f"Unexpected LLM response format (key={e}). Raw response: {raw}")
            if format_retries > 0:
                format_retries -= 1
                max_turns += 1  # don't burn a turn on a format error
                # Ask the model to try again with the correct format
                messages.append({
                    "role": "user",
                    "content": (
                        "Your previous response was not in the expected format. "
                        "Please respond with valid JSON only, exactly as specified in the system prompt. "
                        "Do not include any explanation or markdown — only the raw JSON object."
                    )
                })
                print(f"Retrying after format error ({format_retries} retries left)…")
                continue
            raise RuntimeError(f"Unexpected LLM response format after retries: {str(e)}") from e

    raise RuntimeError("LLM tool calling loop exceeded maximum turns")
@app.route('/get_openrouter_models', methods=['POST'])
def get_openrouter_models():
    """Return the OpenRouter model list for the API key posted in the form.

    Responds with ``{'models': [{'id', 'name'}, ...]}``, a 400 error when no
    ``api_key`` field was sent, or a 500 error carrying the exception text
    when the lookup fails.
    """
    api_key = request.form.get('api_key')
    if not api_key:
        return {'error': 'API key is required'}, 400

    headers = {"Authorization": f"Bearer {api_key}"}
    try:
        # Bound the wait so an unreachable API cannot hang this request; a
        # timeout is reported through the generic error path below.
        response = requests.get(
            "https://openrouter.ai/api/v1/models",
            headers=headers,
            timeout=15,
        )
        response.raise_for_status()
        models = response.json().get('data', [])
        # Return simplified list of models
        return {'models': [{'id': m['id'], 'name': m.get('name', m['id'])} for m in models]}
    except Exception as e:
        return {'error': str(e)}, 500
@app.route('/get_local_models', methods=['POST'])
def get_local_models():
    """Return the model list exposed by a local OpenAI-compatible server.

    Queries ``<base_url>/models`` for the ``base_url`` posted in the form and
    responds with ``{'models': [{'id', 'name'}, ...]}``, a 400 error when no
    base URL was sent, or a 500 error carrying the exception text when the
    lookup fails.
    """
    base_url = request.form.get('base_url')
    if not base_url:
        return {'error': 'Base URL is required'}, 400

    try:
        # Bound the wait so an unreachable server cannot hang this request; a
        # timeout is reported through the generic error path below.
        response = requests.get(f"{base_url.rstrip('/')}/models", timeout=15)
        response.raise_for_status()
        models = response.json().get('data', [])
        # Ollama/LMStudio often follow the same structure as OpenAI
        return {'models': [{'id': m['id'], 'name': m.get('name', m['id'])} for m in models]}
    except Exception as e:
        return {'error': str(e)}, 500
@app.route('/settings', methods=['GET', 'POST'])
def settings():
    """Show the settings page and persist submitted LLM provider settings.

    A ``Settings`` row is created and committed on first access. On POST the
    provider fields are overwritten from the form and the browser is
    redirected back to this page.
    """
    current = Settings.query.first()
    if not current:
        current = Settings()
        db.session.add(current)
        db.session.commit()

    if request.method != 'POST':
        return render_template('settings.html', settings=current)

    form = request.form
    current.llm_provider = form.get('llm_provider', 'openrouter')
    # (model attribute, form field) pairs that are copied over verbatim.
    field_map = (
        ('openrouter_api_key', 'api_key'),
        ('openrouter_model', 'model'),
        ('local_base_url', 'local_base_url'),
        ('local_model', 'local_model'),
    )
    for attribute, field in field_map:
        setattr(current, attribute, form.get(field))
    db.session.commit()

    flash('Settings updated successfully!')
    return redirect(url_for('settings'))
@app.route('/')
def index():
    """Render the landing page listing every character ordered by name."""
    by_name = Character.query.order_by(Character.name)
    return render_template('index.html', characters=by_name.all())
@app.route('/rescan', methods=['POST'])
def rescan():
    """Re-run the character sync and send the user back to the index page."""
    sync_characters()
    flash('Database synced with character files.')
    landing_url = url_for('index')
    return redirect(landing_url)
def build_extras_prompt(actions, outfits, scenes, styles, detailers):
    """Combine positive prompt text from all selected category items.

    Each argument is an iterable of objects exposing a ``data`` dict. The
    pieces are collected in the order actions, outfits, scenes, styles,
    detailers; empty values are skipped. Tags are included for actions,
    outfits and scenes only. A section that is missing or ``null`` in an
    item's data (``lora``, ``tags``, ``wardrobe`` ...) is treated as empty
    instead of raising.

    Returns:
        The collected pieces joined with ``", "`` (``""`` when nothing applies).
    """
    parts = []

    def section(data, key):
        """Return data[key] when it is a dict, otherwise an empty dict."""
        value = data.get(key)
        return value if isinstance(value, dict) else {}

    def add_fields(fields, keys):
        """Append the non-empty values of *keys*, in the order given."""
        parts.extend(fields[key] for key in keys if fields.get(key))

    def add_lora_triggers(data):
        """Append the item's LoRA trigger words when it has any."""
        triggers = section(data, 'lora').get('lora_triggers')
        if triggers:
            parts.append(triggers)

    def add_tags(data):
        """Append the item's tags; a null tag list counts as empty."""
        parts.extend(data.get('tags') or [])

    for action in actions:
        data = action.data or {}
        add_lora_triggers(data)
        add_tags(data)
        add_fields(section(data, 'action'), ['full_body', 'additional'])

    for outfit in outfits:
        data = outfit.data or {}
        add_fields(
            section(data, 'wardrobe'),
            ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'accessories'],
        )
        add_lora_triggers(data)
        add_tags(data)

    for scene in scenes:
        data = scene.data or {}
        add_fields(section(data, 'scene'), ['background', 'foreground', 'lighting'])
        add_lora_triggers(data)
        add_tags(data)

    for style in styles:
        data = style.data or {}
        style_fields = section(data, 'style')
        if style_fields.get('artist_name'):
            parts.append(f"by {style_fields['artist_name']}")
        if style_fields.get('artistic_style'):
            parts.append(style_fields['artistic_style'])
        add_lora_triggers(data)

    for detailer in detailers:
        data = detailer.data or {}
        prompt = data.get('prompt', '')
        # A detailer prompt may be a list of fragments or a single string.
        if isinstance(prompt, list):
            parts.extend(p for p in prompt if p)
        elif prompt:
            parts.append(prompt)
        add_lora_triggers(data)

    return ", ".join(p for p in parts if p)
@app.route('/generator', methods=['GET', 'POST'])
def generator():
    """Render the generator page and, on POST, queue an image generation job.

    The POST form selects a character, a checkpoint and any number of
    actions/outfits/scenes/styles/detailers. The prompt is either the
    ``override_prompt`` verbatim or the character prompt extended with the
    selected extras and the custom positive text. AJAX callers get a JSON
    status; regular form posts get a flash message and the page re-rendered.
    """
    page_context = {
        'characters': Character.query.order_by(Character.name).all(),
        'checkpoints': get_available_checkpoints(),
        'actions': Action.query.order_by(Action.name).all(),
        'outfits': Outfit.query.order_by(Outfit.name).all(),
        'scenes': Scene.query.order_by(Scene.name).all(),
        'styles': Style.query.order_by(Style.name).all(),
        'detailers': Detailer.query.order_by(Detailer.name).all(),
    }
    if not page_context['checkpoints']:
        page_context['checkpoints'] = ["Noob/oneObsession_v19Atypical.safetensors"]

    if request.method == 'POST':
        form = request.form
        char_slug = form.get('character')
        checkpoint = form.get('checkpoint')
        custom_positive = form.get('positive_prompt', '')
        custom_negative = form.get('negative_prompt', '')
        override_prompt = form.get('override_prompt', '').strip()
        width = form.get('width') or 1024
        height = form.get('height') or 1024

        character = Character.query.filter_by(slug=char_slug).first_or_404()

        def selected(model, field):
            """Load the rows of *model* whose slug was ticked in the form."""
            slugs = form.getlist(field)
            if not slugs:
                return []
            return model.query.filter(model.slug.in_(slugs)).all()

        sel_actions = selected(Action, 'action_slugs')
        sel_outfits = selected(Outfit, 'outfit_slugs')
        sel_scenes = selected(Scene, 'scene_slugs')
        sel_styles = selected(Style, 'style_slugs')
        sel_detailers = selected(Detailer, 'detailer_slugs')

        def first_or_none(rows):
            return rows[0] if rows else None

        wants_json = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

        try:
            with open('comfy_workflow.json', 'r') as f:
                workflow = json.load(f)

            # Build base prompts from character defaults
            prompts = build_prompt(character.data, default_fields=character.default_fields)

            if override_prompt:
                prompts["main"] = override_prompt
            else:
                extras = build_extras_prompt(sel_actions, sel_outfits, sel_scenes, sel_styles, sel_detailers)
                pieces = [prompts["main"]]
                if extras:
                    pieces.append(extras)
                if custom_positive:
                    pieces.append(custom_positive)
                prompts["main"] = ", ".join(pieces)

            # Prepare workflow - first selected item per category supplies its LoRA slot
            ckpt_obj = None
            if checkpoint:
                ckpt_obj = Checkpoint.query.filter_by(checkpoint_path=checkpoint).first()

            workflow = _prepare_workflow(
                workflow, character, prompts, checkpoint, custom_negative,
                outfit=first_or_none(sel_outfits),
                action=first_or_none(sel_actions),
                style=first_or_none(sel_styles),
                detailer=first_or_none(sel_detailers),
                scene=first_or_none(sel_scenes),
                width=width,
                height=height,
                checkpoint_data=ckpt_obj.data if ckpt_obj else None,
            )

            print(f"Queueing generator prompt for {character.character_id}")

            job = _enqueue_job(
                f"Generator: {character.name}",
                workflow,
                _make_finalize('characters', character.slug),
            )

            if wants_json:
                return {'status': 'queued', 'job_id': job['id']}

            flash("Generation queued.")
        except Exception as e:
            print(f"Generator error: {e}")
            if wants_json:
                return {'error': str(e)}, 500
            flash(f"Error: {str(e)}")

    return render_template('generator.html', **page_context)
@app.route('/generator/preview_prompt', methods=['POST'])
def generator_preview_prompt():
    """Return the positive prompt the generator would build for this form.

    Responds with ``{'prompt': ...}``, a 400 error when no character was
    posted, or a 404 error when the slug matches no character.
    """
    form = request.form

    char_slug = form.get('character')
    if not char_slug:
        return {'error': 'No character selected'}, 400

    character = Character.query.filter_by(slug=char_slug).first()
    if not character:
        return {'error': 'Character not found'}, 404

    def selected(model, field):
        """Load the rows of *model* whose slug was ticked in the form."""
        slugs = form.getlist(field)
        if not slugs:
            return []
        return model.query.filter(model.slug.in_(slugs)).all()

    custom_positive = form.get('positive_prompt', '')

    sel_actions = selected(Action, 'action_slugs')
    sel_outfits = selected(Outfit, 'outfit_slugs')
    sel_scenes = selected(Scene, 'scene_slugs')
    sel_styles = selected(Style, 'style_slugs')
    sel_detailers = selected(Detailer, 'detailer_slugs')

    prompts = build_prompt(character.data, default_fields=character.default_fields)
    extras = build_extras_prompt(sel_actions, sel_outfits, sel_scenes, sel_styles, sel_detailers)

    # Character prompt first, then the extras, then the user's own text.
    pieces = [prompts["main"]]
    if extras:
        pieces.append(extras)
    if custom_positive:
        pieces.append(custom_positive)

    return {'prompt': ", ".join(pieces)}
@app.route('/character/<path:slug>')
def detail(slug):
    """Render a character's detail page (404 when the slug is unknown).

    The preferences and preview image stored in the session under
    ``prefs_<slug>`` and ``preview_<slug>`` are handed to the template.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()

    # Load state from session
    stored = {
        'preferences': session.get(f'prefs_{slug}'),
        'preview_image': session.get(f'preview_{slug}'),
    }
    return render_template('detail.html', character=character, **stored)
def _blank_character_data(character_id, name):
    """Return an empty character profile carrying the given id and name."""
    return {
        "character_id": character_id,
        "character_name": name,
        "identity": {
            "base_specs": "",
            "hair": "",
            "eyes": "",
            "hands": "",
            "arms": "",
            "torso": "",
            "pelvis": "",
            "legs": "",
            "feet": "",
            "extra": ""
        },
        "defaults": {
            "expression": "",
            "pose": "",
            "scene": ""
        },
        "wardrobe": {
            "full_body": "",
            "headwear": "",
            "top": "",
            "bottom": "",
            "legwear": "",
            "footwear": "",
            "hands": "",
            "accessories": ""
        },
        "styles": {
            "aesthetic": "",
            "primary_color": "",
            "secondary_color": "",
            "tertiary_color": ""
        },
        "lora": {
            "lora_name": "",
            "lora_weight": 1.0,
            "lora_triggers": ""
        },
        "tags": []
    }


@app.route('/create', methods=['GET', 'POST'])
def create_character():
    """Show the character creation form and create a character on POST.

    The file name is taken from the ``filename`` field or derived from the
    name, reduced to ``[a-zA-Z0-9_]`` and suffixed with a counter until it
    does not collide with an existing JSON file. The profile is either
    generated by the LLM from the posted description (``use_llm``) or a blank
    template. It is written to ``CHARACTERS_DIR`` and added to the database.
    Failures are flashed and the form is shown again.
    """
    if request.method == 'POST':
        # Default to '' so a request without the field does not fail on
        # None.lower() below.
        name = request.form.get('name', '')
        slug = request.form.get('filename', '').strip()
        prompt = request.form.get('prompt', '')
        use_llm = request.form.get('use_llm') == 'on'

        # Auto-generate slug from name if not provided
        if not slug:
            slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')

        # Validate slug
        safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug)
        if not safe_slug:
            safe_slug = 'character'

        # Find available filename (increment if exists)
        base_slug = safe_slug
        counter = 1
        while os.path.exists(os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json")):
            safe_slug = f"{base_slug}_{counter}"
            counter += 1

        # Check if LLM generation is requested
        if use_llm:
            if not prompt:
                flash("Description is required when AI generation is enabled.")
                return redirect(request.url)

            # Generate JSON with LLM
            system_prompt = load_prompt('character_system.txt')
            if not system_prompt:
                flash("System prompt file not found.")
                return redirect(request.url)

            try:
                llm_response = call_llm(f"Create a character profile for '{name}' based on this description: {prompt}", system_prompt)

                # Clean response (remove markdown if present)
                clean_json = llm_response.replace('```json', '').replace('```', '').strip()
                char_data = json.loads(clean_json)

                # Enforce IDs
                char_data['character_id'] = safe_slug
                char_data['character_name'] = name
            except Exception as e:
                print(f"LLM error: {e}")
                flash(f"Failed to generate character profile: {e}")
                return redirect(request.url)
        else:
            # Create blank character template
            char_data = _blank_character_data(safe_slug, name)

        try:
            # Save file
            file_path = os.path.join(app.config['CHARACTERS_DIR'], f"{safe_slug}.json")
            with open(file_path, 'w') as f:
                json.dump(char_data, f, indent=2)

            # Add to DB
            new_char = Character(
                character_id=safe_slug,
                slug=safe_slug,
                filename=f"{safe_slug}.json",
                name=name,
                data=char_data
            )
            db.session.add(new_char)
            db.session.commit()

            flash('Character created successfully!')
            return redirect(url_for('detail', slug=safe_slug))
        except Exception as e:
            print(f"Save error: {e}")
            flash(f"Failed to create character: {e}")
            return redirect(request.url)

    return render_template('create.html')
@app.route('/character/<path:slug>/edit', methods=['GET', 'POST'])
def edit_character(slug):
    """Show the character edit form and save the posted changes.

    On POST the name, the ``identity`` / ``defaults`` / ``styles`` / ``lora``
    sections, the optional LoRA weight bounds, the wardrobe (nested or legacy
    flat layout) and the tags are rebuilt from the form, written back to the
    character's JSON file and committed. Any failure is flashed and the form
    is rendered again.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()
    loras = get_available_loras()
    char_looks = Look.query.filter_by(character_id=character.character_id).order_by(Look.name).all()

    if request.method == 'POST':
        try:
            # 1. Update basic fields
            character.name = request.form.get('character_name')

            # 2. Rebuild the data dictionary
            # NOTE: this is a shallow copy, so the nested sections are still
            # shared with character.data and are updated in place below.
            new_data = character.data.copy()
            new_data['character_name'] = character.name

            # Update nested sections (non-wardrobe)
            for section in ['identity', 'defaults', 'styles', 'lora']:
                if section not in new_data:
                    continue
                for key in new_data[section]:
                    form_key = f"{section}_{key}"
                    if form_key not in request.form:
                        continue
                    val = request.form.get(form_key)
                    # Handle numeric weight
                    if key == 'lora_weight':
                        try:
                            val = float(val)
                        except (TypeError, ValueError):
                            # Unparseable input falls back to the neutral weight.
                            val = 1.0
                    new_data[section][key] = val

            # LoRA weight randomization bounds (new fields not present in existing JSON)
            for bound in ['lora_weight_min', 'lora_weight_max']:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        # Keep the stored bound when the input is not a number.
                        pass
                else:
                    # An empty field removes the bound.
                    new_data.setdefault('lora', {}).pop(bound, None)

            # Handle wardrobe - support both nested and flat formats
            wardrobe = new_data.get('wardrobe', {})
            if 'default' in wardrobe and isinstance(wardrobe.get('default'), dict):
                # New nested format - update each outfit
                for outfit_name, outfit_fields in wardrobe.items():
                    for key in outfit_fields.keys():
                        form_key = f"wardrobe_{outfit_name}_{key}"
                        if form_key in request.form:
                            outfit_fields[key] = request.form.get(form_key)
                new_data['wardrobe'] = wardrobe
            else:
                # Legacy flat format
                if 'wardrobe' in new_data:
                    for key in new_data['wardrobe'].keys():
                        form_key = f"wardrobe_{key}"
                        if form_key in request.form:
                            new_data['wardrobe'][key] = request.form.get(form_key)

            # Update Tags (comma separated string to list)
            tags_raw = request.form.get('tags', '')
            stripped_tags = (part.strip() for part in tags_raw.split(','))
            new_data['tags'] = [tag for tag in stripped_tags if tag]

            character.data = new_data
            flag_modified(character, "data")

            # 3. Write back to JSON file
            # Use the filename we stored during sync, or fallback to a sanitized ID
            char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
            file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file)

            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)

            db.session.commit()
            flash('Character profile updated successfully!')
            return redirect(url_for('detail', slug=slug))
        except Exception as e:
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")

    return render_template('edit.html', character=character, loras=loras, char_looks=char_looks)
@app.route('/character/<path:slug>/outfit/switch', methods=['POST'])
def switch_outfit(slug):
    """Make the posted outfit the character's active one, then show the detail page.

    The ``outfit`` form field defaults to ``'default'``. A name that is not
    among the character's available outfits only produces an error flash.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()
    requested = request.form.get('outfit', 'default')

    # Validate outfit exists
    known_outfits = character.get_available_outfits()
    if requested not in known_outfits:
        flash(f'Outfit "{requested}" not found.', 'error')
    else:
        character.active_outfit = requested
        db.session.commit()
        flash(f'Switched to "{requested}" outfit.')

    return redirect(url_for('detail', slug=slug))
@app.route('/character/<path:slug>/outfit/add', methods=['POST'])
def add_outfit(slug):
    """Add a new outfit to a character.

    The posted ``outfit_name`` is lower-cased and reduced to
    ``[a-zA-Z0-9_]`` to form the wardrobe key. A legacy flat wardrobe is
    first wrapped as ``{'default': <flat wardrobe>}``. The new outfit starts
    as a copy of the default one. The result is written to the character's
    JSON file and committed; the user is returned to the edit page.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()
    outfit_name = request.form.get('outfit_name', '').strip()

    if not outfit_name:
        flash('Outfit name cannot be empty.', 'error')
        return redirect(url_for('edit_character', slug=slug))

    # Sanitize outfit name for use as key
    safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', outfit_name.lower())

    # Get wardrobe data; a missing or null wardrobe counts as empty
    wardrobe = character.data.get('wardrobe') or {}

    # Ensure wardrobe is in new nested format
    if 'default' not in wardrobe or not isinstance(wardrobe.get('default'), dict):
        # Convert legacy format
        wardrobe = {'default': wardrobe}

    # Check if outfit already exists
    if safe_name in wardrobe:
        flash(f'Outfit "{safe_name}" already exists.', 'error')
        return redirect(url_for('edit_character', slug=slug))

    # Create new outfit (copy from default as template). The check and the
    # conversion above guarantee that 'default' exists at this point.
    wardrobe[safe_name] = wardrobe['default'].copy()

    # Update character data
    character.data['wardrobe'] = wardrobe
    flag_modified(character, 'data')

    # Save to JSON file
    char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
    file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file)
    with open(file_path, 'w') as f:
        json.dump(character.data, f, indent=2)

    db.session.commit()
    flash(f'Added new outfit "{safe_name}".')
    return redirect(url_for('edit_character', slug=slug))
@app.route('/character/<path:slug>/outfit/delete', methods=['POST'])
def delete_outfit(slug):
    """Delete an outfit from a character.

    The default outfit cannot be deleted. Only wardrobe entries that are
    themselves dicts count as outfits, so on a legacy flat wardrobe a field
    key such as ``top`` is reported as not found rather than removed. If the
    deleted outfit was active, the active outfit falls back to ``'default'``.
    The result is written to the character's JSON file and committed.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()
    outfit_name = request.form.get('outfit', '')
    edit_url = url_for('edit_character', slug=slug)

    wardrobe = character.data.get('wardrobe') or {}

    # Cannot delete default
    if outfit_name == 'default':
        flash('Cannot delete the default outfit.', 'error')
        return redirect(edit_url)

    # Outfits are dict-valued entries; flat-format fields hold plain values.
    if not isinstance(wardrobe.get(outfit_name), dict):
        flash(f'Outfit "{outfit_name}" not found.', 'error')
        return redirect(edit_url)

    # Delete outfit
    del wardrobe[outfit_name]
    character.data['wardrobe'] = wardrobe
    flag_modified(character, 'data')

    # Switch active outfit if deleted was active
    if character.active_outfit == outfit_name:
        character.active_outfit = 'default'

    # Save to JSON file
    char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
    file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file)
    with open(file_path, 'w') as f:
        json.dump(character.data, f, indent=2)

    db.session.commit()
    flash(f'Deleted outfit "{outfit_name}".')
    return redirect(edit_url)
@app.route('/character/<path:slug>/outfit/rename', methods=['POST'])
def rename_outfit(slug):
    """Rename an outfit.

    The new name is lower-cased and reduced to ``[a-zA-Z0-9_]``. The default
    outfit cannot be renamed: the nested wardrobe layout is recognised by its
    ``'default'`` key, and deleting an active outfit falls back to it. Only
    dict-valued wardrobe entries count as outfits, so a field key of a legacy
    flat wardrobe is reported as not found. If the renamed outfit was active,
    the active outfit follows the new name. The result is written to the
    character's JSON file and committed.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()
    old_name = request.form.get('old_name', '')
    new_name = request.form.get('new_name', '').strip()
    edit_url = url_for('edit_character', slug=slug)

    if not new_name:
        flash('New name cannot be empty.', 'error')
        return redirect(edit_url)

    # Renaming 'default' would remove the key that marks the nested layout.
    if old_name == 'default':
        flash('Cannot rename the default outfit.', 'error')
        return redirect(edit_url)

    # Sanitize new name
    safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', new_name.lower())

    wardrobe = character.data.get('wardrobe') or {}

    # Outfits are dict-valued entries; flat-format fields hold plain values.
    if not isinstance(wardrobe.get(old_name), dict):
        flash(f'Outfit "{old_name}" not found.', 'error')
        return redirect(edit_url)

    if safe_name in wardrobe and safe_name != old_name:
        flash(f'Outfit "{safe_name}" already exists.', 'error')
        return redirect(edit_url)

    # Rename (copy to new key, delete old)
    wardrobe[safe_name] = wardrobe.pop(old_name)
    character.data['wardrobe'] = wardrobe
    flag_modified(character, 'data')

    # Update active outfit if renamed was active
    if character.active_outfit == old_name:
        character.active_outfit = safe_name

    # Save to JSON file
    char_file = character.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', character.character_id)}.json"
    file_path = os.path.join(app.config['CHARACTERS_DIR'], char_file)
    with open(file_path, 'w') as f:
        json.dump(character.data, f, indent=2)

    db.session.commit()
    flash(f'Renamed outfit "{old_name}" to "{safe_name}".')
    return redirect(edit_url)
@app.route('/character/<path:slug>/upload', methods=['POST'])
def upload_image(slug):
    """Store an uploaded cover image for a character.

    Expects a multipart form with an ``image`` file part. Accepted files are
    saved under ``UPLOAD_FOLDER/characters/<slug>/`` and the relative path is
    stored on the character. Every outcome redirects to the character detail
    page with a flash message.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()

    # This route accepts POST only, so redirecting to request.url would make
    # the browser issue a GET against it and receive 405 Method Not Allowed.
    # All failure paths go back to the detail page instead.
    if 'image' not in request.files:
        flash('No file part')
        return redirect(url_for('detail', slug=slug))

    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(url_for('detail', slug=slug))

    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        if not filename:
            # secure_filename() can strip a name down to nothing; saving would
            # then target the folder path itself.
            flash('Invalid file name', 'error')
            return redirect(url_for('detail', slug=slug))

        # Create character subfolder
        char_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"characters/{slug}")
        os.makedirs(char_folder, exist_ok=True)
        file.save(os.path.join(char_folder, filename))

        # Store relative path in DB
        character.image_path = f"characters/{slug}/{filename}"
        db.session.commit()
        flash('Image uploaded successfully!')
    else:
        # Previously a rejected file type was dropped without any feedback.
        flash('File type not allowed', 'error')

    return redirect(url_for('detail', slug=slug))
@app.route('/character/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_cover_from_preview(slug):
    """Promote the session's last preview image to the character's cover.

    Reads the preview path from the session key ``preview_<slug>``; when no
    preview is recorded an error is flashed and nothing is changed.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()
    latest_preview = session.get(f'preview_{slug}')

    if not latest_preview:
        flash('No preview image available', 'error')
        return redirect(url_for('detail', slug=slug))

    character.image_path = latest_preview
    db.session.commit()
    flash('Cover image updated from preview!')
    return redirect(url_for('detail', slug=slug))
def _log_workflow_prompts(label, workflow):
"""Print the final assembled ComfyUI prompts in a consistent, readable block."""
sep = "=" * 72
print(f"\n{sep}")
print(f" WORKFLOW PROMPTS [{label}]")
print(sep)
print(f" Checkpoint : {workflow['4']['inputs'].get('ckpt_name', '(not set)')}")
print(f" Seed : {workflow['3']['inputs'].get('seed', '(not set)')}")
print(f" Resolution : {workflow['5']['inputs'].get('width', '?')} x {workflow['5']['inputs'].get('height', '?')}")
print(f" Sampler : {workflow['3']['inputs'].get('sampler_name', '?')} / {workflow['3']['inputs'].get('scheduler', '?')} steps={workflow['3']['inputs'].get('steps', '?')} cfg={workflow['3']['inputs'].get('cfg', '?')}")
# LoRA chain summary
active_loras = []
for node_id, label_str in [("16", "char/look"), ("17", "outfit"), ("18", "action"), ("19", "style/detail/scene")]:
if node_id in workflow:
name = workflow[node_id]["inputs"].get("lora_name", "")
if name:
w = workflow[node_id]["inputs"].get("strength_model", "?")
active_loras.append(f"{label_str}:{name.split('/')[-1]}@{w:.3f}" if isinstance(w, float) else f"{label_str}:{name.split('/')[-1]}@{w}")
print(f" LoRAs : {' | '.join(active_loras) if active_loras else '(none)'}")
print(f" [+] Positive : {workflow['6']['inputs'].get('text', '')}")
print(f" [-] Negative : {workflow['7']['inputs'].get('text', '')}")
face_text = workflow.get('14', {}).get('inputs', {}).get('text', '')
hand_text = workflow.get('15', {}).get('inputs', {}).get('text', '')
if face_text:
print(f" [F] Face : {face_text}")
if hand_text:
print(f" [H] Hand : {hand_text}")
print(f"{sep}\n")
def _chain_lora_node(workflow, node_id, lora_data, model_source, clip_source, log_label, weight_defaults=None):
    """Configure one LoRA loader node and splice it into the model/clip chain.

    Returns the (model_source, clip_source) pair downstream nodes should use:
    the node's own outputs when it was activated, otherwise the inputs
    unchanged (no ``lora_name`` in ``lora_data`` or node absent from the
    workflow).
    """
    lora_name = lora_data.get('lora_name')
    if not lora_name or node_id not in workflow:
        return model_source, clip_source

    if weight_defaults:
        # Explicit values in lora_data win over the supplied defaults.
        weight = _resolve_lora_weight({**weight_defaults, **lora_data})
    else:
        weight = _resolve_lora_weight(lora_data)

    node_inputs = workflow[node_id]["inputs"]
    node_inputs["lora_name"] = lora_name
    node_inputs["strength_model"] = weight
    node_inputs["strength_clip"] = weight
    node_inputs["model"] = model_source
    node_inputs["clip"] = clip_source
    print(f"{log_label}: {lora_name} @ {weight}")
    return [node_id, 0], [node_id, 1]


def _prepare_workflow(workflow, character, prompts, checkpoint=None, custom_negative=None, outfit=None, action=None, style=None, detailer=None, scene=None, width=None, height=None, checkpoint_data=None, look=None, fixed_seed=None):
    """Fill a loaded ComfyUI workflow template with prompts, LoRAs, seed and settings.

    Args:
        workflow: Workflow dict keyed by node-id strings; mutated in place.
            Nodes "3", "6" and "7" are required; "4", "5", "11", "13", "14",
            "15" and the LoRA nodes "16"-"19" are used when present.
        character: Character record (or None) supplying the character LoRA.
        prompts: Mapping with "main", "face" and "hand" prompt strings.
        checkpoint: Optional checkpoint name written to node "4".
        custom_negative: Optional text appended to the negative prompt.
        outfit, action, style, detailer, scene: Optional records whose
            ``data['lora']`` configures nodes 17, 18 and 19 (node 19 takes
            the first of style, detailer, scene that is set).
        width, height: Optional image dimensions for node "5".
        checkpoint_data: Optional settings passed to
            ``_apply_checkpoint_settings``.
        look: Optional look record; its LoRA replaces the character LoRA and
            its negative text is prepended to the negative prompt.
        fixed_seed: Seed to use instead of a random one.

    Returns:
        The prepared workflow dict.
    """
    # 1. Update prompts using replacement to preserve embeddings
    workflow["6"]["inputs"]["text"] = workflow["6"]["inputs"]["text"].replace("{{POSITIVE_PROMPT}}", prompts["main"])
    if custom_negative:
        workflow["7"]["inputs"]["text"] = f"{workflow['7']['inputs']['text']}, {custom_negative}"
    if "14" in workflow:
        workflow["14"]["inputs"]["text"] = workflow["14"]["inputs"]["text"].replace("{{FACE_PROMPT}}", prompts["face"])
    if "15" in workflow:
        workflow["15"]["inputs"]["text"] = workflow["15"]["inputs"]["text"].replace("{{HAND_PROMPT}}", prompts["hand"])

    # 2. Update Checkpoint
    if checkpoint:
        workflow["4"]["inputs"]["ckpt_name"] = checkpoint

    # 3. Handle LoRAs - Node 16 for character, Node 17 for outfit, Node 18 for action, Node 19 for style/detailer
    # Start with direct checkpoint connections
    model_source = ["4", 0]
    clip_source = ["4", 1]

    if look:
        # Look negative prompt (applied before character LoRA)
        look_negative = look.data.get('negative', '')
        if look_negative:
            workflow["7"]["inputs"]["text"] = f"{look_negative}, {workflow['7']['inputs']['text']}"
        # Look LoRA overrides character LoRA when present
        char_lora_data = look.data.get('lora', {})
    else:
        char_lora_data = character.data.get('lora', {}) if character else {}

    # Character LoRA (Node 16) - chains from the checkpoint
    model_source, clip_source = _chain_lora_node(
        workflow, "16", char_lora_data, model_source, clip_source, "Character LoRA")

    # Outfit LoRA (Node 17) - weight defaults to 0.8 when the outfit sets none
    outfit_lora_data = outfit.data.get('lora', {}) if outfit else {}
    model_source, clip_source = _chain_lora_node(
        workflow, "17", outfit_lora_data, model_source, clip_source, "Outfit LoRA",
        weight_defaults={'lora_weight': 0.8})

    # Action LoRA (Node 18)
    action_lora_data = action.data.get('lora', {}) if action else {}
    model_source, clip_source = _chain_lora_node(
        workflow, "18", action_lora_data, model_source, clip_source, "Action LoRA")

    # Style/Detailer/Scene LoRA (Node 19)
    # Priority: Style > Detailer > Scene (Scene LoRAs are rare but supported)
    target_obj = style or detailer or scene
    style_lora_data = target_obj.data.get('lora', {}) if target_obj else {}
    model_source, clip_source = _chain_lora_node(
        workflow, "19", style_lora_data, model_source, clip_source, "Style/Detailer LoRA")

    # Apply connections to all model/clip consumers. The detailer nodes are
    # treated as optional elsewhere in this function (prompt, seed and sampler
    # steps all check for them), so they are guarded here as well instead of
    # raising KeyError on a workflow that omits them.
    workflow["3"]["inputs"]["model"] = model_source
    workflow["6"]["inputs"]["clip"] = clip_source
    workflow["7"]["inputs"]["clip"] = clip_source
    for node_id in ("11", "13"):
        if node_id in workflow:
            workflow[node_id]["inputs"]["model"] = model_source
            workflow[node_id]["inputs"]["clip"] = clip_source
    for node_id in ("14", "15"):
        if node_id in workflow:
            workflow[node_id]["inputs"]["clip"] = clip_source

    # 4. Randomize seeds (or use a fixed seed for reproducible batches like Strengths Gallery)
    gen_seed = fixed_seed if fixed_seed is not None else random.randint(1, 10**15)
    workflow["3"]["inputs"]["seed"] = gen_seed
    for node_id in ("11", "13"):
        if node_id in workflow:
            workflow[node_id]["inputs"]["seed"] = gen_seed

    # 5. Set image dimensions
    if "5" in workflow:
        if width:
            workflow["5"]["inputs"]["width"] = int(width)
        if height:
            workflow["5"]["inputs"]["height"] = int(height)

    # 6. Apply checkpoint-specific settings (steps, cfg, sampler, base prompts, VAE)
    if checkpoint_data:
        workflow = _apply_checkpoint_settings(workflow, checkpoint_data)

    # 7. Sync sampler/scheduler from main KSampler to adetailer nodes
    sampler_name = workflow["3"]["inputs"].get("sampler_name")
    scheduler = workflow["3"]["inputs"].get("scheduler")
    for node_id in ("11", "13"):
        if node_id in workflow:
            if sampler_name:
                workflow[node_id]["inputs"]["sampler_name"] = sampler_name
            if scheduler:
                workflow[node_id]["inputs"]["scheduler"] = scheduler

    # 8. Cross-deduplicate: remove tags shared between positive and negative
    pos_text, neg_text = _cross_dedup_prompts(
        workflow["6"]["inputs"]["text"],
        workflow["7"]["inputs"]["text"]
    )
    workflow["6"]["inputs"]["text"] = pos_text
    workflow["7"]["inputs"]["text"] = neg_text

    # 9. Final prompt debug - logged after all transformations are complete
    _log_workflow_prompts("_prepare_workflow", workflow)
    return workflow
def _get_default_checkpoint():
    """Look up the session's default checkpoint.

    Returns:
        ``(checkpoint_path, checkpoint_data)`` for the checkpoint whose path
        is stored under the ``default_checkpoint`` session key, with
        ``checkpoint_data`` falling back to ``{}`` when empty; ``(None, None)``
        when no default is set or no matching record exists.
    """
    stored_path = session.get('default_checkpoint')
    record = None
    if stored_path:
        record = Checkpoint.query.filter_by(checkpoint_path=stored_path).first()
    if not record:
        return None, None
    return record.checkpoint_path, record.data or {}
def _queue_generation(character, action='preview', selected_fields=None, client_id=None):
    """Build a workflow for ``character`` and submit it via ``queue_prompt``.

    Args:
        character: Character record whose data, default fields and active
            outfit drive the prompt.
        action: Accepted for interface compatibility; not used in the body.
        selected_fields: Optional field selection forwarded to ``build_prompt``.
        client_id: Forwarded to ``queue_prompt``.

    Returns:
        Whatever ``queue_prompt`` returns.
    """
    # Load the workflow template fresh for every request.
    with open('comfy_workflow.json', 'r') as template_file:
        template = json.load(template_file)

    # Prompts are built against the character's currently active outfit.
    prompt_set = build_prompt(
        character.data,
        selected_fields,
        character.default_fields,
        character.active_outfit,
    )

    default_ckpt = _get_default_checkpoint()
    prepared = _prepare_workflow(
        template,
        character,
        prompt_set,
        checkpoint=default_ckpt[0],
        checkpoint_data=default_ckpt[1],
    )
    return queue_prompt(prepared, client_id=client_id)
@app.route('/get_missing_characters')
def get_missing_characters():
    """Return the slug and name of every character without a cover image."""
    # `== None` is intentional: SQLAlchemy turns it into an IS NULL clause.
    no_cover = (Character.image_path == None) | (Character.image_path == '')
    entries = []
    for char in Character.query.filter(no_cover).all():
        entries.append({'slug': char.slug, 'name': char.name})
    return {'missing': entries}
@app.route('/clear_all_covers', methods=['POST'])
def clear_all_covers():
    """Reset the cover image path of every character to None."""
    for record in Character.query.all():
        record.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/generate_missing', methods=['POST'])
def generate_missing():
    """Enqueue a cover-generation job for every character lacking a cover.

    Characters are processed in name order. A failure while preparing one
    character is printed and skipped so the remaining characters still get
    queued. Redirects to the index page with a summary flash message.
    """
    # `== None` is intentional: SQLAlchemy turns it into an IS NULL clause.
    no_cover = (Character.image_path == None) | (Character.image_path == '')
    pending = Character.query.filter(no_cover).order_by(Character.name).all()

    if not pending:
        flash("No characters missing cover images.")
        return redirect(url_for('index'))

    queued = 0
    for char in pending:
        try:
            with open('comfy_workflow.json', 'r') as template_file:
                wf = json.load(template_file)
            prompt_set = build_prompt(char.data, None, char.default_fields, char.active_outfit)
            ckpt_path, ckpt_data = _get_default_checkpoint()
            wf = _prepare_workflow(wf, char, prompt_set, checkpoint=ckpt_path, checkpoint_data=ckpt_data)
            # Capture the slug now so the finalize callback does not depend on
            # the loop variable.
            char_slug = char.slug
            _enqueue_job(f"{char.name} cover", wf, _make_finalize('characters', char_slug, Character))
        except Exception as e:
            print(f"Error queuing cover generation for {char.name}: {e}")
        else:
            queued += 1

    plural = '' if queued == 1 else 's'
    flash(f"Queued {queued} cover image generation job{plural}.")
    return redirect(url_for('index'))
@app.route('/character/<path:slug>/generate', methods=['POST'])
def generate_image(slug):
    """Enqueue an image-generation job for a character.

    Form fields: ``action`` (defaults to 'preview') and zero or more
    ``include_field`` values selecting which prompt fields to use; the
    selection is remembered in the session under ``prefs_<slug>``.

    XHR callers (``X-Requested-With: XMLHttpRequest``) receive a JSON body
    with the job id, or ``{'error': ...}`` with status 500 on failure; other
    callers are redirected to the character detail page.
    """
    character = Character.query.filter_by(slug=slug).first_or_404()
    wants_json = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    try:
        # Get action type
        action = request.form.get('action', 'preview')

        # Get selected fields and save them as preferences
        selected_fields = request.form.getlist('include_field')
        session[f'prefs_{slug}'] = selected_fields

        # Build workflow
        with open('comfy_workflow.json', 'r') as f:
            workflow = json.load(f)
        prompts = build_prompt(character.data, selected_fields, character.default_fields, character.active_outfit)
        ckpt_path, ckpt_data = _get_default_checkpoint()
        workflow = _prepare_workflow(workflow, character, prompts, checkpoint=ckpt_path, checkpoint_data=ckpt_data)

        # The job label was previously referenced without being defined, so
        # every request failed with NameError before anything was enqueued.
        label = f"{character.name} {action}"
        job = _enqueue_job(label, workflow, _make_finalize('characters', slug, Character, action))

        if wants_json:
            return {'status': 'queued', 'job_id': job['id']}
        return redirect(url_for('detail', slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if wants_json:
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('detail', slug=slug))
@app.route('/character/<path:slug>/save_defaults', methods=['POST'])
def save_defaults(slug):
    """Persist the submitted ``include_field`` values as the character's default fields."""
    character = Character.query.filter_by(slug=slug).first_or_404()
    character.default_fields = request.form.getlist('include_field')
    db.session.commit()
    flash('Default prompt selection saved for this character!')
    return redirect(url_for('detail', slug=slug))
@app.route('/get_missing_outfits')
def get_missing_outfits():
    """Return the slug and name of every outfit without a cover image."""
    # `== None` is intentional: SQLAlchemy turns it into an IS NULL clause.
    no_cover = (Outfit.image_path == None) | (Outfit.image_path == '')
    entries = []
    for record in Outfit.query.filter(no_cover).all():
        entries.append({'slug': record.slug, 'name': record.name})
    return {'missing': entries}
@app.route('/clear_all_outfit_covers', methods=['POST'])
def clear_all_outfit_covers():
    """Reset the cover image path of every outfit to None."""
    for record in Outfit.query.all():
        record.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/get_missing_actions')
def get_missing_actions():
    """Return the slug and name of every action without a cover image."""
    # `== None` is intentional: SQLAlchemy turns it into an IS NULL clause.
    no_cover = (Action.image_path == None) | (Action.image_path == '')
    entries = []
    for record in Action.query.filter(no_cover).all():
        entries.append({'slug': record.slug, 'name': record.name})
    return {'missing': entries}
@app.route('/clear_all_action_covers', methods=['POST'])
def clear_all_action_covers():
    """Reset the cover image path of every action to None."""
    for record in Action.query.all():
        record.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/get_missing_scenes')
def get_missing_scenes():
    """Return the slug and name of every scene without a cover image."""
    # `== None` is intentional: SQLAlchemy turns it into an IS NULL clause.
    no_cover = (Scene.image_path == None) | (Scene.image_path == '')
    entries = []
    for record in Scene.query.filter(no_cover).all():
        entries.append({'slug': record.slug, 'name': record.name})
    return {'missing': entries}
@app.route('/clear_all_scene_covers', methods=['POST'])
def clear_all_scene_covers():
    """Reset the cover image path of every scene to None."""
    for record in Scene.query.all():
        record.image_path = None
    db.session.commit()
    return {'success': True}
# ============ OUTFIT ROUTES ============
@app.route('/outfits')
def outfits_index():
    """Render the outfit listing page with all outfits ordered by name."""
    by_name = Outfit.query.order_by(Outfit.name)
    return render_template('outfits/index.html', outfits=by_name.all())
@app.route('/outfits/rescan', methods=['POST'])
def rescan_outfits():
    """Run ``sync_outfits()`` and return to the outfit listing page."""
    sync_outfits()
    flash('Database synced with outfit files.')
    listing_url = url_for('outfits_index')
    return redirect(listing_url)
@app.route('/outfits/bulk_create', methods=['POST'])
def bulk_create_outfits_from_loras():
    """Create outfit JSON profiles for every clothing LoRA file on disk.

    For each ``.safetensors`` file in the clothing LoRA directory, asks the
    LLM (via ``call_llm``) for an outfit profile, optionally feeding it text
    extracted from a sibling ``.html`` file, and writes the result to
    ``CLOTHING_DIR/<outfit_id>.json``. Existing profiles are skipped unless
    the ``overwrite`` form field equals 'true'. A failure on one file is
    printed and does not stop the batch. Redirects to the outfit listing.
    """
    clothing_lora_dir = '/ImageModels/lora/Illustrious/Clothing/'
    if not os.path.exists(clothing_lora_dir):
        flash('Clothing LoRA directory not found.', 'error')
        return redirect(url_for('outfits_index'))

    overwrite = request.form.get('overwrite') == 'true'
    tally = {'created': 0, 'skipped': 0, 'overwritten': 0}

    system_prompt = load_prompt('outfit_system.txt')
    if not system_prompt:
        flash('Outfit system prompt file not found.', 'error')
        return redirect(url_for('outfits_index'))

    # Applied in order to reduce an HTML page to plain text.
    html_strippers = (
        (r'<script[^>]*>.*?</script>', '', re.DOTALL),
        (r'<style[^>]*>.*?</style>', '', re.DOTALL),
        (r'<img[^>]*>', '', 0),
        (r'<[^>]+>', ' ', 0),
    )
    # Weight settings filled in when the LLM response leaves them unset.
    weight_fallbacks = (
        ('lora_weight', 0.8),
        ('lora_weight_min', 0.7),
        ('lora_weight_max', 1.0),
    )

    for filename in os.listdir(clothing_lora_dir):
        if not filename.endswith('.safetensors'):
            continue

        name_base = filename.rsplit('.', 1)[0]
        outfit_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
        outfit_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()
        json_path = os.path.join(app.config['CLOTHING_DIR'], f"{outfit_id}.json")

        is_existing = os.path.exists(json_path)
        if is_existing and not overwrite:
            tally['skipped'] += 1
            continue

        # Optional descriptive text from an HTML file next to the LoRA.
        html_filename = f"{name_base}.html"
        html_path = os.path.join(clothing_lora_dir, html_filename)
        html_content = ""
        if os.path.exists(html_path):
            try:
                with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                    page_text = hf.read()
                for pattern, replacement, flags in html_strippers:
                    page_text = re.sub(pattern, replacement, page_text, flags=flags)
                html_content = ' '.join(page_text.split())
            except Exception as e:
                print(f"Error reading HTML {html_filename}: {e}")

        try:
            print(f"Asking LLM to describe outfit: {outfit_name}")
            prompt = f"Create an outfit profile for a clothing LoRA based on the filename: '{filename}'"
            if html_content:
                prompt += f"\n\nHere is descriptive text extracted from an associated HTML file:\n###\n{html_content[:3000]}\n###"

            llm_response = call_llm(prompt, system_prompt)
            # Drop markdown code fences the LLM may wrap around the JSON.
            clean_json = llm_response.replace('```json', '').replace('```', '').strip()
            outfit_data = json.loads(clean_json)

            outfit_data['outfit_id'] = outfit_id
            outfit_data['outfit_name'] = outfit_name

            outfit_data.setdefault('lora', {})
            lora_cfg = outfit_data['lora']
            lora_cfg['lora_name'] = f"Illustrious/Clothing/{filename}"
            if not lora_cfg.get('lora_triggers'):
                lora_cfg['lora_triggers'] = name_base
            for weight_key, fallback in weight_fallbacks:
                if lora_cfg.get(weight_key) is None:
                    lora_cfg[weight_key] = fallback

            os.makedirs(app.config['CLOTHING_DIR'], exist_ok=True)
            with open(json_path, 'w') as f:
                json.dump(outfit_data, f, indent=2)

            tally['overwritten' if is_existing else 'created'] += 1

            # Brief pause between LLM calls.
            time.sleep(0.5)
        except Exception as e:
            print(f"Error creating outfit for {filename}: {e}")

    created_count = tally['created']
    overwritten_count = tally['overwritten']
    skipped_count = tally['skipped']

    if created_count > 0 or overwritten_count > 0:
        sync_outfits()
        msg = f'Successfully processed outfits: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No outfits created or overwritten. {skipped_count} existing entries found.')

    return redirect(url_for('outfits_index'))
@app.route('/outfit/<path:slug>')
def outfit_detail(slug):
    """Render the detail page for one outfit.

    Supplies the template with the outfit, all characters ordered by name,
    the per-outfit state remembered in the session (field preferences, last
    preview, selected character), and the image files already present in
    the outfit's upload folder, sorted by filename descending.
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()

    # Previously generated/uploaded images for this outfit, if any.
    existing_previews = []
    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"outfits/{slug}")
    if os.path.isdir(upload_dir):
        image_exts = ('.png', '.jpg', '.jpeg', '.webp')
        image_names = [n for n in os.listdir(upload_dir) if n.lower().endswith(image_exts)]
        image_names.sort(reverse=True)
        existing_previews = [f"outfits/{slug}/{n}" for n in image_names]

    return render_template(
        'outfits/detail.html',
        outfit=outfit,
        characters=characters,
        preferences=session.get(f'prefs_outfit_{slug}'),
        preview_image=session.get(f'preview_outfit_{slug}'),
        selected_character=session.get(f'char_outfit_{slug}'),
        existing_previews=existing_previews,
    )
@app.route('/outfit/<path:slug>/edit', methods=['GET', 'POST'])
def edit_outfit(slug):
    """Show the outfit edit form (GET) or apply submitted changes (POST).

    On POST the outfit's name, id, wardrobe entries, LoRA settings and tags
    are updated from the form, written back to the outfit's JSON file and
    committed; the user is then redirected to the outfit detail page. If
    saving raises, the error is flashed and the edit form is rendered again.
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    loras = get_available_clothing_loras()  # Use clothing LoRAs for outfits

    if request.method == 'POST':
        try:
            # 1. Update basic fields
            outfit.name = request.form.get('outfit_name')

            # 2. Rebuild the data dictionary
            new_data = outfit.data.copy()
            new_data['outfit_name'] = outfit.name

            # Update outfit_id if provided
            new_data['outfit_id'] = request.form.get('outfit_id', outfit.outfit_id)

            # Update wardrobe section (only keys that already exist)
            if 'wardrobe' in new_data:
                for key in new_data['wardrobe']:
                    form_key = f"wardrobe_{key}"
                    if form_key in request.form:
                        new_data['wardrobe'][key] = request.form.get(form_key)

            # Update lora section. The weight bounds are excluded here and
            # handled below: copying their raw form string first meant an
            # unparsable value was left in the data as a string.
            bound_keys = ('lora_weight_min', 'lora_weight_max')
            if 'lora' in new_data:
                for key in new_data['lora']:
                    if key in bound_keys:
                        continue
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            try:
                                val = float(val)
                            except (TypeError, ValueError):
                                # Unparsable weight falls back to the default.
                                val = 0.8
                        new_data['lora'][key] = val

            # LoRA weight randomization bounds: a valid number is stored, an
            # empty field removes the bound, an invalid value keeps the
            # previous setting.
            for bound in bound_keys:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        pass
                else:
                    new_data.setdefault('lora', {}).pop(bound, None)

            # Update Tags (comma separated string to list, blanks dropped)
            tags_raw = request.form.get('tags', '')
            stripped_tags = (fragment.strip() for fragment in tags_raw.split(','))
            new_data['tags'] = [tag for tag in stripped_tags if tag]

            outfit.data = new_data
            flag_modified(outfit, "data")

            # 3. Write back to JSON file
            outfit_file = outfit.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', outfit.outfit_id)}.json"
            file_path = os.path.join(app.config['CLOTHING_DIR'], outfit_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)

            db.session.commit()
            flash('Outfit profile updated successfully!')
            return redirect(url_for('outfit_detail', slug=slug))
        except Exception as e:
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")

    return render_template('outfits/edit.html', outfit=outfit, loras=loras)
@app.route('/outfit/<path:slug>/upload', methods=['POST'])
def upload_outfit_image(slug):
    """Store an uploaded cover image for an outfit.

    Expects a multipart form with an ``image`` file part. Accepted files are
    saved under ``UPLOAD_FOLDER/outfits/<slug>/`` and the relative path is
    stored on the outfit. Every outcome redirects to the outfit detail page
    with a flash message.
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()

    # This route accepts POST only, so redirecting to request.url would make
    # the browser issue a GET against it and receive 405 Method Not Allowed.
    # All failure paths go back to the detail page instead.
    if 'image' not in request.files:
        flash('No file part')
        return redirect(url_for('outfit_detail', slug=slug))

    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(url_for('outfit_detail', slug=slug))

    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        if not filename:
            # secure_filename() can strip a name down to nothing; saving would
            # then target the folder path itself.
            flash('Invalid file name', 'error')
            return redirect(url_for('outfit_detail', slug=slug))

        # Create outfit subfolder
        outfit_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"outfits/{slug}")
        os.makedirs(outfit_folder, exist_ok=True)
        file.save(os.path.join(outfit_folder, filename))

        # Store relative path in DB
        outfit.image_path = f"outfits/{slug}/{filename}"
        db.session.commit()
        flash('Image uploaded successfully!')
    else:
        # Previously a rejected file type was dropped without any feedback.
        flash('File type not allowed', 'error')

    return redirect(url_for('outfit_detail', slug=slug))
@app.route('/outfit/<path:slug>/generate', methods=['POST'])
def generate_outfit_image(slug):
    """Enqueue an image-generation job for an outfit, optionally on a character.

    Form fields: ``action`` (defaults to 'preview'), ``include_field``
    (repeated; prompt field selection) and ``character_slug`` (resolved via
    ``_resolve_character``; '__random__' is replaced by the resolved
    character's slug). The selection and character slug are remembered in
    the session.

    XHR callers (``X-Requested-With: XMLHttpRequest``) receive a JSON body
    with the job id, or ``{'error': ...}`` with status 500 on failure; other
    callers are redirected to the outfit detail page.
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    try:
        action = request.form.get('action', 'preview')
        selected_fields = request.form.getlist('include_field')

        # Resolve the (optional) character the outfit should be shown on.
        character_slug = request.form.get('character_slug', '')
        character = _resolve_character(character_slug)
        if character and character_slug == '__random__':
            character_slug = character.slug

        # Save preferences. NOTE: the same list object is extended below, so
        # additions made there are part of what the session holds.
        session[f'prefs_outfit_{slug}'] = selected_fields
        session[f'char_outfit_{slug}'] = character_slug

        if not character:
            # Outfit only - no character
            combined_data = {
                'character_id': outfit.outfit_id,
                'wardrobe': outfit.data.get('wardrobe', {}),
                'lora': outfit.data.get('lora', {}),
                'tags': outfit.data.get('tags', [])
            }
            default_fields = outfit.default_fields
        else:
            # Character identity/defaults/styles combined with the outfit's
            # wardrobe, lora and tags.
            combined_data = {
                'character_id': character.character_id,
                'identity': character.data.get('identity', {}),
                'defaults': character.data.get('defaults', {}),
                'wardrobe': outfit.data.get('wardrobe', {}),
                'styles': character.data.get('styles', {}),
                'lora': outfit.data.get('lora', {}),
                'tags': outfit.data.get('tags', [])
            }

            if selected_fields:
                # Merge character identity/defaults into the explicit selection
                # so they appear in the prompt.
                _ensure_character_fields(character, selected_fields,
                                         include_wardrobe=False, include_defaults=True)
            else:
                # No explicit field selection (e.g. batch generation): select
                # identity + wardrobe + name + lora triggers, but NOT character
                # defaults (expression, pose, scene), so outfit covers stay generic.
                identity = character.data.get('identity', {})
                identity_parts = ['base_specs', 'hair', 'eyes', 'hands', 'arms', 'torso', 'pelvis', 'legs', 'feet', 'extra']
                selected_fields.extend(
                    f'identity::{part}' for part in identity_parts if identity.get(part)
                )

                outfit_wardrobe = outfit.data.get('wardrobe', {})
                wardrobe_parts = ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']
                selected_fields.extend(
                    f'wardrobe::{part}' for part in wardrobe_parts if outfit_wardrobe.get(part)
                )

                selected_fields.append('special::name')
                if outfit.data.get('lora', {}).get('lora_triggers'):
                    selected_fields.append('lora::lora_triggers')

            default_fields = character.default_fields

        # Queue generation
        with open('comfy_workflow.json', 'r') as template_file:
            workflow = json.load(template_file)

        # Build prompts for combined data
        prompts = build_prompt(combined_data, selected_fields, default_fields)
        _append_background(prompts, character)

        # Prepare workflow - pass both character and outfit for dual LoRA support
        ckpt_path, ckpt_data = _get_default_checkpoint()
        workflow = _prepare_workflow(workflow, character, prompts, outfit=outfit, checkpoint=ckpt_path, checkpoint_data=ckpt_data)

        char_label = character.name if character else 'no character'
        label = f"Outfit: {outfit.name} ({char_label}) {action}"
        job = _enqueue_job(label, workflow, _make_finalize('outfits', slug, Outfit, action))

        if is_ajax:
            return {'status': 'queued', 'job_id': job['id']}
        return redirect(url_for('outfit_detail', slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if is_ajax:
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('outfit_detail', slug=slug))
@app.route('/outfit/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_outfit_cover_from_preview(slug):
    """Promote the session's last preview image to the outfit's cover.

    Reads the preview path from the session key ``preview_outfit_<slug>``;
    when no preview is recorded an error is flashed and nothing is changed.
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    latest_preview = session.get(f'preview_outfit_{slug}')

    if not latest_preview:
        flash('No preview image available', 'error')
        return redirect(url_for('outfit_detail', slug=slug))

    outfit.image_path = latest_preview
    db.session.commit()
    flash('Cover image updated from preview!')
    return redirect(url_for('outfit_detail', slug=slug))
@app.route('/outfit/create', methods=['GET', 'POST'])
def create_outfit():
    """Show the outfit creation form (GET) or create a new outfit (POST).

    Form fields: ``name``, optional ``filename`` (slug), ``prompt``
    (description) and ``use_llm`` ('on' to have the LLM generate the
    profile from the description). The slug is sanitized to
    ``[a-zA-Z0-9_]`` and suffixed with ``_<n>`` until no JSON file with that
    name exists in ``CLOTHING_DIR``. The profile is written to disk and
    added to the database; on success the user is redirected to the new
    outfit's detail page, on failure back to this form with a flash message.
    """
    if request.method != 'POST':
        return render_template('outfits/create.html')

    def _blank_wardrobe():
        # Fresh empty wardrobe template (new dict on every call).
        return {
            "full_body": "",
            "headwear": "",
            "top": "",
            "bottom": "",
            "legwear": "",
            "footwear": "",
            "hands": "",
            "accessories": ""
        }

    def _blank_lora():
        # Fresh default LoRA settings (new dict on every call).
        return {
            "lora_name": "",
            "lora_weight": 0.8,
            "lora_triggers": ""
        }

    name = request.form.get('name')
    slug = request.form.get('filename', '').strip()
    prompt = request.form.get('prompt', '')
    use_llm = request.form.get('use_llm') == 'on'

    # Auto-generate slug from name if not provided. A missing name must not
    # crash here; it simply yields an empty slug and the fallback below.
    if not slug:
        slug = re.sub(r'[^a-zA-Z0-9]+', '_', (name or '').lower()).strip('_')

    # Validate slug
    safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) or 'outfit'

    # Find available filename (increment if exists)
    base_slug = safe_slug
    counter = 1
    while os.path.exists(os.path.join(app.config['CLOTHING_DIR'], f"{safe_slug}.json")):
        safe_slug = f"{base_slug}_{counter}"
        counter += 1

    if use_llm:
        if not prompt:
            flash("Description is required when AI generation is enabled.")
            return redirect(request.url)

        # Generate JSON with LLM
        system_prompt = load_prompt('outfit_system.txt')
        if not system_prompt:
            flash("System prompt file not found.")
            return redirect(request.url)

        try:
            llm_response = call_llm(f"Create an outfit profile for '{name}' based on this description: {prompt}", system_prompt)

            # Clean response (remove markdown if present)
            clean_json = llm_response.replace('```json', '').replace('```', '').strip()
            outfit_data = json.loads(clean_json)

            # Enforce IDs
            outfit_data['outfit_id'] = safe_slug
            outfit_data['outfit_name'] = name

            # Ensure required fields exist
            if 'wardrobe' not in outfit_data:
                outfit_data['wardrobe'] = _blank_wardrobe()
            if 'lora' not in outfit_data:
                outfit_data['lora'] = _blank_lora()
            if 'tags' not in outfit_data:
                outfit_data['tags'] = []
        except Exception as e:
            print(f"LLM error: {e}")
            flash(f"Failed to generate outfit profile: {e}")
            return redirect(request.url)
    else:
        # Create blank outfit template
        outfit_data = {
            "outfit_id": safe_slug,
            "outfit_name": name,
            "wardrobe": _blank_wardrobe(),
            "lora": _blank_lora(),
            "tags": []
        }

    try:
        # Save file
        file_path = os.path.join(app.config['CLOTHING_DIR'], f"{safe_slug}.json")
        with open(file_path, 'w') as f:
            json.dump(outfit_data, f, indent=2)

        # Add to DB
        new_outfit = Outfit(
            outfit_id=safe_slug,
            slug=safe_slug,
            filename=f"{safe_slug}.json",
            name=name,
            data=outfit_data
        )
        db.session.add(new_outfit)
        db.session.commit()

        flash('Outfit created successfully!')
        return redirect(url_for('outfit_detail', slug=safe_slug))
    except Exception as e:
        print(f"Save error: {e}")
        flash(f"Failed to create outfit: {e}")
        return redirect(request.url)
@app.route('/outfit/<path:slug>/save_defaults', methods=['POST'])
def save_outfit_defaults(slug):
    """Persist the submitted ``include_field`` values as the outfit's default fields."""
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()
    outfit.default_fields = request.form.getlist('include_field')
    db.session.commit()
    flash('Default prompt selection saved for this outfit!')
    return redirect(url_for('outfit_detail', slug=slug))
@app.route('/outfit/<path:slug>/clone', methods=['POST'])
def clone_outfit(slug):
    """Duplicate an outfit under the next free numbered id.

    The new id is ``<base>_<NN>`` where ``<base>`` is the source id with any
    trailing ``_<digits>`` removed and ``NN`` is the first number above the
    source's own (1 when the source has no numeric suffix) for which no JSON
    file exists in ``CLOTHING_DIR``. The copy is written to disk, added to
    the database, and the user is redirected to its detail page.
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()

    # Split "<stem>_<digits>" into its parts; ids without a numeric suffix
    # count as number 1.
    source_id = outfit.outfit_id
    numbered = re.match(r'^(.+?)_(\d+)$', source_id)
    if numbered:
        stem, source_num = numbered.group(1), int(numbered.group(2))
    else:
        stem, source_num = source_id, 1

    # Probe upward until an unused filename is found.
    clothing_dir = app.config['CLOTHING_DIR']
    candidate = source_num + 1
    new_id = f"{stem}_{candidate:02d}"
    while os.path.exists(os.path.join(clothing_dir, f"{new_id}.json")):
        candidate += 1
        new_id = f"{stem}_{candidate:02d}"
    new_filename = f"{new_id}.json"
    new_path = os.path.join(clothing_dir, new_filename)

    # Shallow copy of the source data with identity fields replaced.
    clone_data = {
        **outfit.data,
        'outfit_id': new_id,
        'outfit_name': f"{outfit.name} (Copy)",
    }

    with open(new_path, 'w') as fh:
        json.dump(clone_data, fh, indent=2)

    new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id)
    db.session.add(Outfit(
        outfit_id=new_id,
        slug=new_slug,
        filename=new_filename,
        name=clone_data['outfit_name'],
        data=clone_data,
    ))
    db.session.commit()

    flash(f'Outfit cloned as "{new_id}"!')
    return redirect(url_for('outfit_detail', slug=new_slug))
@app.route('/outfit/<path:slug>/save_json', methods=['POST'])
def save_outfit_json(slug):
    """Replace an outfit's data with JSON submitted in the ``json_data`` form field.

    Returns:
        ``{'success': True}`` after the data is committed (and written to the
        outfit's JSON file when it has a filename), or
        ``({'success': False, 'error': ...}, 400)`` when the payload is not
        valid JSON or is not a JSON object.
    """
    outfit = Outfit.query.filter_by(slug=slug).first_or_404()

    try:
        new_data = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400

    # Outfit data is read with dict methods elsewhere in this module
    # (e.g. outfit.data.get('wardrobe', {})), so a list, number or string
    # payload would break those routes once stored.
    if not isinstance(new_data, dict):
        return {'success': False, 'error': 'Invalid JSON: expected a JSON object'}, 400

    outfit.data = new_data
    flag_modified(outfit, 'data')
    db.session.commit()

    if outfit.filename:
        file_path = os.path.join(app.config['CLOTHING_DIR'], outfit.filename)
        with open(file_path, 'w') as f:
            json.dump(new_data, f, indent=2)

    return {'success': True}
# ============ ACTION ROUTES ============
@app.route('/actions')
def actions_index():
    """Render the action gallery with all actions ordered by name."""
    name_sorted = Action.query.order_by(Action.name).all()
    return render_template('actions/index.html', actions=name_sorted)
@app.route('/actions/rescan', methods=['POST'])
def rescan_actions():
    """Run ``sync_actions()`` and return to the action gallery."""
    sync_actions()
    flash('Database synced with action files.')
    gallery_url = url_for('actions_index')
    return redirect(gallery_url)
@app.route('/action/<path:slug>')
def action_detail(slug):
    """Render the detail page for a single action.

    Passes the template the action, all characters (ordered by name), the
    per-action state kept in the session (field preferences, last preview,
    last selected character) and the image files found in the action's
    upload sub-folder, listed in reverse filename order.
    """
    action = Action.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()

    # Collect previously generated/uploaded images for this action.
    image_suffixes = ('.png', '.jpg', '.jpeg', '.webp')
    preview_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}")
    existing_previews = []
    if os.path.isdir(preview_dir):
        image_names = [
            entry for entry in os.listdir(preview_dir)
            if entry.lower().endswith(image_suffixes)
        ]
        image_names.sort(reverse=True)
        existing_previews = [f"actions/{slug}/{entry}" for entry in image_names]

    return render_template(
        'actions/detail.html',
        action=action,
        characters=characters,
        preferences=session.get(f'prefs_action_{slug}'),
        preview_image=session.get(f'preview_action_{slug}'),
        selected_character=session.get(f'char_action_{slug}'),
        existing_previews=existing_previews,
    )
@app.route('/action/<path:slug>/edit', methods=['GET', 'POST'])
def edit_action(slug):
    """Show the action edit form (GET) or apply submitted changes (POST).

    On POST the action's name, ``action_id`` entry, ``action`` section,
    ``lora`` section, LoRA weight bounds and tags are rebuilt from the form,
    written to the action's JSON file in ``ACTIONS_DIR`` and committed. On
    success the browser is redirected to the detail page; on any error the
    message is flashed and the edit form is rendered again.
    """
    action = Action.query.filter_by(slug=slug).first_or_404()
    loras = get_available_action_loras()
    if request.method == 'POST':
        try:
            # 1. Update basic fields
            action.name = request.form.get('action_name')
            # 2. Rebuild the data dictionary
            new_data = action.data.copy()
            new_data['action_name'] = action.name
            # Update action_id if provided
            new_data['action_id'] = request.form.get('action_id', action.action_id)
            # Update action section
            if 'action' in new_data:
                for key in new_data['action']:
                    form_key = f"action_{key}"
                    if form_key in request.form:
                        new_data['action'][key] = request.form.get(form_key)
            # Update lora section
            weight_bounds = ('lora_weight_min', 'lora_weight_max')
            if 'lora' in new_data:
                for key in new_data['lora']:
                    if key in weight_bounds:
                        # The bounds are handled (and validated as floats)
                        # below; copying the raw form string here would leave
                        # an invalid string stored when validation fails.
                        continue
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            try:
                                val = float(val)
                            except (TypeError, ValueError):
                                val = 1.0
                        new_data['lora'][key] = val
            # LoRA weight randomization bounds
            for bound in weight_bounds:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        # Not a number: keep whatever bound was stored before.
                        pass
                else:
                    # Empty field means "no bound": remove it.
                    new_data.setdefault('lora', {}).pop(bound, None)
            # Update Tags (comma separated string to list)
            tags_raw = request.form.get('tags', '')
            new_data['tags'] = [
                tag for tag in (part.strip() for part in tags_raw.split(',')) if tag
            ]
            action.data = new_data
            flag_modified(action, "data")
            # 3. Write back to JSON file
            action_file = action.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', action.action_id)}.json"
            file_path = os.path.join(app.config['ACTIONS_DIR'], action_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)
            db.session.commit()
            flash('Action profile updated successfully!')
            return redirect(url_for('action_detail', slug=slug))
        except Exception as e:
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")
    return render_template('actions/edit.html', action=action, loras=loras)
@app.route('/action/<path:slug>/upload', methods=['POST'])
def upload_action_image(slug):
    """Store an uploaded cover image for an action.

    The file from the ``image`` form field is saved under
    ``UPLOAD_FOLDER/actions/<slug>/`` using a sanitised filename, and its
    relative path is stored as the action's ``image_path``. Every outcome
    ends with a redirect to the action detail page.
    """
    action = Action.query.filter_by(slug=slug).first_or_404()
    # This route only accepts POST, so redirecting to ``request.url`` would
    # make the browser issue a GET against it and land on a 405 page. Send
    # the user back to the detail page instead.
    detail_url = url_for('action_detail', slug=slug)
    if 'image' not in request.files:
        flash('No file part')
        return redirect(detail_url)
    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(detail_url)
    if file and allowed_file(file.filename):
        # Create action subfolder
        action_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}")
        os.makedirs(action_folder, exist_ok=True)
        filename = secure_filename(file.filename)
        file_path = os.path.join(action_folder, filename)
        file.save(file_path)
        # Store relative path in DB
        action.image_path = f"actions/{slug}/{filename}"
        db.session.commit()
        flash('Image uploaded successfully!')
    return redirect(detail_url)
@app.route('/action/<path:slug>/generate', methods=['POST'])
def generate_action_image(slug):
    """Queue an image generation job for an action, optionally with a character.

    Form fields read: ``action`` (defaults to ``'preview'``; used in the job
    label and passed to ``_make_finalize``), ``include_field`` (selected
    prompt fields) and ``character_slug``. The character choice and field
    selection are remembered in the session.

    The action's pose/expression/scene text replaces the character's
    ``defaults``; action LoRA triggers and tags are merged with the
    character's. When the action's ``participants.orientation`` contains more
    than one ``F``, randomly sampled other characters are described and
    appended to the main prompt.

    Returns:
        For ``XMLHttpRequest`` callers a ``{'status': 'queued', 'job_id': ...}``
        payload (or ``{'error': ...}`` with HTTP 500 on failure); otherwise a
        redirect to the action detail page, with the error flashed on failure.
    """
    action_obj = Action.query.filter_by(slug=slug).first_or_404()
    try:
        # Get action type
        action_type = request.form.get('action', 'preview')
        # Get selected fields
        selected_fields = request.form.getlist('include_field')
        # Get selected character (if any)
        character_slug = request.form.get('character_slug', '')
        character = _resolve_character(character_slug)
        if character_slug == '__random__' and character:
            character_slug = character.slug
        # Save preferences
        session[f'char_action_{slug}'] = character_slug
        session[f'prefs_action_{slug}'] = selected_fields

        # Fold the action's body-part descriptions into the pose / expression
        # / scene slots that the prompt builder reads from 'defaults'.
        action_data = action_obj.data.get('action', {})
        pose_fields = ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet']
        pose_parts = [action_data.get(k) for k in pose_fields if action_data.get(k)]
        expression_parts = [action_data.get(k) for k in ['head', 'eyes'] if action_data.get(k)]
        action_defaults = {
            'pose': ", ".join(pose_parts),
            'expression': ", ".join(expression_parts),
            'scene': action_data.get('additional', '')
        }

        # Build combined data for prompt building
        if character:
            # Combine character identity/wardrobe with action details
            # Action details replace character's 'defaults' (pose, etc.)
            combined_data = character.data.copy()
            combined_data['action'] = action_data  # Ensure action section is present for routing
            combined_data['participants'] = action_obj.data.get('participants', {})  # Add participants
            combined_data['defaults'] = action_defaults
            # Merge lora triggers if present
            action_lora = action_obj.data.get('lora', {})
            if action_lora.get('lora_triggers'):
                # character.data.copy() is shallow: merge into a copy of the
                # nested dict so the character's own data is left untouched.
                merged_lora = dict(combined_data.get('lora') or {})
                # Join only the non-empty parts to avoid a leading ", ".
                merged_lora['lora_triggers'] = ", ".join(
                    part for part in (merged_lora.get('lora_triggers'), action_lora['lora_triggers'])
                    if part
                )
                combined_data['lora'] = merged_lora
            # Merge tags
            combined_data['tags'] = list(set(combined_data.get('tags', []) + action_obj.data.get('tags', [])))
            # Use action's defaults if no manual selection
            if not selected_fields:
                selected_fields = list(action_obj.default_fields) if action_obj.default_fields else []
            # Auto-include essential character fields if a character is selected
            if selected_fields:
                _ensure_character_fields(character, selected_fields)
            else:
                # Fallback to sensible defaults if still empty (no checkboxes and no action defaults)
                selected_fields = ['special::name', 'defaults::pose', 'defaults::expression']
                # Add identity fields
                for key in ['base_specs', 'hair', 'eyes']:
                    if character.data.get('identity', {}).get(key):
                        selected_fields.append(f'identity::{key}')
                # Add wardrobe fields
                wardrobe = character.get_active_wardrobe()
                for key in ['full_body', 'headwear', 'top', 'bottom', 'legwear', 'footwear', 'hands', 'gloves', 'accessories']:
                    if wardrobe.get(key):
                        selected_fields.append(f'wardrobe::{key}')
            default_fields = action_obj.default_fields
            active_outfit = character.active_outfit
        else:
            # Action only - no character (rarely makes sense for actions but let's handle it)
            combined_data = {
                'character_id': action_obj.action_id,
                'defaults': action_defaults,
                'lora': action_obj.data.get('lora', {}),
                'tags': action_obj.data.get('tags', [])
            }
            if not selected_fields:
                selected_fields = ['defaults::pose', 'defaults::expression', 'defaults::scene', 'lora::lora_triggers', 'special::tags']
            default_fields = action_obj.default_fields
            active_outfit = 'default'

        # Queue generation
        with open('comfy_workflow.json', 'r') as f:
            workflow = json.load(f)
        # Build prompts for combined data
        prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit)

        # Handle multiple female characters. ``or`` guards against the keys
        # being present but null in the action JSON.
        participants = action_obj.data.get('participants') or {}
        orientation = participants.get('orientation') or ''
        f_count = orientation.upper().count('F')
        if f_count > 1:
            # We need f_count - 1 additional characters
            num_extras = f_count - 1
            # Get all characters excluding the current one
            query = Character.query
            if character:
                query = query.filter(Character.id != character.id)
            all_others = query.all()
            if len(all_others) >= num_extras:
                extras = random.sample(all_others, num_extras)
                for extra_char in extras:
                    extra_parts = []
                    # Identity
                    ident = extra_char.data.get('identity', {})
                    for key in ['base_specs', 'hair', 'eyes', 'extra']:
                        val = ident.get(key)
                        if val:
                            # Remove 1girl/solo
                            val = re.sub(r'\b(1girl|1boy|solo)\b', '', val).replace(', ,', ',').strip(', ')
                            extra_parts.append(val)
                    # Wardrobe (active outfit)
                    wardrobe = extra_char.get_active_wardrobe()
                    for key in ['top', 'headwear', 'legwear', 'footwear', 'accessories']:
                        val = wardrobe.get(key)
                        if val:
                            extra_parts.append(val)
                    # Append to main prompt
                    if extra_parts:
                        prompts["main"] += ", " + ", ".join(extra_parts)
                        print(f"Added extra character: {extra_char.name}")

        _append_background(prompts, character)
        # Prepare workflow
        ckpt_path, ckpt_data = _get_default_checkpoint()
        workflow = _prepare_workflow(workflow, character, prompts, action=action_obj, checkpoint=ckpt_path, checkpoint_data=ckpt_data)
        char_label = character.name if character else 'no character'
        label = f"Action: {action_obj.name} ({char_label}) {action_type}"
        job = _enqueue_job(label, workflow, _make_finalize('actions', slug, Action, action_type))
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'status': 'queued', 'job_id': job['id']}
        return redirect(url_for('action_detail', slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('action_detail', slug=slug))
@app.route('/action/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_action_cover_from_preview(slug):
    """Promote the preview path stored in the session to the action's cover image."""
    action = Action.query.filter_by(slug=slug).first_or_404()
    detail_page = redirect(url_for('action_detail', slug=slug))

    preview_path = session.get(f'preview_action_{slug}')
    if not preview_path:
        flash('No preview image available', 'error')
        return detail_page

    action.image_path = preview_path
    db.session.commit()
    flash('Cover image updated from preview!')
    return detail_page
@app.route('/action/<path:slug>/save_defaults', methods=['POST'])
def save_action_defaults(slug):
    """Persist the submitted ``include_field`` selection as the action's default fields."""
    action = Action.query.filter_by(slug=slug).first_or_404()
    action.default_fields = request.form.getlist('include_field')
    db.session.commit()
    flash('Default prompt selection saved for this action!')
    detail_url = url_for('action_detail', slug=slug)
    return redirect(detail_url)
@app.route('/actions/bulk_create', methods=['POST'])
def bulk_create_actions_from_loras():
    """Generate action JSON profiles for the pose LoRA files found on disk.

    Each ``.safetensors`` file in the pose LoRA directory maps to
    ``<action_id>.json`` in ``ACTIONS_DIR``. Existing files are skipped
    unless the form sends ``overwrite=true``. For the rest, ``call_llm`` is
    asked for a profile (with text stripped from a same-named ``.html`` file
    as extra context when one exists), the ID, name and LoRA name are forced,
    missing LoRA triggers/weights get fallback values, and the result is
    written to disk. Failures for individual files are printed and do not
    stop the run. ``sync_actions()`` is called when anything was written.
    """
    lora_dir = '/ImageModels/lora/Illustrious/Poses/'
    if not os.path.exists(lora_dir):
        flash('Actions LoRA directory not found.', 'error')
        return redirect(url_for('actions_index'))

    overwrite = request.form.get('overwrite') == 'true'
    created = 0
    skipped = 0
    overwritten = 0

    system_prompt = load_prompt('action_system.txt')
    if not system_prompt:
        flash('Action system prompt file not found.', 'error')
        return redirect(url_for('actions_index'))

    # Fallback values applied only when the LLM left the weight unset.
    weight_fallbacks = (
        ('lora_weight', 1.0),
        ('lora_weight_min', 0.7),
        ('lora_weight_max', 1.0),
    )

    for filename in os.listdir(lora_dir):
        if not filename.endswith('.safetensors'):
            continue

        name_base = filename.rsplit('.', 1)[0]
        action_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
        action_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()
        json_path = os.path.join(app.config['ACTIONS_DIR'], f"{action_id}.json")

        already_there = os.path.exists(json_path)
        if already_there and not overwrite:
            skipped += 1
            continue

        # Optional companion HTML page: reduce it to plain text for the LLM.
        html_filename = f"{name_base}.html"
        html_path = os.path.join(lora_dir, html_filename)
        html_content = ""
        if os.path.exists(html_path):
            try:
                with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                    markup = hf.read()
                markup = re.sub(r'<script[^>]*>.*?</script>', '', markup, flags=re.DOTALL)
                markup = re.sub(r'<style[^>]*>.*?</style>', '', markup, flags=re.DOTALL)
                markup = re.sub(r'<img[^>]*>', '', markup)
                markup = re.sub(r'<[^>]+>', ' ', markup)
                html_content = ' '.join(markup.split())
            except Exception as e:
                print(f"Error reading HTML {html_filename}: {e}")

        try:
            print(f"Asking LLM to describe action: {action_name}")
            prompt = f"Describe an action/pose for an AI image generation model based on the LoRA filename: '{filename}'"
            if html_content:
                prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###"
            llm_response = call_llm(prompt, system_prompt)

            # Drop Markdown code fences before parsing the reply.
            action_data = json.loads(llm_response.replace('```json', '').replace('```', '').strip())

            # Enforce system values while preserving LLM-extracted metadata.
            action_data['action_id'] = action_id
            action_data['action_name'] = action_name
            if 'lora' not in action_data:
                action_data['lora'] = {}
            lora_cfg = action_data['lora']
            lora_cfg['lora_name'] = f"Illustrious/Poses/{filename}"
            if not lora_cfg.get('lora_triggers'):
                lora_cfg['lora_triggers'] = name_base
            for weight_key, fallback in weight_fallbacks:
                if lora_cfg.get(weight_key) is None:
                    lora_cfg[weight_key] = fallback

            with open(json_path, 'w') as out:
                json.dump(action_data, out, indent=2)

            if already_there:
                overwritten += 1
            else:
                created += 1

            # Small delay to avoid API rate limits if many files
            time.sleep(0.5)
        except Exception as e:
            print(f"Error creating action for {filename}: {e}")

    if created > 0 or overwritten > 0:
        sync_actions()
        summary = f'Successfully processed actions: {created} created, {overwritten} overwritten.'
        if skipped > 0:
            summary += f' (Skipped {skipped} existing)'
        flash(summary)
    else:
        flash(f'No actions created or overwritten. {skipped} existing actions found.')
    return redirect(url_for('actions_index'))
@app.route('/action/create', methods=['GET', 'POST'])
def create_action():
    """Show the action creation form (GET) or create a new action (POST).

    The slug comes from the ``filename`` field, or is derived from ``name``
    when that is empty; it is reduced to ``[a-zA-Z0-9_]`` and suffixed with a
    counter until no ``<slug>.json`` exists in ``ACTIONS_DIR``. With
    ``use_llm`` enabled the profile is produced by ``call_llm`` from the
    ``prompt`` description; otherwise an empty template is used. The profile
    is written to disk and stored as a new ``Action`` row.
    """
    if request.method == 'POST':
        name = request.form.get('name')
        slug = request.form.get('filename', '').strip()
        prompt = request.form.get('prompt', '')
        use_llm = request.form.get('use_llm') == 'on'
        if not slug:
            # ``name`` is None when the form field is missing; fall back to
            # an empty string so deriving the slug cannot raise.
            slug = re.sub(r'[^a-zA-Z0-9]+', '_', (name or '').lower()).strip('_')
        safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug)
        if not safe_slug:
            safe_slug = 'action'
        # Append _1, _2, ... until the JSON filename is unused.
        base_slug = safe_slug
        counter = 1
        while os.path.exists(os.path.join(app.config['ACTIONS_DIR'], f"{safe_slug}.json")):
            safe_slug = f"{base_slug}_{counter}"
            counter += 1
        if use_llm:
            if not prompt:
                flash("Description is required when AI generation is enabled.")
                return redirect(request.url)
            system_prompt = load_prompt('action_system.txt')
            if not system_prompt:
                flash("Action system prompt file not found.")
                return redirect(request.url)
            try:
                llm_response = call_llm(f"Create an action profile for '{name}' based on this description: {prompt}", system_prompt)
                # Drop Markdown code fences before parsing the reply.
                clean_json = llm_response.replace('```json', '').replace('```', '').strip()
                action_data = json.loads(clean_json)
                action_data['action_id'] = safe_slug
                action_data['action_name'] = name
            except Exception as e:
                print(f"LLM error: {e}")
                flash(f"Failed to generate action profile: {e}")
                return redirect(request.url)
        else:
            action_data = {
                "action_id": safe_slug,
                "action_name": name,
                "action": {
                    "full_body": "", "head": "", "eyes": "", "arms": "", "hands": "",
                    "torso": "", "pelvis": "", "legs": "", "feet": "", "additional": ""
                },
                "lora": {"lora_name": "", "lora_weight": 1.0, "lora_triggers": ""},
                "tags": []
            }
        try:
            file_path = os.path.join(app.config['ACTIONS_DIR'], f"{safe_slug}.json")
            with open(file_path, 'w') as f:
                json.dump(action_data, f, indent=2)
            new_action = Action(
                action_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json",
                name=name, data=action_data
            )
            db.session.add(new_action)
            db.session.commit()
            flash('Action created successfully!')
            return redirect(url_for('action_detail', slug=safe_slug))
        except Exception as e:
            print(f"Save error: {e}")
            flash(f"Failed to create action: {e}")
            return redirect(request.url)
    return render_template('actions/create.html')
@app.route('/action/<path:slug>/clone', methods=['POST'])
def clone_action(slug):
    """Duplicate an action under the next unused numbered ID.

    The source ``action_id`` is split into a base name and a trailing
    ``_<digits>`` counter (an ID without such a suffix is treated as number
    1). The counter is incremented until ``<base>_<NN>.json`` does not exist
    in ``ACTIONS_DIR``. A shallow copy of the source data, with its ID and
    name rewritten, is written to that file and inserted as a new ``Action``
    row before redirecting to the clone's detail page.
    """
    source = Action.query.filter_by(slug=slug).first_or_404()

    # "name_07" -> ("name", 7); anything else -> (whole id, 1).
    numbered = re.match(r'^(.+?)_(\d+)$', source.action_id)
    if numbered is None:
        stem, number = source.action_id, 1
    else:
        stem, number = numbered.group(1), int(numbered.group(2))

    # Walk upwards from the next number until the JSON filename is free.
    while True:
        number += 1
        clone_id = f"{stem}_{number:02d}"
        clone_filename = f"{clone_id}.json"
        clone_path = os.path.join(app.config['ACTIONS_DIR'], clone_filename)
        if not os.path.exists(clone_path):
            break

    clone_data = {
        **source.data,
        'action_id': clone_id,
        'action_name': f"{source.name} (Copy)",
    }
    with open(clone_path, 'w') as handle:
        json.dump(clone_data, handle, indent=2)

    clone_slug = re.sub(r'[^a-zA-Z0-9_]', '', clone_id)
    db.session.add(Action(
        action_id=clone_id,
        slug=clone_slug,
        filename=clone_filename,
        name=clone_data['action_name'],
        data=clone_data,
    ))
    db.session.commit()

    flash(f'Action cloned as "{clone_id}"!')
    return redirect(url_for('action_detail', slug=clone_slug))
@app.route('/action/<path:slug>/save_json', methods=['POST'])
def save_action_json(slug):
    """Replace an action's data with raw JSON posted in ``json_data``.

    The parsed object is stored on the action and committed; when the action
    has a ``filename`` the same data is also written to that file inside
    ``ACTIONS_DIR``.

    Returns:
        ``{'success': True}`` on success, or ``{'success': False, 'error': ...}``
        with HTTP 400 when the field is not valid JSON or does not decode to
        a JSON object.
    """
    action = Action.query.filter_by(slug=slug).first_or_404()
    try:
        new_data = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400
    # Action data is read with ``.get(...)`` / ``.copy()`` by the other
    # action routes, so a list/string/number payload would corrupt the record
    # even though it is valid JSON.
    if not isinstance(new_data, dict):
        return {'success': False, 'error': 'Invalid JSON: top-level value must be an object'}, 400
    action.data = new_data
    flag_modified(action, 'data')
    db.session.commit()
    if action.filename:
        file_path = os.path.join(app.config['ACTIONS_DIR'], action.filename)
        with open(file_path, 'w') as f:
            json.dump(new_data, f, indent=2)
    return {'success': True}
# ============ STYLE ROUTES ============
@app.route('/styles')
def styles_index():
    """Render the style gallery with all styles ordered by name."""
    name_sorted = Style.query.order_by(Style.name).all()
    return render_template('styles/index.html', styles=name_sorted)
@app.route('/styles/rescan', methods=['POST'])
def rescan_styles():
    """Run ``sync_styles()`` and return to the style gallery."""
    sync_styles()
    flash('Database synced with style files.')
    gallery_url = url_for('styles_index')
    return redirect(gallery_url)
@app.route('/style/<path:slug>')
def style_detail(slug):
    """Render the detail page for a single style.

    Passes the template the style, all characters (ordered by name), the
    per-style state kept in the session (field preferences, last preview,
    last selected character) and the image files found in the style's upload
    sub-folder, listed in reverse filename order.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()

    # Collect previously generated/uploaded images for this style.
    image_suffixes = ('.png', '.jpg', '.jpeg', '.webp')
    preview_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}")
    existing_previews = []
    if os.path.isdir(preview_dir):
        image_names = [
            entry for entry in os.listdir(preview_dir)
            if entry.lower().endswith(image_suffixes)
        ]
        image_names.sort(reverse=True)
        existing_previews = [f"styles/{slug}/{entry}" for entry in image_names]

    return render_template(
        'styles/detail.html',
        style=style,
        characters=characters,
        preferences=session.get(f'prefs_style_{slug}'),
        preview_image=session.get(f'preview_style_{slug}'),
        selected_character=session.get(f'char_style_{slug}'),
        existing_previews=existing_previews,
    )
@app.route('/style/<path:slug>/edit', methods=['GET', 'POST'])
def edit_style(slug):
    """Show the style edit form (GET) or apply submitted changes (POST).

    On POST the style's name, ``style`` section, ``lora`` section and LoRA
    weight bounds are rebuilt from the form, written to the style's JSON
    file in ``STYLES_DIR`` and committed. On success the browser is
    redirected to the detail page; on any error the message is flashed and
    the edit form is rendered again.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    loras = get_available_style_loras()
    if request.method == 'POST':
        try:
            # 1. Update basic fields
            style.name = request.form.get('style_name')
            # 2. Rebuild the data dictionary
            new_data = style.data.copy()
            new_data['style_name'] = style.name
            # Update style section
            if 'style' in new_data:
                for key in new_data['style']:
                    form_key = f"style_{key}"
                    if form_key in request.form:
                        new_data['style'][key] = request.form.get(form_key)
            # Update lora section
            weight_bounds = ('lora_weight_min', 'lora_weight_max')
            if 'lora' in new_data:
                for key in new_data['lora']:
                    if key in weight_bounds:
                        # The bounds are handled (and validated as floats)
                        # below; copying the raw form string here would leave
                        # an invalid string stored when validation fails.
                        continue
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            try:
                                val = float(val)
                            except (TypeError, ValueError):
                                val = 1.0
                        new_data['lora'][key] = val
            # LoRA weight randomization bounds
            for bound in weight_bounds:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        # Not a number: keep whatever bound was stored before.
                        pass
                else:
                    # Empty field means "no bound": remove it.
                    new_data.setdefault('lora', {}).pop(bound, None)
            style.data = new_data
            flag_modified(style, "data")
            # 3. Write back to JSON file
            style_file = style.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', style.style_id)}.json"
            file_path = os.path.join(app.config['STYLES_DIR'], style_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)
            db.session.commit()
            flash('Style updated successfully!')
            return redirect(url_for('style_detail', slug=slug))
        except Exception as e:
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")
    return render_template('styles/edit.html', style=style, loras=loras)
@app.route('/style/<path:slug>/upload', methods=['POST'])
def upload_style_image(slug):
    """Store an uploaded cover image for a style.

    The file from the ``image`` form field is saved under
    ``UPLOAD_FOLDER/styles/<slug>/`` using a sanitised filename, and its
    relative path is stored as the style's ``image_path``. Every outcome
    ends with a redirect to the style detail page.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    # This route only accepts POST, so redirecting to ``request.url`` would
    # make the browser issue a GET against it and land on a 405 page. Send
    # the user back to the detail page instead.
    detail_url = url_for('style_detail', slug=slug)
    if 'image' not in request.files:
        flash('No file part')
        return redirect(detail_url)
    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(detail_url)
    if file and allowed_file(file.filename):
        # Create style subfolder
        style_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}")
        os.makedirs(style_folder, exist_ok=True)
        filename = secure_filename(file.filename)
        file_path = os.path.join(style_folder, filename)
        file.save(file_path)
        # Store relative path in DB
        style.image_path = f"styles/{slug}/{filename}"
        db.session.commit()
        flash('Image uploaded successfully!')
    return redirect(detail_url)
def _build_style_workflow(style_obj, character=None, selected_fields=None):
    """Build and return a prepared ComfyUI workflow dict for a style generation.

    Args:
        style_obj: The ``Style`` whose ``style`` and ``lora`` data are applied.
        character: Optional character to render in the style. When given,
            its data is combined with the style data and the style's LoRA
            triggers are appended to the character's.
        selected_fields: Optional list of prompt field keys. When empty, a
            default set is chosen (identity, name, wardrobe and style fields
            with a character; style fields only without one).

    Returns:
        The workflow loaded from ``comfy_workflow.json`` after being passed
        through ``_prepare_workflow``.
    """
    if character:
        combined_data = character.data.copy()
        combined_data['character_id'] = character.character_id
        combined_data['style'] = style_obj.data.get('style', {})
        # Merge style lora triggers if present
        style_lora = style_obj.data.get('lora', {})
        if style_lora.get('lora_triggers'):
            # character.data.copy() is shallow: merge into a copy of the
            # nested dict so the character's own data is left untouched.
            # Otherwise repeated calls with the same character object (as in
            # the bulk cover generation loop) keep accumulating triggers.
            merged_lora = dict(combined_data.get('lora') or {})
            # Join only the non-empty parts to avoid a leading ", ".
            merged_lora['lora_triggers'] = ", ".join(
                part for part in (merged_lora.get('lora_triggers'), style_lora['lora_triggers'])
                if part
            )
            combined_data['lora'] = merged_lora
        # Merge character identity and wardrobe fields into selected_fields
        if selected_fields:
            _ensure_character_fields(character, selected_fields)
        else:
            # Auto-include essential character fields (minimal set for batch/default generation)
            selected_fields = []
            for key in ['base_specs', 'hair', 'eyes']:
                if character.data.get('identity', {}).get(key):
                    selected_fields.append(f'identity::{key}')
            selected_fields.append('special::name')
            wardrobe = character.get_active_wardrobe()
            for key in _WARDROBE_KEYS:
                if wardrobe.get(key):
                    selected_fields.append(f'wardrobe::{key}')
            selected_fields.extend(['style::artist_name', 'style::artistic_style', 'lora::lora_triggers'])
        default_fields = style_obj.default_fields
        active_outfit = character.active_outfit
    else:
        combined_data = {
            'character_id': style_obj.style_id,
            'style': style_obj.data.get('style', {}),
            'lora': style_obj.data.get('lora', {}),
            'tags': style_obj.data.get('tags', [])
        }
        if not selected_fields:
            selected_fields = ['style::artist_name', 'style::artistic_style', 'lora::lora_triggers']
        default_fields = style_obj.default_fields
        active_outfit = 'default'
    with open('comfy_workflow.json', 'r') as f:
        workflow = json.load(f)
    prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit)
    _append_background(prompts, character)
    ckpt_path, ckpt_data = _get_default_checkpoint()
    workflow = _prepare_workflow(workflow, character, prompts, style=style_obj, checkpoint=ckpt_path, checkpoint_data=ckpt_data)
    return workflow
@app.route('/style/<path:slug>/generate', methods=['POST'])
def generate_style_image(slug):
    """Queue an image generation job for a style, optionally with a character.

    Reads ``action`` (defaults to ``'preview'``), ``include_field`` and
    ``character_slug`` from the form, remembers the selection in the
    session, builds the workflow with ``_build_style_workflow`` and enqueues
    it.

    Returns:
        For ``XMLHttpRequest`` callers a ``{'status': 'queued', 'job_id': ...}``
        payload (or ``{'error': ...}`` with HTTP 500 on failure); otherwise a
        redirect to the style detail page, with the error flashed on failure.
    """
    style_obj = Style.query.filter_by(slug=slug).first_or_404()
    wants_json = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
    try:
        mode = request.form.get('action', 'preview')
        selected_fields = request.form.getlist('include_field')

        # Resolve the character; a random pick is stored under its real slug.
        character_slug = request.form.get('character_slug', '')
        character = _resolve_character(character_slug)
        if character and character_slug == '__random__':
            character_slug = character.slug

        # Save preferences
        session[f'char_style_{slug}'] = character_slug
        session[f'prefs_style_{slug}'] = selected_fields

        workflow = _build_style_workflow(style_obj, character, selected_fields)

        if character:
            char_label = character.name
        else:
            char_label = 'no character'
        label = f"Style: {style_obj.name} ({char_label}) {mode}"
        job = _enqueue_job(label, workflow, _make_finalize('styles', slug, Style, mode))

        if wants_json:
            return {'status': 'queued', 'job_id': job['id']}
        return redirect(url_for('style_detail', slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if wants_json:
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('style_detail', slug=slug))
@app.route('/style/<path:slug>/save_defaults', methods=['POST'])
def save_style_defaults(slug):
    """Persist the submitted ``include_field`` selection as the style's default fields."""
    style = Style.query.filter_by(slug=slug).first_or_404()
    style.default_fields = request.form.getlist('include_field')
    db.session.commit()
    flash('Default prompt selection saved for this style!')
    detail_url = url_for('style_detail', slug=slug)
    return redirect(detail_url)
@app.route('/style/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_style_cover_from_preview(slug):
    """Promote the preview path stored in the session to the style's cover image."""
    style = Style.query.filter_by(slug=slug).first_or_404()
    detail_page = redirect(url_for('style_detail', slug=slug))

    preview_path = session.get(f'preview_style_{slug}')
    if not preview_path:
        flash('No preview image available', 'error')
        return detail_page

    style.image_path = preview_path
    db.session.commit()
    flash('Cover image updated from preview!')
    return detail_page
@app.route('/get_missing_styles')
def get_missing_styles():
    """Return the slug and name of every style whose ``image_path`` is NULL or empty."""
    no_cover = Style.image_path.is_(None) | (Style.image_path == '')
    payload = []
    for style in Style.query.filter(no_cover).all():
        payload.append({'slug': style.slug, 'name': style.name})
    return {'missing': payload}
@app.route('/get_missing_detailers')
def get_missing_detailers():
    """Return the slug and name of every detailer whose ``image_path`` is NULL or empty."""
    no_cover = Detailer.image_path.is_(None) | (Detailer.image_path == '')
    payload = []
    for detailer in Detailer.query.filter(no_cover).all():
        payload.append({'slug': detailer.slug, 'name': detailer.name})
    return {'missing': payload}
@app.route('/clear_all_detailer_covers', methods=['POST'])
def clear_all_detailer_covers():
    """Reset ``image_path`` to ``None`` on every detailer and commit."""
    for entry in Detailer.query.all():
        entry.image_path = None
    db.session.commit()
    result = {'success': True}
    return result
@app.route('/clear_all_style_covers', methods=['POST'])
def clear_all_style_covers():
    """Reset ``image_path`` to ``None`` on every style and commit."""
    for entry in Style.query.all():
        entry.image_path = None
    db.session.commit()
    result = {'success': True}
    return result
@app.route('/styles/generate_missing', methods=['POST'])
def generate_missing_styles():
    """Queue a cover generation job for every style without a cover image.

    Each style (in name order) is paired with a randomly chosen character.
    A failure to build or enqueue one job is printed and does not stop the
    rest. The number of queued jobs is flashed before redirecting to the
    style gallery.
    """
    gallery_url = url_for('styles_index')

    no_cover = (Style.image_path == None) | (Style.image_path == '')
    pending = Style.query.filter(no_cover).order_by(Style.name).all()
    if not pending:
        flash("No styles missing cover images.")
        return redirect(gallery_url)

    candidates = Character.query.all()
    if not candidates:
        flash("No characters available to preview styles with.", "error")
        return redirect(gallery_url)

    enqueued = 0
    for style_obj in pending:
        character = random.choice(candidates)
        try:
            workflow = _build_style_workflow(style_obj, character=character)
            job_label = f"Style: {style_obj.name} cover"
            _enqueue_job(job_label, workflow, _make_finalize('styles', style_obj.slug, Style))
        except Exception as e:
            print(f"Error queuing cover generation for style {style_obj.name}: {e}")
        else:
            enqueued += 1

    flash(f"Queued {enqueued} style cover image generation job{'s' if enqueued != 1 else ''}.")
    return redirect(gallery_url)
@app.route('/styles/bulk_create', methods=['POST'])
def bulk_create_styles_from_loras():
    """Generate style JSON profiles for the style LoRA files found on disk.

    Each ``.safetensors`` file in the style LoRA directory maps to
    ``<style_id>.json`` in ``STYLES_DIR``. Existing files are skipped unless
    the form sends ``overwrite=true``. For the rest, ``call_llm`` is asked
    for a profile (with text stripped from a same-named ``.html`` file as
    extra context when one exists), the ID, name and LoRA name are forced,
    missing LoRA triggers/weights get fallback values, and the result is
    written to disk. Failures for individual files are printed and do not
    stop the run. ``sync_styles()`` is called when anything was written.
    """
    lora_dir = '/ImageModels/lora/Illustrious/Styles/'
    if not os.path.exists(lora_dir):
        flash('Styles LoRA directory not found.', 'error')
        return redirect(url_for('styles_index'))

    overwrite = request.form.get('overwrite') == 'true'
    created = 0
    skipped = 0
    overwritten = 0

    system_prompt = load_prompt('style_system.txt')
    if not system_prompt:
        flash('Style system prompt file not found.', 'error')
        return redirect(url_for('styles_index'))

    # Fallback values applied only when the LLM left the weight unset.
    weight_fallbacks = (
        ('lora_weight', 1.0),
        ('lora_weight_min', 0.7),
        ('lora_weight_max', 1.0),
    )

    for filename in os.listdir(lora_dir):
        if not filename.endswith('.safetensors'):
            continue

        name_base = filename.rsplit('.', 1)[0]
        style_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
        style_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()
        json_path = os.path.join(app.config['STYLES_DIR'], f"{style_id}.json")

        already_there = os.path.exists(json_path)
        if already_there and not overwrite:
            skipped += 1
            continue

        # Optional companion HTML page: reduce it to plain text for the LLM.
        html_filename = f"{name_base}.html"
        html_path = os.path.join(lora_dir, html_filename)
        html_content = ""
        if os.path.exists(html_path):
            try:
                with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                    markup = hf.read()
                markup = re.sub(r'<script[^>]*>.*?</script>', '', markup, flags=re.DOTALL)
                markup = re.sub(r'<style[^>]*>.*?</style>', '', markup, flags=re.DOTALL)
                markup = re.sub(r'<img[^>]*>', '', markup)
                markup = re.sub(r'<[^>]+>', ' ', markup)
                html_content = ' '.join(markup.split())
            except Exception as e:
                print(f"Error reading HTML {html_filename}: {e}")

        try:
            print(f"Asking LLM to describe style: {style_name}")
            prompt = f"Describe an art style or artist LoRA for AI image generation based on the filename: '{filename}'"
            if html_content:
                prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###"
            llm_response = call_llm(prompt, system_prompt)

            # Drop Markdown code fences before parsing the reply.
            style_data = json.loads(llm_response.replace('```json', '').replace('```', '').strip())

            # Enforce system values while preserving LLM-extracted metadata.
            style_data['style_id'] = style_id
            style_data['style_name'] = style_name
            if 'lora' not in style_data:
                style_data['lora'] = {}
            lora_cfg = style_data['lora']
            lora_cfg['lora_name'] = f"Illustrious/Styles/{filename}"
            if not lora_cfg.get('lora_triggers'):
                lora_cfg['lora_triggers'] = name_base
            for weight_key, fallback in weight_fallbacks:
                if lora_cfg.get(weight_key) is None:
                    lora_cfg[weight_key] = fallback

            with open(json_path, 'w') as out:
                json.dump(style_data, out, indent=2)

            if already_there:
                overwritten += 1
            else:
                created += 1

            # Brief pause between LLM requests.
            time.sleep(0.5)
        except Exception as e:
            print(f"Error creating style for {filename}: {e}")

    if created > 0 or overwritten > 0:
        sync_styles()
        summary = f'Successfully processed styles: {created} created, {overwritten} overwritten.'
        if skipped > 0:
            summary += f' (Skipped {skipped} existing)'
        flash(summary)
    else:
        flash(f'No styles created or overwritten. {skipped} existing styles found.')
    return redirect(url_for('styles_index'))
@app.route('/style/create', methods=['GET', 'POST'])
def create_style():
    """Show the style creation form (GET) or create a new empty style (POST).

    The slug comes from the ``filename`` field, or is derived from ``name``
    when that is empty; it is reduced to ``[a-zA-Z0-9_]`` and suffixed with a
    counter until no ``<slug>.json`` exists in ``STYLES_DIR``. A template
    profile with empty style and LoRA fields is written to disk and stored
    as a new ``Style`` row.
    """
    if request.method == 'POST':
        name = request.form.get('name')
        slug = request.form.get('filename', '').strip()
        if not slug:
            # ``name`` is None when the form field is missing; fall back to
            # an empty string so deriving the slug cannot raise.
            slug = re.sub(r'[^a-zA-Z0-9]+', '_', (name or '').lower()).strip('_')
        safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug)
        if not safe_slug:
            safe_slug = 'style'
        # Append _1, _2, ... until the JSON filename is unused.
        base_slug = safe_slug
        counter = 1
        while os.path.exists(os.path.join(app.config['STYLES_DIR'], f"{safe_slug}.json")):
            safe_slug = f"{base_slug}_{counter}"
            counter += 1
        style_data = {
            "style_id": safe_slug,
            "style_name": name,
            "style": {
                "artist_name": "",
                "artistic_style": ""
            },
            "lora": {
                "lora_name": "",
                "lora_weight": 1.0,
                "lora_triggers": ""
            }
        }
        try:
            file_path = os.path.join(app.config['STYLES_DIR'], f"{safe_slug}.json")
            with open(file_path, 'w') as f:
                json.dump(style_data, f, indent=2)
            new_style = Style(
                style_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json",
                name=name, data=style_data
            )
            db.session.add(new_style)
            db.session.commit()
            flash('Style created successfully!')
            return redirect(url_for('style_detail', slug=safe_slug))
        except Exception as e:
            print(f"Save error: {e}")
            flash(f"Failed to create style: {e}")
            return redirect(request.url)
    return render_template('styles/create.html')
@app.route('/style/<path:slug>/clone', methods=['POST'])
def clone_style(slug):
    """Duplicate an existing style under the next free numbered id.

    A trailing ``_<digits>`` on the source ``style_id`` is treated as a
    counter; the clone gets the first ``<base>_<NN>`` above it for which no
    JSON file exists in ``STYLES_DIR``. The clone is written to disk and
    inserted as a new ``Style`` row.
    """
    import copy  # local import: only this route needs it

    style = Style.query.filter_by(slug=slug).first_or_404()

    base_id = style.style_id
    match = re.match(r'^(.+?)_(\d+)$', base_id)
    if match:
        base_name = match.group(1)
        current_num = int(match.group(2))
    else:
        base_name = base_id
        current_num = 1

    next_num = current_num + 1
    while True:
        new_id = f"{base_name}_{next_num:02d}"
        new_filename = f"{new_id}.json"
        new_path = os.path.join(app.config['STYLES_DIR'], new_filename)
        if not os.path.exists(new_path):
            break
        next_num += 1

    # Deep copy so the clone never shares nested dicts with the source row.
    new_data = copy.deepcopy(style.data)
    new_data['style_id'] = new_id
    new_data['style_name'] = f"{style.name} (Copy)"

    with open(new_path, 'w') as f:
        json.dump(new_data, f, indent=2)

    new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id)
    new_style = Style(
        style_id=new_id, slug=new_slug, filename=new_filename,
        name=new_data['style_name'], data=new_data
    )
    db.session.add(new_style)
    db.session.commit()

    flash(f'Style cloned as "{new_id}"!')
    return redirect(url_for('style_detail', slug=new_slug))
@app.route('/style/<path:slug>/save_json', methods=['POST'])
def save_style_json(slug):
    """Replace a style's data with JSON posted in the ``json_data`` field.

    Returns ``{'success': True}`` on success, or a 400 response with an
    ``error`` message when the payload is not valid JSON or is not a JSON
    object. The data is committed to the database and, when the style has a
    backing file, written to ``STYLES_DIR``.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    try:
        new_data = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400

    # The edit and generation routes in this file call dict methods on the
    # stored data, so a list or scalar must not be saved.
    if not isinstance(new_data, dict):
        return {'success': False, 'error': 'Invalid JSON: top-level value must be an object'}, 400

    style.data = new_data
    flag_modified(style, 'data')
    db.session.commit()

    if style.filename:
        file_path = os.path.join(app.config['STYLES_DIR'], style.filename)
        with open(file_path, 'w') as f:
            json.dump(new_data, f, indent=2)
    return {'success': True}
# ============ SCENE ROUTES ============
@app.route('/scenes')
def scenes_index():
    """Render the scene index page with all scenes ordered by name."""
    all_scenes = Scene.query.order_by(Scene.name).all()
    return render_template('scenes/index.html', scenes=all_scenes)
@app.route('/scenes/rescan', methods=['POST'])
def rescan_scenes():
    """Run ``sync_scenes()`` and return to the scene index with a notice."""
    sync_scenes()
    flash('Database synced with scene files.')
    index_url = url_for('scenes_index')
    return redirect(index_url)
@app.route('/scene/<path:slug>')
def scene_detail(slug):
    """Render the detail page for one scene.

    Passes the template the scene, all characters ordered by name, the
    per-scene values remembered in the session (field preferences, preview
    image, selected character) and the image files found in the scene's
    upload folder, listed in reverse-sorted filename order.
    """
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()

    # Collect previously generated or uploaded images for this scene.
    image_suffixes = ('.png', '.jpg', '.jpeg', '.webp')
    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"scenes/{slug}")
    existing_previews = []
    if os.path.isdir(upload_dir):
        image_names = [
            entry for entry in os.listdir(upload_dir)
            if entry.lower().endswith(image_suffixes)
        ]
        image_names.sort(reverse=True)
        existing_previews = [f"scenes/{slug}/{entry}" for entry in image_names]

    return render_template(
        'scenes/detail.html',
        scene=scene,
        characters=characters,
        preferences=session.get(f'prefs_scene_{slug}'),
        preview_image=session.get(f'preview_scene_{slug}'),
        selected_character=session.get(f'char_scene_{slug}'),
        existing_previews=existing_previews,
    )
@app.route('/scene/<path:slug>/edit', methods=['GET', 'POST'])
def edit_scene(slug):
    """Show the scene edit form (GET) or apply submitted edits (POST).

    On POST the scene name, the ``scene`` and ``lora`` sections, the LoRA
    weight bounds and the tag list are rebuilt from the form, written back
    to the scene's JSON file and committed. On any failure the session is
    rolled back and the edit form is rendered again with an error flash.
    """
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    loras = get_available_scene_loras()

    if request.method == 'POST':
        try:
            # 1. Update basic fields
            scene.name = request.form.get('scene_name')

            # 2. Rebuild the data dictionary
            new_data = scene.data.copy()
            new_data['scene_name'] = scene.name

            # Update scene section
            if 'scene' in new_data:
                for key in new_data['scene'].keys():
                    form_key = f"scene_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        # Keep furniture/colors as lists if they were lists before
                        if key in ['furniture', 'colors'] and isinstance(new_data['scene'][key], list):
                            val = [v.strip() for v in val.split(',') if v.strip()]
                        new_data['scene'][key] = val

            # Update lora section
            if 'lora' in new_data:
                for key in new_data['lora'].keys():
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            try:
                                val = float(val)
                            except (TypeError, ValueError):
                                # Unparseable weight falls back to the default
                                val = 1.0
                        new_data['lora'][key] = val

            # LoRA weight randomization bounds: blank clears the bound
            for bound in ['lora_weight_min', 'lora_weight_max']:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        pass
                else:
                    new_data.setdefault('lora', {}).pop(bound, None)

            # Update Tags (comma separated string to list)
            tags_raw = request.form.get('tags', '')
            new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()]

            scene.data = new_data
            flag_modified(scene, "data")

            # 3. Write back to JSON file
            scene_file = scene.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', scene.scene_id)}.json"
            file_path = os.path.join(app.config['SCENES_DIR'], scene_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)

            db.session.commit()
            flash('Scene updated successfully!')
            return redirect(url_for('scene_detail', slug=slug))
        except Exception as e:
            # Discard the half-applied edits so the form below shows the
            # persisted state and the session is not left dirty.
            db.session.rollback()
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")

    return render_template('scenes/edit.html', scene=scene, loras=loras)
@app.route('/scene/<path:slug>/upload', methods=['POST'])
def upload_scene_image(slug):
    """Save an uploaded cover image for a scene and record its path.

    The file is stored under ``UPLOAD_FOLDER/scenes/<slug>/`` and the
    relative path is saved on the scene. Every outcome redirects to the
    scene detail page; this route only accepts POST, so redirecting back to
    its own URL would answer the follow-up GET with 405.
    """
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    detail_url = url_for('scene_detail', slug=slug)

    if 'image' not in request.files:
        flash('No file part')
        return redirect(detail_url)

    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(detail_url)

    if not allowed_file(file.filename):
        flash('File type not allowed', 'error')
        return redirect(detail_url)

    # secure_filename can strip a hostile name down to nothing.
    filename = secure_filename(file.filename)
    if not filename:
        flash('Invalid file name', 'error')
        return redirect(detail_url)

    # Create scene subfolder
    scene_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"scenes/{slug}")
    os.makedirs(scene_folder, exist_ok=True)
    file.save(os.path.join(scene_folder, filename))

    # Store relative path in DB
    scene.image_path = f"scenes/{slug}/{filename}"
    db.session.commit()
    flash('Image uploaded successfully!')

    return redirect(detail_url)
def _queue_scene_generation(scene_obj, character=None, selected_fields=None, client_id=None):
    """Build a prepared ComfyUI workflow for a scene, optionally with a character.

    Args:
        scene_obj: Scene whose ``data`` supplies the scene tags, LoRA and tags.
        character: Optional character to combine with the scene. When given,
            the scene description is placed in the character's
            ``defaults.scene`` and the scene's LoRA triggers are appended to
            the character's.
        selected_fields: Prompt field keys to include. When empty, a minimal
            default set is chosen.
        client_id: Unused here; kept for signature compatibility.

    Returns:
        The workflow returned by ``_prepare_workflow``.
    """
    # Flatten the scene description into one comma-separated tag string.
    scene_data = scene_obj.data.get('scene', {})
    scene_tags = []
    for key in ['background', 'foreground', 'furniture', 'colors', 'lighting', 'theme']:
        val = scene_data.get(key)
        if val:
            if isinstance(val, list):
                scene_tags.extend(val)
            else:
                scene_tags.append(val)
    scene_prompt = ", ".join(scene_tags)

    if character:
        combined_data = character.data.copy()
        combined_data['character_id'] = character.character_id

        # ``.copy()`` is shallow: copy each nested dict before editing it so
        # the character's own data is never modified. This also covers
        # characters that have no 'defaults' section at all.
        defaults = dict(combined_data.get('defaults') or {})
        defaults['scene'] = scene_prompt
        combined_data['defaults'] = defaults

        # Merge scene lora triggers if present
        scene_lora = scene_obj.data.get('lora', {})
        if scene_lora.get('lora_triggers'):
            merged_lora = dict(combined_data.get('lora') or {})
            existing_triggers = merged_lora.get('lora_triggers')
            # Avoid a leading ", " when the character has no triggers.
            if existing_triggers:
                merged_lora['lora_triggers'] = f"{existing_triggers}, {scene_lora['lora_triggers']}"
            else:
                merged_lora['lora_triggers'] = scene_lora['lora_triggers']
            combined_data['lora'] = merged_lora

        # Merge character identity and wardrobe fields into selected_fields
        if selected_fields:
            _ensure_character_fields(character, selected_fields)
        else:
            # Auto-include essential character fields (minimal set for batch/default generation)
            selected_fields = []
            for key in ['base_specs', 'hair', 'eyes']:
                if character.data.get('identity', {}).get(key):
                    selected_fields.append(f'identity::{key}')
            selected_fields.append('special::name')
            wardrobe = character.get_active_wardrobe()
            for key in _WARDROBE_KEYS:
                if wardrobe.get(key):
                    selected_fields.append(f'wardrobe::{key}')
            selected_fields.extend(['defaults::scene', 'lora::lora_triggers'])

        active_outfit = character.active_outfit
    else:
        # Scene only - no character
        combined_data = {
            'character_id': scene_obj.scene_id,
            'defaults': {
                'scene': scene_prompt
            },
            'lora': scene_obj.data.get('lora', {}),
            'tags': scene_obj.data.get('tags', [])
        }
        if not selected_fields:
            selected_fields = ['defaults::scene', 'lora::lora_triggers']
        active_outfit = 'default'

    default_fields = scene_obj.default_fields

    with open('comfy_workflow.json', 'r') as f:
        workflow = json.load(f)

    prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit)

    # For scene generation, we want to ensure Node 20 is handled in _prepare_workflow
    ckpt_path, ckpt_data = _get_default_checkpoint()
    workflow = _prepare_workflow(workflow, character, prompts, scene=scene_obj, checkpoint=ckpt_path, checkpoint_data=ckpt_data)
    return workflow
@app.route('/scene/<path:slug>/generate', methods=['POST'])
def generate_scene_image(slug):
    """Queue an image generation job for a scene.

    Reads the action, selected prompt fields and optional character from
    the form, stores the character and field choices in the session, builds
    the workflow and enqueues it. XHR callers receive JSON (the job id, or
    an error with status 500); other callers are redirected to the scene
    detail page.
    """
    scene_obj = Scene.query.filter_by(slug=slug).first_or_404()
    is_xhr = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
    detail_endpoint = 'scene_detail'

    try:
        form = request.form
        action = form.get('action', 'preview')
        selected_fields = form.getlist('include_field')

        # Resolve the chosen character; a random pick is stored by its real slug.
        character_slug = form.get('character_slug', '')
        character = _resolve_character(character_slug)
        if character and character_slug == '__random__':
            character_slug = character.slug

        # Remember this page's choices for the next visit.
        session[f'char_scene_{slug}'] = character_slug
        session[f'prefs_scene_{slug}'] = selected_fields

        workflow = _queue_scene_generation(scene_obj, character, selected_fields)

        char_label = 'no character' if not character else character.name
        job = _enqueue_job(
            f"Scene: {scene_obj.name} ({char_label}) {action}",
            workflow,
            _make_finalize('scenes', slug, Scene, action),
        )

        if is_xhr:
            return {'status': 'queued', 'job_id': job['id']}
        return redirect(url_for(detail_endpoint, slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if is_xhr:
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for(detail_endpoint, slug=slug))
@app.route('/scene/<path:slug>/save_defaults', methods=['POST'])
def save_scene_defaults(slug):
    """Store the submitted ``include_field`` list as the scene's default fields."""
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    scene.default_fields = request.form.getlist('include_field')
    db.session.commit()

    flash('Default prompt selection saved for this scene!')
    detail_url = url_for('scene_detail', slug=slug)
    return redirect(detail_url)
@app.route('/scene/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_scene_cover_from_preview(slug):
    """Set the scene's cover image to the preview path stored in the session."""
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    preview_path = session.get(f'preview_scene_{slug}')

    # Nothing to promote: report it and return to the detail page.
    if not preview_path:
        flash('No preview image available', 'error')
        return redirect(url_for('scene_detail', slug=slug))

    scene.image_path = preview_path
    db.session.commit()
    flash('Cover image updated from preview!')
    return redirect(url_for('scene_detail', slug=slug))
@app.route('/scenes/bulk_create', methods=['POST'])
def bulk_create_scenes_from_loras():
    """Create scene JSON files for every background LoRA via the LLM.

    For each ``.safetensors`` file in the backgrounds LoRA directory the LLM
    is asked to describe the scene (given the filename and, if present, the
    text of a sibling ``.html`` file). The reply is parsed as JSON, system
    fields and LoRA defaults are enforced, and the result is written to
    ``SCENES_DIR``. Existing files are skipped unless the ``overwrite`` form
    field is ``'true'``. A failure on one file is logged and does not stop
    the batch.
    """
    backgrounds_lora_dir = '/ImageModels/lora/Illustrious/Backgrounds/'
    if not os.path.exists(backgrounds_lora_dir):
        flash('Backgrounds LoRA directory not found.', 'error')
        return redirect(url_for('scenes_index'))

    overwrite = request.form.get('overwrite') == 'true'
    created_count = 0
    skipped_count = 0
    overwritten_count = 0

    system_prompt = load_prompt('scene_system.txt')
    if not system_prompt:
        flash('Scene system prompt file not found.', 'error')
        return redirect(url_for('scenes_index'))

    # Sorted so the processing order is deterministic across runs.
    for filename in sorted(os.listdir(backgrounds_lora_dir)):
        if not filename.endswith('.safetensors'):
            continue

        name_base = filename.rsplit('.', 1)[0]
        scene_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
        scene_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()

        json_filename = f"{scene_id}.json"
        json_path = os.path.join(app.config['SCENES_DIR'], json_filename)

        is_existing = os.path.exists(json_path)
        if is_existing and not overwrite:
            skipped_count += 1
            continue

        html_filename = f"{name_base}.html"
        html_path = os.path.join(backgrounds_lora_dir, html_filename)
        html_content = ""
        if os.path.exists(html_path):
            try:
                with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                    html_raw = hf.read()
                # Strip HTML tags but keep text content for LLM context
                clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
                clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)
                clean_html = re.sub(r'<img[^>]*>', '', clean_html)
                clean_html = re.sub(r'<[^>]+>', ' ', clean_html)
                html_content = ' '.join(clean_html.split())
            except Exception as e:
                print(f"Error reading HTML {html_filename}: {e}")

        try:
            print(f"Asking LLM to describe scene: {scene_name}")
            prompt = f"Describe a scene for an AI image generation model based on the LoRA filename: '{filename}'"
            if html_content:
                prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###"

            llm_response = call_llm(prompt, system_prompt)

            # Clean response
            clean_json = llm_response.replace('```json', '').replace('```', '').strip()
            scene_data = json.loads(clean_json)
            if not isinstance(scene_data, dict):
                raise ValueError('LLM response is not a JSON object')

            # Enforce system values while preserving LLM-extracted metadata
            scene_data['scene_id'] = scene_id
            scene_data['scene_name'] = scene_name

            # The LLM may return "lora": null or a non-object; start clean
            # instead of failing the whole entry.
            lora = scene_data.get('lora')
            if not isinstance(lora, dict):
                lora = {}
                scene_data['lora'] = lora
            lora['lora_name'] = f"Illustrious/Backgrounds/{filename}"

            if not lora.get('lora_triggers'):
                lora['lora_triggers'] = name_base
            if lora.get('lora_weight') is None:
                lora['lora_weight'] = 1.0
            if lora.get('lora_weight_min') is None:
                lora['lora_weight_min'] = 0.7
            if lora.get('lora_weight_max') is None:
                lora['lora_weight_max'] = 1.0

            with open(json_path, 'w') as f:
                json.dump(scene_data, f, indent=2)

            if is_existing:
                overwritten_count += 1
            else:
                created_count += 1

            # Small delay to avoid API rate limits if many files
            time.sleep(0.5)
        except Exception as e:
            print(f"Error creating scene for {filename}: {e}")

    if created_count > 0 or overwritten_count > 0:
        sync_scenes()
        msg = f'Successfully processed scenes: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No scenes created or overwritten. {skipped_count} existing scenes found.')

    return redirect(url_for('scenes_index'))
@app.route('/scene/create', methods=['GET', 'POST'])
def create_scene():
    """Show the scene creation form (GET) or create a new blank scene (POST).

    On POST, a skeleton scene JSON file is written to ``SCENES_DIR`` and a
    matching ``Scene`` row is inserted. The slug comes from the optional
    ``filename`` form field, falling back to a slugified ``name``; a numeric
    suffix is appended until no JSON file with that slug exists on disk.
    """
    if request.method == 'POST':
        # A missing ``name`` field previously crashed on ``None.lower()``.
        name = (request.form.get('name') or '').strip()
        if not name:
            flash('Scene name is required.')
            return redirect(request.url)

        slug = request.form.get('filename', '').strip()
        if not slug:
            slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')

        safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) or 'scene'

        # Pick the first slug that does not collide with an existing file.
        base_slug = safe_slug
        counter = 1
        while os.path.exists(os.path.join(app.config['SCENES_DIR'], f"{safe_slug}.json")):
            safe_slug = f"{base_slug}_{counter}"
            counter += 1

        scene_data = {
            "scene_id": safe_slug,
            "scene_name": name,
            "scene": {
                "background": "",
                "foreground": "",
                "furniture": [],
                "colors": [],
                "lighting": "",
                "theme": ""
            },
            "lora": {
                "lora_name": "",
                "lora_weight": 1.0,
                "lora_triggers": ""
            }
        }

        try:
            file_path = os.path.join(app.config['SCENES_DIR'], f"{safe_slug}.json")
            with open(file_path, 'w') as f:
                json.dump(scene_data, f, indent=2)

            new_scene = Scene(
                scene_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json",
                name=name, data=scene_data
            )
            db.session.add(new_scene)
            db.session.commit()

            flash('Scene created successfully!')
            return redirect(url_for('scene_detail', slug=safe_slug))
        except Exception as e:
            # Leave the session usable for the rest of the request.
            db.session.rollback()
            print(f"Save error: {e}")
            flash(f"Failed to create scene: {e}")
            return redirect(request.url)

    return render_template('scenes/create.html')
@app.route('/scene/<path:slug>/clone', methods=['POST'])
def clone_scene(slug):
    """Duplicate an existing scene under the next free numbered id.

    A trailing ``_<digits>`` on the source ``scene_id`` is treated as a
    counter; the clone gets the first ``<base>_<NN>`` above it for which no
    JSON file exists in ``SCENES_DIR``. The clone is written to disk and
    inserted as a new ``Scene`` row.
    """
    import copy  # local import: only this route needs it

    scene = Scene.query.filter_by(slug=slug).first_or_404()

    base_id = scene.scene_id
    match = re.match(r'^(.+?)_(\d+)$', base_id)
    if match:
        base_name = match.group(1)
        current_num = int(match.group(2))
    else:
        base_name = base_id
        current_num = 1

    next_num = current_num + 1
    while True:
        new_id = f"{base_name}_{next_num:02d}"
        new_filename = f"{new_id}.json"
        new_path = os.path.join(app.config['SCENES_DIR'], new_filename)
        if not os.path.exists(new_path):
            break
        next_num += 1

    # Deep copy so the clone never shares nested dicts/lists with the source row.
    new_data = copy.deepcopy(scene.data)
    new_data['scene_id'] = new_id
    new_data['scene_name'] = f"{scene.name} (Copy)"

    with open(new_path, 'w') as f:
        json.dump(new_data, f, indent=2)

    new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id)
    new_scene = Scene(
        scene_id=new_id, slug=new_slug, filename=new_filename,
        name=new_data['scene_name'], data=new_data
    )
    db.session.add(new_scene)
    db.session.commit()

    flash(f'Scene cloned as "{new_id}"!')
    return redirect(url_for('scene_detail', slug=new_slug))
@app.route('/scene/<path:slug>/save_json', methods=['POST'])
def save_scene_json(slug):
    """Replace a scene's data with JSON posted in the ``json_data`` field.

    Returns ``{'success': True}`` on success, or a 400 response with an
    ``error`` message when the payload is not valid JSON or is not a JSON
    object. The data is committed to the database and, when the scene has a
    backing file, written to ``SCENES_DIR``.
    """
    scene = Scene.query.filter_by(slug=slug).first_or_404()
    try:
        new_data = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400

    # The scene edit and generation code calls dict methods on the stored
    # data, so a list or scalar must not be saved.
    if not isinstance(new_data, dict):
        return {'success': False, 'error': 'Invalid JSON: top-level value must be an object'}, 400

    scene.data = new_data
    flag_modified(scene, 'data')
    db.session.commit()

    if scene.filename:
        file_path = os.path.join(app.config['SCENES_DIR'], scene.filename)
        with open(file_path, 'w') as f:
            json.dump(new_data, f, indent=2)
    return {'success': True}
# ============ DETAILER ROUTES ============
@app.route('/detailers')
def detailers_index():
    """Render the detailer index page with all detailers ordered by name."""
    all_detailers = Detailer.query.order_by(Detailer.name).all()
    return render_template('detailers/index.html', detailers=all_detailers)
@app.route('/detailers/rescan', methods=['POST'])
def rescan_detailers():
    """Run ``sync_detailers()`` and return to the detailer index with a notice."""
    sync_detailers()
    flash('Database synced with detailer files.')
    index_url = url_for('detailers_index')
    return redirect(index_url)
@app.route('/detailer/<path:slug>')
def detailer_detail(slug):
    """Render the detail page for one detailer.

    Passes the template the detailer, all characters and actions ordered by
    name, the per-detailer values remembered in the session (field
    preferences, preview image, selected character/action, extra prompts)
    and the image files found in the detailer's upload folder, listed in
    reverse-sorted filename order.
    """
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()
    actions = Action.query.order_by(Action.name).all()

    # Collect previously generated or uploaded images for this detailer.
    image_suffixes = ('.png', '.jpg', '.jpeg', '.webp')
    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"detailers/{slug}")
    existing_previews = []
    if os.path.isdir(upload_dir):
        image_names = [
            entry for entry in os.listdir(upload_dir)
            if entry.lower().endswith(image_suffixes)
        ]
        image_names.sort(reverse=True)
        existing_previews = [f"detailers/{slug}/{entry}" for entry in image_names]

    return render_template(
        'detailers/detail.html',
        detailer=detailer,
        characters=characters,
        actions=actions,
        preferences=session.get(f'prefs_detailer_{slug}'),
        preview_image=session.get(f'preview_detailer_{slug}'),
        selected_character=session.get(f'char_detailer_{slug}'),
        selected_action=session.get(f'action_detailer_{slug}'),
        extra_positive=session.get(f'extra_pos_detailer_{slug}', ''),
        extra_negative=session.get(f'extra_neg_detailer_{slug}', ''),
        existing_previews=existing_previews,
    )
@app.route('/detailer/<path:slug>/edit', methods=['GET', 'POST'])
def edit_detailer(slug):
    """Show the detailer edit form (GET) or apply submitted edits (POST).

    On POST the detailer name, prompt, ``lora`` section, LoRA weight bounds
    and tag list are rebuilt from the form, written back to the detailer's
    JSON file and committed. On any failure the session is rolled back and
    the edit form is rendered again with an error flash.
    """
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    loras = get_available_detailer_loras()

    if request.method == 'POST':
        try:
            # 1. Update basic fields
            detailer.name = request.form.get('detailer_name')

            # 2. Rebuild the data dictionary
            new_data = detailer.data.copy()
            new_data['detailer_name'] = detailer.name

            # Update prompt (stored as a plain string)
            new_data['prompt'] = request.form.get('detailer_prompt', '')

            # Update lora section
            if 'lora' in new_data:
                for key in new_data['lora'].keys():
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            try:
                                val = float(val)
                            except (TypeError, ValueError):
                                # Unparseable weight falls back to the default
                                val = 1.0
                        new_data['lora'][key] = val

            # LoRA weight randomization bounds: blank clears the bound
            for bound in ['lora_weight_min', 'lora_weight_max']:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        pass
                else:
                    new_data.setdefault('lora', {}).pop(bound, None)

            # Update Tags (comma separated string to list)
            tags_raw = request.form.get('tags', '')
            new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()]

            detailer.data = new_data
            flag_modified(detailer, "data")

            # 3. Write back to JSON file
            detailer_file = detailer.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', detailer.detailer_id)}.json"
            file_path = os.path.join(app.config['DETAILERS_DIR'], detailer_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)

            db.session.commit()
            flash('Detailer updated successfully!')
            return redirect(url_for('detailer_detail', slug=slug))
        except Exception as e:
            # Discard the half-applied edits so the form below shows the
            # persisted state and the session is not left dirty.
            db.session.rollback()
            print(f"Edit error: {e}")
            flash(f"Error saving changes: {str(e)}")

    return render_template('detailers/edit.html', detailer=detailer, loras=loras)
@app.route('/detailer/<path:slug>/upload', methods=['POST'])
def upload_detailer_image(slug):
    """Save an uploaded cover image for a detailer and record its path.

    The file is stored under ``UPLOAD_FOLDER/detailers/<slug>/`` and the
    relative path is saved on the detailer. Every outcome redirects to the
    detailer detail page; this route only accepts POST, so redirecting back
    to its own URL would answer the follow-up GET with 405.
    """
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    detail_url = url_for('detailer_detail', slug=slug)

    if 'image' not in request.files:
        flash('No file part')
        return redirect(detail_url)

    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(detail_url)

    if not allowed_file(file.filename):
        flash('File type not allowed', 'error')
        return redirect(detail_url)

    # secure_filename can strip a hostile name down to nothing.
    filename = secure_filename(file.filename)
    if not filename:
        flash('Invalid file name', 'error')
        return redirect(detail_url)

    # Create detailer subfolder
    detailer_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"detailers/{slug}")
    os.makedirs(detailer_folder, exist_ok=True)
    file.save(os.path.join(detailer_folder, filename))

    # Store relative path in DB
    detailer.image_path = f"detailers/{slug}/{filename}"
    db.session.commit()
    flash('Image uploaded successfully!')

    return redirect(detail_url)
def _queue_detailer_generation(detailer_obj, character=None, selected_fields=None, client_id=None, action=None, extra_positive=None, extra_negative=None):
    """Build a prepared ComfyUI workflow for a detailer, optionally with a character.

    Args:
        detailer_obj: Detailer whose ``data`` supplies the prompt and LoRA.
        character: Optional character to combine with the detailer. When
            given, the detailer prompt is added to the character's tags and
            the detailer's LoRA triggers are appended to the character's.
        selected_fields: Prompt field keys to include. When empty, a minimal
            default set is chosen.
        client_id: Unused here; kept for signature compatibility.
        action: Optional action object forwarded to ``_prepare_workflow``.
        extra_positive: Optional text appended to the main prompt.
        extra_negative: Optional custom negative prompt; empty means none.

    Returns:
        The workflow returned by ``_prepare_workflow``.
    """
    # The prompt is normally a plain string but may be a list of tags;
    # normalise to a flat list so it is never nested inside 'tags'.
    detailer_prompt = detailer_obj.data.get('prompt', '')
    if isinstance(detailer_prompt, list):
        detailer_tags = [tag for tag in detailer_prompt if tag]
    elif detailer_prompt:
        detailer_tags = [detailer_prompt]
    else:
        detailer_tags = []

    if character:
        combined_data = character.data.copy()
        combined_data['character_id'] = character.character_id

        # ``.copy()`` is shallow: build new containers rather than appending
        # to or assigning into the character's own 'tags' and 'lora'.
        if detailer_tags:
            combined_data['tags'] = list(combined_data.get('tags') or []) + detailer_tags

        # Merge detailer lora triggers if present
        detailer_lora = detailer_obj.data.get('lora', {})
        if detailer_lora.get('lora_triggers'):
            merged_lora = dict(combined_data.get('lora') or {})
            existing_triggers = merged_lora.get('lora_triggers')
            # Avoid a leading ", " when the character has no triggers.
            if existing_triggers:
                merged_lora['lora_triggers'] = f"{existing_triggers}, {detailer_lora['lora_triggers']}"
            else:
                merged_lora['lora_triggers'] = detailer_lora['lora_triggers']
            combined_data['lora'] = merged_lora

        # Merge character identity and wardrobe fields into selected_fields
        if selected_fields:
            _ensure_character_fields(character, selected_fields)
        else:
            # Auto-include essential character fields (minimal set for batch/default generation)
            selected_fields = []
            for key in ['base_specs', 'hair', 'eyes']:
                if character.data.get('identity', {}).get(key):
                    selected_fields.append(f'identity::{key}')
            selected_fields.append('special::name')
            wardrobe = character.get_active_wardrobe()
            for key in _WARDROBE_KEYS:
                if wardrobe.get(key):
                    selected_fields.append(f'wardrobe::{key}')
            selected_fields.extend(['special::tags', 'lora::lora_triggers'])

        active_outfit = character.active_outfit
    else:
        # Detailer only - no character
        combined_data = {
            'character_id': detailer_obj.detailer_id,
            'tags': detailer_tags,
            'lora': detailer_obj.data.get('lora', {}),
        }
        if not selected_fields:
            selected_fields = ['special::tags', 'lora::lora_triggers']
        active_outfit = 'default'

    default_fields = detailer_obj.default_fields

    with open('comfy_workflow.json', 'r') as f:
        workflow = json.load(f)

    prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit)
    _append_background(prompts, character)

    if extra_positive:
        prompts["main"] = f"{prompts['main']}, {extra_positive}"

    ckpt_path, ckpt_data = _get_default_checkpoint()
    workflow = _prepare_workflow(workflow, character, prompts, detailer=detailer_obj, action=action, custom_negative=extra_negative or None, checkpoint=ckpt_path, checkpoint_data=ckpt_data)
    return workflow
@app.route('/detailer/<path:slug>/generate', methods=['POST'])
def generate_detailer_image(slug):
    """Queue an image generation job for a detailer.

    Reads the action type, selected prompt fields, optional character and
    action, and extra positive/negative prompts from the form, stores those
    choices in the session, builds the workflow and enqueues it. XHR callers
    receive JSON (the job id, or an error with status 500); other callers
    are redirected to the detailer detail page.
    """
    detailer_obj = Detailer.query.filter_by(slug=slug).first_or_404()
    is_xhr = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
    detail_endpoint = 'detailer_detail'

    try:
        form = request.form
        action_type = form.get('action', 'preview')
        selected_fields = form.getlist('include_field')

        # Resolve the chosen character; a random pick is stored by its real slug.
        character_slug = form.get('character_slug', '')
        character = _resolve_character(character_slug)
        if character and character_slug == '__random__':
            character_slug = character.slug

        # Optional action to combine with the detailer.
        action_slug = form.get('action_slug', '')
        action_obj = None
        if action_slug:
            action_obj = Action.query.filter_by(slug=action_slug).first()

        extra_positive = form.get('extra_positive', '').strip()
        extra_negative = form.get('extra_negative', '').strip()

        # Remember this page's choices for the next visit.
        session[f'char_detailer_{slug}'] = character_slug
        session[f'action_detailer_{slug}'] = action_slug
        session[f'extra_pos_detailer_{slug}'] = extra_positive
        session[f'extra_neg_detailer_{slug}'] = extra_negative
        session[f'prefs_detailer_{slug}'] = selected_fields

        workflow = _queue_detailer_generation(
            detailer_obj,
            character,
            selected_fields,
            action=action_obj,
            extra_positive=extra_positive,
            extra_negative=extra_negative,
        )

        char_label = 'no character' if not character else character.name
        job = _enqueue_job(
            f"Detailer: {detailer_obj.name} ({char_label}) {action_type}",
            workflow,
            _make_finalize('detailers', slug, Detailer, action_type),
        )

        if is_xhr:
            return {'status': 'queued', 'job_id': job['id']}
        return redirect(url_for(detail_endpoint, slug=slug))
    except Exception as e:
        print(f"Generation error: {e}")
        if is_xhr:
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for(detail_endpoint, slug=slug))
@app.route('/detailer/<path:slug>/save_defaults', methods=['POST'])
def save_detailer_defaults(slug):
    """Store the submitted ``include_field`` list as the detailer's default fields."""
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    detailer.default_fields = request.form.getlist('include_field')
    db.session.commit()

    flash('Default prompt selection saved for this detailer!')
    detail_url = url_for('detailer_detail', slug=slug)
    return redirect(detail_url)
@app.route('/detailer/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_detailer_cover_from_preview(slug):
    """Set the detailer's cover image to the preview path stored in the session."""
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    preview_path = session.get(f'preview_detailer_{slug}')

    # Nothing to promote: report it and return to the detail page.
    if not preview_path:
        flash('No preview image available', 'error')
        return redirect(url_for('detailer_detail', slug=slug))

    detailer.image_path = preview_path
    db.session.commit()
    flash('Cover image updated from preview!')
    return redirect(url_for('detailer_detail', slug=slug))
@app.route('/detailer/<path:slug>/save_json', methods=['POST'])
def save_detailer_json(slug):
    """Replace a detailer's data with JSON posted in the ``json_data`` field.

    Returns ``{'success': True}`` on success, or a 400 response with an
    ``error`` message when the payload is not valid JSON or is not a JSON
    object. The data is committed to the database and, when the detailer has
    a backing file, written to ``DETAILERS_DIR``.
    """
    detailer = Detailer.query.filter_by(slug=slug).first_or_404()
    try:
        new_data = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400

    # The detailer edit and generation code calls dict methods on the stored
    # data, so a list or scalar must not be saved.
    if not isinstance(new_data, dict):
        return {'success': False, 'error': 'Invalid JSON: top-level value must be an object'}, 400

    detailer.data = new_data
    flag_modified(detailer, 'data')
    db.session.commit()

    if detailer.filename:
        file_path = os.path.join(app.config['DETAILERS_DIR'], detailer.filename)
        with open(file_path, 'w') as f:
            json.dump(new_data, f, indent=2)
    return {'success': True}
@app.route('/detailers/bulk_create', methods=['POST'])
def bulk_create_detailers_from_loras():
    """Create detailer JSON files for every detailer LoRA via the LLM.

    For each ``.safetensors`` file in the detailers LoRA directory the LLM
    is asked to describe the detailer (given the filename and, if present,
    the text of a sibling ``.html`` file). The reply is parsed as JSON,
    system fields and LoRA defaults are enforced, and the result is written
    to ``DETAILERS_DIR``. Existing files are skipped unless the ``overwrite``
    form field is ``'true'``. A failure on one file is logged and does not
    stop the batch.
    """
    detailers_lora_dir = '/ImageModels/lora/Illustrious/Detailers/'
    if not os.path.exists(detailers_lora_dir):
        flash('Detailers LoRA directory not found.', 'error')
        return redirect(url_for('detailers_index'))

    overwrite = request.form.get('overwrite') == 'true'
    created_count = 0
    skipped_count = 0
    overwritten_count = 0

    system_prompt = load_prompt('detailer_system.txt')
    if not system_prompt:
        flash('Detailer system prompt file not found.', 'error')
        return redirect(url_for('detailers_index'))

    # Sorted so the processing order is deterministic across runs.
    for filename in sorted(os.listdir(detailers_lora_dir)):
        if not filename.endswith('.safetensors'):
            continue

        name_base = filename.rsplit('.', 1)[0]
        detailer_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
        detailer_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()

        json_filename = f"{detailer_id}.json"
        json_path = os.path.join(app.config['DETAILERS_DIR'], json_filename)

        is_existing = os.path.exists(json_path)
        if is_existing and not overwrite:
            skipped_count += 1
            continue

        html_filename = f"{name_base}.html"
        html_path = os.path.join(detailers_lora_dir, html_filename)
        html_content = ""
        if os.path.exists(html_path):
            try:
                with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                    html_raw = hf.read()
                # Strip HTML tags but keep text content for LLM context
                clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
                clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)
                clean_html = re.sub(r'<img[^>]*>', '', clean_html)
                clean_html = re.sub(r'<[^>]+>', ' ', clean_html)
                html_content = ' '.join(clean_html.split())
            except Exception as e:
                print(f"Error reading HTML {html_filename}: {e}")

        try:
            print(f"Asking LLM to describe detailer: {detailer_name}")
            prompt = f"Describe a detailer LoRA for AI image generation based on the filename: '{filename}'"
            if html_content:
                prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###"

            llm_response = call_llm(prompt, system_prompt)

            clean_json = llm_response.replace('```json', '').replace('```', '').strip()
            detailer_data = json.loads(clean_json)
            if not isinstance(detailer_data, dict):
                raise ValueError('LLM response is not a JSON object')

            detailer_data['detailer_id'] = detailer_id
            detailer_data['detailer_name'] = detailer_name

            # The LLM may return "lora": null or a non-object; start clean
            # instead of failing the whole entry.
            lora = detailer_data.get('lora')
            if not isinstance(lora, dict):
                lora = {}
                detailer_data['lora'] = lora
            lora['lora_name'] = f"Illustrious/Detailers/{filename}"

            if not lora.get('lora_triggers'):
                lora['lora_triggers'] = name_base
            if lora.get('lora_weight') is None:
                lora['lora_weight'] = 1.0
            if lora.get('lora_weight_min') is None:
                lora['lora_weight_min'] = 0.7
            if lora.get('lora_weight_max') is None:
                lora['lora_weight_max'] = 1.0

            with open(json_path, 'w') as f:
                json.dump(detailer_data, f, indent=2)

            if is_existing:
                overwritten_count += 1
            else:
                created_count += 1

            # Small delay to avoid API rate limits if many files
            time.sleep(0.5)
        except Exception as e:
            print(f"Error creating detailer for {filename}: {e}")

    if created_count > 0 or overwritten_count > 0:
        sync_detailers()
        msg = f'Successfully processed detailers: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No new detailers created or overwritten. {skipped_count} existing detailers found.')

    return redirect(url_for('detailers_index'))
@app.route('/detailer/create', methods=['GET', 'POST'])
def create_detailer():
    """Show the detailer creation form (GET) or create a new blank detailer (POST).

    On POST, a skeleton detailer JSON file is written to ``DETAILERS_DIR``
    and a matching ``Detailer`` row is inserted. The slug comes from the
    optional ``filename`` form field, falling back to a slugified ``name``;
    a numeric suffix is appended until no JSON file with that slug exists.
    """
    if request.method == 'POST':
        # A missing ``name`` field previously crashed on ``None.lower()``.
        name = (request.form.get('name') or '').strip()
        if not name:
            flash('Detailer name is required.')
            return redirect(request.url)

        slug = request.form.get('filename', '').strip()
        if not slug:
            slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')

        safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) or 'detailer'

        # Pick the first slug that does not collide with an existing file.
        base_slug = safe_slug
        counter = 1
        while os.path.exists(os.path.join(app.config['DETAILERS_DIR'], f"{safe_slug}.json")):
            safe_slug = f"{base_slug}_{counter}"
            counter += 1

        detailer_data = {
            "detailer_id": safe_slug,
            "detailer_name": name,
            "prompt": "",
            "lora": {
                "lora_name": "",
                "lora_weight": 1.0,
                "lora_triggers": ""
            }
        }

        try:
            file_path = os.path.join(app.config['DETAILERS_DIR'], f"{safe_slug}.json")
            with open(file_path, 'w') as f:
                json.dump(detailer_data, f, indent=2)

            new_detailer = Detailer(
                detailer_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json",
                name=name, data=detailer_data
            )
            db.session.add(new_detailer)
            db.session.commit()

            flash('Detailer created successfully!')
            return redirect(url_for('detailer_detail', slug=safe_slug))
        except Exception as e:
            # Leave the session usable for the rest of the request.
            db.session.rollback()
            print(f"Save error: {e}")
            flash(f"Failed to create detailer: {e}")
            return redirect(request.url)

    return render_template('detailers/create.html')
# ---------------------------------------------------------------------------
# Checkpoints
# ---------------------------------------------------------------------------
@app.route('/checkpoints')
def checkpoints_index():
    """Render the checkpoint index with all checkpoints ordered by name."""
    ordered_query = Checkpoint.query.order_by(Checkpoint.name)
    return render_template('checkpoints/index.html', checkpoints=ordered_query.all())
@app.route('/checkpoints/rescan', methods=['POST'])
def rescan_checkpoints():
    """Run ``sync_checkpoints()``, flash a confirmation and go back to the index."""
    sync_checkpoints()
    flash('Checkpoint list synced from disk.')
    index_url = url_for('checkpoints_index')
    return redirect(index_url)
@app.route('/checkpoint/<path:slug>')
def checkpoint_detail(slug):
    """Render the detail page for one checkpoint (404 when the slug is unknown).

    The template receives the checkpoint, all characters ordered by name, the
    preview image and selected character remembered in the session, and the
    image files found in this checkpoint's upload sub-folder.
    """
    ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()

    image_exts = ('.png', '.jpg', '.jpeg', '.webp')
    rel_dir = f"checkpoints/{slug}"
    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], rel_dir)

    # Collect preview images already on disk, in reverse name order.
    existing_previews = []
    if os.path.isdir(upload_dir):
        image_names = [n for n in os.listdir(upload_dir) if n.lower().endswith(image_exts)]
        image_names.sort(reverse=True)
        existing_previews = [f"checkpoints/{slug}/{n}" for n in image_names]

    context = {
        'ckpt': ckpt,
        'characters': characters,
        'preview_image': session.get(f'preview_checkpoint_{slug}'),
        'selected_character': session.get(f'char_checkpoint_{slug}'),
        'existing_previews': existing_previews,
    }
    return render_template('checkpoints/detail.html', **context)
@app.route('/checkpoint/<path:slug>/upload', methods=['POST'])
def upload_checkpoint_image(slug):
    """Store an uploaded image as the checkpoint's cover image.

    The file is saved under ``UPLOAD_FOLDER/checkpoints/<slug>/`` and the
    checkpoint's ``image_path`` is pointed at it. Every outcome redirects
    back to the checkpoint detail page.
    """
    ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()

    if 'image' not in request.files:
        flash('No file part')
        return redirect(url_for('checkpoint_detail', slug=slug))

    upload = request.files['image']
    if upload.filename == '':
        flash('No selected file')
        return redirect(url_for('checkpoint_detail', slug=slug))

    accepted = upload and allowed_file(upload.filename)
    if accepted:
        rel_dir = f"checkpoints/{slug}"
        target_dir = os.path.join(app.config['UPLOAD_FOLDER'], rel_dir)
        os.makedirs(target_dir, exist_ok=True)

        stored_name = secure_filename(upload.filename)
        upload.save(os.path.join(target_dir, stored_name))

        ckpt.image_path = f"checkpoints/{slug}/{stored_name}"
        db.session.commit()
        flash('Image uploaded successfully!')

    return redirect(url_for('checkpoint_detail', slug=slug))
def _apply_checkpoint_settings(workflow, ckpt_data):
"""Apply checkpoint-specific sampler/prompt/VAE settings to the workflow."""
steps = ckpt_data.get('steps')
cfg = ckpt_data.get('cfg')
sampler_name = ckpt_data.get('sampler_name')
scheduler = ckpt_data.get('scheduler')
base_positive = ckpt_data.get('base_positive', '')
base_negative = ckpt_data.get('base_negative', '')
vae = ckpt_data.get('vae', 'integrated')
# KSampler (node 3)
if steps and '3' in workflow:
workflow['3']['inputs']['steps'] = int(steps)
if cfg and '3' in workflow:
workflow['3']['inputs']['cfg'] = float(cfg)
if sampler_name and '3' in workflow:
workflow['3']['inputs']['sampler_name'] = sampler_name
if scheduler and '3' in workflow:
workflow['3']['inputs']['scheduler'] = scheduler
# Face/hand detailers (nodes 11, 13)
for node_id in ['11', '13']:
if node_id in workflow:
if steps:
workflow[node_id]['inputs']['steps'] = int(steps)
if cfg:
workflow[node_id]['inputs']['cfg'] = float(cfg)
if sampler_name:
workflow[node_id]['inputs']['sampler_name'] = sampler_name
if scheduler:
workflow[node_id]['inputs']['scheduler'] = scheduler
# Prepend base_positive to positive prompts (main + face/hand detailers)
if base_positive:
for node_id in ['6', '14', '15']:
if node_id in workflow:
workflow[node_id]['inputs']['text'] = f"{base_positive}, {workflow[node_id]['inputs']['text']}"
# Append base_negative to negative prompt (shared by main + detailers via node 7)
if base_negative and '7' in workflow:
workflow['7']['inputs']['text'] = f"{workflow['7']['inputs']['text']}, {base_negative}"
# VAE: if not integrated, inject a VAELoader node and rewire
if vae and vae != 'integrated':
workflow['21'] = {
'inputs': {'vae_name': vae},
'class_type': 'VAELoader'
}
if '8' in workflow:
workflow['8']['inputs']['vae'] = ['21', 0]
for node_id in ['11', '13']:
if node_id in workflow:
workflow[node_id]['inputs']['vae'] = ['21', 0]
return workflow
def _build_checkpoint_workflow(ckpt_obj, character=None):
    """Build and return a prepared ComfyUI workflow dict for a checkpoint generation.

    With a character, prompts are built from its populated identity fields
    (base_specs, hair, eyes), its name and its populated active wardrobe
    fields (full_body, top, bottom). Without one, a fixed generic prompt set
    is used.
    """
    with open('comfy_workflow.json', 'r') as f:
        workflow = json.load(f)

    if not character:
        prompts = {
            "main": "masterpiece, best quality, 1girl, solo, simple background, looking at viewer",
            "face": "masterpiece, best quality",
            "hand": "masterpiece, best quality",
        }
    else:
        combined_data = character.data.copy()
        combined_data['character_id'] = character.character_id

        # Only select fields that actually hold a value.
        identity = character.data.get('identity', {})
        selected_fields = [
            f'identity::{key}'
            for key in ['base_specs', 'hair', 'eyes']
            if identity.get(key)
        ]
        selected_fields.append('special::name')

        wardrobe = character.get_active_wardrobe()
        selected_fields.extend(
            f'wardrobe::{key}'
            for key in ['full_body', 'top', 'bottom']
            if wardrobe.get(key)
        )

        prompts = build_prompt(combined_data, selected_fields, None,
                               active_outfit=character.active_outfit)
        _append_background(prompts, character)

    return _prepare_workflow(workflow, character, prompts,
                             checkpoint=ckpt_obj.checkpoint_path,
                             checkpoint_data=ckpt_obj.data or {})
@app.route('/checkpoint/<path:slug>/generate', methods=['POST'])
def generate_checkpoint_image(slug):
    """Queue an image generation job for a checkpoint.

    The chosen character slug is remembered in the session. XMLHttpRequest
    callers get a JSON payload (queued job id, or an error with status 500);
    other callers are redirected to the checkpoint detail page.
    """
    ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
    wants_json = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    try:
        character_slug = request.form.get('character_slug', '')
        character = _resolve_character(character_slug)
        # Remember the concrete character that "__random__" resolved to.
        if character_slug == '__random__' and character:
            character_slug = character.slug
        session[f'char_checkpoint_{slug}'] = character_slug

        workflow = _build_checkpoint_workflow(ckpt, character)

        if character:
            char_label = character.name
        else:
            char_label = 'random'
        label = f"Checkpoint: {ckpt.name} ({char_label})"
        finalize = _make_finalize('checkpoints', slug, Checkpoint)
        job = _enqueue_job(label, workflow, finalize)

        if wants_json:
            return {'status': 'queued', 'job_id': job['id']}
        return redirect(url_for('checkpoint_detail', slug=slug))
    except Exception as e:
        print(f"Checkpoint generation error: {e}")
        if wants_json:
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('checkpoint_detail', slug=slug))
@app.route('/checkpoint/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_checkpoint_cover_from_preview(slug):
    """Make the session's preview image the checkpoint's cover image."""
    ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
    preview_path = session.get(f'preview_checkpoint_{slug}')

    if not preview_path:
        flash('No preview image available', 'error')
    else:
        ckpt.image_path = preview_path
        db.session.commit()
        flash('Cover image updated from preview!')

    detail_url = url_for('checkpoint_detail', slug=slug)
    return redirect(detail_url)
@app.route('/checkpoint/<path:slug>/save_json', methods=['POST'])
def save_checkpoint_json(slug):
    """Replace a checkpoint's data with JSON posted in the ``json_data`` field.

    The data is stored on the DB record and mirrored to
    ``<CHECKPOINTS_DIR>/<slug>.json``.

    Returns:
        ``{'success': True}`` on success; ``{'success': False, 'error': ...}``
        with status 400 for invalid input, or 500 when the JSON file could
        not be written.
    """
    ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
    try:
        new_data = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400
    # Checkpoint data is read with .get() elsewhere, so a list/scalar would
    # break later generation; reject it here instead of storing it.
    if not isinstance(new_data, dict):
        return {'success': False, 'error': 'Invalid JSON: top-level value must be an object'}, 400

    ckpt.data = new_data
    flag_modified(ckpt, 'data')
    db.session.commit()

    checkpoints_dir = app.config.get('CHECKPOINTS_DIR', 'data/checkpoints')
    file_path = os.path.join(checkpoints_dir, f'{ckpt.slug}.json')
    try:
        os.makedirs(checkpoints_dir, exist_ok=True)
        with open(file_path, 'w') as f:
            json.dump(new_data, f, indent=2)
    except OSError as e:
        # The DB already holds the new data; tell the caller the file copy is stale.
        print(f"Error saving JSON for checkpoint {ckpt.slug}: {e}")
        return {'success': False, 'error': f'Saved to database but failed to write JSON file: {e}'}, 500
    return {'success': True}
@app.route('/get_missing_checkpoints')
def get_missing_checkpoints():
    """Return slug and name of every checkpoint whose image_path is NULL or empty."""
    no_cover = (Checkpoint.image_path == None) | (Checkpoint.image_path == '')  # noqa: E711
    rows = Checkpoint.query.filter(no_cover).all()
    missing = [{'slug': row.slug, 'name': row.name} for row in rows]
    return {'missing': missing}
@app.route('/clear_all_checkpoint_covers', methods=['POST'])
def clear_all_checkpoint_covers():
    """Reset image_path to None on every checkpoint and commit."""
    all_checkpoints = Checkpoint.query.all()
    for entry in all_checkpoints:
        entry.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/checkpoints/bulk_create', methods=['POST'])
def bulk_create_checkpoints():
    """Create a JSON metadata file for every model file in the model directories.

    Each ``.safetensors``/``.ckpt`` file in the Illustrious and Noob model
    directories gets ``<CHECKPOINTS_DIR>/<safe_id>.json``. When an HTML file
    with the same base name sits next to the model, its text is sent to the
    LLM to produce the metadata; otherwise (or when the LLM step fails) the
    defaults from ``_default_checkpoint_data`` are written. Existing JSON
    files are skipped unless the form field ``overwrite`` is ``'true'``.
    """
    checkpoints_dir = app.config.get('CHECKPOINTS_DIR', 'data/checkpoints')
    os.makedirs(checkpoints_dir, exist_ok=True)

    overwrite = request.form.get('overwrite') == 'true'
    created_count = skipped_count = overwritten_count = 0

    system_prompt = load_prompt('checkpoint_system.txt')
    if not system_prompt:
        flash('Checkpoint system prompt file not found.', 'error')
        return redirect(url_for('checkpoints_index'))

    # (pattern, replacement, flags) applied in order to reduce HTML to text.
    html_strippers = (
        (r'<script[^>]*>.*?</script>', '', re.DOTALL),
        (r'<style[^>]*>.*?</style>', '', re.DOTALL),
        (r'<img[^>]*>', '', 0),
        (r'<[^>]+>', ' ', 0),
    )

    model_dirs = [
        (app.config.get('ILLUSTRIOUS_MODELS_DIR', ''), 'Illustrious'),
        (app.config.get('NOOB_MODELS_DIR', ''), 'Noob'),
    ]

    for dirpath, family in model_dirs:
        if not dirpath or not os.path.exists(dirpath):
            continue

        for filename in sorted(os.listdir(dirpath)):
            if not filename.endswith(('.safetensors', '.ckpt')):
                continue

            checkpoint_path = f"{family}/{filename}"
            name_base = filename.rsplit('.', 1)[0]
            safe_id = re.sub(r'[^a-zA-Z0-9_]', '_', checkpoint_path.rsplit('.', 1)[0]).lower().strip('_')
            json_path = os.path.join(checkpoints_dir, f"{safe_id}.json")

            is_existing = os.path.exists(json_path)
            if is_existing and not overwrite:
                skipped_count += 1
                continue

            # Look for a matching HTML file alongside the model file
            html_path = os.path.join(dirpath, f"{name_base}.html")
            html_content = ""
            if os.path.exists(html_path):
                try:
                    with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                        text = hf.read()
                    for pattern, replacement, flags in html_strippers:
                        text = re.sub(pattern, replacement, text, flags=flags)
                    html_content = ' '.join(text.split())
                except Exception as e:
                    print(f"Error reading HTML for {filename}: {e}")

            defaults = _default_checkpoint_data(checkpoint_path, filename)
            # Defaults are used unless the LLM step below fully succeeds.
            ckpt_data = defaults

            if html_content:
                try:
                    print(f"Asking LLM to describe checkpoint: {filename}")
                    prompt = (
                        f"Generate checkpoint metadata JSON for the model file: '{filename}' "
                        f"(checkpoint_path: '{checkpoint_path}').\n\n"
                        f"Here is descriptive text extracted from an associated HTML file:\n###\n{html_content[:3000]}\n###"
                    )
                    llm_response = call_llm(prompt, system_prompt)
                    clean_json = llm_response.replace('```json', '').replace('```', '').strip()
                    described = json.loads(clean_json)
                    # Enforce fixed fields
                    described['checkpoint_path'] = checkpoint_path
                    described['checkpoint_name'] = filename
                    # Fill missing fields with defaults
                    for key, val in defaults.items():
                        if described.get(key) is None:
                            described[key] = val
                    time.sleep(0.5)
                    ckpt_data = described
                except Exception as e:
                    print(f"LLM error for {filename}: {e}. Using defaults.")

            try:
                with open(json_path, 'w') as f:
                    json.dump(ckpt_data, f, indent=2)
            except Exception as e:
                print(f"Error saving JSON for {filename}: {e}")
            else:
                if is_existing:
                    overwritten_count += 1
                else:
                    created_count += 1

    if created_count > 0 or overwritten_count > 0:
        sync_checkpoints()
        msg = f'Successfully processed checkpoints: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No checkpoints created or overwritten. {skipped_count} existing entries found.')

    return redirect(url_for('checkpoints_index'))
# ============ LOOK ROUTES ============
@app.route('/looks')
def looks_index():
    """Render the look index with all looks ordered by name."""
    ordered_query = Look.query.order_by(Look.name)
    return render_template('looks/index.html', looks=ordered_query.all())
@app.route('/looks/rescan', methods=['POST'])
def rescan_looks():
    """Run ``sync_looks()``, flash a confirmation and go back to the index."""
    sync_looks()
    flash('Database synced with look files.')
    index_url = url_for('looks_index')
    return redirect(index_url)
@app.route('/look/<path:slug>')
def look_detail(slug):
    """Render the detail page for one look (404 when the slug is unknown).

    The selected character defaults to the look's linked character when the
    session holds no earlier choice for this look.
    """
    look = Look.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()

    linked_character = look.character_id or ''
    context = {
        'look': look,
        'characters': characters,
        'preferences': session.get(f'prefs_look_{slug}'),
        'preview_image': session.get(f'preview_look_{slug}'),
        # Pre-select the linked character if set
        'selected_character': session.get(f'char_look_{slug}', linked_character),
    }
    return render_template('looks/detail.html', **context)
@app.route('/look/<path:slug>/edit', methods=['GET', 'POST'])
def edit_look(slug):
    """Render the look edit form (GET) or save the submitted changes (POST).

    On POST the look's name, linked character, prompts, LoRA settings and
    tags are updated on the DB record and, when the look has a filename,
    mirrored to its JSON file in ``LOOKS_DIR``.
    """
    look = Look.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()
    loras = get_available_loras()

    if request.method == 'POST':
        look.name = request.form.get('look_name', look.name)
        character_id = request.form.get('character_id', '')
        look.character_id = character_id if character_id else None

        # Copy so the stored dict is replaced rather than mutated; tolerate
        # a record without data.
        new_data = dict(look.data or {})
        new_data['look_name'] = look.name
        new_data['character_id'] = look.character_id
        new_data['positive'] = request.form.get('positive', '')
        new_data['negative'] = request.form.get('negative', '')

        lora_name = request.form.get('lora_lora_name', '')
        try:
            lora_weight = float(request.form.get('lora_lora_weight', 1.0) or 1.0)
        except ValueError:
            # Non-numeric input falls back to the same default as a blank
            # field, mirroring how invalid bounds are ignored below.
            lora_weight = 1.0
        lora_triggers = request.form.get('lora_lora_triggers', '')
        new_data['lora'] = {'lora_name': lora_name, 'lora_weight': lora_weight, 'lora_triggers': lora_triggers}
        for bound in ['lora_weight_min', 'lora_weight_max']:
            val_str = request.form.get(f'lora_{bound}', '').strip()
            if val_str:
                try:
                    new_data['lora'][bound] = float(val_str)
                except ValueError:
                    pass

        tags_raw = request.form.get('tags', '')
        new_data['tags'] = [t.strip() for t in tags_raw.split(',') if t.strip()]

        look.data = new_data
        flag_modified(look, 'data')
        db.session.commit()

        if look.filename:
            file_path = os.path.join(app.config['LOOKS_DIR'], look.filename)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)

        flash(f'Look "{look.name}" updated!')
        return redirect(url_for('look_detail', slug=look.slug))

    return render_template('looks/edit.html', look=look, characters=characters, loras=loras)
@app.route('/look/<path:slug>/upload', methods=['POST'])
def upload_look_image(slug):
    """Store an uploaded image as the look's cover image.

    The file is saved under ``UPLOAD_FOLDER/looks/<slug>/`` and the look's
    ``image_path`` is pointed at it. Every outcome flashes a message and
    redirects back to the look detail page, matching the checkpoint upload
    route.
    """
    look = Look.query.filter_by(slug=slug).first_or_404()

    if 'image' not in request.files:
        flash('No file selected')
        return redirect(url_for('look_detail', slug=slug))

    file = request.files['image']
    # Browsers submit an empty filename when the form is sent without a file.
    if file.filename == '':
        flash('No file selected')
        return redirect(url_for('look_detail', slug=slug))

    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        look_folder = os.path.join(app.config['UPLOAD_FOLDER'], f'looks/{slug}')
        os.makedirs(look_folder, exist_ok=True)
        file_path = os.path.join(look_folder, filename)
        file.save(file_path)

        look.image_path = f'looks/{slug}/{filename}'
        db.session.commit()
        flash('Image uploaded successfully!')
    else:
        flash('File type not allowed', 'error')

    return redirect(url_for('look_detail', slug=slug))
@app.route('/look/<path:slug>/generate', methods=['POST'])
def generate_look_image(slug):
    """Queue an image generation job for a look.

    With a character, the prompt is built from the character's data combined
    with the look's LoRA and tags, then extended with the look's triggers and
    positive prompt. Without one, the prompt consists of the look's triggers
    and positive prompt only. The field selection and character choice are
    remembered in the session. XMLHttpRequest callers get a JSON payload;
    other callers are redirected to the look detail page.
    """
    look = Look.query.filter_by(slug=slug).first_or_404()
    wants_json = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    def _join_tags(base, addition):
        # Append `addition` to a comma-separated tag string that may be empty.
        return f"{base}, {addition}" if base else addition

    try:
        action = request.form.get('action', 'preview')
        selected_fields = request.form.getlist('include_field')
        character_slug = request.form.get('character_slug', '')

        # Only load a character when the user explicitly selects one
        character = _resolve_character(character_slug)
        if character_slug == '__random__' and character:
            character_slug = character.slug
        elif character_slug and not character:
            # fallback: try matching by character_id
            character = Character.query.filter_by(character_id=character_slug).first()
        # No fallback to look.character_id — looks are self-contained

        session[f'prefs_look_{slug}'] = selected_fields
        session[f'char_look_{slug}'] = character_slug

        lora_triggers = look.data.get('lora', {}).get('lora_triggers', '')
        look_positive = look.data.get('positive', '')

        with open('comfy_workflow.json', 'r') as f:
            workflow = json.load(f)

        if not character:
            # Look is self-contained: build prompt from its own positive and triggers only
            own_tags = ['(solo:1.2)', lora_triggers, look_positive]
            main = _dedup_tags(', '.join(tag for tag in own_tags if tag))
            prompts = {'main': main, 'face': '', 'hand': ''}
            bg = "simple background"
        else:
            # Merge character identity with look LoRA and positive prompt
            char_data = character.data
            outfit_key = character.active_outfit or 'default'
            combined_data = {
                'character_id': character.character_id,
                'identity': char_data.get('identity', {}),
                'defaults': char_data.get('defaults', {}),
                'wardrobe': char_data.get('wardrobe', {}).get(
                    outfit_key, char_data.get('wardrobe', {}).get('default', {})),
                'styles': char_data.get('styles', {}),
                'lora': look.data.get('lora', {}),
                'tags': look.data.get('tags', [])
            }
            _ensure_character_fields(character, selected_fields,
                                     include_wardrobe=False, include_defaults=True)
            prompts = build_prompt(combined_data, selected_fields, character.default_fields)

            # Append look-specific triggers and positive
            extra = ', '.join(tag for tag in (lora_triggers, look_positive) if tag)
            if extra:
                prompts['main'] = _dedup_tags(_join_tags(prompts['main'], extra))

            primary_color = char_data.get('styles', {}).get('primary_color', '')
            if primary_color:
                bg = f"{primary_color} simple background"
            else:
                bg = "simple background"

        prompts['main'] = _dedup_tags(_join_tags(prompts['main'], bg))

        ckpt_path, ckpt_data = _get_default_checkpoint()
        workflow = _prepare_workflow(workflow, character, prompts, checkpoint=ckpt_path,
                                     checkpoint_data=ckpt_data, look=look)

        char_label = character.name if character else 'no character'
        label = f"Look: {look.name} ({char_label}) – {action}"
        finalize = _make_finalize('looks', slug, Look, action)
        job = _enqueue_job(label, workflow, finalize)

        if wants_json:
            return {'status': 'queued', 'job_id': job['id']}
        return redirect(url_for('look_detail', slug=slug))
    except Exception as e:
        print(f"Look generation error: {e}")
        if wants_json:
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('look_detail', slug=slug))
@app.route('/look/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_look_cover_from_preview(slug):
    """Make the session's preview image the look's cover image."""
    look = Look.query.filter_by(slug=slug).first_or_404()
    preview_path = session.get(f'preview_look_{slug}')

    if not preview_path:
        flash('No preview image available', 'error')
    else:
        look.image_path = preview_path
        db.session.commit()
        flash('Cover image updated from preview!')

    detail_url = url_for('look_detail', slug=slug)
    return redirect(detail_url)
@app.route('/look/<path:slug>/save_defaults', methods=['POST'])
def save_look_defaults(slug):
    """Store the submitted ``include_field`` values as the look's default fields."""
    look = Look.query.filter_by(slug=slug).first_or_404()
    chosen_fields = request.form.getlist('include_field')
    look.default_fields = chosen_fields
    db.session.commit()
    flash('Default prompt selection saved!')
    detail_url = url_for('look_detail', slug=slug)
    return redirect(detail_url)
@app.route('/look/<path:slug>/save_json', methods=['POST'])
def save_look_json(slug):
    """Replace a look's data with JSON posted in the ``json_data`` field.

    The data is stored on the DB record (``character_id`` is taken from the
    data when present) and, when the look has a filename, mirrored to its
    JSON file in ``LOOKS_DIR``.

    Returns:
        ``{'success': True}`` on success; ``{'success': False, 'error': ...}``
        with status 400 for invalid input, or 500 when the JSON file could
        not be written.
    """
    look = Look.query.filter_by(slug=slug).first_or_404()
    try:
        new_data = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400
    # The .get() below (and other readers of look.data) need a mapping;
    # a list or scalar would otherwise raise and surface as a 500.
    if not isinstance(new_data, dict):
        return {'success': False, 'error': 'Invalid JSON: top-level value must be an object'}, 400

    look.data = new_data
    look.character_id = new_data.get('character_id', look.character_id)
    flag_modified(look, 'data')
    db.session.commit()

    if look.filename:
        file_path = os.path.join(app.config['LOOKS_DIR'], look.filename)
        try:
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)
        except OSError as e:
            # The DB already holds the new data; tell the caller the file copy is stale.
            print(f"Error saving JSON for look {look.slug}: {e}")
            return {'success': False, 'error': f'Saved to database but failed to write JSON file: {e}'}, 500
    return {'success': True}
@app.route('/look/create', methods=['GET', 'POST'])
def create_look():
    """Render the look creation form (GET) or create a new look (POST).

    On POST a JSON file is written into ``LOOKS_DIR`` and a matching ``Look``
    row is inserted. The id is derived from the name; a numeric suffix is
    appended until no JSON file with that id exists, as the detailer creation
    route does.

    Returns:
        A redirect to the new look's detail page on success, a redirect back
        to this form on invalid input or failure, or the rendered form for GET.
    """
    characters = Character.query.order_by(Character.name).all()
    loras = get_available_loras()

    if request.method == 'POST':
        name = request.form.get('name', '').strip()
        # Without a name the id would be empty and the file would be ".json".
        if not name:
            flash('Look name is required.', 'error')
            return redirect(request.url)

        looks_dir = app.config['LOOKS_DIR']
        base_id = re.sub(r'[^a-zA-Z0-9_]', '_', name.lower().replace(' ', '_'))

        # Pick the first id that does not overwrite an existing look file.
        look_id = base_id
        counter = 1
        while os.path.exists(os.path.join(looks_dir, f'{look_id}.json')):
            look_id = f'{base_id}_{counter}'
            counter += 1

        filename = f'{look_id}.json'
        file_path = os.path.join(looks_dir, filename)

        character_id = request.form.get('character_id', '') or None
        lora_name = request.form.get('lora_lora_name', '')
        try:
            lora_weight = float(request.form.get('lora_lora_weight', 1.0) or 1.0)
        except ValueError:
            # Non-numeric input falls back to the same default as a blank field.
            lora_weight = 1.0
        lora_triggers = request.form.get('lora_lora_triggers', '')
        positive = request.form.get('positive', '')
        negative = request.form.get('negative', '')
        tags = [t.strip() for t in request.form.get('tags', '').split(',') if t.strip()]

        data = {
            'look_id': look_id,
            'look_name': name,
            'character_id': character_id,
            'positive': positive,
            'negative': negative,
            'lora': {'lora_name': lora_name, 'lora_weight': lora_weight, 'lora_triggers': lora_triggers},
            'tags': tags
        }

        # look_id contains only [a-zA-Z0-9_], so it is usable as the slug as-is.
        slug = look_id

        try:
            os.makedirs(looks_dir, exist_ok=True)
            with open(file_path, 'w') as f:
                json.dump(data, f, indent=2)

            new_look = Look(look_id=look_id, slug=slug, filename=filename, name=name,
                            character_id=character_id, data=data)
            db.session.add(new_look)
            db.session.commit()
        except Exception as e:
            # Discard the failed transaction so the session stays usable.
            db.session.rollback()
            print(f"Save error: {e}")
            flash(f"Failed to create look: {e}")
            return redirect(request.url)

        flash(f'Look "{name}" created!')
        return redirect(url_for('look_detail', slug=slug))

    return render_template('looks/create.html', characters=characters, loras=loras)
@app.route('/get_missing_looks')
def get_missing_looks():
    """Return slug and name of every look whose image_path is NULL or empty."""
    no_cover = (Look.image_path == None) | (Look.image_path == '')  # noqa: E711
    rows = Look.query.filter(no_cover).all()
    missing = [{'slug': row.slug, 'name': row.name} for row in rows]
    return {'missing': missing}
@app.route('/clear_all_look_covers', methods=['POST'])
def clear_all_look_covers():
    """Reset image_path to None on every look and commit."""
    for entry in Look.query.all():
        entry.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/looks/bulk_create', methods=['POST'])
def bulk_create_looks_from_loras():
    """Create a look JSON file for every ``.safetensors`` file in ``LORA_DIR``.

    For each LoRA file the LLM is asked for a look profile, optionally fed
    with text extracted from an HTML file of the same base name. Fixed ids,
    the LoRA path and default weights are then filled in and the result is
    written to ``LOOKS_DIR``. Existing JSON files are skipped unless the form
    field ``overwrite`` is ``'true'``.
    """
    lora_dir = app.config['LORA_DIR']
    if not os.path.exists(lora_dir):
        flash('Looks LoRA directory not found.', 'error')
        return redirect(url_for('looks_index'))

    overwrite = request.form.get('overwrite') == 'true'
    created_count = skipped_count = overwritten_count = 0

    system_prompt = load_prompt('look_system.txt')
    if not system_prompt:
        flash('Look system prompt file not found.', 'error')
        return redirect(url_for('looks_index'))

    # (pattern, replacement, flags) applied in order to reduce HTML to text.
    html_strippers = (
        (r'<script[^>]*>.*?</script>', '', re.DOTALL),
        (r'<style[^>]*>.*?</style>', '', re.DOTALL),
        (r'<img[^>]*>', '', 0),
        (r'<[^>]+>', ' ', 0),
    )
    # Values used when the LLM response leaves a LoRA weight unset.
    weight_defaults = (
        ('lora_weight', 0.8),
        ('lora_weight_min', 0.7),
        ('lora_weight_max', 1.0),
    )

    for filename in os.listdir(lora_dir):
        if not filename.endswith('.safetensors'):
            continue

        name_base = filename.rsplit('.', 1)[0]
        look_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
        look_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()
        json_path = os.path.join(app.config['LOOKS_DIR'], f"{look_id}.json")

        is_existing = os.path.exists(json_path)
        if is_existing and not overwrite:
            skipped_count += 1
            continue

        html_filename = f"{name_base}.html"
        html_path = os.path.join(lora_dir, html_filename)
        html_content = ""
        if os.path.exists(html_path):
            try:
                with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                    text = hf.read()
                for pattern, replacement, flags in html_strippers:
                    text = re.sub(pattern, replacement, text, flags=flags)
                html_content = ' '.join(text.split())
            except Exception as e:
                print(f"Error reading HTML {html_filename}: {e}")

        try:
            print(f"Asking LLM to describe look: {look_name}")
            prompt = f"Create a look profile for a character appearance LoRA based on the filename: '{filename}'"
            if html_content:
                prompt += f"\n\nHere is descriptive text extracted from an associated HTML file:\n###\n{html_content[:3000]}\n###"

            llm_response = call_llm(prompt, system_prompt)
            clean_json = llm_response.replace('```json', '').replace('```', '').strip()
            look_data = json.loads(clean_json)

            look_data['look_id'] = look_id
            look_data['look_name'] = look_name

            if 'lora' not in look_data:
                look_data['lora'] = {}
            lora = look_data['lora']
            lora['lora_name'] = f"Illustrious/Looks/{filename}"
            if not lora.get('lora_triggers'):
                lora['lora_triggers'] = name_base
            for weight_key, fallback in weight_defaults:
                if lora.get(weight_key) is None:
                    lora[weight_key] = fallback

            os.makedirs(app.config['LOOKS_DIR'], exist_ok=True)
            with open(json_path, 'w') as f:
                json.dump(look_data, f, indent=2)

            if is_existing:
                overwritten_count += 1
            else:
                created_count += 1

            time.sleep(0.5)
        except Exception as e:
            print(f"Error creating look for {filename}: {e}")

    if created_count > 0 or overwritten_count > 0:
        sync_looks()
        msg = f'Successfully processed looks: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No looks created or overwritten. {skipped_count} existing entries found.')

    return redirect(url_for('looks_index'))
# ---------------------------------------------------------------------------
# Gallery
# ---------------------------------------------------------------------------
# Gallery category -> model used to look up display names. The category is
# also the sub-folder name scanned under the uploads directory.
# NOTE(review): 'looks' is not listed although look images are stored under
# 'looks/<slug>/' in the uploads folder - confirm whether that is intentional.
_MODEL_MAP = {
    'characters': Character,
    'actions': Action,
    'outfits': Outfit,
    'scenes': Scene,
    'styles': Style,
    'detailers': Detailer,
    'checkpoints': Checkpoint,
}

# Category names in display order (the insertion order of _MODEL_MAP).
GALLERY_CATEGORIES = list(_MODEL_MAP)
def _scan_gallery_images(category_filter='all', slug_filter=''):
    """Return sorted list of image dicts from the uploads directory.

    Args:
        category_filter: ``'all'`` to scan every entry of
            ``GALLERY_CATEGORIES``, or the name of a single category folder.
        slug_filter: when non-empty, only the item folder with exactly this
            name is scanned.

    Returns:
        A list of dicts with keys ``path``, ``category``, ``slug``,
        ``filename`` and ``timestamp``, newest timestamp first. The timestamp
        is parsed from ``gen_<int>.<ext>`` style names and is 0 otherwise.
    """
    upload_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
    images = []
    cats = GALLERY_CATEGORIES if category_filter == 'all' else [category_filter]

    for cat in cats:
        cat_folder = os.path.abspath(os.path.join(upload_root, cat))
        # The category may come straight from the query string: only accept
        # direct children of the uploads folder so values such as '..' or an
        # absolute path cannot be used to list other directories.
        if os.path.dirname(cat_folder) != upload_root:
            continue
        if not os.path.isdir(cat_folder):
            continue
        try:
            slugs = os.listdir(cat_folder)
        except OSError:
            continue

        for item_slug in slugs:
            if slug_filter and slug_filter != item_slug:
                continue
            item_folder = os.path.join(cat_folder, item_slug)
            if not os.path.isdir(item_folder):
                continue
            try:
                files = os.listdir(item_folder)
            except OSError:
                continue

            for filename in files:
                if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
                    continue
                try:
                    ts = int(filename.replace('gen_', '').rsplit('.', 1)[0])
                except ValueError:
                    # Names that carry no integer timestamp sort last.
                    ts = 0
                images.append({
                    'path': f"{cat}/{item_slug}/{filename}",
                    'category': cat,
                    'slug': item_slug,
                    'filename': filename,
                    'timestamp': ts,
                })

    images.sort(key=lambda x: x['timestamp'], reverse=True)
    return images
def _enrich_with_names(images):
"""Add item_name field to each image dict, querying DB once per category."""
by_cat = {}
for img in images:
by_cat.setdefault(img['category'], set()).add(img['slug'])
name_map = {}
for cat, slugs in by_cat.items():
Model = _MODEL_MAP.get(cat)
if not Model:
continue
items = Model.query.filter(Model.slug.in_(slugs)).with_entities(Model.slug, Model.name).all()
for slug, name in items:
name_map[(cat, slug)] = name
for img in images:
img['item_name'] = name_map.get((img['category'], img['slug']), img['slug'])
return images
@app.route('/gallery')
def gallery():
    """Render one page of the image gallery.

    Query parameters: ``category`` (default ``'all'``), ``slug``, ``sort``
    (``'newest'`` or ``'oldest'``), ``page`` and ``per_page`` (24, 48 or 96;
    anything else becomes 48). Non-numeric ``page``/``per_page`` values fall
    back to their defaults instead of raising.
    """
    category = request.args.get('category', 'all')
    slug = request.args.get('slug', '')
    sort = request.args.get('sort', 'newest')
    # type=int makes the lookup return the default when conversion fails.
    page = max(1, request.args.get('page', 1, type=int))
    per_page = request.args.get('per_page', 48, type=int)
    per_page = per_page if per_page in (24, 48, 96) else 48

    images = _scan_gallery_images(category, slug)

    if sort == 'oldest':
        images.reverse()

    total = len(images)
    total_pages = max(1, (total + per_page - 1) // per_page)
    page = min(page, total_pages)
    page_images = images[(page - 1) * per_page: page * per_page]
    # Names are only resolved for the images actually shown.
    _enrich_with_names(page_images)

    slug_options = []
    if category != 'all':
        Model = _MODEL_MAP.get(category)
        if Model:
            slug_options = [(r.slug, r.name) for r in Model.query.order_by(Model.name).with_entities(Model.slug, Model.name).all()]

    return render_template(
        'gallery.html',
        images=page_images,
        page=page,
        per_page=per_page,
        total=total,
        total_pages=total_pages,
        category=category,
        slug=slug,
        sort=sort,
        categories=GALLERY_CATEGORIES,
        slug_options=slug_options,
    )
def _parse_comfy_png_metadata(image_path):
    """Read ComfyUI generation metadata from a PNG's tEXt 'prompt' chunk.

    Returns a dict with keys: positive, negative, checkpoint, loras,
    seed, steps, cfg, sampler, scheduler. Any missing field is None/[].
    Unreadable images and malformed or unexpected metadata yield the empty
    result rather than raising.
    """
    from PIL import Image as PilImage

    result = {
        'positive': None,
        'negative': None,
        'checkpoint': None,
        'loras': [],      # list of {name, strength}
        'seed': None,
        'steps': None,
        'cfg': None,
        'sampler': None,
        'scheduler': None,
    }

    try:
        with PilImage.open(image_path) as im:
            raw = im.info.get('prompt')
        if not raw:
            return result
        nodes = json.loads(raw)
    except Exception:
        return result

    # The prompt is expected to be an object keyed by node id; any other
    # JSON value carries nothing that can be read here.
    if not isinstance(nodes, dict):
        return result

    for node in nodes.values():
        if not isinstance(node, dict):
            continue
        ct = node.get('class_type', '')
        inp = node.get('inputs', {})
        if not isinstance(inp, dict):
            continue

        if ct == 'KSampler':
            result['seed'] = inp.get('seed')
            result['steps'] = inp.get('steps')
            result['cfg'] = inp.get('cfg')
            result['sampler'] = inp.get('sampler_name')
            result['scheduler'] = inp.get('scheduler')

        elif ct == 'CheckpointLoaderSimple':
            result['checkpoint'] = inp.get('ckpt_name')

        elif ct == 'CLIPTextEncode':
            # Fallback for workflows without the fixed node ids handled
            # below: first CLIPTextEncode = positive, second = negative.
            text = inp.get('text', '')
            if result['positive'] is None:
                result['positive'] = text
            elif result['negative'] is None:
                result['negative'] = text

        elif ct == 'LoraLoader':
            name = inp.get('lora_name', '')
            if name:
                result['loras'].append({
                    'name': name,
                    'strength': inp.get('strength_model', 1.0),
                })

    # Prefer the fixed node ids of the known workflow (more reliable):
    # node "6" = positive prompt, node "7" = negative prompt.
    for field, node_id in (('positive', '6'), ('negative', '7')):
        node = nodes.get(node_id)
        if isinstance(node, dict) and isinstance(node.get('inputs'), dict):
            result[field] = node['inputs'].get('text', result[field])

    return result
@app.route('/gallery/prompt-data')
def gallery_prompt_data():
    """Return generation metadata for a specific image by reading its PNG tEXt chunk.

    The image is named by the ``path`` query parameter, relative to the
    uploads folder. Responds 400 for a missing or escaping path and 404 when
    the file does not exist.
    """
    rel_path = request.args.get('path', '')
    if not rel_path:
        return {'error': 'path parameter required'}, 400

    # Validate path stays within uploads folder
    uploads_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
    target = os.path.abspath(os.path.join(uploads_root, rel_path))
    inside_uploads = target.startswith(uploads_root + os.sep)
    if not inside_uploads:
        return {'error': 'Invalid path'}, 400
    if not os.path.isfile(target):
        return {'error': 'File not found'}, 404

    meta = _parse_comfy_png_metadata(target)
    meta['path'] = rel_path
    return meta
@app.route('/gallery/delete', methods=['POST'])
def gallery_delete():
    """Delete a generated image from the gallery. Only the image file is removed.

    Expects a JSON object body with a ``path`` string of the form
    ``<category>/<slug>/<filename>`` relative to the uploads folder.
    Responds 400 for a missing, malformed or escaping path; a path whose file
    does not exist still answers ``{'status': 'ok'}``.
    """
    data = request.get_json(silent=True)
    # A JSON body that is not an object (e.g. a list) has no "path" to read.
    if not isinstance(data, dict):
        data = {}

    img_path = data.get('path', '')
    # Non-string values would otherwise fail on .split() below.
    if not img_path or not isinstance(img_path, str):
        return {'error': 'path required'}, 400

    if len(img_path.split('/')) != 3:
        return {'error': 'invalid path format'}, 400

    upload_folder = os.path.abspath(app.config['UPLOAD_FOLDER'])
    abs_img = os.path.abspath(os.path.join(upload_folder, img_path))
    # Refuse anything that resolves outside the uploads folder.
    if not abs_img.startswith(upload_folder + os.sep):
        return {'error': 'Invalid path'}, 400

    if os.path.isfile(abs_img):
        os.remove(abs_img)

    return {'status': 'ok'}
@app.route('/resource/<category>/<slug>/delete', methods=['POST'])
def resource_delete(category, slug):
    """Delete a resource item from a category gallery.

    JSON body: ``{"mode": "soft" | "hard"}``. ``mode`` defaults to ``soft``;
    any value other than ``hard`` behaves as soft.

    soft: removes JSON data file + DB record; LoRA/checkpoint file kept on disk.
    hard: removes JSON data file + LoRA/checkpoint safetensors + DB record.

    The asset path for a hard delete is read from the item's JSON file. It is
    only honoured when it resolves *inside* the corresponding model directory,
    so a malformed or hostile JSON entry (absolute path, ``..`` segments)
    cannot make this route delete an arbitrary file.

    Returns:
        ``{'status': 'ok', 'deleted': [...]}`` where the list names what was
        actually removed (``'json'``, ``'lora'`` or ``'checkpoint'``, ``'db'``),
        or ``({'error': 'unknown category'}, 400)``.
    """
    _RESOURCE_MODEL_MAP = {
        'looks': Look,
        'styles': Style,
        'actions': Action,
        'outfits': Outfit,
        'scenes': Scene,
        'detailers': Detailer,
        'checkpoints': Checkpoint,
    }
    _RESOURCE_DATA_DIRS = {
        'looks': app.config['LOOKS_DIR'],
        'styles': app.config['STYLES_DIR'],
        'actions': app.config['ACTIONS_DIR'],
        'outfits': app.config['CLOTHING_DIR'],
        'scenes': app.config['SCENES_DIR'],
        'detailers': app.config['DETAILERS_DIR'],
        'checkpoints': app.config['CHECKPOINTS_DIR'],
    }
    _LORA_BASE = '/ImageModels/lora/'
    # Checkpoint paths are stored as "<family>/<file>"; each family prefix maps
    # to the config key holding that family's model directory.
    _CKPT_PREFIX_DIRS = (
        ('Illustrious/', 'ILLUSTRIOUS_MODELS_DIR'),
        ('Noob/', 'NOOB_MODELS_DIR'),
    )

    def _resolve_under(base_dir, rel_path):
        """Join rel_path onto base_dir; return None if the result escapes base_dir."""
        base_abs = os.path.abspath(base_dir)
        candidate = os.path.abspath(os.path.join(base_abs, rel_path))
        if candidate.startswith(base_abs + os.sep):
            return candidate
        print(f"[resource_delete] refusing asset path outside {base_abs}: {rel_path!r}")
        return None

    if category not in _RESOURCE_MODEL_MAP:
        return {'error': 'unknown category'}, 400

    req = request.get_json(silent=True) or {}
    mode = req.get('mode', 'soft')
    data_dir = _RESOURCE_DATA_DIRS[category]
    json_path = os.path.join(data_dir, f'{slug}.json')
    deleted = []
    asset_abs = None

    # Resolve asset path before deleting JSON (hard only). Best effort: an
    # unreadable JSON file just means no asset gets removed.
    if mode == 'hard' and os.path.isfile(json_path):
        item_data = None
        try:
            with open(json_path, encoding='utf-8') as f:
                item_data = json.load(f)
        except (OSError, ValueError) as e:
            print(f"[resource_delete] could not read {json_path}: {e}")

        if isinstance(item_data, dict):
            if category == 'checkpoints':
                ckpt_rel = item_data.get('checkpoint_path') or ''
                if isinstance(ckpt_rel, str):
                    for prefix, config_key in _CKPT_PREFIX_DIRS:
                        if ckpt_rel.startswith(prefix):
                            models_dir = app.config.get(config_key)
                            if models_dir:
                                asset_abs = _resolve_under(models_dir, ckpt_rel[len(prefix):])
                            break
            else:
                lora_section = item_data.get('lora')
                lora_name = lora_section.get('lora_name') if isinstance(lora_section, dict) else ''
                if lora_name and isinstance(lora_name, str):
                    asset_abs = _resolve_under(_LORA_BASE, lora_name)

    # Delete JSON
    if os.path.isfile(json_path):
        os.remove(json_path)
        deleted.append('json')

    # Delete LoRA/checkpoint file (hard only)
    if mode == 'hard' and asset_abs and os.path.isfile(asset_abs):
        os.remove(asset_abs)
        deleted.append('lora' if category != 'checkpoints' else 'checkpoint')

    # Remove DB record
    Model = _RESOURCE_MODEL_MAP[category]
    rec = Model.query.filter_by(slug=slug).first()
    if rec:
        db.session.delete(rec)
        db.session.commit()
        deleted.append('db')

    return {'status': 'ok', 'deleted': deleted}
# ---------------------------------------------------------------------------
# Strengths Gallery
# ---------------------------------------------------------------------------
# Maps the <category> URL segment of the /strengths/... routes to the model
# class queried for the entity. The generate and save_range routes answer
# 400 'unknown category' for any category that is not a key here.
_STRENGTHS_MODEL_MAP = {
    'characters': Character,
    'looks': Look,
    'outfits': Outfit,
    'actions': Action,
    'styles': Style,
    'scenes': Scene,
    'detailers': Detailer,
}
# Which ComfyUI LoRA node each category occupies.
# Values are workflow node ids (strings). _prepare_strengths_workflow looks
# the category up here and falls back to '16' when it is not listed.
# Several categories share a node: characters/looks -> '16',
# styles/scenes/detailers -> '19'.
_CATEGORY_LORA_NODES = {
    'characters': '16',
    'looks': '16',
    'outfits': '17',
    'actions': '18',
    'styles': '19',
    'scenes': '19',
    'detailers': '19',
}
def _strengths_lora_triggers(data):
    """Return the LoRA trigger string of an entity data dict ('' when absent or null)."""
    lora = data.get('lora')
    if isinstance(lora, dict):
        return lora.get('lora_triggers') or ''
    return ''


def _strengths_str_list(value):
    """Coerce a tags/prompt field (list, single string or None) to a list of non-empty strings."""
    if isinstance(value, str):
        return [value] if value else []
    if isinstance(value, (list, tuple)):
        return [item for item in value if isinstance(item, str) and item]
    return []


def _build_strengths_prompts(category, entity, character, action=None, extra_positive=''):
    """Build main/face/hand prompt strings for the Strengths Gallery.

    Only includes prompt *content* from the entity and (optionally) the
    character. LoRA triggers from other nodes are intentionally excluded
    so the result reflects only the swept LoRA's contribution.

    Args:
        category: plural category name ('characters', 'looks', 'outfits',
            'actions', 'styles', 'scenes', 'detailers'). Any other value is
            handled like 'detailers'.
        entity: object exposing a ``data`` dict (and ``default_fields`` for
            the 'characters' category).
        character: optional object exposing ``data`` and ``default_fields``;
            falsy means "no character".
        action: optional object exposing ``data``; its pose fields are
            appended to the main prompt. Only consulted for the
            styles / scenes / detailers path.
        extra_positive: free text appended to the main prompt. Only consulted
            for the styles / scenes / detailers path.

    Returns:
        dict with the keys ``'main'``, ``'face'`` and ``'hand'``.

    Missing or null ``lora`` / ``tags`` / sub-sections are treated as empty,
    and a detailer ``prompt`` may be either a string or a list of strings.
    """
    data = entity.data

    if category == 'characters':
        # The entity IS the character — build its full prompt normally.
        return build_prompt(data, [], entity.default_fields)

    def character_base():
        # Full character prompt when one is selected, otherwise empty slots.
        if character:
            return build_prompt(character.data, [], character.default_fields)
        return {'main': '', 'face': '', 'hand': ''}

    triggers = _strengths_lora_triggers(data)
    tags = _strengths_str_list(data.get('tags'))

    if category == 'looks':
        # Linked character prompt, with the Look's triggers/positive tags in front.
        base = character_base()
        prefix = ', '.join(p for p in [triggers, data.get('positive', '')] if p)
        if prefix:
            base['main'] = f"{prefix}, {base['main']}" if base['main'] else prefix
        return base

    if category == 'outfits':
        wardrobe = data.get('wardrobe') or {}
        wardrobe_parts = [v for v in wardrobe.values() if isinstance(v, str) and v]
        char_parts, face_parts, hand_parts = [], [], []
        if character:
            identity = character.data.get('identity') or {}
            defaults = character.data.get('defaults') or {}
            char_parts = [v for v in [identity.get('base_specs'), identity.get('hair'),
                                      identity.get('eyes'), defaults.get('expression')] if v]
            face_parts = [v for v in [identity.get('hair'), identity.get('eyes'),
                                      defaults.get('expression')] if v]
            # NOTE(review): hand tags are only emitted when a character is
            # selected, matching the three empty-list defaults above — confirm
            # this gating is intended.
            hand_parts = [v for v in [wardrobe.get('hands'), wardrobe.get('gloves')] if v]
        main_parts = [triggers] + char_parts + wardrobe_parts + tags
        return {
            'main': _dedup_tags(', '.join(p for p in main_parts if p)),
            'face': _dedup_tags(', '.join(face_parts)),
            'hand': _dedup_tags(', '.join(hand_parts)),
        }

    if category == 'actions':
        action_data = data.get('action') or {}
        pose_fields = ['full_body', 'arms', 'hands', 'torso', 'pelvis', 'legs', 'feet', 'additional']
        pose_parts = [action_data[k] for k in pose_fields if action_data.get(k)]
        expr_parts = [action_data[k] for k in ['head', 'eyes'] if action_data.get(k)]
        char_parts = []
        face_parts = list(expr_parts)
        hand_parts = [action_data['hands']] if action_data.get('hands') else []
        if character:
            identity = character.data.get('identity') or {}
            char_parts = [v for v in [identity.get('base_specs'), identity.get('hair'),
                                      identity.get('eyes')] if v]
            face_parts = [v for v in [identity.get('hair'), identity.get('eyes')] + expr_parts if v]
        main_parts = [triggers] + char_parts + pose_parts + tags
        return {
            'main': _dedup_tags(', '.join(p for p in main_parts if p)),
            'face': _dedup_tags(', '.join(face_parts)),
            'hand': _dedup_tags(', '.join(hand_parts)),
        }

    # styles / scenes / detailers — character prompt + entity tags/triggers
    if category == 'styles':
        sdata = data.get('style') or {}
        artist = f"by {sdata['artist_name']}" if sdata.get('artist_name') else ''
        style_tags = sdata.get('artistic_style', '')
        entity_parts = [p for p in [triggers, artist, style_tags] + tags if p]
    elif category == 'scenes':
        sdata = data.get('scene') or {}
        scene_parts = [v for v in sdata.values() if isinstance(v, str) and v]
        entity_parts = [p for p in [triggers] + scene_parts + tags if p]
    else:  # detailers
        # The detailer prompt may be stored as a list; extend with its items
        # instead of inserting the list itself (which would break the join).
        prompt_parts = _strengths_str_list(data.get('prompt'))
        entity_parts = [p for p in [triggers] + prompt_parts + tags if p]

    base = character_base()
    entity_str = ', '.join(entity_parts)
    if entity_str:
        base['main'] = f"{base['main']}, {entity_str}" if base['main'] else entity_str

    # Incorporate action prompt fields (for detailer category)
    if action is not None:
        action_data = action.data.get('action') or {}
        action_fields = ['full_body', 'arms', 'hands', 'torso', 'pelvis',
                         'legs', 'feet', 'additional', 'head', 'eyes']
        action_str = ', '.join(action_data[k] for k in action_fields if action_data.get(k))
        if action_str:
            base['main'] = f"{base['main']}, {action_str}" if base['main'] else action_str

    # Append any extra positive text
    if extra_positive:
        base['main'] = f"{base['main']}, {extra_positive}" if base['main'] else extra_positive
    return base
def _prepare_strengths_workflow(workflow, category, entity, character, prompts,
                                checkpoint, ckpt_data, strength_value, fixed_seed,
                                custom_negative=''):
    """Configure *workflow* so that only the swept entity's LoRA is in the chain.

    Every LoRA node other than the one assigned to *category* is left out of
    the model/clip wiring, and all samplers share *fixed_seed* so each step of
    a Strengths Gallery sweep is rendered from the same starting point.

    Args:
        workflow: ComfyUI workflow dict keyed by node-id strings; mutated in place.
        category: plural category name; selects the LoRA node via
            ``_CATEGORY_LORA_NODES`` (node '16' when not listed).
        entity: object whose ``data['lora']['lora_name']`` names the LoRA.
        character: not read by this function.
        prompts: dict with optional 'main' / 'face' / 'hand' strings that
            replace the placeholders in nodes 6 / 14 / 15.
        checkpoint: checkpoint name written to node 4 when truthy.
        ckpt_data: when truthy, passed to ``_apply_checkpoint_settings``.
        strength_value: written as both model and clip strength of the LoRA.
        fixed_seed: seed written to nodes 3, 11 and 13.
        custom_negative: text prepended to the negative prompt (node 7).

    Returns:
        The prepared workflow (the object returned by
        ``_apply_checkpoint_settings`` when *ckpt_data* is given).
    """
    lora_node = _CATEGORY_LORA_NODES.get(category, '16')
    lora_name = entity.data.get('lora', {}).get('lora_name', '')

    # 1. Checkpoint loader
    if checkpoint and '4' in workflow:
        workflow['4']['inputs']['ckpt_name'] = checkpoint

    # 2. Fixed 1024x1024 resolution
    if '5' in workflow:
        workflow['5']['inputs'].update(width=1024, height=1024)

    # 3. Substitute the prompt placeholders in the positive/face/hand text nodes
    placeholder_nodes = (
        ('6', '{{POSITIVE_PROMPT}}', 'main'),
        ('14', '{{FACE_PROMPT}}', 'face'),
        ('15', '{{HAND_PROMPT}}', 'hand'),
    )
    for text_node, placeholder, prompt_key in placeholder_nodes:
        if text_node in workflow:
            text_inputs = workflow[text_node]['inputs']
            text_inputs['text'] = text_inputs['text'].replace(
                placeholder, prompts.get(prompt_key, ''))

    # Negative prefixes are prepended one after another, so the custom
    # negative (added last) ends up in front of a Look's own negative.
    negative_prefixes = []
    if category == 'looks':
        negative_prefixes.append(entity.data.get('negative', ''))
    negative_prefixes.append(custom_negative)
    if '7' in workflow:
        for prefix in negative_prefixes:
            if prefix:
                workflow['7']['inputs']['text'] = f"{prefix}, {workflow['7']['inputs']['text']}"

    # 4. LoRA chain: at most one node (the category's own) is wired in; when it
    #    is absent or the entity has no LoRA, consumers read straight from the
    #    checkpoint loader.
    model_source = ['4', 0]
    clip_source = ['4', 1]
    if lora_name and lora_node in ('16', '17', '18', '19') and lora_node in workflow:
        lora_inputs = workflow[lora_node]['inputs']
        lora_inputs['lora_name'] = lora_name
        lora_inputs['strength_model'] = float(strength_value)
        lora_inputs['strength_clip'] = float(strength_value)
        lora_inputs['model'] = list(model_source)
        lora_inputs['clip'] = list(clip_source)
        model_source = [lora_node, 0]
        clip_source = [lora_node, 1]

    # 5. Point every consumer at the end of the chain.
    model_consumers = ('3', '11', '13')
    clip_consumers = ('6', '7', '11', '13', '14', '15')
    for consumer in ('3', '6', '7', '11', '13', '14', '15'):
        if consumer not in workflow:
            continue
        consumer_inputs = workflow[consumer]['inputs']
        if consumer in model_consumers:
            consumer_inputs['model'] = list(model_source)
        if consumer in clip_consumers:
            consumer_inputs['clip'] = list(clip_source)

    # 6. Same seed on every sampler
    seed = int(fixed_seed)
    for sampler_node in ('3', '11', '13'):
        if sampler_node in workflow:
            workflow[sampler_node]['inputs']['seed'] = seed

    # 7. Checkpoint-specific settings
    if ckpt_data:
        workflow = _apply_checkpoint_settings(workflow, ckpt_data)

    # 8. Copy node 3's sampler/scheduler choice onto the detailer nodes
    main_sampler_inputs = workflow['3']['inputs']
    sampler_name = main_sampler_inputs.get('sampler_name')
    scheduler = main_sampler_inputs.get('scheduler')
    for detailer_node in ('11', '13'):
        if detailer_node not in workflow:
            continue
        detailer_inputs = workflow[detailer_node]['inputs']
        if sampler_name:
            detailer_inputs['sampler_name'] = sampler_name
        if scheduler:
            detailer_inputs['scheduler'] = scheduler

    # 9. Cross-dedup the positive and negative prompt texts
    positive_inputs = workflow['6']['inputs']
    negative_inputs = workflow['7']['inputs']
    deduped = _cross_dedup_prompts(positive_inputs['text'], negative_inputs['text'])
    positive_inputs['text'], negative_inputs['text'] = deduped

    _log_workflow_prompts(f"_prepare_strengths_workflow [node={lora_node} lora={lora_name} @ {strength_value} seed={fixed_seed}]", workflow)
    return workflow
@app.route('/strengths/<category>/<path:slug>/generate', methods=['POST'])
def strengths_generate(category, slug):
    """Queue one Strengths Gallery render of an entity at a given LoRA strength.

    Form fields:
        strength_value: float LoRA strength (default 1.0; blank uses the default).
        seed: int seed; a random seed is drawn when absent or blank.
        character_slug: optional character to render with; ``__random__``
            picks a random character. Falls back to the value stored in the
            session for this entity.

    Returns:
        ``{'status': 'queued', 'job_id': ...}`` on success.
        ``({'error': ...}, 400)`` for an unknown category or a malformed
        ``strength_value`` / ``seed``; 404 when the entity does not exist;
        ``({'error': ...}, 500)`` when preparing or enqueueing the job fails.
    """
    if category not in _STRENGTHS_MODEL_MAP:
        return {'error': 'unknown category'}, 400
    Model = _STRENGTHS_MODEL_MAP[category]
    entity = Model.query.filter_by(slug=slug).first_or_404()

    # Malformed client input is a client error (400), not a server failure.
    try:
        strength_value = float(request.form.get('strength_value') or 1.0)
        seed_raw = request.form.get('seed')
        fixed_seed = int(seed_raw) if seed_raw else random.randint(1, 10**15)
    except (TypeError, ValueError):
        return {'error': 'invalid strength_value or seed'}, 400

    try:
        # Resolve character: prefer POST body value (reflects current page dropdown),
        # then fall back to session.
        # Session keys use the *singular* category name (char_outfit_, char_action_, …)
        # but the URL uses the plural (outfits, actions, …).
        _singular = {
            'outfits': 'outfit', 'actions': 'action', 'styles': 'style',
            'scenes': 'scene', 'detailers': 'detailer', 'looks': 'look',
        }
        session_prefix = _singular.get(category, category)
        char_slug = (request.form.get('character_slug') or
                     session.get(f'char_{session_prefix}_{slug}'))

        if category == 'characters':
            character = entity  # entity IS the character
        elif char_slug == '__random__':
            character = Character.query.order_by(db.func.random()).first()
        elif char_slug:
            character = Character.query.filter_by(slug=char_slug).first()
        else:
            character = None
        print(f"[Strengths] char_slug={char_slug!r} → character={character.slug if character else 'none'}")

        # Read extra context that may be stored in session for some categories
        action_obj = None
        extra_positive = ''
        extra_negative = ''
        if category == 'detailers':
            action_slug = session.get(f'action_detailer_{slug}')
            if action_slug:
                action_obj = Action.query.filter_by(slug=action_slug).first()
            extra_positive = session.get(f'extra_pos_detailer_{slug}', '')
            extra_negative = session.get(f'extra_neg_detailer_{slug}', '')
            print(f"[Strengths] detailer session — char={char_slug}, action={action_slug}, extra_pos={bool(extra_positive)}, extra_neg={bool(extra_negative)}")

        prompts = _build_strengths_prompts(category, entity, character,
                                           action=action_obj, extra_positive=extra_positive)

        checkpoint, ckpt_data = _get_default_checkpoint()
        workflow_path = os.path.join(os.path.dirname(__file__), 'comfy_workflow.json')
        with open(workflow_path, 'r', encoding='utf-8') as f:
            workflow = json.load(f)

        workflow = _prepare_strengths_workflow(
            workflow, category, entity, character, prompts,
            checkpoint, ckpt_data, strength_value, fixed_seed,
            custom_negative=extra_negative
        )

        def _finalize(comfy_prompt_id, job):
            """Fetch the rendered image, store it under the entity's strengths folder and fill job['result']."""
            history = get_history(comfy_prompt_id)
            outputs = history[comfy_prompt_id].get('outputs', {})

            # Try the first image of each output node until one yields data.
            img_data = None
            for node_output in outputs.values():
                for img in node_output.get('images', []):
                    img_data = get_image(img['filename'], img.get('subfolder', ''), img.get('type', 'output'))
                    break
                if img_data:
                    break
            if not img_data:
                raise Exception('no image in output')

            # The filename encodes strength and seed: strength_0_50_seed_12345.png
            strength_str = f"{strength_value:.2f}".replace('.', '_')
            upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], category, slug, 'strengths')
            os.makedirs(upload_dir, exist_ok=True)
            out_filename = f"strength_{strength_str}_seed_{fixed_seed}.png"
            out_path = os.path.join(upload_dir, out_filename)
            with open(out_path, 'wb') as f:
                f.write(img_data)

            relative = f"{category}/{slug}/strengths/{out_filename}"
            job['result'] = {'image_url': f"/static/uploads/{relative}", 'strength_value': strength_value}

        label = f"Strengths: {entity.name} @ {strength_value:.2f}"
        job = _enqueue_job(label, workflow, _finalize)
        return {'status': 'queued', 'job_id': job['id']}
    except Exception as e:
        print(f"[Strengths] generate error: {e}")
        return {'error': str(e)}, 500
@app.route('/strengths/<category>/<path:slug>/list')
def strengths_list(category, slug):
    """List the Strengths Gallery images stored for one entity.

    Looks in ``<UPLOAD_FOLDER>/<category>/<slug>/strengths`` for ``.png``
    files, in sorted filename order.

    Returns:
        ``{'images': [{'url', 'strength', 'filename'}, ...]}`` (empty list
        when the folder does not exist), or ``({'error': 'Invalid path'}, 400)``
        when category/slug would resolve outside the upload folder.
    """
    # <path:slug> may contain '/' and '..'; apply the same containment rule
    # the gallery routes use before touching the filesystem.
    uploads_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
    upload_dir = os.path.abspath(os.path.join(uploads_root, category, slug, 'strengths'))
    if not upload_dir.startswith(uploads_root + os.sep):
        return {'error': 'Invalid path'}, 400

    if not os.path.isdir(upload_dir):
        return {'images': []}

    images = []
    for fname in sorted(os.listdir(upload_dir)):
        if not fname.endswith('.png'):
            continue
        # Parse strength value from filename: strength_0_50_seed_12345.png → "0.50"
        strength_raw = fname.replace('strength_', '').split('_seed_')[0]
        images.append({
            'url': f"/static/uploads/{category}/{slug}/strengths/{fname}",
            'strength': strength_raw.replace('_', '.'),
            'filename': fname,
        })
    return {'images': images}
@app.route('/strengths/<category>/<path:slug>/clear', methods=['POST'])
def strengths_clear(category, slug):
    """Delete every file in one entity's Strengths Gallery folder.

    Operates on ``<UPLOAD_FOLDER>/<category>/<slug>/strengths``; only regular
    files directly inside it are removed, the folder itself is kept.

    Returns:
        ``{'success': True}`` (also when the folder does not exist), or
        ``({'error': 'Invalid path'}, 400)`` when category/slug would resolve
        outside the upload folder.
    """
    # <path:slug> may contain '/' and '..'. This route deletes files, so
    # refuse any target that is not inside the upload folder.
    uploads_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
    upload_dir = os.path.abspath(os.path.join(uploads_root, category, slug, 'strengths'))
    if not upload_dir.startswith(uploads_root + os.sep):
        return {'error': 'Invalid path'}, 400

    if os.path.isdir(upload_dir):
        for fname in os.listdir(upload_dir):
            fpath = os.path.join(upload_dir, fname)
            if os.path.isfile(fpath):
                os.remove(fpath)
    return {'success': True}
# Maps a Strengths Gallery category to the *name* of the app.config key that
# holds the directory of that category's JSON files. strengths_save_range
# resolves the directory with app.config[<key>] at request time.
_STRENGTHS_DATA_DIRS = {
    'characters': 'CHARACTERS_DIR',
    'looks': 'LOOKS_DIR',
    'outfits': 'CLOTHING_DIR',  # outfits use the CLOTHING_DIR config key
    'actions': 'ACTIONS_DIR',
    'styles': 'STYLES_DIR',
    'scenes': 'SCENES_DIR',
    'detailers': 'DETAILERS_DIR',
}
@app.route('/strengths/<category>/<path:slug>/save_range', methods=['POST'])
def strengths_save_range(category, slug):
    """Save lora_weight_min / lora_weight_max from the Strengths Gallery back to the entity JSON + DB.

    Form fields:
        min_weight, max_weight: finite floats. If given in the wrong order
            they are swapped so that min <= max.

    Returns:
        ``{'success': True, 'lora_weight_min': ..., 'lora_weight_max': ...}``.
        ``({'error': ...}, 400)`` for an unknown category, missing / non-numeric
        / non-finite weights, or an entity without a ``lora`` dict; 404 when
        the entity does not exist.
    """
    import math

    if category not in _STRENGTHS_MODEL_MAP or category not in _STRENGTHS_DATA_DIRS:
        return {'error': 'unknown category'}, 400

    try:
        min_w = float(request.form.get('min_weight', ''))
        max_w = float(request.form.get('max_weight', ''))
    except (ValueError, TypeError):
        return {'error': 'invalid weight values'}, 400
    # float() also accepts 'nan' / 'inf'; those are not usable weights and
    # would be written to the JSON file as non-standard NaN / Infinity tokens.
    if not (math.isfinite(min_w) and math.isfinite(max_w)):
        return {'error': 'invalid weight values'}, 400
    if min_w > max_w:
        min_w, max_w = max_w, min_w

    Model = _STRENGTHS_MODEL_MAP[category]
    entity = Model.query.filter_by(slug=slug).first_or_404()

    # Work on copies (top level and the nested 'lora' dict) so the loaded
    # value is replaced as a whole instead of being mutated in place.
    data = dict(entity.data)
    if not isinstance(data.get('lora'), dict):
        return {'error': 'entity has no lora section'}, 400
    data['lora'] = dict(data['lora'], lora_weight_min=min_w, lora_weight_max=max_w)
    entity.data = data
    flag_modified(entity, 'data')

    # Write back to JSON file on disk (only when that file already exists)
    data_dir = app.config[_STRENGTHS_DATA_DIRS[category]]
    filename = getattr(entity, 'filename', None) or f"{slug}.json"
    file_path = os.path.join(data_dir, filename)
    if os.path.exists(file_path):
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
            f.write('\n')

    db.session.commit()
    return {'success': True, 'lora_weight_min': min_w, 'lora_weight_max': max_w}
def _run_startup_migration(ddl, added_msg, exists_msg, note_label):
    """Run one best-effort schema statement during startup.

    Args:
        ddl: SQL statement to execute (an ``ALTER TABLE ... ADD COLUMN``).
        added_msg: printed when the statement succeeds.
        exists_msg: printed when the error text says the column is already
            there; pass ``None`` to stay silent in that case.
        note_label: prefix for the message printed on any other error.

    Never raises: every failure is reported with ``print`` and startup goes on.
    """
    from sqlalchemy import text
    try:
        db.session.execute(text(ddl))
        db.session.commit()
        print(added_msg)
    except Exception as e:
        # Discard the failed transaction so the statements and syncs that
        # follow start from a clean session.
        db.session.rollback()
        err_text = str(e).lower()
        if 'duplicate column name' in err_text or 'already exists' in err_text:
            if exists_msg:
                print(exists_msg)
        else:
            print(f"{note_label}: {e}")


if __name__ == '__main__':
    ensure_mcp_server_running()
    with app.app_context():
        os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
        db.create_all()

        # Migration: Add active_outfit column if it doesn't exist
        _run_startup_migration(
            "ALTER TABLE character ADD COLUMN active_outfit VARCHAR(100) DEFAULT 'default'",
            added_msg="Added active_outfit column to character table",
            exists_msg="active_outfit column already exists",
            note_label="Migration note",
        )

        # Migration: Add default_fields column to action table if it doesn't exist
        _run_startup_migration(
            'ALTER TABLE action ADD COLUMN default_fields JSON',
            added_msg="Added default_fields column to action table",
            exists_msg="default_fields column already exists in action table",
            note_label="Migration action note",
        )

        # Migration: Add new columns to settings table
        columns_to_add = [
            ('llm_provider', "VARCHAR(50) DEFAULT 'openrouter'"),
            ('local_base_url', "VARCHAR(255)"),
            ('local_model', "VARCHAR(100)"),
        ]
        for col_name, col_type in columns_to_add:
            _run_startup_migration(
                f'ALTER TABLE settings ADD COLUMN {col_name} {col_type}',
                added_msg=f"Added {col_name} column to settings table",
                exists_msg=None,
                note_label=f"Migration settings note ({col_name})",
            )

        # Ensure settings exist
        if not Settings.query.first():
            db.session.add(Settings())
            db.session.commit()
            print("Created default settings")

        sync_characters()
        sync_outfits()
        sync_actions()

        # Migration: Add data column to checkpoint table.
        # Kept at this position: it runs before sync_checkpoints() below.
        _run_startup_migration(
            'ALTER TABLE checkpoint ADD COLUMN data JSON',
            added_msg="Added data column to checkpoint table",
            exists_msg="data column already exists in checkpoint table",
            note_label="Migration checkpoint note",
        )

        sync_styles()
        sync_detailers()
        sync_scenes()
        sync_looks()
        sync_checkpoints()

    # NOTE(review): debug=True together with host='0.0.0.0' makes the Werkzeug
    # interactive debugger reachable from the network — confirm this entry
    # point is only used for local development.
    app.run(debug=True, host='0.0.0.0', port=5000)