Files
Mog-Squire/backend/app/router.py
Aodhan b9b47c96f6 Fix
2025-07-08 23:45:25 +01:00

270 lines
8.8 KiB
Python

from typing import List, Optional, Tuple, Any
from datetime import datetime
from fastapi import APIRouter, Depends, Query, HTTPException, Path, Response, UploadFile, File
from sqlalchemy import text, select, insert
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from .database import get_session
from .models import Inventory
# Router that the endpoint functions below register themselves on through
# their ``@router.get`` / ``@router.post`` decorators.
router = APIRouter()
# ---------------------------------------------------------------------------
# Inventory, item and icon endpoints
# Column headers every row of the uploaded CSV must provide.
_INVENTORY_CSV_COLUMNS = ("char", "storage", "item", "quantity")


def _parse_inventory_csv(text_data: str) -> List[dict]:
    """Turn ``;``-delimited inventory CSV text into row dicts for a bulk insert.

    Raises:
        ValueError: if the CSV is malformed, a row lacks one of the required
            columns, or a quantity is not an integer.
    """
    import csv
    import io

    reader = csv.DictReader(io.StringIO(text_data), delimiter=";", quotechar='"')
    rows: List[dict] = []
    try:
        for record in reader:
            # DictReader omits keys for columns absent from the header and
            # fills short rows with None; both cases mean a broken schema.
            values = [record.get(column) for column in _INVENTORY_CSV_COLUMNS]
            if any(value is None for value in values):
                raise ValueError(f"line {reader.line_num}: missing column")
            char, storage, item, quantity = (value.strip() for value in values)
            rows.append(
                {
                    "character_name": char,
                    "storage_type": storage,
                    "item_name": item,
                    # A blank quantity counts as 0; int() raises ValueError otherwise.
                    "quantity": int(quantity) if quantity else 0,
                }
            )
    except csv.Error as exc:
        raise ValueError(f"malformed CSV: {exc}") from exc
    return rows


@router.post("/inventory/import")
async def import_inventory_csv(
    file: UploadFile = File(...),
    session: AsyncSession = Depends(get_session),
):
    """Replace the entire inventory table with contents from an uploaded CSV.

    The CSV must use the delimiter ``;`` and column headers:
    ``char;storage;item;quantity``. A leading UTF-8 byte-order mark is accepted.

    Returns:
        ``{"imported": <number of rows inserted>}``.

    Raises:
        HTTPException: 400 if the upload is not UTF-8, a row lacks a required
            column, or a quantity is not an integer; 500 if the database
            operations fail (the transaction is rolled back first).
    """
    contents = await file.read()
    try:
        # "utf-8-sig" decodes plain UTF-8 identically but also strips a BOM,
        # which would otherwise end up glued to the first header name.
        text_data = contents.decode("utf-8-sig")
    except UnicodeDecodeError as exc:
        raise HTTPException(status_code=400, detail="CSV must be UTF-8 encoded") from exc

    # Validate the whole file before touching the table so that a bad upload
    # never truncates existing data.
    try:
        rows = _parse_inventory_csv(text_data)
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="Invalid CSV schema or quantity value"
        ) from exc

    # Replace table contents inside a transaction
    try:
        await session.execute(text("TRUNCATE TABLE inventory;"))
        if rows:
            await session.execute(insert(Inventory), rows)
        await session.commit()
    except Exception as e:
        await session.rollback()
        raise HTTPException(status_code=500, detail=f"Failed to import CSV: {e}") from e
    return {"imported": len(rows)}
# Maps a craft name to the name of the table holding that craft's recipes.
# NOTE(review): presumably a whitelist consulted before a table name is put
# into SQL -- confirm against the code that reads it.
ALLOWED_CRAFTS = dict(
    woodworking="recipes_woodworking",
    # Future crafts can be added here, e.g. smithing="recipes_smithing"
)
class MetadataResponse(BaseModel):
    """Response body of ``GET /metadata``."""

    # Distinct, non-empty ``storage_type`` values found in the inventory table.
    storage_types: List[str]
    # Sorted distinct item type descriptions; the endpoint reports the raw
    # values NOTHING and UNKNOWN as MANNEQUIN.
    type_descriptions: List[str]
