Files
swiper/app.py

485 lines
17 KiB
Python

from http.server import HTTPServer, BaseHTTPRequestHandler
import os
import json
import random
import mimetypes
import urllib.parse
import sqlite3
import time
import datetime
# Root directory that is scanned for images; its immediate sub-directories
# are treated as "resolution" folders by the request handler.
IMAGE_DIR = "/mnt/secret-items/sd-outputs/Sorted/Images/Portrait"
# The SQLite database lives in the same directory as this script.
_APP_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(_APP_DIR, "image_selections.db")
# Initialize database
def init_db():
    """Create the ``image_selections`` table in the database at DB_PATH if missing.

    Prints the database location once the schema is in place.

    Raises:
        sqlite3.Error: if the database cannot be opened or the statement fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS image_selections (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                image_path TEXT NOT NULL,
                resolution TEXT NOT NULL,
                action TEXT NOT NULL,
                timestamp INTEGER NOT NULL
            )
        ''')
        conn.commit()
    finally:
        # Close even when the statement raises so the connection is not leaked.
        conn.close()
    print(f"Database initialized at {DB_PATH}")
# Add a selection to the database
def add_selection(image_path, resolution, action):
    """Insert one selection row stamped with the current Unix time (whole seconds).

    Args:
        image_path: Value stored in the ``image_path`` column.
        resolution: Value stored in the ``resolution`` column.
        action: Value stored in the ``action`` column.

    Raises:
        sqlite3.Error: if the insert fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            '''
            INSERT INTO image_selections (image_path, resolution, action, timestamp)
            VALUES (?, ?, ?, ?)
            ''',
            (image_path, resolution, action, int(time.time())),
        )
        conn.commit()
    finally:
        # Close even when the insert raises so the connection is not leaked.
        conn.close()
# Get all selections from the database
def get_selections():
    """Return every recorded selection, newest first, as a list of plain dicts.

    Each dict maps column name to value for one ``image_selections`` row.
    Rows sharing the same timestamp are ordered by descending ``id`` so the
    result is deterministic.

    Returns:
        list[dict]: the rows, or an empty list if anything goes wrong. Errors
        are printed rather than raised so the HTTP client always gets a reply.
    """
    print("DEBUG: get_selections() called")
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            # sqlite3.Row enables column access by name.
            conn.row_factory = sqlite3.Row
            rows = conn.execute('''
                SELECT * FROM image_selections ORDER BY timestamp DESC, id DESC
            ''').fetchall()
        finally:
            # Close on every path, including a failed query.
            conn.close()
        print(f"DEBUG: Fetched {len(rows)} rows from database")
        # Convert Row objects to dictionaries so they can be JSON-serialized.
        results = [{key: row[key] for key in row.keys()} for row in rows]
        print(f"DEBUG: Converted {len(results)} rows to dictionaries")
        print(f"DEBUG: First result (if any): {results[0] if results else 'None'}")
        return results
    except Exception as e:
        print(f"DEBUG ERROR in get_selections(): {str(e)}")
        # Return empty list on error to prevent client from hanging
        return []
# Update a selection in the database
def update_selection(selection_id, action):
    """Set a new action on an existing selection and refresh its timestamp.

    Args:
        selection_id: ``id`` of the row to update.
        action: New value for the ``action`` column.

    Returns:
        bool: True if a row with that id existed and was updated.

    Raises:
        sqlite3.Error: if the update fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.execute(
            '''
            UPDATE image_selections SET action = ?, timestamp = ? WHERE id = ?
            ''',
            (action, int(time.time()), selection_id),
        )
        rows_affected = cursor.rowcount
        conn.commit()
    finally:
        # Close even when the update raises so the connection is not leaked.
        conn.close()
    return rows_affected > 0
# Delete a selection from the database
def delete_selection(selection_id):
    """Delete the selection with the given id.

    Args:
        selection_id: ``id`` of the row to remove.

    Returns:
        bool: True if a row with that id existed and was deleted.

    Raises:
        sqlite3.Error: if the delete fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.execute(
            '''
            DELETE FROM image_selections WHERE id = ?
            ''',
            (selection_id,),
        )
        rows_affected = cursor.rowcount
        conn.commit()
    finally:
        # Close even when the delete raises so the connection is not leaked.
        conn.close()
    return rows_affected > 0
# Reset the database by deleting all selections
def reset_database():
    """Delete every row from ``image_selections``.

    Returns:
        int: the row count reported by SQLite for the delete.

    Raises:
        sqlite3.Error: if the delete fails.
    """
    print("DEBUG: Resetting database - deleting all selections")
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.execute('''
            DELETE FROM image_selections
        ''')
        rows_affected = cursor.rowcount
        conn.commit()
    finally:
        # Close even when the delete raises so the connection is not leaked.
        conn.close()
    print(f"DEBUG: Reset database - deleted {rows_affected} selections")
    return rows_affected
