# Viewer metadata: 400 lines, 18 KiB, Python
import json
import logging
import math
import os
import random

from flask import request, session, current_app
from sqlalchemy.orm.attributes import flag_modified

from models import db, Character, Look, Outfit, Action, Style, Scene, Detailer
from services.prompts import build_prompt, _dedup_tags, _cross_dedup_prompts
from services.workflow import _get_default_checkpoint, _apply_checkpoint_settings, _log_workflow_prompts
from services.job_queue import _enqueue_job
from services.comfyui import get_history, get_image

# Module logger, registered under the name 'gaze'.
logger = logging.getLogger('gaze')

# URL category segment -> model class queried for that category's entities.
_STRENGTHS_MODEL_MAP = dict(
    characters=Character,
    looks=Look,
    outfits=Outfit,
    actions=Action,
    styles=Style,
    scenes=Scene,
    detailers=Detailer,
)

# URL category segment -> id of the workflow node that receives the entity's
# LoRA for that category. Several categories share a node.
_CATEGORY_LORA_NODES = {
    **dict.fromkeys(('characters', 'looks'), '16'),
    'outfits': '17',
    'actions': '18',
    **dict.fromkeys(('styles', 'scenes', 'detailers'), '19'),
}

# URL category segment -> name of the app config key holding the directory
# where that category's entity JSON files live.
_STRENGTHS_DATA_DIRS = dict(
    characters='CHARACTERS_DIR',
    looks='LOOKS_DIR',
    outfits='CLOTHING_DIR',
    actions='ACTIONS_DIR',
    styles='STYLES_DIR',
    scenes='SCENES_DIR',
    detailers='DETAILERS_DIR',
)


