Files
character-browser/routes/actions.py
Aodhan Collins 32a73b02f5 Add semantic tagging, search, favourite/NSFW filtering, and LLM job queue
Replaces old list-format tags (which duplicated prompt content) with structured
dict tags per category (origin_series, outfit_type, participants, style_type,
scene_type, etc.). Tags are now purely organizational metadata — removed from
the prompt pipeline entirely.

Adds is_favourite and is_nsfw columns to all 8 resource models. Favourite is
DB-only (user preference); NSFW is mirrored in JSON tags for rescan persistence.
All library pages get filter controls and favourites-first sorting.

Introduces a parallel LLM job queue (_enqueue_task + _llm_queue_worker) for
background tag regeneration, with the same status polling UI as ComfyUI jobs.
Fixes call_llm() to use has_request_context() fallback for background threads.

Adds global search (/search) across resources and gallery images, with navbar
search bar. Adds gallery image sidecar JSON for per-image favourite/NSFW metadata.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 03:22:09 +00:00

661 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import re
import random
import logging
from flask import render_template, request, redirect, url_for, flash, session, current_app
from werkzeug.utils import secure_filename
from sqlalchemy.orm.attributes import flag_modified
from models import db, Character, Action, Outfit, Style, Scene, Detailer, Checkpoint, Settings, Look
from services.workflow import _prepare_workflow, _get_default_checkpoint
from services.job_queue import _enqueue_job, _make_finalize, _enqueue_task
from services.prompts import build_prompt, _resolve_character, _ensure_character_fields, _append_background
from services.sync import sync_actions
from services.file_io import get_available_loras
from services.llm import load_prompt, call_llm
from utils import allowed_file, _LORA_DEFAULTS
# Module-level logger, obtained under the name 'gaze'.
logger = logging.getLogger('gaze')
# Registers the action-related view functions that follow on the given `app`
# through its `route` decorator; they also read `app.config` for directories.
def register_routes(app):
@app.route('/get_missing_actions')
def get_missing_actions():
    """Return slug and name of every action that has no cover image.

    An action is reported when its ``image_path`` is NULL or an empty
    string. Entries are ordered by action name.
    """
    # `== None` is intentional: on a column it builds the SQL NULL test.
    no_cover = (Action.image_path == None) | (Action.image_path == '')  # noqa: E711
    rows = Action.query.filter(no_cover).order_by(Action.name).all()
    entries = []
    for row in rows:
        entries.append({'slug': row.slug, 'name': row.name})
    return {'missing': entries}
@app.route('/clear_all_action_covers', methods=['POST'])
def clear_all_action_covers():
    """Set ``image_path`` to None on every action, then commit."""
    for record in Action.query.all():
        record.image_path = None
    db.session.commit()
    return {'success': True}
@app.route('/actions')
def actions_index():
    """Render the action library with optional favourite / NSFW filtering.

    Query parameters:
        favourite: ``'on'`` restricts the list to favourites.
        nsfw: ``'sfw'`` or ``'nsfw'`` restricts by flag; any other value
            (default ``'all'``) applies no NSFW filter.

    Favourites are listed first, then actions are ordered by name.
    """
    fav = request.args.get('favourite')
    nsfw = request.args.get('nsfw', 'all')

    # Collect the equality filters first, then apply them in one go.
    criteria = {}
    if fav == 'on':
        criteria['is_favourite'] = True
    if nsfw in ('sfw', 'nsfw'):
        criteria['is_nsfw'] = (nsfw == 'nsfw')

    query = Action.query
    if criteria:
        query = query.filter_by(**criteria)

    ordered = query.order_by(Action.is_favourite.desc(), Action.name)
    return render_template(
        'actions/index.html',
        actions=ordered.all(),
        favourite_filter=fav or '',
        nsfw_filter=nsfw,
    )
@app.route('/actions/rescan', methods=['POST'])
def rescan_actions():
    """Run ``sync_actions`` and return to the action index with a notice."""
    sync_actions()
    flash('Database synced with action files.')
    index_url = url_for('actions_index')
    return redirect(index_url)
