Add background job queue system for generation

- Implements sequential job queue with background worker thread (_enqueue_job, _queue_worker)
- All generate routes now return job_id instead of prompt_id; frontend polls /api/queue/<id>/status
- Queue management UI in navbar with live badge, job list, pause/resume/remove controls
- Fix: replaced url_for() calls inside finalize callbacks with direct string paths (url_for raises RuntimeError without request context in background threads)
- Batch cover generation now uses two-phase pattern: queue all jobs upfront, then poll concurrently via Promise.all so page navigation doesn't interrupt the process
- Strengths gallery sweep migrated to same two-phase pattern; sgStop() cancels queued jobs server-side
- LoRA weight randomisation via lora_weight_min/lora_weight_max already present in _resolve_lora_weight

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Aodhan Collins
2026-03-03 02:32:50 +00:00
parent ae7ba961c1
commit 3c828a170f
21 changed files with 1451 additions and 2146 deletions

View File

@@ -101,11 +101,10 @@
const SG_CAT = {{ sg_category | tojson }};
const SG_SLUG = {{ sg_entity.slug | tojson }};
const SG_WS = {{ COMFYUI_WS_URL | tojson }};
const SG_CLIENT_ID = 'sg_' + Math.random().toString(36).slice(2, 10);
let sgRunning = false;
let sgShouldStop = false;
let sgQueuedJobs = []; // track all queued job IDs so stop can cancel them
// ---- helpers ----
@@ -241,52 +240,23 @@
sgHighlightBounds();
}
// ---- WebSocket wait ----
// ---- Job queue wait ----
function sgWaitForCompletion(promptId) {
function sgWaitForJob(jobId) {
return new Promise((resolve, reject) => {
let ws;
try {
ws = new WebSocket(`${SG_WS}?clientId=${SG_CLIENT_ID}`);
} catch (e) {
// Fall back to polling if WS unavailable
sgPollUntilDone(promptId).then(resolve).catch(reject);
return;
}
const timeout = setTimeout(() => {
ws.close();
sgPollUntilDone(promptId).then(resolve).catch(reject);
}, 120000);
ws.onmessage = (event) => {
let msg;
try { msg = JSON.parse(event.data); } catch { return; }
if (msg.type === 'executing' && msg.data && msg.data.prompt_id === promptId) {
if (msg.data.node === null) {
clearTimeout(timeout);
ws.close();
resolve();
const poll = setInterval(async () => {
try {
const resp = await fetch(`/api/queue/${jobId}/status`);
const data = await resp.json();
if (data.status === 'done') { clearInterval(poll); resolve(data); }
else if (data.status === 'failed' || data.status === 'removed') {
clearInterval(poll); reject(new Error(data.error || 'Job failed'));
}
}
};
ws.onerror = () => {
clearTimeout(timeout);
sgPollUntilDone(promptId).then(resolve).catch(reject);
};
} catch (err) { console.error('[Strengths] poll error:', err); }
}, 1500);
});
}
async function sgPollUntilDone(promptId) {
for (let i = 0; i < 120; i++) {
await new Promise(r => setTimeout(r, 2000));
const r = await fetch(`/check_status/${promptId}`);
const d = await r.json();
if (d.status === 'complete' || d.status === 'finished' || d.done) return;
}
}
// ---- main flow ----
async function sgClearImages() {
@@ -310,64 +280,65 @@
const steps = sgBuildSteps(min, max, sgGetInterval());
if (!steps.length) return;
// Clear any previous set before starting a new one
await sgClearImages();
sgRunning = true;
sgShouldStop = false;
sgQueuedJobs = [];
document.getElementById('sg-btn-run').classList.add('d-none');
document.getElementById('sg-btn-stop').classList.remove('d-none');
document.getElementById('sg-progress').classList.remove('d-none');
const charSelect = document.getElementById('character_select');
const charSlug = charSelect ? charSelect.value : '';
// Phase 1: Queue all steps upfront so generation continues even if the page is navigated away
document.getElementById('sg-progress-bar').style.width = '100%';
document.getElementById('sg-progress-bar').classList.add('progress-bar-striped', 'progress-bar-animated');
for (let i = 0; i < steps.length; i++) {
if (sgShouldStop) break;
const sv = steps[i];
const pct = Math.round(((i) / steps.length) * 100);
document.getElementById('sg-progress-bar').style.width = pct + '%';
document.getElementById('sg-progress-label').textContent =
`${i} / ${steps.length} \u2014 weight: ${sv}`;
`Queuing ${i + 1} / ${steps.length} \u2014 weight: ${sv}`;
try {
// Queue one generation
// Pick up the character currently selected on this detail page (if any)
const charSelect = document.getElementById('character_select');
const charSlug = charSelect ? charSelect.value : '';
const formData = new URLSearchParams({
strength_value: sv,
seed: seed,
client_id: SG_CLIENT_ID,
character_slug: charSlug,
});
const formData = new URLSearchParams({ strength_value: sv, seed, character_slug: charSlug });
const queueResp = await fetch(`/strengths/${SG_CAT}/${SG_SLUG}/generate`, {
method: 'POST',
headers: { 'X-Requested-With': 'XMLHttpRequest', 'Content-Type': 'application/x-www-form-urlencoded' },
body: formData,
});
const queueData = await queueResp.json();
if (!queueData.prompt_id) throw new Error('No prompt_id returned');
await sgWaitForCompletion(queueData.prompt_id);
// Finalize
const finData = new URLSearchParams({ strength_value: sv, seed: seed });
const finResp = await fetch(`/strengths/${SG_CAT}/${SG_SLUG}/finalize/${queueData.prompt_id}`, {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: finData,
});
const finJson = await finResp.json();
if (finJson.success && finJson.image_url) {
sgAddImage(finJson.image_url, sv);
}
if (!queueData.job_id) throw new Error('No job_id returned');
sgQueuedJobs.push({ jobId: queueData.job_id, sv });
} catch (err) {
console.error('[Strengths] step error:', sv, err);
console.error('[Strengths] queue error:', sv, err);
}
}
// Phase 2: Poll all jobs concurrently; show results as each finishes
document.getElementById('sg-progress-bar').classList.remove('progress-bar-striped', 'progress-bar-animated');
document.getElementById('sg-progress-bar').style.width = '0%';
let completed = 0;
await Promise.all(sgQueuedJobs.map(async ({ jobId, sv }) => {
try {
const jobResult = await sgWaitForJob(jobId);
if (jobResult.result && jobResult.result.image_url) {
sgAddImage(jobResult.result.image_url, sv);
}
} catch (err) {
console.error('[Strengths] job error:', sv, err);
}
completed++;
const pct = Math.round((completed / sgQueuedJobs.length) * 100);
document.getElementById('sg-progress-bar').style.width = pct + '%';
document.getElementById('sg-progress-label').textContent =
`${completed} / ${sgQueuedJobs.length} done`;
}));
document.getElementById('sg-progress-bar').style.width = '100%';
document.getElementById('sg-progress-label').textContent =
`Done \u2014 ${steps.length} images generated`;
`Done \u2014 ${sgQueuedJobs.length} images generated`;
setTimeout(() => document.getElementById('sg-progress').classList.add('d-none'), 3000);
document.getElementById('sg-btn-stop').classList.add('d-none');
@@ -377,6 +348,10 @@
function sgStop() {
sgShouldStop = true;
// Cancel any pending (not yet processing) queued jobs
sgQueuedJobs.forEach(({ jobId }) => {
fetch(`/api/queue/${jobId}/remove`, { method: 'POST' }).catch(() => {});
});
document.getElementById('sg-btn-stop').classList.add('d-none');
document.getElementById('sg-btn-run').classList.remove('d-none');
}