document-processor/app/routes/documents.py

521 lines
18 KiB
Python

import json
import logging
from copy import deepcopy
from pathlib import Path
from uuid import uuid4

from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session, selectinload

from app.db.deps import get_db
from app.logic.document_outputs import (
    create_field_enriched_pdf_version,
    create_ocr_corrected_pdf_version,
)
from app.logic.extraction import (
    auto_extract_from_document,
    get_current_extracted_fields,
    save_extracted_fields,
)
from app.logic.ingest import compute_quality_score, rerun_ocr_for_document
from app.models.document import Document
from app.models.document_version import DocumentVersion
from app.models.text_version import TextVersion
# Router that owns every route defined in this module; all paths are
# registered under the "/documents" prefix.
router = APIRouter(prefix="/documents", tags=["documents"])
# Parent of the directory that contains this file; the Jinja templates
# directory is resolved relative to it.
BASE_DIR = Path(__file__).resolve().parent.parent
def _build_queue_navigation(db: Session, document: Document, queue: str | None) -> dict:
    """Locate the neighbours of ``document`` inside a named work queue.

    Supported queue names:

    * ``"ocr"``    -- non-trashed documents whose ``review_status`` is not
      ``"reviewed"``, oldest ``created_at`` first.
    * ``"fields"`` -- non-trashed reviewed documents that have no extracted
      fields, ordered by ``updated_at`` (``created_at`` when that is unset).
    * ``"recent"`` -- all non-trashed documents, newest ``updated_at`` first.

    Args:
        db: Session used to query ``Document`` rows.
        document: The document currently being viewed.
        queue: Queue name, or a falsy value for "no queue".

    Returns:
        A dict with keys ``queue``, ``prev_doc`` and ``next_doc``. ``queue``
        is ``None`` when no queue or an unknown queue was requested. Both
        neighbours are ``None`` when the document is not part of the queue.
    """
    if not queue:
        return {"queue": None, "prev_doc": None, "next_doc": None}

    base = db.query(Document).filter(Document.is_trashed.is_(False))
    if queue == "ocr":
        docs = (
            base.filter(Document.review_status != "reviewed")
            .order_by(Document.created_at.asc())
            .all()
        )
    elif queue == "fields":
        # Eager-load the relationship so the emptiness check below does not
        # trigger one lazy-load query per document.
        reviewed = (
            base.options(selectinload(Document.extracted_fields))
            .filter(Document.review_status == "reviewed")
            .all()
        )
        docs = sorted(
            (d for d in reviewed if not d.extracted_fields),
            key=lambda d: d.updated_at or d.created_at,
        )
    elif queue == "recent":
        docs = base.order_by(Document.updated_at.desc()).all()
    else:
        return {"queue": None, "prev_doc": None, "next_doc": None}

    # Single pass to find the current document's position in the queue.
    for idx, candidate in enumerate(docs):
        if candidate.document_id == document.document_id:
            break
    else:
        return {"queue": queue, "prev_doc": None, "next_doc": None}

    prev_doc = docs[idx - 1] if idx > 0 else None
    next_doc = docs[idx + 1] if idx < len(docs) - 1 else None
    return {"queue": queue, "prev_doc": prev_doc, "next_doc": next_doc}