@app.route('/action/<path:slug>')
def action_detail(slug):
    """Render the detail page for one action.

    Responds with 404 when no action has the given slug. The template
    receives the action, all characters ordered by name, the values stored
    in the session under this action's slug-specific keys, and the image
    files found in the action's upload sub-folder (reverse-sorted by name).
    """
    action = Action.query.filter_by(slug=slug).first_or_404()
    characters = Character.query.order_by(Character.name).all()

    # State kept in the session under keys that embed the slug.
    stored = {
        'preferences': session.get(f'prefs_action_{slug}'),
        'preview_image': session.get(f'preview_action_{slug}'),
        'selected_character': session.get(f'char_action_{slug}'),
        'extra_positive': session.get(f'extra_pos_action_{slug}', ''),
        'extra_negative': session.get(f'extra_neg_action_{slug}', ''),
    }

    # Image files already present in this action's upload folder.
    image_suffixes = ('.png', '.jpg', '.jpeg', '.webp')
    preview_dir = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}")
    existing_previews = []
    if os.path.isdir(preview_dir):
        names = [n for n in os.listdir(preview_dir) if n.lower().endswith(image_suffixes)]
        names.sort(reverse=True)
        existing_previews = [f"actions/{slug}/{n}" for n in names]

    return render_template(
        'actions/detail.html',
        action=action,
        characters=characters,
        existing_previews=existing_previews,
        **stored,
    )
@app.route('/action/<path:slug>/edit', methods=['GET', 'POST'])
def edit_action(slug):
    """Show the edit form for an action and persist submitted changes.

    GET renders ``actions/edit.html``. POST rebuilds the action's data dict
    from the form, writes it to the action's JSON file under
    ``ACTIONS_DIR``, commits the database change and redirects to the
    detail page. On any failure the session is rolled back, the error is
    flashed and the edit form is rendered again.

    Responds with 404 when no action has the given slug.
    """
    import copy  # local import: only this handler needs it

    action = Action.query.filter_by(slug=slug).first_or_404()
    loras = get_available_loras('actions')

    if request.method == 'POST':
        try:
            # 1. Update basic fields
            action.name = request.form.get('action_name')

            # 2. Rebuild the data dictionary. A deep copy keeps the nested
            #    'action' / 'lora' dicts of the stored data untouched if a
            #    later step fails.
            new_data = copy.deepcopy(action.data)
            new_data['action_name'] = action.name

            # Update action_id if provided
            new_data['action_id'] = request.form.get('action_id', action.action_id)

            # Update action section: only keys that already exist are editable.
            if 'action' in new_data:
                for key in new_data['action']:
                    form_key = f"action_{key}"
                    if form_key in request.form:
                        new_data['action'][key] = request.form.get(form_key)

            # Update lora section
            if 'lora' in new_data:
                for key in new_data['lora']:
                    form_key = f"lora_{key}"
                    if form_key in request.form:
                        val = request.form.get(form_key)
                        if key == 'lora_weight':
                            try:
                                val = float(val)
                            except (TypeError, ValueError):
                                # Unparsable weight falls back to 1.0.
                                val = 1.0
                        new_data['lora'][key] = val

            # LoRA weight randomization bounds: a blank field removes the bound.
            for bound in ['lora_weight_min', 'lora_weight_max']:
                val_str = request.form.get(f'lora_{bound}', '').strip()
                if val_str:
                    try:
                        new_data.setdefault('lora', {})[bound] = float(val_str)
                    except ValueError:
                        # Unparsable bound is ignored; whatever value is
                        # already present for it is left as is.
                        pass
                else:
                    new_data.setdefault('lora', {}).pop(bound, None)

            # Suppress wardrobe toggle
            new_data['suppress_wardrobe'] = request.form.get('suppress_wardrobe') == 'on'

            # Update Tags (structured dict)
            new_data['tags'] = {
                'participants': request.form.get('tag_participants', '').strip(),
                'nsfw': 'tag_nsfw' in request.form,
            }
            action.is_nsfw = new_data['tags']['nsfw']

            action.data = new_data
            flag_modified(action, "data")

            # 3. Write back to JSON file
            action_file = action.filename or f"{re.sub(r'[^a-zA-Z0-9_]', '', action.action_id)}.json"
            file_path = os.path.join(app.config['ACTIONS_DIR'], action_file)
            with open(file_path, 'w') as f:
                json.dump(new_data, f, indent=2)

            db.session.commit()
            flash('Action profile updated successfully!')
            return redirect(url_for('action_detail', slug=slug))
        except Exception as e:
            # Discard pending changes so the session is usable when the
            # template below reads the action again.
            db.session.rollback()
            logger.exception("Edit error: %s", e)
            flash(f"Error saving changes: {str(e)}")

    return render_template('actions/edit.html', action=action, loras=loras)
