From c7dab22f169691a16024149731ea8050b13b5189 Mon Sep 17 00:00:00 2001 From: McElwain Date: Mon, 6 Apr 2026 14:51:05 -0500 Subject: [PATCH] feat: improve receipt extraction with reference number and line item pairing --- app/logic/extraction.py | 634 ++++++++++++++++++++++++++-- app/templates/documents/detail.html | 39 +- 2 files changed, 622 insertions(+), 51 deletions(-) diff --git a/app/logic/extraction.py b/app/logic/extraction.py index 7d0b939..615fabc 100644 --- a/app/logic/extraction.py +++ b/app/logic/extraction.py @@ -2,35 +2,65 @@ from __future__ import annotations import json import re +from dataclasses import dataclass from datetime import datetime from decimal import Decimal, InvalidOperation -from sqlalchemy.orm import Session, selectinload +from sqlalchemy.orm import Session from app.models.document import Document from app.models.extracted_field import ExtractedField +from app.models.receipt_line_item import ReceiptLineItem from app.models.text_version import TextVersion -MONEY_RE = re.compile(r"\$?\s*([0-9]+(?:\.[0-9]{2}))") +MONEY_RE = re.compile(r"(? TextVersion | None: @@ -45,10 +75,64 @@ def _get_current_reviewed_text(document: Document) -> TextVersion | None: return None +def _normalize_line(text: str) -> str: + return re.sub(r"\s+", " ", text.strip()).lower() + + def _clean_lines(text: str) -> list[str]: return [line.strip() for line in text.splitlines() if line.strip()] +def _build_lines_from_layout(layout_json: dict | None) -> list[DocumentLine]: + if not layout_json: + return [] + + lines: list[DocumentLine] = [] + idx = 0 + + for page in layout_json.get("pages", []): + page_num = page.get("page") + for line in page.get("lines", []): + text = (line.get("text") or "").strip() + if not text: + continue + + lines.append( + DocumentLine( + page=page_num, + line_index=idx, + text=text, + normalized=_normalize_line(text), + bbox=line.get("bbox"), + confidence=line.get("confidence"), + ) + ) + idx += 1 + + return lines + + +def _build_lines_from_text(text: str) -> list[DocumentLine]: + return [ + DocumentLine( + page=None, + line_index=idx, + text=line, + normalized=_normalize_line(line), + bbox=None, + confidence=None, + ) + for idx, line in enumerate(_clean_lines(text)) + ] + + +def _get_document_lines(text_version: TextVersion) -> list[DocumentLine]: + lines = _build_lines_from_layout(text_version.layout_json) + if lines: + return lines + return _build_lines_from_text(text_version.text_content or "") + + def _parse_date(text: str): for pat in DATE_PATTERNS: m = pat.search(text) @@ -59,6 +143,8 @@ def _parse_date(text: str): try: if pat.pattern.startswith(r"\b(\d{4})"): return datetime.strptime("-".join(groups), "%Y-%m-%d").date() + if len(groups[2]) == 2: + return datetime.strptime("/".join(groups), "%m/%d/%y").date() return datetime.strptime("/".join(groups), "%m/%d/%Y").date() except ValueError: continue @@ -74,19 +160,35 @@ def _parse_time(text: str) -> str | None: def _to_decimal(value: str | None) -> Decimal | None: - if not value: + if value is None: return None try: - return Decimal(value) + return Decimal(str(value).strip()) except (InvalidOperation, TypeError): return None -def _find_amount(pattern: re.Pattern[str], text: str) -> Decimal | None: - m = pattern.search(text) - if not m: +def _extract_line_amount(line: DocumentLine) -> Decimal | None: + matches = MONEY_RE.findall(line.text.replace(",", "")) + if not matches: return None - return _to_decimal(m.group(1)) + return _to_decimal(matches[-1]) + + +def _money_match_count(text: str) -> int: + return len(MONEY_RE.findall(text.replace(",", ""))) + + +def _source_span(line: DocumentLine | None) -> dict | None: + if line is None: + return None + return { + "page": line.page, + "line_index": line.line_index, + "text": line.text, + "bbox": line.bbox, + "confidence": line.confidence, + } def _clean_merchant_name(line: str) -> str: @@ -104,21 +206,55 @@ def _clean_merchant_name(line: str) -> str: return cleaned -def _guess_merchant(lines: list[str]) -> str | None: +def _looks_like_address(line: str) -> bool: + return bool(ADDRESS_HINT_RE.search(line) or (any(ch.isdigit() for ch in line) and "," in line)) + + +def _looks_like_phone(line: str) -> bool: + return bool(PHONE_RE.search(line)) + + +def _looks_like_date_line(line: str) -> bool: + return any(p.search(line) for p in DATE_PATTERNS) + + +def _is_price_only_line(line: DocumentLine) -> bool: + text = line.text.strip().replace(",", "") + if not text: + return False + if _money_match_count(text) != 1: + return False + stripped = text.replace("$", "").strip() + return bool(re.fullmatch(r"[0-9]+\.[0-9]{2}", stripped)) + + +def _guess_merchant(lines: list[DocumentLine]) -> tuple[str | None, DocumentLine | None]: for line in lines[:5]: - if len(line) >= 3 and not any(ch.isdigit() for ch in line[:8]): - return _clean_merchant_name(line) - return _clean_merchant_name(lines[0]) if lines else None + text = line.text.strip() + if len(text) < 3: + continue + if _looks_like_phone(text): + continue + if _looks_like_address(text): + continue + if _looks_like_date_line(text): + continue + return _clean_merchant_name(text), line + + if lines: + return _clean_merchant_name(lines[0].text), lines[0] + return None, None -def _guess_location(lines: list[str]) -> str | None: +def _guess_location(lines: list[DocumentLine]) -> tuple[str | None, DocumentLine | None]: for line in lines[1:6]: - if any(ch.isdigit() for ch in line) or "," in line or "(" in line: - return line - return None + text = line.text + if _looks_like_address(text) or "," in text or "(" in text: + return text, line + return None, None -def _extract_extra(lines: list[str], text: str) -> dict: +def _extract_extra(lines: list[DocumentLine], text: str) -> dict: extra: dict = {} m = CARD_LAST4_RE.search(text) @@ -130,48 +266,463 @@ def _extract_extra(lines: list[str], text: str) -> dict: extra["store_number"] = m.group(1) cashier = None + cashier_span = None for line in lines: - if re.search(r"\bcashier\b", line, re.IGNORECASE): - cashier = line + if re.search(r"\bcashier\b", line.text, re.IGNORECASE): + cashier = line.text + cashier_span = _source_span(line) break + if cashier: extra["cashier"] = cashier + extra["cashier_source"] = cashier_span return extra +def _score_total_line(line: DocumentLine, total_lines: int) -> float: + score = 0.0 + text = line.normalized + amount = _extract_line_amount(line) + + if "subtotal" in text or "sub total" in text or "sub-total" in text: + score -= 8.0 + if "tax" in text: + score -= 5.0 + if "tip" in text: + score -= 2.0 + + if "grand total" in text: + score += 8.0 + elif re.search(r"\btotal\b", text): + score += 6.0 + + if amount is not None: + score += 2.0 + + if total_lines > 0: + score += (line.line_index / max(total_lines, 1)) * 2.0 + + return score + + +def _score_subtotal_line(line: DocumentLine) -> float: + score = 0.0 + text = line.normalized + amount = _extract_line_amount(line) + + if "subtotal" in text or "sub total" in text or "sub-total" in text: + score += 8.0 + elif re.search(r"\btotal\b", text): + score -= 3.0 + + if "tax" in text: + score -= 3.0 + + if amount is not None: + score += 2.0 + + return score + + +def _score_tax_line(line: DocumentLine) -> float: + score = 0.0 + text = line.normalized + amount = _extract_line_amount(line) + + if "sales tax" in text: + score += 8.0 + elif re.search(r"\btax\b", text): + score += 7.0 + elif "vat" in text or "gst" in text: + score += 6.0 + + if "total" in text and "subtotal" not in text and "sub total" not in text and "sub-total" not in text: + score -= 2.0 + + if amount is not None: + score += 2.0 + + return score + + +def _pick_best_line(lines: list[DocumentLine], scorer) -> DocumentLine | None: + if not lines: + return None + + scored = [(scorer(line), line) for line in lines] + scored.sort(key=lambda item: item[0], reverse=True) + best_score, best_line = scored[0] + + if best_score <= 0: + return None + return best_line + + +def _extract_total(lines: list[DocumentLine]) -> tuple[Decimal | None, DocumentLine | None]: + best = _pick_best_line(lines, lambda line: _score_total_line(line, len(lines))) + if not best: + return None, None + amount = _extract_line_amount(best) + if amount is not None: + return amount, best + + next_idx = best.line_index + 1 + next_line = next((line for line in lines if line.line_index == next_idx), None) + if next_line: + return _extract_line_amount(next_line), best + return None, best + + +def _extract_subtotal(lines: list[DocumentLine]) -> tuple[Decimal | None, DocumentLine | None]: + best = _pick_best_line(lines, _score_subtotal_line) + if not best: + return None, None + amount = _extract_line_amount(best) + if amount is not None: + return amount, best + + next_idx = best.line_index + 1 + next_line = next((line for line in lines if line.line_index == next_idx), None) + if next_line: + return _extract_line_amount(next_line), best + return None, best + + +def _extract_tax(lines: list[DocumentLine]) -> tuple[Decimal | None, DocumentLine | None]: + best = _pick_best_line(lines, _score_tax_line) + if not best: + return None, None + amount = _extract_line_amount(best) + if amount is not None: + return amount, best + + next_idx = best.line_index + 1 + next_line = next((line for line in lines if line.line_index == next_idx), None) + if next_line: + return _extract_line_amount(next_line), best + return None, best + + +def _is_non_item_line(normalized: str) -> bool: + blocked_terms = [ + "subtotal", + "sub total", + "total", + "tax", + "service fee", + "tip", + "pay this amount", + "recommended gratuity", + "gratuity", + "cashier", + "server", + "guest", + "table #", + "table:", + "date:", + "time:", + "order #", + "order:", + "invoice #", + "invoice:", + "reference #", + "confirmation #", + "receipt", + "visa", + "mastercard", + "discover", + "amex", + "cash", + "debit", + "thank you", + "regresen pronto", + "gracias", + ] + if any(term in normalized for term in blocked_terms): + return True + if "% =" in normalized: + return True + return False + + +def _normalize_item_description(text: str) -> str: + cleaned = re.sub(r"\s+", " ", text.strip()) + cleaned = cleaned.strip("-: ") + return cleaned.title() + + +def _infer_item_category(text: str) -> str | None: + normalized = text.lower() + if "margarita" in normalized: + return "cocktail" + if "beer" in normalized: + return "beer" + if "wine" in normalized: + return "wine" + if any(word in normalized for word in ["enchilada", "steak", "taco", "burrito", "quesadilla"]): + return "food" + if any(word in normalized for word in ["add ", "extra ", "side ", "sauce", "cheese", "espinaca"]): + return "modifier" + return None + + +def _candidate_item_description_line(line: DocumentLine) -> bool: + text = line.text.strip() + normalized = line.normalized + + if len(text) < 3: + return False + if _is_non_item_line(normalized): + return False + if _looks_like_address(text) or _looks_like_phone(text) or _looks_like_date_line(text): + return False + if _money_match_count(text) > 1: + return False + if _is_price_only_line(line): + return False + return True + + +def _extract_receipt_line_items(lines: list[DocumentLine]) -> list[dict]: + items: list[dict] = [] + used_line_indexes: set[int] = set() + + protected_amount_indexes: set[int] = set() + for label in ["subtotal", "tax", "service fee", "total", "pay this amount"]: + for idx, line in enumerate(lines): + if label in line.normalized: + protected_amount_indexes.add(line.line_index) + if idx + 1 < len(lines): + protected_amount_indexes.add(lines[idx + 1].line_index) + + for idx, line in enumerate(lines): + if line.line_index in used_line_indexes: + continue + if line.line_index in protected_amount_indexes: + continue + + normalized = line.normalized + text = line.text.strip() + + if len(text) < 3: + continue + if _is_non_item_line(normalized): + continue + if _looks_like_address(text) or _looks_like_phone(text) or _looks_like_date_line(text): + continue + if _money_match_count(text) > 1: + continue + + same_line_match = ITEM_LINE_RE.match(text.replace(",", "")) + if same_line_match: + description_part = same_line_match.group(1).strip() + price_part = same_line_match.group(2).strip() + + if description_part and description_part not in {"$"}: + quantity = None + description = description_part + + qty_match = QTY_PREFIX_RE.match(description_part) + if qty_match: + quantity = _to_decimal(qty_match.group(1)) + description = qty_match.group(2).strip() + + line_total = _to_decimal(price_part) + if description and line_total is not None and description.lower() not in {"total", "subtotal", "tax"}: + confidence = Decimal("85.00") + if quantity is not None: + confidence = Decimal("90.00") + + items.append( + { + "line_index": line.line_index, + "raw_description": description, + "normalized_description": _normalize_item_description(description), + "quantity": str(quantity) if quantity is not None else "", + "unit_price": "", + "line_total": str(line_total), + "item_category": _infer_item_category(description) or "", + "confidence": str(confidence), + "extra_json": { + "page": line.page, + "bbox": line.bbox, + "source_text": line.text, + "source_confidence": line.confidence, + "match_type": "same_line", + }, + } + ) + used_line_indexes.add(line.line_index) + continue + + if not _candidate_item_description_line(line): + continue + + next_line = lines[idx + 1] if idx + 1 < len(lines) else None + if next_line and next_line.line_index not in used_line_indexes and next_line.line_index not in protected_amount_indexes: + if _is_price_only_line(next_line) and not _is_non_item_line(next_line.normalized): + description = text + quantity = None + + qty_match = QTY_PREFIX_RE.match(description) + if qty_match: + quantity = _to_decimal(qty_match.group(1)) + description = qty_match.group(2).strip() + + line_total = _extract_line_amount(next_line) + if description and line_total is not None: + confidence = Decimal("88.00") + if quantity is not None: + confidence = Decimal("92.00") + + items.append( + { + "line_index": line.line_index, + "raw_description": description, + "normalized_description": _normalize_item_description(description), + "quantity": str(quantity) if quantity is not None else "", + "unit_price": "", + "line_total": str(line_total), + "item_category": _infer_item_category(description) or "", + "confidence": str(confidence), + "extra_json": { + "page": line.page, + "bbox": line.bbox, + "price_line_index": next_line.line_index, + "price_bbox": next_line.bbox, + "price_text": next_line.text, + "source_text": line.text, + "source_confidence": line.confidence, + "match_type": "paired_next_line", + }, + } + ) + used_line_indexes.add(line.line_index) + used_line_indexes.add(next_line.line_index) + continue + + prev_line = lines[idx - 1] if idx - 1 >= 0 else None + if ( + prev_line + and prev_line.line_index not in used_line_indexes + and prev_line.line_index not in protected_amount_indexes + and _is_price_only_line(prev_line) + and not _is_non_item_line(prev_line.normalized) + ): + description = text + quantity = None + + qty_match = QTY_PREFIX_RE.match(description) + if qty_match: + quantity = _to_decimal(qty_match.group(1)) + description = qty_match.group(2).strip() + + line_total = _extract_line_amount(prev_line) + if description and line_total is not None: + confidence = Decimal("89.00") + if quantity is not None: + confidence = Decimal("93.00") + + items.append( + { + "line_index": line.line_index, + "raw_description": description, + "normalized_description": _normalize_item_description(description), + "quantity": str(quantity) if quantity is not None else "", + "unit_price": "", + "line_total": str(line_total), + "item_category": _infer_item_category(description) or "", + "confidence": str(confidence), + "extra_json": { + "page": line.page, + "bbox": line.bbox, + "price_line_index": prev_line.line_index, + "price_bbox": prev_line.bbox, + "price_text": prev_line.text, + "source_text": line.text, + "source_confidence": line.confidence, + "match_type": "paired_prev_line", + }, + } + ) + used_line_indexes.add(line.line_index) + used_line_indexes.add(prev_line.line_index) + continue + + return items + + +def _replace_receipt_line_items(db: Session, document: Document, items: list[dict]) -> None: + existing_items = list(getattr(document, "receipt_line_items", []) or []) + for item in existing_items: + db.delete(item) + + for item in items: + db.add( + ReceiptLineItem( + document_id=document.id, + line_index=item.get("line_index"), + raw_description=item.get("raw_description") or "", + normalized_description=item.get("normalized_description") or None, + quantity=_to_decimal(item.get("quantity")), + unit_price=_to_decimal(item.get("unit_price")), + line_total=_to_decimal(item.get("line_total")), + item_category=item.get("item_category") or None, + confidence=_to_decimal(item.get("confidence")), + extra_json=item.get("extra_json") or {}, + ) + ) + + def auto_extract_from_document(db: Session, document: Document) -> dict: text_version = _get_current_reviewed_text(document) if text_version is None: return {} text = text_version.text_content or "" - lines = _clean_lines(text) + lines = _get_document_lines(text_version) - merchant_raw = _guess_merchant(lines) + merchant_raw, merchant_line = _guess_merchant(lines) merchant_normalized = merchant_raw transaction_date = _parse_date(text) transaction_time = _parse_time(text) - subtotal = _find_amount(SUBTOTAL_RE, text) - tax = _find_amount(TAX_RE, text) - total = _find_amount(TOTAL_RE, text) + subtotal, subtotal_line = _extract_subtotal(lines) + tax, tax_line = _extract_tax(lines) + total, total_line = _extract_total(lines) payment_method = None m = PAYMENT_METHOD_RE.search(text) if m: payment_method = m.group(1).upper() - receipt_number = None - m = RECEIPT_NUM_RE.search(text) + reference_number = None + m = REFERENCE_NUM_RE.search(text) if m: - receipt_number = m.group(1) + reference_number = m.group(1) - location = _guess_location(lines) + location, location_line = _guess_location(lines) counterparty = merchant_raw currency = "USD" + line_items = _extract_receipt_line_items(lines) + extra = _extract_extra(lines, text) + extra["source_spans"] = { + "merchant_raw": _source_span(merchant_line), + "location": _source_span(location_line), + "subtotal": _source_span(subtotal_line), + "tax": _source_span(tax_line), + "total": _source_span(total_line), + "reference_number": {"value": reference_number} if reference_number else None, + } + extra["analysis"] = { + "line_count": len(lines), + "has_layout": bool(text_version.layout_json), + "source_version_type": text_version.version_type, + } + extra["line_items"] = line_items return { "merchant_raw": merchant_raw or "", @@ -183,7 +734,7 @@ def auto_extract_from_document(db: Session, document: Document) -> dict: "total": str(total) if total is not None else "", "currency": currency or "", "payment_method": payment_method or "", - "receipt_number": receipt_number or "", + "receipt_number": reference_number or "", "location": location or "", "counterparty": counterparty or "", "extra_json": json.dumps(extra, indent=2, sort_keys=True) if extra else "{}", @@ -234,10 +785,19 @@ def save_extracted_fields( current.location = location or None current.counterparty = counterparty or None + parsed_extra: dict try: - current.extra_json = json.loads(extra_json) if extra_json.strip() else {} + parsed_extra = json.loads(extra_json) if extra_json.strip() else {} except json.JSONDecodeError: - current.extra_json = {"raw_text": extra_json} + parsed_extra = {"raw_text": extra_json} + + current.extra_json = parsed_extra + + line_items = parsed_extra.get("line_items", []) + if isinstance(line_items, list): + _replace_receipt_line_items(db, document, line_items) + else: + _replace_receipt_line_items(db, document, []) db.commit() db.refresh(current) diff --git a/app/templates/documents/detail.html b/app/templates/documents/detail.html index b0c5e2a..7daea7b 100644 --- a/app/templates/documents/detail.html +++ b/app/templates/documents/detail.html @@ -107,13 +107,13 @@
- - - - + + + +
-
+

Reviewed OCR

{% if reviewed_ocr %}

Current reviewed version saved at {{ reviewed_ocr.created_at }} — v{{ reviewed_ocr.version_number }}

@@ -162,7 +162,7 @@
-
+

Extracted fields

{% if current_extracted %} @@ -173,6 +173,7 @@
+
@@ -189,7 +190,7 @@
-
+
@@ -201,7 +202,7 @@
-
+

Document versions

{% if document.versions %}
@@ -233,7 +234,7 @@ {% endif %}
-
+

Raw OCR

{% if raw_ocr %}
@@ -292,14 +293,24 @@ const tabButtons = document.querySelectorAll("[data-tab]"); const tabPanels = document.querySelectorAll("[data-panel]"); + + function activateTab(target) { + tabButtons.forEach(function (b) { + b.classList.toggle("active", b.getAttribute("data-tab") === target); + }); + tabPanels.forEach(function (p) { + p.classList.toggle("active", p.getAttribute("data-panel") === target); + }); + } + tabButtons.forEach(function (btn) { btn.addEventListener("click", function () { const target = btn.getAttribute("data-tab"); - tabButtons.forEach(function (b) { b.classList.remove("active"); }); - tabPanels.forEach(function (p) { p.classList.remove("active"); }); - btn.classList.add("active"); - const panel = document.querySelector('[data-panel="' + target + '"]'); - if (panel) panel.classList.add("active"); + activateTab(target); + + const url = new URL(window.location.href); + url.searchParams.set("tab", target); + window.history.replaceState({}, "", url.toString()); }); });