#!/usr/bin/env python3
"""Create recipes_woodworking table and load data from datasets/Woodworking.csv.

Usage:
    python3 scripts/load_woodworking_to_db.py

The script reads database connection details from db.conf located at the project
root. It is idempotent: the table is created only if it doesn't already exist,
and new rows are inserted after truncating the table to avoid duplicates.
"""
from __future__ import annotations

import asyncio
import csv
import pathlib
import re
from typing import Dict, List, Optional

import asyncpg

PROJECT_ROOT = pathlib.Path(__file__).resolve().parents[1]
CONF_PATH = PROJECT_ROOT / "db.conf"
CSV_PATH = PROJECT_ROOT / "datasets/Woodworking.csv"

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def parse_db_conf(path: pathlib.Path) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines into a dict."""
    conf: Dict[str, str] = {}
    pattern = re.compile(r"^([A-Z0-9_]+)=(.*)$")
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        m = pattern.match(line)
        if m:
            key, value = m.group(1), m.group(2)
            # Remove surrounding quotes if present
            value = value.strip().strip("'\"")
            conf[key] = value
    required = {"PSQL_HOST", "PSQL_PORT", "PSQL_USER", "PSQL_PASSWORD", "PSQL_DBNAME"}
    missing = required - conf.keys()
    if missing:
        raise RuntimeError(f"Missing keys in db.conf: {', '.join(sorted(missing))}")
    return conf


async def create_table(conn: asyncpg.Connection) -> None:
    await conn.execute(
        """
        CREATE TABLE IF NOT EXISTS recipes_woodworking (
            id SERIAL PRIMARY KEY,
            category TEXT NOT NULL,
            level INT NOT NULL,
            product_name TEXT NOT NULL,
            nq_yield INT,
            hq1_yield INT,
            hq2_yield INT,
            hq3_yield INT,
            crystal TEXT,
            ingredients TEXT
        );
        """
    )


async def truncate_table(conn: asyncpg.Connection) -> None:
    await conn.execute("TRUNCATE TABLE recipes_woodworking;")


async def insert_rows(conn: asyncpg.Connection, rows: List[Dict[str, str]]) -> None:
    """Bulk insert via the COPY protocol for speed."""
    # Build tuples, converting blank strings to None and numeric fields to int.
    tuples = []
    for r in rows:
        tuples.append(
            (
                r["category"],
                int(r["level"]),
                r["product_name"],
                _to_int_or_none(r["nq_yield"]),
                _to_int_or_none(r["hq1_yield"]),
                _to_int_or_none(r["hq2_yield"]),
                _to_int_or_none(r["hq3_yield"]),
                r["crystal"] or None,
                r["ingredients"] or None,
            )
        )
    await conn.copy_records_to_table(
        "recipes_woodworking",
        records=tuples,
        columns=[
            "category",
            "level",
            "product_name",
            "nq_yield",
            "hq1_yield",
            "hq2_yield",
            "hq3_yield",
            "crystal",
            "ingredients",
        ],
    )


def _to_int_or_none(s: str) -> Optional[int]:
    s = s.strip()
    return int(s) if s else None


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


async def main() -> None:
    if not CSV_PATH.exists():
        raise SystemExit("CSV file not found. Run woodworking_to_csv.py first.")

    conf = parse_db_conf(CONF_PATH)
    conn = await asyncpg.connect(
        host=conf["PSQL_HOST"],
        port=int(conf["PSQL_PORT"]),
        user=conf["PSQL_USER"],
        password=conf["PSQL_PASSWORD"],
        database=conf["PSQL_DBNAME"],
    )
    try:
        await create_table(conn)
        await truncate_table(conn)
        with CSV_PATH.open(newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            rows = list(reader)
        await insert_rows(conn, rows)
        print(f"Inserted {len(rows)} rows into recipes_woodworking.")
    finally:
        await conn.close()


if __name__ == "__main__":
    asyncio.run(main())
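
# Illustrative sketch of the db.conf format this script expects: plain KEY=VALUE
# lines, as parsed by parse_db_conf above. The keys are the ones the script
# requires; the values shown are placeholders (not real credentials or the
# project's actual settings) and should be adapted to your environment.
#
#   PSQL_HOST=localhost
#   PSQL_PORT=5432
#   PSQL_USER=postgres
#   PSQL_PASSWORD=changeme
#   PSQL_DBNAME=recipes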