Files
character-browser/services/job_queue.py
Aodhan Collins 5e4348ebc1 Add extra prompts, endless generation, random character default, and small fixes
- Add extra positive/negative prompt textareas to all 9 detail pages with session persistence
- Add Endless generation button to all detail pages (continuous preview generation until stopped)
- Default character selector to "Random Character" on all secondary detail pages
- Fix queue clear endpoint (remove spurious auth check)
- Refactor app.py into routes/ and services/ modules
- Update CLAUDE.md with new architecture documentation
- Various data file updates and cleanup

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 02:07:16 +00:00

266 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import uuid
import logging
import threading
from collections import deque
from flask import current_app
from models import db
from services.comfyui import queue_prompt, get_history, get_image
logger = logging.getLogger('gaze')

# ---------------------------------------------------------------------------
# Generation Job Queue
# ---------------------------------------------------------------------------
# Each job is a dict:
#   id              — unique UUID string
#   label           — human-readable description (e.g. "Tifa Lockhart preview")
#   status          — 'pending' | 'processing' | 'done' | 'failed' | 'paused' | 'removed'
#   workflow        — the fully-prepared ComfyUI workflow dict
#   finalize_fn     — callable(comfy_prompt_id, job) that saves the image; called after ComfyUI finishes
#   error           — error message string (when status == 'failed')
#   result          — dict with image_url etc. (set by finalize_fn on success)
#   created_at      — unix timestamp
#   comfy_prompt_id — the prompt_id returned by ComfyUI (set when processing starts)

# Guards _job_queue, _job_history, and every per-job status/field mutation.
_job_queue_lock = threading.Lock()
_job_queue = deque()  # ordered list of job dicts (pending + paused + processing)
_job_history = {}  # job_id -> job dict (all jobs ever added, for status lookup)
_queue_worker_event = threading.Event()  # signals worker that a new job is available

# Stored reference to the Flask app, set by init_queue_worker(); the worker
# thread needs it to push an app context before touching config/DB.
_app = None
def _enqueue_job(label, workflow, finalize_fn):
    """Add a generation job to the queue. Returns the job dict.

    The job starts in 'pending' status; the background worker picks it up
    once _queue_worker_event is signalled.
    """
    entry = {
        'id': str(uuid.uuid4()),
        'label': label,
        'status': 'pending',
        'workflow': workflow,
        'finalize_fn': finalize_fn,
        'error': None,
        'result': None,
        'created_at': time.time(),
        'comfy_prompt_id': None,
    }
    # Register the job under the lock, then wake the worker.
    with _job_queue_lock:
        _job_queue.append(entry)
        _job_history[entry['id']] = entry
    logger.info("Job queued: [%s] %s", entry['id'][:8], label)
    _queue_worker_event.set()
    return entry