def register_routes(app):
    """Register the Strengths Gallery endpoints on ``app``.

    Four routes are added under ``/strengths/<category>/<path:slug>/``:
    ``generate`` (POST), ``list``, ``clear`` (POST) and ``save_range`` (POST).
    All helpers defined below are private to this registration call.
    """

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    def _empty_prompts():
        """Return a fresh prompt dict with every section blank."""
        return {'main': '', 'face': '', 'hand': '', 'feet': ''}

    def _lora_section(entity):
        """Return the entity's ``lora`` dict, or ``{}`` if missing / not a dict."""
        lora = entity.data.get('lora')
        return lora if isinstance(lora, dict) else {}

    def _entity_tags(entity):
        """Return the entity's ``tags`` as a list, or ``[]`` if not a list/tuple."""
        tags = entity.data.get('tags')
        return list(tags) if isinstance(tags, (list, tuple)) else []

    def _pose_parts(action_data):
        """Collect the non-empty body-group fragments of an action's data."""
        # Imported at call time, as the original code did.
        from utils import _BODY_GROUP_KEYS
        return [action_data.get(k, '') for k in _BODY_GROUP_KEYS if action_data.get(k)]

    def _append_to_main(prompts, text):
        """Append ``text`` to ``prompts['main']``, comma-separated when non-empty."""
        if text:
            prompts['main'] = f"{prompts['main']}, {text}" if prompts['main'] else text

    def _deduped_prompts(main_parts, face_parts, hand_parts):
        """Join each part list with ', ' and run it through ``_dedup_tags``."""
        return {
            'main': _dedup_tags(', '.join(p for p in main_parts if p)),
            'face': _dedup_tags(', '.join(face_parts)),
            'hand': _dedup_tags(', '.join(hand_parts)),
        }

    def _safe_strengths_dir(category, slug):
        """Return the entity's strengths upload directory, or ``None`` if unsafe.

        ``slug`` comes from a ``<path:...>`` URL segment and may contain
        ``..``; the resolved directory must stay inside
        ``UPLOAD_FOLDER/<category>``.
        """
        upload_root = current_app.config['UPLOAD_FOLDER']
        category_root = os.path.realpath(os.path.join(upload_root, category))
        target = os.path.join(upload_root, category, slug, 'strengths')
        try:
            common = os.path.commonpath([category_root, os.path.realpath(target)])
        except ValueError:
            # Raised e.g. when the paths are on different drives.
            return None
        if os.path.normcase(common) != os.path.normcase(category_root):
            return None
        return target

    # ------------------------------------------------------------------
    # Prompt construction
    # ------------------------------------------------------------------

    def _get_character_data_without_lora(character):
        """Extract character data excluding LoRA to prevent activation in strengths gallery.

        Returns ``None`` when ``character`` is falsy, otherwise a copy of
        ``character.data`` without its ``'lora'`` key.
        """
        if not character:
            return None
        return {k: v for k, v in (character.data or {}).items() if k != 'lora'}

    def _character_base_prompts(character):
        """Build prompts from the character's LoRA-free data, or blank prompts."""
        char_data_no_lora = _get_character_data_without_lora(character)
        if not char_data_no_lora:
            return _empty_prompts()
        return build_prompt(char_data_no_lora, [], character.default_fields)

    def _build_strengths_prompts(category, entity, character, action=None, extra_positive=''):
        """Build main/face/hand prompt strings for the Strengths Gallery.

        Args:
            category: One of the keys of ``_STRENGTHS_MODEL_MAP``.
            entity: The entity whose LoRA is being previewed.
            character: Optional character used as the subject; may be ``None``.
            action: Optional action whose pose fragments are appended. Only
                applied for styles / scenes / detailers.
            extra_positive: Extra text appended to the main prompt. Only
                applied for styles / scenes / detailers.

        Returns:
            A dict of prompt strings with at least ``main``, ``face`` and
            ``hand`` keys.
        """
        if category == 'characters':
            return build_prompt(entity.data, [], entity.default_fields)

        triggers = _lora_section(entity).get('lora_triggers', '')
        tags = _entity_tags(entity)

        if category == 'looks':
            base = _character_base_prompts(character)
            look_pos = entity.data.get('positive', '')
            prefix = ', '.join(p for p in (triggers, look_pos) if p)
            if prefix:
                # Look text goes in front of the character prompt.
                base['main'] = f"{prefix}, {base['main']}" if base['main'] else prefix
            return base

        if category == 'outfits':
            wardrobe = entity.data.get('wardrobe') or {}
            wardrobe_parts = [v for v in wardrobe.values() if isinstance(v, str) and v]
            char_parts, face_parts, hand_parts = [], [], []
            if character:
                identity = character.data.get('identity') or {}
                defaults = character.data.get('defaults') or {}
                char_parts = [v for v in (identity.get('base'), identity.get('head'),
                                          defaults.get('expression')) if v]
                face_parts = [v for v in (identity.get('head'),
                                          defaults.get('expression')) if v]
                hand_parts = [v for v in (wardrobe.get('hands'),) if v]
            main_parts = ([triggers] if triggers else []) + char_parts + wardrobe_parts + tags
            return _deduped_prompts(main_parts, face_parts, hand_parts)

        if category == 'actions':
            action_data = entity.data.get('action') or {}
            pose_parts = _pose_parts(action_data)
            expr_parts = [action_data.get('head', '')] if action_data.get('head') else []
            hand_parts = [action_data.get('hands', '')] if action_data.get('hands') else []
            char_parts = []
            face_parts = list(expr_parts)
            if character:
                identity = character.data.get('identity') or {}
                char_parts = [v for v in (identity.get('base'), identity.get('head')) if v]
                face_parts = [v for v in [identity.get('head')] + expr_parts if v]
            main_parts = ([triggers] if triggers else []) + char_parts + pose_parts + tags
            return _deduped_prompts(main_parts, face_parts, hand_parts)

        # styles / scenes / detailers
        if category == 'styles':
            sdata = entity.data.get('style') or {}
            artist = f"by {sdata['artist_name']}" if sdata.get('artist_name') else ''
            style_tags = sdata.get('artistic_style', '')
            entity_parts = [p for p in [triggers, artist, style_tags] + tags if p]
        elif category == 'scenes':
            sdata = entity.data.get('scene') or {}
            scene_parts = [v for v in sdata.values() if isinstance(v, str) and v]
            entity_parts = [p for p in [triggers] + scene_parts + tags if p]
        else:  # detailers
            det_prompt = entity.data.get('prompt', '')
            entity_parts = [p for p in [triggers, det_prompt] + tags if p]

        base = _character_base_prompts(character)
        _append_to_main(base, ', '.join(entity_parts))

        if action is not None:
            action_data = action.data.get('action') or {}
            _append_to_main(base, ', '.join(_pose_parts(action_data)))

        _append_to_main(base, extra_positive)
        return base

    # ------------------------------------------------------------------
    # Workflow wiring
    # ------------------------------------------------------------------

    def _prepare_strengths_workflow(workflow, category, entity, character, prompts,
                                    checkpoint, ckpt_data, strength_value, fixed_seed,
                                    custom_negative=''):
        """Wire a ComfyUI workflow with ONLY the entity's LoRA active at a specific strength.

        Args:
            workflow: Workflow dict keyed by node id; modified in place.
            category: Gallery category; selects the LoRA node via
                ``_CATEGORY_LORA_NODES`` (falls back to ``'16'``).
            entity: Entity whose ``lora.lora_name`` is loaded.
            character: Unused here; kept for signature compatibility.
            prompts: Dict with ``main`` / ``face`` / ``hand`` prompt strings.
            checkpoint: Checkpoint name written to node ``'4'`` when truthy.
            ckpt_data: When truthy, passed to ``_apply_checkpoint_settings``.
            strength_value: Used for both model and clip strength.
            fixed_seed: Seed written to nodes ``'3'``, ``'11'`` and ``'13'``.
            custom_negative: Extra text prepended to node ``'7'``'s text.

        Returns:
            The prepared workflow (the object returned by
            ``_apply_checkpoint_settings`` when ``ckpt_data`` is given).
        """
        active_node = _CATEGORY_LORA_NODES.get(category, '16')
        entity_lora_name = _lora_section(entity).get('lora_name', '')

        if checkpoint and '4' in workflow:
            workflow['4']['inputs']['ckpt_name'] = checkpoint

        if '5' in workflow:
            workflow['5']['inputs']['width'] = 1024
            workflow['5']['inputs']['height'] = 1024

        # Fill the prompt placeholders in the text nodes that exist.
        for node_id, placeholder, prompt_key in (
            ('6', '{{POSITIVE_PROMPT}}', 'main'),
            ('14', '{{FACE_PROMPT}}', 'face'),
            ('15', '{{HAND_PROMPT}}', 'hand'),
        ):
            if node_id in workflow:
                inputs = workflow[node_id]['inputs']
                inputs['text'] = inputs['text'].replace(placeholder, prompts.get(prompt_key, '') or '')

        # Negative additions are prepended: look negative first, then custom,
        # so the final order is "custom, look, <workflow text>".
        if '7' in workflow:
            negative_inputs = workflow['7']['inputs']
            if category == 'looks':
                look_neg = entity.data.get('negative', '')
                if look_neg:
                    negative_inputs['text'] = f"{look_neg}, {negative_inputs['text']}"
            if custom_negative:
                negative_inputs['text'] = f"{custom_negative}, {negative_inputs['text']}"

        # Start from node '4' outputs; route through the active LoRA node only.
        model_source = ['4', 0]
        clip_source = ['4', 1]
        if entity_lora_name and active_node in workflow:
            lora_inputs = workflow[active_node]['inputs']
            lora_inputs['lora_name'] = entity_lora_name
            lora_inputs['strength_model'] = float(strength_value)
            lora_inputs['strength_clip'] = float(strength_value)
            lora_inputs['model'] = list(model_source)
            lora_inputs['clip'] = list(clip_source)
            model_source = [active_node, 0]
            clip_source = [active_node, 1]

        # Re-point every consumer at the chosen source so the other LoRA
        # nodes are bypassed.
        for consumer, needs_model, needs_clip in (
            ('3', True, False),
            ('6', False, True),
            ('7', False, True),
            ('11', True, True),
            ('13', True, True),
            ('14', False, True),
            ('15', False, True),
        ):
            if consumer not in workflow:
                continue
            if needs_model:
                workflow[consumer]['inputs']['model'] = list(model_source)
            if needs_clip:
                workflow[consumer]['inputs']['clip'] = list(clip_source)

        for seed_node in ('3', '11', '13'):
            if seed_node in workflow:
                workflow[seed_node]['inputs']['seed'] = int(fixed_seed)

        if ckpt_data:
            workflow = _apply_checkpoint_settings(workflow, ckpt_data)

        # Copy node '3's sampler/scheduler onto nodes '11' and '13'.
        if '3' in workflow:
            sampler_name = workflow['3']['inputs'].get('sampler_name')
            scheduler = workflow['3']['inputs'].get('scheduler')
            for node_id in ('11', '13'):
                if node_id not in workflow:
                    continue
                if sampler_name:
                    workflow[node_id]['inputs']['sampler_name'] = sampler_name
                if scheduler:
                    workflow[node_id]['inputs']['scheduler'] = scheduler

        if '6' in workflow and '7' in workflow:
            pos_text, neg_text = _cross_dedup_prompts(
                workflow['6']['inputs']['text'],
                workflow['7']['inputs']['text'],
            )
            workflow['6']['inputs']['text'] = pos_text
            workflow['7']['inputs']['text'] = neg_text

        _log_workflow_prompts(f"_prepare_strengths_workflow [node={active_node} lora={entity_lora_name} @ {strength_value} seed={fixed_seed}]", workflow)
        return workflow

    # ------------------------------------------------------------------
    # Routes
    # ------------------------------------------------------------------

    @app.route('/strengths/<category>/<path:slug>/generate', methods=['POST'])
    def strengths_generate(category, slug):
        """Queue one gallery image for the entity at the requested LoRA strength.

        Form fields: ``strength_value`` (default 1.0), ``seed`` (random when
        absent or blank), ``character_slug`` (optional; ``'__random__'`` picks
        a random character). Returns ``{'status': 'queued', 'job_id': ...}``,
        400 for an unknown category or invalid numbers, 500 on other errors.
        """
        if category not in _STRENGTHS_MODEL_MAP:
            return {'error': 'unknown category'}, 400

        Model = _STRENGTHS_MODEL_MAP[category]
        entity = Model.query.filter_by(slug=slug).first_or_404()

        # Bad client input is a 400, not a 500.
        try:
            strength_value = float(request.form.get('strength_value') or 1.0)
            seed_raw = request.form.get('seed')
            fixed_seed = int(seed_raw) if seed_raw else random.randint(1, 10**15)
        except (TypeError, ValueError):
            return {'error': 'invalid strength or seed'}, 400
        if not math.isfinite(strength_value):
            return {'error': 'invalid strength or seed'}, 400

        try:
            singular = {
                'outfits': 'outfit', 'actions': 'action', 'styles': 'style',
                'scenes': 'scene', 'detailers': 'detailer', 'looks': 'look',
            }
            session_prefix = singular.get(category, category)
            char_slug = (request.form.get('character_slug') or
                         session.get(f'char_{session_prefix}_{slug}'))

            if category == 'characters':
                character = entity
            elif char_slug == '__random__':
                character = Character.query.order_by(db.func.random()).first()
            elif char_slug:
                character = Character.query.filter_by(slug=char_slug).first()
            else:
                character = None

            logger.info("[Strengths] char_slug=%r → character=%s",
                        char_slug, character.slug if character else 'none')

            action_obj = None
            extra_positive = ''
            extra_negative = ''
            if category == 'detailers':
                action_slug = session.get(f'action_detailer_{slug}')
                if action_slug:
                    action_obj = Action.query.filter_by(slug=action_slug).first()
                extra_positive = session.get(f'extra_pos_detailer_{slug}', '')
                extra_negative = session.get(f'extra_neg_detailer_{slug}', '')
                logger.info(
                    "[Strengths] detailer session — char=%s, action=%s, extra_pos=%s, extra_neg=%s",
                    char_slug, action_slug, bool(extra_positive), bool(extra_negative))

            prompts = _build_strengths_prompts(category, entity, character,
                                               action=action_obj, extra_positive=extra_positive)

            checkpoint, ckpt_data = _get_default_checkpoint()
            workflow_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'comfy_workflow.json')
            with open(workflow_path, 'r', encoding='utf-8') as f:
                workflow = json.load(f)

            workflow = _prepare_strengths_workflow(
                workflow, category, entity, character, prompts,
                checkpoint, ckpt_data, strength_value, fixed_seed,
                custom_negative=extra_negative,
            )

            # Resolve the upload root now: _finalize runs when the job
            # completes, which may be outside a Flask application context
            # (not visible from this file), where current_app is unusable.
            upload_root = current_app.config['UPLOAD_FOLDER']

            def _finalize(comfy_prompt_id, job):
                """Fetch the first available output image, save it, set job['result']."""
                history = get_history(comfy_prompt_id)
                outputs = history[comfy_prompt_id].get('outputs', {})
                img_data = None
                # Try the first image of each output node until one has data.
                for node_output in outputs.values():
                    for img in node_output.get('images', []):
                        img_data = get_image(img['filename'], img.get('subfolder', ''), img.get('type', 'output'))
                        break
                    if img_data:
                        break
                if not img_data:
                    raise Exception('no image in output')

                strength_str = f"{strength_value:.2f}".replace('.', '_')
                upload_dir = os.path.join(upload_root, category, slug, 'strengths')
                os.makedirs(upload_dir, exist_ok=True)
                out_filename = f"strength_{strength_str}_seed_{fixed_seed}.png"
                with open(os.path.join(upload_dir, out_filename), 'wb') as out:
                    out.write(img_data)
                relative = f"{category}/{slug}/strengths/{out_filename}"
                job['result'] = {'image_url': f"/static/uploads/{relative}", 'strength_value': strength_value}

            label = f"Strengths: {entity.name} @ {strength_value:.2f}"
            job = _enqueue_job(label, workflow, _finalize)
            return {'status': 'queued', 'job_id': job['id']}

        except Exception as e:
            logger.exception("[Strengths] generate error: %s", e)
            return {'error': str(e)}, 500

    @app.route('/strengths/<category>/<path:slug>/list')
    def strengths_list(category, slug):
        """List the saved ``.png`` gallery images for an entity, sorted by filename.

        Returns ``{'images': [...]}``; 400 for an unknown category or a slug
        that resolves outside the category's upload folder.
        """
        if category not in _STRENGTHS_MODEL_MAP:
            return {'error': 'unknown category'}, 400
        upload_dir = _safe_strengths_dir(category, slug)
        if upload_dir is None:
            return {'error': 'invalid slug'}, 400
        if not os.path.isdir(upload_dir):
            return {'images': []}

        images = []
        for fname in sorted(os.listdir(upload_dir)):
            if not fname.endswith('.png'):
                continue
            # "strength_0_50_seed_7.png" -> "0.50"
            strength_display = fname.replace('strength_', '').split('_seed_')[0].replace('_', '.')
            images.append({
                'url': f"/static/uploads/{category}/{slug}/strengths/{fname}",
                'strength': strength_display,
                'filename': fname,
            })
        return {'images': images}

    @app.route('/strengths/<category>/<path:slug>/clear', methods=['POST'])
    def strengths_clear(category, slug):
        """Delete every regular file in the entity's strengths upload directory.

        Returns ``{'success': True}``; 400 for an unknown category or a slug
        that resolves outside the category's upload folder.
        """
        if category not in _STRENGTHS_MODEL_MAP:
            return {'error': 'unknown category'}, 400
        upload_dir = _safe_strengths_dir(category, slug)
        if upload_dir is None:
            return {'error': 'invalid slug'}, 400

        if os.path.isdir(upload_dir):
            for fname in os.listdir(upload_dir):
                fpath = os.path.join(upload_dir, fname)
                if os.path.isfile(fpath):
                    os.remove(fpath)
        return {'success': True}

    @app.route('/strengths/<category>/<path:slug>/save_range', methods=['POST'])
    def strengths_save_range(category, slug):
        """Save lora_weight_min / lora_weight_max from the Strengths Gallery back to the entity JSON + DB.

        Form fields: ``min_weight`` and ``max_weight`` (finite floats; swapped
        when given in the wrong order). Returns the stored values, or 400 for
        an unknown category, invalid weights, or an entity without a ``lora``
        dict.
        """
        if category not in _STRENGTHS_MODEL_MAP or category not in _STRENGTHS_DATA_DIRS:
            return {'error': 'unknown category'}, 400

        try:
            min_w = float(request.form.get('min_weight', ''))
            max_w = float(request.form.get('max_weight', ''))
        except (ValueError, TypeError):
            return {'error': 'invalid weight values'}, 400
        # NaN / inf would be written to the JSON file as invalid JSON.
        if not (math.isfinite(min_w) and math.isfinite(max_w)):
            return {'error': 'invalid weight values'}, 400

        if min_w > max_w:
            min_w, max_w = max_w, min_w

        Model = _STRENGTHS_MODEL_MAP[category]
        entity = Model.query.filter_by(slug=slug).first_or_404()

        data = dict(entity.data or {})
        if not isinstance(data.get('lora'), dict):
            return {'error': 'entity has no lora section'}, 400

        # Copy the nested dict too, so the entity's current value is not
        # mutated in place before the new value is assigned.
        lora = dict(data['lora'])
        lora['lora_weight_min'] = min_w
        lora['lora_weight_max'] = max_w
        data['lora'] = lora
        entity.data = data
        flag_modified(entity, 'data')

        data_dir = current_app.config[_STRENGTHS_DATA_DIRS[category]]
        filename = getattr(entity, 'filename', None) or f"{slug}.json"
        file_path = os.path.join(data_dir, filename)
        # Only rewrite an existing file; never create a new one here.
        if os.path.exists(file_path):
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
                f.write('\n')

        db.session.commit()
        return {'success': True, 'lora_weight_min': min_w, 'lora_weight_max': max_w}