@app.route('/action/<path:slug>/upload', methods=['POST'])
def upload_action_image(slug):
    """Store an uploaded image as the action's cover.

    The file is saved under ``UPLOAD_FOLDER/actions/<slug>/`` using a
    sanitised file name, and the relative path is stored in
    ``action.image_path``. Every outcome redirects to the action detail
    page with a flash message.

    Responds with 404 when no action has the given slug.
    """
    action = Action.query.filter_by(slug=slug).first_or_404()

    # This route accepts POST only, so redirecting to request.url would make
    # the browser issue a GET that is not allowed; always go to the detail page.
    detail_url = url_for('action_detail', slug=slug)

    if 'image' not in request.files:
        flash('No file part')
        return redirect(detail_url)

    file = request.files['image']
    if file.filename == '':
        flash('No selected file')
        return redirect(detail_url)

    if not file or not allowed_file(file.filename):
        flash('File type not allowed.', 'error')
        return redirect(detail_url)

    # secure_filename can reduce a name to an empty string.
    filename = secure_filename(file.filename)
    if not filename:
        flash('Invalid file name.', 'error')
        return redirect(detail_url)

    # Create action subfolder
    action_folder = os.path.join(app.config['UPLOAD_FOLDER'], f"actions/{slug}")
    os.makedirs(action_folder, exist_ok=True)
    file.save(os.path.join(action_folder, filename))

    # Store relative path in DB
    action.image_path = f"actions/{slug}/{filename}"
    db.session.commit()
    flash('Image uploaded successfully!')
    return redirect(detail_url)