# Template renderer rooted at <BASE_DIR>/templates; the HTML routes in this
# module render their pages through it.
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
# Quality-flag identifiers handed to the document detail template as
# ``quality_flag_options``.
QUALITY_FLAG_OPTIONS = [
    "bad_embedded_text",
    "ocr_garbled",
    "low_text_coverage",
    "missing_lines",
    "bad_line_breaks",
    "low_contrast",
    "blurry",
    "skewed_scan",
    "cropped",
    "shadowed",
    "small_text",
    "thermal_faded",
    "handwriting_present",
    "receipt_damage",
    "manual_rerun_helped",
    "manual_rerun_no_change",
    "major_manual_cleanup",
    "minor_manual_cleanup",
]
def _get_current_text_versions(document: Document) -> tuple[TextVersion | None, TextVersion | None]:
    """Return the current ``raw_ocr`` and ``reviewed`` text versions.

    The document's text versions are scanned from the highest
    ``(version_number, created_at)`` downward, so when several rows of one
    type are flagged current the highest-ranked one is returned.

    Returns:
        ``(raw_ocr, reviewed_ocr)``; either element is ``None`` when no
        current version of that type exists.
    """
    newest_first = sorted(
        document.text_versions,
        key=lambda x: (x.version_number, x.created_at),
        reverse=True,
    )

    def _first_current(kind):
        # First match wins because the list is already newest-first.
        for version in newest_first:
            if version.version_type == kind and version.is_current:
                return version
        return None

    return _first_current("raw_ocr"), _first_current("reviewed")
def _extract_line_texts_from_layout(layout_json: dict | None) -> list[str]:
if not layout_json:
return []
lines: list[str] = []
for page in layout_json.get("pages", []):
for line in page.get("lines", []):
lines.append((line.get("text") or "").strip())
return lines
def _build_review_text_value(
    raw_ocr: TextVersion | None,
    reviewed_ocr: TextVersion | None,
    editor_source: str = "reviewed",
) -> str:
    """Build the text shown in the review editor.

    With ``editor_source == "raw"`` the raw OCR version is preferred and the
    reviewed one is the fallback; any other value prefers the reviewed
    version and falls back to raw OCR.

    When the chosen version has a truthy ``layout_json`` the text is rebuilt
    from the layout's line texts joined with newlines. Otherwise the
    version's ``text_content`` is used. An empty string is returned when
    there is no version or no text.
    """
    if editor_source == "raw":
        preferred, fallback = raw_ocr, reviewed_ocr
    else:
        preferred, fallback = reviewed_ocr, raw_ocr
    source = preferred or fallback

    if not source:
        return ""
    if source.layout_json:
        return "\n".join(_extract_line_texts_from_layout(source.layout_json))
    return source.text_content or ""
def _line_count_from_layout(layout_json: dict | None) -> int:
    """Return how many lines the layout contains (0 for a falsy layout)."""
    line_texts = _extract_line_texts_from_layout(layout_json)
    return len(line_texts)
def _apply_reviewed_lines_to_layout(base_layout: dict | None, reviewed_text: str) -> dict | None:
if not base_layout:
return None
reviewed_lines = reviewed_text.splitlines()
new_layout = deepcopy(base_layout)
idx = 0
for page in new_layout.get("pages", []):
for line in page.get("lines", []):
line["text"] = reviewed_lines[idx] if idx < len(reviewed_lines) else ""
idx += 1
return new_layout
def _get_queue_navigation(db: Session, document: Document) -> dict:
    """Compute the navigation targets shown on the document detail page.

    Returns:
        A dict with four keys:

        * ``prev_doc`` / ``next_doc`` -- neighbours of ``document`` among all
          non-trashed documents ordered by ``created_at`` ascending.
        * ``next_ocr_doc`` -- first non-trashed, not-yet-reviewed document
          (oldest ``created_at`` first) other than ``document``.
        * ``next_fields_doc`` -- first non-trashed reviewed document without
          extracted fields (oldest ``updated_at`` first) other than
          ``document``.

        Any entry is ``None`` when no such document exists.
    """
    current_id = document.document_id

    # Chronological neighbours across every active document.
    chronological = (
        db.query(Document)
        .filter(Document.is_trashed.is_(False))
        .order_by(Document.created_at.asc())
        .all()
    )
    prev_doc = None
    next_doc = None
    position = next(
        (i for i, d in enumerate(chronological) if d.document_id == current_id),
        None,
    )
    if position is not None:
        if position > 0:
            prev_doc = chronological[position - 1]
        if position < len(chronological) - 1:
            next_doc = chronological[position + 1]

    # Documents still waiting for OCR review.
    awaiting_review = (
        db.query(Document)
        .filter(Document.is_trashed.is_(False))
        .filter(Document.review_status != "reviewed")
        .order_by(Document.created_at.asc())
        .all()
    )

    # Reviewed documents that have no extracted fields yet.
    reviewed_docs = (
        db.query(Document)
        .options(selectinload(Document.extracted_fields))
        .filter(Document.is_trashed.is_(False))
        .filter(Document.review_status == "reviewed")
        .order_by(Document.updated_at.asc())
        .all()
    )
    missing_fields = [d for d in reviewed_docs if not d.extracted_fields]

    def _first_other(candidates):
        # Skip the document currently on screen.
        return next((d for d in candidates if d.document_id != current_id), None)

    return {
        "prev_doc": prev_doc,
        "next_doc": next_doc,
        "next_ocr_doc": _first_other(awaiting_review),
        "next_fields_doc": _first_other(missing_fields),
    }
