# NOTE(review): removed non-Python artifact lines ("2117 lines / 78 KiB /
# Python", repeated) that appear to be pasted file-viewer metadata, not
# source code — they were syntax errors at module top.
from copy import deepcopy
|
|
from datetime import datetime
|
|
from decimal import Decimal, InvalidOperation
|
|
import re
|
|
import traceback
|
|
import os
|
|
import hashlib
|
|
import json
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
|
|
from fastapi import APIRouter, Depends, Form, Query, Request
|
|
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import distinct
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import Session, selectinload
|
|
from pypdf import PdfReader
|
|
|
|
from app.core.storage_settings import get_default_save_root
|
|
from app.db.deps import get_db
|
|
from app.logic.document_outputs import (
|
|
create_field_enriched_pdf_version,
|
|
create_ocr_corrected_pdf_version,
|
|
)
|
|
from app.logic.storage_paths import build_proposed_storage_path
|
|
from app.logic.extraction import (
|
|
auto_extract_from_document,
|
|
get_current_extracted_fields,
|
|
save_extracted_fields,
|
|
_extract_receipt_line_items,
|
|
_get_current_reviewed_text,
|
|
_get_document_lines,
|
|
_replace_document_line_items,
|
|
)
|
|
from app.logic.ingest import compute_quality_score, rerun_ocr_for_document
|
|
from app.models.document import Document
|
|
from app.models.document_line_item import DocumentLineItem
|
|
from app.models.document_line_item_set import DocumentLineItemSet
|
|
from app.models.document_line_item_set_version import DocumentLineItemSetVersion
|
|
from app.models.document_line_item_version_item import DocumentLineItemVersionItem
|
|
from app.models.document_additional_field import DocumentAdditionalField
|
|
from app.models.document_additional_field_version import DocumentAdditionalFieldVersion
|
|
from app.models.extracted_field_version import ExtractedFieldVersion
|
|
from app.models.document_preset import DocumentPreset
|
|
from app.models.document_version import DocumentVersion
|
|
from app.models.text_version import TextVersion
|
|
from app.models.document_review_state import DocumentReviewState
|
|
from app.models.extracted_field import ExtractedField
|
|
from app.models.document_additional_field import DocumentAdditionalField
|
|
from app.models.text_version import TextVersion
|
|
from app.utils.filesize import human_size
|
|
|
|
# Every endpoint in this module is mounted under the /documents prefix.
router = APIRouter(prefix="/documents", tags=["documents"])
|
|
|
|
|
|
def _get_or_create_document_review_state(db: Session, document: Document) -> DocumentReviewState:
    """Return the review-state row for *document*, creating one on first use.

    A freshly created row is added to the session and flushed so its
    primary key is populated before the caller commits.
    """
    existing = (
        db.query(DocumentReviewState)
        .filter(DocumentReviewState.document_id == document.id)
        .first()
    )
    if existing is not None:
        return existing

    created = DocumentReviewState(document_id=document.id)
    db.add(created)
    db.flush()
    return created
|
|
|
|
|
|
|
|
|
|
def _storage_available() -> bool:
|
|
candidate_roots = [
|
|
Path("/mnt/storage"),
|
|
Path("/mnt/svr-01/storage"),
|
|
]
|
|
try:
|
|
for root in candidate_roots:
|
|
if root.exists() and root.is_dir() and os.access(root, os.R_OK | os.X_OK):
|
|
return True
|
|
except Exception:
|
|
pass
|
|
return False
|
|
|
|
|
|
|
|
def _sha256_for_file(path_obj: Path) -> str:
|
|
hasher = hashlib.sha256()
|
|
with path_obj.open("rb") as f:
|
|
for chunk in iter(lambda: f.read(1024 * 1024), b""):
|
|
hasher.update(chunk)
|
|
return hasher.hexdigest()
|
|
|
|
|
|
def _version_file_available(version, expected_document_id: str) -> bool:
|
|
file_path = getattr(version, "file_path", None)
|
|
if not file_path:
|
|
return False
|
|
|
|
try:
|
|
path_obj = Path(file_path)
|
|
if not path_obj.exists() or not path_obj.is_file():
|
|
return False
|
|
|
|
reader = PdfReader(str(path_obj))
|
|
meta = reader.metadata or {}
|
|
|
|
if str(meta.get("/DocumentID", "")).strip() != str(expected_document_id):
|
|
return False
|
|
if str(meta.get("/VersionNumber", "")).strip() != str(version.version_number):
|
|
return False
|
|
if str(meta.get("/VersionType", "")).strip() != str(version.version_type):
|
|
return False
|
|
|
|
expected_sha = getattr(version, "sha256", None)
|
|
if expected_sha:
|
|
actual_sha = _sha256_for_file(path_obj)
|
|
if actual_sha != expected_sha:
|
|
return False
|
|
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _json_safe(value):
|
|
if isinstance(value, Decimal):
|
|
return float(value)
|
|
if hasattr(value, "isoformat"):
|
|
return value.isoformat()
|
|
return value
|
|
|
|
|
|
def _serialize_model_row(row, fields: list[str]) -> dict:
    """Project *row* onto *fields* as a JSON-safe dict; {} for a falsy row.

    Missing attributes serialize as None rather than raising.
    """
    if not row:
        return {}
    return {field: _json_safe(getattr(row, field, None)) for field in fields}
|
|
|
|
|
|
def _document_export_payload(document) -> dict:
    """Assemble the full export payload for one document.

    Bundles identity/paths, the current raw and reviewed OCR text, OCR
    quality info, the live extracted/additional field rows, and the PDF
    version history. Values go through _json_safe so the result can be fed
    straight to json.dumps.
    """
    raw_ocr, reviewed_ocr = _get_current_text_versions(document)
    extracted = get_current_extracted_fields(document)
    additional = _get_current_additional_fields(document)

    # PDF version history, oldest first.
    versions = []
    for version in sorted(getattr(document, "versions", []), key=lambda v: v.version_number):
        created_at = getattr(version, "created_at", None)
        versions.append({
            "version_number": _json_safe(version.version_number),
            "version_type": _json_safe(version.version_type),
            "file_path": _json_safe(version.file_path),
            "sha256": _json_safe(version.sha256),
            "created_by": _json_safe(version.created_by),
            "notes": _json_safe(version.notes),
            "created_at": _json_safe(created_at),
        })

    return {
        # Identity and storage locations.
        "document_id": document.document_id,
        "document_type": document.document_type,
        "review_status": document.review_status,
        "source_path": document.source_path,
        "original_path": document.original_path,
        "current_path": document.current_path,
        "share_path": document.share_path,
        "original_filename": document.original_filename,
        "canonical_filename": document.canonical_filename,
        "mime_type": document.mime_type,
        "file_size": _json_safe(document.file_size),
        "page_count": _json_safe(document.page_count),
        "sha256_original": _json_safe(document.sha256_original),
        "sha256_current": _json_safe(document.sha256_current),
        # OCR text and quality metadata (quality lives on the raw version).
        "raw_ocr_text": _json_safe(raw_ocr.text_content if raw_ocr else None),
        "reviewed_ocr_text": _json_safe(reviewed_ocr.text_content if reviewed_ocr else None),
        "ocr_quality_score": _json_safe(raw_ocr.quality_score if raw_ocr else None),
        "quality_flags": raw_ocr.quality_flags if raw_ocr and raw_ocr.quality_flags else [],
        "quality_note": _json_safe(raw_ocr.quality_note if raw_ocr else None),
        # Live (current) extracted receipt fields.
        "extracted_fields": _serialize_model_row(extracted, [
            "merchant_raw",
            "merchant_normalized",
            "transaction_date",
            "transaction_time",
            "subtotal",
            "tax",
            "total",
            "currency",
            "payment_method",
            "receipt_number",
            "location",
            "counterparty",
        ]),
        # Live (current) ownership / reimbursement metadata.
        "additional_fields": _serialize_model_row(additional, [
            "owner_primary",
            "owner_secondary",
            "paid_by_person",
            "occasion_note",
            "is_shared_expense",
            "covered_people",
            "attendees",
            "reimbursement_expected_from",
            "reimbursement_paid_by",
            "reimbursement_paid_to",
            "reimbursement_paid_amount",
            "reimbursement_paid_date",
            "reimbursement_note",
        ]),
        "versions": versions,
    }
|
|
|
|
|
|
|
|
def _latest_raw_ocr(document):
|
|
rows = [tv for tv in getattr(document, "text_versions", []) if getattr(tv, "version_type", None) == "raw_ocr"]
|
|
rows.sort(key=lambda x: x.version_number)
|
|
return rows[-1] if rows else None
|
|
|
|
|
|
def _clear_current_extracted(db: Session, document: Document) -> None:
    """Bulk-delete the live ExtractedField row(s) for *document*.

    Uses synchronize_session=False (no in-session bookkeeping); snapshots
    are untouched. Caller commits.
    """
    query = db.query(ExtractedField).filter(ExtractedField.document_id == document.id)
    query.delete(synchronize_session=False)
|
|
|
|
|
|
def _clear_current_additional(db: Session, document: Document) -> None:
    """Bulk-delete the live DocumentAdditionalField row(s) for *document*.

    Version snapshots are untouched. Caller commits.
    """
    query = db.query(DocumentAdditionalField).filter(
        DocumentAdditionalField.document_id == document.id
    )
    query.delete(synchronize_session=False)
|
|
|
|
|
|
def _reset_ocr_to_raw(db: Session, document: Document) -> None:
    """Drop every TextVersion for *document* and mark it pending re-review."""
    query = db.query(TextVersion).filter(TextVersion.document_id == document.id)
    query.delete(synchronize_session=False)
    document.review_status = "pending"
|
|
|
|
# Resolve the app package root (two directory levels above this module) so
# the template directory is found regardless of the process working dir.
BASE_DIR = Path(__file__).resolve().parent.parent
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
# Expose the human_size helper to every template.
templates.env.globals["human_size"] = human_size
|
|
|
|
|
|
def _next_extracted_field_version_number(db: Session, document_id: int) -> int:
    """Next monotonically increasing snapshot number (1 when none exist)."""
    highest = (
        db.query(func.max(ExtractedFieldVersion.version_number))
        .filter(ExtractedFieldVersion.document_id == document_id)
        .scalar()
    )
    return (highest or 0) + 1
|
|
|
|
|
|
def _next_additional_field_version_number(db: Session, document_id: int) -> int:
    """Next monotonically increasing snapshot number (1 when none exist)."""
    highest = (
        db.query(func.max(DocumentAdditionalFieldVersion.version_number))
        .filter(DocumentAdditionalFieldVersion.document_id == document_id)
        .scalar()
    )
    return (highest or 0) + 1
