"""Image swipe review server.

Serves a small web UI (index/history pages plus static assets), hands out
random images from ``IMAGE_DIR`` for review, and records the reviewer's
decisions in a local SQLite database.
"""
from http.server import HTTPServer, BaseHTTPRequestHandler
import os
import json
import random
import mimetypes
import urllib.parse
import sqlite3
import time
import datetime
import zipfile
import io
from contextlib import closing

# Path to the image directory (one sub-directory per resolution)
IMAGE_DIR = "/mnt/secret-items/sd-outputs/Sorted/Images/Portrait"

# Directory containing this script; static files are served from here
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Database file path
DB_PATH = os.path.join(_BASE_DIR, "image_selections.db")

# URL prefix under which images are exposed to the client
_IMAGE_URL_PREFIX = '/images/'

# File extensions treated as images (compared case-insensitively)
_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')


def _strip_images_prefix(path):
    """Return *path* without a leading ``/images/`` URL prefix.

    Only a leading prefix is removed; an ``/images/`` occurring later in the
    path is part of the file name and is left alone.
    """
    if path.startswith(_IMAGE_URL_PREFIX):
        return path[len(_IMAGE_URL_PREFIX):]
    return path


def _resolve_within(base_dir, relative_path):
    """Join *relative_path* onto *base_dir* without letting it escape.

    Returns the normalized absolute path, or ``None`` when the result would
    lie outside *base_dir* (``..`` segments, an absolute path) or the path
    contains a NUL byte. The check is lexical: symlinks are not resolved.
    """
    if '\x00' in relative_path:
        return None
    base = os.path.abspath(base_dir)
    candidate = os.path.abspath(os.path.join(base, relative_path))
    try:
        inside = os.path.commonpath([base, candidate]) == base
    except ValueError:
        # e.g. paths on different drives on Windows
        inside = False
    return candidate if inside else None


def _orientation_from_size(width, height):
    """Classify image dimensions as "portrait", "landscape" or "square"."""
    if height > width:
        return "portrait"
    if width > height:
        return "landscape"
    return "square"


def _detect_orientation(full_path, context="image"):
    """Open the image at *full_path* and return its orientation.

    Returns "unknown" when Pillow is not installed or the file cannot be
    read. *context* only varies the wording of the debug message.
    """
    try:
        # Imported lazily so the server still runs without Pillow installed
        from PIL import Image
        with Image.open(full_path) as img:
            width, height = img.size
    except Exception as e:
        print(f"DEBUG ERROR determining {context} orientation: {str(e)}")
        return "unknown"
    return _orientation_from_size(width, height)


def _orientation_for(image_path, context="image"):
    """Return the orientation of a stored/URL image path under IMAGE_DIR."""
    full_path = _resolve_within(IMAGE_DIR, _strip_images_prefix(image_path))
    if full_path is None:
        return "unknown"
    return _detect_orientation(full_path, context)


