Files
swiper/handler.py
2025-07-20 01:56:24 +01:00

374 lines
14 KiB
Python

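"""HTTP request handler for the SWIPER application.

All web-server endpoints are implemented in the `ImageSwipeHandler` class.
Selection bookkeeping goes through the `database` module and image lookup
goes through `config.find_image_file`. The one exception is the
`/random-image` endpoint, which opens the SQLite database at
`config.DB_PATH` and queries it with SQL built in this module.
"""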
from __future__ import annotations
import datetime
import io
import json
import mimetypes
import os
import random
import sqlite3
import urllib.parse
import zipfile
from http.server import BaseHTTPRequestHandler
from typing import List
from PIL import Image
import config
from config import IMAGE_DIRS, find_image_file
from database import (
add_selection,
delete_selection,
get_selections,
reset_database,
update_selection,
)
# Keep first dir for legacy paths but prefer helper for lookups
IMAGE_DIR = config.IMAGE_DIR
DB_PATH = config.DB_PATH
class ImageSwipeHandler(BaseHTTPRequestHandler):
"""Implements all HTTP GET/POST endpoints for the front-end."""
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _set_cors_headers(self) -> None:
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
# ------------------------------------------------------------------
# HTTP verbs
# ------------------------------------------------------------------
def do_GET(self) -> None: # noqa: N802 (method name required by BaseHTTPRequestHandler)
parsed = urllib.parse.urlparse(self.path)
path = parsed.path
if path == "/":
self.serve_file("index.html", "text/html")
elif path == "/history":
self.serve_file("history.html", "text/html")
elif path == "/styles.css":
self.serve_file("styles.css", "text/css")
elif path == "/script.js":
self.serve_file("script.js", "application/javascript")
elif path == "/random-image":
self.serve_random_image()
elif path == "/selections":
self.serve_selections()
elif path.startswith("/images/"):
self.serve_image(path[8:])
elif path == "/favicon.ico":
self.send_response(204)
self.end_headers()
elif path.startswith("/download-selected"):
self.handle_download_selected()
else:
# try static file
local_path = path.lstrip("/")
try:
self.serve_file(local_path)
except FileNotFoundError:
self.send_error(404, "File not found")
def do_POST(self) -> None: # noqa: N802
parsed = urllib.parse.urlparse(self.path)
path = parsed.path
if path.startswith("/selection"):
self.handle_selection_legacy()
return
if path == "/record-selection":
self.handle_record_selection()
return
if path == "/update-selection":
self.handle_update_selection()
return
if path == "/delete-selection":
self.handle_delete_selection()
return
if path == "/reset-database":
self.handle_reset_database()
return
if path.startswith("/download-selected"):
self.handle_download_selected()
return
self.send_error(404, "Endpoint not found")
def do_OPTIONS(self) -> None: # noqa: N802
self.send_response(204)
self._set_cors_headers()
self.end_headers()
# ------------------------------------------------------------------
# Static helpers
# ------------------------------------------------------------------
def serve_file(self, file_path: str, content_type: str | None = None) -> None:
abs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_path)
with open(abs_path, "rb") as fp:
content = fp.read()
self.send_response(200)
if not content_type:
content_type, _ = mimetypes.guess_type(file_path)
self.send_header("Content-Type", content_type or "application/octet-stream")
self._set_cors_headers()
self.send_header("Content-Length", len(content))
self.end_headers()
self.wfile.write(content)
def serve_image(self, rel_path: str) -> None:
abs_path = find_image_file(rel_path)
if not abs_path:
self.send_error(404, f"Image not found: {rel_path}")
return
try:
with open(abs_path, "rb") as fp:
content = fp.read()
self.send_response(200)
ctype, _ = mimetypes.guess_type(abs_path)
self.send_header("Content-Type", ctype or "application/octet-stream")
self._set_cors_headers()
self.send_header("Content-Length", len(content))
self.end_headers()
self.wfile.write(content)
except FileNotFoundError:
self.send_error(404, f"Image not found: {rel_path}")
# ------------------------------------------------------------------
# API helpers
# ------------------------------------------------------------------
def serve_random_image(self) -> None:
parsed = urllib.parse.urlparse(self.path)
query_params = urllib.parse.parse_qs(parsed.query)
orientation_str = query_params.get("orientation", ["all"])[0]
orientations = [o.strip() for o in orientation_str.split(',')]
search_keywords_str = query_params.get("search", [""])[0].strip()
allow_nsfw = query_params.get("allow_nsfw", ["0"])[0] == "1"
search_keywords = [kw.strip() for kw in search_keywords_str.split(',') if kw.strip()]
actions_str = query_params.get("actions", ["Unactioned"])[0]
actions = [a.strip() for a in actions_str.split(',') if a.strip()]
sort_order = query_params.get("sort", ["random"])[0] # Get sort order parameter
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row # Important to access columns by name
cur = conn.cursor()
query = """
SELECT
meta.path, meta.resolution_x, meta.resolution_y, meta.name,
meta.orientation, meta.creation_date, meta.prompt_data,
pd.model_name, pd.positive_prompt, pd.negative_prompt,
pd.sampler, pd.steps, pd.cfg_scale, pd.seed, pd.clip_skip,
pd.other_parameters
FROM image_metadata meta
LEFT JOIN prompt_details pd ON meta.path = pd.image_path
"""
params: List[str] = []
where_clauses = ["(meta.actioned IS NULL OR meta.actioned != 'purged')"]
# Action filter
action_conditions = []
action_params = []
if "Unactioned" in actions:
action_conditions.append("meta.actioned IS NULL")
actions.remove("Unactioned")
if actions:
placeholders = ", ".join("?" for _ in actions)
action_conditions.append(f"meta.actioned IN ({placeholders})")
action_params.extend(actions)
if action_conditions:
where_clauses.append(f"({' OR '.join(action_conditions)})")
params.extend(action_params)
# Orientation filter
if "all" not in orientations and orientations:
placeholders = ", ".join("?" for _ in orientations)
where_clauses.append(f"meta.orientation IN ({placeholders})")
params.extend(orientations)
# NSFW filter
if not allow_nsfw:
where_clauses.append("meta.nsfw = 0")
# Keyword filter
if search_keywords:
for keyword in search_keywords:
# Search only the positive prompt (pd.positive_prompt)
where_clauses.append("pd.positive_prompt LIKE ?")
params.append(f"%{keyword}%")
if where_clauses:
query += " WHERE " + " AND ".join(where_clauses)
# Add sorting based on sort_order
if sort_order == "oldest":
query += " ORDER BY meta.creation_date ASC"
elif sort_order == "newest":
query += " ORDER BY meta.creation_date DESC"
cur.execute(query, params)
rows = cur.fetchall()
conn.close()
if not rows:
self._json_response({"message": "No more images available for this filter."})
return
# For random order, select a random row from the results
if sort_order == "random":
row = random.choice(rows)
else:
# For oldest/newest, use the first row in the sorted results
row = rows[0]
# Convert row to a dictionary for easier access
row_dict = dict(row)
other_params_str = row_dict.get("other_parameters")
other_params = json.loads(other_params_str) if other_params_str and other_params_str != '{}' else {}
response = {
"path": f"/images/{row_dict['path']}",
"resolution_x": row_dict["resolution_x"],
"resolution_y": row_dict["resolution_y"],
"resolution": f"{row_dict['resolution_x']}x{row_dict['resolution_y']}",
"filename": row_dict["name"],
"orientation": row_dict["orientation"],
"creation_date": datetime.datetime.fromtimestamp(row_dict["creation_date"]).strftime("%Y-%m-%d %H:%M:%S"),
"prompt_data": row_dict["prompt_data"],
"model_name": row_dict.get("model_name"),
"positive_prompt": row_dict.get("positive_prompt"),
"negative_prompt": row_dict.get("negative_prompt"),
"sampler": row_dict.get("sampler"),
"steps": row_dict.get("steps"),
"cfg_scale": row_dict.get("cfg_scale"),
"seed": row_dict.get("seed"),
"clip_skip": row_dict.get("clip_skip"),
"other_parameters": other_params,
}
self._json_response(response)
def serve_selections(self) -> None:
data = {"selections": get_selections()}
self._json_response(data)
def serve_resolutions(self) -> None:
# Collect resolutions across all configured directories
resolutions_set = set()
for base in IMAGE_DIRS:
for d in os.listdir(base):
if os.path.isdir(os.path.join(base, d)):
resolutions_set.add(d)
resolutions = sorted(resolutions_set)
self._json_response({"resolutions": resolutions})
def handle_selection_legacy(self) -> None:
# legacy /selection endpoint keeps existing clients working
try:
length = int(self.headers["Content-Length"])
body = json.loads(self.rfile.read(length))
image_path = body.get("image_path", "")
action = body.get("action")
if image_path.startswith("/images/"):
image_path = image_path[len("/images/"):]
if not image_path or not action:
self.send_error(400, "Missing image_path or action")
return
add_selection(image_path, action)
self._json_response({"status": "success"})
except Exception as exc:
self.send_error(500, f"Server error: {exc}")
def handle_record_selection(self) -> None:
try:
length = int(self.headers["Content-Length"])
data = json.loads(self.rfile.read(length))
image_path = data.get("path", "").replace("/images/", "")
action = data.get("action", "")
if not image_path or not action:
self.send_error(400, "Missing required fields")
return
add_selection(image_path, action)
self._json_response({"success": True, "message": "Selection recorded"})
except Exception as exc:
self.send_error(500, f"Error: {exc}")
def handle_update_selection(self) -> None:
try:
length = int(self.headers["Content-Length"])
data = json.loads(self.rfile.read(length))
if not update_selection(data.get("id"), data.get("action")):
self.send_error(404, "Selection not found")
return
self._json_response({"success": True})
except Exception as exc:
self.send_error(500, f"Error: {exc}")
def handle_delete_selection(self) -> None:
try:
length = int(self.headers["Content-Length"])
data = json.loads(self.rfile.read(length))
if not delete_selection(data.get("id")):
self.send_error(404, "Selection not found")
return
self._json_response({"success": True})
except Exception as exc:
self.send_error(500, f"Error: {exc}")
def handle_reset_database(self) -> None:
reset_database()
self._json_response({"status": "success", "message": "Database reset successfully"})
def handle_download_selected(self) -> None:
try:
parsed = urllib.parse.urlparse(self.path)
q = urllib.parse.parse_qs(parsed.query)
image_paths = q.get("paths", [])
if not image_paths:
self.send_error(400, "Missing paths param")
return
buf = io.BytesIO()
with zipfile.ZipFile(buf, "a", zipfile.ZIP_DEFLATED, False) as zf:
for p in image_paths:
if p.startswith("/images/"):
p = p[8:]
full = find_image_file(p)
if full and os.path.exists(full):
zf.write(full, os.path.basename(p))
buf.seek(0)
self.send_response(200)
self.send_header("Content-Type", "application/zip")
self.send_header(
"Content-Disposition", "attachment; filename=selected_images.zip"
)
self._set_cors_headers()
self.end_headers()
self.wfile.write(buf.getvalue())
except Exception as exc:
self.send_error(500, f"Error: {exc}")
# ------------------------------------------------------------------
# Utilities
# ------------------------------------------------------------------
def _json_response(self, data: dict) -> None:
payload = json.dumps(data).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self._set_cors_headers()
self.send_header("Content-Length", len(payload))
self.end_headers()
self.wfile.write(payload)