|
|
|
|
|
|
def _snapshot_extracted_field(db: Session, document: Document, row, created_by: str, notes: str | None = None) -> None:
    """Append an immutable ExtractedFieldVersion copied from the live *row*.

    The snapshot gets the next version number for the document; *created_by*
    and *notes* record provenance. Caller commits.
    """
    copied_columns = (
        "merchant_raw",
        "merchant_normalized",
        "transaction_date",
        "transaction_time",
        "subtotal",
        "tax",
        "total",
        "currency",
        "payment_method",
        "receipt_number",
        "location",
        "counterparty",
        "extra_json",
    )
    snapshot = ExtractedFieldVersion(
        document_id=document.id,
        version_number=_next_extracted_field_version_number(db, document.id),
        created_by=created_by,
        notes=notes,
        **{column: getattr(row, column) for column in copied_columns},
    )
    db.add(snapshot)
|
|
|
|
|
|
|
|
|
|
# =========================
|
|
# RESTORE HELPERS (NO SNAPSHOT)
|
|
# =========================
|
|
|
|
def _restore_extracted_to_original(db: Session, document: Document) -> bool:
    """Restore the live extracted fields from snapshot #1 (the original)."""
    return _restore_extracted_from_version_number(db, document, 1)
|
|
|
|
|
|
def _restore_extracted_from_version_number(db: Session, document: Document, target_version_number: int) -> bool:
    """Overwrite the live ExtractedField row from a stored snapshot.

    Looks up the ExtractedFieldVersion with *target_version_number* and, if
    both it and the live row exist, copies every field value onto the live
    row WITHOUT creating a new snapshot (restores are deliberately not
    versioned — see the "RESTORE HELPERS (NO SNAPSHOT)" banner above).

    Returns True on success, False when either the snapshot or the live row
    is missing. Caller commits.

    Fix: removed a large unreachable block that followed the ``return True``
    below — it referenced an undefined name ``target`` and could never run.
    """
    version = (
        db.query(ExtractedFieldVersion)
        .filter(
            ExtractedFieldVersion.document_id == document.id,
            ExtractedFieldVersion.version_number == target_version_number,
        )
        .first()
    )
    if not version:
        return False

    row = (
        db.query(ExtractedField)
        .filter(ExtractedField.document_id == document.id)
        .first()
    )
    if not row:
        return False

    # Overwrite the live row in place (NO new version snapshot).
    for column in (
        "merchant_raw",
        "merchant_normalized",
        "transaction_date",
        "transaction_time",
        "subtotal",
        "tax",
        "total",
        "currency",
        "payment_method",
        "receipt_number",
        "location",
        "counterparty",
        "extra_json",
    ):
        setattr(row, column, getattr(version, column))

    db.add(row)
    return True
|
|
|
|
|
|
def _restore_additional_to_original(db: Session, document: Document) -> bool:
    """Restore the live additional fields from snapshot #1 (the original)."""
    return _restore_additional_from_version_number(db, document, 1)
|
|
|
|
|
|
def _restore_additional_from_version_number(db: Session, document: Document, target_version_number: int) -> bool:
    """Overwrite the live DocumentAdditionalField row from a stored snapshot.

    Mirrors _restore_extracted_from_version_number: both the snapshot and the
    live row must exist; field values are copied onto the live row WITHOUT
    creating a new snapshot. Returns True on success, False otherwise.
    Caller commits.

    Fix: removed a large unreachable block that followed the ``return True``
    below — it referenced an undefined name ``target`` and could never run.
    """
    version = (
        db.query(DocumentAdditionalFieldVersion)
        .filter(
            DocumentAdditionalFieldVersion.document_id == document.id,
            DocumentAdditionalFieldVersion.version_number == target_version_number,
        )
        .first()
    )
    if not version:
        return False

    row = (
        db.query(DocumentAdditionalField)
        .filter(DocumentAdditionalField.document_id == document.id)
        .first()
    )
    if not row:
        return False

    # Overwrite the live row in place (NO new version snapshot).
    for column in (
        "owner_primary",
        "owner_secondary",
        "paid_by_person",
        "occasion_note",
        "is_shared_expense",
        "covered_people",
        "attendees",
        "reimbursement_expected_from",
        "reimbursement_paid_by",
        "reimbursement_paid_to",
        "reimbursement_paid_amount",
        "reimbursement_paid_date",
        "reimbursement_note",
    ):
        setattr(row, column, getattr(version, column))

    db.add(row)
    return True
|
|
|
|
def _snapshot_additional_field(db: Session, document: Document, row, created_by: str, notes: str | None = None) -> None:
    """Append an immutable DocumentAdditionalFieldVersion copied from *row*.

    The snapshot gets the next version number for the document; *created_by*
    and *notes* record provenance. Caller commits.
    """
    copied_columns = (
        "owner_primary",
        "owner_secondary",
        "paid_by_person",
        "occasion_note",
        "is_shared_expense",
        "covered_people",
        "attendees",
        "reimbursement_expected_from",
        "reimbursement_paid_by",
        "reimbursement_paid_to",
        "reimbursement_paid_amount",
        "reimbursement_paid_date",
        "reimbursement_note",
    )
    snapshot = DocumentAdditionalFieldVersion(
        document_id=document.id,
        version_number=_next_additional_field_version_number(db, document.id),
        created_by=created_by,
        notes=notes,
        **{column: getattr(row, column) for column in copied_columns},
    )
    db.add(snapshot)
|
|
|
|
# Controlled vocabulary of OCR/scan quality flags offered in the review UI.
# Stored per raw_ocr TextVersion as quality_flags.
QUALITY_FLAG_OPTIONS = [
    # Problems with the extracted text itself.
    "bad_embedded_text",
    "ocr_garbled",
    "low_text_coverage",
    "missing_lines",
    "bad_line_breaks",
    # Problems with the source image/scan.
    "low_contrast",
    "blurry",
    "skewed_scan",
    "cropped",
    "shadowed",
    "small_text",
    "thermal_faded",
    "handwriting_present",
    "receipt_damage",
    # Outcome of manual intervention.
    "manual_rerun_helped",
    "manual_rerun_no_change",
    "major_manual_cleanup",
    "minor_manual_cleanup",
]
|
|
|
|
|
|
def _parse_people_list(value: str) -> list[str]:
|
|
return [part.strip() for part in value.split(",") if part.strip()]
|
|
|
|
|
|
def _format_people_list(value: list | None) -> str:
|
|
if not value:
|
|
return ""
|
|
return ", ".join(str(x).strip() for x in value if str(x).strip())
|
|
|
|
|
|
def _to_decimal(value: str) -> Decimal | None:
|
|
cleaned = (value or "").strip()
|
|
if not cleaned:
|
|
return None
|
|
try:
|
|
return Decimal(cleaned)
|
|
except (InvalidOperation, TypeError):
|
|
return None
|
|
|
|
|
|
def _get_all_presets(db: Session) -> list[DocumentPreset]:
    """Return every saved document preset, alphabetized by name."""
    return db.query(DocumentPreset).order_by(DocumentPreset.name.asc()).all()
|
|
|
|
|
|
def _get_preset_by_id(db: Session, preset_id: int | None) -> DocumentPreset | None:
    """Fetch a preset by primary key; None for falsy ids or no match."""
    if preset_id:
        return db.query(DocumentPreset).filter(DocumentPreset.id == preset_id).first()
    return None
|
|
|
|
|
|
def _merge_additional_form_with_preset(values: dict, preset: DocumentPreset | None) -> dict:
|
|
if preset is None:
|
|
return values
|
|
|
|
return {
|
|
"owner_primary": preset.owner_primary if preset.owner_primary is not None else values.get("owner_primary", ""),
|
|
"owner_secondary": preset.owner_secondary if preset.owner_secondary is not None else values.get("owner_secondary", ""),
|
|
"paid_by_person": preset.paid_by_person if preset.paid_by_person is not None else values.get("paid_by_person", ""),
|
|
"covered_people": _format_people_list(preset.covered_people) if preset.covered_people is not None else values.get("covered_people", ""),
|
|
"attendees": _format_people_list(preset.attendees) if preset.attendees is not None else values.get("attendees", ""),
|
|
"occasion_note": preset.occasion_note if preset.occasion_note is not None else values.get("occasion_note", ""),
|
|
"is_shared_expense": bool(preset.is_shared_expense),
|
|
"reimbursement_expected_from": _format_people_list(preset.reimbursement_expected_from) if preset.reimbursement_expected_from is not None else values.get("reimbursement_expected_from", ""),
|
|
"reimbursement_paid_by": preset.reimbursement_paid_by if preset.reimbursement_paid_by is not None else values.get("reimbursement_paid_by", ""),
|
|
"reimbursement_paid_to": preset.reimbursement_paid_to if preset.reimbursement_paid_to is not None else values.get("reimbursement_paid_to", ""),
|
|
"reimbursement_paid_amount": values.get("reimbursement_paid_amount", ""),
|
|
"reimbursement_paid_date": values.get("reimbursement_paid_date", ""),
|
|
"reimbursement_note": preset.reimbursement_note if preset.reimbursement_note is not None else values.get("reimbursement_note", ""),
|
|
}
|
|
|
|
|
|
def _get_current_additional_fields(document: Document) -> DocumentAdditionalField | None:
|
|
rows = list(getattr(document, "additional_fields", []) or [])
|
|
if not rows:
|
|
return None
|
|
return sorted(rows, key=lambda x: x.updated_at or x.created_at, reverse=True)[0]
|
|
|
|
|
|
def _extracted_field_form_values(document: Document, request: Request) -> dict:
    """Build the initial values for the extracted-fields form.

    Precedence: ``?autofill_extracted=1`` re-runs auto-extraction; otherwise
    the current saved row is rendered; otherwise every field is blank. All
    values are strings (HTML-form friendly); extra_json is pretty-printed.

    Fix: uses the module-level ``json`` import instead of the original
    ``__import__("json")`` indirection.
    """
    current = get_current_extracted_fields(document)

    if request.query_params.get("autofill_extracted") == "1":
        return auto_extract_from_document(None, document)

    text_fields = (
        "merchant_raw",
        "merchant_normalized",
        "transaction_date",
        "transaction_time",
        "subtotal",
        "tax",
        "total",
        "currency",
        "payment_method",
        "receipt_number",
        "location",
        "counterparty",
    )

    if current is None:
        values = {key: "" for key in text_fields}
        values["extra_json"] = "{}"
        return values

    return {
        "merchant_raw": current.merchant_raw or "",
        "merchant_normalized": current.merchant_normalized or "",
        # Dates render as ISO strings for <input type="date">.
        "transaction_date": current.transaction_date.isoformat() if current.transaction_date else "",
        "transaction_time": current.transaction_time or "",
        "subtotal": str(current.subtotal) if current.subtotal is not None else "",
        "tax": str(current.tax) if current.tax is not None else "",
        "total": str(current.total) if current.total is not None else "",
        "currency": current.currency or "",
        "payment_method": current.payment_method or "",
        "receipt_number": current.receipt_number or "",
        "location": current.location or "",
        "counterparty": current.counterparty or "",
        "extra_json": "{}" if current.extra_json is None else json.dumps(current.extra_json, indent=2, sort_keys=True),
    }
