import os
import time
import uuid
import logging
import threading
from collections import deque

from flask import current_app
from models import db
from services.comfyui import queue_prompt, get_history, get_image

logger = logging.getLogger('gaze')

# ---------------------------------------------------------------------------
# Generation Job Queue
# ---------------------------------------------------------------------------
# Each job is a dict:
#   id              — unique UUID string
#   label           — human-readable description (e.g. "Tifa Lockhart – preview")
#   status          — 'pending' | 'processing' | 'done' | 'failed' | 'paused' | 'removed'
#   workflow        — the fully-prepared ComfyUI workflow dict
#   finalize_fn     — callable(comfy_prompt_id, job) that saves the image; called after ComfyUI finishes
#   error           — error message string (when status == 'failed')
#   result          — dict with image_url etc. (set by finalize_fn on success)
#   created_at      — unix timestamp
#   comfy_prompt_id — the prompt_id returned by ComfyUI (set when processing starts)

_job_queue_lock = threading.Lock()
_job_queue = deque()  # ordered list of job dicts (pending + paused + processing)
_job_history = {}  # job_id -> job dict (all jobs ever added, for status lookup)
_queue_worker_event = threading.Event()  # signals worker that a new job is available

# Stored reference to the Flask app, set by init_queue_worker()
_app = None


def _enqueue_job(label, workflow, finalize_fn):
    """Add a generation job to the queue and wake the worker.

    label       — human-readable description used in log output
    workflow    — prepared ComfyUI workflow dict, passed to queue_prompt()
    finalize_fn — callable(comfy_prompt_id, job) run once ComfyUI has finished

    Returns the job dict. The same dict object is stored in both _job_queue
    and _job_history, so later status changes are visible through either.
    """
    job = {
        'id': str(uuid.uuid4()),
        'label': label,
        'status': 'pending',
        'workflow': workflow,
        'finalize_fn': finalize_fn,
        'error': None,
        'result': None,
        'created_at': time.time(),
        'comfy_prompt_id': None,
    }
    with _job_queue_lock:
        _job_queue.append(job)
        _job_history[job['id']] = job
    logger.info("Job queued: [%s] %s", job['id'][:8], label)
    _queue_worker_event.set()
    return job


def _claim_next_pending_job():
    """Return the first 'pending' job, already marked 'processing', or None.

    Selection and the status change happen under a single lock acquisition so
    that a status change made by another thread (e.g. pausing or removing the
    job) cannot land between the two steps and then be overwritten.
    """
    with _job_queue_lock:
        for candidate in _job_queue:
            if candidate['status'] == 'pending':
                candidate['status'] = 'processing'
                return candidate
    return None


def _node_inputs(workflow, node_id):
    """Return the 'inputs' dict of a workflow node, or {} if absent/malformed."""
    node = workflow.get(node_id)
    if not isinstance(node, dict):
        return {}
    inputs = node.get('inputs')
    return inputs if isinstance(inputs, dict) else {}


def _log_job_summary(job):
    """Log a human-readable summary of the job and its workflow.

    Purely diagnostic. Node ids ('3', '4', '5', '6', '7', '16'-'19') are the
    ones this module has always inspected; missing nodes are reported with
    placeholder values instead of raising.
    """
    workflow = job['workflow']
    sampler = _node_inputs(workflow, '3')
    latent = _node_inputs(workflow, '5')

    logger.info("=" * 80)
    logger.info("JOB STARTED: [%s] %s", job['id'][:8], job['label'])
    logger.info("Job created at: %s",
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(job['created_at'])))

    # Log workflow summary before sending to ComfyUI
    logger.info("Workflow summary:")
    logger.info(" Checkpoint: %s", _node_inputs(workflow, '4').get('ckpt_name', '(not set)'))
    logger.info(" Seed: %s", sampler.get('seed', '(not set)'))
    logger.info(" Resolution: %sx%s", latent.get('width', '?'), latent.get('height', '?'))
    logger.info(" Sampler: %s / %s (steps=%s, cfg=%s)",
                sampler.get('sampler_name', '?'),
                sampler.get('scheduler', '?'),
                sampler.get('steps', '?'),
                sampler.get('cfg', '?'))

    # Log active LoRAs
    active_loras = []
    for node_id, label_str in [("16", "char/look"), ("17", "outfit"),
                               ("18", "action"), ("19", "style/detail/scene")]:
        lora_inputs = _node_inputs(workflow, node_id)
        lora_name = lora_inputs.get("lora_name", "")
        if lora_name:
            strength = lora_inputs.get("strength_model", "?")
            # str() so a non-string value cannot raise on .split()
            active_loras.append(f"{label_str}:{str(lora_name).split('/')[-1]}@{strength}")
    logger.info(" Active LoRAs: %s", ' | '.join(active_loras) if active_loras else '(none)')

    # Log prompts (truncated); str() so None or non-string values cannot raise on slicing
    logger.info(" Positive prompt: %s",
                str(_node_inputs(workflow, '6').get('text', '(not set)'))[:200])
    logger.info(" Negative prompt: %s",
                str(_node_inputs(workflow, '7').get('text', '(not set)'))[:200])
    logger.info("=" * 80)


