From 08c6e1461618163236da453ed8f0b72b1821c54e Mon Sep 17 00:00:00 2001 From: Aodhan Collins Date: Mon, 2 Mar 2026 23:29:58 +0000 Subject: [PATCH] Initial commit --- Dockerfile | 97 ++++++++ Dockerfile.prebuilt | 36 +++ README.md | 210 +++++++++++++++++ docker-compose.yml | 42 ++++ docs/user-guide.md | 423 +++++++++++++++++++++++++++++++++ plans/danbooru-mcp-plan.md | 115 +++++++++ pyproject.toml | 19 ++ requirements.txt | 2 + scripts/import_tags.py | 152 ++++++++++++ scripts/scrape_tags.py | 359 ++++++++++++++++++++++++++++ scripts/test_danbooru_api.py | 221 +++++++++++++++++ src/server.py | 445 +++++++++++++++++++++++++++++++++++ 12 files changed, 2121 insertions(+) create mode 100644 Dockerfile create mode 100644 Dockerfile.prebuilt create mode 100644 README.md create mode 100644 docker-compose.yml create mode 100644 docs/user-guide.md create mode 100644 plans/danbooru-mcp-plan.md create mode 100644 pyproject.toml create mode 100644 requirements.txt create mode 100644 scripts/import_tags.py create mode 100644 scripts/scrape_tags.py create mode 100644 scripts/test_danbooru_api.py create mode 100644 src/server.py diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b3ddbfb --- /dev/null +++ b/Dockerfile @@ -0,0 +1,97 @@ +# syntax=docker/dockerfile:1 + +# --------------------------------------------------------------------------- +# danbooru-mcp — Dockerfile +# +# Stages: +# base – Python 3.12 slim + system deps +# scraper – runs scripts/scrape_tags.py to build db/tags.db +# (only executes if no pre-built DB is bind-mounted) +# runtime – minimal image that runs src/server.py (MCP stdio server) +# +# Usage: +# Build (includes fresh DB scrape — takes ~15 min first time): +# docker build -t danbooru-mcp . 
+# +# Run (MCP clients invoke via stdio): +# docker run --rm -i danbooru-mcp +# +# Or use docker-compose (recommended — mounts a persistent DB volume): +# docker compose up +# --------------------------------------------------------------------------- + +ARG PYTHON_VERSION=3.12 + +# --------------------------------------------------------------------------- +# Stage 1: base — shared deps +# --------------------------------------------------------------------------- +FROM python:${PYTHON_VERSION}-slim AS base + +WORKDIR /app + +# System deps: sqlite3 is already in python:slim +# gcc needed for some wheels; curl for healthcheck +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY scripts/ scripts/ +COPY src/ src/ + +# --------------------------------------------------------------------------- +# Stage 2: scraper — build the SQLite database +# --------------------------------------------------------------------------- +FROM base AS scraper + +ARG DANBOORU_USER="" +ARG DANBOORU_API_KEY="" + +ENV DANBOORU_USER=${DANBOORU_USER} +ENV DANBOORU_API_KEY=${DANBOORU_API_KEY} + +# Run the scraper — will create db/tags.db with all tags ≥10 posts +RUN mkdir -p db && \ + python scripts/scrape_tags.py --no-fts && \ + echo "Scrape complete. Rebuilding FTS5 index…" && \ + python - <<'EOF' +import sqlite3, pathlib +conn = sqlite3.connect("db/tags.db") +conn.execute("INSERT INTO tags_fts(tags_fts) VALUES('rebuild')") +conn.commit() +conn.close() +print("FTS5 index built.") +EOF + +# --------------------------------------------------------------------------- +# Stage 3: runtime — lean image with pre-built DB +# --------------------------------------------------------------------------- +FROM python:${PYTHON_VERSION}-slim AS runtime + +WORKDIR /app + +# Install only runtime deps (no gcc) +COPY requirements.txt . 
+RUN pip install --no-cache-dir -r requirements.txt + +# Copy source +COPY src/ src/ + +# Copy the scraped database from the scraper stage +COPY --from=scraper /app/db/tags.db db/tags.db + +# The MCP server communicates via stdio — no ports needed +ENV DANBOORU_TAGS_DB=/app/db/tags.db + +# Non-root user for security +RUN useradd --no-create-home --shell /bin/false mcpuser && \ + chown -R mcpuser:mcpuser /app +USER mcpuser + +# Health check: verify the DB is readable +HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \ + CMD python -c "import sqlite3; c=sqlite3.connect('/app/db/tags.db'); print(c.execute('SELECT COUNT(*) FROM tags').fetchone()[0]); c.close()" + +ENTRYPOINT ["python", "src/server.py"] diff --git a/Dockerfile.prebuilt b/Dockerfile.prebuilt new file mode 100644 index 0000000..10b2da4 --- /dev/null +++ b/Dockerfile.prebuilt @@ -0,0 +1,36 @@ +# syntax=docker/dockerfile:1 +# --------------------------------------------------------------------------- +# danbooru-mcp — Fast Dockerfile (uses a pre-built db/tags.db) +# +# Use this when you already have db/tags.db built locally via: +# python scripts/scrape_tags.py +# +# Build: +# docker build -f Dockerfile.prebuilt -t danbooru-mcp . +# +# Run (MCP stdio): +# docker run --rm -i danbooru-mcp +# --------------------------------------------------------------------------- + +ARG PYTHON_VERSION=3.12 + +FROM python:${PYTHON_VERSION}-slim + +WORKDIR /app + +COPY requirements.txt . 
+RUN pip install --no-cache-dir -r requirements.txt + +COPY src/ src/ +COPY db/tags.db db/tags.db + +ENV DANBOORU_TAGS_DB=/app/db/tags.db + +RUN useradd --no-create-home --shell /bin/false mcpuser && \ + chown -R mcpuser:mcpuser /app +USER mcpuser + +HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \ + CMD python -c "import sqlite3; c=sqlite3.connect('/app/db/tags.db'); print(c.execute('SELECT COUNT(*) FROM tags').fetchone()[0]); c.close()" + +ENTRYPOINT ["python", "src/server.py"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..a501eef --- /dev/null +++ b/README.md @@ -0,0 +1,210 @@ +# danbooru-mcp + +An MCP (Model Context Protocol) server that lets an LLM search, validate, and get suggestions for valid **Danbooru tags** — the prompt vocabulary used by Illustrious and other Danbooru-trained Stable Diffusion models. + +📖 **[Full User Guide](docs/user-guide.md)** — workflow walkthrough, tool reference, best practices, and common scenarios. + +Tags are scraped directly from the **Danbooru public API** and stored in a local SQLite database with an **FTS5 full-text search index** for fast prefix/substring queries. Each tag includes its post count, category, and deprecation status so the LLM can prioritise well-used, canonical tags. 
+
+---
+
+## Tools
+
+| Tool | Description |
+|------|-------------|
+| `search_tags(query, limit=20, category=None)` | Prefix/full-text search — returns rich tag objects ordered by relevance |
+| `validate_tags(tags)` | Exact-match validation — splits into `valid`, `deprecated`, `invalid` |
+| `suggest_tags(partial, limit=10, category=None)` | Autocomplete for partial tag strings, sorted by post count |
+
+### Return object shape
+
+All tools return tag objects with:
+
+```json
+{
+  "name": "blue_hair",
+  "post_count": 1079908,
+  "category": "general",
+  "is_deprecated": false
+}
+```
+
+### Category filter values
+
+`"general"` · `"artist"` · `"copyright"` · `"character"` · `"meta"`
+
+---
+
+## Setup
+
+### 1. Install dependencies
+
+```bash
+pip install -e .
+```
+
+### 2. Build the SQLite database (scrapes the Danbooru API)
+
+```bash
+python scripts/scrape_tags.py
+```
+
+This scrapes roughly 300,000 tags (every tag with ≥10 posts) from the Danbooru public API (no account required)
+and stores them in `db/tags.db` with a FTS5 index.
+Estimated time: **5–15 minutes** depending on network speed.
+
+```
+Options:
+  --db PATH        Output database path (default: db/tags.db)
+  --workers N      Parallel HTTP workers (default: 4)
+  --max-page N     Safety cap on pages (default: 500)
+  --no-resume      Re-scrape all pages from scratch
+  --no-fts         Skip FTS5 rebuild (for incremental runs)
+```
+
+The scraper is **resumable** — if interrupted, re-run it and it will
+continue from where it left off.
+
+### 3. (Optional) Test API access first
+
+```bash
+python scripts/test_danbooru_api.py
+```
+
+### 4. Run the MCP server
+
+```bash
+python src/server.py
+```
+
+---
+
+## Docker
+
+### Quick start (pre-built DB — recommended)
+
+Use this when you've already run `python scripts/scrape_tags.py` and have `db/tags.db`:
+
+```bash
+# Build image with the pre-built DB baked in (~30 seconds)
+docker build -f Dockerfile.prebuilt -t danbooru-mcp . 
+ +# Verify +docker run --rm --entrypoint python danbooru-mcp \ + -c "import sqlite3,sys; c=sqlite3.connect('/app/db/tags.db'); sys.stderr.write(str(c.execute('SELECT COUNT(*) FROM tags').fetchone()[0]) + ' tags\n')" +``` + +### Build from scratch (runs the scraper during Docker build) + +```bash +# Scrapes the Danbooru API during build — takes ~15 minutes +docker build \ + --build-arg DANBOORU_USER=your_username \ + --build-arg DANBOORU_API_KEY=your_api_key \ + -t danbooru-mcp . +``` + +### MCP client config (Docker) + +```json +{ + "mcpServers": { + "danbooru-tags": { + "command": "docker", + "args": ["run", "--rm", "-i", "danbooru-mcp:latest"] + } + } +} +``` + +--- + +## MCP Client Configuration + +### Claude Desktop (`claude_desktop_config.json`) + +```json +{ + "mcpServers": { + "danbooru-tags": { + "command": "python", + "args": ["/absolute/path/to/danbooru-mcp/src/server.py"] + } + } +} +``` + +### Custom DB path via environment variable + +```json +{ + "mcpServers": { + "danbooru-tags": { + "command": "python", + "args": ["/path/to/src/server.py"], + "env": { + "DANBOORU_TAGS_DB": "/custom/path/to/tags.db" + } + } + } +} +``` + +--- + +## Example LLM Prompt Workflow + +``` +User: Generate a prompt for a girl with blue hair and a sword. + +LLM calls validate_tags(["1girl", "blue_hairs", "sword", "looking_at_vewer"]) +→ { + "valid": ["1girl", "sword"], + "deprecated": [], + "invalid": ["blue_hairs", "looking_at_vewer"] + } + +LLM calls suggest_tags("blue_hair", limit=3) +→ [ + {"name": "blue_hair", "post_count": 1079908, "category": "general"}, + {"name": "blue_hairband", "post_count": 26905, "category": "general"}, + ... 
+ ] + +LLM calls suggest_tags("looking_at_viewer", limit=1) +→ [{"name": "looking_at_viewer", "post_count": 4567890, "category": "general"}] + +Final validated prompt: 1girl, blue_hair, sword, looking_at_viewer +``` + +--- + +## Project Structure + +``` +danbooru-mcp/ +├── data/ +│ └── all_tags.csv # original CSV export (legacy, replaced by API scrape) +├── db/ +│ └── tags.db # SQLite DB (generated, gitignored) +├── plans/ +│ └── danbooru-mcp-plan.md # Architecture plan +├── scripts/ +│ ├── scrape_tags.py # API scraper → SQLite (primary) +│ ├── import_tags.py # Legacy CSV importer +│ └── test_danbooru_api.py # API connectivity tests +├── src/ +│ └── server.py # MCP server +├── pyproject.toml +├── .gitignore +└── README.md +``` + +--- + +## Requirements + +- Python 3.10+ +- `mcp[cli]` — official Python MCP SDK +- `requests` — HTTP client for API scraping +- `sqlite3` — Python stdlib (no install needed) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..9ae091d --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,42 @@ +services: + danbooru-mcp: + build: + context: . + dockerfile: Dockerfile.prebuilt + image: danbooru-mcp:latest + container_name: danbooru-mcp + stdin_open: true # required — MCP communicates over stdio + tty: false + restart: unless-stopped + + environment: + DANBOORU_TAGS_DB: /app/db/tags.db + # Log level: DEBUG | INFO | WARNING | ERROR | CRITICAL (default: INFO) + LOG_LEVEL: ${LOG_LEVEL:-INFO} + # Log format: "text" (human-readable) | "json" (structured, for log aggregators) + LOG_FORMAT: ${LOG_FORMAT:-text} + + volumes: + # Persistent DB volume — survives image rebuilds. + # On first use, seed with: + # docker run --rm \ + # -v danbooru_db:/target -v $(pwd)/db:/source \ + # alpine sh -c "cp /source/tags.db /target/" + - danbooru_db:/app/db + + # Docker logging driver — captures all stderr output from the container. 
+ # Use "json-file" (default) for local development; switch to "journald" + # or a remote driver (fluentd, awslogs, etc.) for production. + logging: + driver: json-file + options: + max-size: "50m" # rotate when log file reaches 50MB + max-file: "5" # keep last 5 rotated files + + read_only: true + tmpfs: + - /tmp + +volumes: + danbooru_db: + driver: local diff --git a/docs/user-guide.md b/docs/user-guide.md new file mode 100644 index 0000000..0bbc034 --- /dev/null +++ b/docs/user-guide.md @@ -0,0 +1,423 @@ +# Danbooru MCP Tag Validator — User Guide + +This guide explains how to integrate and use the `danbooru-mcp` server with an LLM to generate valid, high-quality prompts for Illustrious / Stable Diffusion models trained on Danbooru data. + +--- + +## Table of Contents + +1. [What is this?](#what-is-this) +2. [Quick Start](#quick-start) +3. [Tool Reference](#tool-reference) + - [search_tags](#search_tags) + - [validate_tags](#validate_tags) + - [suggest_tags](#suggest_tags) +4. [Prompt Engineering Workflow](#prompt-engineering-workflow) +5. [Category Reference](#category-reference) +6. [Best Practices](#best-practices) +7. [Common Scenarios](#common-scenarios) +8. [Troubleshooting](#troubleshooting) + +--- + +## What is this? + +Illustrious (and similar Danbooru-trained Stable Diffusion models) uses **Danbooru tags** as its prompt language. +Tags like `1girl`, `blue_hair`, `looking_at_viewer` are meaningful because the model was trained on images annotated with them. + +The problem: there are hundreds of thousands of valid Danbooru tags, and misspelling or inventing tags produces no useful signal — the model generates less accurate images. 
+ +**This MCP server** lets an LLM: +- **Search** the full tag database for tag discovery +- **Validate** a proposed prompt's tags against the real Danbooru database +- **Suggest** corrections for typos or near-miss tags + +The database contains **292,500 tags**, all with ≥10 posts on Danbooru — filtering out one-off or misspelled entries. + +--- + +## Quick Start + +### 1. Add to your MCP client (Claude Desktop example) + +**Using Docker (recommended):** +```json +{ + "mcpServers": { + "danbooru-tags": { + "command": "docker", + "args": ["run", "--rm", "-i", "danbooru-mcp:latest"] + } + } +} +``` + +**Using Python directly:** +```json +{ + "mcpServers": { + "danbooru-tags": { + "command": "/path/to/danbooru-mcp/.venv/bin/python", + "args": ["/path/to/danbooru-mcp/src/server.py"] + } + } +} +``` + +### 2. Instruct the LLM + +Add a system prompt telling the LLM to use the server: + +``` +You have access to the danbooru-tags MCP server for validating Stable Diffusion prompts. +Before generating any final prompt: +1. Use validate_tags to check all proposed tags are real Danbooru tags. +2. Use suggest_tags to fix any invalid tags. +3. Only output the validated, corrected tag list. +``` + +--- + +## Tool Reference + +### `search_tags` + +Find tags by name using full-text / prefix search. + +**Parameters:** + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `query` | `string` | *required* | Search string. Trailing `*` added automatically for prefix match. Supports FTS5 syntax. 
| +| `limit` | `integer` | `20` | Max results (1–200) | +| `category` | `string` | `null` | Optional filter: `"general"`, `"artist"`, `"copyright"`, `"character"`, `"meta"` | + +**Returns:** List of tag objects: +```json +[ + { + "name": "blue_hair", + "post_count": 1079925, + "category": "general", + "is_deprecated": false + } +] +``` + +**Examples:** + +``` +Search for hair colour tags: + search_tags("blue_hair") + → blue_hair, blue_hairband, blue_hair-chan_(ramchi), … + +Search only character tags for a Vocaloid: + search_tags("hatsune", category="character") + → hatsune_miku, hatsune_mikuo, hatsune_miku_(append), … + +Boolean search: + search_tags("hair AND blue") + → tags matching both "hair" and "blue" +``` + +**FTS5 query syntax:** + +| Syntax | Meaning | +|--------|---------| +| `blue_ha*` | prefix match (added automatically) | +| `"blue hair"` | phrase match | +| `hair AND blue` | both terms present | +| `hair NOT red` | exclusion | + +--- + +### `validate_tags` + +Check a list of tags against the full Danbooru database. Returns three groups: valid, deprecated, and invalid. + +**Parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `tags` | `list[string]` | Tags to validate, e.g. `["1girl", "blue_hair", "sword"]` | + +**Returns:** +```json +{ + "valid": ["1girl", "blue_hair", "sword"], + "deprecated": [], + "invalid": ["blue_hairs", "not_a_real_tag"] +} +``` + +| Key | Meaning | +|-----|---------| +| `valid` | Exists in Danbooru and is not deprecated — safe to use | +| `deprecated` | Exists but has been deprecated (an updated canonical tag exists) | +| `invalid` | Not found — likely misspelled, hallucinated, or too niche (<10 posts) | + +**Important:** Always run `validate_tags` before finalising a prompt. Invalid tags are silently ignored by the model but waste token budget and reduce prompt clarity. + +--- + +### `suggest_tags` + +Autocomplete-style suggestions for a partial or approximate tag. 
Results are sorted by post count (most commonly used first). Deprecated tags are **excluded**. + +**Parameters:** + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `partial` | `string` | *required* | Partial tag or rough approximation | +| `limit` | `integer` | `10` | Max suggestions (1–50) | +| `category` | `string` | `null` | Optional category filter | + +**Returns:** Same format as `search_tags`, sorted by `post_count` descending. + +**Examples:** + +``` +Fix a typo: + suggest_tags("looking_at_vewer") + → ["looking_at_viewer", …] + +Find the most popular sword-related tags: + suggest_tags("sword", limit=5, category="general") + → sword (337,737), sword_behind_back (7,203), … + +Find character tags for a partial name: + suggest_tags("miku", category="character") + → hatsune_miku (129,806), yuki_miku (4,754), … +``` + +--- + +## Prompt Engineering Workflow + +This is the recommended workflow for an LLM building Illustrious prompts: + +### Step 1 — Draft + +The LLM drafts an initial list of conceptual tags based on the user's description: + +``` +User: "A girl with long silver hair wearing a kimono in a Japanese garden" + +Draft tags: + 1girl, silver_hair, long_hair, kimono, japanese_garden, cherry_blossoms, + sitting, looking_at_viewer, outdoors, traditional_clothes +``` + +### Step 2 — Validate + +``` +validate_tags([ + "1girl", "silver_hair", "long_hair", "kimono", "japanese_garden", + "cherry_blossoms", "sitting", "looking_at_viewer", "outdoors", + "traditional_clothes" +]) +``` + +Response: +```json +{ + "valid": ["1girl", "long_hair", "kimono", "cherry_blossoms", "sitting", + "looking_at_viewer", "outdoors", "traditional_clothes"], + "deprecated": [], + "invalid": ["silver_hair", "japanese_garden"] +} +``` + +### Step 3 — Fix invalid tags + +``` +suggest_tags("silver_hair", limit=3) +→ [{"name": "white_hair", "post_count": 800000}, ...] 
+ +suggest_tags("japanese_garden", limit=3) +→ [{"name": "garden", "post_count": 45000}, + {"name": "japanese_clothes", "post_count": 12000}, ...] +``` + +### Step 4 — Finalise + +``` +Final prompt: + 1girl, white_hair, long_hair, kimono, garden, cherry_blossoms, + sitting, looking_at_viewer, outdoors, traditional_clothes +``` + +All tags are validated. Prompt is ready to send to ComfyUI. + +--- + +## Category Reference + +Danbooru organises tags into five categories. Understanding them helps scope searches: + +| Category | Value | Description | Examples | +|----------|-------|-------------|---------| +| **general** | `0` | Descriptive tags for image content | `1girl`, `blue_hair`, `sword`, `outdoors` | +| **artist** | `1` | Artist/creator names | `wlop`, `natsuki_subaru` | +| **copyright** | `3` | Source material / franchise | `fate/stay_night`, `touhou`, `genshin_impact` | +| **character** | `4` | Specific character names | `hatsune_miku`, `hakurei_reimu` | +| **meta** | `5` | Image quality / format tags | `highres`, `absurdres`, `commentary` | + +**Tips:** +- For generating images, focus on **general** tags (colours, poses, clothing, expressions) +- Add **character** and **copyright** tags when depicting a specific character +- **meta** tags like `highres` and `best_quality` can improve output quality +- Avoid **artist** tags unless intentionally mimicking a specific art style + +--- + +## Best Practices + +### ✅ Always validate before generating + +```python +# Always run this before finalising +result = validate_tags(your_proposed_tags) +# Fix everything in result["invalid"] before sending to ComfyUI +``` + +### ✅ Use suggest_tags for discoverability + +Even for tags you think you know, run `suggest_tags` to find the canonical form: +- `standing` vs `standing_on_one_leg` vs `standing_split` +- `smile` vs `small_smile` vs `evil_smile` + +The tag with the highest `post_count` is almost always the right one for your intent. 
+ +### ✅ Prefer high-post-count tags + +Higher post count = more training data = more consistent model response. + +```python +# Get the top 5 most established hair colour tags +suggest_tags("hair_color", limit=5, category="general") +``` + +### ✅ Layer specificity + +Good prompts move from general to specific: +``` +# General → Specific +1girl, # subject count +solo, # composition +long_hair, blue_hair, # hair +white_dress, off_shoulder, # clothing +smile, looking_at_viewer, # expression/pose +outdoors, garden, daytime, # setting +masterpiece, best_quality # quality +``` + +### ❌ Avoid deprecated tags + +If `validate_tags` reports a tag as `deprecated`, use `suggest_tags` to find the current replacement: + +```python +# If "nude" is deprecated, find the current tag: +suggest_tags("nude", category="general") +``` + +### ❌ Don't invent tags + +The model doesn't understand arbitrary natural language in prompts — only tags it was trained on. `beautiful_landscape` is not a Danbooru tag; `scenery` and `landscape` are. + +--- + +## Common Scenarios + +### Scenario: Character in a specific pose + +``` +# 1. Search for pose tags +search_tags("sitting", category="general", limit=10) +→ sitting, sitting_on_ground, kneeling, seiza, wariza, … + +# 2. 
Validate the full tag set +validate_tags(["1girl", "hatsune_miku", "sitting", "looking_at_viewer", "smile"]) +``` + +### Scenario: Specific art style + +``` +# Find copyright tags for a franchise +search_tags("genshin", category="copyright", limit=5) +→ genshin_impact, … + +# Find character from that franchise +search_tags("hu_tao", category="character", limit=3) +→ hu_tao_(genshin_impact), … +``` + +### Scenario: Quality boosting tags + +``` +# Find commonly used meta/quality tags +search_tags("quality", category="meta", limit=5) +→ best_quality, high_quality, … + +search_tags("res", category="meta", limit=5) +→ highres, absurdres, ultra-high_res, … +``` + +### Scenario: Unknown misspelling + +``` +# You typed "haor" instead of "hair" +suggest_tags("haor", limit=5) +→ [] (no prefix match) + +# Try a broader search +search_tags("long hair") +→ long_hair, long_hair_between_eyes, wavy_hair, … +``` + +--- + +## Troubleshooting + +### "invalid" tags that should be valid + +The database contains only tags with **≥10 posts**. Tags with fewer posts are intentionally excluded as they are likely misspellings, very niche, or one-off annotations. + +If a tag you expect to be valid shows as invalid: +1. Try `suggest_tags` to find a close variant +2. Use `search_tags` to explore the tag space +3. 
The tag may genuinely have <10 posts — use a broader synonym instead
+
+### Server not responding
+
+Check the MCP server is running and the `db/tags.db` file exists:
+
+```bash
+# Local
+python src/server.py
+
+# Docker
+docker run --rm -i danbooru-mcp:latest
+```
+
+Environment variable override:
+```bash
+DANBOORU_TAGS_DB=/custom/path/tags.db python src/server.py
+```
+
+### Database needs rebuilding / updating
+
+Re-run the scraper (it's resumable):
+
+```bash
+# Refresh all tags
+python scripts/scrape_tags.py --no-resume
+
+# Resume an interrupted scrape (skips already-completed pages, stops at the ≥10-post boundary)
+python scripts/scrape_tags.py
+```
+
+Then rebuild the Docker image:
+```bash
+docker build -f Dockerfile.prebuilt -t danbooru-mcp:latest .
+```
diff --git a/plans/danbooru-mcp-plan.md b/plans/danbooru-mcp-plan.md
new file mode 100644
index 0000000..40d2f78
--- /dev/null
+++ b/plans/danbooru-mcp-plan.md
@@ -0,0 +1,115 @@
+# Danbooru MCP Tag Validator — Plan
+
+## Overview
+
+A Python MCP server backed by a SQLite database that allows an LLM to search, validate, and get suggestions for valid Danbooru tags. This enables an Illustrious/Stable Diffusion ComfyUI workflow to ensure all generated prompts contain only real Danbooru tags. 
+ +--- + +## Architecture + +```mermaid +graph TD + A[LLM / ComfyUI Workflow] -->|MCP protocol| B[danbooru-mcp Python server] + B -->|SQL queries| C[SQLite DB: tags.db] + C -->|FTS5 index| D[fast text search] + E[all_tags.csv 900k rows] -->|import script one-time| C +``` + +--- + +## Project Structure + +``` +danbooru-mcp/ +├── data/ +│ └── all_tags.csv # source data (existing, ~900k entries) +├── db/ +│ └── tags.db # generated SQLite DB (gitignored) +├── scripts/ +│ └── import_tags.py # one-time CSV → SQLite importer +├── src/ +│ └── server.py # MCP server entry point +├── pyproject.toml # deps: mcp[cli] only (sqlite3 is stdlib) +├── .gitignore +└── README.md +``` + +--- + +## Database Design + +### Table: `tags` +```sql +CREATE TABLE tags ( + id INTEGER PRIMARY KEY, + name TEXT UNIQUE NOT NULL +); +``` + +### FTS5 Virtual Table: `tags_fts` +```sql +CREATE VIRTUAL TABLE tags_fts USING fts5( + name, + content='tags', + content_rowid='id' +); +``` + +- Allows fast prefix queries: `tags_fts MATCH 'blue_*'` +- Exact-match validation uses B-tree index on `tags.name UNIQUE` + +--- + +## MCP Tools + +| Tool | Input | Output | Purpose | +|------|-------|--------|---------| +| `search_tags` | `query: str`, `limit: int = 20` | `list[str]` | Prefix/substring search for tag discovery | +| `validate_tags` | `tags: list[str]` | `{valid: list, invalid: list}` | Bulk exact-match validation of a prompt | +| `suggest_tags` | `partial: str`, `limit: int = 10` | `list[str]` | Autocomplete-style correction of near-miss tags | + +--- + +## Key Design Decisions + +- **SQLite over PostgreSQL** — zero operational overhead, file-portable, sufficient for read-only tag lookup +- **FTS5 over LIKE** — `LIKE '%query%'` on 900k rows is O(n); FTS5 prefix matching is O(log n) via inverted index +- **Exact-match bulk validation** — `WHERE name IN (...)` leverages SQLite's B-tree index on the UNIQUE constraint +- **No vector/embedding search** — Danbooru tags are well-defined strings; exact + prefix 
matching is appropriate and zero-dependency + +--- + +## Dependencies + +- Python 3.10+ +- `mcp[cli]` — official Python MCP SDK +- `sqlite3` — Python stdlib, no install required + +--- + +## MCP Client Config Example + +```json +{ + "mcpServers": { + "danbooru-tags": { + "command": "python", + "args": ["/path/to/danbooru-mcp/src/server.py"] + } + } +} +``` + +--- + +## Todo List + +- [ ] Write `pyproject.toml` with `mcp[cli]` dependency +- [ ] Write `scripts/import_tags.py` — CSV to SQLite with FTS5 index +- [ ] Write `src/server.py` — MCP server with 3 tools + - [ ] `search_tags` tool + - [ ] `validate_tags` tool + - [ ] `suggest_tags` tool +- [ ] Write `.gitignore` (exclude `db/tags.db`) +- [ ] Write `README.md` with setup and usage instructions diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..c33864e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,19 @@ +[project] +name = "danbooru-mcp" +version = "0.1.0" +description = "MCP server for validating and searching Danbooru tags via SQLite FTS5" +requires-python = ">=3.10" +dependencies = [ + "mcp[cli]>=1.0.0", + "requests>=2.31.0", +] + +[project.scripts] +danbooru-mcp = "src.server:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c366f71 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +mcp[cli]>=1.0.0 +requests>=2.31.0 diff --git a/scripts/import_tags.py b/scripts/import_tags.py new file mode 100644 index 0000000..67d4e03 --- /dev/null +++ b/scripts/import_tags.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 +""" +One-time script to import data/all_tags.csv into db/tags.db (SQLite). 
+ +Creates: + - `tags` table with a UNIQUE index on `name` + - `tags_fts` FTS5 virtual table for fast prefix/full-text searches + +Usage: + python scripts/import_tags.py [--csv data/all_tags.csv] [--db db/tags.db] +""" + +import argparse +import csv +import os +import sqlite3 +import sys +import time + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Import Danbooru tags CSV into SQLite") + parser.add_argument( + "--csv", + default=os.path.join(os.path.dirname(__file__), "..", "data", "all_tags.csv"), + help="Path to the tags CSV file (default: data/all_tags.csv)", + ) + parser.add_argument( + "--db", + default=os.path.join(os.path.dirname(__file__), "..", "db", "tags.db"), + help="Path for the output SQLite database (default: db/tags.db)", + ) + parser.add_argument( + "--batch-size", + type=int, + default=10_000, + help="Number of rows to insert per transaction (default: 10000)", + ) + return parser.parse_args() + + +def create_schema(conn: sqlite3.Connection) -> None: + conn.executescript(""" + PRAGMA journal_mode = WAL; + PRAGMA synchronous = NORMAL; + + CREATE TABLE IF NOT EXISTS tags ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL UNIQUE + ); + + -- FTS5 virtual table for fast prefix and full-text search. + -- content= links it to the `tags` table so the index is kept lean. 
+ CREATE VIRTUAL TABLE IF NOT EXISTS tags_fts USING fts5( + name, + content='tags', + content_rowid='id', + tokenize='unicode61 remove_diacritics 1' + ); + """) + conn.commit() + + +def import_csv(conn: sqlite3.Connection, csv_path: str, batch_size: int) -> int: + """Insert tags from CSV and return the total count inserted.""" + inserted = 0 + batch: list[tuple[str]] = [] + + with open(csv_path, newline="", encoding="utf-8") as fh: + reader = csv.reader(fh) + + # Skip header row + header = next(reader, None) + if header and header[0].strip().lower() in ("tag", "tags"): + pass # consumed + else: + # Not a header — re-process as data + if header: + tag = header[0].rstrip(",").strip() + if tag: + batch.append((tag,)) + + for row in reader: + if not row: + continue + # The CSV has values like "some_tag," — strip trailing comma and whitespace + tag = row[0].rstrip(",").strip() + if not tag: + continue + batch.append((tag,)) + + if len(batch) >= batch_size: + _flush(conn, batch) + inserted += len(batch) + batch = [] + print(f"\r {inserted:,} tags imported…", end="", flush=True) + + if batch: + _flush(conn, batch) + inserted += len(batch) + + return inserted + + +def _flush(conn: sqlite3.Connection, batch: list[tuple[str]]) -> None: + conn.executemany( + "INSERT OR IGNORE INTO tags (name) VALUES (?)", + batch, + ) + conn.commit() + + +def rebuild_fts(conn: sqlite3.Connection) -> None: + """Populate the FTS5 index from the `tags` table.""" + print("\n Rebuilding FTS5 index…", flush=True) + conn.execute("INSERT INTO tags_fts(tags_fts) VALUES('rebuild')") + conn.commit() + + +def main() -> None: + args = parse_args() + + csv_path = os.path.abspath(args.csv) + db_path = os.path.abspath(args.db) + + if not os.path.isfile(csv_path): + print(f"ERROR: CSV file not found: {csv_path}", file=sys.stderr) + sys.exit(1) + + os.makedirs(os.path.dirname(db_path), exist_ok=True) + + print(f"Source : {csv_path}") + print(f"Target : {db_path}") + print(f"Batch : {args.batch_size:,} rows per 
transaction") + print() + + t0 = time.perf_counter() + + conn = sqlite3.connect(db_path) + try: + create_schema(conn) + total = import_csv(conn, csv_path, args.batch_size) + rebuild_fts(conn) + finally: + conn.close() + + elapsed = time.perf_counter() - t0 + print(f"\nDone. {total:,} tags imported in {elapsed:.1f}s → {db_path}") + + +if __name__ == "__main__": + main() diff --git a/scripts/scrape_tags.py b/scripts/scrape_tags.py new file mode 100644 index 0000000..1fdf17c --- /dev/null +++ b/scripts/scrape_tags.py @@ -0,0 +1,359 @@ +#!/usr/bin/env python3 +""" +Scrape Danbooru tags from the public API, sorted by post count (descending). + +All tags with ≥10 posts fit within approximately the first 300 pages +(1000 tags/page, sorted by post_count DESC). The scraper stops automatically +as soon as an entire page consists only of tags below --min-posts. + +This approach is fast (~3–5 minutes), clean, and requires no complex cursor +or ID-based pagination — just standard page-offset requests. + +The scrape is resumable: if interrupted, re-run and it will continue from +the last completed page. + +Usage: + python scripts/scrape_tags.py [--db db/tags.db] + +Environment (from .env or shell): + DANBOORU_USER Danbooru login name + DANBOORU_API_KEY Danbooru API key +""" + +from __future__ import annotations + +import argparse +import os +import sqlite3 +import sys +import time +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor, as_completed + +try: + import requests + from requests.adapters import HTTPAdapter + from urllib3.util.retry import Retry +except ImportError: + print("ERROR: 'requests' not installed. 
Run: pip install requests") + sys.exit(1) + + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- + +BASE_URL = "https://danbooru.donmai.us" +PAGE_LIMIT = 1000 +DEFAULT_DB = Path(__file__).parent.parent / "db" / "tags.db" +REQUEST_DELAY = 0.25 # seconds between requests per worker +MIN_POST_COUNT = 10 +MAX_PAGES = 500 # safety cap (all ≥10-post tags are < 300 pages) + +CATEGORY_NAMES: dict[int, str] = { + 0: "general", + 1: "artist", + 3: "copyright", + 4: "character", + 5: "meta", +} + + +# --------------------------------------------------------------------------- +# .env loader +# --------------------------------------------------------------------------- + +def _load_env() -> dict[str, str]: + env: dict[str, str] = {} + for candidate in [ + Path(__file__).parent.parent / ".env", + Path.home() / ".env", + ]: + if candidate.exists(): + for line in candidate.read_text().splitlines(): + line = line.strip() + if line and not line.startswith("#") and "=" in line: + k, _, v = line.partition("=") + env.setdefault(k.strip(), v.strip()) + break + return env + + +# --------------------------------------------------------------------------- +# Database +# --------------------------------------------------------------------------- + +CREATE_SQL = """ +PRAGMA journal_mode = WAL; +PRAGMA synchronous = NORMAL; + +CREATE TABLE IF NOT EXISTS tags ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + post_count INTEGER NOT NULL DEFAULT 0, + category INTEGER NOT NULL DEFAULT 0, + category_name TEXT NOT NULL DEFAULT 'general', + is_deprecated INTEGER NOT NULL DEFAULT 0 +); + +CREATE INDEX IF NOT EXISTS idx_tags_name ON tags (name); +CREATE INDEX IF NOT EXISTS idx_tags_post_count ON tags (post_count DESC); +CREATE INDEX IF NOT EXISTS idx_tags_category ON tags (category); + +CREATE VIRTUAL TABLE IF NOT EXISTS tags_fts USING fts5( + name, + content='tags', + 
content_rowid='id', + tokenize='unicode61 remove_diacritics 1' +); + +CREATE TABLE IF NOT EXISTS completed_pages ( + page INTEGER PRIMARY KEY +); +""" + + +def init_db(db_path: Path) -> sqlite3.Connection: + db_path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(db_path), check_same_thread=False) + conn.executescript(CREATE_SQL) + conn.commit() + return conn + + +def get_completed_pages(conn: sqlite3.Connection) -> set[int]: + rows = conn.execute("SELECT page FROM completed_pages").fetchall() + return {r[0] for r in rows} + + +def mark_page_done(conn: sqlite3.Connection, page: int) -> None: + conn.execute( + "INSERT OR IGNORE INTO completed_pages (page) VALUES (?)", (page,) + ) + conn.commit() + + +def upsert_tags(conn: sqlite3.Connection, tags: list[dict], min_post_count: int) -> int: + rows = [ + ( + t["id"], + t["name"], + t.get("post_count", 0), + t.get("category", 0), + CATEGORY_NAMES.get(t.get("category", 0), "general"), + 1 if t.get("is_deprecated") else 0, + ) + for t in tags + if t.get("post_count", 0) >= min_post_count + ] + if not rows: + return 0 + conn.executemany( + """ + INSERT INTO tags (id, name, post_count, category, category_name, is_deprecated) + VALUES (?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(id) DO UPDATE SET
+ post_count = excluded.post_count,
+ category = excluded.category,
+ category_name = excluded.category_name,
+ is_deprecated = excluded.is_deprecated
+ """,
+ rows,
+ )
+ conn.commit()
+ return len(rows)
+
+
+def rebuild_fts(conn: sqlite3.Connection) -> None:
+ print("Rebuilding FTS5 index…", flush=True)
+ conn.execute("INSERT INTO tags_fts(tags_fts) VALUES('rebuild')")
+ conn.commit()
+ print("FTS5 index built.", flush=True)
+
+
+# ---------------------------------------------------------------------------
+# HTTP
+# ---------------------------------------------------------------------------
+
+def make_session(api_key: str | None, username: str | None) -> requests.Session:
+ session = requests.Session()
+ if api_key and username:
+ session.auth = (username, api_key)
+ session.headers.update({"User-Agent": "danbooru-mcp/0.1"})
+ retry = Retry(
+ total=6, backoff_factor=2.0,
+ status_forcelist=[429, 500, 502, 503, 504],
+ allowed_methods=["GET"],
+ )
+ session.mount("https://", HTTPAdapter(max_retries=retry))
+ return session
+
+
+def fetch_page(session: requests.Session, page: int) -> list[dict]:
+ params = {
+ "limit": PAGE_LIMIT,
+ "search[order]": "count",
+ "page": page,
+ }
+ resp = session.get(f"{BASE_URL}/tags.json", params=params, timeout=30)
+ resp.raise_for_status()
+ return resp.json()
+
+
+# ---------------------------------------------------------------------------
+# CLI
+# ---------------------------------------------------------------------------
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(
+ description="Scrape Danbooru tags (sorted by post count) into SQLite"
+ )
+ parser.add_argument("--db", default=str(DEFAULT_DB), help="Output SQLite DB path")
+ parser.add_argument(
+ "--min-posts", type=int, default=MIN_POST_COUNT,
+ help=f"Stop when every tag on a page falls below this threshold (default: {MIN_POST_COUNT})"
+ )
+ parser.add_argument(
+ "--workers", type=int, default=4,
+ 
help="Parallel HTTP workers (default: 4)" + ) + parser.add_argument("--no-resume", action="store_true", help="Start from scratch") + parser.add_argument("--no-fts", action="store_true", help="Skip FTS5 rebuild") + return parser.parse_args() + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def _do_page(session: requests.Session, page: int) -> tuple[int, list[dict]]: + """Fetch a page and return (page, tags). Runs in thread pool.""" + time.sleep(REQUEST_DELAY) + tags = fetch_page(session, page) + return page, tags + + +def main() -> None: + args = parse_args() + db_path = Path(args.db).resolve() + + env = _load_env() + api_key = env.get("DANBOORU_API_KEY") or os.environ.get("DANBOORU_API_KEY") + username = env.get("DANBOORU_USER") or os.environ.get("DANBOORU_USER") + if not username: + username = env.get("DANBOORU_USERNAME") or os.environ.get("DANBOORU_USERNAME") + + print(f"Database : {db_path}") + print(f"Min posts : {args.min_posts} (skip pages where all tags are below this)") + print(f"Workers : {args.workers}") + print(f"Auth : {'yes (' + username + ')' if (api_key and username) else 'none (public API)'}") + print() + + conn = init_db(db_path) + session = make_session(api_key if (api_key and username) else None, username) + + if args.no_resume: + print("Resetting…") + conn.execute("DELETE FROM tags") + conn.execute("DELETE FROM completed_pages") + conn.commit() + done_pages: set[int] = set() + else: + done_pages = get_completed_pages(conn) + existing = conn.execute("SELECT COUNT(*) FROM tags").fetchone()[0] + if done_pages: + print(f"Resuming — {len(done_pages)} pages done ({existing:,} tags stored)") + else: + print(f"Starting fresh ({existing:,} tags in DB)") + + print() + + total_tags = conn.execute("SELECT COUNT(*) FROM tags").fetchone()[0] + pages_done = 0 + t0 = time.perf_counter() + stop_flag = False + + REPORT_EVERY = 10 + + 
print(f"{'Page':>6} {'Done':>6} {'Min posts':>10} {'Tags':>10} {'Rate':>7} {'Elapsed':>8}") + print("-" * 60) + + # We submit pages in batches of `workers`, process results in page order, + # and stop as soon as we get a page where all tags are below min_posts. + page = 1 + with ThreadPoolExecutor(max_workers=args.workers) as pool: + while not stop_flag and page <= MAX_PAGES: + # Submit a window of pages + batch_pages = [] + for _ in range(args.workers): + while page <= MAX_PAGES and page in done_pages: + page += 1 + if page > MAX_PAGES: + break + batch_pages.append(page) + page += 1 + + if not batch_pages: + break + + futures = { + pool.submit(_do_page, session, p): p + for p in batch_pages + } + + # Collect results in page order + results: dict[int, list[dict]] = {} + for fut in as_completed(futures): + pg, tags = fut.result() + results[pg] = tags + + for pg in sorted(results.keys()): + tags = results[pg] + + if not tags: + print(f"\nPage {pg}: empty response. Stopping.") + stop_flag = True + break + + max_in_page = max(t.get("post_count", 0) for t in tags) + min_in_page = min(t.get("post_count", 0) for t in tags) + + if max_in_page < args.min_posts: + print(f"\nPage {pg}: all tags have <{args.min_posts} posts (min={min_in_page}). 
Stopping.") + stop_flag = True + break + + stored = upsert_tags(conn, tags, min_post_count=args.min_posts) + mark_page_done(conn, pg) + total_tags += stored + pages_done += 1 + + elapsed = time.perf_counter() - t0 + rate = pages_done / elapsed if elapsed > 0 else 0 + + line = ( + f"{pg:>6} {pages_done:>6} {min_in_page:>10,} " + f"{total_tags:>10,} {rate:>5.1f}/s {elapsed/60:>6.1f}m" + ) + if pages_done % REPORT_EVERY == 0: + print(line, flush=True) + else: + print(f"\r{line}", end="", flush=True) + + elapsed_total = time.perf_counter() - t0 + print(f"\n{'='*60}") + print(f"Scraping complete:") + print(f" Tags stored : {total_tags:,}") + print(f" Pages done : {pages_done:,}") + print(f" Time : {elapsed_total/60:.1f} minutes") + print() + + if not args.no_fts: + rebuild_fts(conn) + + conn.close() + print(f"Database saved to {db_path}") + + +if __name__ == "__main__": + main() diff --git a/scripts/test_danbooru_api.py b/scripts/test_danbooru_api.py new file mode 100644 index 0000000..d8841b1 --- /dev/null +++ b/scripts/test_danbooru_api.py @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +""" +Test script for the Danbooru API. + +Verifies: + 1. Authentication with the API key works + 2. Tag listing endpoint returns expected fields + 3. Pagination works (multiple pages) + 4. Tag search / filtering by category works + +Usage: + python scripts/test_danbooru_api.py + +Reads DANBOORU_API_KEY from .env or environment. +""" + +import json +import os +import sys +import time +from pathlib import Path + +try: + import requests +except ImportError: + print("ERROR: 'requests' is not installed. 
Run: pip install requests")
+ sys.exit(1)
+
+# ---------------------------------------------------------------------------
+# Load .env
+# ---------------------------------------------------------------------------
+
+def load_env() -> dict[str, str]:
+ env: dict[str, str] = {}
+ env_path = Path(__file__).parent.parent / ".env"
+ if env_path.exists():
+ for line in env_path.read_text().splitlines():
+ line = line.strip()
+ if line and not line.startswith("#") and "=" in line:
+ k, _, v = line.partition("=")
+ env[k.strip()] = v.strip()
+ return env
+
+
+# ---------------------------------------------------------------------------
+# API helpers
+# ---------------------------------------------------------------------------
+
+BASE_URL = "https://danbooru.donmai.us"
+
+# Danbooru tag categories
+CATEGORY_NAMES = {
+ 0: "general",
+ 1: "artist",
+ 3: "copyright",
+ 4: "character",
+ 5: "meta",
+}
+
+
+def make_session(api_key: str | None = None, username: str | None = None) -> requests.Session:
+ """Create a requests Session.
+
+ Danbooru public endpoints (tag listing, searching) do not require
+ authentication. Auth is only needed for account-specific actions.
+ When provided, credentials are the HTTP Basic pair (login name, api_key) —
+ the account's login name, NOT its numeric user ID. 
+ """ + session = requests.Session() + if api_key and username: + session.auth = (username, api_key) + session.headers.update({"User-Agent": "danbooru-mcp-test/0.1"}) + return session + + +def get_tags_page( + session: requests.Session, + page: int = 1, + limit: int = 20, + search_name: str | None = None, + search_category: int | None = None, + order: str = "count", # "count" | "name" | "date" +) -> list[dict]: + """Fetch one page of tags from the Danbooru API.""" + params: dict = { + "page": page, + "limit": limit, + "search[order]": order, + } + if search_name: + params["search[name_matches]"] = search_name + if search_category is not None: + params["search[category]"] = search_category + + resp = session.get(f"{BASE_URL}/tags.json", params=params, timeout=15) + resp.raise_for_status() + return resp.json() + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +def test_basic_fetch(session: requests.Session) -> None: + print("\n[1] Basic fetch — top 5 tags by post count") + tags = get_tags_page(session, page=1, limit=5, order="count") + assert isinstance(tags, list), f"Expected list, got {type(tags)}" + assert len(tags) > 0, "No tags returned" + + for tag in tags: + cat = CATEGORY_NAMES.get(tag.get("category", -1), "unknown") + print(f" [{cat:12s}] {tag['name']:40s} posts={tag['post_count']:>8,}") + + print(" PASS ✓") + + +def test_fields_present(session: requests.Session) -> None: + print("\n[2] Field presence check") + tags = get_tags_page(session, page=1, limit=1, order="count") + tag = tags[0] + required = {"id", "name", "post_count", "category", "is_deprecated", "words"} + missing = required - set(tag.keys()) + assert not missing, f"Missing fields: {missing}" + print(f" Fields present: {sorted(tag.keys())}") + print(f" Sample tag: name={tag['name']!r} category={CATEGORY_NAMES.get(tag['category'])} deprecated={tag['is_deprecated']}") + print(" 
PASS ✓") + + +def test_pagination(session: requests.Session) -> None: + print("\n[3] Pagination — page 1 vs page 2 should differ") + p1 = get_tags_page(session, page=1, limit=5, order="count") + time.sleep(0.5) + p2 = get_tags_page(session, page=2, limit=5, order="count") + names_p1 = {t["name"] for t in p1} + names_p2 = {t["name"] for t in p2} + overlap = names_p1 & names_p2 + assert not overlap, f"Pages 1 and 2 share tags: {overlap}" + print(f" Page 1: {sorted(names_p1)}") + print(f" Page 2: {sorted(names_p2)}") + print(" PASS ✓") + + +def test_category_filter(session: requests.Session) -> None: + print("\n[4] Category filter — fetch only 'character' tags (category=4)") + tags = get_tags_page(session, page=1, limit=5, search_category=4, order="count") + for tag in tags: + assert tag["category"] == 4, f"Expected category 4, got {tag['category']} for {tag['name']}" + print(f" {tag['name']:40s} posts={tag['post_count']:>8,}") + print(" PASS ✓") + + +def test_name_search(session: requests.Session) -> None: + print("\n[5] Name search — tags matching 'blue_hair*'") + tags = get_tags_page(session, page=1, limit=5, search_name="blue_hair*", order="count") + assert len(tags) > 0, "No results for blue_hair*" + for tag in tags: + cat = CATEGORY_NAMES.get(tag.get("category", -1), "unknown") + print(f" [{cat:12s}] {tag['name']:40s} posts={tag['post_count']:>8,}") + print(" PASS ✓") + + +def test_well_known_tags(session: requests.Session) -> None: + print("\n[6] Well-known tags — '1girl', 'blue_hair', 'sword' should exist") + for tag_name in ("1girl", "blue_hair", "sword"): + tags = get_tags_page(session, page=1, limit=1, search_name=tag_name, order="count") + found = [t for t in tags if t["name"] == tag_name] + assert found, f"Tag '{tag_name}' not found in API response" + t = found[0] + cat = CATEGORY_NAMES.get(t.get("category", -1), "unknown") + print(f" {tag_name:20s} category={cat:12s} posts={t['post_count']:>8,}") + print(" PASS ✓") + + +# 
--------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> None: + env = load_env() + api_key = env.get("DANBOORU_API_KEY") or os.environ.get("DANBOORU_API_KEY") + username = env.get("DANBOORU_USERNAME") or os.environ.get("DANBOORU_USERNAME") + + if api_key: + print(f"API key loaded: {api_key[:8]}…") + else: + print("No API key found — using unauthenticated access (public endpoints only)") + + # Danbooru public tag endpoints don't require auth. + # Pass username + api_key only when both are available. + session = make_session( + api_key=api_key if (api_key and username) else None, + username=username, + ) + + tests = [ + test_basic_fetch, + test_fields_present, + test_pagination, + test_category_filter, + test_name_search, + test_well_known_tags, + ] + + passed = 0 + failed = 0 + for test_fn in tests: + try: + test_fn(session) + passed += 1 + except Exception as exc: + print(f" FAIL ✗ {exc}") + failed += 1 + time.sleep(0.3) # be polite to the API + + print(f"\n{'='*50}") + print(f"Results: {passed} passed, {failed} failed") + if failed: + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/server.py b/src/server.py new file mode 100644 index 0000000..5276bae --- /dev/null +++ b/src/server.py @@ -0,0 +1,445 @@ +#!/usr/bin/env python3 +""" +Danbooru Tag Validator — MCP Server + +Exposes three tools for LLMs to work with Danbooru tags: + + search_tags – prefix / full-text search, returns rich tag objects + validate_tags – exact-match check, returns valid/invalid split with metadata + suggest_tags – autocomplete-style suggestions for a partial input + +The SQLite database must be pre-built with scripts/scrape_tags.py. + +Logging: + All log output goes to stderr (stdout is reserved for the MCP JSON-RPC protocol). 
+ Log level is controlled by the LOG_LEVEL environment variable: + DEBUG, INFO (default), WARNING, ERROR, CRITICAL + Log format is controlled by LOG_FORMAT: + "text" (default) — human-readable with timestamps + "json" — structured JSON, one object per line (for log aggregators) +""" + +from __future__ import annotations + +import json +import logging +import os +import sqlite3 +import sys +import time +from contextlib import contextmanager +from pathlib import Path +from typing import Generator + +from mcp.server.fastmcp import FastMCP + +# --------------------------------------------------------------------------- +# Logging setup — must go to stderr (stdout is the MCP transport) +# --------------------------------------------------------------------------- + +LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper() +LOG_FORMAT = os.environ.get("LOG_FORMAT", "text").lower() + + +class _JsonFormatter(logging.Formatter): + """Emit one JSON object per log record to stderr.""" + + def format(self, record: logging.LogRecord) -> str: # noqa: A003 + obj: dict = { + "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"), + "level": record.levelname, + "logger": record.name, + "msg": record.getMessage(), + } + if record.exc_info: + obj["exc"] = self.formatException(record.exc_info) + # Forward any extra keyword args as top-level fields + for k, v in record.__dict__.items(): + if k not in { + "name", "msg", "args", "levelname", "levelno", "pathname", + "filename", "module", "exc_info", "exc_text", "stack_info", + "lineno", "funcName", "created", "msecs", "relativeCreated", + "thread", "threadName", "processName", "process", "message", + "taskName", + }: + obj[k] = v + return json.dumps(obj, default=str) + + +def _configure_logging() -> logging.Logger: + handler = logging.StreamHandler(sys.stderr) + handler.setLevel(LOG_LEVEL) + + if LOG_FORMAT == "json": + handler.setFormatter(_JsonFormatter()) + else: + handler.setFormatter( + logging.Formatter( + fmt="%(asctime)s 
[%(levelname)-8s] %(name)s — %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S", + ) + ) + + root = logging.getLogger() + root.setLevel(LOG_LEVEL) + root.addHandler(handler) + + return logging.getLogger("danbooru_mcp") + + +log = _configure_logging() + +# --------------------------------------------------------------------------- +# Paths +# --------------------------------------------------------------------------- + +_HERE = Path(__file__).parent +_DEFAULT_DB = _HERE.parent / "db" / "tags.db" +DB_PATH = Path(os.environ.get("DANBOORU_TAGS_DB", str(_DEFAULT_DB))) + +# --------------------------------------------------------------------------- +# Database helpers +# --------------------------------------------------------------------------- + + +def _check_db() -> None: + if not DB_PATH.exists(): + log.error("Database file not found", extra={"db_path": str(DB_PATH)}) + raise FileNotFoundError( + f"Tags database not found at {DB_PATH}. " + "Run `python scripts/scrape_tags.py` first to build it." + ) + + +@contextmanager +def _get_conn() -> Generator[sqlite3.Connection, None, None]: + _check_db() + t0 = time.perf_counter() + conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA query_only = ON") + conn.execute("PRAGMA cache_size = -64000") # 64 MB page cache + log.debug("DB connection opened", extra={"db_path": str(DB_PATH)}) + try: + yield conn + finally: + conn.close() + elapsed_ms = (time.perf_counter() - t0) * 1000 + log.debug("DB connection closed", extra={"elapsed_ms": round(elapsed_ms, 2)}) + + +# --------------------------------------------------------------------------- +# MCP Server +# --------------------------------------------------------------------------- + +mcp = FastMCP( + "danbooru-tags", + instructions=( + "Use this server to validate, search, and suggest Danbooru tags " + "for Stable Diffusion / Illustrious prompts. 
" + "Always call validate_tags before finalising a prompt to confirm " + "every tag is a real, non-deprecated Danbooru tag. " + "Tags with higher post_count are more commonly used and well-supported." + ), +) + +log.info( + "MCP server initialised", + extra={ + "db_path": str(DB_PATH), + "db_exists": DB_PATH.exists(), + "log_level": LOG_LEVEL, + "log_format": LOG_FORMAT, + }, +) + + +# --------------------------------------------------------------------------- +# Tool: search_tags +# --------------------------------------------------------------------------- + +@mcp.tool() +def search_tags(query: str, limit: int = 20, category: str | None = None) -> list[dict]: + """Search for Danbooru tags matching a query string. + + Uses FTS5 full-text and prefix search. Results are ordered by FTS5 + relevance, then by post count (most-used first). + + FTS5 query syntax is supported: + - Prefix (default): "blue_ha" matches "blue_hair", "blue_hat", … + - Explicit prefix: "blue_ha*" + - Phrase: '"long hair"' + - Boolean: "hair AND blue" + + Args: + query: The search string. A trailing '*' wildcard is added + automatically unless the query already ends with one. + limit: Maximum results to return (default 20, max 200). + category: Optional category filter. 
One of: + "general", "artist", "copyright", "character", "meta" + + Returns: + List of tag objects, each with: + name (str) – the exact Danbooru tag string + post_count (int) – number of posts using this tag + category_name (str) – "general" | "artist" | "copyright" | "character" | "meta" + is_deprecated (bool) – whether the tag has been deprecated on Danbooru + """ + t0 = time.perf_counter() + limit = min(max(1, limit), 200) + + log.info( + "search_tags called", + extra={"query": query, "limit": limit, "category": category}, + ) + + fts_query = query.strip() + if fts_query and not fts_query.endswith("*"): + fts_query = fts_query + "*" + + CATEGORY_MAP = { + "general": 0, "artist": 1, "copyright": 3, "character": 4, "meta": 5 + } + category_filter = "" + params: list = [fts_query] + + if category and category.lower() in CATEGORY_MAP: + category_filter = "AND t.category = ?" + params.append(CATEGORY_MAP[category.lower()]) + + params.append(limit) + + try: + with _get_conn() as conn: + rows = conn.execute( + f""" + SELECT t.name, t.post_count, t.category_name, + CAST(t.is_deprecated AS INTEGER) AS is_deprecated + FROM tags_fts f + JOIN tags t ON t.id = f.rowid + WHERE tags_fts MATCH ? + {category_filter} + ORDER BY rank, t.post_count DESC + LIMIT ? 
+ """, + params, + ).fetchall() + + results = [ + { + "name": row["name"], + "post_count": row["post_count"], + "category": row["category_name"], + "is_deprecated": bool(row["is_deprecated"]), + } + for row in rows + ] + + elapsed_ms = (time.perf_counter() - t0) * 1000 + log.info( + "search_tags completed", + extra={ + "query": query, + "fts_query": fts_query, + "category": category, + "results": len(results), + "elapsed_ms": round(elapsed_ms, 2), + }, + ) + return results + + except Exception: + log.exception("search_tags failed", extra={"query": query}) + raise + + +# --------------------------------------------------------------------------- +# Tool: validate_tags +# --------------------------------------------------------------------------- + +@mcp.tool() +def validate_tags(tags: list[str]) -> dict: + """Validate a list of Danbooru tags, returning valid and invalid sets. + + Performs exact-match lookup against the full Danbooru tag database. + Also flags deprecated tags — they technically exist but should be + replaced with their canonical equivalents. + + Use this before submitting a prompt to Stable Diffusion. + + Args: + tags: A list of tag strings to validate (e.g. ["blue_hair", "1girl"]). + + Returns: + A dict with three keys: + "valid" – tags that exist and are not deprecated + "deprecated" – tags that exist but are deprecated (should be replaced) + "invalid" – tags that were not found (misspelled or invented) + """ + t0 = time.perf_counter() + + log.info( + "validate_tags called", + extra={"tag_count": len(tags), "tags_sample": tags[:5]}, + ) + + if not tags: + log.debug("validate_tags: empty input, returning early") + return {"valid": [], "deprecated": [], "invalid": []} + + # Deduplicate, preserve order + seen: dict[str, None] = {} + for t in tags: + seen[t.strip()] = None + unique_tags = [t for t in seen if t] + + placeholders = ",".join("?" 
* len(unique_tags)) + + try: + with _get_conn() as conn: + rows = conn.execute( + f""" + SELECT name, is_deprecated + FROM tags + WHERE name IN ({placeholders}) + """, + unique_tags, + ).fetchall() + + found: dict[str, bool] = { + row["name"]: bool(row["is_deprecated"]) for row in rows + } + + valid = [t for t in unique_tags if t in found and not found[t]] + deprecated = [t for t in unique_tags if t in found and found[t]] + invalid = [t for t in unique_tags if t not in found] + + elapsed_ms = (time.perf_counter() - t0) * 1000 + log.info( + "validate_tags completed", + extra={ + "total": len(unique_tags), + "valid": len(valid), + "deprecated": len(deprecated), + "invalid": len(invalid), + "invalid_tags": invalid[:10], # log first 10 invalid for debugging + "elapsed_ms": round(elapsed_ms, 2), + }, + ) + + return {"valid": valid, "deprecated": deprecated, "invalid": invalid} + + except Exception: + log.exception("validate_tags failed", extra={"tags_sample": tags[:5]}) + raise + + +# --------------------------------------------------------------------------- +# Tool: suggest_tags +# --------------------------------------------------------------------------- + +@mcp.tool() +def suggest_tags(partial: str, limit: int = 10, category: str | None = None) -> list[dict]: + """Get tag suggestions for a partial or approximate tag input. + + Runs a prefix search against the FTS5 index to find the closest + existing Danbooru tags, ordered by post count (most-used first). + Useful when the LLM is unsure of the exact spelling or wants to + explore available tags for a concept. + + Deprecated tags are excluded from suggestions by default. + + Args: + partial: A partial tag string (e.g. "blue_ha" → "blue_hair"). + limit: Maximum suggestions to return (default 10, max 50). + category: Optional category filter. One of: + "general", "artist", "copyright", "character", "meta" + + Returns: + List of tag objects (same shape as search_tags), sorted by + post_count descending. 
Deprecated tags are excluded. + """ + t0 = time.perf_counter() + limit = min(max(1, limit), 50) + + log.info( + "suggest_tags called", + extra={"partial": partial, "limit": limit, "category": category}, + ) + + fts_query = partial.strip() + if not fts_query: + log.debug("suggest_tags: empty partial, returning early") + return [] + if not fts_query.endswith("*"): + fts_query = fts_query + "*" + + CATEGORY_MAP = { + "general": 0, "artist": 1, "copyright": 3, "character": 4, "meta": 5 + } + category_filter = "" + params: list = [fts_query] + + if category and category.lower() in CATEGORY_MAP: + category_filter = "AND t.category = ?" + params.append(CATEGORY_MAP[category.lower()]) + + params.append(limit) + + try: + with _get_conn() as conn: + rows = conn.execute( + f""" + SELECT t.name, t.post_count, t.category_name, + CAST(t.is_deprecated AS INTEGER) AS is_deprecated + FROM tags_fts f + JOIN tags t ON t.id = f.rowid + WHERE tags_fts MATCH ? + AND t.is_deprecated = 0 + {category_filter} + ORDER BY t.post_count DESC + LIMIT ? + """, + params, + ).fetchall() + + results = [ + { + "name": row["name"], + "post_count": row["post_count"], + "category": row["category_name"], + "is_deprecated": False, + } + for row in rows + ] + + elapsed_ms = (time.perf_counter() - t0) * 1000 + log.info( + "suggest_tags completed", + extra={ + "partial": partial, + "fts_query": fts_query, + "category": category, + "results": len(results), + "elapsed_ms": round(elapsed_ms, 2), + }, + ) + return results + + except Exception: + log.exception("suggest_tags failed", extra={"partial": partial}) + raise + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + + +def main() -> None: + log.info("Starting MCP server (stdio transport)") + mcp.run() + + +if __name__ == "__main__": + main()