def _extracted_field_form_values(document: Document, request: Request) -> dict:
    """Build the initial values for the extracted-fields form.

    * With ``?autofill_extracted=1`` the result of
      ``auto_extract_from_document`` is returned as-is.
    * Otherwise, when the document has current extracted fields, they are
      converted to strings: dates via ``isoformat()``, amounts via ``str()``,
      ``extra_json`` as pretty-printed JSON with sorted keys.
    * Otherwise every field is ``""`` and ``extra_json`` is ``"{}"``.

    Returns:
        A dict keyed by form field name.
    """
    current = get_current_extracted_fields(document)

    if request.query_params.get("autofill_extracted") == "1":
        # NOTE(review): None is passed as the helper's first argument --
        # confirm the helper tolerates that.
        return auto_extract_from_document(None, document)

    # Blank form; also the base that stored values are written into.
    values = dict.fromkeys(
        (
            "merchant_raw",
            "merchant_normalized",
            "transaction_date",
            "transaction_time",
            "subtotal",
            "tax",
            "total",
            "currency",
            "payment_method",
            "receipt_number",
            "location",
            "counterparty",
        ),
        "",
    )
    values["extra_json"] = "{}"
    if current is None:
        return values

    # Plain text columns: a missing value is shown as an empty input.
    for name in (
        "merchant_raw",
        "merchant_normalized",
        "transaction_time",
        "currency",
        "payment_method",
        "receipt_number",
        "location",
        "counterparty",
    ):
        values[name] = getattr(current, name) or ""

    if current.transaction_date:
        values["transaction_date"] = current.transaction_date.isoformat()

    # Amounts use an explicit None check so a stored zero is still rendered.
    for name in ("subtotal", "tax", "total"):
        amount = getattr(current, name)
        if amount is not None:
            values[name] = str(amount)

    if current.extra_json is not None:
        values["extra_json"] = json.dumps(current.extra_json, indent=2, sort_keys=True)
    return values
@router.get("/", response_class=HTMLResponse)
def list_documents(request: Request, db: Session = Depends(get_db)):
    """Render the list page with every non-trashed document, newest first."""
    active_query = db.query(Document).filter(Document.is_trashed.is_(False))
    documents = active_query.order_by(Document.created_at.desc()).all()
    page_context = {"request": request, "documents": documents}
    return templates.TemplateResponse(
        request=request,
        name="documents/list.html",
        context=page_context,
    )
@router.post("/{document_id}/rerun-ocr", response_class=RedirectResponse)
def rerun_ocr(document_id: str, db: Session = Depends(get_db)):
    """Re-run OCR for a document, then redirect back to its detail page.

    Redirects (303) to the document list when the document does not exist,
    to the detail page with ``?error=rerun_ocr_failed`` when the rerun
    raises, and to the detail page with ``?editor_source=raw`` on success.
    """
    document = db.query(Document).filter(Document.document_id == document_id).first()
    if document is None:
        return RedirectResponse(url="/documents/", status_code=303)

    # Build the target URL up front so the error path does not have to read
    # ORM attributes after a failed session operation.
    detail_url = f"/documents/{document.document_id}"
    try:
        rerun_ocr_for_document(db, document)
    except Exception:
        # Best-effort route: the user gets an error code, the log keeps the cause.
        logging.getLogger(__name__).exception("OCR rerun failed for document %s", document_id)
        return RedirectResponse(url=f"{detail_url}?error=rerun_ocr_failed", status_code=303)
    return RedirectResponse(url=f"{detail_url}?editor_source=raw", status_code=303)