# Initialize the database
def init_db():
    """Create the ``image_selections`` table, or add its ``orientation``
    column when an older table without it already exists."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            tables = {row[0] for row in conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'")}
            if 'image_selections' not in tables:
                conn.execute('''
                CREATE TABLE IF NOT EXISTS image_selections (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    image_path TEXT NOT NULL,
                    resolution TEXT NOT NULL,
                    action TEXT NOT NULL,
                    timestamp INTEGER NOT NULL,
                    orientation TEXT
                )
                ''')
                print("Created new image_selections table with orientation column")
            else:
                columns = {row[1] for row in conn.execute(
                    "PRAGMA table_info(image_selections)")}
                if 'orientation' not in columns:
                    conn.execute('ALTER TABLE image_selections ADD COLUMN orientation TEXT')
                    print("Added orientation column to existing table")
    print(f"Database initialized at {DB_PATH}")


# Add a selection to the database
def add_selection(image_path, resolution, action):
    """Insert one selection row stamped with the current time.

    The orientation is detected from the image file when possible and stored
    as "unknown" otherwise. *image_path* is stored exactly as given.
    """
    orientation = _orientation_for(image_path)
    with closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            conn.execute('''
            INSERT INTO image_selections (image_path, resolution, action, timestamp, orientation)
            VALUES (?, ?, ?, ?, ?)
            ''', (image_path, resolution, action, int(time.time()), orientation))


# Get all selections from the database
def get_selections():
    """Return every selection as a list of dicts, newest first.

    Rows without a stored orientation get one computed on the fly. Any error
    is logged and an empty list is returned so the client never hangs.
    """
    print("DEBUG: get_selections() called")
    try:
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.row_factory = sqlite3.Row  # enables column access by name
            rows = conn.execute('''
            SELECT * FROM image_selections ORDER BY timestamp DESC
            ''').fetchall()
        print(f"DEBUG: Fetched {len(rows)} rows from database")

        results = []
        for row in rows:
            item = dict(row)
            # Backfill orientation for rows written before the column existed
            if not item.get('orientation'):
                item['orientation'] = _orientation_for(item['image_path'], "missing")
            results.append(item)

        print(f"DEBUG: Converted {len(results)} rows to dictionaries")
        print(f"DEBUG: First result (if any): {results[0] if results else 'None'}")
        return results
    except Exception as e:
        print(f"DEBUG ERROR in get_selections(): {str(e)}")
        # Return empty list on error to prevent client from hanging
        return []


# Get a list of all image paths that have already been actioned
def get_actioned_images():
    """Return the distinct image paths that have at least one selection.

    Returns an empty list (after logging) if the database cannot be read.
    """
    try:
        with closing(sqlite3.connect(DB_PATH)) as conn:
            rows = conn.execute('''
            SELECT DISTINCT image_path FROM image_selections
            ''').fetchall()
        return [row[0] for row in rows]
    except Exception as e:
        print(f"DEBUG ERROR in get_actioned_images(): {str(e)}")
        return []


# Update a selection in the database
def update_selection(selection_id, action):
    """Set a new action (and fresh timestamp) on a selection.

    Returns True if a row with *selection_id* existed, False otherwise.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            cursor = conn.execute('''
            UPDATE image_selections SET action = ?, timestamp = ? WHERE id = ?
            ''', (action, int(time.time()), selection_id))
            return cursor.rowcount > 0