@app.route('/action/<path:slug>/generate', methods=['POST'])
def generate_action_image(slug):
    """Build prompts for an action and queue an image generation job.

    Reads the form (action type, selected fields, character slug, extra
    prompts, optional seed), stores the selection in the session, combines
    the action data with the chosen character's data (if any), loads
    ``comfy_workflow.json``, and enqueues the prepared workflow through
    ``_enqueue_job``.

    Returns:
        For XMLHttpRequest callers: ``{'status': 'queued', 'job_id': ...}``
        on success or ``({'error': ...}, 500)`` on failure. Otherwise a
        redirect to the action detail page (with a flash message on error).

    Responds with 404 when no action has the given slug.
    """
    action_obj = Action.query.filter_by(slug=slug).first_or_404()
    is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    try:
        from utils import _WARDROBE_KEYS  # kept function-local, as before

        # Get action type
        action = request.form.get('action', 'preview')

        # Get selected fields
        selected_fields = request.form.getlist('include_field')

        # Get selected character (if any)
        character_slug = request.form.get('character_slug', '')
        character = _resolve_character(character_slug)
        if character_slug == '__random__' and character:
            # Remember which character the random pick resolved to.
            character_slug = character.slug

        # Get additional prompts
        extra_positive = request.form.get('extra_positive', '').strip()
        extra_negative = request.form.get('extra_negative', '').strip()

        # Save preferences
        session[f'char_action_{slug}'] = character_slug
        session[f'prefs_action_{slug}'] = selected_fields
        session[f'extra_pos_action_{slug}'] = extra_positive
        session[f'extra_neg_action_{slug}'] = extra_negative
        session.modified = True

        suppress_wardrobe = action_obj.data.get('suppress_wardrobe', False)

        # Strip any wardrobe fields from manual selection when suppressed
        if suppress_wardrobe:
            selected_fields = [f for f in selected_fields if not f.startswith('wardrobe::')]

        # The 'defaults' section is derived from the action the same way
        # whether or not a character is selected.
        action_data = action_obj.data.get('action', {})
        pose_fields = ['base', 'upper_body', 'lower_body', 'hands', 'feet']
        pose_parts = [action_data.get(k) for k in pose_fields if action_data.get(k)]
        expression_parts = [p for p in [action_data.get('head', '')] if p]
        action_defaults = {
            'pose': ", ".join(pose_parts),
            'expression': ", ".join(expression_parts),
            'scene': action_data.get('additional', '')
        }
        default_fields = action_obj.default_fields

        # Build combined data for prompt building
        if character:
            # Combine character identity/wardrobe with action details.
            # Action details replace character's 'defaults' (pose, etc.)
            combined_data = character.data.copy()
            combined_data['action'] = action_data  # Ensure action section is present for routing
            combined_data['participants'] = action_obj.data.get('participants', {})
            combined_data['defaults'] = action_defaults

            # Merge lora triggers if present. The copy above is shallow, so
            # build a new 'lora' dict instead of writing into the
            # character's own one.
            action_lora = action_obj.data.get('lora') or {}
            action_triggers = action_lora.get('lora_triggers')
            if action_triggers:
                merged_lora = dict(combined_data.get('lora') or {})
                trigger_parts = [
                    str(t) for t in (merged_lora.get('lora_triggers'), action_triggers) if t
                ]
                # Joining only non-empty parts avoids a leading ", ".
                merged_lora['lora_triggers'] = ", ".join(trigger_parts)
                combined_data['lora'] = merged_lora

            # Use action's defaults if no manual selection
            if not selected_fields:
                selected_fields = list(action_obj.default_fields) if action_obj.default_fields else []

            if selected_fields:
                # Auto-include essential character fields
                _ensure_character_fields(character, selected_fields, include_wardrobe=not suppress_wardrobe)
            else:
                # Fallback to sensible defaults if still empty
                # (no checkboxes and no action defaults)
                selected_fields = ['special::name', 'defaults::pose', 'defaults::expression']
                # Add identity fields
                identity = character.data.get('identity', {})
                selected_fields.extend(
                    f'identity::{key}' for key in ['base', 'head'] if identity.get(key)
                )
                # Add wardrobe fields (unless suppressed)
                if not suppress_wardrobe:
                    wardrobe = character.get_active_wardrobe()
                    selected_fields.extend(
                        f'wardrobe::{key}' for key in _WARDROBE_KEYS if wardrobe.get(key)
                    )

            active_outfit = character.active_outfit
        else:
            # Action only - no character (rarely makes sense for actions but let's handle it)
            combined_data = {
                'character_id': action_obj.action_id,
                'defaults': action_defaults,
                'lora': action_obj.data.get('lora', {}),
                'tags': action_obj.data.get('tags', [])
            }
            if not selected_fields:
                selected_fields = ['defaults::pose', 'defaults::expression', 'defaults::scene', 'lora::lora_triggers']
            active_outfit = 'default'

        # Queue generation
        with open('comfy_workflow.json', 'r') as f:
            workflow = json.load(f)

        # Build prompts for combined data
        prompts = build_prompt(combined_data, selected_fields, default_fields, active_outfit=active_outfit)

        # Handle multiple female characters: each 'F' beyond the first in
        # the orientation string adds one randomly chosen extra character.
        participants = action_obj.data.get('participants') or {}
        orientation = participants.get('orientation') or ''
        num_extras = orientation.upper().count('F') - 1
        if num_extras > 0:
            # Get all characters excluding the current one
            query = Character.query
            if character:
                query = query.filter(Character.id != character.id)
            all_others = query.all()

            if len(all_others) >= num_extras:
                for extra_char in random.sample(all_others, num_extras):
                    extra_parts = []

                    # Identity
                    ident = extra_char.data.get('identity', {})
                    for key in ['base', 'head', 'additional']:
                        val = ident.get(key)
                        if val:
                            # Remove 1girl/1boy/solo
                            val = re.sub(r'\b(1girl|1boy|solo)\b', '', val).replace(', ,', ',').strip(', ')
                            # Skip values that are empty after the cleanup.
                            if val:
                                extra_parts.append(val)

                    # Wardrobe (active outfit) - skip if suppressed
                    if not suppress_wardrobe:
                        wardrobe = extra_char.get_active_wardrobe()
                        extra_parts.extend(
                            wardrobe.get(key) for key in _WARDROBE_KEYS if wardrobe.get(key)
                        )

                    # Append to main prompt
                    if extra_parts:
                        prompts["main"] += ", " + ", ".join(extra_parts)
                        logger.info("Added extra character: %s", extra_char.name)

        _append_background(prompts, character)

        if extra_positive:
            prompts["main"] = f"{prompts['main']}, {extra_positive}"

        # Parse optional seed (a non-numeric value raises and is reported below)
        seed_val = request.form.get('seed', '').strip()
        fixed_seed = int(seed_val) if seed_val else None

        # Prepare workflow
        ckpt_path, ckpt_data = _get_default_checkpoint()
        workflow = _prepare_workflow(workflow, character, prompts, action=action_obj, custom_negative=extra_negative or None, checkpoint=ckpt_path, checkpoint_data=ckpt_data, fixed_seed=fixed_seed)

        char_label = character.name if character else 'no character'
        label = f"Action: {action_obj.name} ({char_label}) {action}"
        job = _enqueue_job(label, workflow, _make_finalize('actions', slug, Action, action))

        if is_ajax:
            return {'status': 'queued', 'job_id': job['id']}
        return redirect(url_for('action_detail', slug=slug))

    except Exception as e:
        logger.exception("Generation error: %s", e)
        if is_ajax:
            return {'error': str(e)}, 500
        flash(f"Error during generation: {str(e)}")
        return redirect(url_for('action_detail', slug=slug))
