# document-processor/app/routes/line_items.py
#
# 562 lines
# 18 KiB
# Python

import json
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
from pathlib import Path
from urllib.parse import urlencode

from fastapi import APIRouter, Depends, Form, Query, Request
from fastapi.responses import HTMLResponse, RedirectResponse, FileResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import func
from sqlalchemy.orm import Session, selectinload

from app.db.deps import get_db
from app.logic.extraction import get_current_extracted_fields
from app.models.document import Document
from app.models.document_line_item import DocumentLineItem
from app.models.document_line_item_set import DocumentLineItemSet
from app.models.text_version import TextVersion
# Router for the line-item endpoints; every route declared below is served
# under the "/line-items" prefix.
router = APIRouter(prefix="/line-items", tags=["line-items"])
# Directory one level above the directory that contains this module.
BASE_DIR = Path(__file__).resolve().parent.parent
# Jinja2 template loader rooted at <BASE_DIR>/templates.
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
def _decimal_to_str(value: Decimal | None) -> str:
if value is None:
return ""
return str(value)
def _to_decimal(value: str | None) -> Decimal | None:
if value is None:
return None
cleaned = str(value).strip()
if not cleaned:
return None
try:
return Decimal(cleaned)
except (InvalidOperation, TypeError):
return None
def _line_item_extra(item: DocumentLineItem) -> dict:
    """Return a fresh dict copy of ``item.raw_json`` (empty when it is falsy).

    Callers may mutate the returned dict without touching the value stored
    on the item.
    """
    payload = item.raw_json
    if not payload:
        return {}
    return dict(payload)
def _line_item_quality_rating(item: DocumentLineItem) -> str:
    """Return the stored ``quality_rating`` as text, or "" when absent/None."""
    rating = _line_item_extra(item).get("quality_rating")
    if rating is None:
        return ""
    return str(rating)
def _line_item_quality_note(item: DocumentLineItem) -> str:
    """Return the stored ``quality_note`` as text, or "" when absent/None."""
    note = _line_item_extra(item).get("quality_note")
    if note is None:
        return ""
    return str(note)
def _line_item_quality_status(item: DocumentLineItem) -> str:
    """Return the stored ``quality_status`` as text, or "" when absent/None."""
    status = _line_item_extra(item).get("quality_status")
    if status is None:
        return ""
    return str(status)
def _is_quality_queue_candidate(item: DocumentLineItem) -> bool:
    """Tell whether an item still belongs in the quality review queue.

    An item is a candidate unless its raw_json marks it ``is_na`` or already
    carries a truthy ``reviewed_at`` value.
    """
    meta = _line_item_extra(item)
    marked_na = bool(meta.get("is_na"))
    already_reviewed = bool(meta.get("reviewed_at"))
    return not (marked_na or already_reviewed)
def _build_row(item: DocumentLineItem) -> dict | None:
    """Flatten a line item plus its parent document into a template row.

    Returns ``None`` when the item has no line-item set or the set has no
    document. The transaction date is taken from the first available source:
    the document's current extracted fields, then the item's ``entry_date``,
    then the date part of the document's ``created_at``.
    """
    parent_set = item.line_item_set
    if parent_set is None:
        return None
    doc = parent_set.document
    if doc is None:
        return None

    fields = get_current_extracted_fields(doc)
    merchant_name = ""
    when = ""
    if fields is not None:
        merchant_name = fields.merchant_normalized or fields.merchant_raw or ""
        if fields.transaction_date:
            when = fields.transaction_date.isoformat()
    if not when and item.entry_date:
        when = item.entry_date.isoformat()
    if not when and doc.created_at:
        when = doc.created_at.date().isoformat()

    # Read the review metadata once instead of re-copying raw_json per flag.
    meta = _line_item_extra(item)
    reviewed_stamp = meta.get("reviewed_at")
    description_text = item.description or ""

    row = {
        "line_item_id": item.id,
        "document_id": doc.document_id,
        "transaction_date": when,
        "merchant": merchant_name,
        "description": description_text,
        "raw_description": description_text,
        "quantity": _decimal_to_str(item.quantity),
        "line_total": _decimal_to_str(item.line_total),
        "category": item.category or "",
        "confidence": "",
        "quality_rating": _line_item_quality_rating(item),
        "quality_note": _line_item_quality_note(item),
        "quality_status": _line_item_quality_status(item),
        "is_reviewed": bool(reviewed_stamp),
        "is_approved": bool(meta.get("is_approved")),
        "is_excluded": bool(meta.get("is_excluded")),
        "is_na": bool(meta.get("is_na")),
        "reviewed_at": reviewed_stamp or "",
    }
    return row
