# Source: Mog-Squire/scripts/load_woodworking_v2_to_db.py
# Retrieved 2025-07-07 13:39:46 +01:00 — 135 lines, 3.6 KiB, Python.
#!/usr/bin/env python3
"""Load datasets/Woodworking_v2.csv into PostgreSQL (recipes_woodworking table).
Drops the old table if present and creates a new one matching the v2 schema.
"""
from __future__ import annotations
import asyncio
import csv
import json
import pathlib
import re
from typing import Any, Dict, List, Optional
import asyncpg
# Repository root: this script lives in <root>/scripts/, so go up one level.
PROJECT_ROOT = pathlib.Path(__file__).resolve().parents[1]
# KEY=value connection settings file consumed by parse_db_conf().
CONF_PATH = PROJECT_ROOT / "db.conf"
# Input dataset; produced by an upstream parser (see the check in main()).
CSV_PATH = PROJECT_ROOT / "datasets/Woodworking_v2.csv"
# Matches one KEY=value line; group 1 is the key, group 2 the raw value.
RE_KEY = re.compile(r"^([A-Z0-9_]+)=(.*)$")


def parse_db_conf(path: pathlib.Path) -> Dict[str, str]:
    """Parse a simple KEY=value configuration file into a dict.

    Blank lines, comment lines starting with ``#``, and lines that do not
    match ``RE_KEY`` are skipped.  Values have surrounding whitespace and
    single/double quotes stripped.
    """
    settings: Dict[str, str] = {}
    for raw_line in path.read_text().splitlines():
        stripped = raw_line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        match = RE_KEY.match(stripped)
        if match is None:
            continue
        key = match.group(1)
        value = match.group(2).strip().strip("'\"")
        settings[key] = value
    return settings
async def recreate_table(conn: asyncpg.Connection):
    """Drop any existing recipes_woodworking table and create the v2 schema.

    JSON-shaped columns (subcrafts, ingredients, hq_yields) are stored as
    JSONB; key_item is nullable.
    """
    ddl = """
    CREATE TABLE recipes_woodworking (
        id SERIAL PRIMARY KEY,
        category TEXT NOT NULL,
        level INT NOT NULL,
        subcrafts JSONB,
        name TEXT NOT NULL,
        crystal TEXT NOT NULL,
        key_item TEXT,
        ingredients JSONB,
        hq_yields JSONB
    );
    """
    await conn.execute("DROP TABLE IF EXISTS recipes_woodworking;")
    await conn.execute(ddl)
# Crafting-rank names with their inclusive level ranges.  Adjacent ranges
# deliberately overlap (e.g. 8-10 is both Amateur and Recruit); lookup order
# below resolves ties in favour of the earlier entry.
CATEGORY_RANGES = [
    ("Amateur", 1, 10),
    ("Recruit", 8, 20),
    ("Initiate", 18, 30),
    ("Novice", 28, 40),
    ("Apprentice", 38, 50),
    ("Journeyman", 48, 60),
    ("Craftsman", 58, 70),
    ("Artisan", 68, 80),
    ("Adept", 78, 90),
    ("Veteran", 88, 100),
    ("Expert", 98, 110),
    ("Authority", 111, 120),
]


def category_for_level(level: int) -> str:
    """Return the category name whose range contains *level*.

    On overlap the earliest entry in CATEGORY_RANGES wins; levels outside
    every range map to "Unknown".
    """
    return next(
        (name for name, lo, hi in CATEGORY_RANGES if lo <= level <= hi),
        "Unknown",
    )
async def insert_csv(conn: asyncpg.Connection):
    """Read CSV_PATH and bulk-load every row into recipes_woodworking.

    The category column is derived from the level via category_for_level();
    JSON-shaped CSV cells are round-tripped through json to validate and
    normalise them (empty cells become "[]").
    """

    def jsonb_text(cell: str) -> str:
        # Validate/normalise a JSON cell for a JSONB column; empty -> [].
        return json.dumps(json.loads(cell or "[]"))

    with CSV_PATH.open(encoding="utf-8") as fh:
        records = [
            (
                category_for_level(int(row["level"])),
                int(row["level"]),
                jsonb_text(row["subcrafts"]),
                row["name"],
                row["crystal"],
                row["key_item"] or None,
                jsonb_text(row["ingredients"]),
                jsonb_text(row["hq_yields"]),
            )
            for row in csv.DictReader(fh)
        ]
    await conn.copy_records_to_table(
        "recipes_woodworking",
        records=records,
        columns=[
            "category",
            "level",
            "subcrafts",
            "name",
            "crystal",
            "key_item",
            "ingredients",
            "hq_yields",
        ],
    )
async def main():
    """Connect with db.conf credentials, rebuild the table, and load the CSV.

    Exits with an error message if the v2 CSV has not been generated yet.
    The connection is always closed, even if the load fails.
    """
    if not CSV_PATH.exists():
        raise SystemExit("CSV v2 not found; run parser first.")
    cfg = parse_db_conf(CONF_PATH)
    conn = await asyncpg.connect(
        host=cfg["PSQL_HOST"],
        port=int(cfg["PSQL_PORT"]),
        user=cfg["PSQL_USER"],
        password=cfg["PSQL_PASSWORD"],
        database=cfg["PSQL_DBNAME"],
    )
    try:
        await recreate_table(conn)
        await insert_csv(conn)
        print("Loaded recipes into new recipes_woodworking table.")
    finally:
        await conn.close()


if __name__ == "__main__":
    asyncio.run(main())