@router.post("/{document_id}/save-ocr-corrected-pdf", response_class=RedirectResponse)
def save_ocr_corrected_pdf(document_id: str, db: Session = Depends(get_db)):
    """Create an OCR-corrected PDF version, then redirect to the detail page.

    Redirects (303) to the document list when the document does not exist,
    and to the detail page with ``?error=save_ocr_corrected_failed`` when
    PDF creation raises.
    """
    document = (
        db.query(Document)
        .options(selectinload(Document.text_versions))
        .filter(Document.document_id == document_id)
        .first()
    )
    if document is None:
        return RedirectResponse(url="/documents/", status_code=303)

    # Build the target URL up front so the error path does not have to read
    # ORM attributes after a failed session operation.
    detail_url = f"/documents/{document.document_id}"
    try:
        create_ocr_corrected_pdf_version(db, document)
    except Exception:
        # Best-effort route: the user gets an error code, the log keeps the cause.
        logging.getLogger(__name__).exception(
            "Saving OCR-corrected PDF failed for document %s", document_id
        )
        return RedirectResponse(url=f"{detail_url}?error=save_ocr_corrected_failed", status_code=303)
    return RedirectResponse(url=detail_url, status_code=303)
@router.post("/{document_id}/move-to-trash", response_class=RedirectResponse)
def move_to_trash(document_id: str, db: Session = Depends(get_db)):
    """Flag a document as trashed and redirect (303) to the document list.

    An unknown ``document_id`` is ignored; the redirect is the same.
    """
    from datetime import datetime

    document = db.query(Document).filter(Document.document_id == document_id).first()
    if document is not None:
        document.is_trashed = True
        # Naive UTC timestamp of the moment the document was trashed.
        document.trashed_at = datetime.utcnow()
        db.commit()
    return RedirectResponse(url="/documents/", status_code=303)
@router.post("/{document_id}/save-field-enriched-pdf", response_class=RedirectResponse)
def save_field_enriched_pdf(document_id: str, db: Session = Depends(get_db)):
    """Create a field-enriched PDF version, then redirect to the detail page.

    Redirects (303) to the document list when the document does not exist,
    and to the detail page with ``?error=save_field_enriched_failed`` when
    PDF creation raises.
    """
    document = db.query(Document).filter(Document.document_id == document_id).first()
    if document is None:
        return RedirectResponse(url="/documents/", status_code=303)

    # Build the target URL up front so the error path does not have to read
    # ORM attributes after a failed session operation.
    detail_url = f"/documents/{document.document_id}"
    try:
        create_field_enriched_pdf_version(db, document)
    except Exception:
        # Best-effort route: the user gets an error code, the log keeps the cause.
        logging.getLogger(__name__).exception(
            "Saving field-enriched PDF failed for document %s", document_id
        )
        return RedirectResponse(url=f"{detail_url}?error=save_field_enriched_failed", status_code=303)
    return RedirectResponse(url=detail_url, status_code=303)