def _load_all_items(db: Session) -> list[DocumentLineItem]:
    """Fetch every line item, highest id first.

    The item -> set -> document -> extracted_fields chain is loaded with
    ``selectinload`` so later row building does not lazy-load per item.
    """
    eager_chain = (
        selectinload(DocumentLineItem.line_item_set)
        .selectinload(DocumentLineItemSet.document)
        .selectinload(Document.extracted_fields)
    )
    query = db.query(DocumentLineItem).options(eager_chain)
    highest_id_first = query.order_by(DocumentLineItem.id.desc())
    return highest_id_first.all()
def _build_filtered_rows(
    items: list[DocumentLineItem],
    q: str,
    merchant: str,
    category: str,
    date_from: str,
    date_to: str,
    rating_min: str,
    rating_max: str,
) -> list[dict]:
    """Build display rows for the items matching every active filter.

    Args:
        items: Line items to consider.
        q: Case-insensitive substring that must occur in the description.
        merchant: Case-insensitive substring that must occur in the merchant.
        category: Case-insensitive exact category match.
        date_from: Inclusive lower bound, compared as text against the row's
            ISO transaction date.
        date_to: Inclusive upper bound, compared the same way.
        rating_min: Inclusive minimum quality rating (decimal text).
        rating_max: Inclusive maximum quality rating (decimal text).

    Blank (or whitespace-only) filters are inactive. Rating bounds that do
    not parse are ignored. When a date or rating filter is active, rows
    lacking that value are excluded.

    Returns:
        Matching rows sorted descending by (transaction_date, merchant,
        description).
    """
    q_norm = q.strip().lower()
    merchant_norm = merchant.strip().lower()
    category_norm = category.strip().lower()
    # Strip the date bounds like the text filters: a padded or blank-only
    # value must not skew the string comparison or silently drop undated rows.
    date_from_norm = date_from.strip()
    date_to_norm = date_to.strip()
    rating_min_dec = _to_decimal(rating_min)
    rating_max_dec = _to_decimal(rating_max)
    rating_filter_active = rating_min_dec is not None or rating_max_dec is not None

    rows: list[dict] = []
    for item in items:
        row = _build_row(item)
        if row is None:
            continue
        if q_norm and q_norm not in row["description"].lower():
            continue
        if merchant_norm and merchant_norm not in row["merchant"].lower():
            continue
        if category_norm and category_norm != row["category"].lower():
            continue

        row_date = row["transaction_date"]
        if date_from_norm and (not row_date or row_date < date_from_norm):
            continue
        if date_to_norm and (not row_date or row_date > date_to_norm):
            continue

        if rating_filter_active:
            # Only parse the row's rating when a bound actually needs it.
            row_rating = _to_decimal(row["quality_rating"])
            if row_rating is None:
                continue
            if rating_min_dec is not None and row_rating < rating_min_dec:
                continue
            if rating_max_dec is not None and row_rating > rating_max_dec:
                continue

        rows.append(row)

    rows.sort(
        key=lambda row: (
            row["transaction_date"] or "",
            row["merchant"] or "",
            row["description"] or "",
        ),
        reverse=True,
    )
    return rows
def _build_summary_rows(items: list[DocumentLineItem], q: str) -> list[dict]:
    """Aggregate line items per exact description.

    Only items whose description contains ``q`` (case-insensitive, blank
    means no filter) are counted. Each summary row carries the occurrence
    count, average / minimum / maximum line total and the average quality
    rating, all rounded to two decimals and rendered as text ("" when no
    value was available). Rows are sorted descending by (count, item).
    """
    needle = q.strip().lower()
    two_places = Decimal("0.01")

    stats: dict[str, dict] = {}
    for item in items:
        row = _build_row(item)
        if row is None:
            continue
        name = row["description"]
        if needle and needle not in name.lower():
            continue

        entry = stats.get(name)
        if entry is None:
            entry = {
                "item": name,
                "count": 0,
                "prices": [],
                "rated_count": 0,
                "rating_sum": Decimal("0"),
            }
            stats[name] = entry

        entry["count"] += 1
        price = _to_decimal(row["line_total"])
        if price is not None:
            entry["prices"].append(price)
        rating = _to_decimal(row["quality_rating"])
        if rating is not None:
            entry["rated_count"] += 1
            entry["rating_sum"] += rating

    summary: list[dict] = []
    for entry in stats.values():
        prices = entry["prices"]
        if prices:
            avg_price = str((sum(prices) / len(prices)).quantize(two_places))
            min_price = str(min(prices).quantize(two_places))
            max_price = str(max(prices).quantize(two_places))
        else:
            avg_price = min_price = max_price = ""

        rated = entry["rated_count"]
        if rated > 0:
            avg_rating = str((entry["rating_sum"] / rated).quantize(two_places))
        else:
            avg_rating = ""

        summary.append(
            {
                "item": entry["item"],
                "count": entry["count"],
                "avg_price": avg_price,
                "min_price": min_price,
                "max_price": max_price,
                "rated_count": rated,
                "avg_rating": avg_rating,
            }
        )

    return sorted(summary, key=lambda r: (r["count"], r["item"]), reverse=True)
