486 lines
14 KiB
Python
486 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
import hashlib
|
|
import io
|
|
import mimetypes
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from difflib import SequenceMatcher
|
|
from pathlib import Path
|
|
from uuid import uuid4
|
|
|
|
from PIL import Image
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import DOCUMENT_ARCHIVE_ROOT, INBOX_ROOT, UPLOAD_ROOT
|
|
from app.models.document import Document
|
|
from app.models.document_version import DocumentVersion
|
|
from app.models.text_version import TextVersion
|
|
|
|
|
|
# File extensions (lowercase, dot included) accepted for ingestion and OCR.
ALLOWED_EXTENSIONS = {".pdf", ".jpg", ".jpeg", ".png"}
|
|
|
|
|
|
def is_supported_file(path: Path) -> bool:
    """Return True when *path* is an existing regular file with an allowed extension."""
    if not path.is_file():
        return False
    return path.suffix.lower() in ALLOWED_EXTENSIONS
|
|
|
|
|
|
def sha256_for_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        # Chunked reads keep memory flat for arbitrarily large files.
        while chunk := stream.read(1024 * 1024):
            digest.update(chunk)
    return digest.hexdigest()
|
|
|
|
|
|
def guess_mime_type(path: Path) -> str:
    """Best-effort MIME type for *path*, defaulting to application/octet-stream."""
    guessed = mimetypes.guess_type(str(path))[0]
    if guessed is None:
        return "application/octet-stream"
    return guessed
|
|
|
|
|
|
def build_storage_path(document_id: str, source_path: Path) -> Path:
    """Compute the archive location for *document_id*, keeping the source extension.

    The extension is lower-cased so archived names are canonical regardless of
    how the original file was named.
    """
    canonical_name = document_id + source_path.suffix.lower()
    return Path(DOCUMENT_ARCHIVE_ROOT) / canonical_name
|
|
|
|
|
|
def get_next_text_version_number(db: Session, document_id: int) -> int:
    """Return one past the highest existing TextVersion number for a document.

    Starts at 1 when the document has no text versions yet.
    """
    highest = (
        db.query(func.max(TextVersion.version_number))
        .filter(TextVersion.document_id == document_id)
        .scalar()
    )
    if highest is None:
        return 1
    return highest + 1