|
|
|
|
|
|
def _additional_field_form_values(document: Document, preset: DocumentPreset | None = None) -> dict:
    """Build the initial values for the additional-fields form.

    Renders the current saved row (or blanks when none exists), then applies
    the preset overlay via _merge_additional_form_with_preset. List fields
    are flattened to comma-separated strings; dates/amounts to strings.
    """
    current = _get_current_additional_fields(document)
    if current is None:
        # No saved row yet: all-blank form, then preset overlay.
        values = {
            "owner_primary": "",
            "owner_secondary": "",
            "paid_by_person": "",
            "covered_people": "",
            "attendees": "",
            "occasion_note": "",
            "is_shared_expense": False,
            "reimbursement_expected_from": "",
            "reimbursement_paid_by": "",
            "reimbursement_paid_to": "",
            "reimbursement_paid_amount": "",
            "reimbursement_paid_date": "",
            "reimbursement_note": "",
        }
        return _merge_additional_form_with_preset(values, preset)

    values = {
        "owner_primary": current.owner_primary or "",
        "owner_secondary": current.owner_secondary or "",
        "paid_by_person": current.paid_by_person or "",
        "covered_people": _format_people_list(current.covered_people),
        "attendees": _format_people_list(current.attendees),
        "occasion_note": current.occasion_note or "",
        "is_shared_expense": bool(current.is_shared_expense),
        "reimbursement_expected_from": _format_people_list(current.reimbursement_expected_from),
        "reimbursement_paid_by": current.reimbursement_paid_by or "",
        "reimbursement_paid_to": current.reimbursement_paid_to or "",
        "reimbursement_paid_amount": str(current.reimbursement_paid_amount) if current.reimbursement_paid_amount is not None else "",
        "reimbursement_paid_date": current.reimbursement_paid_date.isoformat() if current.reimbursement_paid_date else "",
        "reimbursement_note": current.reimbursement_note or "",
    }
    return _merge_additional_form_with_preset(values, preset)
|
|
|
|
|
|
def _get_current_text_versions(document: Document) -> tuple[TextVersion | None, TextVersion | None]:
|
|
sorted_text_versions = sorted(
|
|
document.text_versions,
|
|
key=lambda x: (x.version_number, x.created_at),
|
|
reverse=True,
|
|
)
|
|
|
|
raw_ocr = next(
|
|
(tv for tv in sorted_text_versions if tv.version_type == "raw_ocr" and tv.is_current),
|
|
None,
|
|
)
|
|
|
|
reviewed_ocr = next(
|
|
(tv for tv in sorted_text_versions if tv.version_type == "reviewed" and tv.is_current),
|
|
None,
|
|
)
|
|
|
|
return raw_ocr, reviewed_ocr
|
|
|
|
|
|
def _extract_line_texts_from_layout(layout_json: dict | None) -> list[str]:
|
|
if not layout_json:
|
|
return []
|
|
|
|
lines: list[str] = []
|
|
for page in layout_json.get("pages", []):
|
|
for line in page.get("lines", []):
|
|
lines.append((line.get("text") or "").strip())
|
|
return lines
|
|
|
|
|
|
def _build_review_text_value(
|
|
raw_ocr: TextVersion | None,
|
|
reviewed_ocr: TextVersion | None,
|
|
editor_source: str = "reviewed",
|
|
) -> str:
|
|
if editor_source == "raw":
|
|
source = raw_ocr or reviewed_ocr
|
|
else:
|
|
source = reviewed_ocr or raw_ocr
|
|
|
|
if source and source.layout_json:
|
|
return "\n".join(_extract_line_texts_from_layout(source.layout_json))
|
|
if source and source.text_content:
|
|
return source.text_content
|
|
return ""
|
|
|
|
|
|
def _line_count_from_layout(layout_json: dict | None) -> int:
    """Count OCR lines across all pages of a layout blob (0 for falsy input)."""
    return len(_extract_line_texts_from_layout(layout_json))
|
|
|
|
|
|
def _apply_reviewed_lines_to_layout(base_layout: dict | None, reviewed_text: str) -> dict | None:
|
|
if not base_layout:
|
|
return None
|
|
|
|
reviewed_lines = reviewed_text.splitlines()
|
|
new_layout = deepcopy(base_layout)
|
|
|
|
idx = 0
|
|
for page in new_layout.get("pages", []):
|
|
for line in page.get("lines", []):
|
|
line["text"] = reviewed_lines[idx] if idx < len(reviewed_lines) else ""
|
|
idx += 1
|
|
|
|
return new_layout
|
|
|
|
|
|
|
|
def _get_existing_document_types(db: Session) -> list[str]:
    """Distinct non-null document_type values currently in use, ascending."""
    rows = (
        db.query(distinct(Document.document_type))
        .filter(Document.document_type.isnot(None))
        .order_by(Document.document_type.asc())
        .all()
    )
    # Each row is a 1-tuple; drop falsy values defensively.
    return [str(row[0]) for row in rows if row[0]]
|
|
|
|
|
|
def _get_queue_navigation(db: Session, document: Document) -> dict:
    """Compute prev/next navigation targets for the review queues.

    Returns a dict with:
      - prev_doc / next_doc: neighbours of *document* among all non-trashed
        documents ordered by creation time (None at the ends or when the
        document is not in the active list);
      - next_ocr_doc: first *other* document still awaiting OCR review;
      - next_fields_doc: first *other* reviewed document that has no
        extracted fields yet.

    Improvements: replaced the manual filter/append loops with a list
    comprehension and ``next()`` over generators, and dropped the redundant
    truthiness guards around the candidate lists.
    """
    active_docs = (
        db.query(Document)
        .filter(Document.is_trashed.is_(False))
        .order_by(Document.created_at.asc())
        .all()
    )

    doc_ids = [d.document_id for d in active_docs]
    prev_doc = None
    next_doc = None
    if document.document_id in doc_ids:
        idx = doc_ids.index(document.document_id)
        if idx > 0:
            prev_doc = active_docs[idx - 1]
        if idx < len(active_docs) - 1:
            next_doc = active_docs[idx + 1]

    needs_ocr = (
        db.query(Document)
        .filter(Document.is_trashed.is_(False))
        .filter(Document.review_status != "reviewed")
        .order_by(Document.created_at.asc())
        .all()
    )

    reviewed_docs = (
        db.query(Document)
        .options(selectinload(Document.extracted_fields))
        .filter(Document.is_trashed.is_(False))
        .filter(Document.review_status == "reviewed")
        .order_by(Document.updated_at.asc())
        .all()
    )
    reviewed_no_fields = [d for d in reviewed_docs if not d.extracted_fields]

    # First candidate in each queue that isn't the document being viewed.
    next_ocr = next(
        (d for d in needs_ocr if d.document_id != document.document_id), None
    )
    next_fields = next(
        (d for d in reviewed_no_fields if d.document_id != document.document_id), None
    )

    return {
        "prev_doc": prev_doc,
        "next_doc": next_doc,
        "next_ocr_doc": next_ocr,
        "next_fields_doc": next_fields,
    }
|
|
|
|
|
|
def _document_matches_filters(
    doc: Document,
    q: str,
    document_type: str,
    review_status: str,
    merchant: str,
    owner_primary: str,
) -> bool:
    """Return True when *doc* passes every provided (non-blank) filter.

    All matching is case-insensitive; blank filters are skipped. ``q`` is a
    substring search over identifiers, filenames, paths, and key extracted/
    additional field values; ``document_type`` and ``review_status`` are
    exact (case-insensitive) matches; ``merchant`` and ``owner_primary`` are
    substring matches against the relevant field pairs.
    """

    def contains(needle: str, haystacks: list[str]) -> bool:
        return any(needle in h.lower() for h in haystacks)

    q_norm = q.strip().lower()
    if q_norm:
        haystacks = [
            doc.document_id or "",
            doc.document_type or "",
            doc.original_filename or "",
            doc.canonical_filename or "",
            doc.current_path or "",
            doc.source_path or "",
        ]
        extracted = get_current_extracted_fields(doc)
        if extracted is not None:
            haystacks.extend([
                extracted.merchant_raw or "",
                extracted.merchant_normalized or "",
                extracted.location or "",
                extracted.counterparty or "",
                extracted.receipt_number or "",
            ])
        additional = _get_current_additional_fields(doc)
        if additional is not None:
            haystacks.extend([
                additional.owner_primary or "",
                additional.owner_secondary or "",
                additional.paid_by_person or "",
                additional.occasion_note or "",
            ])
        if not contains(q_norm, haystacks):
            return False

    type_norm = document_type.strip().lower()
    if type_norm and type_norm != (doc.document_type or "").lower():
        return False

    review_norm = review_status.strip().lower()
    if review_norm and review_norm != (doc.review_status or "").lower():
        return False

    merchant_norm = merchant.strip().lower()
    if merchant_norm:
        extracted = get_current_extracted_fields(doc)
        merchant_values: list[str] = []
        if extracted is not None:
            merchant_values = [
                extracted.merchant_raw or "",
                extracted.merchant_normalized or "",
            ]
        if not contains(merchant_norm, merchant_values):
            return False

    owner_norm = owner_primary.strip().lower()
    if owner_norm:
        additional = _get_current_additional_fields(doc)
        owner_values: list[str] = []
        if additional is not None:
            owner_values = [
                additional.owner_primary or "",
                additional.owner_secondary or "",
            ]
        if not contains(owner_norm, owner_values):
            return False

    return True
