Files
character-browser/routes/checkpoints.py
Aodhan Collins 5e4348ebc1 Add extra prompts, endless generation, random character default, and small fixes
- Add extra positive/negative prompt textareas to all 9 detail pages with session persistence
- Add Endless generation button to all detail pages (continuous preview generation until stopped)
- Default character selector to "Random Character" on all secondary detail pages
- Fix queue clear endpoint (remove spurious auth check)
- Refactor app.py into routes/ and services/ modules
- Update CLAUDE.md with new architecture documentation
- Various data file updates and cleanup

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 02:07:16 +00:00

287 lines
14 KiB
Python

import json
import os
import re
import time
import logging
from flask import render_template, request, redirect, url_for, flash, session, current_app
from werkzeug.utils import secure_filename
from sqlalchemy.orm.attributes import flag_modified
from models import db, Checkpoint, Character, Settings
from services.workflow import _prepare_workflow, _get_default_checkpoint, _apply_checkpoint_settings, _log_workflow_prompts
from services.job_queue import _enqueue_job, _make_finalize
from services.prompts import build_prompt, _resolve_character, _ensure_character_fields, _append_background
from services.sync import sync_checkpoints, _default_checkpoint_data
from services.file_io import get_available_checkpoints
from services.llm import load_prompt, call_llm
from utils import allowed_file
logger = logging.getLogger('gaze')
def register_routes(app):
def _build_checkpoint_workflow(ckpt_obj, character=None, fixed_seed=None, extra_positive=None, extra_negative=None):
"""Build and return a prepared ComfyUI workflow dict for a checkpoint generation."""
with open('comfy_workflow.json', 'r') as f:
workflow = json.load(f)
if character:
combined_data = character.data.copy()
combined_data['character_id'] = character.character_id
selected_fields = []
for key in ['base_specs', 'hair', 'eyes']:
if character.data.get('identity', {}).get(key):
selected_fields.append(f'identity::{key}')
selected_fields.append('special::name')
wardrobe = character.get_active_wardrobe()
for key in ['full_body', 'top', 'bottom']:
if wardrobe.get(key):
selected_fields.append(f'wardrobe::{key}')
prompts = build_prompt(combined_data, selected_fields, None, active_outfit=character.active_outfit)
_append_background(prompts, character)
else:
prompts = {
"main": "masterpiece, best quality, 1girl, solo, simple background, looking at viewer",
"face": "masterpiece, best quality",
"hand": "masterpiece, best quality",
}
if extra_positive:
prompts["main"] = f"{prompts['main']}, {extra_positive}"
workflow = _prepare_workflow(workflow, character, prompts, checkpoint=ckpt_obj.checkpoint_path,
checkpoint_data=ckpt_obj.data or {}, custom_negative=extra_negative or None, fixed_seed=fixed_seed)
return workflow
@app.route('/checkpoints')
def checkpoints_index():
checkpoints = Checkpoint.query.order_by(Checkpoint.name).all()
return render_template('checkpoints/index.html', checkpoints=checkpoints)
@app.route('/checkpoints/rescan', methods=['POST'])
def rescan_checkpoints():
sync_checkpoints()
flash('Checkpoint list synced from disk.')
return redirect(url_for('checkpoints_index'))
@app.route('/checkpoint/<path:slug>')
def checkpoint_detail(slug):
ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
characters = Character.query.order_by(Character.name).all()
preview_image = session.get(f'preview_checkpoint_{slug}')
selected_character = session.get(f'char_checkpoint_{slug}')
extra_positive = session.get(f'extra_pos_checkpoint_{slug}', '')
extra_negative = session.get(f'extra_neg_checkpoint_{slug}', '')
# List existing preview images
upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"checkpoints/{slug}")
existing_previews = []
if os.path.isdir(upload_dir):
files = sorted([f for f in os.listdir(upload_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))], reverse=True)
existing_previews = [f"checkpoints/{slug}/{f}" for f in files]
return render_template('checkpoints/detail.html', ckpt=ckpt, characters=characters,
preview_image=preview_image, selected_character=selected_character,
existing_previews=existing_previews,
extra_positive=extra_positive, extra_negative=extra_negative)
@app.route('/checkpoint/<path:slug>/upload', methods=['POST'])
def upload_checkpoint_image(slug):
ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
if 'image' not in request.files:
flash('No file part')
return redirect(url_for('checkpoint_detail', slug=slug))
file = request.files['image']
if file.filename == '':
flash('No selected file')
return redirect(url_for('checkpoint_detail', slug=slug))
if file and allowed_file(file.filename):
folder = os.path.join(app.config['UPLOAD_FOLDER'], f"checkpoints/{slug}")
os.makedirs(folder, exist_ok=True)
filename = secure_filename(file.filename)
file.save(os.path.join(folder, filename))
ckpt.image_path = f"checkpoints/{slug}/{filename}"
db.session.commit()
flash('Image uploaded successfully!')
return redirect(url_for('checkpoint_detail', slug=slug))
@app.route('/checkpoint/<path:slug>/generate', methods=['POST'])
def generate_checkpoint_image(slug):
ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
try:
character_slug = request.form.get('character_slug', '')
character = _resolve_character(character_slug)
if character_slug == '__random__' and character:
character_slug = character.slug
# Get additional prompts
extra_positive = request.form.get('extra_positive', '').strip()
extra_negative = request.form.get('extra_negative', '').strip()
session[f'char_checkpoint_{slug}'] = character_slug
session[f'extra_pos_checkpoint_{slug}'] = extra_positive
session[f'extra_neg_checkpoint_{slug}'] = extra_negative
session.modified = True
seed_val = request.form.get('seed', '').strip()
fixed_seed = int(seed_val) if seed_val else None
workflow = _build_checkpoint_workflow(ckpt, character, fixed_seed=fixed_seed, extra_positive=extra_positive, extra_negative=extra_negative)
char_label = character.name if character else 'random'
label = f"Checkpoint: {ckpt.name} ({char_label})"
job = _enqueue_job(label, workflow, _make_finalize('checkpoints', slug, Checkpoint))
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return {'status': 'queued', 'job_id': job['id']}
return redirect(url_for('checkpoint_detail', slug=slug))
except Exception as e:
print(f"Generation error: {e}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return {'error': str(e)}, 500
flash(f"Error during generation: {str(e)}")
return redirect(url_for('checkpoint_detail', slug=slug))
@app.route('/checkpoint/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_checkpoint_cover_from_preview(slug):
ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
preview_path = request.form.get('preview_path')
if preview_path and os.path.exists(os.path.join(app.config['UPLOAD_FOLDER'], preview_path)):
ckpt.image_path = preview_path
db.session.commit()
flash('Cover image updated!')
else:
flash('No valid preview image selected.', 'error')
return redirect(url_for('checkpoint_detail', slug=slug))
@app.route('/checkpoint/<path:slug>/save_json', methods=['POST'])
def save_checkpoint_json(slug):
ckpt = Checkpoint.query.filter_by(slug=slug).first_or_404()
try:
new_data = json.loads(request.form.get('json_data', ''))
except (ValueError, TypeError) as e:
return {'success': False, 'error': f'Invalid JSON: {e}'}, 400
ckpt.data = new_data
flag_modified(ckpt, 'data')
db.session.commit()
checkpoints_dir = app.config.get('CHECKPOINTS_DIR', 'data/checkpoints')
file_path = os.path.join(checkpoints_dir, f'{ckpt.slug}.json')
with open(file_path, 'w') as f:
json.dump(new_data, f, indent=2)
return {'success': True}
@app.route('/get_missing_checkpoints')
def get_missing_checkpoints():
missing = Checkpoint.query.filter((Checkpoint.image_path == None) | (Checkpoint.image_path == '')).order_by(Checkpoint.name).all()
return {'missing': [{'slug': c.slug, 'name': c.name} for c in missing]}
@app.route('/clear_all_checkpoint_covers', methods=['POST'])
def clear_all_checkpoint_covers():
for ckpt in Checkpoint.query.all():
ckpt.image_path = None
db.session.commit()
return {'success': True}
@app.route('/checkpoints/bulk_create', methods=['POST'])
def bulk_create_checkpoints():
checkpoints_dir = app.config.get('CHECKPOINTS_DIR', 'data/checkpoints')
os.makedirs(checkpoints_dir, exist_ok=True)
overwrite = request.form.get('overwrite') == 'true'
created_count = 0
skipped_count = 0
overwritten_count = 0
system_prompt = load_prompt('checkpoint_system.txt')
if not system_prompt:
flash('Checkpoint system prompt file not found.', 'error')
return redirect(url_for('checkpoints_index'))
dirs = [
(app.config.get('ILLUSTRIOUS_MODELS_DIR', ''), 'Illustrious'),
(app.config.get('NOOB_MODELS_DIR', ''), 'Noob'),
]
for dirpath, family in dirs:
if not dirpath or not os.path.exists(dirpath):
continue
for filename in sorted(os.listdir(dirpath)):
if not (filename.endswith('.safetensors') or filename.endswith('.ckpt')):
continue
checkpoint_path = f"{family}/{filename}"
name_base = filename.rsplit('.', 1)[0]
safe_id = re.sub(r'[^a-zA-Z0-9_]', '_', checkpoint_path.rsplit('.', 1)[0]).lower().strip('_')
json_filename = f"{safe_id}.json"
json_path = os.path.join(checkpoints_dir, json_filename)
is_existing = os.path.exists(json_path)
if is_existing and not overwrite:
skipped_count += 1
continue
# Look for a matching HTML file alongside the model file
html_path = os.path.join(dirpath, f"{name_base}.html")
html_content = ""
if os.path.exists(html_path):
try:
with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
html_raw = hf.read()
clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)
clean_html = re.sub(r'<img[^>]*>', '', clean_html)
clean_html = re.sub(r'<[^>]+>', ' ', clean_html)
html_content = ' '.join(clean_html.split())
except Exception as e:
print(f"Error reading HTML for {filename}: {e}")
defaults = _default_checkpoint_data(checkpoint_path, filename)
if html_content:
try:
print(f"Asking LLM to describe checkpoint: {filename}")
prompt = (
f"Generate checkpoint metadata JSON for the model file: '{filename}' "
f"(checkpoint_path: '{checkpoint_path}').\n\n"
f"Here is descriptive text extracted from an associated HTML file:\n###\n{html_content[:3000]}\n###"
)
llm_response = call_llm(prompt, system_prompt)
clean_json = llm_response.replace('```json', '').replace('```', '').strip()
ckpt_data = json.loads(clean_json)
# Enforce fixed fields
ckpt_data['checkpoint_path'] = checkpoint_path
ckpt_data['checkpoint_name'] = filename
# Fill missing fields with defaults
for key, val in defaults.items():
if key not in ckpt_data or ckpt_data[key] is None:
ckpt_data[key] = val
time.sleep(0.5)
except Exception as e:
print(f"LLM error for {filename}: {e}. Using defaults.")
ckpt_data = defaults
else:
ckpt_data = defaults
try:
with open(json_path, 'w') as f:
json.dump(ckpt_data, f, indent=2)
if is_existing:
overwritten_count += 1
else:
created_count += 1
except Exception as e:
print(f"Error saving JSON for {filename}: {e}")
if created_count > 0 or overwritten_count > 0:
sync_checkpoints()
msg = f'Successfully processed checkpoints: {created_count} created, {overwritten_count} overwritten.'
if skipped_count > 0:
msg += f' (Skipped {skipped_count} existing)'
flash(msg)
else:
flash(f'No checkpoints created or overwritten. {skipped_count} existing entries found.')
return redirect(url_for('checkpoints_index'))