545 lines
23 KiB
Python
545 lines
23 KiB
Python
import json
|
||
import os
|
||
import re
|
||
import random
|
||
import time
|
||
import logging
|
||
|
||
from flask import render_template, request, redirect, url_for, flash, session, current_app
|
||
from werkzeug.utils import secure_filename
|
||
from sqlalchemy.orm.attributes import flag_modified
|
||
|
||
from models import db, Character, Style, Detailer, Settings
|
||
from services.workflow import _prepare_workflow, _get_default_checkpoint
|
||
from services.job_queue import _enqueue_job, _make_finalize
|
||
from services.prompts import build_prompt, _resolve_character, _ensure_character_fields, _append_background
|
||
from services.sync import sync_styles
|
||
from services.file_io import get_available_loras
|
||
from services.llm import load_prompt, call_llm
|
||
from utils import allowed_file, _WARDROBE_KEYS
|
||
|
||
logger = logging.getLogger('gaze')
|
||
|
||
|
||
def register_routes(app):
|
||
|
||
def _build_style_workflow(style_obj, character=None, selected_fields=None, fixed_seed=None, extra_positive=None, extra_negative=None):
    """Build and return a prepared ComfyUI workflow dict for a style generation.

    Args:
        style_obj: Style record; its ``data`` supplies the ``style``, ``lora``
            and ``tags`` sections and its ``default_fields`` is forwarded to
            ``build_prompt``.
        character: Optional Character record. When given, a copy of its data
            is the prompt base and the style section is layered on top.
        selected_fields: Optional list of ``section::key`` field names. When
            empty, a minimal default selection is assembled here.
        fixed_seed: Forwarded unchanged to ``_prepare_workflow``.
        extra_positive: Optional text appended to the main prompt.
        extra_negative: Optional text forwarded as ``custom_negative``
            (``None`` when empty).

    Returns:
        The value returned by ``_prepare_workflow`` for the workflow template
        loaded from ``comfy_workflow.json``.
    """
    style_data = style_obj.data
    style_only_fields = ['style::artist_name', 'style::artistic_style', 'lora::lora_triggers']

    if character:
        combined_data = character.data.copy()
        combined_data['character_id'] = character.character_id
        combined_data['style'] = style_data.get('style', {})

        # Merge style lora triggers if present.
        style_triggers = style_data.get('lora', {}).get('lora_triggers')
        if style_triggers:
            # The copy above is shallow: take a private copy of the nested
            # 'lora' dict so the character's own data is never modified.
            merged_lora = dict(combined_data.get('lora') or {})
            trigger_parts = [merged_lora.get('lora_triggers'), style_triggers]
            # Skip empty parts so the result never starts with a stray ", ".
            merged_lora['lora_triggers'] = ', '.join(str(part) for part in trigger_parts if part)
            combined_data['lora'] = merged_lora

        if selected_fields:
            # Merge character identity and wardrobe fields into selected_fields.
            _ensure_character_fields(character, selected_fields)
        else:
            # Auto-include essential character fields (minimal set for
            # batch/default generation).
            identity = character.data.get('identity', {})
            selected_fields = [f'identity::{key}' for key in ['base', 'head'] if identity.get(key)]
            selected_fields.append('special::name')
            wardrobe = character.get_active_wardrobe()
            selected_fields.extend(f'wardrobe::{key}' for key in _WARDROBE_KEYS if wardrobe.get(key))
            selected_fields.extend(style_only_fields)

        active_outfit = character.active_outfit
    else:
        # No character: the style itself stands in as the prompt subject.
        combined_data = {
            'character_id': style_obj.style_id,
            'style': style_data.get('style', {}),
            'lora': style_data.get('lora', {}),
            'tags': style_data.get('tags', []),
        }
        if not selected_fields:
            selected_fields = style_only_fields
        active_outfit = 'default'

    default_fields = style_obj.default_fields

    with open('comfy_workflow.json', 'r', encoding='utf-8') as f:
        workflow = json.load(f)

    prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit)

    _append_background(prompts, character)

    if extra_positive:
        prompts["main"] = f"{prompts['main']}, {extra_positive}"

    ckpt_path, ckpt_data = _get_default_checkpoint()
    return _prepare_workflow(
        workflow,
        character,
        prompts,
        style=style_obj,
        custom_negative=extra_negative or None,
        checkpoint=ckpt_path,
        checkpoint_data=ckpt_data,
        fixed_seed=fixed_seed,
    )
