#!/usr/bin/env python3
"""Load datasets/Woodworking_v2.csv into PostgreSQL (recipes_woodworking table).

Drops the old table if present and creates a new one matching the v2 schema.
"""
from __future__ import annotations

import asyncio
import csv
import json
import pathlib
import re
from typing import Dict

import asyncpg

PROJECT_ROOT = pathlib.Path(__file__).resolve().parents[1]
CONF_PATH = PROJECT_ROOT / "db.conf"
CSV_PATH = PROJECT_ROOT / "datasets/Woodworking_v2.csv"

# Matches simple KEY=VALUE lines in db.conf (uppercase keys; values may be quoted).
RE_KEY = re.compile(r"^([A-Z0-9_]+)=(.*)$")


def parse_db_conf(path: pathlib.Path) -> Dict[str, str]:
    """Parse db.conf into a dict, skipping blank lines and # comments."""
    data: Dict[str, str] = {}
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        m = RE_KEY.match(line)
        if m:
            k, v = m.group(1), m.group(2).strip().strip("'\"")
            data[k] = v
    return data


async def recreate_table(conn: asyncpg.Connection) -> None:
    """Drop any existing recipes_woodworking table and recreate it with the v2 schema."""
    await conn.execute("DROP TABLE IF EXISTS recipes_woodworking;")
    await conn.execute(
        """
        CREATE TABLE recipes_woodworking (
            id SERIAL PRIMARY KEY,
            category TEXT NOT NULL,
            level INT NOT NULL,
            subcrafts JSONB,
            name TEXT NOT NULL,
            crystal TEXT NOT NULL,
            key_item TEXT,
            ingredients JSONB,
            hq_yields JSONB
        );
        """
    )


# (name, lowest level, highest level); adjacent ranges overlap, and
# category_for_level() resolves overlaps by taking the first match.
CATEGORY_RANGES = [
    ("Amateur", 1, 10),
    ("Recruit", 8, 20),
    ("Initiate", 18, 30),
    ("Novice", 28, 40),
    ("Apprentice", 38, 50),
    ("Journeyman", 48, 60),
    ("Craftsman", 58, 70),
    ("Artisan", 68, 80),
    ("Adept", 78, 90),
    ("Veteran", 88, 100),
    ("Expert", 98, 110),
    ("Authority", 111, 120),
]


def category_for_level(level: int) -> str:
    """Return the category name that includes the given level.

    If multiple ranges overlap, the first match in CATEGORY_RANGES is returned.
    """
    for name, lo, hi in CATEGORY_RANGES:
        if lo <= level <= hi:
            return name
    return "Unknown"


async def insert_csv(conn: asyncpg.Connection) -> None:
    """Read the v2 CSV and bulk-insert its rows via COPY."""
    with CSV_PATH.open(encoding="utf-8") as f:
        reader = csv.DictReader(f)
        records = []
        for row in reader:
            records.append(
                (
                    category_for_level(int(row["level"])),
                    int(row["level"]),
                    json.dumps(json.loads(row["subcrafts"] or "[]")),  # jsonb text
                    row["name"],
                    row["crystal"],
                    row["key_item"] or None,
                    json.dumps(json.loads(row["ingredients"] or "[]")),  # jsonb text
                    json.dumps(json.loads(row["hq_yields"] or "[]")),  # jsonb text
                )
            )
    await conn.copy_records_to_table(
        "recipes_woodworking",
        records=records,
        columns=[
            "category",
            "level",
            "subcrafts",
            "name",
            "crystal",
            "key_item",
            "ingredients",
            "hq_yields",
        ],
    )


async def main() -> None:
    if not CSV_PATH.exists():
        raise SystemExit("CSV v2 not found; run parser first.")
    conf = parse_db_conf(CONF_PATH)
    conn = await asyncpg.connect(
        host=conf["PSQL_HOST"],
        port=int(conf["PSQL_PORT"]),
        user=conf["PSQL_USER"],
        password=conf["PSQL_PASSWORD"],
        database=conf["PSQL_DBNAME"],
    )
    try:
        await recreate_table(conn)
        await insert_csv(conn)
        print("Loaded recipes into new recipes_woodworking table.")
    finally:
        await conn.close()


if __name__ == "__main__":
    asyncio.run(main())
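
# ---------------------------------------------------------------------------
# Illustrative inputs (a sketch only): the key names below are the ones this
# script reads, but the values and the example CSV row are assumptions for
# illustration, not copied from the repo's actual db.conf or dataset.
#
# parse_db_conf() expects plain KEY=VALUE lines, e.g. a hypothetical db.conf:
#
#   PSQL_HOST=localhost
#   PSQL_PORT=5432
#   PSQL_USER=ffxi
#   PSQL_PASSWORD=change-me
#   PSQL_DBNAME=ffxi
#
# insert_csv() expects Woodworking_v2.csv to provide at least these columns,
# with the three JSON columns holding valid JSON text (empty values fall back
# to "[]"); the category column is derived from level, not read from the CSV.
# Header line plus a made-up example row:
#
#   level,subcrafts,name,crystal,key_item,ingredients,hq_yields
#   5,[],Example Lumber,Wind,,[],[]
# ---------------------------------------------------------------------------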