# Delete a selection from the database
def delete_selection(selection_id):
    """Delete one selection; return True if a row was actually removed."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            cursor = conn.execute('''
            DELETE FROM image_selections WHERE id = ?
            ''', (selection_id,))
            return cursor.rowcount > 0


# Reset the database by deleting all selections
def reset_database():
    """Delete every selection and return the number of rows removed."""
    print("DEBUG: Resetting database - deleting all selections")
    with closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            rows_affected = conn.execute('''
            DELETE FROM image_selections
            ''').rowcount
    print(f"DEBUG: Reset database - deleted {rows_affected} selections")
    return rows_affected


class ImageSwipeHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the image swipe UI and its JSON API."""

    # Fixed GET routes that map directly to a static file next to this script
    _STATIC_ROUTES = {
        '/': ('index.html', 'text/html'),
        '/history': ('history.html', 'text/html'),
        '/styles.css': ('styles.css', 'text/css'),
        '/script.js': ('script.js', 'application/javascript'),
    }

    # Set response headers for CORS
    def _set_cors_headers(self):
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')

    def _send_bytes(self, content, content_type, extra_headers=()):
        """Send a complete 200 response with the given body and type."""
        self.send_response(200)
        self.send_header('Content-type', content_type or 'application/octet-stream')
        for name, value in extra_headers:
            self.send_header(name, value)
        self._set_cors_headers()
        self.send_header('Content-length', str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    def _send_json(self, payload, status=200):
        """Serialize *payload* and send it as a JSON response."""
        body = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header('Content-type', 'application/json')
        self._set_cors_headers()
        self.send_header('Content-length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _read_json_body(self):
        """Read and parse the request body as a JSON object.

        Returns the parsed dict, or None after a 400 response has already
        been sent (missing/invalid Content-Length, malformed JSON, or a
        top-level value that is not an object).
        """
        try:
            length = int(self.headers.get('Content-Length'))
        except (TypeError, ValueError):
            length = -1
        if length < 0:
            self.send_error(400, "Missing or invalid Content-Length")
            return None
        try:
            data = json.loads(self.rfile.read(length).decode('utf-8'))
        except ValueError:
            # Covers both UnicodeDecodeError and json.JSONDecodeError
            self.send_error(400, "Invalid JSON body")
            return None
        if not isinstance(data, dict):
            self.send_error(400, "JSON body must be an object")
            return None
        return data

    def do_GET(self):
        """Route GET requests to pages, API endpoints, images or static files."""
        path = urllib.parse.urlparse(self.path).path

        if path in self._STATIC_ROUTES:
            file_name, content_type = self._STATIC_ROUTES[path]
            self.serve_file(file_name, content_type)
        elif path == '/random-image':
            self.serve_random_image()
        elif path == '/selections':
            self.serve_selections()
        elif path == '/resolutions':
            self.serve_resolutions()
        elif path.startswith(_IMAGE_URL_PREFIX):
            self.serve_image(path[len(_IMAGE_URL_PREFIX):])
        elif path.startswith('/download-selected'):
            self.handle_download_selected()
        else:
            # Try to serve as a static file (leading slash removed)
            self.serve_file(path[1:] if path.startswith('/') else path)

    def serve_file(self, file_path, content_type=None):
        """Serve a file located next to this script.

        Responds 403 if *file_path* would escape the script directory and 404
        if the file cannot be read. The content type is guessed from the
        extension when not given.
        """
        # NOTE(review): any readable file in the script directory can be
        # requested through the static fallback, including the database file.
        full_path = _resolve_within(_BASE_DIR, file_path)
        if full_path is None:
            self.send_error(403, "Forbidden")
            return
        try:
            with open(full_path, 'rb') as file:
                content = file.read()
        except OSError:
            # Detail goes in the body: the status line is latin-1 only
            self.send_error(404, "File not found", explain=f"File not found: {file_path}")
            return

        if not content_type:
            content_type, _ = mimetypes.guess_type(file_path)
        self._send_bytes(content, content_type)

    def serve_image(self, image_path):
        """Serve an image from IMAGE_DIR given its URL-encoded relative path.

        Responds 403 if the decoded path would escape IMAGE_DIR and 404 if
        the image cannot be read.
        """
        # Decode URL-encoded path
        image_path = urllib.parse.unquote(image_path)
        full_path = _resolve_within(IMAGE_DIR, image_path)
        if full_path is None:
            self.send_error(403, "Forbidden")
            return
        try:
            with open(full_path, 'rb') as file:
                content = file.read()
        except OSError:
            self.send_error(404, "Image not found", explain=f"Image not found: {image_path}")
            return

        content_type, _ = mimetypes.guess_type(full_path)
        self._send_bytes(content, content_type)

    def serve_random_image(self):
        """Pick a random image, preferring ones not yet actioned, and return
        its metadata as JSON.

        Resolution directories are visited once each in random order; the
        first one holding an unactioned image wins. If every image has been
        actioned, a random already-actioned image is returned instead.
        Responds 404 when no images exist at all.
        """
        print("DEBUG: serve_random_image() called")
        try:
            # Set for O(1) membership tests against every listed file
            actioned_images = set(get_actioned_images())
            print(f"DEBUG: Found {len(actioned_images)} already actioned images")

            # Get all resolution directories, in random order
            resolutions = [d for d in os.listdir(IMAGE_DIR)
                           if os.path.isdir(os.path.join(IMAGE_DIR, d))]
            random.shuffle(resolutions)

            chosen = None
            fallback = None
            for resolution in resolutions:
                resolution_dir = os.path.join(IMAGE_DIR, resolution)
                images = [f for f in os.listdir(resolution_dir)
                          if f.lower().endswith(_IMAGE_EXTENSIONS)]
                if not images:
                    continue
                unactioned_images = [img for img in images
                                     if f"{resolution}/{img}" not in actioned_images]
                if unactioned_images:
                    chosen = (resolution, random.choice(unactioned_images))
                    print(f"DEBUG: Found unactioned image: {chosen[0]}/{chosen[1]}")
                    break
                if fallback is None:
                    fallback = (resolution, random.choice(images))

            if chosen is None:
                if fallback is None:
                    self.send_error(404, "No images found in any resolution directory")
                    return
                chosen = fallback
                print(f"DEBUG: Using fallback random image: {chosen[0]}/{chosen[1]}")

            resolution, image_name = chosen
            image_path = f"{resolution}/{image_name}"
            full_image_path = os.path.join(IMAGE_DIR, image_path)

            # Modification time stands in for the creation time
            try:
                creation_time = os.stat(full_image_path).st_mtime
                creation_date = datetime.datetime.fromtimestamp(
                    creation_time).strftime('%Y-%m-%d %H:%M:%S')
            except Exception as e:
                print(f"DEBUG ERROR getting file creation time: {str(e)}")
                creation_date = "Unknown"

            self._send_json({
                'path': f"/images/{image_path}",
                'resolution': resolution,
                'filename': image_name,
                'creation_date': creation_date,
                'orientation': _detect_orientation(full_image_path),
            })
        except Exception as e:
            self.send_error(500, "Error serving random image", explain=str(e))

    def serve_resolutions(self):
        """Return the names of all resolution directories as JSON."""
        try:
            resolutions = [d for d in os.listdir(IMAGE_DIR)
                           if os.path.isdir(os.path.join(IMAGE_DIR, d))]
            self._send_json({'resolutions': resolutions})
        except Exception as e:
            self.send_error(500, "Error serving resolutions", explain=str(e))

    def serve_selections(self):
        """Return all recorded selections as JSON.

        If the rows cannot be serialized, an empty list with an ``error``
        field is sent instead (still with status 200).
        """
        print("DEBUG: serve_selections() called")
        try:
            selections = get_selections()
            response = {'selections': selections}
            print(f"DEBUG: Response has {len(selections)} selections")

            # Serialize up front so a bad row cannot break the response midway
            try:
                response_json = json.dumps(response)
                print(f"DEBUG: JSON serialization successful, length: {len(response_json)}")
            except (TypeError, ValueError) as json_err:
                print(f"DEBUG ERROR in JSON serialization: {str(json_err)}")
                response = {'selections': [], 'error': 'JSON serialization error'}

            self._send_json(response)
            print("DEBUG: Response sent successfully")
        except Exception as e:
            print(f"DEBUG ERROR in serve_selections(): {str(e)}")
            self.send_error(500, "Error serving selections", explain=str(e))

    def do_POST(self):
        """Route POST requests to the selection/database handlers."""
        # Match on the parsed path so a query string never breaks routing
        path = urllib.parse.urlparse(self.path).path

        if path == "/record-selection":
            self.handle_record_selection()
        elif path == "/update-selection":
            self.handle_update_selection()
        elif path == "/delete-selection":
            self.handle_delete_selection()
        elif path == '/reset-database':
            self.handle_reset_database()
        elif path.startswith('/download-selected'):
            self.handle_download_selected()
        else:
            self.send_error(404, "Not found")

    def handle_record_selection(self):
        """Record a selection from a JSON body with ``path``, ``resolution``
        and ``action`` fields (all required, non-empty strings)."""
        try:
            data = self._read_json_body()
            if data is None:
                return

            raw_path = data.get('path', '')
            resolution = data.get('resolution', '')
            action = data.get('action', '')

            # Validate the data
            if not all(isinstance(v, str) and v for v in (raw_path, resolution, action)):
                self.send_error(400, "Missing required fields")
                return
            image_path = _strip_images_prefix(raw_path)
            if not image_path:
                self.send_error(400, "Missing required fields")
                return

            add_selection(image_path, resolution, action)

            self._send_json({
                'success': True,
                'message': f"Selection recorded: {action} for {image_path}"
            })
        except Exception as e:
            self.send_error(500, "Error recording selection", explain=str(e))

    def handle_update_selection(self):
        """Change the action of an existing selection (``id`` and ``action``
        required); responds 404 when the id does not exist."""
        try:
            data = self._read_json_body()
            if data is None:
                return

            selection_id = data.get('id')
            action = data.get('action', '')

            # Validate the data
            if not selection_id or not action:
                self.send_error(400, "Missing required fields")
                return

            if not update_selection(selection_id, action):
                self.send_error(404, "Selection not found",
                                explain=f"Selection with ID {selection_id} not found")
                return

            self._send_json({
                'success': True,
                'message': f"Selection updated: ID {selection_id} to {action}"
            })
        except Exception as e:
            self.send_error(500, "Error updating selection", explain=str(e))

    def handle_delete_selection(self):
        """Delete an existing selection (``id`` required); responds 404 when
        the id does not exist."""
        try:
            data = self._read_json_body()
            if data is None:
                return

            selection_id = data.get('id')

            # Validate the data
            if not selection_id:
                self.send_error(400, "Missing selection ID")
                return

            if not delete_selection(selection_id):
                self.send_error(404, "Selection not found",
                                explain=f"Selection with ID {selection_id} not found")
                return

            self._send_json({
                'success': True,
                'message': f"Selection deleted: ID {selection_id}"
            })
        except Exception as e:
            print(f"DEBUG ERROR in handle_delete_selection(): {str(e)}")
            self.send_error(500, "Error deleting selection", explain=str(e))

    def handle_reset_database(self):
        """Delete all selections and report the outcome as JSON."""
        try:
            reset_database()
            self._send_json({'success': True, 'message': 'Database reset successfully'})
        except Exception as e:
            print(f"DEBUG ERROR in handle_reset_database: {str(e)}")
            self._send_json({'success': False, 'message': f'Error: {str(e)}'}, status=500)

    def handle_download_selected(self):
        """Send a zip of the images named by the ``paths`` query parameters.

        Paths that escape IMAGE_DIR or do not name an existing file are
        skipped. Files are stored flat (base name only); colliding names get
        a numeric suffix so no entry shadows another.
        """
        try:
            query_components = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
            image_paths = query_components.get('paths', [])

            if not image_paths:
                self._send_json({'success': False, 'message': 'No image paths provided'},
                                status=400)
                return

            # Create a zip file in memory
            zip_buffer = io.BytesIO()
            used_names = set()
            with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED, False) as zip_file:
                for path in image_paths:
                    full_path = _resolve_within(IMAGE_DIR, _strip_images_prefix(path))
                    if full_path is None or not os.path.isfile(full_path):
                        continue
                    # Flat archive: just the file name, de-duplicated
                    filename = os.path.basename(full_path)
                    if filename in used_names:
                        stem, ext = os.path.splitext(filename)
                        counter = 2
                        while f"{stem}_{counter}{ext}" in used_names:
                            counter += 1
                        filename = f"{stem}_{counter}{ext}"
                    used_names.add(filename)
                    zip_file.write(full_path, filename)

            self._send_bytes(
                zip_buffer.getvalue(),
                'application/zip',
                extra_headers=[('Content-Disposition',
                                'attachment; filename="selected_images.zip"')],
            )
        except Exception as e:
            print(f"DEBUG ERROR in handle_download_selected: {str(e)}")
            self._send_json({'success': False, 'message': f'Error: {str(e)}'}, status=500)

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(200)
        self._set_cors_headers()
        self.end_headers()


def run(server_class=HTTPServer, handler_class=ImageSwipeHandler, port=8000):
    """Initialize the database and serve on *port* until interrupted."""
    init_db()

    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    print(f"Starting server on port {port}...")
    print(f"Image directory: {IMAGE_DIR}")
    print(f"Database: {DB_PATH}")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("Shutting down server...")
    finally:
        # Release the listening socket even on Ctrl-C
        httpd.server_close()


if __name__ == "__main__":
    run()