def _queue_worker():
    """Background thread: processes jobs from _job_queue sequentially.

    Runs forever as a daemon thread (started by init_queue_worker). Blocks on
    _queue_worker_event until _enqueue_job() signals new work, then drains every
    'pending' job in FIFO order before sleeping again. Jobs marked 'paused' or
    any other non-pending status are skipped by the scan.
    """
    while True:
        # Sleep until a producer signals that work may be available.
        _queue_worker_event.wait()
        _queue_worker_event.clear()
        while True:
            job = None
            with _job_queue_lock:
                # Find the first pending job
                for j in _job_queue:
                    if j['status'] == 'pending':
                        job = j
                        break
            if job is None:
                break  # No pending jobs — go back to waiting
            # Mark as processing
            with _job_queue_lock:
                job['status'] = 'processing'
            logger.info("=" * 80)
            logger.info("JOB STARTED: [%s] %s", job['id'][:8], job['label'])
            logger.info("Job created at: %s", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(job['created_at'])))
            # Log workflow summary before sending to ComfyUI.
            # NOTE(review): node ids '3'-'7' and '16'-'19' are assumed to match the
            # prepared workflow layout (sampler / checkpoint / latent / prompt /
            # LoRA nodes) — confirm against the workflow builder.
            workflow = job['workflow']
            logger.info("Workflow summary:")
            logger.info("  Checkpoint: %s", workflow.get('4', {}).get('inputs', {}).get('ckpt_name', '(not set)'))
            logger.info("  Seed: %s", workflow.get('3', {}).get('inputs', {}).get('seed', '(not set)'))
            logger.info("  Resolution: %sx%s",
                        workflow.get('5', {}).get('inputs', {}).get('width', '?'),
                        workflow.get('5', {}).get('inputs', {}).get('height', '?'))
            logger.info("  Sampler: %s / %s (steps=%s, cfg=%s)",
                        workflow.get('3', {}).get('inputs', {}).get('sampler_name', '?'),
                        workflow.get('3', {}).get('inputs', {}).get('scheduler', '?'),
                        workflow.get('3', {}).get('inputs', {}).get('steps', '?'),
                        workflow.get('3', {}).get('inputs', {}).get('cfg', '?'))
            # Log active LoRAs
            active_loras = []
            for node_id, label_str in [("16", "char/look"), ("17", "outfit"), ("18", "action"), ("19", "style/detail/scene")]:
                if node_id in workflow:
                    lora_name = workflow[node_id]["inputs"].get("lora_name", "")
                    if lora_name:
                        strength = workflow[node_id]["inputs"].get("strength_model", "?")
                        active_loras.append(f"{label_str}:{lora_name.split('/')[-1]}@{strength}")
            logger.info("  Active LoRAs: %s", ' | '.join(active_loras) if active_loras else '(none)')
            # Log prompts (truncated to 200 chars)
            logger.info("  Positive prompt: %s", workflow.get('6', {}).get('inputs', {}).get('text', '(not set)')[:200])
            logger.info("  Negative prompt: %s", workflow.get('7', {}).get('inputs', {}).get('text', '(not set)')[:200])
            logger.info("=" * 80)
            try:
                # App context is required because finalize_fn reads Flask config
                # and writes to the database.
                with _app.app_context():
                    # Send workflow to ComfyUI
                    logger.info("Sending workflow to ComfyUI...")
                    prompt_response = queue_prompt(job['workflow'])
                    if 'prompt_id' not in prompt_response:
                        raise Exception(f"ComfyUI rejected job: {prompt_response.get('error', 'unknown error')}")
                    comfy_id = prompt_response['prompt_id']
                    with _job_queue_lock:
                        job['comfy_prompt_id'] = comfy_id
                    logger.info("Job [%s] queued in ComfyUI as %s", job['id'][:8], comfy_id)
                    # Poll until done (max ~10 minutes: 300 retries x 2 s sleep)
                    max_retries = 300
                    finished = False
                    poll_count = 0
                    logger.info("Polling ComfyUI for completion (max %d retries, 2s interval)...", max_retries)
                    while max_retries > 0:
                        history = get_history(comfy_id)
                        if comfy_id in history:
                            finished = True
                            logger.info("Generation completed after %d polls (%d seconds)",
                                        poll_count, poll_count * 2)
                            break
                        poll_count += 1
                        if poll_count % 10 == 0:  # Log every 20 seconds
                            logger.info("Still waiting for generation... (%d polls, %d seconds elapsed)",
                                        poll_count, poll_count * 2)
                        time.sleep(2)
                        max_retries -= 1
                    if not finished:
                        raise Exception("ComfyUI generation timed out")
                    logger.info("Job [%s] generation complete, finalizing...", job['id'][:8])
                    # Run the finalize callback (saves image to disk / DB)
                    job['finalize_fn'](comfy_id, job)
                    with _job_queue_lock:
                        job['status'] = 'done'
                    logger.info("=" * 80)
                    logger.info("JOB COMPLETED: [%s] %s", job['id'][:8], job['label'])
                    logger.info("=" * 80)
            except Exception as e:
                logger.error("=" * 80)
                # NOTE(review): format string "%s%s" joins the label and the error
                # with no separator — confirm whether that is intentional.
                logger.exception("JOB FAILED: [%s] %s%s", job['id'][:8], job['label'], e)
                logger.error("=" * 80)
                with _job_queue_lock:
                    job['status'] = 'failed'
                    job['error'] = str(e)
            # Remove completed/failed jobs from the active queue (keep in history)
            with _job_queue_lock:
                try:
                    _job_queue.remove(job)
                except ValueError:
                    pass  # Already removed (e.g. by user)
            # Periodically purge old finished jobs from history to avoid unbounded growth
            _prune_job_history()