class ImageSwipeHandler(BaseHTTPRequestHandler):
    """Request handler for the image-swipe web app.

    GET serves the front-end files located next to this script, image files
    from IMAGE_DIR, and the JSON endpoints ``/random-image``,
    ``/image-resolutions`` and ``/selections``. POST records, updates,
    deletes, or resets selections in the SQLite database. Successful
    responses carry permissive CORS headers.
    """

    # File suffixes that count as images when picking a random one.
    _IMAGE_SUFFIXES = ('.png', '.jpg', '.jpeg')

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _set_cors_headers(self):
        """Add the CORS headers that let any origin call this server."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')

    def send_error(self, code, message=None, explain=None):
        """Send an error reply with the reason phrase forced onto one Latin-1 line.

        The base class writes ``message`` verbatim into the HTTP status line.
        Several callers build it from request data (a URL-decoded path, a
        JSON-supplied id), so CR/LF must be collapsed to stop a client from
        injecting extra response headers, and characters outside Latin-1 must
        be replaced so encoding the status line cannot fail.
        """
        if message is not None:
            message = ' '.join(str(message).split())
            message = message.encode('latin-1', 'replace').decode('latin-1')
        super().send_error(code, message, explain)

    def _send_body(self, body, content_type, status=200):
        """Write a complete response: status, headers (with CORS), and ``body`` bytes."""
        self.send_response(status)
        self.send_header('Content-type', content_type)
        self._set_cors_headers()
        self.send_header('Content-length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_json(self, payload, status=200):
        """Serialize ``payload`` to JSON and send it as the response body."""
        self._send_body(json.dumps(payload).encode(), 'application/json', status)

    @staticmethod
    def _resolve_under(base_dir, relative_path):
        """Join ``relative_path`` onto ``base_dir``, refusing anything that escapes it.

        Returns the normalized absolute path, or None when the request uses
        ``..`` segments, an absolute path, or a NUL byte to point outside
        ``base_dir``. Normalization is purely lexical (no symlink resolution),
        so links deliberately placed inside ``base_dir`` keep working.
        """
        if '\x00' in relative_path:
            return None
        base = os.path.abspath(base_dir)
        candidate = os.path.abspath(os.path.join(base, relative_path))
        try:
            inside = os.path.commonpath([base, candidate]) == base
        except ValueError:
            # Raised e.g. for paths on different drives; treat as outside.
            return None
        return candidate if inside else None

    def _send_file_contents(self, full_path, not_found_message):
        """Send the file at ``full_path``, or a 404 with ``not_found_message``."""
        try:
            with open(full_path, 'rb') as file:
                content = file.read()
        except OSError:
            # Covers missing files as well as directories and unreadable files,
            # which previously escaped as unhandled exceptions.
            self.send_error(404, not_found_message)
            return
        # Set the content type based on file extension
        content_type, _ = mimetypes.guess_type(full_path)
        self._send_body(content, content_type or 'application/octet-stream')

    def _read_json_body(self):
        """Read the request body and decode it as a JSON object.

        Returns:
            dict | None: the decoded object, or None when the request was
            malformed, in which case a 400 response has already been sent.
        """
        try:
            length = int(self.headers.get('Content-Length'))
        except (TypeError, ValueError):
            self.send_error(400, "Missing or invalid Content-Length")
            return None
        if length < 0:
            # A negative size would make rfile.read() block until EOF.
            self.send_error(400, "Missing or invalid Content-Length")
            return None
        try:
            data = json.loads(self.rfile.read(length).decode('utf-8'))
        except ValueError:
            # Both UnicodeDecodeError and json.JSONDecodeError derive from ValueError.
            self.send_error(400, "Request body is not valid JSON")
            return None
        if not isinstance(data, dict):
            self.send_error(400, "Request body must be a JSON object")
            return None
        return data

    @staticmethod
    def _list_resolutions():
        """Return the names of the immediate sub-directories of IMAGE_DIR."""
        return [d for d in os.listdir(IMAGE_DIR) if os.path.isdir(os.path.join(IMAGE_DIR, d))]

    @staticmethod
    def _modified_date(full_path):
        """Return the file's modification time as 'YYYY-MM-DD HH:MM:SS', or "Unknown"."""
        try:
            # Modification time is used as a stand-in for creation time.
            modified = os.stat(full_path).st_mtime
            return datetime.datetime.fromtimestamp(modified).strftime('%Y-%m-%d %H:%M:%S')
        except (OSError, OverflowError, ValueError) as e:
            print(f"DEBUG ERROR getting file creation time: {str(e)}")
            return "Unknown"

    # ------------------------------------------------------------------
    # GET
    # ------------------------------------------------------------------

    def do_GET(self):
        """Route a GET request by URL path (the query string is ignored)."""
        path = urllib.parse.urlparse(self.path).path
        if path == "/" or path == "":
            self.serve_file("index.html")
        elif path == "/random-image":
            self.serve_random_image()
        elif path == "/image-resolutions":
            self.serve_resolutions()
        elif path == "/selections":
            self.serve_selections()
        elif path.startswith("/images/"):
            # Everything after the "/images/" prefix is the image's relative path.
            self.serve_image(path[len("/images/"):])
        else:
            # Serve other static files
            if path.startswith("/"):
                path = path[1:]  # Remove leading slash
            self.serve_file(path)

    def serve_file(self, file_path):
        """Serve a static file located in the directory that contains this script.

        Responds 403 when ``file_path`` would resolve outside that directory
        and 404 when the file cannot be read.
        """
        # NOTE(review): every file in the script directory is reachable here,
        # which appears to include the SQLite database and this source file.
        # Consider moving front-end assets into a dedicated static directory.
        app_dir = os.path.dirname(os.path.abspath(__file__))
        full_path = self._resolve_under(app_dir, file_path)
        if full_path is None:
            self.send_error(403, "Forbidden")
            return
        self._send_file_contents(full_path, f"File not found: {file_path}")

    def serve_image(self, image_path):
        """Serve an image from IMAGE_DIR given its URL-encoded relative path.

        Responds 403 when the decoded path would resolve outside IMAGE_DIR
        and 404 when the file cannot be read.
        """
        # Decode URL-encoded path
        image_path = urllib.parse.unquote(image_path)
        full_path = self._resolve_under(IMAGE_DIR, image_path)
        if full_path is None:
            self.send_error(403, "Forbidden")
            return
        self._send_file_contents(full_path, f"Image not found: {image_path}")

    def serve_random_image(self):
        """Pick a random image from a random resolution folder and describe it as JSON.

        The response has ``path`` (URL under /images/), ``resolution``,
        ``filename`` and ``creation_date``. Responds 404 when there are no
        resolution folders or the chosen folder has no images, and 500 when
        the image directory cannot be listed.
        """
        try:
            resolutions = self._list_resolutions()
            if not resolutions:
                # random.choice() would raise on an empty list; report it properly.
                self.send_error(404, "No resolution directories found in the image directory")
                return
            resolution = random.choice(resolutions)
            resolution_dir = os.path.join(IMAGE_DIR, resolution)
            images = [f for f in os.listdir(resolution_dir) if f.endswith(self._IMAGE_SUFFIXES)]
            if not images:
                self.send_error(404, "No images found in the selected resolution directory")
                return
            image_name = random.choice(images)
            image_path = f"{resolution}/{image_name}"
            response = {
                'path': f"/images/{image_path}",
                'resolution': resolution,
                'filename': image_name,
                'creation_date': self._modified_date(os.path.join(IMAGE_DIR, image_path)),
            }
        except OSError as e:
            self.send_error(500, f"Error serving random image: {str(e)}")
            return
        self._send_json(response)

    def serve_resolutions(self):
        """Send the list of resolution folder names as ``{"resolutions": [...]}``."""
        try:
            resolutions = self._list_resolutions()
        except OSError as e:
            self.send_error(500, f"Error serving resolutions: {str(e)}")
            return
        self._send_json({'resolutions': resolutions})

    def serve_selections(self):
        """Send all recorded selections as ``{"selections": [...]}``.

        If the rows cannot be serialized, an empty list plus an ``error`` key
        is sent instead so the client still receives valid JSON.
        """
        print("DEBUG: serve_selections() called")
        try:
            selections = get_selections()
            print(f"DEBUG: Response has {len(selections)} selections")
            try:
                response_json = json.dumps({'selections': selections})
                print(f"DEBUG: JSON serialization successful, length: {len(response_json)}")
            except (TypeError, ValueError) as json_err:
                print(f"DEBUG ERROR in JSON serialization: {str(json_err)}")
                # If there's an error in serialization, send a simpler response
                response_json = json.dumps({'selections': [], 'error': 'JSON serialization error'})
        except Exception as e:
            print(f"DEBUG ERROR in serve_selections(): {str(e)}")
            self.send_error(500, f"Error serving selections: {str(e)}")
            return
        self._send_body(response_json.encode(), 'application/json')
        print("DEBUG: Response sent successfully")

    # ------------------------------------------------------------------
    # POST
    # ------------------------------------------------------------------

    def do_POST(self):
        """Route a POST request by URL path; unknown paths get a 404."""
        path = urllib.parse.urlparse(self.path).path
        if path == "/record-selection":
            self.handle_record_selection()
        elif path == "/update-selection":
            self.handle_update_selection()
        elif path == "/delete-selection":
            self.handle_delete_selection()
        elif path == "/reset-database":
            self.handle_reset_database()
        else:
            self.send_error(404, "Not found")

    def handle_record_selection(self):
        """Store a new selection from a JSON body with ``path``, ``resolution``, ``action``.

        Responds 400 for a malformed body or missing fields and 500 when the
        database write fails.
        """
        data = self._read_json_body()
        if data is None:
            return
        image_path = data.get('path', '')
        resolution = data.get('resolution', '')
        action = data.get('action', '')
        if not isinstance(image_path, str):
            # A non-string path cannot be valid; report it as missing.
            image_path = ''
        # Strip only the leading URL prefix, never later occurrences in the path.
        if image_path.startswith('/images/'):
            image_path = image_path[len('/images/'):]
        if not image_path or not resolution or not action:
            self.send_error(400, "Missing required fields")
            return
        try:
            add_selection(image_path, resolution, action)
        except Exception as e:
            # Request boundary: report any storage failure to the client.
            self.send_error(500, f"Error recording selection: {str(e)}")
            return
        self._send_json({
            'success': True,
            'message': f"Selection recorded: {action} for {image_path}"
        })

    def handle_update_selection(self):
        """Change the action of an existing selection from a JSON body with ``id``, ``action``.

        Responds 400 for a malformed body or missing fields, 404 when no row
        has that id, and 500 when the database write fails.
        """
        data = self._read_json_body()
        if data is None:
            return
        selection_id = data.get('id')
        action = data.get('action', '')
        if not selection_id or not action:
            self.send_error(400, "Missing required fields")
            return
        try:
            found = update_selection(selection_id, action)
        except Exception as e:
            # Request boundary: report any storage failure to the client.
            self.send_error(500, f"Error updating selection: {str(e)}")
            return
        if not found:
            self.send_error(404, f"Selection with ID {selection_id} not found")
            return
        self._send_json({
            'success': True,
            'message': f"Selection updated: ID {selection_id} to {action}"
        })

    def handle_delete_selection(self):
        """Delete a selection identified by ``id`` in the JSON body.

        Responds 400 for a malformed body or missing id, 404 when no row has
        that id, and 500 when the database write fails.
        """
        data = self._read_json_body()
        if data is None:
            return
        selection_id = data.get('id')
        if not selection_id:
            self.send_error(400, "Missing selection ID")
            return
        try:
            found = delete_selection(selection_id)
        except Exception as e:
            # Request boundary: report any storage failure to the client.
            print(f"DEBUG ERROR in handle_delete_selection(): {str(e)}")
            self.send_error(500, f"Error deleting selection: {str(e)}")
            return
        if not found:
            self.send_error(404, f"Selection with ID {selection_id} not found")
            return
        self._send_json({
            'success': True,
            'message': f"Selection deleted: ID {selection_id}"
        })

    def handle_reset_database(self):
        """Delete every stored selection and report how many rows were removed."""
        print("DEBUG: Handling reset database request")
        try:
            rows_affected = reset_database()
        except Exception as e:
            # Request boundary: report any storage failure to the client.
            print(f"DEBUG ERROR in handle_reset_database(): {str(e)}")
            self.send_error(500, f"Error resetting database: {str(e)}")
            return
        self._send_json({
            'success': True,
            'message': f"Database reset: {rows_affected} selections deleted"
        })
        print("DEBUG: Reset database response sent")

    # ------------------------------------------------------------------
    # OPTIONS
    # ------------------------------------------------------------------

    def do_OPTIONS(self):
        """Answer CORS preflight requests with the allowed methods and headers."""
        self.send_response(200)
        self._set_cors_headers()
        self.send_header('Content-Length', '0')
        self.end_headers()
def run(server_class=HTTPServer, handler_class=ImageSwipeHandler, port=8000, host=''):
    """Initialize the database and serve HTTP requests until interrupted.

    Args:
        server_class: Server type to instantiate with ``(address, handler_class)``.
        handler_class: Request handler class passed to the server.
        port: TCP port to listen on.
        host: Address to bind. The default ``''`` listens on all interfaces;
            pass ``'127.0.0.1'`` to accept local connections only.
    """
    # Initialize the database
    init_db()
    httpd = server_class((host, port), handler_class)
    print(f"Starting server on port {port}...")
    print(f"Image directory: {IMAGE_DIR}")
    print(f"Database: {DB_PATH}")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit without a traceback.
        print("Shutting down server...")
    finally:
        # Release the listening socket on every exit path.
        httpd.server_close()


if __name__ == "__main__":
    run()