@app.route('/action/<path:slug>/replace_cover_from_preview', methods=['POST'])
def replace_action_cover_from_preview(slug):
    """Use an existing preview image as the action's cover.

    ``preview_path`` comes from the form and is interpreted relative to
    ``UPLOAD_FOLDER``. It is accepted only when it resolves to an existing
    file inside that folder. Always redirects to the action detail page.

    Responds with 404 when no action has the given slug.
    """
    action = Action.query.filter_by(slug=slug).first_or_404()
    preview_path = request.form.get('preview_path')

    is_valid = False
    if preview_path:
        # The path is user input: normalise it and refuse anything that
        # escapes the upload folder ('..' segments or an absolute path).
        upload_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
        candidate = os.path.abspath(os.path.join(upload_root, preview_path))
        inside_uploads = candidate.startswith(upload_root + os.sep)
        is_valid = inside_uploads and os.path.isfile(candidate)

    if is_valid:
        action.image_path = preview_path
        db.session.commit()
        flash('Cover image updated!')
    else:
        flash('No valid preview image selected.', 'error')

    return redirect(url_for('action_detail', slug=slug))
@app.route('/action/<path:slug>/save_defaults', methods=['POST'])
def save_action_defaults(slug):
    """Store the submitted ``include_field`` list as the action's default fields.

    Responds with 404 when no action has the given slug; otherwise commits
    and redirects to the action detail page.
    """
    record = Action.query.filter_by(slug=slug).first_or_404()
    record.default_fields = request.form.getlist('include_field')
    db.session.commit()
    flash('Default prompt selection saved for this action!')
    detail_url = url_for('action_detail', slug=slug)
    return redirect(detail_url)
