Initial commit

This commit is contained in:
Aodhan Collins
2026-03-02 23:29:58 +00:00
commit 08c6e14616
12 changed files with 2121 additions and 0 deletions

97
Dockerfile Normal file
View File

@@ -0,0 +1,97 @@
# syntax=docker/dockerfile:1
# ---------------------------------------------------------------------------
# danbooru-mcp — Dockerfile
#
# Stages:
#   base     Python 3.12 slim + system deps
#   scraper  runs scripts/scrape_tags.py to build db/tags.db
#            (only executes if no pre-built DB is bind-mounted)
#   runtime  minimal image that runs src/server.py (MCP stdio server)
#
# Usage:
#   Build (includes fresh DB scrape — takes ~15 min first time):
#     docker build -t danbooru-mcp .
#
#   Run (MCP clients invoke via stdio):
#     docker run --rm -i danbooru-mcp
#
#   Or use docker-compose (recommended — mounts a persistent DB volume):
#     docker compose up
# ---------------------------------------------------------------------------
ARG PYTHON_VERSION=3.12

# ---------------------------------------------------------------------------
# Stage 1: base — shared deps
# ---------------------------------------------------------------------------
FROM python:${PYTHON_VERSION}-slim AS base
WORKDIR /app

# System deps: sqlite3 is already in python:slim.
# gcc kept in case a dependency has no pre-built wheel for this platform.
RUN apt-get update && apt-get install -y --no-install-recommends \
        gcc \
    && rm -rf /var/lib/apt/lists/*

# Install Python deps before copying source so the pip layer stays cached
# until requirements.txt itself changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY scripts/ scripts/
COPY src/ src/

# ---------------------------------------------------------------------------
# Stage 2: scraper — build the SQLite database
# ---------------------------------------------------------------------------
FROM base AS scraper

# NOTE(security): build args are recorded in `docker history` of this
# intermediate stage — for real credentials prefer BuildKit secrets
# (`docker build --secret` + `RUN --mount=type=secret`). They remain
# optional build args here because the public API works unauthenticated.
# They are deliberately NOT exported via ENV, so the values never persist
# in any image layer's environment.
ARG DANBOORU_USER=""
ARG DANBOORU_API_KEY=""

# Run the scraper — creates db/tags.db with all tags ≥10 posts, then
# rebuilds the FTS5 index in the same layer. Credentials are passed inline
# to this one command only.
RUN mkdir -p db && \
    DANBOORU_USER="${DANBOORU_USER}" DANBOORU_API_KEY="${DANBOORU_API_KEY}" \
        python scripts/scrape_tags.py --no-fts && \
    echo "Scrape complete. Rebuilding FTS5 index…" && \
    python - <<'EOF'
import sqlite3
conn = sqlite3.connect("db/tags.db")
conn.execute("INSERT INTO tags_fts(tags_fts) VALUES('rebuild')")
conn.commit()
conn.close()
print("FTS5 index built.")
EOF

# ---------------------------------------------------------------------------
# Stage 3: runtime — lean image with pre-built DB
# ---------------------------------------------------------------------------
FROM python:${PYTHON_VERSION}-slim AS runtime
WORKDIR /app

# Install only runtime deps (no gcc in this stage).
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Non-root user for security — created BEFORE the COPYs so --chown can
# reference it; a post-COPY `chown -R` would duplicate every file's data
# in an extra layer.
RUN useradd --no-create-home --shell /bin/false mcpuser

# Copy source and the scraped database, owned by the runtime user.
COPY --chown=mcpuser:mcpuser src/ src/
COPY --from=scraper --chown=mcpuser:mcpuser /app/db/tags.db db/tags.db

# The MCP server communicates via stdio — no ports needed.
ENV DANBOORU_TAGS_DB=/app/db/tags.db

USER mcpuser

# Health check: cheap, read-only query proves the DB is present and readable.
HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \
    CMD python -c "import sqlite3; c=sqlite3.connect('/app/db/tags.db'); print(c.execute('SELECT COUNT(*) FROM tags').fetchone()[0]); c.close()"

ENTRYPOINT ["python", "src/server.py"]

36
Dockerfile.prebuilt Normal file
View File

@@ -0,0 +1,36 @@
# syntax=docker/dockerfile:1
# ---------------------------------------------------------------------------
# danbooru-mcp — Fast Dockerfile (uses a pre-built db/tags.db)
#
# Use this when you already have db/tags.db built locally via:
#   python scripts/scrape_tags.py
#
# Build:
#   docker build -f Dockerfile.prebuilt -t danbooru-mcp .
#
# Run (MCP stdio):
#   docker run --rm -i danbooru-mcp
# ---------------------------------------------------------------------------
ARG PYTHON_VERSION=3.12
FROM python:${PYTHON_VERSION}-slim
WORKDIR /app

# Deps first — this layer stays cached until requirements.txt changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Create the unprivileged user BEFORE the COPYs so --chown can reference
# it; a post-COPY `chown -R` would duplicate the copied data (notably the
# multi-hundred-MB tags.db) in an extra image layer.
RUN useradd --no-create-home --shell /bin/false mcpuser

COPY --chown=mcpuser:mcpuser src/ src/
COPY --chown=mcpuser:mcpuser db/tags.db db/tags.db

# The MCP server communicates via stdio; DB path is overridable at run time.
ENV DANBOORU_TAGS_DB=/app/db/tags.db

USER mcpuser

# Health check: cheap, read-only query proves the DB is present and readable.
HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \
    CMD python -c "import sqlite3; c=sqlite3.connect('/app/db/tags.db'); print(c.execute('SELECT COUNT(*) FROM tags').fetchone()[0]); c.close()"

ENTRYPOINT ["python", "src/server.py"]

210
README.md Normal file
View File

@@ -0,0 +1,210 @@
# danbooru-mcp
An MCP (Model Context Protocol) server that lets an LLM search, validate, and get suggestions for valid **Danbooru tags** — the prompt vocabulary used by Illustrious and other Danbooru-trained Stable Diffusion models.
📖 **[Full User Guide](docs/user-guide.md)** — workflow walkthrough, tool reference, best practices, and common scenarios.
Tags are scraped directly from the **Danbooru public API** and stored in a local SQLite database with an **FTS5 full-text search index** for fast prefix/substring queries. Each tag includes its post count, category, and deprecation status so the LLM can prioritise well-used, canonical tags.
---
## Tools
| Tool | Description |
|------|-------------|
| `search_tags(query, limit=20, category=None)` | Prefix/full-text search — returns rich tag objects ordered by relevance |
| `validate_tags(tags)` | Exact-match validation — splits into `valid`, `deprecated`, `invalid` |
| `suggest_tags(partial, limit=10, category=None)` | Autocomplete for partial tag strings, sorted by post count |
### Return object shape
All tools return tag objects with:
```json
{
"name": "blue_hair",
"post_count": 1079908,
"category": "general",
"is_deprecated": false
}
```
### Category filter values
`"general"` · `"artist"` · `"copyright"` · `"character"` · `"meta"`
---
## Setup
### 1. Install dependencies
```bash
pip install -e .
```
### 2. Build the SQLite database (scrapes the Danbooru API)
```bash
python scripts/scrape_tags.py
```
This scrapes every Danbooru tag with at least 10 posts (~300,000 tags) from the Danbooru public API (no account required)
and stores them in `db/tags.db` with a FTS5 index.
Estimated time: **5–15 minutes** depending on network speed.
```
Options:
--db PATH Output database path (default: db/tags.db)
--workers N Parallel HTTP workers (default: 4)
--max-page N Safety cap on pages (default: 2500)
--no-resume Re-scrape all pages from scratch
--no-fts Skip FTS5 rebuild (for incremental runs)
```
The scraper is **resumable** — if interrupted, re-run it and it will
continue from where it left off.
### 3. (Optional) Test API access first
```bash
python scripts/test_danbooru_api.py
```
### 4. Run the MCP server
```bash
python src/server.py
```
---
## Docker
### Quick start (pre-built DB — recommended)
Use this when you've already run `python scripts/scrape_tags.py` and have `db/tags.db`:
```bash
# Build image with the pre-built DB baked in (~30 seconds)
docker build -f Dockerfile.prebuilt -t danbooru-mcp .
# Verify
docker run --rm --entrypoint python danbooru-mcp \
-c "import sqlite3,sys; c=sqlite3.connect('/app/db/tags.db'); sys.stderr.write(str(c.execute('SELECT COUNT(*) FROM tags').fetchone()[0]) + ' tags\n')"
```
### Build from scratch (runs the scraper during Docker build)
```bash
# Scrapes the Danbooru API during build — takes ~15 minutes
docker build \
--build-arg DANBOORU_USER=your_username \
--build-arg DANBOORU_API_KEY=your_api_key \
-t danbooru-mcp .
```
### MCP client config (Docker)
```json
{
"mcpServers": {
"danbooru-tags": {
"command": "docker",
"args": ["run", "--rm", "-i", "danbooru-mcp:latest"]
}
}
}
```
---
## MCP Client Configuration
### Claude Desktop (`claude_desktop_config.json`)
```json
{
"mcpServers": {
"danbooru-tags": {
"command": "python",
"args": ["/absolute/path/to/danbooru-mcp/src/server.py"]
}
}
}
```
### Custom DB path via environment variable
```json
{
"mcpServers": {
"danbooru-tags": {
"command": "python",
"args": ["/path/to/src/server.py"],
"env": {
"DANBOORU_TAGS_DB": "/custom/path/to/tags.db"
}
}
}
}
```
---
## Example LLM Prompt Workflow
```
User: Generate a prompt for a girl with blue hair and a sword.
LLM calls validate_tags(["1girl", "blue_hairs", "sword", "looking_at_vewer"])
→ {
"valid": ["1girl", "sword"],
"deprecated": [],
"invalid": ["blue_hairs", "looking_at_vewer"]
}
LLM calls suggest_tags("blue_hair", limit=3)
→ [
{"name": "blue_hair", "post_count": 1079908, "category": "general"},
{"name": "blue_hairband", "post_count": 26905, "category": "general"},
...
]
LLM calls suggest_tags("looking_at_viewer", limit=1)
→ [{"name": "looking_at_viewer", "post_count": 4567890, "category": "general"}]
Final validated prompt: 1girl, blue_hair, sword, looking_at_viewer
```
---
## Project Structure
```
danbooru-mcp/
├── data/
│ └── all_tags.csv # original CSV export (legacy, replaced by API scrape)
├── db/
│ └── tags.db # SQLite DB (generated, gitignored)
├── plans/
│ └── danbooru-mcp-plan.md # Architecture plan
├── scripts/
│ ├── scrape_tags.py # API scraper → SQLite (primary)
│ ├── import_tags.py # Legacy CSV importer
│ └── test_danbooru_api.py # API connectivity tests
├── src/
│ └── server.py # MCP server
├── pyproject.toml
├── .gitignore
└── README.md
```
---
## Requirements
- Python 3.10+
- `mcp[cli]` — official Python MCP SDK
- `requests` — HTTP client for API scraping
- `sqlite3` — Python stdlib (no install needed)

42
docker-compose.yml Normal file
View File

@@ -0,0 +1,42 @@
services:
  danbooru-mcp:
    build:
      context: .
      # Fast variant — expects db/tags.db to already exist locally.
      dockerfile: Dockerfile.prebuilt
    image: danbooru-mcp:latest
    container_name: danbooru-mcp
    stdin_open: true  # required — MCP communicates over stdio
    tty: false
    # NOTE(review): MCP clients usually spawn the container themselves via
    # `docker run -i`; confirm a long-lived restarting container matches
    # your deployment before relying on this policy.
    restart: unless-stopped
    environment:
      DANBOORU_TAGS_DB: /app/db/tags.db
      # Log level: DEBUG | INFO | WARNING | ERROR | CRITICAL (default: INFO)
      LOG_LEVEL: ${LOG_LEVEL:-INFO}
      # Log format: "text" (human-readable) | "json" (structured, for log aggregators)
      LOG_FORMAT: ${LOG_FORMAT:-text}
    volumes:
      # Persistent DB volume — survives image rebuilds.
      # On first use, seed with:
      #   docker run --rm \
      #     -v danbooru_db:/target -v $(pwd)/db:/source \
      #     alpine sh -c "cp /source/tags.db /target/"
      - danbooru_db:/app/db
    # Docker logging driver — captures all stderr output from the container.
    # Use "json-file" (default) for local development; switch to "journald"
    # or a remote driver (fluentd, awslogs, etc.) for production.
    logging:
      driver: json-file
      options:
        max-size: "50m" # rotate when log file reaches 50MB
        max-file: "5" # keep last 5 rotated files
    # Immutable root filesystem: only /tmp (tmpfs below) and the mounted
    # DB volume are writable at runtime.
    read_only: true
    tmpfs:
      - /tmp
volumes:
  danbooru_db:
    driver: local

423
docs/user-guide.md Normal file
View File

@@ -0,0 +1,423 @@
# Danbooru MCP Tag Validator — User Guide
This guide explains how to integrate and use the `danbooru-mcp` server with an LLM to generate valid, high-quality prompts for Illustrious / Stable Diffusion models trained on Danbooru data.
---
## Table of Contents
1. [What is this?](#what-is-this)
2. [Quick Start](#quick-start)
3. [Tool Reference](#tool-reference)
- [search_tags](#search_tags)
- [validate_tags](#validate_tags)
- [suggest_tags](#suggest_tags)
4. [Prompt Engineering Workflow](#prompt-engineering-workflow)
5. [Category Reference](#category-reference)
6. [Best Practices](#best-practices)
7. [Common Scenarios](#common-scenarios)
8. [Troubleshooting](#troubleshooting)
---
## What is this?
Illustrious (and similar Danbooru-trained Stable Diffusion models) uses **Danbooru tags** as its prompt language.
Tags like `1girl`, `blue_hair`, `looking_at_viewer` are meaningful because the model was trained on images annotated with them.
The problem: there are hundreds of thousands of valid Danbooru tags, and misspelling or inventing tags produces no useful signal — the model generates less accurate images.
**This MCP server** lets an LLM:
- **Search** the full tag database for tag discovery
- **Validate** a proposed prompt's tags against the real Danbooru database
- **Suggest** corrections for typos or near-miss tags
The database contains **292,500 tags**, all with ≥10 posts on Danbooru — filtering out one-off or misspelled entries.
---
## Quick Start
### 1. Add to your MCP client (Claude Desktop example)
**Using Docker (recommended):**
```json
{
"mcpServers": {
"danbooru-tags": {
"command": "docker",
"args": ["run", "--rm", "-i", "danbooru-mcp:latest"]
}
}
}
```
**Using Python directly:**
```json
{
"mcpServers": {
"danbooru-tags": {
"command": "/path/to/danbooru-mcp/.venv/bin/python",
"args": ["/path/to/danbooru-mcp/src/server.py"]
}
}
}
```
### 2. Instruct the LLM
Add a system prompt telling the LLM to use the server:
```
You have access to the danbooru-tags MCP server for validating Stable Diffusion prompts.
Before generating any final prompt:
1. Use validate_tags to check all proposed tags are real Danbooru tags.
2. Use suggest_tags to fix any invalid tags.
3. Only output the validated, corrected tag list.
```
---
## Tool Reference
### `search_tags`
Find tags by name using full-text / prefix search.
**Parameters:**
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `query` | `string` | *required* | Search string. Trailing `*` added automatically for prefix match. Supports FTS5 syntax. |
| `limit` | `integer` | `20` | Max results (1–200) |
| `category` | `string` | `null` | Optional filter: `"general"`, `"artist"`, `"copyright"`, `"character"`, `"meta"` |
**Returns:** List of tag objects:
```json
[
{
"name": "blue_hair",
"post_count": 1079925,
"category": "general",
"is_deprecated": false
}
]
```
**Examples:**
```
Search for hair colour tags:
search_tags("blue_hair")
→ blue_hair, blue_hairband, blue_hair-chan_(ramchi), …
Search only character tags for a Vocaloid:
search_tags("hatsune", category="character")
→ hatsune_miku, hatsune_mikuo, hatsune_miku_(append), …
Boolean search:
search_tags("hair AND blue")
→ tags matching both "hair" and "blue"
```
**FTS5 query syntax:**
| Syntax | Meaning |
|--------|---------|
| `blue_ha*` | prefix match (added automatically) |
| `"blue hair"` | phrase match |
| `hair AND blue` | both terms present |
| `hair NOT red` | exclusion |
---
### `validate_tags`
Check a list of tags against the full Danbooru database. Returns three groups: valid, deprecated, and invalid.
**Parameters:**
| Parameter | Type | Description |
|-----------|------|-------------|
| `tags` | `list[string]` | Tags to validate, e.g. `["1girl", "blue_hair", "sword"]` |
**Returns:**
```json
{
"valid": ["1girl", "blue_hair", "sword"],
"deprecated": [],
"invalid": ["blue_hairs", "not_a_real_tag"]
}
```
| Key | Meaning |
|-----|---------|
| `valid` | Exists in Danbooru and is not deprecated — safe to use |
| `deprecated` | Exists but has been deprecated (an updated canonical tag exists) |
| `invalid` | Not found — likely misspelled, hallucinated, or too niche (<10 posts) |
**Important:** Always run `validate_tags` before finalising a prompt. Invalid tags are silently ignored by the model but waste token budget and reduce prompt clarity.
---
### `suggest_tags`
Autocomplete-style suggestions for a partial or approximate tag. Results are sorted by post count (most commonly used first). Deprecated tags are **excluded**.
**Parameters:**
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `partial` | `string` | *required* | Partial tag or rough approximation |
| `limit` | `integer` | `10` | Max suggestions (1–50) |
| `category` | `string` | `null` | Optional category filter |
**Returns:** Same format as `search_tags`, sorted by `post_count` descending.
**Examples:**
```
Fix a typo:
suggest_tags("looking_at_vewer")
→ ["looking_at_viewer", …]
Find the most popular sword-related tags:
suggest_tags("sword", limit=5, category="general")
→ sword (337,737), sword_behind_back (7,203), …
Find character tags for a partial name:
suggest_tags("miku", category="character")
→ hatsune_miku (129,806), yuki_miku (4,754), …
```
---
## Prompt Engineering Workflow
This is the recommended workflow for an LLM building Illustrious prompts:
### Step 1 — Draft
The LLM drafts an initial list of conceptual tags based on the user's description:
```
User: "A girl with long silver hair wearing a kimono in a Japanese garden"
Draft tags:
1girl, silver_hair, long_hair, kimono, japanese_garden, cherry_blossoms,
sitting, looking_at_viewer, outdoors, traditional_clothes
```
### Step 2 — Validate
```
validate_tags([
"1girl", "silver_hair", "long_hair", "kimono", "japanese_garden",
"cherry_blossoms", "sitting", "looking_at_viewer", "outdoors",
"traditional_clothes"
])
```
Response:
```json
{
"valid": ["1girl", "long_hair", "kimono", "cherry_blossoms", "sitting",
"looking_at_viewer", "outdoors", "traditional_clothes"],
"deprecated": [],
"invalid": ["silver_hair", "japanese_garden"]
}
```
### Step 3 — Fix invalid tags
```
suggest_tags("silver_hair", limit=3)
→ [{"name": "white_hair", "post_count": 800000}, ...]
suggest_tags("japanese_garden", limit=3)
→ [{"name": "garden", "post_count": 45000},
{"name": "japanese_clothes", "post_count": 12000}, ...]
```
### Step 4 — Finalise
```
Final prompt:
1girl, white_hair, long_hair, kimono, garden, cherry_blossoms,
sitting, looking_at_viewer, outdoors, traditional_clothes
```
All tags are validated. Prompt is ready to send to ComfyUI.
---
## Category Reference
Danbooru organises tags into five categories. Understanding them helps scope searches:
| Category | Value | Description | Examples |
|----------|-------|-------------|---------|
| **general** | `0` | Descriptive tags for image content | `1girl`, `blue_hair`, `sword`, `outdoors` |
| **artist** | `1` | Artist/creator names | `wlop`, `natsuki_subaru` |
| **copyright** | `3` | Source material / franchise | `fate/stay_night`, `touhou`, `genshin_impact` |
| **character** | `4` | Specific character names | `hatsune_miku`, `hakurei_reimu` |
| **meta** | `5` | Image quality / format tags | `highres`, `absurdres`, `commentary` |
**Tips:**
- For generating images, focus on **general** tags (colours, poses, clothing, expressions)
- Add **character** and **copyright** tags when depicting a specific character
- **meta** tags like `highres` and `best_quality` can improve output quality
- Avoid **artist** tags unless intentionally mimicking a specific art style
---
## Best Practices
### ✅ Always validate before generating
```python
# Always run this before finalising
result = validate_tags(your_proposed_tags)
# Fix everything in result["invalid"] before sending to ComfyUI
```
### ✅ Use suggest_tags for discoverability
Even for tags you think you know, run `suggest_tags` to find the canonical form:
- `standing` vs `standing_on_one_leg` vs `standing_split`
- `smile` vs `small_smile` vs `evil_smile`
The tag with the highest `post_count` is almost always the right one for your intent.
### ✅ Prefer high-post-count tags
Higher post count = more training data = more consistent model response.
```python
# Get the top 5 most established hair colour tags
suggest_tags("hair_color", limit=5, category="general")
```
### ✅ Layer specificity
Good prompts move from general to specific:
```
# General → Specific
1girl, # subject count
solo, # composition
long_hair, blue_hair, # hair
white_dress, off_shoulder, # clothing
smile, looking_at_viewer, # expression/pose
outdoors, garden, daytime, # setting
masterpiece, best_quality # quality
```
### ❌ Avoid deprecated tags
If `validate_tags` reports a tag as `deprecated`, use `suggest_tags` to find the current replacement:
```python
# If "nude" is deprecated, find the current tag:
suggest_tags("nude", category="general")
```
### ❌ Don't invent tags
The model doesn't understand arbitrary natural language in prompts — only tags it was trained on. `beautiful_landscape` is not a Danbooru tag; `scenery` and `landscape` are.
---
## Common Scenarios
### Scenario: Character in a specific pose
```
# 1. Search for pose tags
search_tags("sitting", category="general", limit=10)
→ sitting, sitting_on_ground, kneeling, seiza, wariza, …
# 2. Validate the full tag set
validate_tags(["1girl", "hatsune_miku", "sitting", "looking_at_viewer", "smile"])
```
### Scenario: Specific art style
```
# Find copyright tags for a franchise
search_tags("genshin", category="copyright", limit=5)
→ genshin_impact, …
# Find character from that franchise
search_tags("hu_tao", category="character", limit=3)
→ hu_tao_(genshin_impact), …
```
### Scenario: Quality boosting tags
```
# Find commonly used meta/quality tags
search_tags("quality", category="meta", limit=5)
→ best_quality, high_quality, …
search_tags("res", category="meta", limit=5)
→ highres, absurdres, ultra-high_res, …
```
### Scenario: Unknown misspelling
```
# You typed "haor" instead of "hair"
suggest_tags("haor", limit=5)
→ [] (no prefix match)
# Try a broader search
search_tags("long hair")
→ long_hair, long_hair_between_eyes, wavy_hair, …
```
---
## Troubleshooting
### "invalid" tags that should be valid
The database contains only tags with **≥10 posts**. Tags with fewer posts are intentionally excluded as they are likely misspellings, very niche, or one-off annotations.
If a tag you expect to be valid shows as invalid:
1. Try `suggest_tags` to find a close variant
2. Use `search_tags` to explore the tag space
3. The tag may genuinely have <10 posts — use a broader synonym instead
### Server not responding
Check the MCP server is running and the `db/tags.db` file exists:
```bash
# Local
python src/server.py
# Docker
docker run --rm -i danbooru-mcp:latest
```
Environment variable override:
```bash
DANBOORU_TAGS_DB=/custom/path/tags.db python src/server.py
```
### Database needs rebuilding / updating
Re-run the scraper (it's resumable):
```bash
# Refresh all tags
python scripts/scrape_tags.py --no-resume
# Update changed tags only (re-scrapes from scratch, stops at ≥10 posts boundary)
python scripts/scrape_tags.py
```
Then rebuild the Docker image:
```bash
docker build -f Dockerfile.prebuilt -t danbooru-mcp:latest .
```

115
plans/danbooru-mcp-plan.md Normal file
View File

@@ -0,0 +1,115 @@
# Danbooru MCP Tag Validator — Plan
## Overview
A Python MCP server backed by a SQLite database that allows an LLM to search, validate, and get suggestions for valid Danbooru tags. This enables an Illustrious/Stable Diffusion ComfyUI workflow to ensure all generated prompts contain only real Danbooru tags.
---
## Architecture
```mermaid
graph TD
A[LLM / ComfyUI Workflow] -->|MCP protocol| B[danbooru-mcp Python server]
B -->|SQL queries| C[SQLite DB: tags.db]
C -->|FTS5 index| D[fast text search]
E[all_tags.csv 900k rows] -->|import script one-time| C
```
---
## Project Structure
```
danbooru-mcp/
├── data/
│ └── all_tags.csv # source data (existing, ~900k entries)
├── db/
│ └── tags.db # generated SQLite DB (gitignored)
├── scripts/
│ └── import_tags.py # one-time CSV → SQLite importer
├── src/
│ └── server.py # MCP server entry point
├── pyproject.toml # deps: mcp[cli] only (sqlite3 is stdlib)
├── .gitignore
└── README.md
```
---
## Database Design
### Table: `tags`
```sql
CREATE TABLE tags (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL
);
```
### FTS5 Virtual Table: `tags_fts`
```sql
CREATE VIRTUAL TABLE tags_fts USING fts5(
name,
content='tags',
content_rowid='id'
);
```
- Allows fast prefix queries: `tags_fts MATCH 'blue_*'`
- Exact-match validation uses B-tree index on `tags.name UNIQUE`
---
## MCP Tools
| Tool | Input | Output | Purpose |
|------|-------|--------|---------|
| `search_tags` | `query: str`, `limit: int = 20` | `list[str]` | Prefix/substring search for tag discovery |
| `validate_tags` | `tags: list[str]` | `{valid: list, invalid: list}` | Bulk exact-match validation of a prompt |
| `suggest_tags` | `partial: str`, `limit: int = 10` | `list[str]` | Autocomplete-style correction of near-miss tags |
---
## Key Design Decisions
- **SQLite over PostgreSQL** — zero operational overhead, file-portable, sufficient for read-only tag lookup
- **FTS5 over LIKE** — `LIKE '%query%'` on 900k rows is O(n); FTS5 prefix matching is O(log n) via inverted index
- **Exact-match bulk validation** — `WHERE name IN (...)` leverages SQLite's B-tree index on the UNIQUE constraint
- **No vector/embedding search** — Danbooru tags are well-defined strings; exact + prefix matching is appropriate and zero-dependency
---
## Dependencies
- Python 3.10+
- `mcp[cli]` — official Python MCP SDK
- `sqlite3` — Python stdlib, no install required
---
## MCP Client Config Example
```json
{
"mcpServers": {
"danbooru-tags": {
"command": "python",
"args": ["/path/to/danbooru-mcp/src/server.py"]
}
}
}
```
---
## Todo List
- [ ] Write `pyproject.toml` with `mcp[cli]` dependency
- [ ] Write `scripts/import_tags.py` — CSV to SQLite with FTS5 index
- [ ] Write `src/server.py` — MCP server with 3 tools
- [ ] `search_tags` tool
- [ ] `validate_tags` tool
- [ ] `suggest_tags` tool
- [ ] Write `.gitignore` (exclude `db/tags.db`)
- [ ] Write `README.md` with setup and usage instructions

19
pyproject.toml Normal file
View File

@@ -0,0 +1,19 @@
[project]
name = "danbooru-mcp"
version = "0.1.0"
description = "MCP server for validating and searching Danbooru tags via SQLite FTS5"
requires-python = ">=3.10"
dependencies = [
"mcp[cli]>=1.0.0",
"requests>=2.31.0",
]
[project.scripts]
danbooru-mcp = "src.server:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src"]

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
mcp[cli]>=1.0.0
requests>=2.31.0

152
scripts/import_tags.py Normal file
View File

@@ -0,0 +1,152 @@
#!/usr/bin/env python3
"""
One-time script to import data/all_tags.csv into db/tags.db (SQLite).
Creates:
- `tags` table with a UNIQUE index on `name`
- `tags_fts` FTS5 virtual table for fast prefix/full-text searches
Usage:
python scripts/import_tags.py [--csv data/all_tags.csv] [--db db/tags.db]
"""
import argparse
import csv
import os
import sqlite3
import sys
import time
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the CSV → SQLite import.

    Returns a Namespace with `csv` (input path), `db` (output path) and
    `batch_size` (rows per transaction).
    """
    # Defaults resolve relative to this script's location so the importer
    # works no matter which directory it is invoked from.
    repo_root = os.path.join(os.path.dirname(__file__), "..")
    parser = argparse.ArgumentParser(description="Import Danbooru tags CSV into SQLite")
    parser.add_argument(
        "--csv",
        default=os.path.join(repo_root, "data", "all_tags.csv"),
        help="Path to the tags CSV file (default: data/all_tags.csv)",
    )
    parser.add_argument(
        "--db",
        default=os.path.join(repo_root, "db", "tags.db"),
        help="Path for the output SQLite database (default: db/tags.db)",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=10_000,
        help="Number of rows to insert per transaction (default: 10000)",
    )
    return parser.parse_args()
def create_schema(conn: sqlite3.Connection) -> None:
    """Create the `tags` table and its FTS5 companion (idempotent).

    WAL + synchronous=NORMAL trade a little durability for much faster
    bulk inserts, which suits a rebuildable local database.
    """
    conn.executescript("""
    PRAGMA journal_mode = WAL;
    PRAGMA synchronous = NORMAL;

    CREATE TABLE IF NOT EXISTS tags (
        id   INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL UNIQUE
    );

    -- FTS5 virtual table for fast prefix and full-text search.
    -- content= links it to the `tags` table so the index is kept lean.
    CREATE VIRTUAL TABLE IF NOT EXISTS tags_fts USING fts5(
        name,
        content='tags',
        content_rowid='id',
        tokenize='unicode61 remove_diacritics 1'
    );
    """)
    conn.commit()
def import_csv(conn: sqlite3.Connection, csv_path: str, batch_size: int) -> int:
    """Insert tags from CSV and return the total count inserted.

    The first row is consumed as a header only when its first cell is
    "tag" or "tags"; otherwise it is treated as data. Rows are flushed in
    transactions of `batch_size` rows. The return value counts rows handed
    to SQLite — duplicates are dropped by INSERT OR IGNORE, so the on-disk
    row count may be lower.
    """
    inserted = 0
    batch: list[tuple[str]] = []

    def _clean(cell: str) -> str:
        # The CSV has values like "some_tag," — strip trailing comma/whitespace.
        # Shared by the header path and the main loop (previously duplicated).
        return cell.rstrip(",").strip()

    def _write(rows: list[tuple[str]]) -> None:
        # One transaction per batch keeps memory bounded and commits cheap.
        conn.executemany("INSERT OR IGNORE INTO tags (name) VALUES (?)", rows)
        conn.commit()

    with open(csv_path, newline="", encoding="utf-8") as fh:
        reader = csv.reader(fh)
        header = next(reader, None)
        if header and header[0].strip().lower() not in ("tag", "tags"):
            # Not a header — re-process the first row as data.
            tag = _clean(header[0])
            if tag:
                batch.append((tag,))
        for row in reader:
            if not row:
                continue
            tag = _clean(row[0])
            if not tag:
                continue
            batch.append((tag,))
            if len(batch) >= batch_size:
                _write(batch)
                inserted += len(batch)
                batch = []
                print(f"\r {inserted:,} tags imported…", end="", flush=True)
    if batch:
        _write(batch)
        inserted += len(batch)
        # Fix: also refresh the progress line for the final partial batch
        # (previously the trailing rows were committed silently).
        print(f"\r {inserted:,} tags imported…", end="", flush=True)
    return inserted
def _flush(conn: sqlite3.Connection, batch: list[tuple[str]]) -> None:
conn.executemany(
"INSERT OR IGNORE INTO tags (name) VALUES (?)",
batch,
)
conn.commit()
def rebuild_fts(conn: sqlite3.Connection) -> None:
    """Populate the FTS5 index from the `tags` table."""
    print("\n Rebuilding FTS5 index…", flush=True)
    # 'rebuild' is an FTS5 special command: it wipes and repopulates an
    # external-content index (content='tags') from its backing table.
    conn.execute("INSERT INTO tags_fts(tags_fts) VALUES('rebuild')")
    conn.commit()
def main() -> None:
    """CLI entry point: validate paths, create schema, import CSV, build FTS."""
    args = parse_args()
    csv_path = os.path.abspath(args.csv)
    db_path = os.path.abspath(args.db)
    if not os.path.isfile(csv_path):
        print(f"ERROR: CSV file not found: {csv_path}", file=sys.stderr)
        sys.exit(1)
    # Ensure db/ exists before sqlite3 tries to create the file inside it.
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    print(f"Source : {csv_path}")
    print(f"Target : {db_path}")
    print(f"Batch : {args.batch_size:,} rows per transaction")
    print()
    t0 = time.perf_counter()
    conn = sqlite3.connect(db_path)
    try:
        create_schema(conn)
        total = import_csv(conn, csv_path, args.batch_size)
        rebuild_fts(conn)
    finally:
        # Close the connection even if the import fails partway through.
        conn.close()
    elapsed = time.perf_counter() - t0
    print(f"\nDone. {total:,} tags imported in {elapsed:.1f}s → {db_path}")
if __name__ == "__main__":
main()

359
scripts/scrape_tags.py Normal file
View File

@@ -0,0 +1,359 @@
#!/usr/bin/env python3
"""
Scrape Danbooru tags from the public API, sorted by post count (descending).
All tags with ≥10 posts fit within approximately the first 300 pages
(1000 tags/page, sorted by post_count DESC). The scraper stops automatically
as soon as an entire page consists only of tags below --min-posts.
This approach is fast (roughly 3–5 minutes), clean, and requires no complex cursor
or ID-based pagination — just standard page-offset requests.
The scrape is resumable: if interrupted, re-run and it will continue from
the last completed page.
Usage:
python scripts/scrape_tags.py [--db db/tags.db]
Environment (from .env or shell):
DANBOORU_USER Danbooru login name
DANBOORU_API_KEY Danbooru API key
"""
from __future__ import annotations
import argparse
import os
import sqlite3
import sys
import time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
except ImportError:
print("ERROR: 'requests' not installed. Run: pip install requests")
sys.exit(1)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
BASE_URL = "https://danbooru.donmai.us"
PAGE_LIMIT = 1000
DEFAULT_DB = Path(__file__).parent.parent / "db" / "tags.db"
REQUEST_DELAY = 0.25 # seconds between requests per worker
MIN_POST_COUNT = 10
MAX_PAGES = 500 # safety cap (all ≥10-post tags are < 300 pages)
CATEGORY_NAMES: dict[int, str] = {
0: "general",
1: "artist",
3: "copyright",
4: "character",
5: "meta",
}
# ---------------------------------------------------------------------------
# .env loader
# ---------------------------------------------------------------------------
def _load_env() -> dict[str, str]:
env: dict[str, str] = {}
for candidate in [
Path(__file__).parent.parent / ".env",
Path.home() / ".env",
]:
if candidate.exists():
for line in candidate.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, _, v = line.partition("=")
env.setdefault(k.strip(), v.strip())
break
return env
# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------
# Schema bootstrap: the tags table, its lookup indexes, an FTS5 mirror of
# tag names (external-content table backed by `tags`), and a small table
# tracking which API pages have already been scraped (for resume support).
CREATE_SQL = """
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
CREATE TABLE IF NOT EXISTS tags (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL UNIQUE,
    post_count INTEGER NOT NULL DEFAULT 0,
    category INTEGER NOT NULL DEFAULT 0,
    category_name TEXT NOT NULL DEFAULT 'general',
    is_deprecated INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_tags_name ON tags (name);
CREATE INDEX IF NOT EXISTS idx_tags_post_count ON tags (post_count DESC);
CREATE INDEX IF NOT EXISTS idx_tags_category ON tags (category);
CREATE VIRTUAL TABLE IF NOT EXISTS tags_fts USING fts5(
    name,
    content='tags',
    content_rowid='id',
    tokenize='unicode61 remove_diacritics 1'
);
CREATE TABLE IF NOT EXISTS completed_pages (
    page INTEGER PRIMARY KEY
);
"""


def init_db(db_path: Path) -> sqlite3.Connection:
    """Open the tags database (creating parent dirs) and apply the schema."""
    db_path.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(db_path), check_same_thread=False)
    connection.executescript(CREATE_SQL)
    connection.commit()
    return connection
def get_completed_pages(conn: sqlite3.Connection) -> set[int]:
    """Return the set of page numbers already fully scraped (for resume)."""
    return {row[0] for row in conn.execute("SELECT page FROM completed_pages")}
def mark_page_done(conn: sqlite3.Connection, page: int) -> None:
    """Record that a page has been scraped; re-marking is a no-op."""
    sql = "INSERT OR IGNORE INTO completed_pages (page) VALUES (?)"
    conn.execute(sql, (page,))
    conn.commit()
def upsert_tags(conn: sqlite3.Connection, tags: list[dict], min_post_count: int) -> int:
    """Insert or refresh tag rows, skipping tags below ``min_post_count``.

    Existing rows (matched by id) have their mutable fields updated;
    returns the number of rows written.
    """
    rows: list[tuple] = []
    for tag in tags:
        count = tag.get("post_count", 0)
        if count < min_post_count:
            continue
        cat = tag.get("category", 0)
        rows.append(
            (
                tag["id"],
                tag["name"],
                count,
                cat,
                CATEGORY_NAMES.get(cat, "general"),
                1 if tag.get("is_deprecated") else 0,
            )
        )
    if not rows:
        return 0
    conn.executemany(
        """
        INSERT INTO tags (id, name, post_count, category, category_name, is_deprecated)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT(id) DO UPDATE SET
            post_count = excluded.post_count,
            category = excluded.category,
            category_name = excluded.category_name,
            is_deprecated = excluded.is_deprecated
        """,
        rows,
    )
    conn.commit()
    return len(rows)
def rebuild_fts(conn: sqlite3.Connection) -> None:
    """Repopulate the tags_fts index from its content table.

    The 'rebuild' command re-reads every row of the external-content
    table ('tags') into the FTS5 index in one statement.
    """
    print("Rebuilding FTS5 index…", flush=True)
    conn.execute("INSERT INTO tags_fts(tags_fts) VALUES('rebuild')")
    conn.commit()
    print("FTS5 index built.", flush=True)
# ---------------------------------------------------------------------------
# HTTP
# ---------------------------------------------------------------------------
def make_session(api_key: str | None, username: str | None) -> requests.Session:
    """Build a requests Session with retry/backoff and optional basic auth.

    Credentials are attached only when both username and api_key are
    present, as (login, api_key) HTTP basic auth.  GET requests are
    retried with exponential backoff on 429 and 5xx responses.
    """
    sess = requests.Session()
    sess.headers.update({"User-Agent": "danbooru-mcp/0.1"})
    if api_key and username:
        sess.auth = (username, api_key)
    retry_policy = Retry(
        total=6,
        backoff_factor=2.0,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
    )
    sess.mount("https://", HTTPAdapter(max_retries=retry_policy))
    return sess
def fetch_page(session: requests.Session, page: int) -> list[dict]:
    """Fetch one page of /tags.json, ordered by post count (descending)."""
    query = {
        "limit": PAGE_LIMIT,
        "search[order]": "count",
        "page": page,
    }
    response = session.get(f"{BASE_URL}/tags.json", params=query, timeout=30)
    response.raise_for_status()
    return response.json()
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Define and parse the scraper's command-line options."""
    p = argparse.ArgumentParser(
        description="Scrape Danbooru tags (sorted by post count) into SQLite"
    )
    p.add_argument("--db", default=str(DEFAULT_DB), help="Output SQLite DB path")
    p.add_argument(
        "--min-posts",
        type=int,
        default=MIN_POST_COUNT,
        help=f"Stop when a page has no tags above this threshold (default: {MIN_POST_COUNT})",
    )
    p.add_argument(
        "--workers",
        type=int,
        default=4,
        help="Parallel HTTP workers (default: 4)",
    )
    p.add_argument("--no-resume", action="store_true", help="Start from scratch")
    p.add_argument("--no-fts", action="store_true", help="Skip FTS5 rebuild")
    return p.parse_args()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _do_page(session: requests.Session, page: int) -> tuple[int, list[dict]]:
    """Fetch a page and return (page, tags). Runs in thread pool."""
    # Per-worker throttle: sleeping before each request keeps the combined
    # request rate polite to the API.
    time.sleep(REQUEST_DELAY)
    tags = fetch_page(session, page)
    return page, tags
def main() -> None:
    """Scrape all Danbooru tags (ordered by post count) into SQLite.

    Submits pages to a thread pool in windows of `--workers`, processes
    results strictly in page order, and stops at the first page whose
    best tag falls below `--min-posts`.  Completed pages are recorded so
    an interrupted run can resume.
    """
    args = parse_args()
    db_path = Path(args.db).resolve()
    env = _load_env()
    # Credentials: .env takes precedence over the process environment.
    api_key = env.get("DANBOORU_API_KEY") or os.environ.get("DANBOORU_API_KEY")
    username = env.get("DANBOORU_USER") or os.environ.get("DANBOORU_USER")
    if not username:
        # Accept the alternative DANBOORU_USERNAME spelling as a fallback.
        username = env.get("DANBOORU_USERNAME") or os.environ.get("DANBOORU_USERNAME")
    print(f"Database : {db_path}")
    print(f"Min posts : {args.min_posts} (skip pages where all tags are below this)")
    print(f"Workers : {args.workers}")
    print(f"Auth : {'yes (' + username + ')' if (api_key and username) else 'none (public API)'}")
    print()
    conn = init_db(db_path)
    # Auth only when BOTH credentials exist; otherwise use the public API.
    session = make_session(api_key if (api_key and username) else None, username)
    if args.no_resume:
        print("Resetting…")
        conn.execute("DELETE FROM tags")
        conn.execute("DELETE FROM completed_pages")
        conn.commit()
        done_pages: set[int] = set()
    else:
        done_pages = get_completed_pages(conn)
        existing = conn.execute("SELECT COUNT(*) FROM tags").fetchone()[0]
        if done_pages:
            print(f"Resuming — {len(done_pages)} pages done ({existing:,} tags stored)")
        else:
            print(f"Starting fresh ({existing:,} tags in DB)")
    print()
    total_tags = conn.execute("SELECT COUNT(*) FROM tags").fetchone()[0]
    pages_done = 0
    t0 = time.perf_counter()
    stop_flag = False
    REPORT_EVERY = 10  # print a persistent progress line every N pages
    print(f"{'Page':>6} {'Done':>6} {'Min posts':>10} {'Tags':>10} {'Rate':>7} {'Elapsed':>8}")
    print("-" * 60)
    # We submit pages in batches of `workers`, process results in page order,
    # and stop as soon as we get a page where all tags are below min_posts.
    page = 1
    with ThreadPoolExecutor(max_workers=args.workers) as pool:
        while not stop_flag and page <= MAX_PAGES:
            # Submit a window of pages, skipping pages already completed.
            batch_pages = []
            for _ in range(args.workers):
                while page <= MAX_PAGES and page in done_pages:
                    page += 1
                if page > MAX_PAGES:
                    break
                batch_pages.append(page)
                page += 1
            if not batch_pages:
                break
            futures = {
                pool.submit(_do_page, session, p): p
                for p in batch_pages
            }
            # Collect results in page order (as_completed yields out of order).
            results: dict[int, list[dict]] = {}
            for fut in as_completed(futures):
                pg, tags = fut.result()
                results[pg] = tags
            for pg in sorted(results.keys()):
                tags = results[pg]
                if not tags:
                    print(f"\nPage {pg}: empty response. Stopping.")
                    stop_flag = True
                    break
                max_in_page = max(t.get("post_count", 0) for t in tags)
                min_in_page = min(t.get("post_count", 0) for t in tags)
                # Tags are sorted by count, so once even the best tag on a
                # page is below the threshold, every later page is too.
                if max_in_page < args.min_posts:
                    print(f"\nPage {pg}: all tags have <{args.min_posts} posts (min={min_in_page}). Stopping.")
                    stop_flag = True
                    break
                stored = upsert_tags(conn, tags, min_post_count=args.min_posts)
                mark_page_done(conn, pg)
                total_tags += stored
                pages_done += 1
                elapsed = time.perf_counter() - t0
                rate = pages_done / elapsed if elapsed > 0 else 0
                line = (
                    f"{pg:>6} {pages_done:>6} {min_in_page:>10,} "
                    f"{total_tags:>10,} {rate:>5.1f}/s {elapsed/60:>6.1f}m"
                )
                # Every REPORT_EVERY pages, commit a line to the scrollback;
                # otherwise overwrite the current line with \r.
                if pages_done % REPORT_EVERY == 0:
                    print(line, flush=True)
                else:
                    print(f"\r{line}", end="", flush=True)
    elapsed_total = time.perf_counter() - t0
    print(f"\n{'='*60}")
    print(f"Scraping complete:")
    print(f" Tags stored : {total_tags:,}")
    print(f" Pages done : {pages_done:,}")
    print(f" Time : {elapsed_total/60:.1f} minutes")
    print()
    if not args.no_fts:
        rebuild_fts(conn)
    conn.close()
    print(f"Database saved to {db_path}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""
Test script for the Danbooru API.
Verifies:
1. Authentication with the API key works
2. Tag listing endpoint returns expected fields
3. Pagination works (multiple pages)
4. Tag search / filtering by category works
Usage:
python scripts/test_danbooru_api.py
Reads DANBOORU_API_KEY from .env or environment.
"""
import json
import os
import sys
import time
from pathlib import Path
try:
import requests
except ImportError:
print("ERROR: 'requests' is not installed. Run: pip install requests")
sys.exit(1)
# ---------------------------------------------------------------------------
# Load .env
# ---------------------------------------------------------------------------
def load_env() -> dict[str, str]:
    """Parse KEY=VALUE pairs from the project-root .env file, if present.

    Blank lines, '#' comments, and lines without '=' are skipped.
    Later occurrences of a key overwrite earlier ones.
    """
    result: dict[str, str] = {}
    dotenv = Path(__file__).parent.parent / ".env"
    if not dotenv.exists():
        return result
    for raw in dotenv.read_text().splitlines():
        stripped = raw.strip()
        if not stripped or stripped.startswith("#") or "=" not in stripped:
            continue
        key, _, value = stripped.partition("=")
        result[key.strip()] = value.strip()
    return result
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
BASE_URL = "https://danbooru.donmai.us"
# Danbooru tag categories: numeric id -> human-readable name
# (no id 2 is mapped here).
CATEGORY_NAMES = {
    0: "general",
    1: "artist",
    3: "copyright",
    4: "character",
    5: "meta",
}
def make_session(api_key: str | None = None, username: str | None = None) -> requests.Session:
    """Return a requests Session, optionally carrying HTTP basic auth.

    Danbooru's public endpoints (tag listing/searching) need no auth;
    when both username and api_key are supplied they are attached as
    (login, api_key) basic credentials — NOT (user, api_key).
    """
    sess = requests.Session()
    sess.headers.update({"User-Agent": "danbooru-mcp-test/0.1"})
    if api_key and username:
        sess.auth = (username, api_key)
    return sess
def get_tags_page(
    session: requests.Session,
    page: int = 1,
    limit: int = 20,
    search_name: str | None = None,
    search_category: int | None = None,
    order: str = "count",  # "count" | "name" | "date"
) -> list[dict]:
    """Fetch one page of tags from the Danbooru API."""
    query: dict = {"page": page, "limit": limit, "search[order]": order}
    if search_name:
        query["search[name_matches]"] = search_name
    if search_category is not None:
        query["search[category]"] = search_category
    resp = session.get(f"{BASE_URL}/tags.json", params=query, timeout=15)
    resp.raise_for_status()
    return resp.json()
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_basic_fetch(session: requests.Session) -> None:
    """The top-5-by-count listing returns a non-empty list of tag dicts."""
    print("\n[1] Basic fetch — top 5 tags by post count")
    tags = get_tags_page(session, page=1, limit=5, order="count")
    assert isinstance(tags, list), f"Expected list, got {type(tags)}"
    assert len(tags) > 0, "No tags returned"
    for tag in tags:
        # Unknown category ids fall back to the label "unknown".
        cat = CATEGORY_NAMES.get(tag.get("category", -1), "unknown")
        print(f" [{cat:12s}] {tag['name']:40s} posts={tag['post_count']:>8,}")
    print(" PASS ✓")
def test_fields_present(session: requests.Session) -> None:
    """Every tag object exposes the fields the scraper/server rely on."""
    print("\n[2] Field presence check")
    tags = get_tags_page(session, page=1, limit=1, order="count")
    tag = tags[0]
    required = {"id", "name", "post_count", "category", "is_deprecated", "words"}
    missing = required - set(tag.keys())
    assert not missing, f"Missing fields: {missing}"
    print(f" Fields present: {sorted(tag.keys())}")
    print(f" Sample tag: name={tag['name']!r} category={CATEGORY_NAMES.get(tag['category'])} deprecated={tag['is_deprecated']}")
    print(" PASS ✓")
def test_pagination(session: requests.Session) -> None:
    """Pages 1 and 2 of the same ordered query must not share any tags."""
    print("\n[3] Pagination — page 1 vs page 2 should differ")
    p1 = get_tags_page(session, page=1, limit=5, order="count")
    # Small pause between the two requests to be polite to the API.
    time.sleep(0.5)
    p2 = get_tags_page(session, page=2, limit=5, order="count")
    names_p1 = {t["name"] for t in p1}
    names_p2 = {t["name"] for t in p2}
    overlap = names_p1 & names_p2
    assert not overlap, f"Pages 1 and 2 share tags: {overlap}"
    print(f" Page 1: {sorted(names_p1)}")
    print(f" Page 2: {sorted(names_p2)}")
    print(" PASS ✓")
def test_category_filter(session: requests.Session) -> None:
    """search[category]=4 must return only character-category tags."""
    print("\n[4] Category filter — fetch only 'character' tags (category=4)")
    tags = get_tags_page(session, page=1, limit=5, search_category=4, order="count")
    for tag in tags:
        assert tag["category"] == 4, f"Expected category 4, got {tag['category']} for {tag['name']}"
        print(f" {tag['name']:40s} posts={tag['post_count']:>8,}")
    print(" PASS ✓")
def test_name_search(session: requests.Session) -> None:
    """Wildcard name search (name_matches) returns at least one result."""
    print("\n[5] Name search — tags matching 'blue_hair*'")
    tags = get_tags_page(session, page=1, limit=5, search_name="blue_hair*", order="count")
    assert len(tags) > 0, "No results for blue_hair*"
    for tag in tags:
        cat = CATEGORY_NAMES.get(tag.get("category", -1), "unknown")
        print(f" [{cat:12s}] {tag['name']:40s} posts={tag['post_count']:>8,}")
    print(" PASS ✓")
def test_well_known_tags(session: requests.Session) -> None:
    """A handful of well-known tags must resolve to exact matches."""
    print("\n[6] Well-known tags — '1girl', 'blue_hair', 'sword' should exist")
    for tag_name in ("1girl", "blue_hair", "sword"):
        tags = get_tags_page(session, page=1, limit=1, search_name=tag_name, order="count")
        # name_matches can return wildcard hits; keep only the exact name.
        found = [t for t in tags if t["name"] == tag_name]
        assert found, f"Tag '{tag_name}' not found in API response"
        t = found[0]
        cat = CATEGORY_NAMES.get(t.get("category", -1), "unknown")
        print(f" {tag_name:20s} category={cat:12s} posts={t['post_count']:>8,}")
    print(" PASS ✓")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Run all API smoke tests and exit non-zero if any fail."""
    env = load_env()
    # .env values take precedence over the process environment.
    api_key = env.get("DANBOORU_API_KEY") or os.environ.get("DANBOORU_API_KEY")
    username = env.get("DANBOORU_USERNAME") or os.environ.get("DANBOORU_USERNAME")
    if api_key:
        # Only the first 8 chars are printed to avoid leaking the key.
        print(f"API key loaded: {api_key[:8]}")
    else:
        print("No API key found — using unauthenticated access (public endpoints only)")
    # Danbooru public tag endpoints don't require auth.
    # Pass username + api_key only when both are available.
    session = make_session(
        api_key=api_key if (api_key and username) else None,
        username=username,
    )
    tests = [
        test_basic_fetch,
        test_fields_present,
        test_pagination,
        test_category_filter,
        test_name_search,
        test_well_known_tags,
    ]
    passed = 0
    failed = 0
    for test_fn in tests:
        try:
            test_fn(session)
            passed += 1
        except Exception as exc:
            # A failing test is reported but does not abort the run.
            print(f" FAIL ✗ {exc}")
            failed += 1
        time.sleep(0.3)  # be polite to the API
    print(f"\n{'='*50}")
    print(f"Results: {passed} passed, {failed} failed")
    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()

445
src/server.py Normal file
View File

@@ -0,0 +1,445 @@
#!/usr/bin/env python3
"""
Danbooru Tag Validator — MCP Server
Exposes three tools for LLMs to work with Danbooru tags:
search_tags prefix / full-text search, returns rich tag objects
validate_tags exact-match check, returns valid/invalid split with metadata
suggest_tags autocomplete-style suggestions for a partial input
The SQLite database must be pre-built with scripts/scrape_tags.py.
Logging:
All log output goes to stderr (stdout is reserved for the MCP JSON-RPC protocol).
Log level is controlled by the LOG_LEVEL environment variable:
DEBUG, INFO (default), WARNING, ERROR, CRITICAL
Log format is controlled by LOG_FORMAT:
"text" (default) — human-readable with timestamps
"json" — structured JSON, one object per line (for log aggregators)
"""
from __future__ import annotations
import json
import logging
import os
import sqlite3
import sys
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Generator
from mcp.server.fastmcp import FastMCP
# ---------------------------------------------------------------------------
# Logging setup — must go to stderr (stdout is the MCP transport)
# ---------------------------------------------------------------------------
# Runtime-configurable logging: level (DEBUG/INFO/…) and format ("text"/"json").
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
LOG_FORMAT = os.environ.get("LOG_FORMAT", "text").lower()
class _JsonFormatter(logging.Formatter):
"""Emit one JSON object per log record to stderr."""
def format(self, record: logging.LogRecord) -> str: # noqa: A003
obj: dict = {
"ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
"level": record.levelname,
"logger": record.name,
"msg": record.getMessage(),
}
if record.exc_info:
obj["exc"] = self.formatException(record.exc_info)
# Forward any extra keyword args as top-level fields
for k, v in record.__dict__.items():
if k not in {
"name", "msg", "args", "levelname", "levelno", "pathname",
"filename", "module", "exc_info", "exc_text", "stack_info",
"lineno", "funcName", "created", "msecs", "relativeCreated",
"thread", "threadName", "processName", "process", "message",
"taskName",
}:
obj[k] = v
return json.dumps(obj, default=str)
def _configure_logging() -> logging.Logger:
    """Attach a stderr handler to the root logger; return the app logger.

    stderr is mandatory here: stdout carries the MCP JSON-RPC transport,
    so any log output on stdout would corrupt the protocol stream.
    Format is chosen by LOG_FORMAT ("json" -> _JsonFormatter, else text).
    """
    handler = logging.StreamHandler(sys.stderr)
    handler.setLevel(LOG_LEVEL)
    if LOG_FORMAT == "json":
        handler.setFormatter(_JsonFormatter())
    else:
        handler.setFormatter(
            logging.Formatter(
                # FIX: the original fmt had no separator between logger name
                # and message, producing e.g. "danbooru_mcpStarting MCP server".
                fmt="%(asctime)s [%(levelname)-8s] %(name)s: %(message)s",
                datefmt="%Y-%m-%dT%H:%M:%S",
            )
        )
    root = logging.getLogger()
    root.setLevel(LOG_LEVEL)
    root.addHandler(handler)
    return logging.getLogger("danbooru_mcp")


log = _configure_logging()
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
# Database location: override with DANBOORU_TAGS_DB, else <repo>/db/tags.db.
_HERE = Path(__file__).parent
_DEFAULT_DB = _HERE.parent / "db" / "tags.db"
DB_PATH = Path(os.environ.get("DANBOORU_TAGS_DB", str(_DEFAULT_DB)))
# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------
def _check_db() -> None:
    """Raise FileNotFoundError (after logging) when the tags DB is absent."""
    if DB_PATH.exists():
        return
    log.error("Database file not found", extra={"db_path": str(DB_PATH)})
    raise FileNotFoundError(
        f"Tags database not found at {DB_PATH}. "
        "Run `python scripts/scrape_tags.py` first to build it."
    )
@contextmanager
def _get_conn() -> Generator[sqlite3.Connection, None, None]:
    """Yield a read-only SQLite connection to the tags DB; always closes it.

    The connection is query-only (guards against accidental writes) and
    uses a 64 MB page cache.  Connection lifetime is logged at DEBUG.
    """
    _check_db()
    started = time.perf_counter()
    conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA query_only = ON")
    conn.execute("PRAGMA cache_size = -64000")  # negative => size in KiB (64 MB)
    log.debug("DB connection opened", extra={"db_path": str(DB_PATH)})
    try:
        yield conn
    finally:
        conn.close()
        log.debug(
            "DB connection closed",
            extra={"elapsed_ms": round((time.perf_counter() - started) * 1000, 2)},
        )
# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------
# FastMCP server instance; the instructions string is surfaced to
# connecting LLM clients to steer how they use the three tools.
mcp = FastMCP(
    "danbooru-tags",
    instructions=(
        "Use this server to validate, search, and suggest Danbooru tags "
        "for Stable Diffusion / Illustrious prompts. "
        "Always call validate_tags before finalising a prompt to confirm "
        "every tag is a real, non-deprecated Danbooru tag. "
        "Tags with higher post_count are more commonly used and well-supported."
    ),
)
# Startup breadcrumb: records effective configuration (including whether the
# DB file exists) so later failures are diagnosable from logs alone.
log.info(
    "MCP server initialised",
    extra={
        "db_path": str(DB_PATH),
        "db_exists": DB_PATH.exists(),
        "log_level": LOG_LEVEL,
        "log_format": LOG_FORMAT,
    },
)
# ---------------------------------------------------------------------------
# Tool: search_tags
# ---------------------------------------------------------------------------
@mcp.tool()
def search_tags(query: str, limit: int = 20, category: str | None = None) -> list[dict]:
    """Search for Danbooru tags matching a query string.

    Uses FTS5 full-text and prefix search. Results are ordered by FTS5
    relevance, then by post count (most-used first).

    FTS5 query syntax is supported:
      - Prefix (default): "blue_ha" matches "blue_hair", "blue_hat", …
      - Explicit prefix: "blue_ha*"
      - Phrase: '"long hair"'
      - Boolean: "hair AND blue"

    Args:
        query: The search string. A trailing '*' wildcard is added
            automatically unless the query already ends with one.
            An empty or whitespace-only query returns an empty list.
        limit: Maximum results to return (default 20, max 200).
        category: Optional category filter. One of:
            "general", "artist", "copyright", "character", "meta"

    Returns:
        List of tag objects, each with:
            name (str)           the exact Danbooru tag string
            post_count (int)     number of posts using this tag
            category (str)       "general" | "artist" | "copyright" | "character" | "meta"
            is_deprecated (bool) whether the tag has been deprecated on Danbooru
    """
    t0 = time.perf_counter()
    limit = min(max(1, limit), 200)  # clamp to [1, 200]
    log.info(
        "search_tags called",
        extra={"query": query, "limit": limit, "category": category},
    )
    fts_query = query.strip()
    # FIX: previously an empty query string fell through to `MATCH ''`,
    # which FTS5 rejects with sqlite3.OperationalError ("fts5: syntax
    # error"). Return early instead — consistent with suggest_tags.
    if not fts_query:
        log.debug("search_tags: empty query, returning early")
        return []
    if not fts_query.endswith("*"):
        fts_query = fts_query + "*"
    CATEGORY_MAP = {
        "general": 0, "artist": 1, "copyright": 3, "character": 4, "meta": 5
    }
    category_filter = ""
    params: list = [fts_query]
    if category and category.lower() in CATEGORY_MAP:
        category_filter = "AND t.category = ?"
        params.append(CATEGORY_MAP[category.lower()])
    params.append(limit)
    try:
        with _get_conn() as conn:
            rows = conn.execute(
                f"""
                SELECT t.name, t.post_count, t.category_name,
                       CAST(t.is_deprecated AS INTEGER) AS is_deprecated
                FROM tags_fts f
                JOIN tags t ON t.id = f.rowid
                WHERE tags_fts MATCH ?
                {category_filter}
                ORDER BY rank, t.post_count DESC
                LIMIT ?
                """,
                params,
            ).fetchall()
        results = [
            {
                "name": row["name"],
                "post_count": row["post_count"],
                "category": row["category_name"],
                "is_deprecated": bool(row["is_deprecated"]),
            }
            for row in rows
        ]
        elapsed_ms = (time.perf_counter() - t0) * 1000
        log.info(
            "search_tags completed",
            extra={
                "query": query,
                "fts_query": fts_query,
                "category": category,
                "results": len(results),
                "elapsed_ms": round(elapsed_ms, 2),
            },
        )
        return results
    except Exception:
        log.exception("search_tags failed", extra={"query": query})
        raise
# ---------------------------------------------------------------------------
# Tool: validate_tags
# ---------------------------------------------------------------------------
@mcp.tool()
def validate_tags(tags: list[str]) -> dict:
    """Validate a list of Danbooru tags, returning valid and invalid sets.

    Performs exact-match lookup against the full Danbooru tag database.
    Also flags deprecated tags — they technically exist but should be
    replaced with their canonical equivalents.
    Use this before submitting a prompt to Stable Diffusion.

    Args:
        tags: A list of tag strings to validate (e.g. ["blue_hair", "1girl"]).

    Returns:
        A dict with three keys:
            "valid"      tags that exist and are not deprecated
            "deprecated" tags that exist but are deprecated (should be replaced)
            "invalid"    tags that were not found (misspelled or invented)
    """
    t0 = time.perf_counter()
    log.info(
        "validate_tags called",
        extra={"tag_count": len(tags), "tags_sample": tags[:5]},
    )
    if not tags:
        log.debug("validate_tags: empty input, returning early")
        return {"valid": [], "deprecated": [], "invalid": []}
    # Deduplicate while preserving order (dict keys keep insertion order).
    seen: dict[str, None] = {}
    for t in tags:
        seen[t.strip()] = None
    unique_tags = [t for t in seen if t]
    # FIX: if every input was blank/whitespace, unique_tags is empty and
    # the query below becomes "… WHERE name IN ()" — a SQLite syntax
    # error. Guard with an early return instead of crashing.
    if not unique_tags:
        log.debug("validate_tags: no non-empty tags after stripping")
        return {"valid": [], "deprecated": [], "invalid": []}
    placeholders = ",".join("?" * len(unique_tags))
    try:
        with _get_conn() as conn:
            rows = conn.execute(
                f"""
                SELECT name, is_deprecated
                FROM tags
                WHERE name IN ({placeholders})
                """,
                unique_tags,
            ).fetchall()
        found: dict[str, bool] = {
            row["name"]: bool(row["is_deprecated"]) for row in rows
        }
        valid = [t for t in unique_tags if t in found and not found[t]]
        deprecated = [t for t in unique_tags if t in found and found[t]]
        invalid = [t for t in unique_tags if t not in found]
        elapsed_ms = (time.perf_counter() - t0) * 1000
        log.info(
            "validate_tags completed",
            extra={
                "total": len(unique_tags),
                "valid": len(valid),
                "deprecated": len(deprecated),
                "invalid": len(invalid),
                "invalid_tags": invalid[:10],  # log first 10 invalid for debugging
                "elapsed_ms": round(elapsed_ms, 2),
            },
        )
        return {"valid": valid, "deprecated": deprecated, "invalid": invalid}
    except Exception:
        log.exception("validate_tags failed", extra={"tags_sample": tags[:5]})
        raise
# ---------------------------------------------------------------------------
# Tool: suggest_tags
# ---------------------------------------------------------------------------
@mcp.tool()
def suggest_tags(partial: str, limit: int = 10, category: str | None = None) -> list[dict]:
    """Get tag suggestions for a partial or approximate tag input.

    Runs a prefix search against the FTS5 index to find the closest
    existing Danbooru tags, ordered by post count (most-used first).
    Useful when the LLM is unsure of the exact spelling or wants to
    explore available tags for a concept.
    Deprecated tags are excluded from suggestions.

    Args:
        partial: A partial tag string (e.g. "blue_ha" -> "blue_hair").
        limit: Maximum suggestions to return (default 10, max 50).
        category: Optional category filter. One of:
            "general", "artist", "copyright", "character", "meta"

    Returns:
        List of tag objects (same shape as search_tags), sorted by
        post_count descending. Deprecated tags are excluded.
    """
    started = time.perf_counter()
    limit = min(max(1, limit), 50)  # clamp to [1, 50]
    log.info(
        "suggest_tags called",
        extra={"partial": partial, "limit": limit, "category": category},
    )
    term = partial.strip()
    if not term:
        log.debug("suggest_tags: empty partial, returning early")
        return []
    fts_query = term if term.endswith("*") else term + "*"
    category_ids = {
        "general": 0, "artist": 1, "copyright": 3, "character": 4, "meta": 5
    }
    category_filter = ""
    bind: list = [fts_query]
    if category and category.lower() in category_ids:
        category_filter = "AND t.category = ?"
        bind.append(category_ids[category.lower()])
    bind.append(limit)
    try:
        with _get_conn() as conn:
            rows = conn.execute(
                f"""
                SELECT t.name, t.post_count, t.category_name,
                       CAST(t.is_deprecated AS INTEGER) AS is_deprecated
                FROM tags_fts f
                JOIN tags t ON t.id = f.rowid
                WHERE tags_fts MATCH ?
                AND t.is_deprecated = 0
                {category_filter}
                ORDER BY t.post_count DESC
                LIMIT ?
                """,
                bind,
            ).fetchall()
        suggestions = []
        for row in rows:
            suggestions.append(
                {
                    "name": row["name"],
                    "post_count": row["post_count"],
                    "category": row["category_name"],
                    "is_deprecated": False,  # deprecated rows are filtered in SQL
                }
            )
        log.info(
            "suggest_tags completed",
            extra={
                "partial": partial,
                "fts_query": fts_query,
                "category": category,
                "results": len(suggestions),
                "elapsed_ms": round((time.perf_counter() - started) * 1000, 2),
            },
        )
        return suggestions
    except Exception:
        log.exception("suggest_tags failed", extra={"partial": partial})
        raise
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Entry point: run the MCP server on the stdio transport (blocks)."""
    log.info("Starting MCP server (stdio transport)")
    mcp.run()


if __name__ == "__main__":
    main()