@router.post("/{line_item_id}/review", response_class=RedirectResponse)
def save_line_item_review(
    line_item_id: int,
    q: str = Form(""),
    merchant: str = Form(""),
    category: str = Form(""),
    date_from: str = Form(""),
    date_to: str = Form(""),
    rating_min: str = Form(""),
    rating_max: str = Form(""),
    return_to: str = Form("list"),
    quality_rating: str = Form(""),
    quality_note: str = Form(""),
    is_approved: str = Form(""),
    is_excluded: str = Form(""),
    is_na: str = Form(""),
    db: Session = Depends(get_db),
):
    """Persist a reviewer's verdict on one line item and redirect back.

    The review is stored inside the item's ``raw_json``: the three boolean
    flags, a ``reviewed_at`` timestamp, and (unless the item is marked N/A)
    the optional rating and note. Blank rating/note values remove the
    corresponding key.

    Args:
        line_item_id: Primary key of the item being reviewed.
        q, merchant, category, date_from, date_to, rating_min, rating_max:
            The search filters of the originating page, echoed back in the
            redirect so the user lands on the same result list.
        return_to: ``"queue"`` / ``"quality_queue"`` redirects to the queue
            tab; anything else redirects to the advanced-search tab.
        quality_rating, quality_note: Reviewer input; surrounding whitespace
            is stripped.
        is_approved, is_excluded, is_na: Checkbox fields; any non-empty
            value counts as checked.
        db: Request-scoped database session.

    Returns:
        A 303 redirect. An unknown ``line_item_id`` redirects to the list
        page without changing anything.
    """
    item = db.query(DocumentLineItem).filter(DocumentLineItem.id == line_item_id).first()
    if item is None:
        return RedirectResponse(url="/line-items/", status_code=303)

    extra = _line_item_extra(item)
    rating_clean = quality_rating.strip()
    note_clean = quality_note.strip()
    na_checked = bool(is_na)

    extra["is_approved"] = bool(is_approved)
    extra["is_excluded"] = bool(is_excluded)
    extra["is_na"] = na_checked
    # Same naive-UTC ISO string that datetime.utcnow() produced, without
    # relying on that deprecated call; keeps stored timestamps uniform.
    extra["reviewed_at"] = (
        datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    )

    if na_checked:
        # An N/A item carries no quality judgement at all.
        extra.pop("quality_rating", None)
        extra.pop("quality_note", None)
    else:
        if rating_clean:
            extra["quality_rating"] = rating_clean
        else:
            extra.pop("quality_rating", None)
        if note_clean:
            extra["quality_note"] = note_clean
        else:
            extra.pop("quality_note", None)

    # Any previously stored quality_status is dropped once a review is saved.
    extra.pop("quality_status", None)

    # Assign a new dict object so the change to the JSON value is persisted.
    item.raw_json = extra
    db.commit()

    if return_to in {"quality_queue", "queue"}:
        return RedirectResponse(url="/line-items/?tab=queue", status_code=303)

    # The filter values come straight from the form; encode them so that
    # characters such as "&", "#", "+", "%" or "=" cannot corrupt the query
    # string of the redirect target.
    query_string = urlencode(
        {
            "tab": "advanced-search",
            "q": q,
            "merchant": merchant,
            "category": category,
            "date_from": date_from,
            "date_to": date_to,
            "rating_min": rating_min,
            "rating_max": rating_max,
        }
    )
    return RedirectResponse(url=f"/line-items/?{query_string}", status_code=303)
