import json import os import re import random import logging from flask import render_template, request, redirect, url_for, flash, session, current_app from werkzeug.utils import secure_filename from sqlalchemy.orm.attributes import flag_modified from models import db, Character, Style, Detailer, Settings from services.workflow import _prepare_workflow, _get_default_checkpoint from services.job_queue import _enqueue_job, _make_finalize, _enqueue_task from services.prompts import build_prompt, _resolve_character, _ensure_character_fields, _append_background from services.sync import sync_styles from services.file_io import get_available_loras from services.llm import load_prompt, call_llm from utils import allowed_file, _WARDROBE_KEYS logger = logging.getLogger('gaze') def register_routes(app): def _build_style_workflow(style_obj, character=None, selected_fields=None, fixed_seed=None, extra_positive=None, extra_negative=None): """Build and return a prepared ComfyUI workflow dict for a style generation.""" if character: combined_data = character.data.copy() combined_data['character_id'] = character.character_id combined_data['style'] = style_obj.data.get('style', {}) # Merge style lora triggers if present style_lora = style_obj.data.get('lora', {}) if style_lora.get('lora_triggers'): if 'lora' not in combined_data: combined_data['lora'] = {} combined_data['lora']['lora_triggers'] = f"{combined_data['lora'].get('lora_triggers', '')}, {style_lora['lora_triggers']}" # Merge character identity and wardrobe fields into selected_fields if selected_fields: _ensure_character_fields(character, selected_fields) else: # Auto-include essential character fields (minimal set for batch/default generation) selected_fields = [] for key in ['base', 'head']: if character.data.get('identity', {}).get(key): selected_fields.append(f'identity::{key}') selected_fields.append('special::name') wardrobe = character.get_active_wardrobe() for key in _WARDROBE_KEYS: if wardrobe.get(key): selected_fields.append(f'wardrobe::{key}') selected_fields.extend(['style::artist_name', 'style::artistic_style', 'lora::lora_triggers']) default_fields = style_obj.default_fields active_outfit = character.active_outfit else: combined_data = { 'character_id': style_obj.style_id, 'style': style_obj.data.get('style', {}), 'lora': style_obj.data.get('lora', {}), 'tags': style_obj.data.get('tags', []) } if not selected_fields: selected_fields = ['style::artist_name', 'style::artistic_style', 'lora::lora_triggers'] default_fields = style_obj.default_fields active_outfit = 'default' with open('comfy_workflow.json', 'r') as f: workflow = json.load(f) prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit) _append_background(prompts, character) if extra_positive: prompts["main"] = f"{prompts['main']}, {extra_positive}" ckpt_path, ckpt_data = _get_default_checkpoint() workflow = _prepare_workflow(workflow, character, prompts, style=style_obj, custom_negative=extra_negative or None, checkpoint=ckpt_path, checkpoint_data=ckpt_data, fixed_seed=fixed_seed) return workflow @app.route('/styles') def styles_index(): query = Style.query fav = request.args.get('favourite') nsfw = request.args.get('nsfw', 'all') if fav == 'on': query = query.filter_by(is_favourite=True) if nsfw == 'sfw': query = query.filter_by(is_nsfw=False) elif nsfw == 'nsfw': query = query.filter_by(is_nsfw=True) styles = query.order_by(Style.is_favourite.desc(), Style.name).all() return render_template('styles/index.html', styles=styles, favourite_filter=fav or '', nsfw_filter=nsfw) @app.route('/styles/rescan', methods=['POST']) def rescan_styles(): sync_styles() flash('Database synced with style files.') return redirect(url_for('styles_index')) @app.route('/style/') def style_detail(slug): style = Style.query.filter_by(slug=slug).first_or_404() characters = Character.query.order_by(Character.name).all() # Load state from session preferences = session.get(f'prefs_style_{slug}') preview_image = session.get(f'preview_style_{slug}') selected_character = session.get(f'char_style_{slug}') extra_positive = session.get(f'extra_pos_style_{slug}', '') extra_negative = session.get(f'extra_neg_style_{slug}', '') # List existing preview images upload_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}") existing_previews = [] if os.path.isdir(upload_dir): files = sorted([f for f in os.listdir(upload_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))], reverse=True) existing_previews = [f"styles/{slug}/{f}" for f in files] return render_template('styles/detail.html', style=style, characters=characters, preferences=preferences, preview_image=preview_image, selected_character=selected_character, existing_previews=existing_previews, extra_positive=extra_positive, extra_negative=extra_negative) @app.route('/style//edit', methods=['GET', 'POST']) def edit_style(slug): style = Style.query.filter_by(slug=slug).first_or_404() loras = get_available_loras('styles') if request.method == 'POST': try: # 1. Update basic fields style.name = request.form.get('style_name') # 2. Rebuild the data dictionary new_data = style.data.copy() new_data['style_name'] = style.name # Update style section if 'style' in new_data: for key in new_data['style'].keys(): form_key = f"style_{key}" if form_key in request.form: new_data['style'][key] = request.form.get(form_key) # Update lora section if 'lora' in new_data: for key in new_data['lora'].keys(): form_key = f"lora_{key}" if form_key in request.form: val = request.form.get(form_key) if key == 'lora_weight': try: val = float(val) except: val = 1.0 new_data['lora'][key] = val # LoRA weight randomization bounds for bound in ['lora_weight_min', 'lora_weight_max']: val_str = request.form.get(f'lora_{bound}', '').strip() if val_str: try: new_data.setdefault('lora', {})[bound] = float(val_str) except ValueError: pass else: new_data.setdefault('lora', {}).pop(bound, None) # Update Tags (structured dict) new_data['tags'] = { 'style_type': request.form.get('tag_style_type', '').strip(), 'nsfw': 'tag_nsfw' in request.form, } style.is_nsfw = new_data['tags']['nsfw'] style.data = new_data flag_modified(style, "data") # 3. Write back to JSON file style_file = style.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', style.style_id)}.json" file_path = os.path.join(app.config['STYLES_DIR'], style_file) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) db.session.commit() flash('Style updated successfully!') return redirect(url_for('style_detail', slug=slug)) except Exception as e: print(f"Edit error: {e}") flash(f"Error saving changes: {str(e)}") return render_template('styles/edit.html', style=style, loras=loras) @app.route('/style//upload', methods=['POST']) def upload_style_image(slug): style = Style.query.filter_by(slug=slug).first_or_404() if 'image' not in request.files: flash('No file part') return redirect(request.url) file = request.files['image'] if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): # Create style subfolder style_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"styles/{slug}") os.makedirs(style_folder, exist_ok=True) filename = secure_filename(file.filename) file_path = os.path.join(style_folder, filename) file.save(file_path) # Store relative path in DB style.image_path = f"styles/{slug}/{filename}" db.session.commit() flash('Image uploaded successfully!') return redirect(url_for('style_detail', slug=slug)) @app.route('/style//generate', methods=['POST']) def generate_style_image(slug): style_obj = Style.query.filter_by(slug=slug).first_or_404() try: # Get action type action = request.form.get('action', 'preview') # Get selected fields selected_fields = request.form.getlist('include_field') # Get selected character (if any) character_slug = request.form.get('character_slug', '') character = None character = _resolve_character(character_slug) if character_slug == '__random__' and character: character_slug = character.slug # Get additional prompts extra_positive = request.form.get('extra_positive', '').strip() extra_negative = request.form.get('extra_negative', '').strip() # Save preferences session[f'char_style_{slug}'] = character_slug session[f'prefs_style_{slug}'] = selected_fields session[f'extra_pos_style_{slug}'] = extra_positive session[f'extra_neg_style_{slug}'] = extra_negative session.modified = True # Parse optional seed seed_val = request.form.get('seed', '').strip() fixed_seed = int(seed_val) if seed_val else None # Build workflow using helper (returns workflow dict, not prompt_response) workflow = _build_style_workflow(style_obj, character, selected_fields, fixed_seed=fixed_seed, extra_positive=extra_positive, extra_negative=extra_negative) char_label = character.name if character else 'no character' label = f"Style: {style_obj.name} ({char_label}) – {action}" job = _enqueue_job(label, workflow, _make_finalize('styles', slug, Style, action)) if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'status': 'queued', 'job_id': job['id']} return redirect(url_for('style_detail', slug=slug)) except Exception as e: print(f"Generation error: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': str(e)}, 500 flash(f"Error during generation: {str(e)}") return redirect(url_for('style_detail', slug=slug)) @app.route('/style//save_defaults', methods=['POST']) def save_style_defaults(slug): style = Style.query.filter_by(slug=slug).first_or_404() selected_fields = request.form.getlist('include_field') style.default_fields = selected_fields db.session.commit() flash('Default prompt selection saved for this style!') return redirect(url_for('style_detail', slug=slug)) @app.route('/style//replace_cover_from_preview', methods=['POST']) def replace_style_cover_from_preview(slug): style = Style.query.filter_by(slug=slug).first_or_404() preview_path = request.form.get('preview_path') if preview_path and os.path.exists(os.path.join(app.config['UPLOAD_FOLDER'], preview_path)): style.image_path = preview_path db.session.commit() flash('Cover image updated!') else: flash('No valid preview image selected.', 'error') return redirect(url_for('style_detail', slug=slug)) @app.route('/get_missing_styles') def get_missing_styles(): missing = Style.query.filter((Style.image_path == None) | (Style.image_path == '')).order_by(Style.name).all() return {'missing': [{'slug': s.slug, 'name': s.name} for s in missing]} @app.route('/get_missing_detailers') def get_missing_detailers(): missing = Detailer.query.filter((Detailer.image_path == None) | (Detailer.image_path == '')).order_by(Detailer.name).all() return {'missing': [{'slug': d.slug, 'name': d.name} for d in missing]} @app.route('/clear_all_detailer_covers', methods=['POST']) def clear_all_detailer_covers(): detailers = Detailer.query.all() for detailer in detailers: detailer.image_path = None db.session.commit() return {'success': True} @app.route('/clear_all_style_covers', methods=['POST']) def clear_all_style_covers(): styles = Style.query.all() for style in styles: style.image_path = None db.session.commit() return {'success': True} @app.route('/styles/generate_missing', methods=['POST']) def generate_missing_styles(): missing = Style.query.filter( (Style.image_path == None) | (Style.image_path == '') ).order_by(Style.name).all() if not missing: flash("No styles missing cover images.") return redirect(url_for('styles_index')) all_characters = Character.query.all() if not all_characters: flash("No characters available to preview styles with.", "error") return redirect(url_for('styles_index')) enqueued = 0 for style_obj in missing: character = random.choice(all_characters) try: workflow = _build_style_workflow(style_obj, character=character) _enqueue_job(f"Style: {style_obj.name} – cover", workflow, _make_finalize('styles', style_obj.slug, Style)) enqueued += 1 except Exception as e: print(f"Error queuing cover generation for style {style_obj.name}: {e}") flash(f"Queued {enqueued} style cover image generation job{'s' if enqueued != 1 else ''}.") return redirect(url_for('styles_index')) @app.route('/styles/bulk_create', methods=['POST']) def bulk_create_styles_from_loras(): _s = Settings.query.first() styles_lora_dir = ((_s.lora_dir_styles if _s else None) or '/ImageModels/lora/Illustrious/Styles').rstrip('/') _lora_subfolder = os.path.basename(styles_lora_dir) if not os.path.exists(styles_lora_dir): if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': 'Styles LoRA directory not found.'}, 400 flash('Styles LoRA directory not found.', 'error') return redirect(url_for('styles_index')) overwrite = request.form.get('overwrite') == 'true' system_prompt = load_prompt('style_system.txt') if not system_prompt: if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'error': 'Style system prompt file not found.'}, 500 flash('Style system prompt file not found.', 'error') return redirect(url_for('styles_index')) job_ids = [] skipped = 0 for filename in sorted(os.listdir(styles_lora_dir)): if not filename.endswith('.safetensors'): continue name_base = filename.rsplit('.', 1)[0] style_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower()) style_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title() json_filename = f"{style_id}.json" json_path = os.path.join(app.config['STYLES_DIR'], json_filename) is_existing = os.path.exists(json_path) if is_existing and not overwrite: skipped += 1 continue # Read HTML companion file if it exists html_path = os.path.join(styles_lora_dir, f"{name_base}.html") html_content = "" if os.path.exists(html_path): try: with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf: html_raw = hf.read() clean_html = re.sub(r']*>.*?', '', html_raw, flags=re.DOTALL) clean_html = re.sub(r']*>.*?', '', clean_html, flags=re.DOTALL) clean_html = re.sub(r']*>', '', clean_html) clean_html = re.sub(r'<[^>]+>', ' ', clean_html) html_content = ' '.join(clean_html.split()) except Exception: pass def make_task(fn, sid, sname, jp, lsf, html_ctx, sys_prompt, is_exist): def task_fn(job): prompt = f"Describe an art style or artist LoRA for AI image generation based on the filename: '{fn}'" if html_ctx: prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_ctx[:3000]}\n###" llm_response = call_llm(prompt, sys_prompt) clean_json = llm_response.replace('```json', '').replace('```', '').strip() style_data = json.loads(clean_json) style_data['style_id'] = sid style_data['style_name'] = sname if 'lora' not in style_data: style_data['lora'] = {} style_data['lora']['lora_name'] = f"Illustrious/{lsf}/{fn}" if not style_data['lora'].get('lora_triggers'): style_data['lora']['lora_triggers'] = fn.rsplit('.', 1)[0] if style_data['lora'].get('lora_weight') is None: style_data['lora']['lora_weight'] = 1.0 if style_data['lora'].get('lora_weight_min') is None: style_data['lora']['lora_weight_min'] = 0.7 if style_data['lora'].get('lora_weight_max') is None: style_data['lora']['lora_weight_max'] = 1.0 os.makedirs(os.path.dirname(jp), exist_ok=True) with open(jp, 'w') as f: json.dump(style_data, f, indent=2) job['result'] = {'name': sname, 'action': 'overwritten' if is_exist else 'created'} return task_fn job = _enqueue_task( f"Create style: {style_name}", make_task(filename, style_id, style_name, json_path, _lora_subfolder, html_content, system_prompt, is_existing) ) job_ids.append(job['id']) # Enqueue a sync task to run after all creates if job_ids: def sync_task(job): sync_styles() job['result'] = {'synced': True} _enqueue_task("Sync styles DB", sync_task) if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'success': True, 'queued': len(job_ids), 'skipped': skipped} flash(f'Queued {len(job_ids)} style creation tasks ({skipped} skipped). Watch progress in the queue.') return redirect(url_for('styles_index')) @app.route('/style/create', methods=['GET', 'POST']) def create_style(): form_data = {} if request.method == 'POST': name = request.form.get('name') slug = request.form.get('filename', '').strip() form_data = {'name': name, 'filename': slug} if not slug: slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_') safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug) if not safe_slug: safe_slug = 'style' base_slug = safe_slug counter = 1 while os.path.exists(os.path.join(app.config['STYLES_DIR'], f"{safe_slug}.json")): safe_slug = f"{base_slug}_{counter}" counter += 1 style_data = { "style_id": safe_slug, "style_name": name, "style": { "artist_name": "", "artistic_style": "" }, "lora": { "lora_name": "", "lora_weight": 1.0, "lora_triggers": "" } } try: file_path = os.path.join(app.config['STYLES_DIR'], f"{safe_slug}.json") with open(file_path, 'w') as f: json.dump(style_data, f, indent=2) new_style = Style( style_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json", name=name, data=style_data ) db.session.add(new_style) db.session.commit() flash('Style created successfully!') return redirect(url_for('style_detail', slug=safe_slug)) except Exception as e: print(f"Save error: {e}") flash(f"Failed to create style: {e}") return render_template('styles/create.html', form_data=form_data) return render_template('styles/create.html', form_data=form_data) @app.route('/style//clone', methods=['POST']) def clone_style(slug): style = Style.query.filter_by(slug=slug).first_or_404() base_id = style.style_id match = re.match(r'^(.+?)_(\d+)$', base_id) if match: base_name = match.group(1) current_num = int(match.group(2)) else: base_name = base_id current_num = 1 next_num = current_num + 1 while True: new_id = f"{base_name}_{next_num:02d}" new_filename = f"{new_id}.json" new_path = os.path.join(app.config['STYLES_DIR'], new_filename) if not os.path.exists(new_path): break next_num += 1 new_data = style.data.copy() new_data['style_id'] = new_id new_data['style_name'] = f"{style.name} (Copy)" with open(new_path, 'w') as f: json.dump(new_data, f, indent=2) new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id) new_style = Style( style_id=new_id, slug=new_slug, filename=new_filename, name=new_data['style_name'], data=new_data ) db.session.add(new_style) db.session.commit() flash(f'Style cloned as "{new_id}"!') return redirect(url_for('style_detail', slug=new_slug)) @app.route('/style//save_json', methods=['POST']) def save_style_json(slug): style = Style.query.filter_by(slug=slug).first_or_404() try: new_data = json.loads(request.form.get('json_data', '')) except (ValueError, TypeError) as e: return {'success': False, 'error': f'Invalid JSON: {e}'}, 400 style.data = new_data flag_modified(style, 'data') db.session.commit() if style.filename: file_path = os.path.join(app.config['STYLES_DIR'], style.filename) with open(file_path, 'w') as f: json.dump(new_data, f, indent=2) return {'success': True} @app.route('/style//favourite', methods=['POST']) def toggle_style_favourite(slug): style_obj = Style.query.filter_by(slug=slug).first_or_404() style_obj.is_favourite = not style_obj.is_favourite db.session.commit() if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return {'success': True, 'is_favourite': style_obj.is_favourite} return redirect(url_for('style_detail', slug=slug))