Lots of stuff

This commit is contained in:
Aodhan
2025-07-08 23:04:43 +01:00
parent cfa2eff6ef
commit 65c1972c49
26 changed files with 4094 additions and 104 deletions

@@ -13,7 +13,8 @@ PostgreSQL database.
| **recipes_to_csv_v2.py** | Generic parser. `python recipes_to_csv_v2.py <Craft>` processes one craft; use `python recipes_to_csv_v2.py --all` **or simply omit the argument** to parse every `.txt` file under `datasets/`, producing `datasets/<Craft>_v2.csv` for each. |
| **load_woodworking_to_db.py** | Loader for the legacy CSV (kept for reference). |
| **load_woodworking_v2_to_db.py** | Drops & recreates **recipes_woodworking** table and bulk-loads `Woodworking_v2.csv`. |
| **load_recipes_v2_to_db.py** | Generic loader. `python load_recipes_v2_to_db.py <Craft>` loads one craft; omit the argument to load **all** generated CSVs into their respective `recipes_<craft>` tables. |
| **load_inventory_to_db.py** | Truncate & load `datasets/inventory.csv` into the `inventory` table. |
| **requirements.txt** | Minimal Python dependencies for the scripts. |
| **venv/** | Local virtual-environment created by the setup steps below. |
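
A typical end-to-end refresh using the scripts above (a sketch, assuming `db.conf` is configured at the project root) looks like:

```bash
python recipes_to_csv_v2.py         # parse every datasets/*.txt into datasets/<Craft>_v2.csv
python load_recipes_v2_to_db.py     # load all generated CSVs into their recipes_<craft> tables
python load_inventory_to_db.py      # truncate & reload the inventory table
```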

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""Load datasets/inventory.csv into the **inventory** table, replacing any
existing contents.
Usage:
python load_inventory_to_db.py [CSV_PATH]
If ``CSV_PATH`` is omitted the script defaults to ``datasets/inventory.csv``
relative to the project root.
This script is similar in style to the other ETL helpers in ``scripts/``. It is
idempotent it truncates the ``inventory`` table before bulk-inserting the new
rows.
The database connection details are read from the standard ``db.conf`` file
located at the project root. The file must define at least the following keys::
PSQL_HOST
PSQL_PORT
PSQL_USER
PSQL_PASSWORD
PSQL_DBNAME
"""
from __future__ import annotations

import argparse
import asyncio
import csv
import datetime as _dt
import pathlib
import re
from typing import Dict, List, Tuple

import asyncpg

# ---------------------------------------------------------------------------
# Paths & Constants
# ---------------------------------------------------------------------------
PROJECT_ROOT = pathlib.Path(__file__).resolve().parents[1]
CONF_PATH = PROJECT_ROOT / "db.conf"
DEFAULT_CSV_PATH = PROJECT_ROOT / "datasets" / "inventory.csv"
RE_CONF = re.compile(r"^([A-Z0-9_]+)=(.*)$")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def parse_db_conf(path: pathlib.Path) -> Dict[str, str]:
    """Parse ``db.conf`` (simple KEY=VALUE format) into a dict."""
    if not path.exists():
        raise FileNotFoundError("db.conf not found at project root (required for DB credentials)")
    conf: Dict[str, str] = {}
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if (m := RE_CONF.match(line)):
            key, value = m.group(1), m.group(2).strip().strip("'\"")
            conf[key] = value
    required = {"PSQL_HOST", "PSQL_PORT", "PSQL_USER", "PSQL_PASSWORD", "PSQL_DBNAME"}
    missing = required - conf.keys()
    if missing:
        raise RuntimeError(f"Missing keys in db.conf: {', '.join(sorted(missing))}")
    return conf


async def ensure_inventory_table(conn: asyncpg.Connection) -> None:
    """Create the ``inventory`` table if it doesn't already exist.

    The schema mirrors the SQLAlchemy model in ``backend/app/models.py``.
    """
    await conn.execute(
        """
        CREATE TABLE IF NOT EXISTS inventory (
            id SERIAL PRIMARY KEY,
            character_name TEXT NOT NULL,
            storage_type TEXT NOT NULL,
            item_name TEXT NOT NULL,
            quantity INT NOT NULL,
            last_updated TIMESTAMPTZ DEFAULT NOW()
        );
        """
    )


async def truncate_inventory(conn: asyncpg.Connection) -> None:
    """Remove all rows from the inventory table before re-inserting."""
    await conn.execute("TRUNCATE TABLE inventory;")


async def copy_csv_to_db(conn: asyncpg.Connection, rows: List[Tuple[str, str, str, int, _dt.datetime]]) -> None:
    """Bulk copy the parsed CSV rows into the DB using ``copy_records_to_table``."""
    await conn.copy_records_to_table(
        "inventory",
        records=rows,
        columns=[
            "character_name",
            "storage_type",
            "item_name",
            "quantity",
            "last_updated",
        ],
    )

# ---------------------------------------------------------------------------
# Main logic
# ---------------------------------------------------------------------------
async def load_inventory(csv_path: pathlib.Path) -> None:
    if not csv_path.exists():
        raise SystemExit(f"CSV file not found: {csv_path}")
    conf = parse_db_conf(CONF_PATH)
    conn = await asyncpg.connect(
        host=conf["PSQL_HOST"],
        port=int(conf["PSQL_PORT"]),
        user=conf["PSQL_USER"],
        password=conf["PSQL_PASSWORD"],
        database=conf["PSQL_DBNAME"],
    )

    try:
        await ensure_inventory_table(conn)
        await truncate_inventory(conn)

        # Parse the CSV into records matching the copy column order.
        rows: List[Tuple[str, str, str, int, _dt.datetime]] = []
        with csv_path.open(newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f, delimiter=";", quotechar='"')
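            # Expected header row: char;storage;item;quantity (semicolon-delimited).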
            for r in reader:
                char = r["char"].strip()
                storage = r["storage"].strip()
                item = r["item"].strip()
                qty = int(r["quantity"].strip()) if r["quantity"].strip() else 0
                # Timezone-aware UTC timestamp for the TIMESTAMPTZ column.
                rows.append((char, storage, item, qty, _dt.datetime.now(_dt.timezone.utc)))

        await copy_csv_to_db(conn, rows)
        print(f"Inserted {len(rows)} inventory rows.")
    finally:
        await conn.close()


async def main_async(csv_arg: str | None) -> None:
    csv_path = pathlib.Path(csv_arg).expanduser().resolve() if csv_arg else DEFAULT_CSV_PATH
    await load_inventory(csv_path)


def main() -> None:
    p = argparse.ArgumentParser(description="Load inventory CSV into DB")
    p.add_argument("csv", nargs="?", help="Path to CSV; defaults to datasets/inventory.csv")
    args = p.parse_args()
    asyncio.run(main_async(args.csv))


if __name__ == "__main__":
    main()