@router.post("/{document_id}/review-text", response_class=RedirectResponse)
def save_reviewed_text(
    document_id: str,
    reviewed_text: str = Form(...),
    quality_flags: list[str] | None = Form(None),
    quality_note: str = Form(""),
    db: Session = Depends(get_db),
):
    """Store a reviewed text version for a document and mark it reviewed.

    Steps:

    1. Reject the submission (redirect with ``error=line_count_mismatch``)
       when the current raw OCR layout has lines and the submitted text has
       a different number of lines.
    2. Clear ``is_current`` on existing current ``reviewed`` versions.
    3. Add a new current ``reviewed`` version whose layout is the raw OCR
       layout with the reviewed line texts applied.
    4. Record quality score, flags and note on the raw OCR version, if any.
    5. Set ``review_status`` to ``"reviewed"`` and commit.

    Always answers with a 303 redirect: to the document list for an unknown
    document, otherwise to the detail page.
    """
    document = (
        db.query(Document)
        .options(selectinload(Document.text_versions))
        .filter(Document.document_id == document_id)
        .first()
    )
    if document is None:
        return RedirectResponse(url="/documents/", status_code=303)

    # HTML form submission delivers textarea line breaks as CRLF. Normalise
    # to LF so the stored text and the quality score are not skewed by
    # carriage returns. The number of lines is unaffected.
    reviewed_text = reviewed_text.replace("\r\n", "\n").replace("\r", "\n")

    raw_ocr, _ = _get_current_text_versions(document)
    base_layout = raw_ocr.layout_json if raw_ocr else None

    expected_line_count = _line_count_from_layout(base_layout)
    actual_line_count = len(reviewed_text.splitlines())
    # Only enforced when the raw OCR layout actually has lines to map onto.
    if expected_line_count and actual_line_count != expected_line_count:
        return RedirectResponse(
            url=f"/documents/{document.document_id}?error=line_count_mismatch&expected={expected_line_count}&actual={actual_line_count}",
            status_code=303,
        )

    # Retire the previously current reviewed version(s).
    for tv in document.text_versions:
        if tv.version_type == "reviewed" and tv.is_current:
            tv.is_current = False

    next_version_number = max(
        (tv.version_number for tv in document.text_versions),
        default=0,
    ) + 1
    reviewed_version = TextVersion(
        document_id=document.id,
        version_number=next_version_number,
        version_type="reviewed",
        text_content=reviewed_text,
        # NOTE(review): reviewer identity is hard-coded here.
        created_by="mcelwain",
        is_current=True,
        derived_from_version_id=raw_ocr.id if raw_ocr else None,
        layout_json=_apply_reviewed_lines_to_layout(base_layout, reviewed_text),
    )
    db.add(reviewed_version)

    if raw_ocr:
        raw_ocr.quality_score = compute_quality_score(raw_ocr.text_content, reviewed_text)
        raw_ocr.quality_flags = quality_flags or []
        raw_ocr.quality_note = quality_note or None

    document.review_status = "reviewed"
    db.commit()
    return RedirectResponse(url=f"/documents/{document.document_id}?editor_source=reviewed", status_code=303)
@router.post("/{document_id}/save-extracted-fields", response_class=RedirectResponse)
def save_extracted_fields_route(
    document_id: str,
    merchant_raw: str = Form(""),
    merchant_normalized: str = Form(""),
    transaction_date: str = Form(""),
    transaction_time: str = Form(""),
    subtotal: str = Form(""),
    tax: str = Form(""),
    total: str = Form(""),
    currency: str = Form(""),
    payment_method: str = Form(""),
    receipt_number: str = Form(""),
    location: str = Form(""),
    counterparty: str = Form(""),
    extra_json: str = Form("{}"),
    db: Session = Depends(get_db),
):
    """Hand the submitted extracted-field form values to ``save_extracted_fields``.

    All form values are forwarded unchanged as strings. Redirects (303) to
    the document list for an unknown document, otherwise to the detail page
    with ``?autofill_extracted=0``.
    """
    document = (
        db.query(Document)
        .options(selectinload(Document.extracted_fields), selectinload(Document.text_versions))
        .filter(Document.document_id == document_id)
        .first()
    )
    if document is None:
        return RedirectResponse(url="/documents/", status_code=303)

    # Collected once and forwarded as keyword arguments.
    submitted = {
        "merchant_raw": merchant_raw,
        "merchant_normalized": merchant_normalized,
        "transaction_date": transaction_date,
        "transaction_time": transaction_time,
        "subtotal": subtotal,
        "tax": tax,
        "total": total,
        "currency": currency,
        "payment_method": payment_method,
        "receipt_number": receipt_number,
        "location": location,
        "counterparty": counterparty,
        "extra_json": extra_json,
    }
    save_extracted_fields(db=db, document=document, **submitted)

    target = f"/documents/{document.document_id}?autofill_extracted=0"
    return RedirectResponse(url=target, status_code=303)