|
||
|
||
@app.route('/styles')
def styles_index():
    """Render the style index page listing every style ordered by name."""
    ordered_styles = Style.query.order_by(Style.name).all()
    return render_template('styles/index.html', styles=ordered_styles)
|
||
|
||
@app.route('/styles/rescan', methods=['POST'])
def rescan_styles():
    """Run ``sync_styles`` and send the user back to the style index."""
    sync_styles()
    flash('Database synced with style files.')
    index_url = url_for('styles_index')
    return redirect(index_url)
|
||
|
||
@app.route('/style/<path:slug>')
def style_detail(slug):
    """Render the detail page for one style.

    The page receives the style, all characters ordered by name, the
    per-style values remembered in the session, and the image files found in
    the style's upload folder (newest name first by reverse sort).
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()

    # Per-style state remembered in the session.
    remembered = {
        'preferences': session.get(f'prefs_style_{slug}'),
        'preview_image': session.get(f'preview_style_{slug}'),
        'selected_character': session.get(f'char_style_{slug}'),
        'extra_positive': session.get(f'extra_pos_style_{slug}', ''),
        'extra_negative': session.get(f'extra_neg_style_{slug}', ''),
    }

    # Collect existing preview images, stored relative to the upload folder.
    image_suffixes = ('.png', '.jpg', '.jpeg', '.webp')
    upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}")
    existing_previews = []
    if os.path.isdir(upload_dir):
        image_names = [entry for entry in os.listdir(upload_dir) if entry.lower().endswith(image_suffixes)]
        image_names.sort(reverse=True)
        existing_previews = [f"styles/{slug}/{image_name}" for image_name in image_names]

    return render_template(
        'styles/detail.html',
        style=style,
        characters=characters,
        existing_previews=existing_previews,
        **remembered,
    )
|
||
|
||
@app.route('/style/<path:slug>/edit', methods=['GET', 'POST'])
def edit_style(slug):
    """Show the style edit form (GET) or apply a submitted edit (POST).

    On POST the style's name and the ``style``/``lora`` sections of its data
    are updated from the form, the result is written to the style's JSON file
    in ``STYLES_DIR`` and then committed. On success the user is redirected
    to the detail page. On any failure the session is rolled back, the error
    is flashed and the edit form is rendered again.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    loras = get_available_loras('styles')

    if request.method == 'POST':
        # These two keys get dedicated float parsing below and are skipped in
        # the generic lora loop so a raw string is never stored for them.
        bound_keys = ('lora_weight_min', 'lora_weight_max')
        try:
            # 1. Update basic fields. A form without the field keeps the
            # current name instead of overwriting it with None.
            submitted_name = request.form.get('style_name')
            if submitted_name is not None:
                style.name = submitted_name

            # 2. Rebuild the data dictionary. Nested sections are copied
            # before editing so the stored data is untouched until the new
            # dict is assigned.
            new_data = style.data.copy()
            new_data['style_name'] = style.name

            # Update style section
            if 'style' in new_data:
                style_section = dict(new_data['style'])
                for key in style_section:
                    form_key = f"style_{key}"
                    if form_key in request.form:
                        style_section[key] = request.form.get(form_key)
                new_data['style'] = style_section

            # Update lora section
            if 'lora' in new_data:
                lora_section = dict(new_data['lora'])
                for key in lora_section:
                    if key in bound_keys:
                        continue
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            try:
                                val = float(val)
                            except (TypeError, ValueError):
                                # Unparseable weight falls back to 1.0.
                                val = 1.0
                        lora_section[key] = val
                new_data['lora'] = lora_section

            # LoRA weight randomization bounds: blank removes the bound, a
            # number replaces it, anything else keeps the previous value.
            lora_bounds = new_data.setdefault('lora', {})
            for bound in bound_keys:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if not val_str:
                    lora_bounds.pop(bound, None)
                    continue
                try:
                    lora_bounds[bound] = float(val_str)
                except ValueError:
                    logger.warning("Ignoring non-numeric %s %r for style %s", bound, val_str, slug)

            style.data = new_data
            flag_modified(style, "data")

            # 3. Write back to JSON file
            style_file = style.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', style.style_id)}.json"
            file_path = os.path.join(app.config['STYLES_DIR'], style_file)

            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)

            db.session.commit()
            flash('Style updated successfully!')
            return redirect(url_for('style_detail', slug=slug))

        except Exception as e:
            # Drop the half-applied changes so the re-rendered form shows the
            # persisted state.
            db.session.rollback()
            logger.exception("Edit error for style %s", slug)
            flash(f"Error saving changes: {str(e)}")

    return render_template('styles/edit.html', style=style, loras=loras)