|
|
|
|
|
|
def get_tesseract_version() -> str | None:
|
|
try:
|
|
result = subprocess.run(
|
|
["tesseract", "--version"],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True,
|
|
)
|
|
return result.stdout.splitlines()[0].strip()
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def get_pdftotext_version() -> str | None:
|
|
try:
|
|
result = subprocess.run(
|
|
["pdftotext", "-v"],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
text = (result.stderr or result.stdout).splitlines()
|
|
return text[0].strip() if text else None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def extract_pdf_text(path: Path) -> str:
    """Extract a PDF's embedded text layer via pdftotext; "" on any failure.

    Failures (missing binary, corrupt or absent file) are deliberately
    swallowed — callers treat an empty result as "no text layer" and fall
    back to OCR.
    """
    try:
        completed = subprocess.run(
            ["pdftotext", str(path), "-"],
            capture_output=True,
            text=True,
            check=True,
        )
    except Exception:
        return ""
    return completed.stdout.strip()
|
|
|
|
|
|
def _parse_tsv_lines(tsv_text: str, page_number: int, image_width: int, image_height: int) -> dict:
|
|
reader = csv.DictReader(io.StringIO(tsv_text), delimiter="\t")
|
|
grouped: dict[tuple[int, int, int, int], list[dict]] = {}
|
|
|
|
for row in reader:
|
|
if not row.get("text"):
|
|
continue
|
|
|
|
text = row["text"].strip()
|
|
if not text:
|
|
continue
|
|
|
|
try:
|
|
level = int(row["level"])
|
|
page_num = int(row["page_num"])
|
|
block_num = int(row["block_num"])
|
|
par_num = int(row["par_num"])
|
|
line_num = int(row["line_num"])
|
|
left = int(row["left"])
|
|
top = int(row["top"])
|
|
width = int(row["width"])
|
|
height = int(row["height"])
|
|
conf = float(row["conf"]) if row["conf"] not in ("-1", "", None) else None
|
|
except Exception:
|
|
continue
|
|
|
|
if level != 5:
|
|
continue
|
|
if page_num != page_number:
|
|
continue
|
|
|
|
key = (page_num, block_num, par_num, line_num)
|
|
grouped.setdefault(key, []).append(
|
|
{
|
|
"text": text,
|
|
"left": left,
|
|
"top": top,
|
|
"width": width,
|
|
"height": height,
|
|
"conf": conf,
|
|
}
|
|
)
|
|
|
|
lines = []
|
|
for _, words in grouped.items():
|
|
words = sorted(words, key=lambda w: w["left"])
|
|
left = min(w["left"] for w in words)
|
|
top = min(w["top"] for w in words)
|
|
right = max(w["left"] + w["width"] for w in words)
|
|
bottom = max(w["top"] + w["height"] for w in words)
|
|
line_text = " ".join(w["text"] for w in words).strip()
|
|
|
|
avg_conf = None
|
|
valid_conf = [w["conf"] for w in words if w["conf"] is not None]
|
|
if valid_conf:
|
|
avg_conf = round(sum(valid_conf) / len(valid_conf), 2)
|
|
|
|
lines.append(
|
|
{
|
|
"text": line_text,
|
|
"bbox": [left, top, right, bottom],
|
|
"confidence": avg_conf,
|
|
}
|
|
)
|
|
|
|
lines.sort(key=lambda x: (x["bbox"][1], x["bbox"][0]))
|
|
return {
|
|
"page": page_number,
|
|
"image_width": image_width,
|
|
"image_height": image_height,
|
|
"lines": lines,
|
|
}
|
|
|
|
|
|
def build_synthetic_layout_from_text(text: str) -> dict:
    """Wrap plain extracted text in a one-page layout structure.

    Each non-blank line of *text* becomes a layout line with no bounding box
    or confidence. Entries (and the page) are flagged ``synthetic`` so
    consumers can tell them apart from real OCR geometry.
    """
    stripped = (raw.strip() for raw in text.splitlines())
    synthetic_lines = [
        {
            "text": content,
            "bbox": None,
            "confidence": None,
            "synthetic": True,
            "line_index": index,
        }
        for index, content in enumerate(s for s in stripped if s)
    ]

    return {
        "pages": [
            {
                "page": 1,
                "image_width": None,
                "image_height": None,
                "synthetic": True,
                "lines": synthetic_lines,
            }
        ]
    }
|
|
|
|
|
|
def ocr_image_with_layout(path: Path) -> tuple[str, dict]:
    """Run Tesseract on one image; return (plain text, single-page layout dict).

    Tesseract is invoked twice: once for plain text and once for TSV output,
    which is parsed into line-level geometry via _parse_tsv_lines.
    """
    with Image.open(path) as img:
        width, height = img.size

    def _run_tesseract(*extra_args: str) -> str:
        # check=True: a failed OCR run should surface, not return junk.
        return subprocess.run(
            ["tesseract", str(path), "stdout", *extra_args],
            capture_output=True,
            text=True,
            check=True,
        ).stdout

    plain_text = _run_tesseract().strip()
    tsv_output = _run_tesseract("tsv")

    return plain_text, {"pages": [_parse_tsv_lines(tsv_output, 1, width, height)]}
|
|
|
|
|
|
def ocr_pdf_with_layout(path: Path) -> tuple[str, dict]:
    """Rasterize a PDF with pdftoppm and OCR each page image in order.

    Returns the page texts joined with blank lines and a layout dict whose
    "pages" entries are renumbered to match PDF page order.
    """
    with tempfile.TemporaryDirectory() as workdir:
        render_prefix = Path(workdir) / "page"
        subprocess.run(
            ["pdftoppm", "-png", str(path), str(render_prefix)],
            capture_output=True,
            text=True,
            check=True,
        )

        page_texts: list[str] = []
        page_layouts: list[dict] = []

        # Sorted glob keeps pdftoppm's page-NN.png files in page order.
        for page_no, image_file in enumerate(sorted(Path(workdir).glob("page-*.png")), start=1):
            page_text, image_layout = ocr_image_with_layout(image_file)
            if page_text:
                page_texts.append(page_text)
            rendered_pages = image_layout.get("pages")
            if rendered_pages:
                entry = rendered_pages[0]
                # Single-image OCR always reports page 1; renumber to the
                # position within the PDF.
                entry["page"] = page_no
                page_layouts.append(entry)

        return "\n\n".join(page_texts).strip(), {"pages": page_layouts}
|
|
|
|
|
|
def run_ocr_only(path: Path) -> tuple[str, dict | None, str | None, str | None]:
    """OCR a file with Tesseract regardless of any embedded text layer.

    Returns (text, layout, engine, engine_version); all-empty/None for
    unsupported extensions.
    """
    suffix = path.suffix.lower()
    engine_version = get_tesseract_version()

    if suffix == ".pdf":
        ocr_fn = ocr_pdf_with_layout
    elif suffix in (".jpg", ".jpeg", ".png"):
        ocr_fn = ocr_image_with_layout
    else:
        return "", None, None, None

    text, layout = ocr_fn(path)
    return text.strip(), layout, "tesseract", engine_version
|
|
|
|
|
|
def get_raw_text_for_document(path: Path) -> tuple[str, dict | None, str | None, str | None, str | None]:
    """Extract initial text for a newly ingested file.

    PDFs first try the embedded text layer via pdftotext; if that yields
    fewer than 40 characters the file is treated as a scan and falls back to
    OCR. Images always go straight to OCR. Returns
    (text, layout, engine, engine_version, rerun_source); all-empty/None for
    unsupported extensions.
    """
    suffix = path.suffix.lower()

    if suffix == ".pdf":
        embedded_text = extract_pdf_text(path)
        # 40 chars is the heuristic threshold for "has a usable text layer".
        if len(embedded_text.strip()) >= 40:
            return (
                embedded_text,
                build_synthetic_layout_from_text(embedded_text),
                "pdftotext",
                get_pdftotext_version(),
                "initial_ingest",
            )
        text, layout, engine, version = run_ocr_only(path)
        return text, layout, engine, version, "initial_ingest_fallback"

    if suffix in (".jpg", ".jpeg", ".png"):
        text, layout, engine, version = run_ocr_only(path)
        return text, layout, engine, version, "initial_ingest"

    return "", None, None, None, None
|
|
|
|
|
|
def compute_quality_score(source_text: str, reviewed_text: str) -> float:
    """Similarity between OCR output and reviewed text as a 0-100 percentage.

    Two empty strings count as a perfect match (100.0); an empty source with
    a non-empty review scores 0.0. Otherwise the score is the difflib
    SequenceMatcher ratio scaled to percent, rounded to 2 decimals.
    """
    if not source_text:
        return 100.0 if not reviewed_text else 0.0

    similarity = SequenceMatcher(None, source_text, reviewed_text).ratio()
    return round(100 * similarity, 2)
|
|
|
|
|
|
def archive_document(
    db: Session,
    source: Path,
    source_system: str,
    document_type: str = "receipt",
) -> Document:
    """Copy *source* into the archive and create its database records.

    Creates the Document row, an initial DocumentVersion ("original"), and —
    when text extraction/OCR produced anything — a first raw_ocr TextVersion.
    Commits the session and returns the refreshed Document.

    Raises:
        FileNotFoundError: if *source* does not exist.
        ValueError: if the file extension is not supported.
    """
    if not source.exists():
        raise FileNotFoundError(f"Source file not found: {source}")
    if not is_supported_file(source):
        raise ValueError(f"Unsupported file type: {source.suffix}")

    # Random short token keeps archive filenames unique and opaque.
    document_id = f"doc_{uuid4().hex[:12]}"
    current_path = build_storage_path(document_id, source)

    current_path.parent.mkdir(parents=True, exist_ok=True)
    # copy2 preserves file metadata (e.g. timestamps) alongside the contents.
    shutil.copy2(source, current_path)

    file_size = current_path.stat().st_size
    mime_type = guess_mime_type(current_path)
    sha256_current = sha256_for_file(current_path)

    # Embedded-text extraction or OCR depending on file type (see
    # get_raw_text_for_document); any of these may be empty/None.
    raw_text, layout_json, ocr_engine, ocr_engine_version, rerun_source = get_raw_text_for_document(current_path)

    document = Document(
        document_id=document_id,
        document_type=document_type,
        source_path=str(source),
        current_path=str(current_path),
        original_filename=source.name,
        canonical_filename=current_path.name,
        mime_type=mime_type,
        file_size=file_size,
        # NOTE(review): page_count is hard-coded to 1 for every PDF even
        # though the OCR path handles multi-page PDFs — confirm whether real
        # page counting is intended here.
        page_count=1 if source.suffix.lower() == ".pdf" else None,
        sha256_current=sha256_current,
        storage_status="ingested",
        review_status="ocr_complete" if raw_text else "ingested",
    )
    db.add(document)
    # flush() assigns document.id so the child rows below can reference it
    # before the final commit.
    db.flush()

    version = DocumentVersion(
        document_id=document.id,
        version_number=1,
        version_type="original",
        file_path=str(current_path),
        sha256=sha256_current,
        created_by=source_system,
        notes=f"Ingested from {source_system}",
    )
    db.add(version)

    # Only record a text version when extraction/OCR actually produced text.
    if raw_text:
        text_version = TextVersion(
            document_id=document.id,
            version_number=1,
            version_type="raw_ocr",
            text_content=raw_text,
            created_by="system",
            is_current=True,
            ocr_engine=ocr_engine,
            ocr_engine_version=ocr_engine_version,
            rerun_source=rerun_source,
            quality_flags=[],
            quality_note=None,
            layout_json=layout_json,
        )
        db.add(text_version)

    db.commit()
    db.refresh(document)
    return document
|
|
|
|
|
|
def rerun_ocr_for_document(db: Session, document: Document) -> TextVersion:
    """Re-run OCR on a document's archived file and record a new raw_ocr TextVersion.

    Any raw_ocr versions still flagged current are demoted
    (is_current=False) and the new version links back to the last demoted one
    via derived_from_version_id. Commits the session and returns the
    refreshed new TextVersion.

    Raises:
        ValueError: if the document has no current_path or OCR yields no text.
        FileNotFoundError: if the archived file is missing on disk.
    """
    if not document.current_path:
        raise ValueError("Document has no current_path")

    current_file = Path(document.current_path)
    if not current_file.exists():
        raise FileNotFoundError(f"Current file not found: {current_file}")

    raw_text, layout_json, ocr_engine, ocr_engine_version = run_ocr_only(current_file)
    if not raw_text:
        raise ValueError("OCR produced no text")

    # All current raw_ocr versions — normally a single row, but the loop
    # below tolerates duplicates.
    existing_raw = (
        db.query(TextVersion)
        .filter(
            TextVersion.document_id == document.id,
            TextVersion.version_type == "raw_ocr",
            TextVersion.is_current.is_(True),
        )
        .all()
    )

    # Demote old versions; remember the last demoted id as the lineage parent.
    previous_raw_id = None
    for tv in existing_raw:
        tv.is_current = False
        previous_raw_id = tv.id

    new_text = TextVersion(
        document_id=document.id,
        version_number=get_next_text_version_number(db, document.id),
        version_type="raw_ocr",
        text_content=raw_text,
        created_by="rerun_ocr",
        is_current=True,
        ocr_engine=ocr_engine,
        ocr_engine_version=ocr_engine_version,
        rerun_source="manual_rerun",
        quality_flags=[],
        quality_note=None,
        derived_from_version_id=previous_raw_id,
        layout_json=layout_json,
    )
    db.add(new_text)

    document.review_status = "ocr_complete"

    db.commit()
    db.refresh(new_text)
    return new_text
|
|
|
|
|
|
def ingest_file(db: Session, file_path: str, source_system: str, document_type: str = "receipt") -> Document:
    """Resolve *file_path* (expanding ~) and archive it as a new document."""
    resolved = Path(file_path).expanduser().resolve()
    return archive_document(
        db=db,
        source=resolved,
        source_system=source_system,
        document_type=document_type,
    )
|
|
|
|
|
|
def ingest_uploaded_file(
    db: Session,
    filename: str,
    file_bytes: bytes,
    source_system: str = "upload_ingest",
    document_type: str = "receipt",
) -> Document:
    """Stage raw upload bytes under UPLOAD_ROOT, then archive the staged file.

    Raises:
        ValueError: if the filename's extension is not supported.
    """
    extension = Path(filename).suffix.lower()
    if extension not in ALLOWED_EXTENSIONS:
        raise ValueError(f"Unsupported file type: {extension}")

    staging_dir = Path(UPLOAD_ROOT)
    staging_dir.mkdir(parents=True, exist_ok=True)

    # A short random prefix keeps repeated uploads of the same filename from
    # colliding in the staging directory.
    staged_path = staging_dir / f"{uuid4().hex[:12]}_{Path(filename).name}"
    staged_path.write_bytes(file_bytes)

    return archive_document(
        db=db,
        source=staged_path,
        source_system=source_system,
        document_type=document_type,
    )
|
|
|
|
|
|
def ingest_directory(
    db: Session,
    directory_path: str,
    recursive: bool = True,
    source_system: str = "directory_ingest",
    document_type: str = "receipt",
) -> list[Document]:
    """Ingest every supported file found under *directory_path*.

    Best-effort batch: a file that fails to ingest is skipped silently so one
    bad file cannot abort the run. Returns the successfully ingested
    documents.

    Raises:
        NotADirectoryError: if *directory_path* is not an existing directory.
    """
    root = Path(directory_path).expanduser().resolve()
    if not (root.exists() and root.is_dir()):
        raise NotADirectoryError(f"Directory not found: {root}")

    candidates = root.rglob("*") if recursive else root.glob("*")

    documents: list[Document] = []
    for candidate in candidates:
        if not is_supported_file(candidate):
            continue
        try:
            doc = ingest_file(
                db=db,
                file_path=str(candidate),
                source_system=source_system,
                document_type=document_type,
            )
        except Exception:
            # Deliberate best-effort skip; failures leave no Document behind.
            continue
        documents.append(doc)

    return documents
|
|
|
|
|
|
def ingest_inbox(db: Session) -> list[Document]:
    """Recursively sweep the inbox directory and ingest every supported file."""
    return ingest_directory(
        db=db,
        directory_path=INBOX_ROOT,
        recursive=True,
        source_system="inbox_ingest",
        document_type="receipt",
    )
|