@router.get("/metadata", response_model=MetadataResponse)
async def metadata(session: AsyncSession = Depends(get_session)):
    """Return the distinct storage types and item type descriptions.

    Empty or NULL values are dropped from both lists. The raw type
    descriptions ``NOTHING`` and ``UNKNOWN`` are never returned; when either
    is present, ``MANNEQUIN`` is reported in their place.
    """
    storage_result = await session.execute(
        text("SELECT DISTINCT storage_type FROM inventory ORDER BY storage_type")
    )
    storage_types = [record[0] for record in storage_result.fetchall() if record[0]]

    type_result = await session.execute(
        text("SELECT DISTINCT type_description FROM all_items ORDER BY type_description")
    )
    known_types = {record[0] for record in type_result.fetchall() if record[0]}

    # Fold the placeholder categories into the single MANNEQUIN bucket.
    placeholders = {"NOTHING", "UNKNOWN"}
    if known_types & placeholders:
        known_types = (known_types - placeholders) | {"MANNEQUIN"}

    return MetadataResponse(
        storage_types=storage_types,
        type_descriptions=sorted(known_types),
    )
class InventoryItem(BaseModel):
    """One inventory row plus item metadata, as returned by ``GET /inventory/{character}``."""

    id: int
    item_name: str
    quantity: int
    storage_type: str
    # The next three fields come from the LEFT JOIN on all_items, so they are
    # None when no item with a matching name exists.
    description: Optional[str]
    icon_id: Optional[str]
    type_description: Optional[str]
    last_updated: Optional[datetime]

    class Config:
        # Pydantic option that allows building the model from an object's attributes.
        from_attributes = True
@router.get("/inventory/{character}", response_model=List[InventoryItem])
async def inventory(
    character: str,
    storage_type: Optional[str] = Query(None),
    session: AsyncSession = Depends(get_session),
):
    """List the inventory rows of one character, enriched with item metadata.

    Args:
        character: Character name matched against ``inventory.character_name``.
        storage_type: When given and non-empty, only rows with this storage
            type are returned.

    Returns:
        One ``InventoryItem`` per result row; an empty list if nothing matches.
    """
    sql = (
        "\n"
        "SELECT i.*, ai.description, ai.icon_id, ai.type_description\n"
        "FROM inventory i\n"
        "LEFT JOIN all_items ai ON ai.name = i.item_name\n"
        "WHERE i.character_name = :char\n"
    )
    bind_values = {"char": character}
    if storage_type:
        # The optional filter is appended as a bound parameter, never inlined.
        sql += " AND i.storage_type = :storage"
        bind_values["storage"] = storage_type

    result = await session.execute(text(sql), bind_values)
    return [
        InventoryItem(
            id=record.id,
            item_name=record.item_name,
            quantity=record.quantity,
            storage_type=record.storage_type,
            description=record.description,
            icon_id=record.icon_id,
            type_description=record.type_description,
            last_updated=record.last_updated,
        )
        for record in result.fetchall()
    ]
class ItemSummary(BaseModel):
    """List entry returned by ``GET /items``."""

    id: int
    name: str
    icon_id: Optional[str]
    type_description: Optional[str]
    # The /items query currently selects NULL for this column, so it is
    # always None in responses produced by that endpoint.
    jobs_description: Optional[List[str]]
def _escape_like(term: str) -> str:
    """Escape LIKE/ILIKE metacharacters so *term* is matched literally."""
    # Backslash goes first so the escapes added for % and _ are not doubled.
    return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


