328 lines
11 KiB
Python
328 lines
11 KiB
Python
from typing import List, Optional, Tuple, Any
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, Depends, Query, HTTPException, Path, Response, UploadFile, File
|
|
from sqlalchemy import text, select, insert
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from pydantic import BaseModel
|
|
|
|
from .database import get_session
|
|
from .models import Inventory
|
|
|
|
router = APIRouter()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Crafting Recipes endpoints
|
|
|
|
@router.post("/inventory/import")
|
|
async def import_inventory_csv(
|
|
file: UploadFile = File(...),
|
|
session: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Replace the entire inventory table with contents from an uploaded CSV.
|
|
|
|
The CSV must use the delimiter ``;`` and column headers: ``char;storage;item;quantity``.
|
|
"""
|
|
import csv
|
|
import io
|
|
# Read file bytes and decode
|
|
contents = await file.read()
|
|
try:
|
|
text_data = contents.decode("utf-8")
|
|
except UnicodeDecodeError:
|
|
raise HTTPException(status_code=400, detail="CSV must be UTF-8 encoded")
|
|
|
|
reader = csv.DictReader(io.StringIO(text_data), delimiter=";", quotechar='"')
|
|
rows = []
|
|
for r in reader:
|
|
try:
|
|
qty = int(r["quantity"].strip()) if r["quantity"].strip() else 0
|
|
except (KeyError, ValueError):
|
|
raise HTTPException(status_code=400, detail="Invalid CSV schema or quantity value")
|
|
rows.append(
|
|
{
|
|
"character_name": r["char"].strip(),
|
|
"storage_type": r["storage"].strip(),
|
|
"item_name": r["item"].strip(),
|
|
"quantity": qty,
|
|
}
|
|
)
|
|
|
|
# Replace table contents inside a transaction
|
|
try:
|
|
await session.execute(text("TRUNCATE TABLE inventory;"))
|
|
if rows:
|
|
await session.execute(insert(Inventory), rows)
|
|
await session.commit()
|
|
except Exception as e:
|
|
await session.rollback()
|
|
raise HTTPException(status_code=500, detail=f"Failed to import CSV: {e}")
|
|
|
|
return {"imported": len(rows)}
|
|
|
|
ALLOWED_CRAFTS = {
|
|
"woodworking": "recipes_woodworking",
|
|
# Future crafts can be added here, e.g. "smithing": "recipes_smithing"
|
|
}
|
|
|
|
|
|
|
|
class MetadataResponse(BaseModel):
|
|
storage_types: List[str]
|
|
type_descriptions: List[str]
|
|
|
|
|
|
@router.get("/metadata", response_model=MetadataResponse)
|
|
async def metadata(session: AsyncSession = Depends(get_session)):
|
|
"""Return distinct storage types and type descriptions."""
|
|
storage_q = await session.execute(text("SELECT DISTINCT storage_type FROM inventory ORDER BY storage_type"))
|
|
storage_types = [row[0] for row in storage_q.fetchall() if row[0]]
|
|
|
|
type_q = await session.execute(text("SELECT DISTINCT type_description FROM all_items ORDER BY type_description"))
|
|
original_types = {row[0] for row in type_q.fetchall() if row[0]}
|
|
|
|
processed_types = set(original_types)
|
|
has_nothing_or_unknown = 'NOTHING' in processed_types or 'UNKNOWN' in processed_types
|
|
|
|
if 'NOTHING' in processed_types:
|
|
processed_types.remove('NOTHING')
|
|
if 'UNKNOWN' in processed_types:
|
|
processed_types.remove('UNKNOWN')
|
|
|
|
if has_nothing_or_unknown:
|
|
processed_types.add('MANNEQUIN')
|
|
|
|
type_descriptions = sorted(list(processed_types))
|
|
|
|
return MetadataResponse(storage_types=storage_types, type_descriptions=type_descriptions)
|
|
|
|
|
|
class InventoryItem(BaseModel):
|
|
id: int
|
|
item_name: str
|
|
quantity: int
|
|
storage_type: str
|
|
description: Optional[str]
|
|
icon_id: Optional[str]
|
|
type_description: Optional[str]
|
|
last_updated: Optional[datetime]
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
@router.get("/inventory/{character}", response_model=List[InventoryItem])
|
|
async def inventory(
|
|
character: str,
|
|
storage_type: Optional[str] = Query(None),
|
|
session: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Return items for a character, optionally filtered by storage_type."""
|
|
base_sql = """
|
|
SELECT i.*, ai.description, ai.icon_id, ai.type_description
|
|
FROM inventory i
|
|
LEFT JOIN all_items ai ON ai.name = i.item_name
|
|
WHERE i.character_name = :char
|
|
"""
|
|
params = {"char": character}
|
|
if storage_type:
|
|
base_sql += " AND i.storage_type = :storage"
|
|
params["storage"] = storage_type
|
|
q = text(base_sql)
|
|
result = await session.execute(q, params)
|
|
rows = result.fetchall()
|
|
return [InventoryItem(
|
|
id=r.id,
|
|
item_name=r.item_name,
|
|
quantity=r.quantity,
|
|
storage_type=r.storage_type,
|
|
description=r.description,
|
|
icon_id=r.icon_id,
|
|
type_description=r.type_description,
|
|
last_updated=r.last_updated,
|
|
) for r in rows]
|
|
|
|
|
|
class ItemSummary(BaseModel):
|
|
id: int
|
|
name: str
|
|
icon_id: Optional[str]
|
|
type_description: Optional[str]
|
|
jobs_description: Optional[List[str]]
|
|
|
|
|
|
@router.get("/items", response_model=List[ItemSummary])
|
|
async def items(
|
|
response: Response,
|
|
type: Optional[str] = Query(None, alias="type"),
|
|
search: Optional[str] = Query(None, description="Substring search on item name"),
|
|
page: int = Query(1, ge=1),
|
|
page_size: int = Query(40, ge=1, le=100),
|
|
session: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Return items from all_items view with pagination."""
|
|
offset = (page - 1) * page_size
|
|
|
|
params: dict[str, Any] = {"limit": page_size, "offset": offset}
|
|
where_clauses: list[str] = ["a.name != '.'"]
|
|
|
|
if type:
|
|
if type == "MANNEQUIN":
|
|
where_clauses.append("a.type_description IN ('MANNEQUIN', 'NOTHING', 'UNKNOWN')")
|
|
else:
|
|
where_clauses.append("a.type_description = :type")
|
|
params["type"] = type
|
|
|
|
if search:
|
|
where_clauses.append("a.name ILIKE :search")
|
|
params["search"] = f"%{search}%"
|
|
|
|
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
|
|
|
join_sql = "FROM all_items a LEFT JOIN armor_items ai ON ai.id = a.id"
|
|
|
|
# Total count for pagination (exclude limit/offset)
|
|
count_params = {k: v for k, v in params.items() if k not in ("limit", "offset")}
|
|
count_q = text(f"SELECT COUNT(*) {join_sql} {where_sql}")
|
|
total_res = await session.execute(count_q, count_params)
|
|
total_count = total_res.scalar() or 0
|
|
response.headers["X-Total-Count"] = str(total_count)
|
|
|
|
# Main query
|
|
q = text(
|
|
f"SELECT a.id, a.name, a.icon_id, a.type_description, ai.jobs_description "
|
|
f"{join_sql} {where_sql} ORDER BY a.id LIMIT :limit OFFSET :offset"
|
|
)
|
|
result = await session.execute(q, params)
|
|
rows = result.fetchall()
|
|
return [
|
|
ItemSummary(
|
|
id=r.id,
|
|
name=r.name,
|
|
icon_id=r.icon_id,
|
|
type_description=r.type_description,
|
|
jobs_description=r.jobs_description,
|
|
)
|
|
for r in rows
|
|
]
|
|
async def items(
|
|
response: Response,
|
|
type: Optional[str] = Query(None, alias="type"),
|
|
search: Optional[str] = Query(None, description="Substring search on item name"),
|
|
page: int = Query(1, ge=1),
|
|
page_size: int = Query(40, ge=1, le=100),
|
|
session: AsyncSession = Depends(get_session),
|
|
):
|
|
"""Return items from all_items view with pagination."""
|
|
offset = (page - 1) * page_size
|
|
params: dict[str, Any] = {"limit": page_size, "offset": offset}
|
|
where_clauses: list[str] = ["a.name != '.'"]
|
|
|
|
if type:
|
|
if type == "MANNEQUIN":
|
|
where_clauses.append("a.type_description IN ('MANNEQUIN', 'NOTHING', 'UNKNOWN')")
|
|
else:
|
|
where_clauses.append("a.type_description = :type")
|
|
params["type"] = type
|
|
|
|
if search:
|
|
where_clauses.append("a.name ILIKE :search")
|
|
params["search"] = f"%{search}%"
|
|
|
|
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
|
|
|
# Use LEFT JOIN to fetch jobs_description from armor_items table; it will be NULL for non-armor.
|
|
join_sql = "FROM all_items a LEFT JOIN armor_items ai ON ai.id = a.id"
|
|
params = {"limit": page_size, "offset": offset}
|
|
where_clauses = ["a.name != '.'"]
|
|
|
|
if type:
|
|
if type == 'MANNEQUIN':
|
|
where_clauses.append("a.type_description IN ('MANNEQUIN', 'NOTHING', 'UNKNOWN')")
|
|
else:
|
|
where_clauses.append("a.type_description = :type")
|
|
params["type"] = type
|
|
|
|
if search:
|
|
where_clauses.append("a.name ILIKE :search")
|
|
params["search"] = f"%{search}%"
|
|
|
|
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
|
|
|
# Calculate total count for pagination
|
|
count_q = text(f"SELECT COUNT(*) {join_sql} {where_sql}") {join_sql} {where_sql}")
|
|
# Use only relevant params for count query (exclude limit/offset)
|
|
count_params = {k: v for k, v in params.items() if k not in ("limit", "offset")}
|
|
total_res = await session.execute(count_q, count_params)
|
|
total_count = total_res.scalar() or 0
|
|
response.headers["X-Total-Count"] = str(total_count)
|
|
|
|
# Use LEFT JOIN to fetch jobs_description from armor_items table; it will be NULL for non-armor.
|
|
join_sql = "FROM all_items a LEFT JOIN armor_items ai ON ai.id = a.id"
|
|
q = text(
|
|
f"SELECT a.id, a.name, a.icon_id, a.type_description, ai.jobs_description {join_sql} {where_sql} ORDER BY a.id LIMIT :limit OFFSET :offset"
|
|
)
|
|
result = await session.execute(q, params)
|
|
rows = result.fetchall()
|
|
return [ItemSummary(id=r.id, name=r.name, icon_id=r.icon_id, type_description=r.type_description, jobs_description=r.jobs_description) for r in rows]
|
|
|
|
|
|
|
|
|
|
class ItemDetail(BaseModel):
|
|
id: int
|
|
name: str
|
|
description: Optional[str]
|
|
icon_id: Optional[str]
|
|
type_description: Optional[str]
|
|
|
|
|
|
@router.get("/icon/{icon_id}")
|
|
async def get_icon(icon_id: str, session: AsyncSession = Depends(get_session)):
|
|
q = text("SELECT image_data, image_format, image_encoding FROM item_icons WHERE id = :id LIMIT 1")
|
|
res = await session.execute(q, {"id": icon_id})
|
|
row = res.fetchone()
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail="Icon not found")
|
|
import base64
|
|
if row.image_encoding == "base64":
|
|
data_bytes = base64.b64decode(row.image_data)
|
|
else:
|
|
data_bytes = row.image_data
|
|
media_type = f"image/{row.image_format.split('/')[-1]}" if row.image_format else "image/png"
|
|
from fastapi.responses import Response
|
|
return Response(content=data_bytes, media_type=media_type)
|
|
|
|
|
|
@router.get("/items/by-name/{item_name}", response_model=ItemDetail)
|
|
async def item_detail_by_name(item_name: str, session: AsyncSession = Depends(get_session)):
|
|
q = text("SELECT * FROM all_items WHERE name = :n LIMIT 1")
|
|
row = (await session.execute(q, {"n": item_name})).fetchone()
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail="Item not found")
|
|
return ItemDetail(
|
|
id=row.id,
|
|
name=row.name,
|
|
description=row.description,
|
|
icon_id=row.icon_id,
|
|
type_description=row.type_description,
|
|
)
|
|
|
|
|
|
@router.get("/items/{item_id}", response_model=ItemDetail)
|
|
async def item_detail(item_id: int, session: AsyncSession = Depends(get_session)):
|
|
"""Fetch full item record from all_items view."""
|
|
q = text("SELECT * FROM all_items WHERE id = :id LIMIT 1")
|
|
result = await session.execute(q, {"id": item_id})
|
|
row = result.fetchone()
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail="Item not found")
|
|
|
|
return ItemDetail(
|
|
id=row.id,
|
|
name=row.name,
|
|
description=row.description,
|
|
icon_id=row.icon_id,
|
|
type_description=row.type_description,
|
|
)
|