#!/usr/bin/env python3 """Load datasets/desythesis_recipes.csv into PostgreSQL. This script parses the *desynthesis* recipe CSV which is structured slightly differently from the v2 crafting CSVs. Recipes are grouped under craft headings (e.g. "Alchemy", "Smithing"), followed by a header row. Recent edits mean each recipe row now lists **multiple HQ columns (HQ1, HQ2, HQ3)** _directly_ in the CSV instead of a single JSON cell. A typical section now looks like:: Alchemy Item,Crystal,Ingredients,HQ1,HQ2,HQ3,Cap Distilled Water x3,Lightning,Tahrongi Cactus,HQ1: Distilled Water x6,HQ2: Distilled Water x9,HQ3: Distilled Water x12,2 Some legacy sections may still use the shorter header ``Item,Crystal,Ingredients,HQ,Cap`` with the HQ values spread across several columns. Pragmatically we treat **all columns between ``Ingredients`` and the final ``Cap`` column as HQ fields** and extract at most three of them (hq1-3) for insertion into Postgres. The resulting database table schema is:: CREATE TABLE recipes_desynthesis ( id SERIAL PRIMARY KEY, craft TEXT NOT NULL, cap INT, item TEXT NOT NULL, crystal TEXT NOT NULL, ingredients TEXT NOT NULL, hq1 TEXT, hq2 TEXT, hq3 TEXT ); Run: python scripts/load_desynth_recipes_to_db.py """ from __future__ import annotations import asyncio import csv import json import pathlib import re from typing import Dict, List, Tuple, Optional import asyncpg PROJECT_ROOT = pathlib.Path(__file__).resolve().parents[1] CONF_PATH = PROJECT_ROOT / "db.conf" CSV_PATH = PROJECT_ROOT / "datasets" / "desythesis_recipes.csv" # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- RE_CONF = re.compile(r"^([A-Z0-9_]+)=(.*)$") def parse_db_conf(path: pathlib.Path) -> Dict[str, str]: """Simple KEY=VALUE parser (quotes stripped).""" if not path.exists(): raise FileNotFoundError("db.conf not found") conf: Dict[str, str] = {} for line in path.read_text().splitlines(): line = line.strip() if not line or line.startswith("#"): continue m = RE_CONF.match(line) if m: k, v = m.group(1), m.group(2).strip().strip("'\"") conf[k] = v required = {"PSQL_HOST", "PSQL_PORT", "PSQL_USER", "PSQL_PASSWORD", "PSQL_DBNAME"} missing = required - conf.keys() if missing: raise RuntimeError(f"Missing keys in db.conf: {', '.join(sorted(missing))}") return conf def parse_csv(csv_path: pathlib.Path) -> List[Tuple[str, Optional[int], str, str, str, Optional[str], Optional[str], Optional[str]]]: """Parse the *desythesis_recipes.csv* file and return rows for COPY. The parser is tolerant of the two currently-seen layouts: 1. ``Item,Crystal,Ingredients,HQ1,HQ2,HQ3,Cap`` 2. ``Item,Crystal,Ingredients,HQ,Cap`` (legacy header but still multiple HQ columns in the data rows). The strategy is therefore: • first three columns are *always* Item, Crystal, Ingredients. • *last* column is CAP. • everything between is treated as HQ fields – the first three of those (if present) are saved as hq1-3. """ rows: List[Tuple[str, Optional[int], str, str, str, Optional[str], Optional[str], Optional[str]]] = [] current_craft: Optional[str] = None with csv_path.open(newline="", encoding="utf-8") as fh: reader = csv.reader(fh) for raw in reader: # ------------------------------------------------------------------ # Detect craft headings (single-cell rows, e.g. "Alchemy") # ------------------------------------------------------------------ if len(raw) == 1: current_craft = raw[0].strip() continue # Skip blank lines or header rows if not raw or raw[0].strip().startswith("Item") or current_craft is None: continue if len(raw) < 4: # Not enough columns for a valid recipe – skip continue # Standard columns item = raw[0].strip() crystal = raw[1].strip() ingredients = raw[2].strip() # CAP is *always* the final column cap_raw = raw[-1].strip() try: cap = int(cap_raw) if cap_raw.isdigit() else None except ValueError: cap = None # HQ columns: everything between ingredients and cap hq_columns = [c.strip() for c in raw[3:-1]] hq1 = hq_columns[0] if len(hq_columns) > 0 and hq_columns[0] else None hq2 = hq_columns[1] if len(hq_columns) > 1 and hq_columns[1] else None hq3 = hq_columns[2] if len(hq_columns) > 2 and hq_columns[2] else None # Clean prefixes like "HQ1: " def _clean(hq_val: Optional[str]) -> Optional[str]: if hq_val and ":" in hq_val: return hq_val.split(":", 1)[1].strip() return hq_val hq1, hq2, hq3 = map(_clean, (hq1, hq2, hq3)) rows.append((current_craft, cap, item, crystal, ingredients, hq1, hq2, hq3)) return rows async def recreate_table(conn: asyncpg.Connection) -> None: await conn.execute( """ DROP TABLE IF EXISTS recipes_desynthesis; CREATE TABLE recipes_desynthesis ( id SERIAL PRIMARY KEY, craft TEXT NOT NULL, cap INT, item TEXT NOT NULL, crystal TEXT NOT NULL, ingredients TEXT NOT NULL, hq1 TEXT, hq2 TEXT, hq3 TEXT ); """ ) async def copy_rows(conn: asyncpg.Connection, rows): await conn.copy_records_to_table( "recipes_desynthesis", records=rows, columns=[ "craft", "cap", "item", "crystal", "ingredients", "hq1", "hq2", "hq3", ], ) async def main() -> None: if not CSV_PATH.exists(): raise SystemExit("CSV file not found – run conversion first") conf = parse_db_conf(CONF_PATH) rows = parse_csv(CSV_PATH) print(f"Parsed {len(rows)} recipes from CSV.") conn = await asyncpg.connect( host=conf["PSQL_HOST"], port=int(conf["PSQL_PORT"]), user=conf["PSQL_USER"], password=conf["PSQL_PASSWORD"], database=conf["PSQL_DBNAME"], ) try: await recreate_table(conn) await copy_rows(conn, rows) print("Loaded recipes_desynthesis table.") finally: await conn.close() if __name__ == "__main__": asyncio.run(main())