def _wait_for_completion(comfy_id, max_retries=300, interval=2):
    """Poll get_history() until comfy_id appears in the returned history.

    comfy_id    — prompt id returned by queue_prompt()
    max_retries — maximum number of polls (default 300)
    interval    — seconds slept between polls (default 2, i.e. ~10 minutes total)

    Raises TimeoutError if the prompt has not appeared after max_retries polls.
    """
    logger.info("Polling ComfyUI for completion (max %d retries, %ss interval)...",
                max_retries, interval)
    for poll_count in range(max_retries):
        history = get_history(comfy_id)
        if comfy_id in history:
            logger.info("Generation completed after %d polls (%d seconds)",
                        poll_count, poll_count * interval)
            return
        polls_done = poll_count + 1
        if polls_done % 10 == 0:  # Log every 10 polls (20 seconds at the default interval)
            logger.info("Still waiting for generation... (%d polls, %d seconds elapsed)",
                        polls_done, polls_done * interval)
        time.sleep(interval)
    raise TimeoutError("ComfyUI generation timed out")


def _run_job(job):
    """Run one already-claimed job to completion and record the outcome.

    Sends the workflow to ComfyUI, waits for it to finish, runs the job's
    finalize_fn inside the Flask app context, then sets status to 'done' or
    'failed' (with job['error'] set). The job is removed from the active queue
    afterwards but stays in _job_history until pruned.
    """
    # The summary is diagnostic only: a malformed workflow must not kill the
    # worker thread and leave the job stuck in 'processing'.
    try:
        _log_job_summary(job)
    except Exception:
        logger.exception("Could not log workflow summary for job [%s]", job['id'][:8])

    try:
        with _app.app_context():
            # Send workflow to ComfyUI
            logger.info("Sending workflow to ComfyUI...")
            prompt_response = queue_prompt(job['workflow'])
            if 'prompt_id' not in prompt_response:
                raise RuntimeError(
                    f"ComfyUI rejected job: {prompt_response.get('error', 'unknown error')}")

            comfy_id = prompt_response['prompt_id']
            with _job_queue_lock:
                job['comfy_prompt_id'] = comfy_id
            logger.info("Job [%s] queued in ComfyUI as %s", job['id'][:8], comfy_id)

            # Poll until done (max ~10 minutes)
            _wait_for_completion(comfy_id)

            logger.info("Job [%s] generation complete, finalizing...", job['id'][:8])

            # Run the finalize callback (saves image to disk / DB)
            job['finalize_fn'](comfy_id, job)

        with _job_queue_lock:
            job['status'] = 'done'
        logger.info("=" * 80)
        logger.info("JOB COMPLETED: [%s] %s", job['id'][:8], job['label'])
        logger.info("=" * 80)

    except Exception as e:
        # Top-level boundary of the worker: record the failure and keep the thread alive.
        logger.error("=" * 80)
        logger.exception("JOB FAILED: [%s] %s — %s", job['id'][:8], job['label'], e)
        logger.error("=" * 80)
        with _job_queue_lock:
            job['status'] = 'failed'
            job['error'] = str(e)

    # Remove completed/failed jobs from the active queue (keep in history)
    with _job_queue_lock:
        try:
            _job_queue.remove(job)
        except ValueError:
            pass  # Already removed (e.g. by user)

    # Purge old finished jobs from history to avoid unbounded growth
    _prune_job_history()


def _queue_worker():
    """Background thread: processes jobs from _job_queue sequentially.

    Sleeps on _queue_worker_event, then drains every pending job before going
    back to sleep. Never returns.
    """
    while True:
        _queue_worker_event.wait()
        _queue_worker_event.clear()

        while True:
            job = _claim_next_pending_job()
            if job is None:
                break  # No pending jobs — go back to waiting
            _run_job(job)