@app.route('/actions/bulk_create', methods=['POST'])
def bulk_create_actions_from_loras():
    """Queue one LLM task per ``.safetensors`` file to create action JSON files.

    The LoRA directory comes from ``Settings.lora_dir_actions`` with a
    built-in fallback path. Files whose action JSON already exists are
    skipped unless the form sends ``overwrite=true``. When at least one
    task was queued, a final task running ``sync_actions`` is queued after
    them.

    Returns a JSON dict for XMLHttpRequest callers, otherwise a redirect to
    the action index with a flash message.
    """
    is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    _s = Settings.query.first()
    actions_lora_dir = ((_s.lora_dir_actions if _s else None) or '/ImageModels/lora/Illustrious/Poses').rstrip('/')
    _lora_subfolder = os.path.basename(actions_lora_dir)

    if not os.path.exists(actions_lora_dir):
        if is_ajax:
            return {'error': 'Actions LoRA directory not found.'}, 400
        flash('Actions LoRA directory not found.', 'error')
        return redirect(url_for('actions_index'))

    overwrite = request.form.get('overwrite') == 'true'

    system_prompt = load_prompt('action_system.txt')
    if not system_prompt:
        if is_ajax:
            return {'error': 'Action system prompt file not found.'}, 500
        flash('Action system prompt file not found.', 'error')
        return redirect(url_for('actions_index'))

    def make_task(fn, aid, aname, jp, lsf, html_ctx, sys_prompt, is_exist):
        """Return a task callable with this file's values bound as arguments."""
        def task_fn(job):
            prompt = f"Describe an action/pose for an AI image generation model based on the LoRA filename: '{fn}'"
            if html_ctx:
                prompt += f"\n\nHere is descriptive text and metadata extracted from an associated HTML file for this LoRA:\n###\n{html_ctx[:3000]}\n###"

            llm_response = call_llm(prompt, sys_prompt)
            # Strip Markdown code fences the model may wrap around the JSON.
            clean_json = llm_response.replace('```json', '').replace('```', '').strip()
            action_data = json.loads(clean_json)
            if not isinstance(action_data, dict):
                raise ValueError("LLM response is not a JSON object")

            # Enforce system values while preserving LLM-extracted metadata
            action_data['action_id'] = aid
            action_data['action_name'] = aname

            # Update lora dict safely (also covers a non-dict 'lora' value)
            lora = action_data.get('lora')
            if not isinstance(lora, dict):
                lora = {}
                action_data['lora'] = lora
            lora['lora_name'] = f"Illustrious/{lsf}/{fn}"

            # Fallbacks if LLM failed to extract metadata
            if not lora.get('lora_triggers'):
                lora['lora_triggers'] = fn.rsplit('.', 1)[0]
            if lora.get('lora_weight') is None:
                lora['lora_weight'] = 1.0
            if lora.get('lora_weight_min') is None:
                lora['lora_weight_min'] = 0.7
            if lora.get('lora_weight_max') is None:
                lora['lora_weight_max'] = 1.0

            os.makedirs(os.path.dirname(jp), exist_ok=True)
            with open(jp, 'w') as f:
                json.dump(action_data, f, indent=2)

            job['result'] = {'name': aname, 'action': 'overwritten' if is_exist else 'created'}
        return task_fn

    job_ids = []
    skipped = 0

    for filename in sorted(os.listdir(actions_lora_dir)):
        if not filename.endswith('.safetensors'):
            continue

        name_base = filename.rsplit('.', 1)[0]
        action_id = re.sub(r'[^a-zA-Z0-9_]', '_', name_base.lower())
        action_name = re.sub(r'[^a-zA-Z0-9]+', ' ', name_base).title()

        json_filename = f"{action_id}.json"
        json_path = os.path.join(app.config['ACTIONS_DIR'], json_filename)

        is_existing = os.path.exists(json_path)
        if is_existing and not overwrite:
            skipped += 1
            continue

        # Read HTML companion file if it exists (best effort: a read
        # failure only means the LLM gets no extra context).
        html_path = os.path.join(actions_lora_dir, f"{name_base}.html")
        html_content = ""
        if os.path.exists(html_path):
            try:
                with open(html_path, 'r', encoding='utf-8', errors='ignore') as hf:
                    html_raw = hf.read()
            except OSError as e:
                logger.warning("Could not read %s: %s", html_path, e)
            else:
                # Drop scripts, styles, images and remaining tags, then
                # collapse whitespace.
                clean_html = re.sub(r'<script[^>]*>.*?</script>', '', html_raw, flags=re.DOTALL)
                clean_html = re.sub(r'<style[^>]*>.*?</style>', '', clean_html, flags=re.DOTALL)
                clean_html = re.sub(r'<img[^>]*>', '', clean_html)
                clean_html = re.sub(r'<[^>]+>', ' ', clean_html)
                html_content = ' '.join(clean_html.split())

        job = _enqueue_task(
            f"Create action: {action_name}",
            make_task(filename, action_id, action_name, json_path,
                      _lora_subfolder, html_content, system_prompt, is_existing)
        )
        job_ids.append(job['id'])

    # Enqueue a sync task to run after all creates
    if job_ids:
        def sync_task(job):
            sync_actions()
            job['result'] = {'synced': True}
        _enqueue_task("Sync actions DB", sync_task)

    if is_ajax:
        return {'success': True, 'queued': len(job_ids), 'skipped': skipped}
    flash(f'Queued {len(job_ids)} action creation tasks ({skipped} skipped). Watch progress in the queue.')
    return redirect(url_for('actions_index'))