@router.get("/{document_id}", response_class=HTMLResponse)
def document_detail(document_id: str, request: Request, queue: str | None = None, db: Session = Depends(get_db)):
    """Render the detail/review page for one document.

    Query parameters read from the request:

    * ``editor_source`` -- ``"raw"`` or ``"reviewed"`` (default); selects
      which text version fills the review editor.
    * ``error``, ``expected``, ``actual`` -- passed through to the template.
    * ``autofill_extracted`` -- read by ``_extracted_field_form_values``.

    Returns a plain-text 404 response when the document does not exist.

    NOTE(review): ``queue`` is accepted but not used in this function;
    navigation always comes from ``_get_queue_navigation``.
    """
    document = (
        db.query(Document)
        .options(
            selectinload(Document.versions),
            selectinload(Document.text_versions),
            selectinload(Document.extracted_fields),
            selectinload(Document.layer1_candidates),
        )
        .filter(Document.document_id == document_id)
        .first()
    )
    if document is None:
        return HTMLResponse(content="Document not found", status_code=404)

    raw_ocr, reviewed_ocr = _get_current_text_versions(document)
    editor_source = request.query_params.get("editor_source", "reviewed")
    review_text_value = _build_review_text_value(raw_ocr, reviewed_ocr, editor_source)

    expected_line_count = _line_count_from_layout(raw_ocr.layout_json if raw_ocr else None)
    actual_line_count = len(review_text_value.splitlines()) if review_text_value else 0
    # Gutter numbers cover whichever is longer: the editor text or the layout.
    line_numbers = list(range(1, max(actual_line_count, expected_line_count) + 1))

    # A file URL is only produced for files that live under the storage root.
    file_url = None
    if document.current_path:
        storage_root = Path("/mnt/storage/document-processor")
        current_path = Path(document.current_path)
        try:
            rel = current_path.relative_to(storage_root)
        except ValueError:
            # relative_to raises ValueError when the path is outside the root.
            file_url = None
        else:
            file_url = f"/files/{rel.as_posix()}"

    app_url = str(request.url_for("document_detail", document_id=document.document_id))
    error = request.query_params.get("error")
    error_expected = request.query_params.get("expected")
    error_actual = request.query_params.get("actual")

    extracted_form = _extracted_field_form_values(document, request)
    current_extracted = get_current_extracted_fields(document)
    queue_nav = _get_queue_navigation(db, document)

    return templates.TemplateResponse(
        request=request,
        name="documents/detail.html",
        context={
            "request": request,
            "document": document,
            "prev_doc": queue_nav.get("prev_doc"),
            "next_doc": queue_nav.get("next_doc"),
            "next_ocr_doc": queue_nav.get("next_ocr_doc"),
            "next_fields_doc": queue_nav.get("next_fields_doc"),
            "raw_ocr": raw_ocr,
            "reviewed_ocr": reviewed_ocr,
            "review_text_value": review_text_value,
            "file_url": file_url,
            "app_url": app_url,
            "quality_flag_options": QUALITY_FLAG_OPTIONS,
            "current_quality_flags": raw_ocr.quality_flags if raw_ocr and raw_ocr.quality_flags else [],
            "current_quality_note": raw_ocr.quality_note if raw_ocr and raw_ocr.quality_note else "",
            "line_numbers": line_numbers,
            "expected_line_count": expected_line_count,
            "actual_line_count": actual_line_count,
            "error": error,
            "error_expected": error_expected,
            "error_actual": error_actual,
            "extracted_form": extracted_form,
            "current_extracted": current_extracted,
        },
    )