from __future__ import annotations

import csv
import hashlib
import io
import logging
import mimetypes
import re
import shutil
import subprocess
import tempfile
from difflib import SequenceMatcher
from pathlib import Path
from uuid import uuid4

from PIL import Image
from sqlalchemy import func
from sqlalchemy.orm import Session

from app.core.config import DOCUMENT_ARCHIVE_ROOT, INBOX_ROOT, UPLOAD_ROOT
from app.models.document import Document
from app.models.document_version import DocumentVersion
from app.models.text_version import TextVersion

logger = logging.getLogger(__name__)

ALLOWED_EXTENSIONS = {".pdf", ".jpg", ".jpeg", ".png"}

# Raster formats that are OCR'd directly with tesseract (PDFs go through pdftoppm first).
IMAGE_EXTENSIONS = frozenset({".jpg", ".jpeg", ".png"})

# A pdftotext result shorter than this (after stripping) is treated as "no usable
# text layer", and the PDF is OCR'd instead.
MIN_PDF_TEXT_CHARS = 40

# pdftoppm writes "<prefix>-<page>.png"; pages are ordered by the parsed page number
# rather than by relying on the file names sorting lexicographically.
_PDFTOPPM_PAGE_RE = re.compile(r"-(\d+)\.png$")


def is_supported_file(path: Path) -> bool:
    """Return True if ``path`` is an existing regular file with an allowed extension."""
    return path.is_file() and path.suffix.lower() in ALLOWED_EXTENSIONS


def sha256_for_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at ``path``, read in 1 MiB chunks."""
    hasher = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def guess_mime_type(path: Path) -> str:
    """Guess a MIME type from the file name; fall back to ``application/octet-stream``."""
    mime_type, _ = mimetypes.guess_type(str(path))
    return mime_type or "application/octet-stream"


def build_storage_path(document_id: str, source_path: Path) -> Path:
    """Return ``<DOCUMENT_ARCHIVE_ROOT>/<document_id><suffix>``.

    The suffix comes from ``source_path`` and is lower-cased.
    """
    archive_root = Path(DOCUMENT_ARCHIVE_ROOT)
    return archive_root / f"{document_id}{source_path.suffix.lower()}"


def get_next_text_version_number(db: Session, document_id: int) -> int:
    """Return one more than the document's highest TextVersion number (1 if it has none)."""
    max_version = (
        db.query(func.max(TextVersion.version_number))
        .filter(TextVersion.document_id == document_id)
        .scalar()
    )
    return (max_version or 0) + 1


def _first_line(*streams: str | None) -> str | None:
    """Return the first non-blank line (stripped) of the first stream that has one."""
    for stream in streams:
        lines = (stream or "").strip().splitlines()
        if lines:
            return lines[0].strip()
    return None


