document-processor/app/routes/documents.py

3254 lines
121 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from docx.shared import Pt, Inches
from docx import Document as DocxDocument
import mammoth
from pdf2docx import Converter
from copy import deepcopy
from datetime import datetime
from decimal import Decimal, InvalidOperation
import re
import traceback
import os
import hashlib
import json
from decimal import Decimal
from pathlib import Path
from io import BytesIO
from fastapi import APIRouter, Depends, Form, Query, Request
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse, Response
from fastapi.templating import Jinja2Templates
from sqlalchemy import distinct
from sqlalchemy import func
from sqlalchemy.orm import Session, selectinload
from pypdf import PdfReader
from pdf2image import convert_from_path
from app.core.storage_settings import get_default_save_root
from app.db.deps import get_db
from app.logic.document_outputs import (
save_field_enriched_pdf_current,
save_ocr_corrected_pdf_current,
save_replica_pdf,
)
from app.logic.storage_paths import build_proposed_storage_path
from app.logic.extraction import (
auto_extract_from_document,
get_current_extracted_fields,
save_extracted_fields,
_extract_receipt_line_items,
_get_current_reviewed_text,
_get_document_lines,
_replace_document_line_items,
)
from app.logic.ingest import compute_quality_score, rerun_ocr_for_document
from app.models.document_analysis_version import DocumentAnalysisVersion
from app.logic.document_analysis import build_layout_ocr_analysis_for_document
from app.logic.layout_ocr import run_layout_ocr
from app.models.document import Document
from app.models.document_line_item import DocumentLineItem
from app.models.document_line_item_set import DocumentLineItemSet
from app.models.document_line_item_set_version import DocumentLineItemSetVersion
from app.models.document_line_item_version_item import DocumentLineItemVersionItem
from app.models.document_additional_field import DocumentAdditionalField
from app.models.document_additional_field_version import DocumentAdditionalFieldVersion
from app.models.extracted_field_version import ExtractedFieldVersion
from app.models.document_preset import DocumentPreset
from app.models.document_version import DocumentVersion
from app.models.text_version import TextVersion
from app.models.document_review_state import DocumentReviewState
from app.models.extracted_field import ExtractedField
from app.models.document_additional_field import DocumentAdditionalField
from app.models.text_version import TextVersion
from app.utils.filesize import human_size
router = APIRouter(prefix="/documents", tags=["documents"])
def _get_or_create_document_review_state(db: Session, document: Document) -> DocumentReviewState:
state = (
db.query(DocumentReviewState)
.filter(DocumentReviewState.document_id == document.id)
.first()
)
if state is None:
state = DocumentReviewState(document_id=document.id)
db.add(state)
db.flush()
return state
def _storage_available() -> bool:
candidate_roots = [
Path("/mnt/storage"),
Path("/mnt/svr-01/storage"),
]
try:
for root in candidate_roots:
if root.exists() and root.is_dir() and os.access(root, os.R_OK | os.X_OK):
return True
except Exception:
pass
return False
def _sha256_for_file(path_obj: Path) -> str:
hasher = hashlib.sha256()
with path_obj.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
hasher.update(chunk)
return hasher.hexdigest()
def _version_file_available(version, expected_document_id: str) -> bool:
file_path = getattr(version, "file_path", None)
if not file_path:
return False
try:
path_obj = Path(file_path)
if not path_obj.exists() or not path_obj.is_file():
return False
reader = PdfReader(str(path_obj))
meta = reader.metadata or {}
if str(meta.get("/DocumentID", "")).strip() != str(expected_document_id):
return False
if str(meta.get("/VersionNumber", "")).strip() != str(version.version_number):
return False
if str(meta.get("/VersionType", "")).strip() != str(version.version_type):
return False
expected_sha = getattr(version, "sha256", None)
if expected_sha:
actual_sha = _sha256_for_file(path_obj)
if actual_sha != expected_sha:
return False
return True
except Exception:
return False
def _json_safe(value):
if isinstance(value, Decimal):
return float(value)
if hasattr(value, "isoformat"):
return value.isoformat()
return value
def _serialize_model_row(row, fields: list[str]) -> dict:
if not row:
return {}
data = {}
for field in fields:
value = getattr(row, field, None)
data[field] = _json_safe(value)
return data
def _document_export_payload(document) -> dict:
raw_ocr, reviewed_ocr = _get_current_text_versions(document)
extracted = get_current_extracted_fields(document)
additional = _get_current_additional_fields(document)
versions = []
for version in sorted(getattr(document, "versions", []), key=lambda v: v.version_number):
created_at = getattr(version, "created_at", None)
versions.append({
"version_number": _json_safe(version.version_number),
"version_type": _json_safe(version.version_type),
"file_path": _json_safe(version.file_path),
"sha256": _json_safe(version.sha256),
"created_by": _json_safe(version.created_by),
"notes": _json_safe(version.notes),
"created_at": _json_safe(created_at),
})
return {
"document_id": document.document_id,
"document_type": document.document_type,
"review_status": document.review_status,
"source_path": document.source_path,
"original_path": document.original_path,
"current_path": document.current_path,
"share_path": document.share_path,
"original_filename": document.original_filename,
"canonical_filename": document.canonical_filename,
"mime_type": document.mime_type,
"file_size": _json_safe(document.file_size),
"page_count": _json_safe(document.page_count),
"sha256_original": _json_safe(document.sha256_original),
"sha256_current": _json_safe(document.sha256_current),
"raw_ocr_text": _json_safe(raw_ocr.text_content if raw_ocr else None),
"reviewed_ocr_text": _json_safe(reviewed_ocr.text_content if reviewed_ocr else None),
"ocr_quality_score": _json_safe(raw_ocr.quality_score if raw_ocr else None),
"quality_flags": raw_ocr.quality_flags if raw_ocr and raw_ocr.quality_flags else [],
"quality_note": _json_safe(raw_ocr.quality_note if raw_ocr else None),
"extracted_fields": _serialize_model_row(extracted, [
"merchant_raw",
"merchant_normalized",
"transaction_date",
"transaction_time",
"subtotal",
"tax",
"total",
"currency",
"payment_method",
"receipt_number",
"location",
"counterparty",
]),
"additional_fields": _serialize_model_row(additional, [
"owner_primary",
"owner_secondary",
"paid_by_person",
"occasion_note",
"is_shared_expense",
"covered_people",
"attendees",
"reimbursement_expected_from",
"reimbursement_paid_by",
"reimbursement_paid_to",
"reimbursement_paid_amount",
"reimbursement_paid_date",
"reimbursement_note",
]),
"versions": versions,
}
def _latest_raw_ocr(document):
rows = [tv for tv in getattr(document, "text_versions", []) if getattr(tv, "version_type", None) == "raw_ocr"]
rows.sort(key=lambda x: x.version_number)
return rows[-1] if rows else None
def _clear_current_extracted(db: Session, document: Document) -> None:
db.query(ExtractedField).filter(
ExtractedField.document_id == document.id
).delete(synchronize_session=False)
def _clear_current_additional(db: Session, document: Document) -> None:
db.query(DocumentAdditionalField).filter(
DocumentAdditionalField.document_id == document.id
).delete(synchronize_session=False)
def _reset_ocr_to_raw(db: Session, document: Document) -> None:
db.query(TextVersion).filter(
TextVersion.document_id == document.id
).delete(synchronize_session=False)
document.review_status = "pending"
BASE_DIR = Path(__file__).resolve().parent.parent
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
templates.env.globals["human_size"] = human_size
def _next_extracted_field_version_number(db: Session, document_id: int) -> int:
return (db.query(func.max(ExtractedFieldVersion.version_number))
.filter(ExtractedFieldVersion.document_id == document_id)
.scalar() or 0) + 1
def _next_additional_field_version_number(db: Session, document_id: int) -> int:
return (db.query(func.max(DocumentAdditionalFieldVersion.version_number))
.filter(DocumentAdditionalFieldVersion.document_id == document_id)
.scalar() or 0) + 1
def _snapshot_extracted_field(db: Session, document: Document, row, created_by: str, notes: str | None = None) -> None:
version = ExtractedFieldVersion(
document_id=document.id,
version_number=_next_extracted_field_version_number(db, document.id),
merchant_raw=row.merchant_raw,
merchant_normalized=row.merchant_normalized,
transaction_date=row.transaction_date,
transaction_time=row.transaction_time,
subtotal=row.subtotal,
tax=row.tax,
total=row.total,
currency=row.currency,
payment_method=row.payment_method,
receipt_number=row.receipt_number,
location=row.location,
counterparty=row.counterparty,
extra_json=row.extra_json,
created_by=created_by,
notes=notes,
)
db.add(version)
# =========================
# RESTORE HELPERS (NO SNAPSHOT)
# =========================
def _restore_extracted_to_original(db: Session, document: Document) -> bool:
return _restore_extracted_from_version_number(db, document, 1)
def _restore_extracted_from_version_number(db: Session, document: Document, target_version_number: int) -> bool:
version = (
db.query(ExtractedFieldVersion)
.filter(
ExtractedFieldVersion.document_id == document.id,
ExtractedFieldVersion.version_number == target_version_number,
)
.first()
)
if not version:
return False
row = (
db.query(ExtractedField)
.filter(ExtractedField.document_id == document.id)
.first()
)
if not row:
return False
# overwrite live row (NO NEW VERSION)
row.merchant_raw = version.merchant_raw
row.merchant_normalized = version.merchant_normalized
row.transaction_date = version.transaction_date
row.transaction_time = version.transaction_time
row.subtotal = version.subtotal
row.tax = version.tax
row.total = version.total
row.currency = version.currency
row.payment_method = version.payment_method
row.receipt_number = version.receipt_number
row.location = version.location
row.counterparty = version.counterparty
row.extra_json = version.extra_json
db.add(row)
return True
row = (
db.query(ExtractedField)
.filter(ExtractedField.document_id == document.id)
.first()
)
if row is None:
row = ExtractedField(document_id=document.id)
db.add(row)
row.merchant_raw = target.merchant_raw
row.merchant_normalized = target.merchant_normalized
row.transaction_date = target.transaction_date
row.transaction_time = target.transaction_time
row.subtotal = target.subtotal
row.tax = target.tax
row.total = target.total
row.currency = target.currency
row.payment_method = target.payment_method
row.receipt_number = target.receipt_number
row.location = target.location
row.counterparty = target.counterparty
row.extra_json = target.extra_json
db.add(row)
return True
def _restore_additional_to_original(db: Session, document: Document) -> bool:
return _restore_additional_from_version_number(db, document, 1)
def _restore_additional_from_version_number(db: Session, document: Document, target_version_number: int) -> bool:
version = (
db.query(DocumentAdditionalFieldVersion)
.filter(
DocumentAdditionalFieldVersion.document_id == document.id,
DocumentAdditionalFieldVersion.version_number == target_version_number,
)
.first()
)
if not version:
return False
row = (
db.query(DocumentAdditionalField)
.filter(DocumentAdditionalField.document_id == document.id)
.first()
)
if not row:
return False
# overwrite live row (NO NEW VERSION)
row.owner_primary = version.owner_primary
row.owner_secondary = version.owner_secondary
row.paid_by_person = version.paid_by_person
row.occasion_note = version.occasion_note
row.is_shared_expense = version.is_shared_expense
row.covered_people = version.covered_people
row.attendees = version.attendees
row.reimbursement_expected_from = version.reimbursement_expected_from
row.reimbursement_paid_by = version.reimbursement_paid_by
row.reimbursement_paid_to = version.reimbursement_paid_to
row.reimbursement_paid_amount = version.reimbursement_paid_amount
row.reimbursement_paid_date = version.reimbursement_paid_date
row.reimbursement_note = version.reimbursement_note
db.add(row)
return True
row = (
db.query(DocumentAdditionalField)
.filter(DocumentAdditionalField.document_id == document.id)
.first()
)
if row is None:
row = DocumentAdditionalField(document_id=document.id)
db.add(row)
row.owner_primary = target.owner_primary
row.owner_secondary = target.owner_secondary
row.paid_by_person = target.paid_by_person
row.occasion_note = target.occasion_note
row.is_shared_expense = target.is_shared_expense
row.covered_people = target.covered_people
row.attendees = target.attendees
row.reimbursement_expected_from = target.reimbursement_expected_from
row.reimbursement_paid_by = target.reimbursement_paid_by
row.reimbursement_paid_to = target.reimbursement_paid_to
row.reimbursement_paid_amount = target.reimbursement_paid_amount
row.reimbursement_paid_date = target.reimbursement_paid_date
row.reimbursement_note = target.reimbursement_note
db.add(row)
return True
def _snapshot_additional_field(db: Session, document: Document, row, created_by: str, notes: str | None = None) -> None:
version = DocumentAdditionalFieldVersion(
document_id=document.id,
version_number=_next_additional_field_version_number(db, document.id),
owner_primary=row.owner_primary,
owner_secondary=row.owner_secondary,
paid_by_person=row.paid_by_person,
occasion_note=row.occasion_note,
is_shared_expense=row.is_shared_expense,
covered_people=row.covered_people,
attendees=row.attendees,
reimbursement_expected_from=row.reimbursement_expected_from,
reimbursement_paid_by=row.reimbursement_paid_by,
reimbursement_paid_to=row.reimbursement_paid_to,
reimbursement_paid_amount=row.reimbursement_paid_amount,
reimbursement_paid_date=row.reimbursement_paid_date,
reimbursement_note=row.reimbursement_note,
created_by=created_by,
notes=notes,
)
db.add(version)
QUALITY_FLAG_OPTIONS = [
"bad_embedded_text",
"ocr_garbled",
"low_text_coverage",
"missing_lines",
"bad_line_breaks",
"low_contrast",
"blurry",
"skewed_scan",
"cropped",
"shadowed",
"small_text",
"thermal_faded",
"handwriting_present",
"receipt_damage",
"manual_rerun_helped",
"manual_rerun_no_change",
"major_manual_cleanup",
"minor_manual_cleanup",
]
def _parse_people_list(value: str) -> list[str]:
return [part.strip() for part in value.split(",") if part.strip()]
def _format_people_list(value: list | None) -> str:
if not value:
return ""
return ", ".join(str(x).strip() for x in value if str(x).strip())
def _to_decimal(value: str) -> Decimal | None:
cleaned = (value or "").strip()
if not cleaned:
return None
try:
return Decimal(cleaned)
except (InvalidOperation, TypeError):
return None
def _get_all_presets(db: Session) -> list[DocumentPreset]:
return db.query(DocumentPreset).order_by(DocumentPreset.name.asc()).all()
def _get_preset_by_id(db: Session, preset_id: int | None) -> DocumentPreset | None:
if not preset_id:
return None
return db.query(DocumentPreset).filter(DocumentPreset.id == preset_id).first()
def _merge_additional_form_with_preset(values: dict, preset: DocumentPreset | None) -> dict:
if preset is None:
return values
return {
"owner_primary": preset.owner_primary if preset.owner_primary is not None else values.get("owner_primary", ""),
"owner_secondary": preset.owner_secondary if preset.owner_secondary is not None else values.get("owner_secondary", ""),
"paid_by_person": preset.paid_by_person if preset.paid_by_person is not None else values.get("paid_by_person", ""),
"covered_people": _format_people_list(preset.covered_people) if preset.covered_people is not None else values.get("covered_people", ""),
"attendees": _format_people_list(preset.attendees) if preset.attendees is not None else values.get("attendees", ""),
"occasion_note": preset.occasion_note if preset.occasion_note is not None else values.get("occasion_note", ""),
"is_shared_expense": bool(preset.is_shared_expense),
"reimbursement_expected_from": _format_people_list(preset.reimbursement_expected_from) if preset.reimbursement_expected_from is not None else values.get("reimbursement_expected_from", ""),
"reimbursement_paid_by": preset.reimbursement_paid_by if preset.reimbursement_paid_by is not None else values.get("reimbursement_paid_by", ""),
"reimbursement_paid_to": preset.reimbursement_paid_to if preset.reimbursement_paid_to is not None else values.get("reimbursement_paid_to", ""),
"reimbursement_paid_amount": values.get("reimbursement_paid_amount", ""),
"reimbursement_paid_date": values.get("reimbursement_paid_date", ""),
"reimbursement_note": preset.reimbursement_note if preset.reimbursement_note is not None else values.get("reimbursement_note", ""),
}
def _get_current_additional_fields(document: Document) -> DocumentAdditionalField | None:
rows = list(getattr(document, "additional_fields", []) or [])
if not rows:
return None
return sorted(rows, key=lambda x: x.updated_at or x.created_at, reverse=True)[0]
def _extracted_field_form_values(document: Document, request: Request) -> dict:
current = get_current_extracted_fields(document)
auto = request.query_params.get("autofill_extracted")
if auto == "1":
values = auto_extract_from_document(None, document)
elif current is not None:
values = {
"merchant_raw": current.merchant_raw or "",
"merchant_normalized": current.merchant_normalized or "",
"transaction_date": current.transaction_date.isoformat() if current.transaction_date else "",
"transaction_time": current.transaction_time or "",
"subtotal": str(current.subtotal) if current.subtotal is not None else "",
"tax": str(current.tax) if current.tax is not None else "",
"total": str(current.total) if current.total is not None else "",
"currency": current.currency or "",
"payment_method": current.payment_method or "",
"receipt_number": current.receipt_number or "",
"location": current.location or "",
"counterparty": current.counterparty or "",
"extra_json": "{}" if current.extra_json is None else __import__("json").dumps(current.extra_json, indent=2, sort_keys=True),
}
else:
values = {
"merchant_raw": "",
"merchant_normalized": "",
"transaction_date": "",
"transaction_time": "",
"subtotal": "",
"tax": "",
"total": "",
"currency": "",
"payment_method": "",
"receipt_number": "",
"location": "",
"counterparty": "",
"extra_json": "{}",
}
return values
def _additional_field_form_values(document: Document, preset: DocumentPreset | None = None) -> dict:
current = _get_current_additional_fields(document)
if current is None:
values = {
"owner_primary": "",
"owner_secondary": "",
"paid_by_person": "",
"covered_people": "",
"attendees": "",
"occasion_note": "",
"is_shared_expense": False,
"reimbursement_expected_from": "",
"reimbursement_paid_by": "",
"reimbursement_paid_to": "",
"reimbursement_paid_amount": "",
"reimbursement_paid_date": "",
"reimbursement_note": "",
}
return _merge_additional_form_with_preset(values, preset)
values = {
"owner_primary": current.owner_primary or "",
"owner_secondary": current.owner_secondary or "",
"paid_by_person": current.paid_by_person or "",
"covered_people": _format_people_list(current.covered_people),
"attendees": _format_people_list(current.attendees),
"occasion_note": current.occasion_note or "",
"is_shared_expense": bool(current.is_shared_expense),
"reimbursement_expected_from": _format_people_list(current.reimbursement_expected_from),
"reimbursement_paid_by": current.reimbursement_paid_by or "",
"reimbursement_paid_to": current.reimbursement_paid_to or "",
"reimbursement_paid_amount": str(current.reimbursement_paid_amount) if current.reimbursement_paid_amount is not None else "",
"reimbursement_paid_date": current.reimbursement_paid_date.isoformat() if current.reimbursement_paid_date else "",
"reimbursement_note": current.reimbursement_note or "",
}
return _merge_additional_form_with_preset(values, preset)
def _get_current_text_versions(document: Document) -> tuple[TextVersion | None, TextVersion | None]:
sorted_text_versions = sorted(
document.text_versions,
key=lambda x: (x.version_number, x.created_at),
reverse=True,
)
raw_ocr = next(
(tv for tv in sorted_text_versions if tv.version_type == "raw_ocr" and tv.is_current),
None,
)
if raw_ocr is None:
raw_ocr = next(
(tv for tv in sorted_text_versions if tv.version_type == "raw_ocr"),
None,
)
reviewed_ocr = next(
(
tv for tv in sorted_text_versions
if tv.version_type in ("reviewed", "reviewed_ocr") and tv.is_current
),
None,
)
if reviewed_ocr is None:
reviewed_ocr = next(
(
tv for tv in sorted_text_versions
if tv.version_type in ("reviewed", "reviewed_ocr")
),
None,
)
return raw_ocr, reviewed_ocr
def _default_word_style() -> dict:
return {
"font_family": "Helvetica",
"font_postscript_name": None,
"font_weight": 400,
"font_style": "normal",
"font_stretch": "normal",
"font_size": 10.0,
"line_height": None,
"letter_spacing": 0.0,
"word_spacing": 0.0,
"text_color": "#000000",
"opacity": 1.0,
"render_mode": "fill",
"text_align": "left",
}
def _merge_style_layers(inferred_style: dict | None, override_style: dict | None) -> dict:
base = _default_word_style()
if isinstance(inferred_style, dict):
base.update({k: v for k, v in inferred_style.items() if v is not None})
if isinstance(override_style, dict):
base.update({k: v for k, v in override_style.items() if v is not None})
return base
def _normalize_word_style(word: dict) -> dict:
inferred = word.get("inferred_style") if isinstance(word.get("inferred_style"), dict) else {}
override = word.get("override_style") if isinstance(word.get("override_style"), dict) else {}
resolved = _merge_style_layers(inferred, override)
word["inferred_style"] = _merge_style_layers({}, inferred)
word["override_style"] = override
word["resolved_style"] = resolved
manual_flags = word.get("manual_flags") if isinstance(word.get("manual_flags"), dict) else {}
manual_flags.setdefault("text_edited", False)
manual_flags.setdefault("geometry_edited", False)
manual_flags.setdefault("style_edited", False)
word["manual_flags"] = manual_flags
return word
def _normalize_layout_review_payload(layout_json: dict | None) -> dict:
layout_json = layout_json if isinstance(layout_json, dict) else {}
layout_json.setdefault("schema_version", 2)
layout_json.setdefault("edit_log", [])
pages = layout_json.get("pages")
if not isinstance(pages, list):
pages = []
layout_json["pages"] = pages
for page in pages:
words = page.get("words")
if not isinstance(words, list):
page["words"] = []
continue
for word in words:
if isinstance(word, dict):
_normalize_word_style(word)
return layout_json
def _append_layout_edit_event(layout_json: dict, event: dict) -> None:
edit_log = layout_json.setdefault("edit_log", [])
if isinstance(edit_log, list):
edit_log.append(event)
def _extract_line_texts_from_layout(layout_json: dict | None) -> list[str]:
if not layout_json:
return []
lines: list[str] = []
for page in layout_json.get("pages", []):
for line in page.get("lines", []):
lines.append((line.get("text") or "").strip())
return lines
def _build_review_text_value(
raw_ocr: TextVersion | None,
reviewed_ocr: TextVersion | None,
editor_source: str = "reviewed",
) -> str:
if editor_source == "raw":
source = raw_ocr or reviewed_ocr
else:
source = reviewed_ocr or raw_ocr
if source and source.text_content:
return source.text_content
if source and source.layout_json:
return "\n".join(_extract_line_texts_from_layout(source.layout_json))
return ""
def _line_count_from_layout(layout_json: dict | None) -> int:
return len(_extract_line_texts_from_layout(layout_json))
def _apply_reviewed_lines_to_layout(base_layout: dict | None, reviewed_text: str) -> dict | None:
if not base_layout:
return None
new_layout = deepcopy(base_layout)
reviewed_lines = reviewed_text.splitlines()
line_idx = 0
for page in new_layout.get("pages", []):
page_words = page.get("words", []) or []
words_by_id = {}
words_by_bbox = {}
for w in page_words:
word_id = w.get("id")
if word_id is not None:
words_by_id[str(word_id)] = w
bbox = w.get("bbox")
if isinstance(bbox, (list, tuple)) and len(bbox) == 4:
words_by_bbox[tuple(float(x) for x in bbox)] = w
for line in page.get("lines", []) or []:
new_line_text = reviewed_lines[line_idx] if line_idx < len(reviewed_lines) else ""
line["text"] = new_line_text
line_idx += 1
line_words = line.get("words", []) or []
if not line_words:
continue
tokens = new_line_text.split()
assigned = []
if not tokens:
assigned = [""] * len(line_words)
elif len(tokens) == len(line_words):
assigned = tokens
elif len(tokens) < len(line_words):
assigned = tokens + ([""] * (len(line_words) - len(tokens)))
else:
assigned = tokens[:len(line_words) - 1] + [" ".join(tokens[len(line_words) - 1:])]
for lw, token in zip(line_words, assigned):
lw["text"] = token
target = None
word_id = lw.get("id")
if word_id is not None:
target = words_by_id.get(str(word_id))
if target is None:
bbox = lw.get("bbox")
if isinstance(bbox, (list, tuple)) and len(bbox) == 4:
target = words_by_bbox.get(tuple(float(x) for x in bbox))
if target is not None:
target["text"] = token
return new_layout
def _get_existing_document_types(db: Session) -> list[str]:
rows = (
db.query(distinct(Document.document_type))
.filter(Document.document_type.isnot(None))
.order_by(Document.document_type.asc())
.all()
)
values: list[str] = []
for row in rows:
value = row[0]
if value:
values.append(str(value))
return values
def _get_queue_navigation(db: Session, document: Document) -> dict:
active_docs = (
db.query(Document)
.filter(Document.is_trashed.is_(False))
.order_by(Document.created_at.asc())
.all()
)
doc_ids = [d.document_id for d in active_docs]
prev_doc = None
next_doc = None
if document.document_id in doc_ids:
idx = doc_ids.index(document.document_id)
if idx > 0:
prev_doc = active_docs[idx - 1]
if idx < len(active_docs) - 1:
next_doc = active_docs[idx + 1]
needs_ocr = (
db.query(Document)
.filter(Document.is_trashed.is_(False))
.filter(Document.review_status != "reviewed")
.order_by(Document.created_at.asc())
.all()
)
reviewed_no_fields = []
for d in (
db.query(Document)
.options(selectinload(Document.extracted_fields))
.filter(Document.is_trashed.is_(False))
.filter(Document.review_status == "reviewed")
.order_by(Document.updated_at.asc())
.all()
):
if not d.extracted_fields:
reviewed_no_fields.append(d)
next_ocr = None
next_fields = None
if needs_ocr:
for d in needs_ocr:
if d.document_id != document.document_id:
next_ocr = d
break
if reviewed_no_fields:
for d in reviewed_no_fields:
if d.document_id != document.document_id:
next_fields = d
break
return {
"prev_doc": prev_doc,
"next_doc": next_doc,
"next_ocr_doc": next_ocr,
"next_fields_doc": next_fields,
}
def _document_matches_filters(
doc: Document,
q: str,
document_type: str,
review_status: str,
merchant: str,
owner_primary: str,
) -> bool:
q_norm = q.strip().lower()
type_norm = document_type.strip().lower()
review_norm = review_status.strip().lower()
merchant_norm = merchant.strip().lower()
owner_norm = owner_primary.strip().lower()
if q_norm:
haystacks = [
doc.document_id or "",
doc.document_type or "",
doc.original_filename or "",
doc.canonical_filename or "",
doc.current_path or "",
doc.source_path or "",
]
current_extracted = get_current_extracted_fields(doc)
current_additional = _get_current_additional_fields(doc)
if current_extracted is not None:
haystacks.extend([
current_extracted.merchant_raw or "",
current_extracted.merchant_normalized or "",
current_extracted.location or "",
current_extracted.counterparty or "",
current_extracted.receipt_number or "",
])
if current_additional is not None:
haystacks.extend([
current_additional.owner_primary or "",
current_additional.owner_secondary or "",
current_additional.paid_by_person or "",
current_additional.occasion_note or "",
])
if not any(q_norm in h.lower() for h in haystacks):
return False
if type_norm and type_norm != (doc.document_type or "").lower():
return False
if review_norm and review_norm != (doc.review_status or "").lower():
return False
if merchant_norm:
current_extracted = get_current_extracted_fields(doc)
merchant_values = []
if current_extracted is not None:
merchant_values = [
current_extracted.merchant_raw or "",
current_extracted.merchant_normalized or "",
]
if not any(merchant_norm in m.lower() for m in merchant_values):
return False
if owner_norm:
current_additional = _get_current_additional_fields(doc)
owner_values = []
if current_additional is not None:
owner_values = [
current_additional.owner_primary or "",
current_additional.owner_secondary or "",
]
if not any(owner_norm in o.lower() for o in owner_values):
return False
return True
def _norm_acl(value) -> str:
return str(value or "").strip().casefold()
def _user_is_admin(user) -> bool:
if not user:
return False
username = _norm_acl(user.get("username"))
if username in {"admin", "mcelwain"}:
return True
return bool(user.get("is_admin"))
def _user_can_access_document(user, doc) -> bool:
if not user:
return False
if user.get("is_admin"):
return True
allowed = {
_norm_acl(user.get("username")),
_norm_acl(user.get("display_name")),
}
allowed.discard("")
for addl in getattr(doc, "additional_fields", []) or []:
if _norm_acl(getattr(addl, "owner_primary", None)) in allowed:
return True
if _norm_acl(getattr(addl, "owner_secondary", None)) in allowed:
return True
return False
@router.get("/", response_class=HTMLResponse)
def list_documents(
request: Request,
q: str = Query("", description="Search"),
document_type: str = Query("", description="Document type"),
review_status: str = Query("", description="Review status"),
merchant: str = Query("", description="Merchant contains"),
owner_primary: str = Query("", description="Owner contains"),
tab: str = Query("all-documents"),
db: Session = Depends(get_db),
):
current_user = getattr(request.state, "current_user", None)
documents_all = (
db.query(Document)
.options(
selectinload(Document.extracted_fields),
selectinload(Document.additional_fields),
)
.filter(Document.is_trashed.is_(False))
.order_by(Document.created_at.desc())
.all()
)
# ACL temporarily disabled to restore document visibility
has_search_query = any([
q.strip(),
document_type.strip(),
review_status.strip(),
merchant.strip(),
owner_primary.strip(),
])
filtered_documents = documents_all
if has_search_query:
filtered_documents = []
for doc in documents_all:
if _document_matches_filters(
doc=doc,
q=q,
document_type=document_type,
review_status=review_status,
merchant=merchant,
owner_primary=owner_primary,
):
filtered_documents.append(doc)
if tab not in {"all-documents", "advanced-search"}:
tab = "all-documents"
return templates.TemplateResponse(
request=request,
name="documents/list.html",
context={
"request": request,
"documents": filtered_documents,
"q": q,
"document_type": document_type,
"review_status": review_status,
"merchant": merchant,
"owner_primary": owner_primary,
"has_search_query": has_search_query,
"active_tab": tab,
"active_page": "documents",
"current_user": getattr(request.state, "current_user", None),
},
)
@router.post("/{document_id}/save-document-type", response_class=RedirectResponse)
def save_document_type_route(
document_id: str,
document_type: str = Form(""),
db: Session = Depends(get_db),
):
document = db.query(Document).filter(Document.document_id == document_id).first()
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
document.document_type = document_type.strip() or None
db.commit()
return RedirectResponse(url=f"/documents/{document.document_id}?tab=ocr-review&success=saved_document_type", status_code=303)
@router.post("/{document_id}/rerun-ocr", response_class=RedirectResponse)
def rerun_ocr(document_id: str, db: Session = Depends(get_db)):
document = (
db.query(Document)
.options(
selectinload(Document.text_versions),
selectinload(Document.analysis_versions),
)
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
try:
if not document.current_path:
return RedirectResponse(
url=f"/documents/{document.document_id}?error=rerun_ocr_failed&tab=ocr-review",
status_code=303,
)
layout_result = run_layout_ocr(document.current_path)
analysis_json = build_layout_ocr_analysis_for_document(document)
text_content = analysis_json.get("text_content") or ""
for row in getattr(document, "text_versions", []) or []:
if getattr(row, "is_current", False):
row.is_current = False
next_version = (
max((getattr(v, "version_number", 0) or 0) for v in getattr(document, "text_versions", []) or []) + 1
if getattr(document, "text_versions", None) else 1
)
text_row = TextVersion(
document_id=document.id,
version_number=next_version,
version_type="raw_ocr",
text_content=text_content,
created_by="rerun_ocr_layout",
is_current=True,
ocr_engine=layout_result.engine_name,
ocr_engine_version=layout_result.engine_version,
rerun_source="layout_ocr",
quality_score=0.9 if analysis_json.get("quality", {}).get("usable_layout") else 0.5,
quality_flags=analysis_json.get("quality", {}).get("issues", []),
quality_note="Layout OCR generated line and word boxes for replica workflow.",
layout_json={"pages": analysis_json.get("pages", [])},
)
db.add(text_row)
db.flush()
for row in getattr(document, "analysis_versions", []) or []:
if getattr(row, "is_current", False):
row.is_current = False
next_analysis_version = (
max((getattr(v, "version_number", 0) or 0) for v in getattr(document, "analysis_versions", []) or []) + 1
if getattr(document, "analysis_versions", None) else 1
)
analysis_row = DocumentAnalysisVersion(
document_id=document.id,
version_number=next_analysis_version,
analysis_type="canonical",
is_current=True,
created_by="rerun_ocr_layout",
engine_name=layout_result.engine_name,
engine_version=layout_result.engine_version,
quality_score=0.9 if analysis_json.get("quality", {}).get("usable_layout") else 0.5,
quality_flags=analysis_json.get("quality", {}).get("issues", []),
quality_note="Canonical analysis refreshed from layout OCR result.",
analysis_json=analysis_json,
)
db.add(analysis_row)
db.commit()
except Exception:
traceback.print_exc()
db.rollback()
return RedirectResponse(
url=f"/documents/{document.document_id}?error=rerun_ocr_failed&tab=ocr-review",
status_code=303,
)
return RedirectResponse(
url=f"/documents/{document.document_id}?success=rerun_ocr&editor_source=raw&tab=ocr-review",
status_code=303,
)
@router.post("/{document_id}/save-ocr-corrected-pdf", response_class=RedirectResponse)
def save_ocr_corrected_pdf(document_id: str, db: Session = Depends(get_db)):
return RedirectResponse(
url=f"/documents/{document_id}?error=deprecated_pdf_route_disabled&tab=ocr-review",
status_code=303,
)
@router.post("/{document_id}/save-review-flags", response_class=RedirectResponse)
def save_review_flags(
document_id: str,
is_approved: str = Form(""),
is_excluded: str = Form(""),
db: Session = Depends(get_db),
):
document = db.query(Document).filter(Document.document_id == document_id).first()
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
state = _get_or_create_document_review_state(db, document)
state.is_approved = bool(is_approved)
state.is_excluded = bool(is_excluded)
state.reviewed_at = datetime.utcnow()
db.add(state)
db.commit()
return RedirectResponse(
url=f"/documents/{document.document_id}?success=saved_review_flags",
status_code=303,
)
@router.post("/{document_id}/move-to-trash", response_class=RedirectResponse)
def move_to_trash(document_id: str, db: Session = Depends(get_db)):
document = db.query(Document).filter(Document.document_id == document_id).first()
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
document.is_trashed = True
document.trashed_at = datetime.utcnow()
db.commit()
return RedirectResponse(url="/documents/", status_code=303)
def _resolve_document_output_path(document, output_path: str = "") -> Path:
save_root = get_default_save_root()
naming_row = document.naming_fields[0] if getattr(document, "naming_fields", None) else None
default_output_path = Path(
build_proposed_storage_path(
document=document,
save_root=save_root,
naming_row=naming_row,
)
)
default_output_path = default_output_path.with_name(
re.sub(r"(?:_v\d+|_\d+)(?=\.[^.]+$)", "", default_output_path.name)
)
if default_output_path.suffix.lower() != ".pdf":
default_output_path = default_output_path.with_suffix(".pdf")
output_path_raw = (output_path or "").strip()
output_path_obj = Path(output_path_raw) if output_path_raw else default_output_path
if output_path_obj.suffix.lower() != ".pdf":
output_path_obj = output_path_obj.with_suffix(".pdf")
allowed_root = Path(save_root).resolve()
resolved_parent = output_path_obj.parent.resolve()
if allowed_root != resolved_parent and allowed_root not in resolved_parent.parents:
raise ValueError("invalid_output_path")
output_path_obj.parent.mkdir(parents=True, exist_ok=True)
return output_path_obj
@router.post("/{document_id}/save-pdf", response_class=RedirectResponse)
def save_pdf(document_id: str, output_path: str = Form(""), db: Session = Depends(get_db)):
if not _storage_available():
return RedirectResponse(
url=f"/documents/{document_id}?error=storage_unavailable",
status_code=303,
)
document = (
db.query(Document)
.options(
selectinload(Document.text_versions),
selectinload(Document.naming_fields),
selectinload(Document.extracted_fields),
selectinload(Document.additional_fields),
)
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
try:
output_path_obj = _resolve_document_output_path(document, output_path)
except ValueError:
return RedirectResponse(
url=f"/documents/{document.document_id}?error=invalid_output_path",
status_code=303,
)
has_extracted = bool(getattr(document, "extracted_fields", None))
has_additional = bool(getattr(document, "additional_fields", None))
try:
if has_extracted or has_additional:
save_field_enriched_pdf_current(db, document, output_path=output_path_obj)
else:
save_ocr_corrected_pdf_current(db, document, output_path=output_path_obj)
except Exception as e:
print("save_pdf failed:", repr(e), flush=True)
traceback.print_exc()
return RedirectResponse(
url=f"/documents/{document.document_id}?error=save_pdf_failed",
status_code=303,
)
return RedirectResponse(url=f"/documents/{document.document_id}?tab=ocr-review", status_code=303)
@router.post("/{document_id}/save-replica-pdf", response_class=RedirectResponse)
def save_replica_pdf_clean(document_id: str, output_path: str = Form(""), db: Session = Depends(get_db)):
if not _storage_available():
return RedirectResponse(url=f"/documents/{document_id}?error=storage_unavailable", status_code=303)
document = (
db.query(Document)
.options(
selectinload(Document.text_versions),
selectinload(Document.naming_fields),
selectinload(Document.replica_review_states),
selectinload(Document.replica_outputs),
selectinload(Document.extracted_fields),
selectinload(Document.analysis_versions),
)
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
try:
output_path_obj = _resolve_document_output_path(document, output_path)
save_replica_pdf(db, document, output_path_obj, mode="clean")
return RedirectResponse(
url=f"/documents/{document.document_id}?success=saved_replica_pdf&tab=ocr-review&viewer_source=replica",
status_code=303,
)
except ValueError as e:
msg = str(e)
if "invalid_output_path" in msg:
return RedirectResponse(
url=f"/documents/{document.document_id}?error=invalid_output_path",
status_code=303,
)
if "document_analysis_missing_usable_layout" in msg or "clean_replica_has_no_renderable_lines" in msg:
return RedirectResponse(
url=f"/documents/{document.document_id}?error=clean_replica_requires_layout_ocr&tab=ocr-review&viewer_source=scan",
status_code=303,
)
traceback.print_exc()
return RedirectResponse(
url=f"/documents/{document.document_id}?error=save_replica_pdf_failed&tab=ocr-review",
status_code=303,
)
except Exception:
traceback.print_exc()
return RedirectResponse(
url=f"/documents/{document.document_id}?error=save_replica_pdf_failed&tab=ocr-review",
status_code=303,
)
@router.post("/{document_id}/save-replica-pdf-scan-backed", response_class=RedirectResponse)
def save_replica_pdf_scan_backed(document_id: str, output_path: str = Form(""), db: Session = Depends(get_db)):
if not _storage_available():
return RedirectResponse(url=f"/documents/{document_id}?error=storage_unavailable", status_code=303)
document = (
db.query(Document)
.options(
selectinload(Document.text_versions),
selectinload(Document.naming_fields),
selectinload(Document.replica_review_states),
)
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
try:
output_path_obj = _resolve_document_output_path(document, output_path)
save_replica_pdf(db, document, output_path_obj, mode="scan_backed")
except ValueError as e:
if "invalid_output_path" in str(e):
return RedirectResponse(url=f"/documents/{document.document_id}?error=invalid_output_path", status_code=303)
return RedirectResponse(url=f"/documents/{document.document_id}?error=save_replica_pdf_scan_backed_failed&tab=ocr-review", status_code=303)
except Exception:
traceback.print_exc()
return RedirectResponse(url=f"/documents/{document.document_id}?error=save_replica_pdf_scan_backed_failed&tab=ocr-review", status_code=303)
return RedirectResponse(url=f"/documents/{document.document_id}?success=saved_replica_pdf_scan_backed&tab=ocr-review", status_code=303)
@router.post("/{document_id}/save-replica-pdf-debug-overlay", response_class=RedirectResponse)
def save_replica_pdf_debug_overlay(document_id: str, output_path: str = Form(""), db: Session = Depends(get_db)):
if not _storage_available():
return RedirectResponse(url=f"/documents/{document_id}?error=storage_unavailable", status_code=303)
document = (
db.query(Document)
.options(
selectinload(Document.text_versions),
selectinload(Document.naming_fields),
selectinload(Document.replica_review_states),
selectinload(Document.replica_outputs),
selectinload(Document.analysis_versions),
)
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
try:
output_path_obj = _resolve_document_output_path(document, output_path)
save_replica_pdf(db, document, output_path_obj, mode="debug_overlay")
except ValueError as e:
if "invalid_output_path" in str(e):
return RedirectResponse(url=f"/documents/{document.document_id}?error=invalid_output_path", status_code=303)
traceback.print_exc()
return RedirectResponse(
url=f"/documents/{document.document_id}?error=save_replica_pdf_debug_overlay_failed&tab=ocr-review",
status_code=303,
)
except Exception:
traceback.print_exc()
return RedirectResponse(
url=f"/documents/{document.document_id}?error=save_replica_pdf_debug_overlay_failed&tab=ocr-review",
status_code=303,
)
return RedirectResponse(
url=f"/documents/{document.document_id}?success=saved_replica_pdf_debug_overlay&tab=ocr-review&viewer_source=replica_debug_overlay",
status_code=303,
)
@router.post("/{document_id}/save-field-enriched-pdf", response_class=RedirectResponse)
def save_field_enriched_pdf(document_id: str, db: Session = Depends(get_db)):
return RedirectResponse(
url=f"/documents/{document_id}?error=deprecated_pdf_route_disabled&tab=extracted-fields",
status_code=303,
)
@router.post("/{document_id}/review-text", response_class=RedirectResponse)
async def review_text(
document_id: str,
reviewed_text: str = Form(""),
quality_flags: list[str] = Form(default=[]),
quality_note: str = Form(""),
db: Session = Depends(get_db),
):
document = (
db.query(Document)
.options(selectinload(Document.text_versions))
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
raw_ocr, reviewed_ocr = _get_current_text_versions(document)
base_layout = None
if reviewed_ocr and isinstance(getattr(reviewed_ocr, "layout_json", None), dict):
base_layout = json.loads(json.dumps(reviewed_ocr.layout_json))
elif raw_ocr and isinstance(getattr(raw_ocr, "layout_json", None), dict):
base_layout = json.loads(json.dumps(raw_ocr.layout_json))
expected_line_count = _line_count_from_layout(base_layout)
actual_line_count = len(reviewed_text.splitlines())
existing_reviewed = [
tv for tv in document.text_versions
if tv.version_type in ("reviewed", "reviewed_ocr") and tv.is_current
]
for tv in existing_reviewed:
tv.is_current = False
if expected_line_count and actual_line_count == expected_line_count:
reviewed_layout = _apply_reviewed_lines_to_layout(base_layout, reviewed_text)
if isinstance(reviewed_layout, dict):
reviewed_layout["layout_sync_source"] = "ocr_review"
reviewed_layout["layout_sync_status"] = "synced"
reviewed_layout["layout_needs_review"] = False
else:
reviewed_layout = dict(base_layout or {})
reviewed_layout["layout_sync_source"] = "ocr_review"
reviewed_layout["layout_sync_status"] = "text_changed_needs_layout_review"
reviewed_layout["layout_needs_review"] = True
reviewed_version = TextVersion(
document_id=document.id,
version_number=max(tv.version_number for tv in document.text_versions) + 1 if document.text_versions else 1,
version_type="reviewed_ocr",
text_content=reviewed_text,
created_by="mcelwain",
is_current=True,
derived_from_version_id=(reviewed_ocr.id if reviewed_ocr else (raw_ocr.id if raw_ocr else None)),
layout_json=reviewed_layout,
)
db.add(reviewed_version)
if raw_ocr:
raw_ocr.quality_score = compute_quality_score(raw_ocr.text_content, reviewed_text)
raw_ocr.quality_flags = quality_flags or []
raw_ocr.quality_note = quality_note or None
document.review_status = "reviewed"
db.commit()
return RedirectResponse(
url=f"/documents/{document.document_id}?tab=ocr-review&success=saved_reviewed_ocr",
status_code=303,
)
@router.post("/{document_id}/save-extracted-fields", response_class=RedirectResponse)
def save_extracted_fields_route(
document_id: str,
merchant_raw: str = Form(""),
merchant_normalized: str = Form(""),
transaction_date: str = Form(""),
transaction_time: str = Form(""),
subtotal: str = Form(""),
tax: str = Form(""),
total: str = Form(""),
currency: str = Form(""),
payment_method: str = Form(""),
receipt_number: str = Form(""),
location: str = Form(""),
counterparty: str = Form(""),
extra_json: str = Form("{}"),
db: Session = Depends(get_db),
):
document = (
db.query(Document)
.options(
selectinload(Document.extracted_fields),
selectinload(Document.receipt_line_items),
)
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
save_extracted_fields(
db=db,
document=document,
merchant_raw=merchant_raw,
merchant_normalized=merchant_normalized,
transaction_date=transaction_date,
transaction_time=transaction_time,
subtotal=subtotal,
tax=tax,
total=total,
currency=currency,
payment_method=payment_method,
receipt_number=receipt_number,
location=location,
counterparty=counterparty,
extra_json=extra_json,
)
db.refresh(document)
current_extracted = get_current_extracted_fields(document)
if current_extracted is not None:
_snapshot_extracted_field(
db,
document,
current_extracted,
created_by="save_extracted_fields",
notes="Saved extracted fields from document detail form.",
)
db.commit()
return RedirectResponse(
url=f"/documents/{document.document_id}?autofill_extracted=0&tab=extracted-fields",
status_code=303,
)
@router.post("/{document_id}/save-additional-fields", response_class=RedirectResponse)
def save_additional_fields_route(
document_id: str,
owner_primary: str = Form(""),
owner_secondary: str = Form(""),
paid_by_person: str = Form(""),
covered_people: str = Form(""),
attendees: str = Form(""),
occasion_note: str = Form(""),
is_shared_expense: str | None = Form(None),
reimbursement_expected_from: str = Form(""),
reimbursement_paid_by: str = Form(""),
reimbursement_paid_to: str = Form(""),
reimbursement_paid_amount: str = Form(""),
reimbursement_paid_date: str = Form(""),
reimbursement_note: str = Form(""),
db: Session = Depends(get_db),
):
document = (
db.query(Document)
.options(selectinload(Document.additional_fields))
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
additional = document.additional_fields[0] if getattr(document, "additional_fields", None) else None
if additional is None:
additional = DocumentAdditionalField(document_id=document.id)
db.add(additional)
db.flush()
additional.owner_primary = owner_primary or None
additional.owner_secondary = owner_secondary or None
additional.paid_by_person = paid_by_person or None
additional.covered_people = [v.strip() for v in covered_people.split(",") if v.strip()] or None
additional.attendees = [v.strip() for v in attendees.split(",") if v.strip()] or None
additional.occasion_note = occasion_note or None
additional.is_shared_expense = bool(is_shared_expense)
additional.reimbursement_expected_from = [v.strip() for v in reimbursement_expected_from.split(",") if v.strip()] or None
additional.reimbursement_paid_by = reimbursement_paid_by or None
additional.reimbursement_paid_to = reimbursement_paid_to or None
additional.reimbursement_paid_amount = Decimal(reimbursement_paid_amount) if reimbursement_paid_amount.strip() else None
additional.reimbursement_paid_date = datetime.strptime(reimbursement_paid_date, "%Y-%m-%d").date() if reimbursement_paid_date.strip() else None
additional.reimbursement_note = reimbursement_note or None
db.add(additional)
db.commit()
db.refresh(document)
current_additional = document.additional_fields[0] if getattr(document, "additional_fields", None) else None
if current_additional is not None:
_snapshot_additional_field(
db,
document,
current_additional,
created_by="save_additional_fields",
notes="Saved additional fields from document detail form.",
)
db.commit()
return RedirectResponse(
url=f"/documents/{document.document_id}?tab=additional-fields",
status_code=303,
)
@router.post("/{document_id}/regenerate-line-items", response_class=RedirectResponse)
def regenerate_line_items(document_id: str, db: Session = Depends(get_db)):
document = (
db.query(Document)
.options(
selectinload(Document.text_versions),
selectinload(Document.line_item_set).selectinload(DocumentLineItemSet.items),
selectinload(Document.line_item_set_versions),
)
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
text_version = _get_current_reviewed_text(document)
if text_version is None:
return RedirectResponse(
url=f"/documents/{document.document_id}?tab=line-items&error=regenerate_line_items_failed",
status_code=303,
)
try:
lines = _get_document_lines(text_version)
items = _extract_receipt_line_items(lines)
_replace_document_line_items(db, document, items)
db.flush()
next_version = max([v.version_number for v in document.line_item_set_versions], default=0) + 1
version = DocumentLineItemSetVersion(
document_id=document.id,
version_number=next_version,
schema_type=document.line_item_set.schema_type if document.line_item_set else (document.document_type or "generic"),
created_by="regenerate_line_items",
notes="Regenerated line items from current OCR text.",
)
db.add(version)
db.flush()
current_items = (
db.query(DocumentLineItem)
.filter(DocumentLineItem.line_item_set_id == document.line_item_set.id)
.order_by(DocumentLineItem.line_number.asc())
.all()
)
for item in current_items:
db.add(DocumentLineItemVersionItem(
set_version_id=version.id,
line_number=item.line_number,
entry_date=item.entry_date,
description=item.description,
quantity=item.quantity,
unit_price=item.unit_price,
line_total=item.line_total,
tax_amount=item.tax_amount,
category=item.category,
notes=item.notes,
raw_json=item.raw_json,
))
db.commit()
except Exception:
traceback.print_exc()
db.rollback()
return RedirectResponse(
url=f"/documents/{document.document_id}?tab=line-items&error=regenerate_line_items_failed",
status_code=303,
)
return RedirectResponse(
url=f"/documents/{document.document_id}?tab=line-items&success=regenerated_line_items",
status_code=303,
)
@router.post("/{document_id}/save-line-items", response_class=RedirectResponse)
async def save_line_items(
document_id: str,
request: Request,
row_count: int = Form(...),
db: Session = Depends(get_db),
):
document = (
db.query(Document)
.options(
selectinload(Document.line_item_set).selectinload(DocumentLineItemSet.items),
selectinload(Document.line_item_set_versions),
)
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
form = await request.form()
if document.line_item_set is None:
document.line_item_set = DocumentLineItemSet(
document_id=document.id,
schema_type=document.document_type or "generic",
)
db.add(document.line_item_set)
db.flush()
document.line_item_set.schema_type = document.document_type or "generic"
document.line_item_set.items.clear()
db.flush()
for i in range(row_count):
entry_date = (form.get(f"entry_date_{i}") or "").strip()
description = (form.get(f"description_{i}") or "").strip()
quantity = (form.get(f"quantity_{i}") or "").strip()
unit_price = (form.get(f"unit_price_{i}") or "").strip()
line_total = (form.get(f"line_total_{i}") or "").strip()
tax_amount = (form.get(f"tax_amount_{i}") or "").strip()
category = (form.get(f"category_{i}") or "").strip()
notes = (form.get(f"notes_{i}") or "").strip()
if not any([entry_date, description, quantity, unit_price, line_total, tax_amount, category, notes]):
continue
item = DocumentLineItem(
line_item_set_id=document.line_item_set.id,
line_number=i + 1,
entry_date=datetime.strptime(entry_date, "%Y-%m-%d").date() if entry_date else None,
description=description or None,
quantity=Decimal(quantity) if quantity else None,
unit_price=Decimal(unit_price) if unit_price else None,
line_total=Decimal(line_total) if line_total else None,
tax_amount=Decimal(tax_amount) if tax_amount else None,
category=category or None,
notes=notes or None,
)
db.add(item)
db.flush()
next_version = max([v.version_number for v in document.line_item_set_versions], default=0) + 1
version = DocumentLineItemSetVersion(
document_id=document.id,
version_number=next_version,
schema_type=document.line_item_set.schema_type,
created_by="save_line_items",
notes="Saved line items from document detail tab.",
)
db.add(version)
db.flush()
current_items = (
db.query(DocumentLineItem)
.filter(DocumentLineItem.line_item_set_id == document.line_item_set.id)
.order_by(DocumentLineItem.line_number.asc())
.all()
)
for item in current_items:
db.add(DocumentLineItemVersionItem(
set_version_id=version.id,
line_number=item.line_number,
entry_date=item.entry_date,
description=item.description,
quantity=item.quantity,
unit_price=item.unit_price,
line_total=item.line_total,
tax_amount=item.tax_amount,
category=item.category,
notes=item.notes,
raw_json=item.raw_json,
))
db.commit()
return RedirectResponse(
url=f"/documents/{document.document_id}?tab=line-items",
status_code=303,
)
@router.get("/{document_id}/preview-image")
def document_preview_image(document_id: str, page: int = 1, db: Session = Depends(get_db)):
document = db.query(Document).filter(Document.document_id == document_id).first()
if document is None or not document.current_path:
return HTMLResponse(content="Preview image not found", status_code=404)
path_obj = Path(document.current_path)
if not path_obj.exists() or not path_obj.is_file():
return HTMLResponse(content="Preview image not found", status_code=404)
try:
pil_images = convert_from_path(str(path_obj), dpi=150, first_page=page, last_page=page)
if not pil_images:
return HTMLResponse(content="Preview image not found", status_code=404)
img = pil_images[0]
buf = BytesIO()
img.save(buf, format="PNG")
return Response(content=buf.getvalue(), media_type="image/png")
except Exception as e:
return HTMLResponse(content=f"Preview image generation failed: {e!r}", status_code=500)
@router.get("/{document_id}/preview-file")
def document_preview_file(document_id: str, path: str | None = None, db: Session = Depends(get_db)):
document = db.query(Document).filter(Document.document_id == document_id).first()
resolved_path = path or (document.current_path if document else None)
if document is None or not resolved_path:
return HTMLResponse(content="Preview file not found", status_code=404)
path_obj = Path(resolved_path)
if not path_obj.exists() or not path_obj.is_file():
return HTMLResponse(content="Preview file not found", status_code=404)
media_type = document.mime_type or "application/octet-stream"
return FileResponse(path=str(path_obj), media_type=media_type, filename=path_obj.name, headers={"Content-Disposition": "inline; filename=\"" + path_obj.name + "\""})
def _get_latest_replica_output(document, output_type: str):
outputs = getattr(document, "replica_outputs", None) or []
matches = [row for row in outputs if getattr(row, "output_type", None) == output_type]
matches.sort(key=lambda x: getattr(x, "created_at", None) or 0, reverse=True)
return matches[0] if matches else None
def _build_preview_url_for_path(request: Request, document_id: str, path_value: str | None):
if not path_value:
return None
path_obj = Path(path_value)
if not path_obj.exists() or not path_obj.is_file():
return None
from urllib.parse import quote
base = str(request.url_for("document_preview_file", document_id=document_id))
return f"{base}?path={quote(str(path_obj))}&v={int(path_obj.stat().st_mtime)}"
# --- layout review save helpers start ---
def _layout_review_group_words_into_lines(words, y_tol: float = 12.0):
normalized = []
for word in words or []:
bbox = word.get("bbox") or [0, 0, 0, 0]
if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
continue
try:
x1 = float(bbox[0])
y1 = float(bbox[1])
x2 = float(bbox[2])
y2 = float(bbox[3])
except Exception:
continue
normalized.append({
"id": word.get("id"),
"text": (word.get("text") or "").strip(),
"bbox": [x1, y1, x2, y2],
"font_size_guess": float(word.get("font_size_guess") or max(6.0, (y2 - y1) * 0.75)),
"font_family_guess": (word.get("font_family_guess") or "Helvetica"),
})
normalized.sort(key=lambda w: (w["bbox"][1], w["bbox"][0]))
groups = []
for word in normalized:
word_center_y = (word["bbox"][1] + word["bbox"][3]) / 2.0
placed = False
for group in groups:
group_center_y = sum((item["bbox"][1] + item["bbox"][3]) / 2.0 for item in group) / len(group)
if abs(word_center_y - group_center_y) <= y_tol:
group.append(word)
placed = True
break
if not placed:
groups.append([word])
lines = []
for group in groups:
group.sort(key=lambda w: w["bbox"][0])
line_text = " ".join((item.get("text") or "").strip() for item in group).strip()
left = min(item["bbox"][0] for item in group)
top = min(item["bbox"][1] for item in group)
right = max(item["bbox"][2] for item in group)
bottom = max(item["bbox"][3] for item in group)
line_font_sizes = [float(item.get("font_size_guess") or max(6.0, (item["bbox"][3] - item["bbox"][1]) * 0.75)) for item in group]
line_font_family = next((item.get("font_family_guess") for item in group if item.get("font_family_guess")), "Helvetica")
lines.append({
"text": line_text,
"bbox": [left, top, right, bottom],
"confidence": None,
"font_family_guess": line_font_family,
"font_size_guess": round(sum(line_font_sizes) / len(line_font_sizes), 2),
"text_color_guess": "#000000",
"words": group,
})
return lines
@router.post("/{document_id}/save-layout-review")
async def save_layout_review(document_id: str, request: Request, db: Session = Depends(get_db)):
form = await request.form()
payload_raw = form.get("layout_review_json")
print("[save_layout_review] payload present:", bool(payload_raw))
print("[save_layout_review] payload length:", len(payload_raw) if payload_raw else 0)
print(f"[save_layout_review] document_id={document_id} payload_present={bool(payload_raw)} payload_len={len(payload_raw) if payload_raw else 0}", flush=True)
if not payload_raw:
return RedirectResponse(
url=f"/documents/{document_id}?tab=layout-review&error=layout_review_missing_payload",
status_code=303,
)
try:
payload = json.loads(payload_raw)
except Exception:
return RedirectResponse(
url=f"/documents/{document_id}?tab=layout-review&error=layout_review_invalid_json",
status_code=303,
)
document = (
db.query(Document)
.options(selectinload(Document.text_versions))
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return HTMLResponse(content="Document not found", status_code=404)
raw_ocr, reviewed_ocr = _get_current_text_versions(document)
current_text_version = next(
(
tv for tv in sorted(
getattr(document, "text_versions", []),
key=lambda x: (x.version_number, x.created_at),
reverse=True,
)
if tv.is_current
),
None,
)
source_version = reviewed_ocr or raw_ocr or current_text_version
if source_version is None:
return RedirectResponse(
url=f"/documents/{document_id}?tab=layout-review&error=layout_review_no_source",
status_code=303,
)
posted_pages = payload.get("pages") if isinstance(payload, dict) else None
if not isinstance(posted_pages, list) or not posted_pages:
return RedirectResponse(
url=f"/documents/{document_id}?tab=layout-review&error=layout_review_no_pages",
status_code=303,
)
rebuilt_pages = []
rebuilt_text_lines = []
for idx, page in enumerate(posted_pages, start=1):
page_number = int(page.get("page") or idx)
page_width = float(page.get("page_width") or 1.0)
page_height = float(page.get("page_height") or 1.0)
words = []
for word_idx, word in enumerate(page.get("words", []) or [], start=1):
bbox = word.get("bbox") or [0, 0, 0, 0]
if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
continue
try:
x1 = float(bbox[0])
y1 = float(bbox[1])
x2 = float(bbox[2])
y2 = float(bbox[3])
except Exception:
continue
x_left = min(x1, x2)
x_right = max(x1, x2)
y_top = min(y1, y2)
y_bottom = max(y1, y2)
if abs(x_right - x_left) < 1.0 or abs(y_bottom - y_top) < 1.0:
continue
font_size_guess = float(word.get("font_size_guess") or max(6.0, (y_bottom - y_top) * 0.75))
font_family_guess = (word.get("font_family_guess") or "Helvetica")
words.append({
"id": int(word.get("id") or word_idx),
"text": (word.get("text") or "").strip(),
"bbox": [x_left, y_top, x_right, y_bottom],
"confidence": None,
"font_size_guess": font_size_guess,
"font_family_guess": font_family_guess,
})
words.sort(key=lambda w: (w["bbox"][1], w["bbox"][0]))
lines = _layout_review_group_words_into_lines(words)
rebuilt_text_lines.extend((line.get("text") or "") for line in lines)
rebuilt_pages.append({
"page": page_number,
"page_width": page_width,
"page_height": page_height,
"image_width": page_width,
"image_height": page_height,
"words": words,
"lines": lines,
})
source_layout_json = getattr(source_version, "layout_json", None)
new_layout_json = {}
if isinstance(source_layout_json, dict):
for key in ("schema_version", "analysis_type", "engine"):
if key in source_layout_json:
new_layout_json[key] = source_layout_json[key]
if "schema_version" not in new_layout_json:
new_layout_json["schema_version"] = 1
if "analysis_type" not in new_layout_json:
new_layout_json["analysis_type"] = "canonical"
new_layout_json["pages"] = rebuilt_pages
new_layout_json["layout_sync_status"] = "synced"
new_layout_json["layout_sync_source"] = "layout_review"
new_layout_json["layout_needs_review"] = False
new_layout_json = _normalize_layout_review_payload(new_layout_json)
_append_layout_edit_event(
new_layout_json,
{
"event_type": "layout_review_save",
"actor": "user",
"source": "layout_review_editor",
"timestamp": datetime.utcnow().isoformat() + "Z",
},
)
new_text_content = "\n".join(rebuilt_text_lines).strip()
next_version_number = max(
[getattr(tv, "version_number", 0) for tv in getattr(document, "text_versions", [])] or [0]
) + 1
for tv in getattr(document, "text_versions", []):
tv.is_current = False
new_version = TextVersion(
document_id=document.id,
version_number=next_version_number,
version_type="reviewed_ocr",
text_content=new_text_content,
created_by="layout_review_editor",
is_current=True,
ocr_engine=getattr(source_version, "ocr_engine", None),
ocr_engine_version=getattr(source_version, "ocr_engine_version", None),
rerun_source="layout_review",
quality_score=getattr(source_version, "quality_score", None),
quality_flags=getattr(source_version, "quality_flags", None),
quality_note=getattr(source_version, "quality_note", None),
derived_from_version_id=getattr(source_version, "id", None),
layout_json=new_layout_json,
)
db.add(new_version)
db.commit()
return RedirectResponse(
url=f"/documents/{document_id}?tab=layout-review&success=saved_layout_review",
status_code=303,
)
# --- layout review save helpers end ---
@router.get("/{document_id}", response_class=HTMLResponse)
def document_detail(document_id: str, request: Request, queue: str | None = None, viewer_source: str = "scan", db: Session = Depends(get_db)):
current_user = getattr(request.state, "current_user", None)
document = (
db.query(Document)
.options(
selectinload(Document.versions),
selectinload(Document.text_versions),
selectinload(Document.extracted_fields),
selectinload(Document.layer1_candidates),
selectinload(Document.additional_fields),
)
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return HTMLResponse(content="Document not found", status_code=404)
raw_ocr, reviewed_ocr = _get_current_text_versions(document)
layout_source_version = reviewed_ocr or raw_ocr
layout_source_json = (
layout_source_version.layout_json
if layout_source_version and isinstance(getattr(layout_source_version, "layout_json", None), dict)
else None
)
current_text_version = next(
(
tv for tv in sorted(
getattr(document, "text_versions", []),
key=lambda x: (x.version_number, x.created_at),
reverse=True,
)
if tv.is_current
),
None,
)
editor_source = request.query_params.get("editor_source", "reviewed")
review_text_value = _build_review_text_value(raw_ocr, reviewed_ocr, editor_source)
layout_source_version = reviewed_ocr or raw_ocr or current_text_version
layout_source_json = (
layout_source_version.layout_json
if layout_source_version and isinstance(getattr(layout_source_version, "layout_json", None), dict)
else None
)
expected_line_count = _line_count_from_layout(layout_source_json)
actual_line_count = len(review_text_value.splitlines()) if review_text_value else 0
line_numbers = list(range(1, max(actual_line_count, expected_line_count) + 1))
replica_clean_output = _get_latest_replica_output(document, "clean")
replica_scan_backed_output = _get_latest_replica_output(document, "scan_backed")
replica_debug_overlay_output = _get_latest_replica_output(document, "debug_overlay")
overlay_page_data = []
layout_review_pages = []
try:
layout_json = layout_source_json or {}
overlay_pages = layout_json.get("pages", []) if isinstance(layout_json, dict) else []
for page in overlay_pages:
page_width = float(page.get("page_width") or page.get("image_width") or 1.0)
page_height = float(page.get("page_height") or page.get("image_height") or 1.0)
words = []
for idx, word in enumerate(page.get("words", []) or [], start=1):
bbox = word.get("bbox") or [0, 0, 0, 0]
if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
continue
word_row = {
"id": idx,
"text": (word.get("text") or "").strip(),
"bbox": [float(bbox[0]), float(bbox[1]), float(bbox[2]), float(bbox[3])],
"font_size_guess": float(word.get("font_size_guess") or max(6.0, (float(bbox[3]) - float(bbox[1])) * 0.75)),
"font_family_guess": (word.get("font_family_guess") or "Helvetica"),
}
words.append(word_row)
lines = []
source_lines = []
for region in page.get("regions", []) or []:
source_lines.extend(region.get("lines", []) or [])
if not source_lines:
source_lines = page.get("lines", []) or []
for line in source_lines:
bbox = line.get("bbox") or [0, 0, 0, 0]
if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
continue
lines.append({
"text": (line.get("text") or "").strip(),
"bbox": [float(bbox[0]), float(bbox[1]), float(bbox[2]), float(bbox[3])],
})
overlay_page_data.append({
"page": page.get("page"),
"page_width": page_width,
"page_height": page_height,
"words": [{"text": w["text"], "bbox": w["bbox"]} for w in words],
"lines": lines,
})
layout_review_pages.append({
"page": page.get("page"),
"page_width": page_width,
"page_height": page_height,
"words": words,
"lines": lines,
})
except Exception as e:
print("layout review build failed:", repr(e), flush=True)
overlay_page_data = []
layout_review_pages = []
scan_path = document.current_path
replica_path = replica_clean_output.file_path if replica_clean_output and replica_clean_output.file_path else None
replica_scan_backed_path = replica_scan_backed_output.file_path if replica_scan_backed_output and replica_scan_backed_output.file_path else None
replica_debug_overlay_path = replica_debug_overlay_output.file_path if replica_debug_overlay_output and replica_debug_overlay_output.file_path else None
effective_viewer_source = viewer_source or "scan"
preview_path = scan_path
if effective_viewer_source == "docx":
preview_path = scan_path
elif effective_viewer_source == "replica" and replica_path:
preview_path = replica_path
elif effective_viewer_source == "replica_scan_backed" and replica_scan_backed_path:
preview_path = replica_scan_backed_path
elif effective_viewer_source == "replica_debug_overlay" and replica_debug_overlay_path:
preview_path = replica_debug_overlay_path
else:
effective_viewer_source = "scan"
preview_path = scan_path
storage_available = _storage_available()
file_url = _build_preview_url_for_path(request, document.document_id, preview_path)
layout_review_image_url = str(request.url_for("document_preview_image", document_id=document.document_id)) + "?page=1"
app_url = str(request.url_for("document_detail", document_id=document.document_id))
error = request.query_params.get("error")
success = request.query_params.get("success")
error_expected = request.query_params.get("expected")
error_actual = request.query_params.get("actual")
preset_id_raw = request.query_params.get("preset_id")
try:
preset_id = int(preset_id_raw) if preset_id_raw else None
except ValueError:
preset_id = None
selected_preset = _get_preset_by_id(db, preset_id)
all_presets = _get_all_presets(db)
existing_document_types = _get_existing_document_types(db)
extracted_form = _extracted_field_form_values(document, request)
additional_form = _additional_field_form_values(document, selected_preset)
current_extracted = get_current_extracted_fields(document)
current_additional = _get_current_additional_fields(document)
current_extracted_version_number = _get_current_extracted_version_number(document)
current_additional_version_number = _get_current_additional_version_number(document)
line_items = []
if document.line_item_set and document.line_item_set.items:
line_items = sorted(
document.line_item_set.items,
key=lambda x: x.line_number or 0,
)
# ACL temporarily disabled to restore detail visibility
review_state = _get_or_create_document_review_state(db, document)
queue_nav = _get_queue_navigation(db, document)
naming_row = document.naming_fields[0] if getattr(document, "naming_fields", None) else None
default_save_root = get_default_save_root()
proposed_storage_path = build_proposed_storage_path(
document=document,
save_root=default_save_root,
naming_row=naming_row,
)
proposed_storage_path = str(
Path(proposed_storage_path).with_name(
re.sub(r"(?:_v\d+|_\d+)(?=\.[^.]+$)", "", Path(proposed_storage_path).name)
)
)
version_rows = []
for version in sorted(getattr(document, "versions", []), key=lambda v: v.version_number, reverse=True):
file_exists = _version_file_available(version, document.document_id)
version_rows.append((version, file_exists))
current_line_item_version = None
if document.line_item_set_versions:
current_line_item_version = max(
document.line_item_set_versions,
key=lambda v: (v.version_number, v.created_at),
)
ocr_version_options = [
(v.version_number, v.version_type, v.created_at)
for v in sorted(getattr(document, "text_versions", []), key=lambda v: v.version_number, reverse=True)
]
extracted_version_options = [
(v.version_number, v.created_at)
for v in sorted(getattr(document, "extracted_field_versions", []), key=lambda v: v.version_number, reverse=True)
]
additional_version_options = [
(v.version_number, v.created_at)
for v in sorted(getattr(document, "additional_field_versions", []), key=lambda v: v.version_number, reverse=True)
]
active_tab = request.query_params.get("tab", "ocr-review")
if active_tab not in {"ocr-review", "layout-review", "extracted-fields", "additional-fields", "line-items", "versions", "raw-ocr", "source-options"}:
active_tab = "ocr-review"
return templates.TemplateResponse(
request=request,
name="documents/detail.html",
context={
"request": request,
"document": document,
"review_state": review_state,
"default_save_root": default_save_root,
"proposed_storage_path": proposed_storage_path,
"prev_doc": queue_nav.get("prev_doc"),
"next_doc": queue_nav.get("next_doc"),
"next_ocr_doc": queue_nav.get("next_ocr_doc"),
"next_fields_doc": queue_nav.get("next_fields_doc"),
"raw_ocr": raw_ocr,
"reviewed_ocr": reviewed_ocr,
"current_text_version": current_text_version,
"review_text_value": review_text_value,
"file_url": file_url,
"layout_review_image_url": layout_review_image_url,
"storage_available": storage_available,
"viewer_source": effective_viewer_source,
"overlay_page_data": overlay_page_data,
"layout_review_pages": layout_review_pages,
"replica_clean_output": replica_clean_output,
"replica_scan_backed_output": replica_scan_backed_output,
"replica_debug_overlay_output": replica_debug_overlay_output,
"version_rows": version_rows,
"current_line_item_version": current_line_item_version,
"ocr_version_options": ocr_version_options,
"extracted_version_options": extracted_version_options,
"additional_version_options": additional_version_options,
"app_url": app_url,
"quality_flag_options": QUALITY_FLAG_OPTIONS,
"current_quality_flags": raw_ocr.quality_flags if raw_ocr and raw_ocr.quality_flags else [],
"current_quality_note": raw_ocr.quality_note if raw_ocr and raw_ocr.quality_note else "",
"line_numbers": line_numbers,
"expected_line_count": expected_line_count,
"actual_line_count": actual_line_count,
"error": error,
"success": success,
"error_expected": error_expected,
"error_actual": error_actual,
"extracted_form": extracted_form,
"current_extracted": current_extracted,
"current_extracted_version_number": current_extracted_version_number,
"additional_form": additional_form,
"current_additional": current_additional,
"current_additional_version_number": current_additional_version_number,
"line_items": line_items,
"presets": all_presets,
"selected_preset_id": preset_id,
"existing_document_types": existing_document_types,
"active_tab": active_tab,
"active_page": "documents",
"current_user": current_user,
},
)
def _get_current_ocr_text_for_document_export(document: Document) -> str:
reviewed_rows = [
tv for tv in getattr(document, "text_versions", [])
if tv.version_type == "reviewed" and tv.is_current
]
if reviewed_rows:
reviewed_rows.sort(key=lambda x: (x.version_number, x.created_at), reverse=True)
return reviewed_rows[0].text_content or ""
raw_rows = [
tv for tv in getattr(document, "text_versions", [])
if tv.version_type == "raw_ocr" and tv.is_current
]
if raw_rows:
raw_rows.sort(key=lambda x: (x.version_number, x.created_at), reverse=True)
return raw_rows[0].text_content or ""
return ""
@router.get("/export/training.jsonl")
def export_training_jsonl(db: Session = Depends(get_db)):
docs = (
db.query(Document)
.options(
selectinload(Document.text_versions),
selectinload(Document.naming_fields),
selectinload(Document.extracted_fields),
selectinload(Document.additional_fields),
selectinload(Document.line_item_set).selectinload(DocumentLineItemSet.items),
selectinload(Document.review_state),
)
.order_by(Document.updated_at.asc())
.all()
)
export_dir = Path("/mnt/storage/document-processor/exports")
export_dir.mkdir(parents=True, exist_ok=True)
out_path = export_dir / "document_training.jsonl"
with out_path.open("w", encoding="utf-8") as f:
for document in docs:
review_state = getattr(document, "review_state", None)
if review_state is None:
continue
if not review_state.reviewed_at:
continue
if not review_state.is_approved:
continue
if review_state.is_excluded:
continue
extracted = get_current_extracted_fields(document)
additional = _get_current_additional_fields(document)
line_items = []
if document.line_item_set and document.line_item_set.items:
for item in sorted(document.line_item_set.items, key=lambda x: x.line_number or 0):
line_items.append(
{
"line_item_id": item.id,
"line_number": item.line_number,
"entry_date": item.entry_date.isoformat() if item.entry_date else "",
"description": item.description or "",
"quantity": str(item.quantity) if item.quantity is not None else "",
"unit_price": str(item.unit_price) if item.unit_price is not None else "",
"line_total": str(item.line_total) if item.line_total is not None else "",
"tax_amount": str(item.tax_amount) if item.tax_amount is not None else "",
"category": item.category or "",
"notes": item.notes or "",
"raw_json": item.raw_json or {},
}
)
raw_ocr_version = None
reviewed_ocr_version = None
current_ocr_version = None
for tv in sorted(getattr(document, "text_versions", []), key=lambda x: (x.version_number, x.created_at), reverse=True):
if tv.is_current and current_ocr_version is None:
current_ocr_version = tv
if tv.version_type == "reviewed" and reviewed_ocr_version is None:
reviewed_ocr_version = tv
if tv.version_type == "raw_ocr" and raw_ocr_version is None:
raw_ocr_version = tv
naming_row = document.naming_fields[0] if getattr(document, "naming_fields", None) else None
proposed_storage_path = ""
if naming_row is not None:
try:
proposed_storage_path = str(
Path(
build_proposed_storage_path(
document=document,
save_root=get_default_save_root(),
naming_row=naming_row,
)
).with_name(
re.sub(
r"(?:_v\d+|_\d+)(?=\.[^.]+$)",
"",
Path(
build_proposed_storage_path(
document=document,
save_root=get_default_save_root(),
naming_row=naming_row,
)
).name,
)
)
)
except Exception:
proposed_storage_path = ""
payload = {
"schema_version": review_state.schema_version or "v1",
"document": {
"document_id": document.document_id,
"document_type": document.document_type or "",
"original_filename": document.original_filename or "",
"canonical_filename": document.canonical_filename or "",
"mime_type": document.mime_type or "",
"source_path": document.source_path or "",
"current_path": document.current_path or "",
"share_path": document.share_path or "",
"created_at": document.created_at.isoformat() if document.created_at else "",
"updated_at": document.updated_at.isoformat() if document.updated_at else "",
},
"review": {
"reviewed_at": review_state.reviewed_at.isoformat() if review_state.reviewed_at else "",
"is_approved": bool(review_state.is_approved),
"is_excluded": bool(review_state.is_excluded),
},
"ocr": {
"current_text": _get_current_ocr_text_for_document_export(document),
"raw_text": raw_ocr_version.text_content if raw_ocr_version and raw_ocr_version.text_content else "",
"reviewed_text": reviewed_ocr_version.text_content if reviewed_ocr_version and reviewed_ocr_version.text_content else "",
"current_version_number": current_ocr_version.version_number if current_ocr_version else None,
"current_version_type": current_ocr_version.version_type if current_ocr_version else "",
"raw_version_number": raw_ocr_version.version_number if raw_ocr_version else None,
"reviewed_version_number": reviewed_ocr_version.version_number if reviewed_ocr_version else None,
"quality_score": str(current_ocr_version.quality_score) if current_ocr_version and current_ocr_version.quality_score is not None else "",
"quality_flags": current_ocr_version.quality_flags if current_ocr_version and current_ocr_version.quality_flags else [],
"quality_note": current_ocr_version.quality_note if current_ocr_version and current_ocr_version.quality_note else "",
"ocr_engine": current_ocr_version.ocr_engine if current_ocr_version else "",
"ocr_engine_version": current_ocr_version.ocr_engine_version if current_ocr_version else "",
"rerun_source": current_ocr_version.rerun_source if current_ocr_version else "",
},
"ocr_text": _get_current_ocr_text_for_document_export(document),
"naming_fields": {
"naming_entity": naming_row.naming_entity if naming_row else "",
"naming_account_last4": naming_row.naming_account_last4 if naming_row else "",
"naming_type": naming_row.naming_type if naming_row else "",
"naming_date": naming_row.naming_date.isoformat() if naming_row and naming_row.naming_date else "",
"naming_date_precision": naming_row.naming_date_precision if naming_row else "",
"naming_description": naming_row.naming_description if naming_row else "",
"naming_reference_number": naming_row.naming_reference_number if naming_row else "",
"naming_variant": naming_row.naming_variant if naming_row else "",
"naming_schema_version": naming_row.naming_schema_version if naming_row else "",
"naming_locked": bool(naming_row.naming_locked) if naming_row else False,
"proposed_storage_path": proposed_storage_path,
},
"extracted_fields": {
"merchant_raw": extracted.merchant_raw if extracted else "",
"merchant_normalized": extracted.merchant_normalized if extracted else "",
"transaction_date": extracted.transaction_date.isoformat() if extracted and extracted.transaction_date else "",
"transaction_time": extracted.transaction_time if extracted else "",
"subtotal": str(extracted.subtotal) if extracted and extracted.subtotal is not None else "",
"tax": str(extracted.tax) if extracted and extracted.tax is not None else "",
"total": str(extracted.total) if extracted and extracted.total is not None else "",
"currency": extracted.currency if extracted else "",
"payment_method": extracted.payment_method if extracted else "",
"receipt_number": extracted.receipt_number if extracted else "",
"location": extracted.location if extracted else "",
"counterparty": extracted.counterparty if extracted else "",
"extra_json": extracted.extra_json if extracted and extracted.extra_json else {},
},
"additional_fields": {
"owner_primary": additional.owner_primary if additional else "",
"owner_secondary": additional.owner_secondary if additional else "",
"paid_by_person": additional.paid_by_person if additional else "",
"occasion_note": additional.occasion_note if additional else "",
"is_shared_expense": bool(additional.is_shared_expense) if additional else False,
"covered_people": additional.covered_people if additional else "",
"attendees": additional.attendees if additional else "",
"reimbursement_expected_from": additional.reimbursement_expected_from if additional else "",
"reimbursement_paid_by": additional.reimbursement_paid_by if additional else "",
"reimbursement_paid_to": additional.reimbursement_paid_to if additional else "",
"reimbursement_paid_amount": str(additional.reimbursement_paid_amount) if additional and additional.reimbursement_paid_amount is not None else "",
"reimbursement_paid_date": additional.reimbursement_paid_date.isoformat() if additional and additional.reimbursement_paid_date else "",
"reimbursement_note": additional.reimbursement_note if additional else "",
},
"line_items": line_items,
}
f.write(json.dumps(payload, ensure_ascii=False) + "\n")
return FileResponse(
path=str(out_path),
media_type="application/json",
filename=out_path.name,
)
@router.get("/export/reviewed.jsonl")
def export_reviewed_jsonl(db: Session = Depends(get_db)):
docs = (
db.query(Document)
.options(
selectinload(Document.text_versions),
selectinload(Document.naming_fields),
selectinload(Document.extracted_fields),
selectinload(Document.additional_fields),
selectinload(Document.versions),
)
.filter(Document.review_status == "reviewed")
.order_by(Document.updated_at.asc())
.all()
)
export_dir = Path("/mnt/storage/document-processor/exports")
export_dir.mkdir(parents=True, exist_ok=True)
out_path = export_dir / "reviewed_documents.jsonl"
with out_path.open("w", encoding="utf-8") as f:
for document in docs:
payload = _document_export_payload(document)
f.write(json.dumps(payload, ensure_ascii=False) + "\n")
return FileResponse(
path=str(out_path),
media_type="application/json",
filename=out_path.name,
)
def _restore_ocr_to_original(db: Session, document: Document) -> bool:
target = (
db.query(TextVersion)
.filter(
TextVersion.document_id == document.id,
TextVersion.version_number == 1,
)
.first()
)
if target is None:
return False
all_versions = (
db.query(TextVersion)
.filter(TextVersion.document_id == document.id)
.all()
)
for tv in all_versions:
tv.is_current = (tv.id == target.id)
document.review_status = "reviewed" if target.version_type == "reviewed" else "pending"
db.add(document)
return True
def _restore_ocr_from_version_number(db: Session, document: Document, target_version_number: int) -> bool:
target = (
db.query(TextVersion)
.filter(
TextVersion.document_id == document.id,
TextVersion.version_number == target_version_number,
)
.first()
)
if target is None:
return False
all_versions = (
db.query(TextVersion)
.filter(TextVersion.document_id == document.id)
.all()
)
for tv in all_versions:
tv.is_current = (tv.id == target.id)
document.review_status = "reviewed" if target.version_type == "reviewed" else "pending"
db.add(document)
return True
def _get_current_extracted_version_number(document: Document) -> int | None:
row = get_current_extracted_fields(document)
versions = getattr(document, "extracted_field_versions", None) or []
if row is None:
return None
for v in sorted(versions, key=lambda x: x.version_number, reverse=True):
if (
row.merchant_raw == v.merchant_raw
and row.merchant_normalized == v.merchant_normalized
and row.transaction_date == v.transaction_date
and row.transaction_time == v.transaction_time
and row.subtotal == v.subtotal
and row.tax == v.tax
and row.total == v.total
and row.currency == v.currency
and row.payment_method == v.payment_method
and row.receipt_number == v.receipt_number
and row.location == v.location
and row.counterparty == v.counterparty
and row.extra_json == v.extra_json
):
return v.version_number
return None
def _get_current_additional_version_number(document: Document) -> int | None:
row = _get_current_additional_fields(document)
versions = getattr(document, "additional_field_versions", None) or []
if row is None:
return None
for v in sorted(versions, key=lambda x: x.version_number, reverse=True):
if (
row.owner_primary == v.owner_primary
and row.owner_secondary == v.owner_secondary
and row.paid_by_person == v.paid_by_person
and row.occasion_note == v.occasion_note
and row.is_shared_expense == v.is_shared_expense
and row.covered_people == v.covered_people
and row.attendees == v.attendees
and row.reimbursement_expected_from == v.reimbursement_expected_from
and row.reimbursement_paid_by == v.reimbursement_paid_by
and row.reimbursement_paid_to == v.reimbursement_paid_to
and row.reimbursement_paid_amount == v.reimbursement_paid_amount
and row.reimbursement_paid_date == v.reimbursement_paid_date
and row.reimbursement_note == v.reimbursement_note
):
return v.version_number
return None
def _clear_line_items(db: Session, document: Document) -> bool:
if not document.line_item_set:
return False
had_items = bool(document.line_item_set.items)
document.line_item_set.items.clear()
db.flush()
return had_items
def _restore_line_items_from_version_number(db: Session, document: Document, target_version_number: int) -> bool:
version = (
db.query(DocumentLineItemSetVersion)
.options(selectinload(DocumentLineItemSetVersion.items))
.filter(
DocumentLineItemSetVersion.document_id == document.id,
DocumentLineItemSetVersion.version_number == target_version_number,
)
.first()
)
if version is None:
return False
if document.line_item_set is None:
document.line_item_set = DocumentLineItemSet(
document_id=document.id,
schema_type=version.schema_type or document.document_type or "generic",
)
db.add(document.line_item_set)
db.flush()
document.line_item_set.schema_type = version.schema_type or document.document_type or "generic"
document.line_item_set.items.clear()
db.flush()
for vi in sorted(version.items, key=lambda x: x.line_number):
db.add(DocumentLineItem(
line_item_set_id=document.line_item_set.id,
line_number=vi.line_number,
entry_date=vi.entry_date,
description=vi.description,
quantity=vi.quantity,
unit_price=vi.unit_price,
line_total=vi.line_total,
tax_amount=vi.tax_amount,
category=vi.category,
notes=vi.notes,
raw_json=vi.raw_json,
))
return True
def _parse_restore_choice(value: str) -> tuple[str, int | None]:
if not value or value == "none":
return ("none", None)
if value == "original":
return ("original", None)
if value.startswith("version:"):
try:
return ("version", int(value.split(":", 1)[1]))
except ValueError:
return ("none", None)
return ("none", None)
@router.post("/{document_id}/source-options", response_class=RedirectResponse)
def apply_source_options(
document_id: str,
file_action: str = Form("none"),
ocr_restore_choice: str = Form("none"),
extracted_restore_choice: str = Form("none"),
additional_restore_choice: str = Form("none"),
line_item_restore_choice: str = Form("none"),
db: Session = Depends(get_db),
):
document = (
db.query(Document)
.options(
selectinload(Document.text_versions),
selectinload(Document.naming_fields),
selectinload(Document.extracted_fields),
selectinload(Document.additional_fields),
selectinload(Document.versions),
selectinload(Document.extracted_field_versions),
selectinload(Document.additional_field_versions),
)
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
try:
changed = False
if file_action == "revert_original":
original_path = document.original_path or document.source_path
if original_path:
original_file = Path(original_path)
if original_file.exists():
document.current_path = str(original_file)
document.canonical_filename = original_file.name
document.sha256_current = _sha256_for_file(original_file)
db.add(document)
changed = True
elif file_action == "revert_current_version":
latest_version = (
db.query(DocumentVersion)
.filter(DocumentVersion.document_id == document.id)
.order_by(DocumentVersion.version_number.desc())
.first()
)
if latest_version and latest_version.file_path:
version_file = Path(latest_version.file_path)
if version_file.exists():
document.current_path = str(version_file)
document.canonical_filename = version_file.name
document.sha256_current = _sha256_for_file(version_file)
db.add(document)
changed = True
ocr_mode, ocr_version = _parse_restore_choice(ocr_restore_choice)
print("PARSED_OCR", ocr_restore_choice, ocr_mode, ocr_version, flush=True)
if ocr_mode == "original":
if _restore_ocr_to_original(db, document):
changed = True
elif ocr_mode == "version" and ocr_version is not None:
if _restore_ocr_from_version_number(db, document, ocr_version):
changed = True
extracted_mode, extracted_version = _parse_restore_choice(extracted_restore_choice)
print("PARSED_EXTRACTED", extracted_restore_choice, extracted_mode, extracted_version, flush=True)
if extracted_mode == "original":
if _restore_extracted_to_original(db, document):
changed = True
elif extracted_mode == "version" and extracted_version is not None:
if _restore_extracted_from_version_number(db, document, extracted_version):
changed = True
additional_mode, additional_version = _parse_restore_choice(additional_restore_choice)
print("PARSED_ADDITIONAL", additional_restore_choice, additional_mode, additional_version, flush=True)
if additional_mode == "original":
if _restore_additional_to_original(db, document):
changed = True
elif additional_mode == "version" and additional_version is not None:
if _restore_additional_from_version_number(db, document, additional_version):
changed = True
if line_item_restore_choice == "clear":
if _clear_line_items(db, document):
changed = True
elif line_item_restore_choice.startswith("version:"):
try:
target_line_item_version = int(line_item_restore_choice.split(":", 1)[1])
except ValueError:
target_line_item_version = None
if target_line_item_version is not None:
if _restore_line_items_from_version_number(db, document, target_line_item_version):
changed = True
if changed:
db.commit()
else:
db.rollback()
except Exception as e:
print("source-options failed:", repr(e), flush=True)
traceback.print_exc()
db.rollback()
return RedirectResponse(
url=f"/documents/{document.document_id}?error=source_options_failed&tab=source-options",
status_code=303,
)
return RedirectResponse(
url=f"/documents/{document.document_id}?tab=source-options",
status_code=303,
)
# --- diagnostic DOCX export/view routes start ---
@router.post("/{document_id}/export-diagnostic-docx")
async def export_diagnostic_docx(document_id: str, db: Session = Depends(get_db)):
document = db.query(Document).filter(Document.document_id == document_id).first()
if document is None:
return HTMLResponse(content="Document not found", status_code=404)
current_text_version = (
db.query(TextVersion)
.filter(TextVersion.document_id == document.id)
.filter(TextVersion.is_current == True)
.order_by(TextVersion.version_number.desc())
.first()
)
if current_text_version is None:
return RedirectResponse(
url=f"/documents/{document_id}?tab=ocr-review&error=docx_no_current_text",
status_code=303,
)
layout_json = current_text_version.layout_json if isinstance(current_text_version.layout_json, dict) else {}
pages = layout_json.get("pages") or []
out_dir = Path("/mnt/storage/document-processor/diagnostics/docx")
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / f"{document.document_id}_pdf2docx.docx"
docx = DocxDocument()
section = docx.sections[0]
section.top_margin = Inches(0.4)
section.bottom_margin = Inches(0.4)
section.left_margin = Inches(0.4)
section.right_margin = Inches(0.4)
style = docx.styles["Normal"]
style.font.name = "Courier New"
style.font.size = Pt(8)
wrote_anything = False
def normalize_bbox(bbox):
x1, y1, x2, y2 = [float(v) for v in bbox]
return [min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2)]
for page_idx, page in enumerate(pages):
if page_idx:
docx.add_page_break()
lines = page.get("lines") or []
if not lines and page.get("words"):
words = []
for word in page.get("words") or []:
text = (word.get("text") or "").strip()
bbox = word.get("bbox")
if not text or not bbox or len(bbox) != 4:
continue
words.append({"text": text, "bbox": normalize_bbox(bbox)})
words.sort(key=lambda w: (w["bbox"][1], w["bbox"][0]))
grouped = []
for word in words:
cy = (word["bbox"][1] + word["bbox"][3]) / 2
placed = False
for group in grouped:
if abs(cy - group["cy"]) <= 8:
group["words"].append(word)
group["cy"] = sum((w["bbox"][1] + w["bbox"][3]) / 2 for w in group["words"]) / len(group["words"])
placed = True
break
if not placed:
grouped.append({"cy": cy, "words": [word]})
lines = []
for group in grouped:
group["words"].sort(key=lambda w: w["bbox"][0])
lines.append({
"text": " ".join(w["text"] for w in group["words"]),
"bbox": [
min(w["bbox"][0] for w in group["words"]),
min(w["bbox"][1] for w in group["words"]),
max(w["bbox"][2] for w in group["words"]),
max(w["bbox"][3] for w in group["words"]),
],
})
lines.sort(key=lambda line: normalize_bbox(line.get("bbox") or [0,0,0,0])[1])
for line in lines:
line_text = (line.get("text") or "").strip()
if not line_text:
continue
pgh = docx.add_paragraph()
pgh.paragraph_format.space_after = Pt(0)
pgh.paragraph_format.line_spacing = 1.0
run = pgh.add_run(line_text)
run.font.name = "Courier New"
run.font.size = Pt(float(line.get("font_size_guess") or 8))
wrote_anything = True
if not wrote_anything:
fallback_text = current_text_version.text_content or ""
for line in fallback_text.splitlines():
pgh = docx.add_paragraph()
pgh.paragraph_format.space_after = Pt(0)
run = pgh.add_run(line)
run.font.name = "Courier New"
run.font.size = Pt(8)
docx.save(out_path)
return RedirectResponse(
url=f"/documents/{document_id}?tab=ocr-review&viewer_source=docx&success=diagnostic_docx_saved",
status_code=303,
)
@router.get("/{document_id}/diagnostic-docx-download")
async def diagnostic_docx_download(document_id: str, db: Session = Depends(get_db)):
document = db.query(Document).filter(Document.document_id == document_id).first()
if document is None:
return HTMLResponse(content="Document not found", status_code=404)
path = Path("/mnt/storage/document-processor/diagnostics/docx") / f"{document.document_id}_pdf2docx.docx"
if not path.exists():
return HTMLResponse(content="Diagnostic DOCX not found. Export it first.", status_code=404)
return FileResponse(
path=str(path),
filename=path.name,
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
)
@router.get("/{document_id}/diagnostic-docx-html", response_class=HTMLResponse)
async def diagnostic_docx_html(document_id: str, db: Session = Depends(get_db)):
document = db.query(Document).filter(Document.document_id == document_id).first()
if document is None:
return HTMLResponse(content="Document not found", status_code=404)
docx_path = Path("/mnt/storage/document-processor/diagnostics/docx") / f"{document.document_id}_pdf2docx.docx"
if not docx_path.exists():
return HTMLResponse(
content="""
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
body {
font-family: system-ui, sans-serif;
padding: 1rem;
color: #1f2937;
background: #f8fafc;
}
.missing {
max-width: 42rem;
margin: 2rem auto;
background: white;
border: 1px solid #e5e7eb;
border-radius: 0.75rem;
padding: 1rem;
}
</style>
</head>
<body>
<div class="missing">
<p>Diagnostic DOCX not found. Use <b>Export Diagnostic DOCX</b> first.</p>
</div>
</body>
</html>
""",
status_code=404,
)
with open(docx_path, "rb") as f:
result = mammoth.convert_to_html(f)
html = result.value or ""
return HTMLResponse(content=f"""
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
html, body {{
margin: 0;
padding: 0;
background: #2b2b31;
color: #111827;
font-family: Arial, Helvetica, sans-serif;
}}
.docx-viewer-shell {{
min-height: 100vh;
overflow: auto;
padding: 1rem;
box-sizing: border-box;
}}
.docx-page {{
background: white;
color: #111827;
width: 8.5in;
min-height: 11in;
margin: 0 auto;
padding: 0.5in;
box-sizing: border-box;
transform-origin: top left;
box-shadow: 0 0 0.25rem rgba(0,0,0,0.35);
}}
.docx-page * {{
max-width: 100%;
box-sizing: border-box;
}}
.docx-page p {{
margin: 0 0 0.35rem 0;
line-height: 1.15;
}}
.docx-page table {{
border-collapse: collapse;
max-width: 100%;
}}
.docx-page td,
.docx-page th {{
vertical-align: top;
padding: 0.1rem 0.25rem;
}}
.docx-toolbar {{
position: sticky;
top: 0;
z-index: 10;
display: flex;
gap: 0.5rem;
align-items: center;
padding: 0.5rem;
margin: -1rem -1rem 1rem -1rem;
background: #23232a;
color: white;
border-bottom: 1px solid rgba(255,255,255,0.12);
}}
.docx-toolbar button {{
border: 1px solid rgba(255,255,255,0.25);
background: #111827;
color: white;
border-radius: 999px;
padding: 0.35rem 0.7rem;
font-size: 0.9rem;
}}
.docx-toolbar span {{
font-size: 0.9rem;
opacity: 0.85;
}}
@media (max-width: 900px) {{
.docx-viewer-shell {{
padding: 0.5rem;
}}
.docx-toolbar {{
margin: -0.5rem -0.5rem 0.75rem -0.5rem;
}}
.docx-page {{
width: 8.5in;
min-height: 11in;
padding: 0.35in;
}}
}}
</style>
</head>
<body>
<div class="docx-viewer-shell">
<div class="docx-toolbar">
<button type="button" onclick="setZoom(-0.1)"></button>
<button type="button" onclick="fitWidth()">Fit</button>
<button type="button" onclick="setZoom(0.1)">+</button>
<span id="zoom-label">Fit width</span>
</div>
<div id="docx-page" class="docx-page">
{html}
</div>
</div>
<script>
let zoom = 1;
function applyZoom() {{
const page = document.getElementById("docx-page");
const label = document.getElementById("zoom-label");
if (!page) return;
page.style.transform = "scale(" + zoom + ")";
page.style.marginBottom = ((page.offsetHeight * zoom) - page.offsetHeight + 24) + "px";
if (label) label.textContent = Math.round(zoom * 100) + "%";
}}
function fitWidth() {{
const shell = document.querySelector(".docx-viewer-shell");
const page = document.getElementById("docx-page");
if (!shell || !page) return;
const available = shell.clientWidth - 24;
const pageWidth = page.offsetWidth || 816;
zoom = Math.max(0.25, Math.min(1.5, available / pageWidth));
applyZoom();
}}
function setZoom(delta) {{
zoom = Math.max(0.25, Math.min(2.0, zoom + delta));
applyZoom();
}}
window.addEventListener("resize", fitWidth);
window.addEventListener("load", fitWidth);
setTimeout(fitWidth, 100);
</script>
</body>
</html>
""")