@app.route('/action/create', methods=['GET', 'POST'])
def create_action():
    """Show the create form and create a new action from the submitted data.

    On POST a unique, sanitised slug is derived from the ``filename`` field
    (or from the name), the action data is either generated through
    ``call_llm`` or filled with an empty template, written to
    ``ACTIONS_DIR/<slug>.json`` and stored as a new ``Action`` row. Any
    failure flashes a message and re-renders the form with the submitted
    values.
    """
    form_data = {}

    if request.method == 'POST':
        name = request.form.get('name')
        slug = request.form.get('filename', '').strip()
        prompt = request.form.get('prompt', '')
        use_llm = request.form.get('use_llm') == 'on'

        form_data = {'name': name, 'filename': slug, 'prompt': prompt, 'use_llm': use_llm}

        # A missing name would otherwise raise on name.lower() below.
        if not name:
            flash("Name is required.")
            return render_template('actions/create.html', form_data=form_data)

        if not slug:
            slug = re.sub(r'[^a-zA-Z0-9]+', '_', name.lower()).strip('_')

        safe_slug = re.sub(r'[^a-zA-Z0-9_]', '', slug)
        if not safe_slug:
            safe_slug = 'action'

        # Append _1, _2, ... until no JSON file with that name exists.
        base_slug = safe_slug
        counter = 1
        while os.path.exists(os.path.join(app.config['ACTIONS_DIR'], f"{safe_slug}.json")):
            safe_slug = f"{base_slug}_{counter}"
            counter += 1

        if use_llm:
            if not prompt:
                flash("Description is required when AI generation is enabled.")
                return render_template('actions/create.html', form_data=form_data)

            system_prompt = load_prompt('action_system.txt')
            if not system_prompt:
                flash("Action system prompt file not found.")
                return render_template('actions/create.html', form_data=form_data)

            try:
                llm_response = call_llm(f"Create an action profile for '{name}' based on this description: {prompt}", system_prompt)
                # Strip Markdown code fences the model may wrap around the JSON.
                clean_json = llm_response.replace('```json', '').replace('```', '').strip()
                action_data = json.loads(clean_json)
                if not isinstance(action_data, dict):
                    raise ValueError("LLM response is not a JSON object")
                action_data['action_id'] = safe_slug
                action_data['action_name'] = name
            except Exception as e:
                logger.exception("LLM error: %s", e)
                flash(f"Failed to generate action profile: {e}")
                return render_template('actions/create.html', form_data=form_data)
        else:
            action_data = {
                "action_id": safe_slug,
                "action_name": name,
                "action": {
                    "base": "", "head": "", "upper_body": "", "lower_body": "",
                    "hands": "", "feet": "", "additional": ""
                },
                "suppress_wardrobe": False,
                "lora": {"lora_name": "", "lora_weight": 1.0, "lora_triggers": ""},
                # NOTE(review): edit_action stores tags as a dict; confirm
                # the empty list default here is still intended.
                "tags": []
            }

        try:
            file_path = os.path.join(app.config['ACTIONS_DIR'], f"{safe_slug}.json")
            with open(file_path, 'w') as f:
                json.dump(action_data, f, indent=2)

            new_action = Action(
                action_id=safe_slug, slug=safe_slug, filename=f"{safe_slug}.json",
                name=name, data=action_data
            )
            db.session.add(new_action)
            db.session.commit()

            flash('Action created successfully!')
            return redirect(url_for('action_detail', slug=safe_slug))
        except Exception as e:
            # Drop the pending row so the session stays usable.
            db.session.rollback()
            logger.exception("Save error: %s", e)
            flash(f"Failed to create action: {e}")
            return render_template('actions/create.html', form_data=form_data)

    return render_template('actions/create.html', form_data=form_data)