@router.get("/", response_class=HTMLResponse)
def list_line_items(
    request: Request,
    q: str = Query("", description="Item description contains"),
    merchant: str = Query("", description="Merchant contains"),
    category: str = Query("", description="Category equals"),
    date_from: str = Query("", description="YYYY-MM-DD"),
    date_to: str = Query("", description="YYYY-MM-DD"),
    rating_min: str = Query("", description="Minimum rating"),
    rating_max: str = Query("", description="Maximum rating"),
    tab: str = Query("summary"),
    db: Session = Depends(get_db),
):
    """Render the line-items page with its summary, search and queue tabs.

    Detail rows are only computed when at least one filter is non-blank.
    Summary rows are always computed (filtered by ``q``), and queue rows
    list every item that is still a quality-queue candidate, oldest
    transaction first. An unknown ``tab`` falls back to "summary"; the
    summary tab is swapped for "advanced-search" when any filter other than
    ``q`` is set.
    """
    items = _load_all_items(db)

    # Filters other than the free-text query; these force the search tab.
    secondary_filters = (merchant, category, date_from, date_to, rating_min, rating_max)
    has_secondary_filter = any(value.strip() for value in secondary_filters)
    has_advanced_query = bool(q.strip()) or has_secondary_filter

    if has_advanced_query:
        detail_rows = _build_filtered_rows(
            items=items,
            q=q,
            merchant=merchant,
            category=category,
            date_from=date_from,
            date_to=date_to,
            rating_min=rating_min,
            rating_max=rating_max,
        )
    else:
        detail_rows = []

    summary_rows = _build_summary_rows(items=items, q=q)

    queue_rows = []
    for candidate in filter(_is_quality_queue_candidate, items):
        candidate_row = _build_row(candidate)
        if candidate_row is None:
            continue
        queue_rows.append(candidate_row)
    queue_rows.sort(
        key=lambda r: (
            r["transaction_date"] or "",
            r["merchant"] or "",
            r["description"] or "",
        )
    )

    active_tab = tab if tab in {"summary", "advanced-search", "queue"} else "summary"
    if active_tab == "summary" and has_secondary_filter:
        active_tab = "advanced-search"

    context = {
        "request": request,
        "rows": detail_rows,
        "summary_rows": summary_rows,
        "queue_rows": queue_rows,
        "q": q,
        "merchant": merchant,
        "category": category,
        "date_from": date_from,
        "date_to": date_to,
        "rating_min": rating_min,
        "rating_max": rating_max,
        "active_tab": active_tab,
        "has_advanced_query": has_advanced_query,
        "active_page": "line_items",
    }
    return templates.TemplateResponse(
        request=request,
        name="line_items/list.html",
        context=context,
    )
def _get_current_ocr_text_for_export(document: Document) -> str:
    """Pick the OCR text to export for a document.

    Current "reviewed" text versions win over current "raw_ocr" ones. Within
    the chosen type the version with the greatest
    (version_number, created_at) is used. Returns "" when no current version
    of either type exists or the chosen one has no text.
    """
    versions = getattr(document, "text_versions", [])
    for wanted_type in ("reviewed", "raw_ocr"):
        current = [
            tv for tv in versions
            if tv.version_type == wanted_type and tv.is_current
        ]
        if current:
            latest = max(current, key=lambda tv: (tv.version_number, tv.created_at))
            return latest.text_content or ""
    return ""
