"""HTTP request handler for the SWIPER application. All web-server endpoints are implemented in the `ImageSwipeHandler` class. It relies only on the `config` and `database` modules, avoiding direct knowledge of paths or SQL. """ from __future__ import annotations import datetime import io import json import mimetypes import os import random import sqlite3 import urllib.parse import zipfile from http.server import BaseHTTPRequestHandler from typing import List from PIL import Image import config from config import IMAGE_DIRS, find_image_file from database import ( add_selection, delete_selection, get_selections, reset_database, update_selection, ) # Keep first dir for legacy paths but prefer helper for lookups IMAGE_DIR = config.IMAGE_DIR DB_PATH = config.DB_PATH class ImageSwipeHandler(BaseHTTPRequestHandler): """Implements all HTTP GET/POST endpoints for the front-end.""" # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _set_cors_headers(self) -> None: self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") self.send_header("Access-Control-Allow-Headers", "Content-Type") # ------------------------------------------------------------------ # HTTP verbs # ------------------------------------------------------------------ def do_GET(self) -> None: # noqa: N802 (method name required by BaseHTTPRequestHandler) parsed = urllib.parse.urlparse(self.path) path = parsed.path if path == "/": self.serve_file("index.html", "text/html") elif path == "/history": self.serve_file("history.html", "text/html") elif path == "/styles.css": self.serve_file("styles.css", "text/css") elif path == "/script.js": self.serve_file("script.js", "application/javascript") elif path == "/random-image": self.serve_random_image() elif path == "/selections": self.serve_selections() elif path.startswith("/images/"): self.serve_image(path[8:]) elif path == "/favicon.ico": self.send_response(204) self.end_headers() elif path.startswith("/download-selected"): self.handle_download_selected() else: # try static file local_path = path.lstrip("/") try: self.serve_file(local_path) except FileNotFoundError: self.send_error(404, "File not found") def do_POST(self) -> None: # noqa: N802 parsed = urllib.parse.urlparse(self.path) path = parsed.path if path.startswith("/selection"): self.handle_selection_legacy() return if path == "/record-selection": self.handle_record_selection() return if path == "/update-selection": self.handle_update_selection() return if path == "/delete-selection": self.handle_delete_selection() return if path == "/reset-database": self.handle_reset_database() return if path.startswith("/download-selected"): self.handle_download_selected() return self.send_error(404, "Endpoint not found") def do_OPTIONS(self) -> None: # noqa: N802 self.send_response(204) self._set_cors_headers() self.end_headers() # ------------------------------------------------------------------ # Static helpers # ------------------------------------------------------------------ def serve_file(self, file_path: str, content_type: str | None = None) -> None: abs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_path) with open(abs_path, "rb") as fp: content = fp.read() self.send_response(200) if not content_type: content_type, _ = mimetypes.guess_type(file_path) self.send_header("Content-Type", content_type or "application/octet-stream") self._set_cors_headers() self.send_header("Content-Length", len(content)) self.end_headers() self.wfile.write(content) def serve_image(self, rel_path: str) -> None: abs_path = find_image_file(rel_path) if not abs_path: self.send_error(404, f"Image not found: {rel_path}") return try: with open(abs_path, "rb") as fp: content = fp.read() self.send_response(200) ctype, _ = mimetypes.guess_type(abs_path) self.send_header("Content-Type", ctype or "application/octet-stream") self._set_cors_headers() self.send_header("Content-Length", len(content)) self.end_headers() self.wfile.write(content) except FileNotFoundError: self.send_error(404, f"Image not found: {rel_path}") # ------------------------------------------------------------------ # API helpers # ------------------------------------------------------------------ def serve_random_image(self) -> None: parsed = urllib.parse.urlparse(self.path) query_params = urllib.parse.parse_qs(parsed.query) orientation_str = query_params.get("orientation", ["all"])[0] orientations = [o.strip() for o in orientation_str.split(',')] search_keywords_str = query_params.get("search", [""])[0].strip() allow_nsfw = query_params.get("allow_nsfw", ["0"])[0] == "1" search_keywords = [kw.strip() for kw in search_keywords_str.split(',') if kw.strip()] actions_str = query_params.get("actions", ["Unactioned"])[0] actions = [a.strip() for a in actions_str.split(',') if a.strip()] sort_order = query_params.get("sort", ["random"])[0] # Get sort order parameter conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row # Important to access columns by name cur = conn.cursor() query = """ SELECT meta.path, meta.resolution_x, meta.resolution_y, meta.name, meta.orientation, meta.creation_date, meta.prompt_data, pd.model_name, pd.positive_prompt, pd.negative_prompt, pd.sampler, pd.steps, pd.cfg_scale, pd.seed, pd.clip_skip, pd.other_parameters FROM image_metadata meta LEFT JOIN prompt_details pd ON meta.path = pd.image_path """ params: List[str] = [] where_clauses = ["(meta.actioned IS NULL OR meta.actioned != 'purged')"] # Action filter action_conditions = [] action_params = [] if "Unactioned" in actions: action_conditions.append("meta.actioned IS NULL") actions.remove("Unactioned") if actions: placeholders = ", ".join("?" for _ in actions) action_conditions.append(f"meta.actioned IN ({placeholders})") action_params.extend(actions) if action_conditions: where_clauses.append(f"({' OR '.join(action_conditions)})") params.extend(action_params) # Orientation filter if "all" not in orientations and orientations: placeholders = ", ".join("?" for _ in orientations) where_clauses.append(f"meta.orientation IN ({placeholders})") params.extend(orientations) # NSFW filter if not allow_nsfw: where_clauses.append("meta.nsfw = 0") # Keyword filter if search_keywords: for keyword in search_keywords: # Search only the positive prompt (pd.positive_prompt) where_clauses.append("pd.positive_prompt LIKE ?") params.append(f"%{keyword}%") if where_clauses: query += " WHERE " + " AND ".join(where_clauses) # Add sorting based on sort_order if sort_order == "oldest": query += " ORDER BY meta.creation_date ASC" elif sort_order == "newest": query += " ORDER BY meta.creation_date DESC" cur.execute(query, params) rows = cur.fetchall() conn.close() if not rows: self._json_response({"message": "No more images available for this filter."}) return # For random order, select a random row from the results if sort_order == "random": row = random.choice(rows) else: # For oldest/newest, use the first row in the sorted results row = rows[0] # Convert row to a dictionary for easier access row_dict = dict(row) other_params_str = row_dict.get("other_parameters") other_params = json.loads(other_params_str) if other_params_str and other_params_str != '{}' else {} response = { "path": f"/images/{row_dict['path']}", "resolution_x": row_dict["resolution_x"], "resolution_y": row_dict["resolution_y"], "resolution": f"{row_dict['resolution_x']}x{row_dict['resolution_y']}", "filename": row_dict["name"], "orientation": row_dict["orientation"], "creation_date": datetime.datetime.fromtimestamp(row_dict["creation_date"]).strftime("%Y-%m-%d %H:%M:%S"), "prompt_data": row_dict["prompt_data"], "model_name": row_dict.get("model_name"), "positive_prompt": row_dict.get("positive_prompt"), "negative_prompt": row_dict.get("negative_prompt"), "sampler": row_dict.get("sampler"), "steps": row_dict.get("steps"), "cfg_scale": row_dict.get("cfg_scale"), "seed": row_dict.get("seed"), "clip_skip": row_dict.get("clip_skip"), "other_parameters": other_params, } self._json_response(response) def serve_selections(self) -> None: data = {"selections": get_selections()} self._json_response(data) def serve_resolutions(self) -> None: # Collect resolutions across all configured directories resolutions_set = set() for base in IMAGE_DIRS: for d in os.listdir(base): if os.path.isdir(os.path.join(base, d)): resolutions_set.add(d) resolutions = sorted(resolutions_set) self._json_response({"resolutions": resolutions}) def handle_selection_legacy(self) -> None: # legacy /selection endpoint keeps existing clients working try: length = int(self.headers["Content-Length"]) body = json.loads(self.rfile.read(length)) image_path = body.get("image_path", "") action = body.get("action") if image_path.startswith("/images/"): image_path = image_path[len("/images/"):] if not image_path or not action: self.send_error(400, "Missing image_path or action") return add_selection(image_path, action) self._json_response({"status": "success"}) except Exception as exc: self.send_error(500, f"Server error: {exc}") def handle_record_selection(self) -> None: try: length = int(self.headers["Content-Length"]) data = json.loads(self.rfile.read(length)) image_path = data.get("path", "").replace("/images/", "") action = data.get("action", "") if not image_path or not action: self.send_error(400, "Missing required fields") return add_selection(image_path, action) self._json_response({"success": True, "message": "Selection recorded"}) except Exception as exc: self.send_error(500, f"Error: {exc}") def handle_update_selection(self) -> None: try: length = int(self.headers["Content-Length"]) data = json.loads(self.rfile.read(length)) if not update_selection(data.get("id"), data.get("action")): self.send_error(404, "Selection not found") return self._json_response({"success": True}) except Exception as exc: self.send_error(500, f"Error: {exc}") def handle_delete_selection(self) -> None: try: length = int(self.headers["Content-Length"]) data = json.loads(self.rfile.read(length)) if not delete_selection(data.get("id")): self.send_error(404, "Selection not found") return self._json_response({"success": True}) except Exception as exc: self.send_error(500, f"Error: {exc}") def handle_reset_database(self) -> None: reset_database() self._json_response({"status": "success", "message": "Database reset successfully"}) def handle_download_selected(self) -> None: try: parsed = urllib.parse.urlparse(self.path) q = urllib.parse.parse_qs(parsed.query) image_paths = q.get("paths", []) if not image_paths: self.send_error(400, "Missing paths param") return buf = io.BytesIO() with zipfile.ZipFile(buf, "a", zipfile.ZIP_DEFLATED, False) as zf: for p in image_paths: if p.startswith("/images/"): p = p[8:] full = find_image_file(p) if full and os.path.exists(full): zf.write(full, os.path.basename(p)) buf.seek(0) self.send_response(200) self.send_header("Content-Type", "application/zip") self.send_header( "Content-Disposition", "attachment; filename=selected_images.zip" ) self._set_cors_headers() self.end_headers() self.wfile.write(buf.getvalue()) except Exception as exc: self.send_error(500, f"Error: {exc}") # ------------------------------------------------------------------ # Utilities # ------------------------------------------------------------------ def _json_response(self, data: dict) -> None: payload = json.dumps(data).encode() self.send_response(200) self.send_header("Content-Type", "application/json") self._set_cors_headers() self.send_header("Content-Length", len(payload)) self.end_headers() self.wfile.write(payload)