@router.get("/items", response_model=List[ItemSummary])
async def items(
    response: Response,
    type: Optional[str] = Query(None, alias="type"),
    search: Optional[str] = Query(None, description="Substring search on item name"),
    page: int = Query(1, ge=1),
    page_size: int = Query(40, ge=1, le=100),
    session: AsyncSession = Depends(get_session),
):
    """Return one page of items from the ``all_items`` view.

    Args:
        response: Outgoing response; receives the ``X-Total-Count`` header
            holding the number of items matching the filters (all pages).
        type: Optional type-description filter. ``MANNEQUIN`` also matches
            the raw values ``NOTHING`` and ``UNKNOWN``, mirroring how
            ``/metadata`` reports them.
        search: Optional case-insensitive substring matched literally against
            the item name (``%``, ``_`` and ``\\`` are not wildcards).
        page: 1-based page number.
        page_size: Items per page (1-100).

    Returns:
        The ``ItemSummary`` entries of the requested page, ordered by id.
    """
    filter_params: dict[str, Any] = {}
    # Rows named '.' are always excluded.
    # NOTE(review): presumably placeholder entries -- confirm against the data.
    where_clauses: list[str] = ["a.name != '.'"]
    if type:
        if type == "MANNEQUIN":
            where_clauses.append("a.type_description IN ('MANNEQUIN', 'NOTHING', 'UNKNOWN')")
        else:
            where_clauses.append("a.type_description = :type")
            filter_params["type"] = type
    if search:
        where_clauses.append("a.name ILIKE :search")
        # Escaping relies on backslash being the database's default LIKE
        # escape character, which holds for PostgreSQL (implied by ILIKE).
        filter_params["search"] = f"%{_escape_like(search)}%"
    where_sql = f"WHERE {' AND '.join(where_clauses)}"

    # Removed dependency on armor_items to avoid errors if table is absent
    from_sql = "FROM all_items a"

    # Total count for pagination: same filters, no limit/offset.
    total_res = await session.execute(
        text(f"SELECT COUNT(*) {from_sql} {where_sql}"), filter_params
    )
    total_count = total_res.scalar() or 0
    response.headers["X-Total-Count"] = str(total_count)

    # Main query
    page_params = {
        **filter_params,
        "limit": page_size,
        "offset": (page - 1) * page_size,
    }
    page_query = text(
        "SELECT a.id, a.name, a.icon_id, a.type_description, NULL AS jobs_description "
        f"{from_sql} {where_sql} ORDER BY a.id LIMIT :limit OFFSET :offset"
    )
    result = await session.execute(page_query, page_params)
    return [
        ItemSummary(
            id=r.id,
            name=r.name,
            icon_id=r.icon_id,
            type_description=r.type_description,
            jobs_description=r.jobs_description,
        )
        for r in result.fetchall()
    ]
class ItemDetail(BaseModel):
    """Single-item record returned by ``GET /items/{item_id}`` and ``GET /items/by-name/{item_name}``."""

    id: int
    name: str
    description: Optional[str]
    icon_id: Optional[str]
    type_description: Optional[str]
@router.get("/icon/{icon_id}")
async def get_icon(icon_id: str, session: AsyncSession = Depends(get_session)):
    """Serve the image stored in ``item_icons`` for *icon_id*.

    The stored data is base64-decoded when the row's ``image_encoding`` is
    ``"base64"`` and passed through unchanged otherwise. The media type is
    ``image/<format>`` derived from ``image_format`` (only the part after the
    last ``/`` is used), defaulting to ``image/png`` when no format is stored.

    Raises:
        HTTPException: 404 if no icon row has this id.
    """
    import base64

    lookup = text(
        "SELECT image_data, image_format, image_encoding FROM item_icons WHERE id = :id LIMIT 1"
    )
    icon = (await session.execute(lookup, {"id": icon_id})).fetchone()
    if not icon:
        raise HTTPException(status_code=404, detail="Icon not found")

    payload = (
        base64.b64decode(icon.image_data)
        if icon.image_encoding == "base64"
        else icon.image_data
    )

    if icon.image_format:
        # Accepts both a bare subtype ("png") and a full MIME type ("image/png").
        subtype = icon.image_format.split("/")[-1]
        media_type = f"image/{subtype}"
    else:
        media_type = "image/png"

    return Response(content=payload, media_type=media_type)
@router.get("/items/by-name/{item_name}", response_model=ItemDetail)
async def item_detail_by_name(item_name: str, session: AsyncSession = Depends(get_session)):
    """Fetch the item record from ``all_items`` whose name equals *item_name*.

    Only the first matching row is used.

    Raises:
        HTTPException: 404 if no item has this name.
    """
    lookup = text("SELECT * FROM all_items WHERE name = :n LIMIT 1")
    result = await session.execute(lookup, {"n": item_name})
    record = result.fetchone()
    if not record:
        raise HTTPException(status_code=404, detail="Item not found")

    detail_fields = {
        "id": record.id,
        "name": record.name,
        "description": record.description,
        "icon_id": record.icon_id,
        "type_description": record.type_description,
    }
    return ItemDetail(**detail_fields)
@router.get("/items/{item_id}", response_model=ItemDetail)
async def item_detail(item_id: int, session: AsyncSession = Depends(get_session)):
    """Look up a single item in the ``all_items`` view by its numeric id.

    Raises:
        HTTPException: 404 if no item has this id.
    """
    by_id = text("SELECT * FROM all_items WHERE id = :id LIMIT 1")
    found = (await session.execute(by_id, {"id": item_id})).fetchone()
    if not found:
        raise HTTPException(status_code=404, detail="Item not found")

    return ItemDetail(
        id=found.id,
        name=found.name,
        description=found.description,
        icon_id=found.icon_id,
        type_description=found.type_description,
    )