|
||
|
||
@app.route('/style/<path:slug>/upload', methods=['POST'])
def upload_style_image(slug):
    """Store an uploaded cover image for a style and record its path.

    The file from the ``image`` form field is saved under
    ``UPLOAD_FOLDER/styles/<slug>/`` and the style's ``image_path`` is set to
    the path relative to the upload folder. Every outcome redirects to the
    style detail page; this route only accepts POST, so it cannot be used as
    a redirect target itself.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    detail_url = url_for('style_detail', slug=slug)

    upload = request.files.get('image')
    if upload is None:
        flash('No file part')
        return redirect(detail_url)

    if not upload.filename:
        flash('No selected file')
        return redirect(detail_url)

    if not allowed_file(upload.filename):
        flash('File type not allowed.', 'error')
        return redirect(detail_url)

    # secure_filename can strip a name down to an empty string; saving would
    # then target the folder itself.
    filename = secure_filename(upload.filename)
    if not filename:
        flash('Invalid file name.', 'error')
        return redirect(detail_url)

    # Create style subfolder
    style_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}")
    os.makedirs(style_folder, exist_ok=True)
    upload.save(os.path.join(style_folder, filename))

    # Store relative path in DB
    style.image_path = f"styles/{slug}/{filename}"
    db.session.commit()
    flash('Image uploaded successfully!')

    return redirect(detail_url)
|
||
|
||
@app.route('/style/<path:slug>/generate', methods=['POST'])
def generate_style_image(slug):
    """Queue an image generation job for a style from the submitted form.

    The form's character, field selection and extra prompts are remembered in
    the session, a workflow is built with ``_build_style_workflow`` and the
    job is enqueued. AJAX callers (``X-Requested-With: XMLHttpRequest``) get
    a JSON body; other callers are redirected to the style detail page. Any
    exception is reported as a JSON 500 or a flashed message respectively.
    """
    style_obj = Style.query.filter_by(slug=slug).first_or_404()
    wants_json = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    try:
        form = request.form
        action = form.get('action', 'preview')
        selected_fields = form.getlist('include_field')

        # Resolve the optional character; a random pick is remembered under
        # the slug that was actually chosen.
        character_slug = form.get('character_slug', '')
        character = _resolve_character(character_slug)
        if character and character_slug == '__random__':
            character_slug = character.slug

        extra_positive = form.get('extra_positive', '').strip()
        extra_negative = form.get('extra_negative', '').strip()

        # Remember the choices for the next visit to the detail page.
        remembered = {
            f'char_style_{slug}': character_slug,
            f'prefs_style_{slug}': selected_fields,
            f'extra_pos_style_{slug}': extra_positive,
            f'extra_neg_style_{slug}': extra_negative,
        }
        for session_key, session_value in remembered.items():
            session[session_key] = session_value
        session.modified = True

        # Optional seed: blank means no fixed seed.
        seed_text = form.get('seed', '').strip()
        if seed_text:
            fixed_seed = int(seed_text)
        else:
            fixed_seed = None

        workflow = _build_style_workflow(
            style_obj,
            character,
            selected_fields,
            fixed_seed=fixed_seed,
            extra_positive=extra_positive,
            extra_negative=extra_negative,
        )

        if character:
            char_label = character.name
        else:
            char_label = 'no character'
        label = f"Style: {style_obj.name} ({char_label}) – {action}"
        finalize = _make_finalize('styles', slug, Style, action)
        job = _enqueue_job(label, workflow, finalize)

        if wants_json:
            return {'status': 'queued', 'job_id': job['id']}

        return redirect(url_for('style_detail', slug=slug))

    except Exception as e:
        print(f"Generation error: {e}")
        if wants_json:
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('style_detail', slug=slug))
|
||
|
||
@app.route('/style/<path:slug>/save_defaults', methods=['POST'])
def save_style_defaults(slug):
    """Persist the submitted ``include_field`` list as the style's defaults."""
    style = Style.query.filter_by(slug=slug).first_or_404()
    style.default_fields = request.form.getlist('include_field')
    db.session.commit()

    flash('Default prompt selection saved for this style!')
    detail_url = url_for('style_detail', slug=slug)
    return redirect(detail_url)
