document-processor/app/logic/ingest.py

443 lines
13 KiB
Python

from __future__ import annotations
import csv
import hashlib
import io
import mimetypes
import shutil
import subprocess
import tempfile
from difflib import SequenceMatcher
from pathlib import Path
from PIL import Image
from uuid import uuid4
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.core.config import DOCUMENT_ARCHIVE_ROOT, INBOX_ROOT, UPLOAD_ROOT
from app.models.document import Document
from app.models.document_version import DocumentVersion
from app.models.text_version import TextVersion
# File extensions (lower-case, with leading dot) accepted for ingestion.
ALLOWED_EXTENSIONS = {".pdf", ".jpg", ".jpeg", ".png"}


def is_supported_file(path: Path) -> bool:
    """Return True when *path* is an existing regular file with an allowed extension.

    The extension comparison is case-insensitive.
    """
    if not path.is_file():
        return False
    extension = path.suffix.lower()
    return extension in ALLOWED_EXTENSIONS
def sha256_for_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 1 MiB chunks so large files are never loaded into
    memory in one piece.
    """
    chunk_size = 1024 * 1024
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(chunk_size)
            if not block:
                # An empty read marks end of file.
                break
            digest.update(block)
    return digest.hexdigest()
def guess_mime_type(path: Path) -> str:
    """Guess the MIME type of *path* from its name.

    Falls back to ``"application/octet-stream"`` when ``mimetypes`` cannot
    determine a type.
    """
    guessed = mimetypes.guess_type(str(path))[0]
    if guessed:
        return guessed
    return "application/octet-stream"
def build_storage_path(document_id: str, source_path: Path) -> Path:
    """Return the archive location for a document.

    The result is ``DOCUMENT_ARCHIVE_ROOT/<document_id><ext>``, where ``<ext>``
    is the lower-cased suffix of *source_path*.
    """
    extension = source_path.suffix.lower()
    return Path(DOCUMENT_ARCHIVE_ROOT) / f"{document_id}{extension}"
def get_next_text_version_number(db: Session, document_id: int) -> int:
    """Return the next ``TextVersion.version_number`` for a document.

    Looks up the highest existing version number for *document_id* and adds
    one; a document with no text versions yet gets 1.
    """
    highest = (
        db.query(func.max(TextVersion.version_number))
        .filter(TextVersion.document_id == document_id)
        .scalar()
    )
    if not highest:
        # No rows yet (the aggregate returned None) or the maximum is 0.
        return 1
    return highest + 1
def get_tesseract_version() -> str | None:
    """Return the first line of ``tesseract --version`` output.

    Returns None if the command cannot be run, exits non-zero, or prints
    nothing to stdout.
    """
    try:
        completed = subprocess.run(
            ["tesseract", "--version"],
            capture_output=True,
            text=True,
            check=True,
        )
        first_line = completed.stdout.splitlines()[0]
        return first_line.strip()
    except Exception:
        # Best effort: version information is optional metadata.
        return None
def get_pdftotext_version() -> str | None:
    """Return the first line printed by ``pdftotext -v``.

    stderr is preferred and stdout is used only when stderr is empty. The
    exit status is not checked. Returns None when the command cannot be run
    or prints nothing.
    """
    try:
        completed = subprocess.run(
            ["pdftotext", "-v"],
            capture_output=True,
            text=True,
        )
        banner = completed.stderr or completed.stdout
        banner_lines = banner.splitlines()
        if not banner_lines:
            return None
        return banner_lines[0].strip()
    except Exception:
        # Best effort: version information is optional metadata.
        return None
def extract_pdf_text(path: Path) -> str:
    """Extract the embedded text of a PDF by running ``pdftotext``.

    Returns the whitespace-stripped text written to stdout, or an empty string
    if the command cannot be run or exits with a non-zero status.
    """
    try:
        command = ["pdftotext", str(path), "-"]  # "-" sends the text to stdout
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
        )
        extracted = completed.stdout
        return extracted.strip()
    except Exception:
        # Any failure is reported to the caller as "no text".
        return ""
def _parse_tsv_lines(tsv_text: str, page_number: int, image_width: int, image_height: int) -> dict:
reader = csv.DictReader(io.StringIO(tsv_text), delimiter=" ")
grouped: dict[tuple[int, int, int, int], list[dict]] = {}
for row in reader:
if not row.get("text"):
continue
text = row["text"].strip()
if not text:
continue
try:
level = int(row["level"])
page_num = int(row["page_num"])
block_num = int(row["block_num"])
par_num = int(row["par_num"])
line_num = int(row["line_num"])
left = int(row["left"])
top = int(row["top"])
width = int(row["width"])
height = int(row["height"])
conf = float(row["conf"]) if row["conf"] not in ("-1", "", None) else None
except Exception:
continue
if level != 5:
continue
if page_num != page_number:
continue
key = (page_num, block_num, par_num, line_num)
grouped.setdefault(key, []).append(
{
"text": text,
"left": left,
"top": top,
"width": width,
"height": height,
"conf": conf,
}
)
lines = []
for key, words in grouped.items():
words = sorted(words, key=lambda w: w["left"])
left = min(w["left"] for w in words)
top = min(w["top"] for w in words)
right = max(w["left"] + w["width"] for w in words)
bottom = max(w["top"] + w["height"] for w in words)
line_text = " ".join(w["text"] for w in words).strip()
avg_conf = None
valid_conf = [w["conf"] for w in words if w["conf"] is not None]
if valid_conf:
avg_conf = round(sum(valid_conf) / len(valid_conf), 2)
lines.append(
{
"text": line_text,
"bbox": [left, top, right, bottom],
"confidence": avg_conf,
}
)
lines.sort(key=lambda x: (x["bbox"][1], x["bbox"][0]))
return {
"page": page_number,
"image_width": image_width,
"image_height": image_height,
"lines": lines,
}
def ocr_image_with_layout(path: Path) -> tuple[str, dict]:
    """Run Tesseract on a single image and return its text and line layout.

    Tesseract is invoked twice: once for plain text and once with the ``tsv``
    config to obtain word boxes, which ``_parse_tsv_lines`` groups into lines.

    Returns:
        ``(text, layout)`` where *text* is the stripped plain-text output and
        *layout* is ``{"pages": [<page dict for page 1>]}``.

    Raises:
        subprocess.CalledProcessError: If either Tesseract run exits non-zero.
    """
    with Image.open(path) as image:
        width, height = image.size

    base_command = ["tesseract", str(path), "stdout"]
    plain_run = subprocess.run(
        base_command,
        capture_output=True,
        text=True,
        check=True,
    )
    plain_text = plain_run.stdout.strip()
    tsv_run = subprocess.run(
        base_command + ["tsv"],
        capture_output=True,
        text=True,
        check=True,
    )
    page_layout = _parse_tsv_lines(tsv_run.stdout, 1, width, height)
    return plain_text, {"pages": [page_layout]}
def ocr_pdf_with_layout(path: Path) -> tuple[str, dict]:
    """OCR every page of a PDF by rasterising it with ``pdftoppm``.

    Pages are rendered to PNG files in a temporary directory, OCR'd one by one
    with ``ocr_image_with_layout`` and renumbered 1..N in sorted filename
    order.

    Returns:
        ``(text, layout)`` where *text* is the non-empty page texts joined by
        blank lines and *layout* is ``{"pages": [...]}`` with one entry per
        rendered page.

    Raises:
        subprocess.CalledProcessError: If ``pdftoppm`` or Tesseract fails.
    """
    page_texts: list[str] = []
    page_layouts: list[dict] = []
    with tempfile.TemporaryDirectory() as workdir:
        workdir_path = Path(workdir)
        subprocess.run(
            ["pdftoppm", "-png", str(path), str(workdir_path / "page")],
            capture_output=True,
            text=True,
            check=True,
        )
        # NOTE(review): page order relies on lexicographic filename order
        # matching page order - confirm pdftoppm zero-pads the page index.
        rendered_pages = sorted(workdir_path.glob("page-*.png"))
        for page_no, image_path in enumerate(rendered_pages, start=1):
            text, layout = ocr_image_with_layout(image_path)
            if text:
                page_texts.append(text)
            image_pages = layout.get("pages")
            if image_pages:
                first_page = image_pages[0]
                # Each image is OCR'd as "page 1"; restore its real position.
                first_page["page"] = page_no
                page_layouts.append(first_page)
    return "\n\n".join(page_texts).strip(), {"pages": page_layouts}
def run_ocr_only(path: Path) -> tuple[str, dict | None, str | None, str | None]:
    """OCR *path* with Tesseract, ignoring any embedded PDF text.

    Returns:
        ``(text, layout, engine, engine_version)``. For PDFs and JPEG/PNG
        images the engine is ``"tesseract"``; for any other suffix the result
        is ``("", None, None, None)``.
    """
    extension = path.suffix.lower()
    engine_version = get_tesseract_version()

    if extension == ".pdf":
        ocr = ocr_pdf_with_layout
    elif extension in {".jpg", ".jpeg", ".png"}:
        ocr = ocr_image_with_layout
    else:
        return "", None, None, None

    text, layout = ocr(path)
    return text.strip(), layout, "tesseract", engine_version
def get_raw_text_for_document(path: Path) -> tuple[str, dict | None, str | None, str | None, str | None]:
    """Obtain the initial raw text for a newly ingested file.

    PDFs are first read with ``pdftotext``; when that yields fewer than 40
    non-whitespace-trimmed characters the PDF is OCR'd instead. Images are
    always OCR'd.

    Returns:
        ``(text, layout, engine, engine_version, rerun_source)``. The last
        item is ``"initial_ingest"`` or ``"initial_ingest_fallback"`` (PDF that
        needed OCR). Unsupported suffixes give ``("", None, None, None, None)``.
    """
    extension = path.suffix.lower()

    if extension in {".jpg", ".jpeg", ".png"}:
        return (*run_ocr_only(path), "initial_ingest")

    if extension != ".pdf":
        return "", None, None, None, None

    embedded_text = extract_pdf_text(path)
    if len(embedded_text.strip()) >= 40:
        # Enough embedded text: no layout is available on this path.
        return embedded_text, None, "pdftotext", get_pdftotext_version(), "initial_ingest"

    return (*run_ocr_only(path), "initial_ingest_fallback")
def compute_quality_score(source_text: str, reviewed_text: str) -> float:
    """Score how closely *reviewed_text* matches *source_text* on a 0-100 scale.

    The score is ``SequenceMatcher.ratio()`` times 100, rounded to two
    decimals. Two empty texts score 100.0; an empty source with a non-empty
    review scores 0.0.
    """
    if not source_text:
        return 0.0 if reviewed_text else 100.0
    similarity = SequenceMatcher(None, source_text, reviewed_text).ratio()
    return round(similarity * 100, 2)
def _discard_archived_copy(archived_path: Path) -> None:
    """Best-effort removal of an archive copy left behind by a failed ingest."""
    try:
        archived_path.unlink()
    except OSError:
        # Cleanup must never mask the error that triggered it.
        pass


def archive_document(
    db: Session,
    source: Path,
    source_system: str,
    document_type: str = "receipt",
) -> Document:
    """Copy *source* into the archive and record it in the database.

    The file is copied to the path given by ``build_storage_path``, hashed,
    and run through ``get_raw_text_for_document``. A ``Document`` row, its
    first ``DocumentVersion`` and (when text was obtained) a ``raw_ocr``
    ``TextVersion`` are then committed together.

    If anything fails after the copy, the archived copy is deleted again so
    no orphaned file is left behind. If the database step fails, the session
    is also rolled back. The original exception is re-raised in both cases.

    Args:
        db: Session used to persist the new rows; committed on success.
        source: File to ingest.
        source_system: Recorded as the creator of the document version.
        document_type: Stored on the ``Document`` row.

    Returns:
        The refreshed ``Document`` instance.

    Raises:
        FileNotFoundError: If *source* does not exist.
        ValueError: If *source* is not a supported file.
    """
    if not source.exists():
        raise FileNotFoundError(f"Source file not found: {source}")
    if not is_supported_file(source):
        raise ValueError(f"Unsupported file type: {source.suffix}")

    document_id = f"doc_{uuid4().hex[:12]}"
    current_path = build_storage_path(document_id, source)
    current_path.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(source, current_path)

    # Phase 1: file inspection and text extraction. The session is untouched.
    try:
        file_size = current_path.stat().st_size
        mime_type = guess_mime_type(current_path)
        sha256_current = sha256_for_file(current_path)
        raw_text, layout_json, ocr_engine, ocr_engine_version, rerun_source = get_raw_text_for_document(current_path)
    except Exception:
        _discard_archived_copy(current_path)
        raise

    # Phase 2: persistence.
    try:
        document = Document(
            document_id=document_id,
            document_type=document_type,
            source_path=str(source),
            current_path=str(current_path),
            original_filename=source.name,
            canonical_filename=current_path.name,
            mime_type=mime_type,
            file_size=file_size,
            # NOTE(review): PDFs are always recorded with page_count=1 and
            # images with None; the real page count is not determined here.
            page_count=1 if source.suffix.lower() == ".pdf" else None,
            sha256_current=sha256_current,
            storage_status="ingested",
            review_status="ocr_complete" if raw_text else "ingested",
        )
        db.add(document)
        # Flush so document.id is available for the dependent rows.
        db.flush()

        version = DocumentVersion(
            document_id=document.id,
            version_number=1,
            version_type="original",
            file_path=str(current_path),
            sha256=sha256_current,
            created_by=source_system,
            notes=f"Ingested from {source_system}",
        )
        db.add(version)

        if raw_text:
            text_version = TextVersion(
                document_id=document.id,
                version_number=1,
                version_type="raw_ocr",
                text_content=raw_text,
                created_by="system",
                is_current=True,
                ocr_engine=ocr_engine,
                ocr_engine_version=ocr_engine_version,
                rerun_source=rerun_source,
                quality_flags=[],
                quality_note=None,
                layout_json=layout_json,
            )
            db.add(text_version)

        db.commit()
    except Exception:
        # Leave the session usable for the caller and drop the orphaned copy.
        db.rollback()
        _discard_archived_copy(current_path)
        raise

    db.refresh(document)
    return document
def rerun_ocr_for_document(db: Session, document: Document) -> TextVersion:
    """Re-run OCR on a document's current file and store it as the current raw text.

    Every existing current ``raw_ocr`` text version of the document is marked
    non-current, and a new ``raw_ocr`` version is committed that points back
    at the last of them through ``derived_from_version_id``. The document's
    ``review_status`` is reset to ``"ocr_complete"``.

    Returns:
        The newly created, refreshed ``TextVersion``.

    Raises:
        ValueError: If the document has no ``current_path`` or OCR yields no text.
        FileNotFoundError: If the current file is missing on disk.
    """
    stored_path = document.current_path
    if not stored_path:
        raise ValueError("Document has no current_path")

    file_on_disk = Path(stored_path)
    if not file_on_disk.exists():
        raise FileNotFoundError(f"Current file not found: {file_on_disk}")

    text, layout, engine, engine_version = run_ocr_only(file_on_disk)
    if not text:
        raise ValueError("OCR produced no text")

    current_raw_versions = (
        db.query(TextVersion)
        .filter(
            TextVersion.document_id == document.id,
            TextVersion.version_type == "raw_ocr",
            TextVersion.is_current.is_(True),
        )
        .all()
    )
    superseded_id = None
    for old_version in current_raw_versions:
        old_version.is_current = False
        # The last row returned becomes the parent of the new version.
        superseded_id = old_version.id

    next_number = get_next_text_version_number(db, document.id)
    replacement = TextVersion(
        document_id=document.id,
        version_number=next_number,
        version_type="raw_ocr",
        text_content=text,
        created_by="rerun_ocr",
        is_current=True,
        ocr_engine=engine,
        ocr_engine_version=engine_version,
        rerun_source="manual_rerun",
        quality_flags=[],
        quality_note=None,
        derived_from_version_id=superseded_id,
        layout_json=layout,
    )
    db.add(replacement)
    document.review_status = "ocr_complete"
    db.commit()
    db.refresh(replacement)
    return replacement
def ingest_file(db: Session, file_path: str, source_system: str, document_type: str = "receipt") -> Document:
    """Ingest a single file given as a path string.

    The path is user-expanded (``~``) and resolved to an absolute path before
    being handed to ``archive_document``.
    """
    expanded = Path(file_path).expanduser()
    resolved_source = expanded.resolve()
    return archive_document(
        db=db,
        source=resolved_source,
        source_system=source_system,
        document_type=document_type,
    )
def ingest_uploaded_file(
    db: Session,
    filename: str,
    file_bytes: bytes,
    source_system: str = "upload_ingest",
    document_type: str = "receipt",
) -> Document:
    """Stage uploaded bytes under ``UPLOAD_ROOT`` and ingest the staged file.

    The staged file is named ``<12 hex chars>_<basename of filename>``; only
    the final path component of *filename* is used.

    Raises:
        ValueError: If the filename's extension is not in ``ALLOWED_EXTENSIONS``.
    """
    extension = Path(filename).suffix.lower()
    if extension not in ALLOWED_EXTENSIONS:
        raise ValueError(f"Unsupported file type: {extension}")

    staging_dir = Path(UPLOAD_ROOT)
    staging_dir.mkdir(parents=True, exist_ok=True)

    unique_prefix = uuid4().hex[:12]
    base_name = Path(filename).name  # drops any directory components
    staged_path = staging_dir / f"{unique_prefix}_{base_name}"
    staged_path.write_bytes(file_bytes)

    return archive_document(
        db=db,
        source=staged_path,
        source_system=source_system,
        document_type=document_type,
    )
def ingest_directory(
    db: Session,
    directory_path: str,
    recursive: bool = True,
    source_system: str = "directory_ingest",
    document_type: str = "receipt",
) -> list[Document]:
    """Ingest every supported file found in a directory.

    Files that fail to ingest are skipped so one bad file does not abort the
    batch. Each failure is logged with its traceback and the session is
    rolled back, so a database error on one file cannot leave the shared
    session unusable for the files that follow.

    Args:
        db: Session passed to ``ingest_file`` for each file.
        directory_path: Directory to scan; ``~`` is expanded and the path resolved.
        recursive: Scan sub-directories as well when True.
        source_system: Passed through to ``ingest_file``.
        document_type: Passed through to ``ingest_file``.

    Returns:
        The documents that were ingested successfully, in sorted path order.

    Raises:
        NotADirectoryError: If *directory_path* is missing or not a directory.
    """
    import logging  # function-scope import: the module has no file-level logger

    logger = logging.getLogger(__name__)

    source_dir = Path(directory_path).expanduser().resolve()
    if not source_dir.is_dir():  # also False when the path does not exist
        raise NotADirectoryError(f"Directory not found: {source_dir}")

    candidates = source_dir.rglob("*") if recursive else source_dir.glob("*")
    # Snapshot and sort the listing before ingesting anything: the order is
    # deterministic and files created during the run are not picked up.
    files = sorted(path for path in candidates if is_supported_file(path))

    ingested: list[Document] = []
    for path in files:
        try:
            ingested.append(
                ingest_file(db=db, file_path=str(path), source_system=source_system, document_type=document_type)
            )
        except Exception:
            logger.exception("Failed to ingest %s", path)
            # A failed flush/commit leaves the session needing a rollback.
            db.rollback()
    return ingested
def ingest_inbox(db: Session) -> list[Document]:
    """Recursively ingest everything under ``INBOX_ROOT`` as receipts.

    Delegates to ``ingest_directory`` with source system ``"inbox_ingest"``.
    """
    inbox_options = {
        "recursive": True,
        "source_system": "inbox_ingest",
        "document_type": "receipt",
    }
    return ingest_directory(db=db, directory_path=INBOX_ROOT, **inbox_options)