def _make_finalize(category, slug, db_model_class=None, action=None):
    """Return a finalize callback for a standard queue job.

    The callback fetches the generated image from ComfyUI's history, writes it
    to UPLOAD_FOLDER/<category>/<slug>/gen_<timestamp>.png, stores the result
    (URL, relative path, seed) on the job dict, and optionally updates the
    entity's cover image in the database.

    category       — upload sub-directory name (e.g. 'characters', 'outfits')
    slug           — entity slug used for the upload folder name
    db_model_class — SQLAlchemy model class for cover-image DB update; None = skip
    action         — 'replace' → update DB; None → always update; anything else → skip
    """
    def _finalize(comfy_prompt_id, job):
        logger.debug("=" * 80)
        logger.debug("FINALIZE - Starting finalization for prompt ID: %s", comfy_prompt_id)
        logger.debug("Category: %s, Slug: %s, Action: %s", category, slug, action)
        history = get_history(comfy_prompt_id)
        outputs = history[comfy_prompt_id]['outputs']
        logger.debug("Processing outputs from %d node(s)", len(outputs))
        for node_id, node_output in outputs.items():
            logger.debug("  Node %s: %s", node_id, list(node_output.keys()))
            if 'images' in node_output:
                # Only the first image of the first image-producing node is kept.
                logger.debug("  Found %d image(s) in node %s", len(node_output['images']), node_id)
                image_info = node_output['images'][0]
                logger.debug("  Image info: filename=%s, subfolder=%s, type=%s",
                             image_info['filename'], image_info['subfolder'], image_info['type'])
                image_data = get_image(image_info['filename'], image_info['subfolder'], image_info['type'])
                upload_folder = current_app.config['UPLOAD_FOLDER']
                folder = os.path.join(upload_folder, f"{category}/{slug}")
                os.makedirs(folder, exist_ok=True)
                filename = f"gen_{int(time.time())}.png"
                full_path = os.path.join(folder, filename)
                logger.debug("  Saving image to: %s", full_path)
                with open(full_path, 'wb') as f:
                    f.write(image_data)
                logger.info("Image saved: %s (%d bytes)", full_path, len(image_data))
                # BUG FIX: relative_path must reference the file just written
                # (previously it was built from a bogus literal instead of
                # `filename`, so image_url / image_path pointed at a
                # nonexistent file).
                relative_path = f"{category}/{slug}/{filename}"
                # Include the seed used for this generation
                used_seed = job['workflow'].get('3', {}).get('inputs', {}).get('seed')
                job['result'] = {
                    'image_url': f'/static/uploads/{relative_path}',
                    'relative_path': relative_path,
                    'seed': used_seed,
                }
                if db_model_class and (action is None or action == 'replace'):
                    logger.debug("  Updating database: %s.image_path = %s", db_model_class.__name__, relative_path)
                    obj = db_model_class.query.filter_by(slug=slug).first()
                    if obj:
                        obj.image_path = relative_path
                        db.session.commit()
                        logger.debug("  Database updated successfully")
                    else:
                        logger.warning("  Object not found in database: %s(slug=%s)", db_model_class.__name__, slug)
                else:
                    logger.debug("  Skipping database update (db_model_class=%s, action=%s)",
                                 db_model_class.__name__ if db_model_class else None, action)
                logger.debug("FINALIZE - Completed successfully")
                logger.debug("=" * 80)
                return
        logger.warning("FINALIZE - No images found in outputs!")
        logger.debug("=" * 80)
    return _finalize
def _prune_job_history(max_age_seconds=3600):
    """Remove completed/failed jobs older than max_age_seconds from _job_history.

    Only terminal jobs ('done', 'failed', 'removed') are eligible; active jobs
    are always kept so status lookups keep working.
    """
    oldest_allowed = time.time() - max_age_seconds
    terminal_states = ('done', 'failed', 'removed')
    with _job_queue_lock:
        expired = [job_id
                   for job_id, info in _job_history.items()
                   if info['status'] in terminal_states
                   and info['created_at'] < oldest_allowed]
        for job_id in expired:
            _job_history.pop(job_id)
def init_queue_worker(flask_app):
    """Store the Flask app reference and start the background worker thread.

    Called once from app.py during startup. The thread is a daemon so it never
    blocks interpreter shutdown.
    """
    global _app
    _app = flask_app
    threading.Thread(
        target=_queue_worker,
        daemon=True,
        name='queue-worker',
    ).start()