|
||
|
||
@app.route('/style/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_style_cover_from_preview(slug):
    """Use an existing preview image as the style's cover image.

    ``preview_path`` comes from the form and is relative to ``UPLOAD_FOLDER``.
    It is accepted only when it resolves to a regular file inside the upload
    folder; absolute paths and ``..`` segments that escape the folder are
    rejected like any other invalid selection.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    preview_path = request.form.get('preview_path')

    is_valid = False
    if preview_path:
        upload_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
        candidate = os.path.abspath(os.path.join(upload_root, preview_path))
        try:
            inside_uploads = os.path.commonpath([upload_root, candidate]) == upload_root
        except ValueError:
            # commonpath raises for paths that cannot be compared (e.g.
            # different drives); treat that as outside the folder.
            inside_uploads = False
        is_valid = inside_uploads and os.path.isfile(candidate)

    if is_valid:
        style.image_path = preview_path
        db.session.commit()
        flash('Cover image updated!')
    else:
        flash('No valid preview image selected.', 'error')

    return redirect(url_for('style_detail', slug=slug))
|
||
|
||
@app.route('/get_missing_styles')
def get_missing_styles():
    """Return slug and name of every style whose cover path is NULL or empty."""
    lacks_cover = Style.image_path.is_(None) | (Style.image_path == '')
    rows = Style.query.filter(lacks_cover).order_by(Style.name).all()

    missing = []
    for row in rows:
        missing.append({'slug': row.slug, 'name': row.name})
    return {'missing': missing}
|
||
|
||
@app.route('/get_missing_detailers')
def get_missing_detailers():
    """Return slug and name of every detailer whose cover path is NULL or empty."""
    lacks_cover = Detailer.image_path.is_(None) | (Detailer.image_path == '')
    rows = Detailer.query.filter(lacks_cover).order_by(Detailer.name).all()

    missing = []
    for row in rows:
        missing.append({'slug': row.slug, 'name': row.name})
    return {'missing': missing}
|
||
|
||
@app.route('/clear_all_detailer_covers', methods=['POST'])
def clear_all_detailer_covers():
    """Reset ``image_path`` to None on every detailer and commit."""
    for record in Detailer.query.all():
        record.image_path = None
    db.session.commit()

    response = {'success': True}
    return response
|
||
|
||
@app.route('/clear_all_style_covers', methods=['POST'])
def clear_all_style_covers():
    """Reset ``image_path`` to None on every style and commit."""
    for record in Style.query.all():
        record.image_path = None
    db.session.commit()

    response = {'success': True}
    return response
|
||
|
||
@app.route('/styles/generate_missing', methods=['POST'])
def generate_missing_styles():
    """Queue a cover generation job for every style without a cover image.

    Each style is paired with a randomly chosen character. A style whose
    workflow cannot be built or enqueued is reported on stdout and skipped.
    The flashed message reports how many jobs were queued.
    """
    index_url = url_for('styles_index')

    lacks_cover = (Style.image_path == None) | (Style.image_path == '')  # noqa: E711 - SQL IS NULL
    missing = Style.query.filter(lacks_cover).order_by(Style.name).all()
    if not missing:
        flash("No styles missing cover images.")
        return redirect(index_url)

    all_characters = Character.query.all()
    if not all_characters:
        flash("No characters available to preview styles with.", "error")
        return redirect(index_url)

    enqueued = 0
    for style_obj in missing:
        character = random.choice(all_characters)
        try:
            workflow = _build_style_workflow(style_obj, character=character)
            finalize = _make_finalize('styles', style_obj.slug, Style)
            _enqueue_job(f"Style: {style_obj.name} – cover", workflow, finalize)
        except Exception as e:
            print(f"Error queuing cover generation for style {style_obj.name}: {e}")
        else:
            enqueued += 1

    plural = '' if enqueued == 1 else 's'
    flash(f"Queued {enqueued} style cover image generation job{plural}.")
    return redirect(index_url)
|
||
|
||
@app.route('/styles/bulk_create', methods=['POST'])
def bulk_create_styles_from_loras():
    """Create a style JSON file for every ``.safetensors`` LoRA in the styles folder.

    For each LoRA file the LLM is asked to describe the style, optionally
    helped by text extracted from a sibling ``<name>.html`` file. The LoRA
    fields are then filled in and the result is written to ``STYLES_DIR``.
    Existing JSON files are skipped unless the form sends
    ``overwrite=true``. Per-file failures are printed and do not stop the
    run. ``sync_styles`` is called when at least one file was written.
    """
    index_url = url_for('styles_index')

    settings_row = Settings.query.first()
    configured_dir = settings_row.lora_dir_styles if settings_row else None
    styles_lora_dir = (configured_dir or '/ImageModels/lora/Illustrious/Styles').rstrip('/')
    _lora_subfolder = os.path.basename(styles_lora_dir)
    if not os.path.exists(styles_lora_dir):
        flash('Styles LoRA directory not found.', 'error')
        return redirect(index_url)

    overwrite = request.form.get('overwrite') == 'true'
    created_count = skipped_count = overwritten_count = 0

    system_prompt = load_prompt('style_system.txt')
    if not system_prompt:
        flash('Style system prompt file not found.', 'error')
        return redirect(index_url)

    # Defaults applied, in this order, when the LLM left the value unset.
    weight_defaults = (
        ('lora_weight', 1.0),
        ('lora_weight_min', 0.7),
        ('lora_weight_max', 1.0),
    )

    for filename in os.listdir(styles_lora_dir):
        if not filename.endswith('.safetensors'):
            continue

        name_base = filename.rsplit('.', 1)[0]
        style_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
        style_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()

        json_path = os.path.join(app.config['STYLES_DIR'], f"{style_id}.json")
        is_existing = os.path.exists(json_path)
        if is_existing and not overwrite:
            skipped_count += 1
            continue

        # Optional companion HTML file: reduce it to plain text.
        html_filename = f"{name_base}.html"
        html_path = os.path.join(styles_lora_dir, html_filename)
        html_content = ""
        if os.path.exists(html_path):
            try:
                with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                    html_raw = hf.read()
                stripped = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
                stripped = re.sub(r'<style[^>]*>.*?</style>', '', stripped, flags=re.DOTALL)
                stripped = re.sub(r'<img[^>]*>', '', stripped)
                stripped = re.sub(r'<[^>]+>', ' ', stripped)
                html_content = ' '.join(stripped.split())
            except Exception as e:
                print(f"Error reading HTML {html_filename}: {e}")

        try:
            print(f"Asking LLM to describe style: {style_name}")
            prompt = f"Describe an art style or artist LoRA for AI image generation based on the filename: '{filename}'"
            if html_content:
                prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_content[:3000]}\n###"

            llm_response = call_llm(prompt, system_prompt)
            # Remove Markdown code fences before parsing the reply as JSON.
            clean_json = llm_response.replace('```json', '').replace('```', '').strip()
            style_data = json.loads(clean_json)

            style_data['style_id'] = style_id
            style_data['style_name'] = style_name

            if 'lora' not in style_data:
                style_data['lora'] = {}
            lora_section = style_data['lora']
            lora_section['lora_name'] = f"Illustrious/{_lora_subfolder}/{filename}"

            if not lora_section.get('lora_triggers'):
                lora_section['lora_triggers'] = name_base
            for weight_key, weight_default in weight_defaults:
                if lora_section.get(weight_key) is None:
                    lora_section[weight_key] = weight_default

            with open(json_path, 'w') as f:
                json.dump(style_data, f, indent=2)

            if is_existing:
                overwritten_count += 1
            else:
                created_count += 1

            time.sleep(0.5)
        except Exception as e:
            print(f"Error creating style for {filename}: {e}")

    if created_count > 0 or overwritten_count > 0:
        sync_styles()
        msg = f'Successfully processed styles: {created_count} created, {overwritten_count} overwritten.'
        if skipped_count > 0:
            msg += f' (Skipped {skipped_count} existing)'
        flash(msg)
    else:
        flash(f'No styles created or overwritten. {skipped_count} existing styles found.')

    return redirect(index_url)
|
||
|
||
@app.route('/style/create', methods=['GET', 'POST'])
def create_style():
    """Show the style creation form (GET) or create a new empty style (POST).

    On POST the ``name`` field is required. The slug comes from the optional
    ``filename`` field, or is derived from the name, then reduced to
    ``[a-zA-Z0-9_]`` and made unique against the JSON files in ``STYLES_DIR``
    by appending ``_<n>``. The style is written to ``<slug>.json`` and added
    to the database. Failures are flashed and redirect back to the form.
    """
    if request.method != 'POST':
        return render_template('styles/create.html')

    name = request.form.get('name')
    # Without a name there is nothing to derive a slug from and
    # name.lower() below would raise on None.
    if not name or not name.strip():
        flash('Style name is required.', 'error')
        return redirect(request.url)

    slug = request.form.get('filename', '').strip()
    if not slug:
        slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')

    safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) or 'style'

    # Append a counter until the slug does not collide with an existing file.
    styles_dir = app.config['STYLES_DIR']
    base_slug = safe_slug
    counter = 1
    while os.path.exists(os.path.join(styles_dir, f"{safe_slug}.json")):
        safe_slug = f"{base_slug}_{counter}"
        counter += 1

    style_data = {
        "style_id": safe_slug,
        "style_name": name,
        "style": {
            "artist_name": "",
            "artistic_style": "",
        },
        "lora": {
            "lora_name": "",
            "lora_weight": 1.0,
            "lora_triggers": "",
        },
    }

    try:
        file_path = os.path.join(styles_dir, f"{safe_slug}.json")
        with open(file_path, 'w') as f:
            json.dump(style_data, f, indent=2)

        new_style = Style(
            style_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json",
            name=name, data=style_data,
        )
        db.session.add(new_style)
        db.session.commit()

        flash('Style created successfully!')
        return redirect(url_for('style_detail', slug=safe_slug))
    except Exception as e:
        # Leave the session clean for whatever runs next in this request.
        db.session.rollback()
        logger.exception("Save error while creating style %s", safe_slug)
        flash(f"Failed to create style: {e}")
        return redirect(request.url)
|
||
|
||
@app.route('/style/<path:slug>/clone', methods=['POST'])
def clone_style(slug):
    """Duplicate a style under the next free numbered id.

    A trailing ``_<digits>`` on the source id is treated as its number
    (otherwise 1); the clone gets the first following number, zero-padded to
    two digits, whose JSON file does not yet exist in ``STYLES_DIR``. The
    copy is written to disk, added to the database and the user is
    redirected to its detail page.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()

    source_id = style.style_id
    numbered = re.match(r'^(.+?)_(\d+)$', source_id)
    if numbered:
        base_name, current_num = numbered.group(1), int(numbered.group(2))
    else:
        base_name, current_num = source_id, 1

    def candidate(number):
        """Return (id, filename, path) for the given clone number."""
        ident = f"{base_name}_{number:02d}"
        json_name = f"{ident}.json"
        return ident, json_name, os.path.join(app.config['STYLES_DIR'], json_name)

    # Walk upwards from the next number until an unused file name is found.
    next_num = current_num + 1
    new_id, new_filename, new_path = candidate(next_num)
    while os.path.exists(new_path):
        next_num += 1
        new_id, new_filename, new_path = candidate(next_num)

    new_data = style.data.copy()
    new_data['style_id'] = new_id
    new_data['style_name'] = f"{style.name} (Copy)"

    with open(new_path, 'w') as f:
        json.dump(new_data, f, indent=2)

    new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id)
    clone = Style(
        style_id=new_id,
        slug=new_slug,
        filename=new_filename,
        name=new_data['style_name'],
        data=new_data,
    )
    db.session.add(clone)
    db.session.commit()

    flash(f'Style cloned as "{new_id}"!')
    return redirect(url_for('style_detail', slug=new_slug))
|
||
|
||
@app.route('/style/<path:slug>/save_json', methods=['POST'])
def save_style_json(slug):
    """Replace a style's data with the JSON document sent in ``json_data``.

    Returns:
        ``{'success': True}`` on success.
        A 400 JSON error when the payload is not valid JSON or its top-level
        value is not an object.
        A 500 JSON error when the database was updated but the style's JSON
        file could not be written.
    """
    style = Style.query.filter_by(slug=slug).first_or_404()
    try:
        new_data = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400

    # The rest of this module reads style.data with dict methods, so a list,
    # string, number or null must not be stored.
    if not isinstance(new_data, dict):
        return {'success': False, 'error': 'Invalid JSON: top-level value must be an object'}, 400

    style.data = new_data
    flag_modified(style, 'data')
    db.session.commit()

    if style.filename:
        file_path = os.path.join(app.config['STYLES_DIR'], style.filename)
        try:
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)
        except OSError as e:
            # The commit already happened; report the stale file instead of
            # failing with an unhandled error.
            logger.exception("Could not write style file %s", file_path)
            return {'success': False, 'error': f'Saved to database but failed to write file: {e}'}, 500

    return {'success': True}
|