|
|
|
|
|
|
@router.get("/", response_class=HTMLResponse)
def list_documents(
    request: Request,
    q: str = Query("", description="Search"),
    document_type: str = Query("", description="Document type"),
    review_status: str = Query("", description="Review status"),
    merchant: str = Query("", description="Merchant contains"),
    owner_primary: str = Query("", description="Owner contains"),
    tab: str = Query("all-documents"),
    db: Session = Depends(get_db),
):
    """Render the document list, optionally filtered by the search form.

    Filtering happens in Python (not SQL) because the free-text query spans
    related extracted/additional field rows; those relations are eager-loaded
    to avoid per-document queries. Improvement: replaced the manual
    filter-and-append loop with a list comprehension.
    """
    documents_all = (
        db.query(Document)
        .options(
            selectinload(Document.extracted_fields),
            selectinload(Document.additional_fields),
        )
        .filter(Document.is_trashed.is_(False))
        .order_by(Document.created_at.desc())
        .all()
    )

    has_search_query = any([
        q.strip(),
        document_type.strip(),
        review_status.strip(),
        merchant.strip(),
        owner_primary.strip(),
    ])

    filtered_documents = documents_all
    if has_search_query:
        filtered_documents = [
            doc
            for doc in documents_all
            if _document_matches_filters(
                doc=doc,
                q=q,
                document_type=document_type,
                review_status=review_status,
                merchant=merchant,
                owner_primary=owner_primary,
            )
        ]

    # Unknown tab values fall back to the default tab.
    if tab not in {"all-documents", "advanced-search"}:
        tab = "all-documents"

    return templates.TemplateResponse(
        request=request,
        name="documents/list.html",
        context={
            "request": request,
            "documents": filtered_documents,
            "q": q,
            "document_type": document_type,
            "review_status": review_status,
            "merchant": merchant,
            "owner_primary": owner_primary,
            "has_search_query": has_search_query,
            "active_tab": tab,
            "active_page": "documents",
        },
    )
|
|
|
|
|
|
|
|
@router.post("/{document_id}/save-document-type", response_class=RedirectResponse)
def save_document_type_route(
    document_id: str,
    document_type: str = Form(""),
    db: Session = Depends(get_db),
):
    """Persist the document's type; blank input clears it to None.

    Unknown document ids redirect back to the list instead of erroring.
    """
    document = db.query(Document).filter(Document.document_id == document_id).first()
    if document is None:
        return RedirectResponse(url="/documents/", status_code=303)

    document.document_type = document_type.strip() or None
    db.commit()

    # NOTE(review): the redirect advertises success=rerun_ocr even though
    # this route only saves the document type — looks copy-pasted from the
    # rerun-OCR handler; confirm what the template keys off before changing.
    return RedirectResponse(url=f"/documents/{document.document_id}?tab=ocr-review&success=rerun_ocr", status_code=303)
|
|
|
|
|
|
@router.post("/{document_id}/rerun-ocr", response_class=RedirectResponse)
def rerun_ocr(document_id: str, db: Session = Depends(get_db)):
    """Re-run OCR for one document, then return to the raw-text editor.

    Unknown ids redirect to the list; OCR failures surface as an ``error``
    query flag rather than a 500.
    """
    doc = db.query(Document).filter(Document.document_id == document_id).first()
    if doc is None:
        return RedirectResponse(url="/documents/", status_code=303)

    try:
        rerun_ocr_for_document(db, doc)
    except Exception:
        return RedirectResponse(url=f"/documents/{doc.document_id}?error=rerun_ocr_failed", status_code=303)

    return RedirectResponse(url=f"/documents/{doc.document_id}?editor_source=raw&tab=ocr-review", status_code=303)
|
|
|
|
|
|
@router.post("/{document_id}/save-ocr-corrected-pdf", response_class=RedirectResponse)
def save_ocr_corrected_pdf(document_id: str, db: Session = Depends(get_db)):
    """Write an OCR-corrected PDF version to the proposed storage path.

    Eager-loads the relations create_ocr_corrected_pdf_version needs, builds
    the destination from the document's naming fields, and records failures
    as an ``error`` query flag instead of a 500.

    Fix: the create call previously passed ``output_path=output_path_obj``
    — an undefined name — so every request raised NameError, was swallowed
    by the except, and the PDF was silently never written.
    """
    document = (
        db.query(Document)
        .options(
            selectinload(Document.text_versions),
            selectinload(Document.naming_fields),
            selectinload(Document.extracted_fields),
            selectinload(Document.additional_fields),
        )
        .filter(Document.document_id == document_id)
        .first()
    )
    if document is None:
        return RedirectResponse(url="/documents/", status_code=303)

    save_root = get_default_save_root()
    naming_row = document.naming_fields[0] if getattr(document, "naming_fields", None) else None
    output_path = Path(
        build_proposed_storage_path(
            document=document,
            save_root=save_root,
            naming_row=naming_row,
        )
    )
    output_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        create_ocr_corrected_pdf_version(db, document, output_path=output_path)
    except Exception:
        return RedirectResponse(url=f"/documents/{document.document_id}?error=save_ocr_corrected_failed", status_code=303)

    return RedirectResponse(url=f"/documents/{document.document_id}?tab=ocr-review", status_code=303)