def _make_finalize(category, slug, db_model_class=None, action=None):
    """Return a finalize callback for a standard queue job.

    category       — upload sub-directory name (e.g. 'characters', 'outfits')
    slug           — entity slug used for the upload folder name
    db_model_class — SQLAlchemy model class for cover-image DB update; None = skip
    action         — 'replace' → update DB; None → always update; anything else → skip

    The returned callable has the signature (comfy_prompt_id, job). It saves
    the first image of the first output node that has images, sets
    job['result'], and optionally updates image_path on the matching DB row.
    It uses current_app and db, so it must run inside a Flask app context.
    """
    def _finalize(comfy_prompt_id, job):
        logger.debug("=" * 80)
        logger.debug("FINALIZE - Starting finalization for prompt ID: %s", comfy_prompt_id)
        logger.debug("Category: %s, Slug: %s, Action: %s", category, slug, action)

        history = get_history(comfy_prompt_id)
        outputs = history[comfy_prompt_id]['outputs']

        logger.debug("Processing outputs from %d node(s)", len(outputs))
        for node_id, node_output in outputs.items():
            logger.debug(" Node %s: %s", node_id, list(node_output.keys()))
            if 'images' not in node_output:
                continue

            logger.debug(" Found %d image(s) in node %s", len(node_output['images']), node_id)

            image_info = node_output['images'][0]
            logger.debug(" Image info: filename=%s, subfolder=%s, type=%s",
                         image_info['filename'], image_info['subfolder'], image_info['type'])

            image_data = get_image(image_info['filename'],
                                   image_info['subfolder'],
                                   image_info['type'])

            upload_folder = current_app.config['UPLOAD_FOLDER']
            folder = os.path.join(upload_folder, category, slug)
            os.makedirs(folder, exist_ok=True)

            filename = f"gen_{int(time.time())}.png"
            full_path = os.path.join(folder, filename)

            logger.debug(" Saving image to: %s", full_path)
            with open(full_path, 'wb') as f:
                f.write(image_data)
            logger.info("Image saved: %s (%d bytes)", full_path, len(image_data))

            # URL-style path (always forward slashes), independent of os.sep
            relative_path = f"{category}/{slug}/{filename}"

            # Include the seed used for this generation
            used_seed = job['workflow'].get('3', {}).get('inputs', {}).get('seed')
            job['result'] = {
                'image_url': f'/static/uploads/{relative_path}',
                'relative_path': relative_path,
                'seed': used_seed,
            }

            if db_model_class and (action is None or action == 'replace'):
                logger.debug(" Updating database: %s.image_path = %s",
                             db_model_class.__name__, relative_path)
                obj = db_model_class.query.filter_by(slug=slug).first()
                if obj:
                    obj.image_path = relative_path
                    db.session.commit()
                    logger.debug(" Database updated successfully")
                else:
                    logger.warning(" Object not found in database: %s(slug=%s)",
                                   db_model_class.__name__, slug)
            else:
                logger.debug(" Skipping database update (db_model_class=%s, action=%s)",
                             db_model_class.__name__ if db_model_class else None, action)

            logger.debug("FINALIZE - Completed successfully")
            logger.debug("=" * 80)
            return

        # No node produced images: job['result'] is left untouched and the
        # callback returns normally.
        logger.warning("FINALIZE - No images found in outputs!")
        logger.debug("=" * 80)

    return _finalize


def _prune_job_history(max_age_seconds=3600):
    """Remove completed/failed jobs older than max_age_seconds from _job_history.

    Age is measured from the job's created_at timestamp. Only jobs whose
    status is 'done', 'failed' or 'removed' are dropped.
    """
    cutoff = time.time() - max_age_seconds
    with _job_queue_lock:
        stale = [jid for jid, j in _job_history.items()
                 if j['status'] in ('done', 'failed', 'removed')
                 and j['created_at'] < cutoff]
        for jid in stale:
            del _job_history[jid]


def init_queue_worker(flask_app):
    """Store the Flask app reference and start the background worker thread.

    Called once from app.py during startup. The worker is a daemon thread, so
    it does not keep the process alive on shutdown.
    """
    global _app
    _app = flask_app
    worker = threading.Thread(target=_queue_worker, daemon=True, name='queue-worker')
    worker.start()