def get_tesseract_version() -> str | None:
    """Return the first line of ``tesseract --version``, or None if it cannot be run.

    stdout is preferred. stderr is consulted when stdout is empty, because some
    tesseract builds print the version banner there.
    """
    try:
        result = subprocess.run(
            ["tesseract", "--version"],
            capture_output=True,
            text=True,
            errors="replace",
            check=True,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    return _first_line(result.stdout, result.stderr)


def get_pdftotext_version() -> str | None:
    """Return the first line of ``pdftotext -v`` (stderr first, then stdout), or None."""
    try:
        result = subprocess.run(
            ["pdftotext", "-v"],
            capture_output=True,
            text=True,
            errors="replace",
        )
    except (OSError, subprocess.SubprocessError):
        return None
    return _first_line(result.stderr, result.stdout)


def extract_pdf_text(path: Path) -> str:
    """Return the PDF's embedded text layer via ``pdftotext`` (stripped), or "" on failure.

    Failures are logged rather than raised: callers treat "" as "no text layer"
    and fall back to OCR.
    """
    try:
        result = subprocess.run(
            ["pdftotext", "-enc", "UTF-8", str(path), "-"],
            capture_output=True,
            encoding="utf-8",
            errors="replace",
            check=True,
        )
    except (OSError, subprocess.SubprocessError) as exc:
        logger.warning("pdftotext failed for %s: %s", path, exc)
        return ""
    return result.stdout.strip()


def _parse_confidence(raw: str | None) -> float | None:
    """Parse a tesseract ``conf`` cell; empty or negative values mean "no confidence"."""
    if raw is None or raw == "":
        return None
    value = float(raw)
    return value if value >= 0 else None


def _parse_tsv_lines(tsv_text: str, page_number: int, image_width: int, image_height: int) -> dict:
    """Group tesseract TSV word rows into text lines with bounding boxes.

    Only word-level rows (``level == 5``) whose ``page_num`` equals ``page_number``
    are used; rows with empty text or unparsable numbers are skipped. Words that
    share (page, block, paragraph, line) numbers are joined left to right. Each
    line carries ``bbox = [left, top, right, bottom]`` and the mean word
    confidence rounded to 2 places (None if no word had one). Lines are ordered
    top-to-bottom, then left-to-right.

    Returns:
        ``{"page", "image_width", "image_height", "lines"}``.
    """
    # Tesseract TSV is tab-separated and unquoted; QUOTE_NONE keeps recognised
    # text containing '"' from being parsed as a quoted CSV field.
    reader = csv.DictReader(io.StringIO(tsv_text), delimiter="\t", quoting=csv.QUOTE_NONE)
    grouped: dict[tuple[int, int, int, int], list[dict]] = {}

    for row in reader:
        text = (row.get("text") or "").strip()
        if not text:
            continue
        try:
            level = int(row["level"])
            page_num = int(row["page_num"])
            block_num = int(row["block_num"])
            par_num = int(row["par_num"])
            line_num = int(row["line_num"])
            left = int(row["left"])
            top = int(row["top"])
            width = int(row["width"])
            height = int(row["height"])
            conf = _parse_confidence(row.get("conf"))
        except (KeyError, TypeError, ValueError):
            # Short/malformed rows: DictReader fills missing cells with None.
            continue

        if level != 5 or page_num != page_number:
            continue

        key = (page_num, block_num, par_num, line_num)
        grouped.setdefault(key, []).append(
            {
                "text": text,
                "left": left,
                "top": top,
                "width": width,
                "height": height,
                "conf": conf,
            }
        )

    lines = []
    for words in grouped.values():
        words.sort(key=lambda w: w["left"])
        left = min(w["left"] for w in words)
        top = min(w["top"] for w in words)
        right = max(w["left"] + w["width"] for w in words)
        bottom = max(w["top"] + w["height"] for w in words)
        confidences = [w["conf"] for w in words if w["conf"] is not None]
        avg_conf = round(sum(confidences) / len(confidences), 2) if confidences else None
        lines.append(
            {
                "text": " ".join(w["text"] for w in words).strip(),
                "bbox": [left, top, right, bottom],
                "confidence": avg_conf,
            }
        )

    lines.sort(key=lambda line: (line["bbox"][1], line["bbox"][0]))
    return {
        "page": page_number,
        "image_width": image_width,
        "image_height": image_height,
        "lines": lines,
    }


def _run_tesseract(path: Path, *configs: str) -> str:
    """Run ``tesseract <path> stdout [configs...]`` and return stdout.

    Raises ``subprocess.CalledProcessError`` on a non-zero exit, and ``OSError``
    if tesseract cannot be launched. Output is decoded as UTF-8 so that the
    result does not depend on the process locale.
    """
    result = subprocess.run(
        ["tesseract", str(path), "stdout", *configs],
        capture_output=True,
        encoding="utf-8",
        errors="replace",
        check=True,
    )
    return result.stdout


def ocr_image_with_layout(path: Path) -> tuple[str, dict]:
    """OCR one image with tesseract and return ``(plain_text, layout)``.

    ``layout`` is ``{"pages": [page]}``, where ``page`` is built from tesseract's
    TSV output as page 1. Raises if PIL cannot open the image or tesseract fails.
    """
    with Image.open(path) as img:
        image_width, image_height = img.size

    txt = _run_tesseract(path).strip()
    tsv = _run_tesseract(path, "tsv")

    layout = {"pages": [_parse_tsv_lines(tsv, 1, image_width, image_height)]}
    return txt, layout


def _pdftoppm_page_sort_key(image_path: Path) -> tuple[int, str]:
    """Sort key for pdftoppm output: the numeric page suffix, then the name as tiebreak."""
    match = _PDFTOPPM_PAGE_RE.search(image_path.name)
    return (int(match.group(1)) if match else 0, image_path.name)


def ocr_pdf_with_layout(path: Path) -> tuple[str, dict]:
    """Rasterise a PDF with pdftoppm, OCR each page, and combine the results.

    Page texts are joined with blank lines. Each page's layout is renumbered to
    its 1-based position. The rasterised images live in a temporary directory
    that is removed on exit. Raises if pdftoppm or tesseract fails.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        output_prefix = Path(tmpdir) / "page"
        subprocess.run(
            ["pdftoppm", "-png", str(path), str(output_prefix)],
            capture_output=True,
            text=True,
            check=True,
        )

        page_images = sorted(Path(tmpdir).glob("page-*.png"), key=_pdftoppm_page_sort_key)

        all_text = []
        pages = []
        for idx, image_path in enumerate(page_images, start=1):
            txt, layout = ocr_image_with_layout(image_path)
            if txt:
                all_text.append(txt)
            if layout.get("pages"):
                page_layout = layout["pages"][0]
                page_layout["page"] = idx
                pages.append(page_layout)

    return "\n\n".join(all_text).strip(), {"pages": pages}


def run_ocr_only(path: Path) -> tuple[str, dict | None, str | None, str | None]:
    """OCR ``path`` regardless of any embedded text layer.

    Returns:
        ``(text, layout, engine, engine_version)``. Returns
        ``("", None, None, None)`` for unsupported suffixes.
    """
    suffix = path.suffix.lower()
    if suffix == ".pdf":
        txt, layout = ocr_pdf_with_layout(path)
    elif suffix in IMAGE_EXTENSIONS:
        txt, layout = ocr_image_with_layout(path)
    else:
        return "", None, None, None
    return txt.strip(), layout, "tesseract", get_tesseract_version()


def get_raw_text_for_document(path: Path) -> tuple[str, dict | None, str | None, str | None, str | None]:
    """Extract text for a newly ingested file.

    For a PDF, the embedded text layer is used when it has at least
    ``MIN_PDF_TEXT_CHARS`` non-whitespace-trimmed characters. Otherwise the PDF
    is OCR'd (``rerun_source`` "initial_ingest_fallback"). Images are always OCR'd.

    Returns:
        ``(text, layout, engine, engine_version, rerun_source)``. Every field is
        empty/None for unsupported suffixes.
    """
    suffix = path.suffix.lower()

    if suffix == ".pdf":
        extracted = extract_pdf_text(path)
        if len(extracted.strip()) >= MIN_PDF_TEXT_CHARS:
            return extracted, None, "pdftotext", get_pdftotext_version(), "initial_ingest"
        ocr_text, layout, engine, version = run_ocr_only(path)
        return ocr_text, layout, engine, version, "initial_ingest_fallback"

    if suffix in IMAGE_EXTENSIONS:
        ocr_text, layout, engine, version = run_ocr_only(path)
        return ocr_text, layout, engine, version, "initial_ingest"

    return "", None, None, None, None


def compute_quality_score(source_text: str, reviewed_text: str) -> float:
    """Return a 0-100 similarity score (2 decimals) between source and reviewed text.

    Two empty texts score 100.0. An empty source with non-empty reviewed text
    scores 0.0. Otherwise the score is ``difflib.SequenceMatcher.ratio() * 100``.
    """
    if not source_text and not reviewed_text:
        return 100.0
    if not source_text:
        return 0.0
    ratio = SequenceMatcher(None, source_text, reviewed_text).ratio()
    return round(ratio * 100, 2)


def archive_document(
    db: Session,
    source: Path,
    source_system: str,
    document_type: str = "receipt",
) -> Document:
    """Copy ``source`` into the archive, extract its text, and persist the records.

    The function creates the following records:
    - a ``Document``;
    - a version-1 "original" ``DocumentVersion``;
    - when extraction produced text, a version-1 "raw_ocr" ``TextVersion``
      marked current.

    The source file is only read, never modified. If anything fails after the
    copy (text extraction, flush or commit), the session is rolled back and the
    archived copy is deleted before the exception propagates.

    Raises:
        FileNotFoundError: ``source`` does not exist.
        ValueError: ``source`` is not a regular file with an allowed extension.
    """
    if not source.exists():
        raise FileNotFoundError(f"Source file not found: {source}")
    if not is_supported_file(source):
        raise ValueError(f"Unsupported file type: {source.suffix}")

    document_id = f"doc_{uuid4().hex[:12]}"
    current_path = build_storage_path(document_id, source)
    current_path.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(source, current_path)

    try:
        file_size = current_path.stat().st_size
        mime_type = guess_mime_type(current_path)
        sha256_current = sha256_for_file(current_path)

        raw_text, layout_json, ocr_engine, ocr_engine_version, rerun_source = get_raw_text_for_document(current_path)

        document = Document(
            document_id=document_id,
            document_type=document_type,
            source_path=str(source),
            current_path=str(current_path),
            original_filename=source.name,
            canonical_filename=current_path.name,
            mime_type=mime_type,
            file_size=file_size,
            # NOTE(review): every PDF is recorded as 1 page regardless of its real
            # page count — confirm whether this placeholder is intended.
            page_count=1 if source.suffix.lower() == ".pdf" else None,
            sha256_current=sha256_current,
            storage_status="ingested",
            review_status="ocr_complete" if raw_text else "ingested",
        )
        db.add(document)
        db.flush()  # assigns document.id for the child rows below

        db.add(
            DocumentVersion(
                document_id=document.id,
                version_number=1,
                version_type="original",
                file_path=str(current_path),
                sha256=sha256_current,
                created_by=source_system,
                notes=f"Ingested from {source_system}",
            )
        )

        if raw_text:
            db.add(
                TextVersion(
                    document_id=document.id,
                    version_number=1,
                    version_type="raw_ocr",
                    text_content=raw_text,
                    created_by="system",
                    is_current=True,
                    ocr_engine=ocr_engine,
                    ocr_engine_version=ocr_engine_version,
                    rerun_source=rerun_source,
                    quality_flags=[],
                    quality_note=None,
                    layout_json=layout_json,
                )
            )

        db.commit()
    except Exception:
        # Leave neither a half-written transaction nor an orphaned archive file.
        db.rollback()
        current_path.unlink(missing_ok=True)
        raise

    db.refresh(document)
    return document


def rerun_ocr_for_document(db: Session, document: Document) -> TextVersion:
    """OCR the document's current file again and store the result as the current raw text.

    All currently-current "raw_ocr" versions are marked not current. The new
    version's ``derived_from_version_id`` points at the highest-numbered of them
    (or None if there were none), and the document's ``review_status`` is set to
    "ocr_complete".

    Raises:
        ValueError: the document has no ``current_path``, or OCR produced no text.
        FileNotFoundError: the current file is missing on disk.
    """
    if not document.current_path:
        raise ValueError("Document has no current_path")

    current_file = Path(document.current_path)
    if not current_file.exists():
        raise FileNotFoundError(f"Current file not found: {current_file}")

    raw_text, layout_json, ocr_engine, ocr_engine_version = run_ocr_only(current_file)
    if not raw_text:
        raise ValueError("OCR produced no text")

    existing_raw = (
        db.query(TextVersion)
        .filter(
            TextVersion.document_id == document.id,
            TextVersion.version_type == "raw_ocr",
            TextVersion.is_current.is_(True),
        )
        # Ordered so previous_raw_id deterministically ends on the newest version.
        .order_by(TextVersion.version_number)
        .all()
    )

    previous_raw_id = None
    for tv in existing_raw:
        tv.is_current = False
        previous_raw_id = tv.id

    new_text = TextVersion(
        document_id=document.id,
        version_number=get_next_text_version_number(db, document.id),
        version_type="raw_ocr",
        text_content=raw_text,
        created_by="rerun_ocr",
        is_current=True,
        ocr_engine=ocr_engine,
        ocr_engine_version=ocr_engine_version,
        rerun_source="manual_rerun",
        quality_flags=[],
        quality_note=None,
        derived_from_version_id=previous_raw_id,
        layout_json=layout_json,
    )
    db.add(new_text)

    document.review_status = "ocr_complete"
    db.commit()
    db.refresh(new_text)
    return new_text


def ingest_file(db: Session, file_path: str, source_system: str, document_type: str = "receipt") -> Document:
    """Archive the file at ``file_path`` (``~`` expanded, path resolved); see ``archive_document``."""
    source = Path(file_path).expanduser().resolve()
    return archive_document(db=db, source=source, source_system=source_system, document_type=document_type)


def ingest_uploaded_file(
    db: Session,
    filename: str,
    file_bytes: bytes,
    source_system: str = "upload_ingest",
    document_type: str = "receipt",
) -> Document:
    """Stage uploaded bytes under ``UPLOAD_ROOT`` and archive them.

    Only the final component of ``filename`` is used for the staged name; it is
    prefixed with a random hex id. The staged file is kept on success (it is
    recorded as the document's source path) and deleted if archiving fails.

    Raises:
        ValueError: the filename's extension is not allowed.
    """
    suffix = Path(filename).suffix.lower()
    if suffix not in ALLOWED_EXTENSIONS:
        raise ValueError(f"Unsupported file type: {suffix}")

    upload_root = Path(UPLOAD_ROOT)
    upload_root.mkdir(parents=True, exist_ok=True)

    staged_path = upload_root / f"{uuid4().hex[:12]}_{Path(filename).name}"
    staged_path.write_bytes(file_bytes)

    try:
        return archive_document(db=db, source=staged_path, source_system=source_system, document_type=document_type)
    except Exception:
        staged_path.unlink(missing_ok=True)
        raise


def ingest_directory(
    db: Session,
    directory_path: str,
    recursive: bool = True,
    source_system: str = "directory_ingest",
    document_type: str = "receipt",
) -> list[Document]:
    """Archive every supported file in a directory, optionally recursing.

    Each file is processed on a best-effort basis. A failure is logged, the
    session is rolled back so later files can still be committed, and
    processing moves on. Files are processed in sorted path order.

    Raises:
        NotADirectoryError: ``directory_path`` is missing or not a directory.
    """
    source_dir = Path(directory_path).expanduser().resolve()
    if not source_dir.exists() or not source_dir.is_dir():
        raise NotADirectoryError(f"Directory not found: {source_dir}")

    walker = source_dir.rglob("*") if recursive else source_dir.glob("*")
    # Materialise the candidate list up front so files written during ingestion
    # (e.g. if an archive/upload root lives under this directory) are not picked up.
    candidates = sorted(path for path in walker if is_supported_file(path))

    ingested: list[Document] = []
    for path in candidates:
        try:
            ingested.append(
                ingest_file(db=db, file_path=str(path), source_system=source_system, document_type=document_type)
            )
        except Exception:
            logger.exception("Failed to ingest %s; skipping", path)
            db.rollback()
    return ingested


def ingest_inbox(db: Session) -> list[Document]:
    """Recursively ingest everything under ``INBOX_ROOT`` as receipts from "inbox_ingest"."""
    return ingest_directory(
        db=db,
        directory_path=INBOX_ROOT,
        recursive=True,
        source_system="inbox_ingest",
        document_type="receipt",
    )