|
|
|
|
|
|
|
|
@router.post("/{document_id}/save-review-flags", response_class=RedirectResponse)
|
|
def save_review_flags(
|
|
document_id: str,
|
|
is_approved: str = Form(""),
|
|
is_excluded: str = Form(""),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
document = db.query(Document).filter(Document.document_id == document_id).first()
|
|
if document is None:
|
|
return RedirectResponse(url="/documents/", status_code=303)
|
|
|
|
state = _get_or_create_document_review_state(db, document)
|
|
state.is_approved = bool(is_approved)
|
|
state.is_excluded = bool(is_excluded)
|
|
state.reviewed_at = datetime.utcnow()
|
|
db.add(state)
|
|
db.commit()
|
|
|
|
return RedirectResponse(
|
|
url=f"/documents/{document.document_id}?success=saved_review_flags",
|
|
status_code=303,
|
|
)
|
|
|
|
|
|
@router.post("/{document_id}/move-to-trash", response_class=RedirectResponse)
|
|
def move_to_trash(document_id: str, db: Session = Depends(get_db)):
|
|
document = db.query(Document).filter(Document.document_id == document_id).first()
|
|
if document is None:
|
|
return RedirectResponse(url="/documents/", status_code=303)
|
|
|
|
document.is_trashed = True
|
|
document.trashed_at = datetime.utcnow()
|
|
db.commit()
|
|
|
|
return RedirectResponse(url="/documents/", status_code=303)
|
|
|
|
|
|
|
|
@router.post("/{document_id}/save-pdf", response_class=RedirectResponse)
|
|
def save_pdf(document_id: str, output_path: str = Form(""), db: Session = Depends(get_db)):
|
|
if not _storage_available():
|
|
return RedirectResponse(
|
|
url=f"/documents/{document_id}?error=storage_unavailable",
|
|
status_code=303,
|
|
)
|
|
document = (
|
|
db.query(Document)
|
|
.options(
|
|
selectinload(Document.text_versions),
|
|
selectinload(Document.naming_fields),
|
|
selectinload(Document.extracted_fields),
|
|
selectinload(Document.additional_fields),
|
|
)
|
|
.filter(Document.document_id == document_id)
|
|
.first()
|
|
)
|
|
if document is None:
|
|
return RedirectResponse(url="/documents/", status_code=303)
|
|
|
|
save_root = get_default_save_root()
|
|
naming_row = document.naming_fields[0] if getattr(document, "naming_fields", None) else None
|
|
|
|
default_output_path = Path(
|
|
build_proposed_storage_path(
|
|
document=document,
|
|
save_root=save_root,
|
|
naming_row=naming_row,
|
|
)
|
|
)
|
|
default_output_path = default_output_path.with_name(
|
|
re.sub(r"(?:_v\d+|_\d+)(?=\.[^.]+$)", "", default_output_path.name)
|
|
)
|
|
if default_output_path.suffix.lower() != ".pdf":
|
|
default_output_path = default_output_path.with_suffix(".pdf")
|
|
|
|
output_path_raw = (output_path or "").strip()
|
|
if output_path_raw:
|
|
output_path_obj = Path(output_path_raw)
|
|
else:
|
|
output_path_obj = default_output_path
|
|
|
|
if output_path_obj.suffix.lower() != ".pdf":
|
|
output_path_obj = output_path_obj.with_suffix(".pdf")
|
|
|
|
allowed_root = Path(save_root).resolve()
|
|
resolved_parent = output_path_obj.parent.resolve()
|
|
if allowed_root != resolved_parent and allowed_root not in resolved_parent.parents:
|
|
return RedirectResponse(
|
|
url=f"/documents/{document.document_id}?error=invalid_output_path",
|
|
status_code=303,
|
|
)
|
|
|
|
output_path_obj.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
has_extracted = bool(getattr(document, "extracted_fields", None))
|
|
has_additional = bool(getattr(document, "additional_fields", None))
|
|
|
|
try:
|
|
if has_extracted or has_additional:
|
|
create_field_enriched_pdf_version(db, document, output_path=output_path_obj)
|
|
else:
|
|
create_ocr_corrected_pdf_version(db, document, output_path=output_path_obj)
|
|
except Exception as e:
|
|
print("save_pdf failed:", repr(e), flush=True)
|
|
traceback.print_exc()
|
|
return RedirectResponse(
|
|
url=f"/documents/{document.document_id}?error=save_pdf_failed",
|
|
status_code=303,
|
|
)
|
|
|
|
return RedirectResponse(url=f"/documents/{document.document_id}?tab=ocr-review", status_code=303)
|
|
|
|
|
|
|
|
@router.post("/{document_id}/save-field-enriched-pdf", response_class=RedirectResponse)
|
|
def save_field_enriched_pdf(document_id: str, db: Session = Depends(get_db)):
|
|
document = (
|
|
db.query(Document)
|
|
.options(
|
|
selectinload(Document.naming_fields),
|
|
selectinload(Document.extracted_fields),
|
|
selectinload(Document.additional_fields),
|
|
)
|
|
.filter(Document.document_id == document_id)
|
|
.first()
|
|
)
|
|
if document is None:
|
|
return RedirectResponse(url="/documents/", status_code=303)
|
|
|
|
save_root = get_default_save_root()
|
|
naming_row = document.naming_fields[0] if getattr(document, "naming_fields", None) else None
|
|
output_path = Path(
|
|
build_proposed_storage_path(
|
|
document=document,
|
|
save_root=save_root,
|
|
naming_row=naming_row,
|
|
)
|
|
)
|
|
output_path = output_path.with_name(
|
|
re.sub(r"_v\d+(?=\.[^.]+$)", "", output_path.name)
|
|
)
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
create_field_enriched_pdf_version(db, document, output_path=output_path_obj)
|
|
except Exception as e:
|
|
return RedirectResponse(url=f"/documents/{document.document_id}?error=save_field_enriched_failed", status_code=303)
|
|
|
|
return RedirectResponse(url=f"/documents/{document.document_id}?tab=extracted-fields", status_code=303)
|
|
|
|
|
|
@router.post("/{document_id}/review-text", response_class=RedirectResponse)
|
|
def save_reviewed_text(
|
|
document_id: str,
|
|
reviewed_text: str = Form(...),
|
|
quality_flags: list[str] | None = Form(None),
|
|
quality_note: str = Form(""),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
document = (
|
|
db.query(Document)
|
|
.options(selectinload(Document.text_versions))
|
|
.filter(Document.document_id == document_id)
|
|
.first()
|
|
)
|
|
|
|
if document is None:
|
|
return RedirectResponse(url="/documents/", status_code=303)
|
|
|
|
raw_ocr, _ = _get_current_text_versions(document)
|
|
expected_line_count = _line_count_from_layout(raw_ocr.layout_json if raw_ocr else None)
|
|
actual_line_count = len(reviewed_text.splitlines())
|
|
|
|
if expected_line_count and actual_line_count != expected_line_count:
|
|
return RedirectResponse(
|
|
url=f"/documents/{document.document_id}?error=line_count_mismatch&expected={expected_line_count}&actual={actual_line_count}&tab=ocr-review",
|
|
status_code=303,
|
|
)
|
|
|
|
existing_reviewed = [tv for tv in document.text_versions if tv.version_type == "reviewed" and tv.is_current]
|
|
for tv in existing_reviewed:
|
|
tv.is_current = False
|
|
|
|
reviewed_layout = _apply_reviewed_lines_to_layout(
|
|
raw_ocr.layout_json if raw_ocr else None,
|
|
reviewed_text,
|
|
)
|
|
|
|
reviewed_version = TextVersion(
|
|
document_id=document.id,
|
|
version_number=max(tv.version_number for tv in document.text_versions) + 1 if document.text_versions else 1,
|
|
version_type="reviewed",
|
|
text_content=reviewed_text,
|
|
created_by="mcelwain",
|
|
is_current=True,
|
|
derived_from_version_id=raw_ocr.id if raw_ocr else None,
|
|
layout_json=reviewed_layout,
|
|
)
|
|
db.add(reviewed_version)
|
|
|
|
if raw_ocr:
|
|
raw_ocr.quality_score = compute_quality_score(raw_ocr.text_content, reviewed_text)
|
|
raw_ocr.quality_flags = quality_flags or []
|
|
raw_ocr.quality_note = quality_note or None
|
|
|
|
document.review_status = "reviewed"
|
|
db.commit()
|
|
|
|
return RedirectResponse(url=f"/documents/{document.document_id}?tab=line-items&success=saved_reviewed_ocr", status_code=303)
|
|
|
|
|
|
@router.post("/{document_id}/save-extracted-fields", response_class=RedirectResponse)
|
|
def save_extracted_fields_route(
|
|
document_id: str,
|
|
merchant_raw: str = Form(""),
|
|
merchant_normalized: str = Form(""),
|
|
transaction_date: str = Form(""),
|
|
transaction_time: str = Form(""),
|
|
subtotal: str = Form(""),
|
|
tax: str = Form(""),
|
|
total: str = Form(""),
|
|
currency: str = Form(""),
|
|
payment_method: str = Form(""),
|
|
receipt_number: str = Form(""),
|
|
location: str = Form(""),
|
|
counterparty: str = Form(""),
|
|
extra_json: str = Form("{}"),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
document = (
|
|
db.query(Document)
|
|
.options(
|
|
selectinload(Document.extracted_fields),
|
|
selectinload(Document.receipt_line_items),
|
|
)
|
|
.filter(Document.document_id == document_id)
|
|
.first()
|
|
)
|
|
if document is None:
|
|
return RedirectResponse(url="/documents/", status_code=303)
|
|
|
|
save_extracted_fields(
|
|
db=db,
|
|
document=document,
|
|
merchant_raw=merchant_raw,
|
|
merchant_normalized=merchant_normalized,
|
|
transaction_date=transaction_date,
|
|
transaction_time=transaction_time,
|
|
subtotal=subtotal,
|
|
tax=tax,
|
|
total=total,
|
|
currency=currency,
|
|
payment_method=payment_method,
|
|
receipt_number=receipt_number,
|
|
location=location,
|
|
counterparty=counterparty,
|
|
extra_json=extra_json,
|
|
)
|
|
|
|
db.refresh(document)
|
|
current_extracted = get_current_extracted_fields(document)
|
|
if current_extracted is not None:
|
|
_snapshot_extracted_field(
|
|
db,
|
|
document,
|
|
current_extracted,
|
|
created_by="save_extracted_fields",
|
|
notes="Saved extracted fields from document detail form.",
|
|
)
|
|
db.commit()
|
|
|
|
return RedirectResponse(
|
|
url=f"/documents/{document.document_id}?autofill_extracted=0&tab=extracted-fields",
|
|
status_code=303,
|
|
)
|
|
|
|
@router.post("/{document_id}/save-additional-fields", response_class=RedirectResponse)
|
|
def save_additional_fields_route(
|
|
document_id: str,
|
|
owner_primary: str = Form(""),
|
|
owner_secondary: str = Form(""),
|
|
paid_by_person: str = Form(""),
|
|
covered_people: str = Form(""),
|
|
attendees: str = Form(""),
|
|
occasion_note: str = Form(""),
|
|
is_shared_expense: str | None = Form(None),
|
|
reimbursement_expected_from: str = Form(""),
|
|
reimbursement_paid_by: str = Form(""),
|
|
reimbursement_paid_to: str = Form(""),
|
|
reimbursement_paid_amount: str = Form(""),
|
|
reimbursement_paid_date: str = Form(""),
|
|
reimbursement_note: str = Form(""),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
document = (
|
|
db.query(Document)
|
|
.options(selectinload(Document.additional_fields))
|
|
.filter(Document.document_id == document_id)
|
|
.first()
|
|
)
|
|
if document is None:
|
|
return RedirectResponse(url="/documents/", status_code=303)
|
|
|
|
additional = document.additional_fields[0] if getattr(document, "additional_fields", None) else None
|
|
if additional is None:
|
|
additional = DocumentAdditionalField(document_id=document.id)
|
|
db.add(additional)
|
|
db.flush()
|
|
|
|
additional.owner_primary = owner_primary or None
|
|
additional.owner_secondary = owner_secondary or None
|
|
additional.paid_by_person = paid_by_person or None
|
|
additional.covered_people = [v.strip() for v in covered_people.split(",") if v.strip()] or None
|
|
additional.attendees = [v.strip() for v in attendees.split(",") if v.strip()] or None
|
|
additional.occasion_note = occasion_note or None
|
|
additional.is_shared_expense = bool(is_shared_expense)
|
|
additional.reimbursement_expected_from = [v.strip() for v in reimbursement_expected_from.split(",") if v.strip()] or None
|
|
additional.reimbursement_paid_by = reimbursement_paid_by or None
|
|
additional.reimbursement_paid_to = reimbursement_paid_to or None
|
|
additional.reimbursement_paid_amount = Decimal(reimbursement_paid_amount) if reimbursement_paid_amount.strip() else None
|
|
additional.reimbursement_paid_date = datetime.strptime(reimbursement_paid_date, "%Y-%m-%d").date() if reimbursement_paid_date.strip() else None
|
|
additional.reimbursement_note = reimbursement_note or None
|
|
|
|
db.add(additional)
|
|
db.commit()
|
|
|
|
db.refresh(document)
|
|
current_additional = document.additional_fields[0] if getattr(document, "additional_fields", None) else None
|
|
if current_additional is not None:
|
|
_snapshot_additional_field(
|
|
db,
|
|
document,
|
|
current_additional,
|
|
created_by="save_additional_fields",
|
|
notes="Saved additional fields from document detail form.",
|
|
)
|
|
db.commit()
|
|
|
|
return RedirectResponse(
|
|
url=f"/documents/{document.document_id}?tab=additional-fields",
|
|
status_code=303,
|
|
)
|
|
|
|
|
|
|
|
@router.post("/{document_id}/regenerate-line-items", response_class=RedirectResponse)
|
|
def regenerate_line_items(document_id: str, db: Session = Depends(get_db)):
|
|
document = (
|
|
db.query(Document)
|
|
.options(
|
|
selectinload(Document.text_versions),
|
|
selectinload(Document.line_item_set).selectinload(DocumentLineItemSet.items),
|
|
selectinload(Document.line_item_set_versions),
|
|
)
|
|
.filter(Document.document_id == document_id)
|
|
.first()
|
|
)
|
|
if document is None:
|
|
return RedirectResponse(url="/documents/", status_code=303)
|
|
|
|
text_version = _get_current_reviewed_text(document)
|
|
if text_version is None:
|
|
return RedirectResponse(
|
|
url=f"/documents/{document.document_id}?tab=line-items&error=regenerate_line_items_failed",
|
|
status_code=303,
|
|
)
|
|
|
|
try:
|
|
lines = _get_document_lines(text_version)
|
|
items = _extract_receipt_line_items(lines)
|
|
_replace_document_line_items(db, document, items)
|
|
db.flush()
|
|
|
|
next_version = max([v.version_number for v in document.line_item_set_versions], default=0) + 1
|
|
version = DocumentLineItemSetVersion(
|
|
document_id=document.id,
|
|
version_number=next_version,
|
|
schema_type=document.line_item_set.schema_type if document.line_item_set else (document.document_type or "generic"),
|
|
created_by="regenerate_line_items",
|
|
notes="Regenerated line items from current OCR text.",
|
|
)
|
|
db.add(version)
|
|
db.flush()
|
|
|
|
current_items = (
|
|
db.query(DocumentLineItem)
|
|
.filter(DocumentLineItem.line_item_set_id == document.line_item_set.id)
|
|
.order_by(DocumentLineItem.line_number.asc())
|
|
.all()
|
|
)
|
|
|
|
for item in current_items:
|
|
db.add(DocumentLineItemVersionItem(
|
|
set_version_id=version.id,
|
|
line_number=item.line_number,
|
|
entry_date=item.entry_date,
|
|
description=item.description,
|
|
quantity=item.quantity,
|
|
unit_price=item.unit_price,
|
|
line_total=item.line_total,
|
|
tax_amount=item.tax_amount,
|
|
category=item.category,
|
|
notes=item.notes,
|
|
raw_json=item.raw_json,
|
|
))
|
|
|
|
db.commit()
|
|
except Exception:
|
|
traceback.print_exc()
|
|
db.rollback()
|
|
return RedirectResponse(
|
|
url=f"/documents/{document.document_id}?tab=line-items&error=regenerate_line_items_failed",
|
|
status_code=303,
|
|
)
|
|
|
|
return RedirectResponse(
|
|
url=f"/documents/{document.document_id}?tab=line-items&success=regenerated_line_items",
|
|
status_code=303,
|
|
)
|
|
|
|
|
|
@router.post("/{document_id}/save-line-items", response_class=RedirectResponse)
|
|
async def save_line_items(
|
|
document_id: str,
|
|
request: Request,
|
|
row_count: int = Form(...),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
document = (
|
|
db.query(Document)
|
|
.options(
|
|
selectinload(Document.line_item_set).selectinload(DocumentLineItemSet.items),
|
|
selectinload(Document.line_item_set_versions),
|
|
)
|
|
.filter(Document.document_id == document_id)
|
|
.first()
|
|
)
|
|
if document is None:
|
|
return RedirectResponse(url="/documents/", status_code=303)
|
|
|
|
form = await request.form()
|
|
|
|
if document.line_item_set is None:
|
|
document.line_item_set = DocumentLineItemSet(
|
|
document_id=document.id,
|
|
schema_type=document.document_type or "generic",
|
|
)
|
|
db.add(document.line_item_set)
|
|
db.flush()
|
|
|
|
document.line_item_set.schema_type = document.document_type or "generic"
|
|
document.line_item_set.items.clear()
|
|
db.flush()
|
|
|
|
for i in range(row_count):
|
|
entry_date = (form.get(f"entry_date_{i}") or "").strip()
|
|
description = (form.get(f"description_{i}") or "").strip()
|
|
quantity = (form.get(f"quantity_{i}") or "").strip()
|
|
unit_price = (form.get(f"unit_price_{i}") or "").strip()
|
|
line_total = (form.get(f"line_total_{i}") or "").strip()
|
|
tax_amount = (form.get(f"tax_amount_{i}") or "").strip()
|
|
category = (form.get(f"category_{i}") or "").strip()
|
|
notes = (form.get(f"notes_{i}") or "").strip()
|
|
|
|
if not any([entry_date, description, quantity, unit_price, line_total, tax_amount, category, notes]):
|
|
continue
|
|
|
|
item = DocumentLineItem(
|
|
line_item_set_id=document.line_item_set.id,
|
|
line_number=i + 1,
|
|
entry_date=datetime.strptime(entry_date, "%Y-%m-%d").date() if entry_date else None,
|
|
description=description or None,
|
|
quantity=Decimal(quantity) if quantity else None,
|
|
unit_price=Decimal(unit_price) if unit_price else None,
|
|
line_total=Decimal(line_total) if line_total else None,
|
|
tax_amount=Decimal(tax_amount) if tax_amount else None,
|
|
category=category or None,
|
|
notes=notes or None,
|
|
)
|
|
db.add(item)
|
|
|
|
db.flush()
|
|
|
|
next_version = max([v.version_number for v in document.line_item_set_versions], default=0) + 1
|
|
version = DocumentLineItemSetVersion(
|
|
document_id=document.id,
|
|
version_number=next_version,
|
|
schema_type=document.line_item_set.schema_type,
|
|
created_by="save_line_items",
|
|
notes="Saved line items from document detail tab.",
|
|
)
|
|
db.add(version)
|
|
db.flush()
|
|
|
|
current_items = (
|
|
db.query(DocumentLineItem)
|
|
.filter(DocumentLineItem.line_item_set_id == document.line_item_set.id)
|
|
.order_by(DocumentLineItem.line_number.asc())
|
|
.all()
|
|
)
|
|
|
|
for item in current_items:
|
|
db.add(DocumentLineItemVersionItem(
|
|
set_version_id=version.id,
|
|
line_number=item.line_number,
|
|
entry_date=item.entry_date,
|
|
description=item.description,
|
|
quantity=item.quantity,
|
|
unit_price=item.unit_price,
|
|
line_total=item.line_total,
|
|
tax_amount=item.tax_amount,
|
|
category=item.category,
|
|
notes=item.notes,
|
|
raw_json=item.raw_json,
|
|
))
|
|
|
|
db.commit()
|
|
return RedirectResponse(
|
|
url=f"/documents/{document.document_id}?tab=line-items",
|
|
status_code=303,
|
|
)
|
|
|
|
@router.get("/{document_id}/preview-file")
|
|
def document_preview_file(document_id: str, db: Session = Depends(get_db)):
|
|
document = db.query(Document).filter(Document.document_id == document_id).first()
|
|
if document is None or not document.current_path:
|
|
return HTMLResponse(content="Preview file not found", status_code=404)
|
|
|
|
path_obj = Path(document.current_path)
|
|
if not path_obj.exists() or not path_obj.is_file():
|
|
return HTMLResponse(content="Preview file not found", status_code=404)
|
|
|
|
media_type = document.mime_type or "application/octet-stream"
|
|
return FileResponse(path=str(path_obj), media_type=media_type, filename=path_obj.name, headers={"Content-Disposition": "inline; filename=\"" + path_obj.name + "\""})
|
|
|
|
|
|
@router.get("/{document_id}", response_class=HTMLResponse)
|
|
def document_detail(document_id: str, request: Request, queue: str | None = None, db: Session = Depends(get_db)):
|
|
document = (
|
|
db.query(Document)
|
|
.options(
|
|
selectinload(Document.versions),
|
|
selectinload(Document.text_versions),
|
|
selectinload(Document.extracted_fields),
|
|
selectinload(Document.layer1_candidates),
|
|
selectinload(Document.additional_fields),
|
|
)
|
|
.filter(Document.document_id == document_id)
|
|
.first()
|
|
)
|
|
|
|
if document is None:
|
|
return HTMLResponse(content="Document not found", status_code=404)
|
|
|
|
raw_ocr, reviewed_ocr = _get_current_text_versions(document)
|
|
current_text_version = next(
|
|
(
|
|
tv for tv in sorted(
|
|
getattr(document, "text_versions", []),
|
|
key=lambda x: (x.version_number, x.created_at),
|
|
reverse=True,
|
|
)
|
|
if tv.is_current
|
|
),
|
|
None,
|
|
)
|
|
|
|
editor_source = request.query_params.get("editor_source", "reviewed")
|
|
review_text_value = _build_review_text_value(raw_ocr, reviewed_ocr, editor_source)
|
|
|
|
expected_line_count = _line_count_from_layout(raw_ocr.layout_json if raw_ocr else None)
|
|
actual_line_count = len(review_text_value.splitlines()) if review_text_value else 0
|
|
line_numbers = list(range(1, max(actual_line_count, expected_line_count) + 1))
|
|
|
|
file_url = None
|
|
storage_available = _storage_available()
|
|
if document.current_path:
|
|
current_path = Path(document.current_path)
|
|
if current_path.exists() and current_path.is_file():
|
|
file_url = str(request.url_for("document_preview_file", document_id=document.document_id))
|
|
|
|
app_url = str(request.url_for("document_detail", document_id=document.document_id))
|
|
error = request.query_params.get("error")
|
|
success = request.query_params.get("success")
|
|
error_expected = request.query_params.get("expected")
|
|
error_actual = request.query_params.get("actual")
|
|
|
|
preset_id_raw = request.query_params.get("preset_id")
|
|
try:
|
|
preset_id = int(preset_id_raw) if preset_id_raw else None
|
|
except ValueError:
|
|
preset_id = None
|
|
|
|
selected_preset = _get_preset_by_id(db, preset_id)
|
|
all_presets = _get_all_presets(db)
|
|
existing_document_types = _get_existing_document_types(db)
|
|
|
|
extracted_form = _extracted_field_form_values(document, request)
|
|
additional_form = _additional_field_form_values(document, selected_preset)
|
|
current_extracted = get_current_extracted_fields(document)
|
|
current_additional = _get_current_additional_fields(document)
|
|
current_extracted_version_number = _get_current_extracted_version_number(document)
|
|
current_additional_version_number = _get_current_additional_version_number(document)
|
|
|
|
line_items = []
|
|
if document.line_item_set and document.line_item_set.items:
|
|
line_items = sorted(
|
|
document.line_item_set.items,
|
|
key=lambda x: x.line_number or 0,
|
|
)
|
|
|
|
review_state = _get_or_create_document_review_state(db, document)
|
|
|
|
queue_nav = _get_queue_navigation(db, document)
|
|
|
|
naming_row = document.naming_fields[0] if getattr(document, "naming_fields", None) else None
|
|
default_save_root = get_default_save_root()
|
|
proposed_storage_path = build_proposed_storage_path(
|
|
document=document,
|
|
save_root=default_save_root,
|
|
naming_row=naming_row,
|
|
)
|
|
proposed_storage_path = str(
|
|
Path(proposed_storage_path).with_name(
|
|
re.sub(r"(?:_v\d+|_\d+)(?=\.[^.]+$)", "", Path(proposed_storage_path).name)
|
|
)
|
|
)
|
|
|
|
version_rows = []
|
|
for version in sorted(getattr(document, "versions", []), key=lambda v: v.version_number, reverse=True):
|
|
file_exists = _version_file_available(version, document.document_id)
|
|
version_rows.append((version, file_exists))
|
|
|
|
current_line_item_version = None
|
|
if document.line_item_set_versions:
|
|
current_line_item_version = max(
|
|
document.line_item_set_versions,
|
|
key=lambda v: (v.version_number, v.created_at),
|
|
)
|
|
|
|
ocr_version_options = [
|
|
(v.version_number, v.version_type, v.created_at)
|
|
for v in sorted(getattr(document, "text_versions", []), key=lambda v: v.version_number, reverse=True)
|
|
]
|
|
extracted_version_options = [
|
|
(v.version_number, v.created_at)
|
|
for v in sorted(getattr(document, "extracted_field_versions", []), key=lambda v: v.version_number, reverse=True)
|
|
]
|
|
additional_version_options = [
|
|
(v.version_number, v.created_at)
|
|
for v in sorted(getattr(document, "additional_field_versions", []), key=lambda v: v.version_number, reverse=True)
|
|
]
|
|
|
|
active_tab = request.query_params.get("tab", "ocr-review")
|
|
if active_tab not in {"ocr-review", "extracted-fields", "additional-fields", "versions", "raw-ocr", "source-options"}:
|
|
active_tab = "ocr-review"
|
|
|
|
return templates.TemplateResponse(
|
|
request=request,
|
|
name="documents/detail.html",
|
|
context={
|
|
"request": request,
|
|
"document": document,
|
|
"review_state": review_state,
|
|
"default_save_root": default_save_root,
|
|
"proposed_storage_path": proposed_storage_path,
|
|
"prev_doc": queue_nav.get("prev_doc"),
|
|
"next_doc": queue_nav.get("next_doc"),
|
|
"next_ocr_doc": queue_nav.get("next_ocr_doc"),
|
|
"next_fields_doc": queue_nav.get("next_fields_doc"),
|
|
"raw_ocr": raw_ocr,
|
|
"reviewed_ocr": reviewed_ocr,
|
|
"current_text_version": current_text_version,
|
|
"review_text_value": review_text_value,
|
|
"file_url": file_url,
|
|
"storage_available": storage_available,
|
|
"version_rows": version_rows,
|
|
"current_line_item_version": current_line_item_version,
|
|
"ocr_version_options": ocr_version_options,
|
|
"extracted_version_options": extracted_version_options,
|
|
"additional_version_options": additional_version_options,
|
|
"app_url": app_url,
|
|
"quality_flag_options": QUALITY_FLAG_OPTIONS,
|
|
"current_quality_flags": raw_ocr.quality_flags if raw_ocr and raw_ocr.quality_flags else [],
|
|
"current_quality_note": raw_ocr.quality_note if raw_ocr and raw_ocr.quality_note else "",
|
|
"line_numbers": line_numbers,
|
|
"expected_line_count": expected_line_count,
|
|
"actual_line_count": actual_line_count,
|
|
"error": error,
|
|
"success": success,
|
|
"error_expected": error_expected,
|
|
"error_actual": error_actual,
|
|
"extracted_form": extracted_form,
|
|
"current_extracted": current_extracted,
|
|
"current_extracted_version_number": current_extracted_version_number,
|
|
"additional_form": additional_form,
|
|
"current_additional": current_additional,
|
|
"current_additional_version_number": current_additional_version_number,
|
|
"line_items": line_items,
|
|
"presets": all_presets,
|
|
"selected_preset_id": preset_id,
|
|
"existing_document_types": existing_document_types,
|
|
"active_tab": active_tab,
|
|
"active_page": "documents",
|
|
},
|
|
)
|
|
|
|
|
|
|
|
|
|
def _get_current_ocr_text_for_document_export(document: Document) -> str:
    """Return the best current OCR text for export.

    Prefers the newest current "reviewed" version; falls back to the newest
    current "raw_ocr" version; returns "" when neither exists.
    """
    text_versions = list(getattr(document, "text_versions", []))

    for wanted_type in ("reviewed", "raw_ocr"):
        candidates = [
            tv for tv in text_versions
            if tv.version_type == wanted_type and tv.is_current
        ]
        if candidates:
            newest = max(candidates, key=lambda tv: (tv.version_number, tv.created_at))
            return newest.text_content or ""

    return ""
|
|
|
|
|
|
@router.get("/export/training.jsonl")
|
|
def export_training_jsonl(db: Session = Depends(get_db)):
|
|
docs = (
|
|
db.query(Document)
|
|
.options(
|
|
selectinload(Document.text_versions),
|
|
selectinload(Document.naming_fields),
|
|
selectinload(Document.extracted_fields),
|
|
selectinload(Document.additional_fields),
|
|
selectinload(Document.line_item_set).selectinload(DocumentLineItemSet.items),
|
|
selectinload(Document.review_state),
|
|
)
|
|
.order_by(Document.updated_at.asc())
|
|
.all()
|
|
)
|
|
|
|
export_dir = Path("/mnt/storage/document-processor/exports")
|
|
export_dir.mkdir(parents=True, exist_ok=True)
|
|
out_path = export_dir / "document_training.jsonl"
|
|
|
|
with out_path.open("w", encoding="utf-8") as f:
|
|
for document in docs:
|
|
review_state = getattr(document, "review_state", None)
|
|
if review_state is None:
|
|
continue
|
|
if not review_state.reviewed_at:
|
|
continue
|
|
if not review_state.is_approved:
|
|
continue
|
|
if review_state.is_excluded:
|
|
continue
|
|
|
|
extracted = get_current_extracted_fields(document)
|
|
additional = _get_current_additional_fields(document)
|
|
|
|
line_items = []
|
|
if document.line_item_set and document.line_item_set.items:
|
|
for item in sorted(document.line_item_set.items, key=lambda x: x.line_number or 0):
|
|
line_items.append(
|
|
{
|
|
"line_item_id": item.id,
|
|
"line_number": item.line_number,
|
|
"entry_date": item.entry_date.isoformat() if item.entry_date else "",
|
|
"description": item.description or "",
|
|
"quantity": str(item.quantity) if item.quantity is not None else "",
|
|
"unit_price": str(item.unit_price) if item.unit_price is not None else "",
|
|
"line_total": str(item.line_total) if item.line_total is not None else "",
|
|
"tax_amount": str(item.tax_amount) if item.tax_amount is not None else "",
|
|
"category": item.category or "",
|
|
"notes": item.notes or "",
|
|
"raw_json": item.raw_json or {},
|
|
}
|
|
)
|
|
|
|
payload = {
|
|
"schema_version": review_state.schema_version or "v1",
|
|
"document": {
|
|
"document_id": document.document_id,
|
|
"document_type": document.document_type or "",
|
|
"original_filename": document.original_filename or "",
|
|
"canonical_filename": document.canonical_filename or "",
|
|
"mime_type": document.mime_type or "",
|
|
"source_path": document.source_path or "",
|
|
"current_path": document.current_path or "",
|
|
"created_at": document.created_at.isoformat() if document.created_at else "",
|
|
"updated_at": document.updated_at.isoformat() if document.updated_at else "",
|
|
},
|
|
"review": {
|
|
"reviewed_at": review_state.reviewed_at.isoformat() if review_state.reviewed_at else "",
|
|
"is_approved": bool(review_state.is_approved),
|
|
"is_excluded": bool(review_state.is_excluded),
|
|
},
|
|
"ocr_text": _get_current_ocr_text_for_document_export(document),
|
|
"extracted_fields": {
|
|
"merchant_raw": extracted.merchant_raw if extracted else "",
|
|
"merchant_normalized": extracted.merchant_normalized if extracted else "",
|
|
"transaction_date": extracted.transaction_date.isoformat() if extracted and extracted.transaction_date else "",
|
|
"transaction_time": extracted.transaction_time if extracted else "",
|
|
"subtotal": str(extracted.subtotal) if extracted and extracted.subtotal is not None else "",
|
|
"tax": str(extracted.tax) if extracted and extracted.tax is not None else "",
|
|
"total": str(extracted.total) if extracted and extracted.total is not None else "",
|
|
"currency": extracted.currency if extracted else "",
|
|
"payment_method": extracted.payment_method if extracted else "",
|
|
"receipt_number": extracted.receipt_number if extracted else "",
|
|
"location": extracted.location if extracted else "",
|
|
"counterparty": extracted.counterparty if extracted else "",
|
|
"extra_json": extracted.extra_json if extracted and extracted.extra_json else {},
|
|
},
|
|
"additional_fields": {
|
|
"owner_primary": additional.owner_primary if additional else "",
|
|
"owner_secondary": additional.owner_secondary if additional else "",
|
|
"paid_by_person": additional.paid_by_person if additional else "",
|
|
"occasion_note": additional.occasion_note if additional else "",
|
|
"is_shared_expense": bool(additional.is_shared_expense) if additional else False,
|
|
"covered_people": additional.covered_people if additional else "",
|
|
"attendees": additional.attendees if additional else "",
|
|
"reimbursement_expected_from": additional.reimbursement_expected_from if additional else "",
|
|
"reimbursement_paid_by": additional.reimbursement_paid_by if additional else "",
|
|
"reimbursement_paid_to": additional.reimbursement_paid_to if additional else "",
|
|
"reimbursement_paid_amount": str(additional.reimbursement_paid_amount) if additional and additional.reimbursement_paid_amount is not None else "",
|
|
"reimbursement_paid_date": additional.reimbursement_paid_date.isoformat() if additional and additional.reimbursement_paid_date else "",
|
|
"reimbursement_note": additional.reimbursement_note if additional else "",
|
|
},
|
|
"line_items": line_items,
|
|
}
|
|
|
|
f.write(json.dumps(payload, ensure_ascii=False) + "\n")
|
|
|
|
return FileResponse(
|
|
path=str(out_path),
|
|
media_type="application/json",
|
|
filename=out_path.name,
|
|
)
|
|
|
|
|
|
@router.get("/export/reviewed.jsonl")
def export_reviewed_jsonl(db: Session = Depends(get_db)):
    """Export every reviewed document as JSON Lines and serve the file.

    Queries all documents with ``review_status == "reviewed"`` (oldest update
    first), eagerly loading the relationships `_document_export_payload`
    touches, writes one JSON object per line to a fixed export path, and
    returns that file as the response. The export file is rewritten on each
    call.
    """
    eager_options = (
        selectinload(Document.text_versions),
        selectinload(Document.naming_fields),
        selectinload(Document.extracted_fields),
        selectinload(Document.additional_fields),
        selectinload(Document.versions),
    )
    reviewed_docs = (
        db.query(Document)
        .options(*eager_options)
        .filter(Document.review_status == "reviewed")
        .order_by(Document.updated_at.asc())
        .all()
    )

    # Fixed export location; created on demand so the first export works.
    export_dir = Path("/mnt/storage/document-processor/exports")
    export_dir.mkdir(parents=True, exist_ok=True)
    out_path = export_dir / "reviewed_documents.jsonl"

    with out_path.open("w", encoding="utf-8") as handle:
        handle.writelines(
            json.dumps(_document_export_payload(doc), ensure_ascii=False) + "\n"
            for doc in reviewed_docs
        )

    return FileResponse(
        path=str(out_path),
        media_type="application/json",
        filename=out_path.name,
    )
|
|
|
|
|
|
|
|
def _restore_ocr_to_original(db: Session, document: Document) -> bool:
    """Make the document's original OCR text (version 1) the current one.

    The original body duplicated `_restore_ocr_from_version_number` verbatim
    with the version number hard-coded to 1; delegate instead so the two
    restore paths cannot drift apart.

    Returns True when version 1 exists and was made current (review_status is
    synced to the target's version_type); False when the document has no
    version 1. Does not commit.
    """
    return _restore_ocr_from_version_number(db, document, 1)
|
|
|
|
|
|
def _restore_ocr_from_version_number(db: Session, document: Document, target_version_number: int) -> bool:
    """Mark the text version numbered ``target_version_number`` as current.

    Looks up the requested version for this document; if it exists, flips
    ``is_current`` on every text version so only the target is current, and
    syncs ``review_status`` ("reviewed" when the target is a reviewed
    version, otherwise "pending"). Returns True on success, False when the
    requested version does not exist. Does not commit.
    """
    chosen = (
        db.query(TextVersion)
        .filter(
            TextVersion.document_id == document.id,
            TextVersion.version_number == target_version_number,
        )
        .first()
    )
    if chosen is None:
        return False

    siblings = (
        db.query(TextVersion)
        .filter(TextVersion.document_id == document.id)
        .all()
    )
    for sibling in siblings:
        sibling.is_current = sibling.id == chosen.id

    document.review_status = "reviewed" if chosen.version_type == "reviewed" else "pending"
    db.add(document)
    return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_current_extracted_version_number(document: Document) -> int | None:
    """Return the version number whose snapshot matches the current extracted fields.

    Compares every extracted field of the current row against each stored
    `ExtractedFieldVersion` (newest first) and returns the first matching
    version number, or None when there is no current row or no version
    matches.
    """
    current = get_current_extracted_fields(document)
    history = getattr(document, "extracted_field_versions", None) or []
    if current is None:
        return None

    compare_fields = (
        "merchant_raw",
        "merchant_normalized",
        "transaction_date",
        "transaction_time",
        "subtotal",
        "tax",
        "total",
        "currency",
        "payment_method",
        "receipt_number",
        "location",
        "counterparty",
        "extra_json",
    )

    def snapshot(row):
        # One tuple per row lets a single == replace a 13-term `and` chain.
        return tuple(getattr(row, name) for name in compare_fields)

    wanted = snapshot(current)
    for version in sorted(history, key=lambda v: v.version_number, reverse=True):
        if snapshot(version) == wanted:
            return version.version_number
    return None
|
|
|
|
|
|
def _get_current_additional_version_number(document: Document) -> int | None:
    """Return the version number whose snapshot matches the current additional fields.

    Compares every additional field of the current row against each stored
    `DocumentAdditionalFieldVersion` (newest first) and returns the first
    matching version number, or None when there is no current row or no
    version matches.
    """
    current = _get_current_additional_fields(document)
    history = getattr(document, "additional_field_versions", None) or []
    if current is None:
        return None

    compare_fields = (
        "owner_primary",
        "owner_secondary",
        "paid_by_person",
        "occasion_note",
        "is_shared_expense",
        "covered_people",
        "attendees",
        "reimbursement_expected_from",
        "reimbursement_paid_by",
        "reimbursement_paid_to",
        "reimbursement_paid_amount",
        "reimbursement_paid_date",
        "reimbursement_note",
    )

    def snapshot(row):
        # One tuple per row lets a single == replace a 13-term `and` chain.
        return tuple(getattr(row, name) for name in compare_fields)

    wanted = snapshot(current)
    for version in sorted(history, key=lambda v: v.version_number, reverse=True):
        if snapshot(version) == wanted:
            return version.version_number
    return None
|
|
|
|
|
|
def _clear_line_items(db: Session, document: Document) -> bool:
    """Delete all line items from the document's line-item set.

    Returns True when at least one item was removed, False when the document
    has no line-item set or the set was already empty. Flushes the session
    but does not commit.
    """
    item_set = document.line_item_set
    if not item_set:
        return False
    removed_any = bool(item_set.items)
    item_set.items.clear()
    db.flush()
    return removed_any
|
|
|
|
|
|
def _restore_line_items_from_version_number(db: Session, document: Document, target_version_number: int) -> bool:
    """Replace the document's current line items with a versioned snapshot.

    Loads the `DocumentLineItemSetVersion` with the given version number
    (including its items), creates the document's line-item set if it does
    not exist yet, clears any current items, and re-adds one
    `DocumentLineItem` per snapshot item in line-number order.

    Returns False when the requested version does not exist, True after the
    items have been staged. Flushes but does not commit.
    """
    version = (
        db.query(DocumentLineItemSetVersion)
        .options(selectinload(DocumentLineItemSetVersion.items))
        .filter(
            DocumentLineItemSetVersion.document_id == document.id,
            DocumentLineItemSetVersion.version_number == target_version_number,
        )
        .first()
    )
    if version is None:
        return False

    if document.line_item_set is None:
        # First restore for this document: create the container set. The
        # flush assigns it a primary key so the items below can reference it.
        document.line_item_set = DocumentLineItemSet(
            document_id=document.id,
            schema_type=version.schema_type or document.document_type or "generic",
        )
        db.add(document.line_item_set)
        db.flush()

    # Sync the schema type even for a pre-existing set, then drop the current
    # items; the flush makes the deletes visible before the re-inserts below.
    document.line_item_set.schema_type = version.schema_type or document.document_type or "generic"
    document.line_item_set.items.clear()
    db.flush()

    # Copy each snapshot item back into a live line item, preserving order.
    for vi in sorted(version.items, key=lambda x: x.line_number):
        db.add(DocumentLineItem(
            line_item_set_id=document.line_item_set.id,
            line_number=vi.line_number,
            entry_date=vi.entry_date,
            description=vi.description,
            quantity=vi.quantity,
            unit_price=vi.unit_price,
            line_total=vi.line_total,
            tax_amount=vi.tax_amount,
            category=vi.category,
            notes=vi.notes,
            raw_json=vi.raw_json,
        ))

    return True
|
|
|
|
|
|
def _parse_restore_choice(value: str) -> tuple[str, int | None]:
|
|
if not value or value == "none":
|
|
return ("none", None)
|
|
if value == "original":
|
|
return ("original", None)
|
|
if value.startswith("version:"):
|
|
try:
|
|
return ("version", int(value.split(":", 1)[1]))
|
|
except ValueError:
|
|
return ("none", None)
|
|
return ("none", None)
|
|
|
|
@router.post("/{document_id}/source-options", response_class=RedirectResponse)
def apply_source_options(
    document_id: str,
    file_action: str = Form("none"),
    ocr_restore_choice: str = Form("none"),
    extracted_restore_choice: str = Form("none"),
    additional_restore_choice: str = Form("none"),
    line_item_restore_choice: str = Form("none"),
    db: Session = Depends(get_db),
):
    """Apply the source-options form to a document.

    Supports, independently per form field:
      - ``file_action``: point the document back at its original file
        (``revert_original``) or at its latest saved version
        (``revert_current_version``), recomputing the current SHA-256.
      - ``ocr_restore_choice`` / ``extracted_restore_choice`` /
        ``additional_restore_choice``: ``original`` or ``version:<n>``
        restores of the respective data (see `_parse_restore_choice`).
      - ``line_item_restore_choice``: ``clear`` empties the line-item set,
        ``version:<n>`` restores a snapshot.

    Commits once if anything changed, otherwise rolls back; always redirects
    (303) back to the document's source-options tab, with an error flag when
    an exception occurred. Unknown documents redirect to the document list.

    Fixes vs. previous revision: removed leftover ``print("PARSED_…")``
    debug statements that ran on every request, and reused
    `_parse_restore_choice` for the line-item ``version:<n>`` parsing instead
    of duplicating it inline.
    """
    document = (
        db.query(Document)
        .options(
            selectinload(Document.text_versions),
            selectinload(Document.naming_fields),
            selectinload(Document.extracted_fields),
            selectinload(Document.additional_fields),
            selectinload(Document.versions),
            selectinload(Document.extracted_field_versions),
            selectinload(Document.additional_field_versions),
        )
        .filter(Document.document_id == document_id)
        .first()
    )
    if document is None:
        return RedirectResponse(url="/documents/", status_code=303)

    try:
        changed = False

        # --- current file pointer ------------------------------------------
        if file_action == "revert_original":
            original_path = document.original_path or document.source_path
            if original_path:
                original_file = Path(original_path)
                if original_file.exists():
                    document.current_path = str(original_file)
                    document.canonical_filename = original_file.name
                    document.sha256_current = _sha256_for_file(original_file)
                    db.add(document)
                    changed = True

        elif file_action == "revert_current_version":
            latest_version = (
                db.query(DocumentVersion)
                .filter(DocumentVersion.document_id == document.id)
                .order_by(DocumentVersion.version_number.desc())
                .first()
            )
            if latest_version and latest_version.file_path:
                version_file = Path(latest_version.file_path)
                if version_file.exists():
                    document.current_path = str(version_file)
                    document.canonical_filename = version_file.name
                    document.sha256_current = _sha256_for_file(version_file)
                    db.add(document)
                    changed = True

        # --- OCR text -------------------------------------------------------
        ocr_mode, ocr_version = _parse_restore_choice(ocr_restore_choice)
        if ocr_mode == "original":
            if _restore_ocr_to_original(db, document):
                changed = True
        elif ocr_mode == "version" and ocr_version is not None:
            if _restore_ocr_from_version_number(db, document, ocr_version):
                changed = True

        # --- extracted fields -----------------------------------------------
        extracted_mode, extracted_version = _parse_restore_choice(extracted_restore_choice)
        if extracted_mode == "original":
            if _restore_extracted_to_original(db, document):
                changed = True
        elif extracted_mode == "version" and extracted_version is not None:
            if _restore_extracted_from_version_number(db, document, extracted_version):
                changed = True

        # --- additional fields ----------------------------------------------
        additional_mode, additional_version = _parse_restore_choice(additional_restore_choice)
        if additional_mode == "original":
            if _restore_additional_to_original(db, document):
                changed = True
        elif additional_mode == "version" and additional_version is not None:
            if _restore_additional_from_version_number(db, document, additional_version):
                changed = True

        # --- line items -----------------------------------------------------
        if line_item_restore_choice == "clear":
            if _clear_line_items(db, document):
                changed = True
        else:
            # "version:<n>" parses to ("version", n); every other value
            # (including "none") parses to ("none", None) and is a no-op.
            line_item_mode, line_item_version = _parse_restore_choice(line_item_restore_choice)
            if line_item_mode == "version" and line_item_version is not None:
                if _restore_line_items_from_version_number(db, document, line_item_version):
                    changed = True

        # Single commit for all staged changes; a clean rollback otherwise
        # keeps the session free of read-only leftovers.
        if changed:
            db.commit()
        else:
            db.rollback()

    except Exception as e:
        print("source-options failed:", repr(e), flush=True)
        traceback.print_exc()
        db.rollback()
        return RedirectResponse(
            url=f"/documents/{document.document_id}?error=source_options_failed&tab=source-options",
            status_code=303,
        )

    return RedirectResponse(
        url=f"/documents/{document.document_id}?tab=source-options",
        status_code=303,
    )
|