@app.route('/action/<path:slug>/clone', methods=['POST'])
def clone_action(slug):
    """Duplicate an action under the next free numbered id.

    A trailing ``_<digits>`` on the source ``action_id`` is treated as its
    number (an id without one counts as 1). Numbering starts one above that
    and advances until ``ACTIONS_DIR/<base>_<NN>.json`` does not exist. The
    copy is written to that file, stored as a new ``Action`` row, and the
    client is redirected to its detail page.

    Responds with 404 when no action has the given slug.
    """
    source = Action.query.filter_by(slug=slug).first_or_404()

    # Split "<stem>_<number>" if the id already carries a numeric suffix.
    source_id = source.action_id
    numbered = re.match(r'^(.+?)_(\d+)$', source_id)
    if numbered is None:
        stem, source_num = source_id, 1
    else:
        stem, source_num = numbered.group(1), int(numbered.group(2))

    # Probe candidate ids until one has no JSON file yet.
    actions_dir = app.config['ACTIONS_DIR']
    candidate_num = source_num + 1
    new_id = f"{stem}_{candidate_num:02d}"
    while os.path.exists(os.path.join(actions_dir, f"{new_id}.json")):
        candidate_num += 1
        new_id = f"{stem}_{candidate_num:02d}"
    new_filename = f"{new_id}.json"
    new_path = os.path.join(actions_dir, new_filename)

    clone_data = source.data.copy()
    clone_data['action_id'] = new_id
    clone_data['action_name'] = f"{source.name} (Copy)"

    with open(new_path, 'w') as f:
        json.dump(clone_data, f, indent=2)

    new_slug = re.sub(r'[^a-zA-Z0-9_]', '', new_id)
    clone = Action(
        action_id=new_id,
        slug=new_slug,
        filename=new_filename,
        name=clone_data['action_name'],
        data=clone_data,
    )
    db.session.add(clone)
    db.session.commit()

    flash(f'Action cloned as "{new_id}"!')
    return redirect(url_for('action_detail', slug=new_slug))
@app.route('/action/<path:slug>/save_json', methods=['POST'])
def save_action_json(slug):
    """Replace an action's data with JSON submitted in the ``json_data`` field.

    The payload must parse as a JSON object, because the other handlers in
    this file read ``action.data`` as a dict. The new data is committed and,
    when the action has a ``filename``, also written to that file under
    ``ACTIONS_DIR``.

    Returns:
        ``{'success': True}`` on success, or
        ``({'success': False, 'error': ...}, 400)`` for invalid input.

    Responds with 404 when no action has the given slug.
    """
    action = Action.query.filter_by(slug=slug).first_or_404()

    try:
        new_data = json.loads(request.form.get('json_data', ''))
    except (ValueError, TypeError) as e:
        return {'success': False, 'error': f'Invalid JSON: {e}'}, 400

    # Lists, strings, numbers and null are valid JSON but not valid action data.
    if not isinstance(new_data, dict):
        return {'success': False, 'error': 'Invalid JSON: expected an object'}, 400

    action.data = new_data
    flag_modified(action, 'data')
    db.session.commit()

    if action.filename:
        file_path = os.path.join(app.config['ACTIONS_DIR'], action.filename)
        with open(file_path, 'w') as f:
            json.dump(new_data, f, indent=2)

    return {'success': True}
@app.route('/action/<path:slug>/favourite', methods=['POST'])
def toggle_action_favourite(slug):
    """Flip the action's ``is_favourite`` flag and commit.

    XMLHttpRequest callers get the new state as a JSON dict; other callers
    are redirected to the action detail page. Responds with 404 when no
    action has the given slug.
    """
    record = Action.query.filter_by(slug=slug).first_or_404()
    record.is_favourite = not record.is_favourite
    db.session.commit()

    wants_json = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
    if not wants_json:
        return redirect(url_for('action_detail', slug=slug))
    return {'success': True, 'is_favourite': record.is_favourite}