@router.get("/export/training.jsonl")
def export_line_item_training_data(db: Session = Depends(get_db)):
    """Export approved, reviewed line items as a JSON-lines training file.

    An item is exported only when its raw_json has a ``reviewed_at`` value,
    is approved, and is neither excluded nor N/A, and when it can be traced
    to a document. The rows are written to ``line_item_training.jsonl`` in
    the export directory (created on demand) and that file is returned as
    the download.
    """
    text_versions_chain = (
        selectinload(DocumentLineItem.line_item_set)
        .selectinload(DocumentLineItemSet.document)
        .selectinload(Document.text_versions)
    )
    extracted_fields_chain = (
        selectinload(DocumentLineItem.line_item_set)
        .selectinload(DocumentLineItemSet.document)
        .selectinload(Document.extracted_fields)
    )
    items = (
        db.query(DocumentLineItem)
        .options(text_versions_chain, extracted_fields_chain)
        .order_by(DocumentLineItem.id.asc())
        .all()
    )

    export_rows = []
    for item in items:
        review = _line_item_extra(item)
        exportable = (
            review.get("reviewed_at")
            and bool(review.get("is_approved"))
            and not bool(review.get("is_excluded"))
            and not bool(review.get("is_na"))
        )
        if not exportable:
            continue

        parent_set = item.line_item_set
        doc = parent_set.document if parent_set is not None else None
        if doc is None:
            continue

        fields = get_current_extracted_fields(doc)
        merchant_name = ""
        when = ""
        if fields is not None:
            merchant_name = fields.merchant_normalized or fields.merchant_raw or ""
            if fields.transaction_date:
                when = fields.transaction_date.isoformat()
        if not when and item.entry_date:
            when = item.entry_date.isoformat()

        document_part = {
            "document_id": doc.document_id,
            "document_type": doc.document_type or "",
            "original_filename": doc.original_filename or "",
            "merchant": merchant_name,
            "transaction_date": when,
        }
        line_item_part = {
            "line_item_id": item.id,
            "line_number": item.line_number,
            "entry_date": item.entry_date.isoformat() if item.entry_date else "",
            "description": item.description or "",
            "quantity": _decimal_to_str(item.quantity),
            "unit_price": _decimal_to_str(item.unit_price),
            "line_total": _decimal_to_str(item.line_total),
            "tax_amount": _decimal_to_str(item.tax_amount),
            "category": item.category or "",
            "notes": item.notes or "",
        }
        review_part = {
            "quality_rating": str(review.get("quality_rating") or ""),
            "quality_note": str(review.get("quality_note") or ""),
            "reviewed_at": str(review.get("reviewed_at") or ""),
            "is_approved": bool(review.get("is_approved")),
            "is_excluded": bool(review.get("is_excluded")),
            "is_na": bool(review.get("is_na")),
        }
        export_rows.append(
            {
                "schema_version": "line_item_training_v1",
                "document": document_part,
                "ocr_text": _get_current_ocr_text_for_export(doc),
                "line_item": line_item_part,
                "review": review_part,
            }
        )

    export_dir = Path("/mnt/storage/document-processor/exports")
    export_dir.mkdir(parents=True, exist_ok=True)
    out_path = export_dir / "line_item_training.jsonl"
    # One JSON document per line; non-ASCII text is written as-is in UTF-8.
    with out_path.open("w", encoding="utf-8") as handle:
        handle.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in export_rows
        )

    return FileResponse(
        path=str(out_path),
        media_type="application/json",
        filename=out_path.name,
    )
@router.get("/summary", response_class=RedirectResponse)
def summarize_line_items_redirect(
    q: str = Query("", description="Item contains"),
):
    """Redirect the "/summary" URL to the summary tab of the list page.

    Args:
        q: Optional description filter, forwarded to the list page.

    Returns:
        A 303 redirect to ``/line-items/?tab=summary&q=...``.
    """
    # Encode q so that characters like "&", "#", "+" or "%" in the search
    # text survive the redirect instead of breaking the query string.
    query_string = urlencode({"tab": "summary", "q": q})
    return RedirectResponse(url=f"/line-items/?{query_string}", status_code=303)
@router.get("/queue", response_class=HTMLResponse)
def quality_queue(
    request: Request,
    db: Session = Depends(get_db),
):
    """Render the standalone quality-review queue page.

    Lists every item that is still a quality-queue candidate, sorted
    ascending by (transaction_date, merchant, description). The first row,
    if any, is also passed to the template as ``next_row``.
    """
    eager_chain = (
        selectinload(DocumentLineItem.line_item_set)
        .selectinload(DocumentLineItemSet.document)
        .selectinload(Document.extracted_fields)
    )
    items = (
        db.query(DocumentLineItem)
        .options(eager_chain)
        .order_by(DocumentLineItem.id.asc())
        .all()
    )

    pending = []
    for candidate in filter(_is_quality_queue_candidate, items):
        candidate_row = _build_row(candidate)
        if candidate_row is None:
            continue
        pending.append(candidate_row)
    pending.sort(
        key=lambda r: (
            r["transaction_date"] or "",
            r["merchant"] or "",
            r["description"] or "",
        )
    )

    context = {
        "request": request,
        "rows": pending,
        "next_row": pending[0] if pending else None,
        "active_page": "line_items",
    }
    return templates.TemplateResponse(
        request=request,
        name="line_items/queue.html",
        context=context,
    )