"""Document ingestion service: archive source files, extract text (pdftotext/tesseract OCR), and record document/text versions."""
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import mimetypes
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from difflib import SequenceMatcher
|
|
from pathlib import Path
|
|
from uuid import uuid4
|
|
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import DOCUMENT_ARCHIVE_ROOT, INBOX_ROOT, UPLOAD_ROOT
|
|
from app.models.document import Document
|
|
from app.models.document_version import DocumentVersion
|
|
from app.models.text_version import TextVersion
|
|
|
|
|
|
# File extensions accepted for ingestion; anything else is rejected or skipped.
ALLOWED_EXTENSIONS = {".pdf", ".jpg", ".jpeg", ".png"}


def is_supported_file(path: Path) -> bool:
    """Return True when *path* is an existing regular file with an ingestable extension."""
    if not path.is_file():
        return False
    return path.suffix.lower() in ALLOWED_EXTENSIONS
|
|
|
|
|
|
def sha256_for_file(path: Path) -> str:
    """Compute the hex-encoded SHA-256 digest of the file at *path*.

    Reads in 1 MiB chunks so arbitrarily large files stay memory-bounded.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while chunk := stream.read(1024 * 1024):
            digest.update(chunk)
    return digest.hexdigest()
|
|
|
|
|
|
def guess_mime_type(path: Path) -> str:
    """Best-effort MIME type for *path*; falls back to application/octet-stream."""
    guessed, _encoding = mimetypes.guess_type(str(path))
    if guessed is None:
        return "application/octet-stream"
    return guessed
|
|
|
|
|
|
def build_storage_path(document_id: str, source_path: Path) -> Path:
    """Canonical archive location: <DOCUMENT_ARCHIVE_ROOT>/<document_id><lowercased ext>."""
    suffix = source_path.suffix.lower()
    return Path(DOCUMENT_ARCHIVE_ROOT) / f"{document_id}{suffix}"
|
|
|
|
|
|
def get_next_text_version_number(db: Session, document_id: int) -> int:
    """Next sequential text-version number for a document (1 when none exist yet)."""
    current_max = (
        db.query(func.max(TextVersion.version_number))
        .filter(TextVersion.document_id == document_id)
        .scalar()
    )
    if current_max is None:
        return 1
    return current_max + 1
|
|
|
|
|
|
def get_tesseract_version() -> str | None:
    """Return the first line of `tesseract --version` output, or None when unavailable."""
    try:
        proc = subprocess.run(
            ["tesseract", "--version"],
            capture_output=True,
            text=True,
            check=True,
        )
    except Exception:
        # Binary missing or exited nonzero: report "unknown" rather than fail.
        return None
    try:
        return proc.stdout.splitlines()[0].strip()
    except IndexError:
        # Empty stdout: no version line to report.
        return None
|
|
|
|
|
|
def get_pdftotext_version() -> str | None:
    """Return the first line of `pdftotext -v` output, or None when unavailable.

    stderr is preferred over stdout because pdftotext writes its version
    banner to stderr; no check=True, presumably because `-v` may exit
    nonzero on some builds — verify against the installed poppler.
    """
    try:
        proc = subprocess.run(
            ["pdftotext", "-v"],
            capture_output=True,
            text=True,
        )
    except Exception:
        return None
    lines = (proc.stderr or proc.stdout).splitlines()
    if not lines:
        return None
    return lines[0].strip()
|
|
|
|
|
|
def extract_pdf_text(path: Path) -> str:
    """Extract a PDF's embedded text layer via `pdftotext`; "" on any failure."""
    command = ["pdftotext", str(path), "-"]  # "-" streams text to stdout
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
        )
    except Exception:
        # Missing binary or extraction error: treat as "no text layer".
        return ""
    return completed.stdout.strip()
|
|
|
|
|
|
def ocr_image(path: Path) -> str:
    """OCR a single raster image with tesseract; "" on any failure."""
    try:
        completed = subprocess.run(
            ["tesseract", str(path), "stdout"],
            capture_output=True,
            text=True,
            check=True,
        )
    except Exception:
        # Missing binary or OCR error: callers treat "" as "no text".
        return ""
    return completed.stdout.strip()
|
|
|
|
|
|
def ocr_pdf(path: Path) -> str:
    """Rasterize a PDF into PNG pages (pdftoppm) and OCR each page.

    Page texts are joined with blank lines, in page order; pages whose OCR
    yields nothing are skipped. Returns "" when rasterization fails.
    """
    with tempfile.TemporaryDirectory() as workdir:
        prefix = Path(workdir) / "page"  # pdftoppm emits page-1.png, page-2.png, ...
        try:
            subprocess.run(
                ["pdftoppm", "-png", str(path), str(prefix)],
                capture_output=True,
                text=True,
                check=True,
            )
        except Exception:
            return ""

        page_texts = [
            text
            for page in sorted(Path(workdir).glob("page-*.png"))
            if (text := ocr_image(page))
        ]
        return "\n\n".join(page_texts).strip()
|
|
|
|
|
|
def run_ocr_only(path: Path) -> tuple[str, str | None, str | None]:
    """Run OCR on *path* unconditionally, ignoring any embedded text layer.

    Returns (text, engine_name, engine_version); unsupported file types
    yield ("", None, None).

    Fix: the tesseract version is probed only when the file will actually
    be OCR'd — the original spawned a `tesseract --version` subprocess
    even for unsupported types whose result is discarded.
    """
    suffix = path.suffix.lower()

    if suffix == ".pdf":
        return ocr_pdf(path).strip(), "tesseract", get_tesseract_version()
    if suffix in {".jpg", ".jpeg", ".png"}:
        return ocr_image(path).strip(), "tesseract", get_tesseract_version()
    return "", None, None
|
|
|
|
|
|
def get_raw_text_for_document(path: Path) -> tuple[str, str | None, str | None, str | None]:
    """Extract raw text for a freshly ingested file.

    PDFs first try the embedded text layer; when it yields fewer than 40
    characters the file is treated as a scan and OCR'd instead. Images go
    straight to OCR. Unsupported types return ("", None, None, None).

    Returns (text, engine, engine_version, rerun_source).
    """
    suffix = path.suffix.lower()

    if suffix == ".pdf":
        embedded = extract_pdf_text(path)
        if len(embedded.strip()) >= 40:
            return embedded, "pdftotext", get_pdftotext_version(), "initial_ingest"
        fallback = ocr_pdf(path).strip()
        return fallback, "tesseract", get_tesseract_version(), "initial_ingest_fallback"

    if suffix in {".jpg", ".jpeg", ".png"}:
        ocr_text = ocr_image(path).strip()
        return ocr_text, "tesseract", get_tesseract_version(), "initial_ingest"

    return "", None, None, None
|
|
|
|
|
|
def compute_quality_score(source_text: str, reviewed_text: str) -> float:
    """Similarity between OCR output and its reviewed text, as 0.0-100.0.

    Two empty texts score a perfect 100.0; an empty source against a
    non-empty review scores 0.0. Otherwise the SequenceMatcher ratio is
    scaled to a percentage and rounded to two decimals.
    """
    if not source_text:
        return 100.0 if not reviewed_text else 0.0
    similarity = SequenceMatcher(None, source_text, reviewed_text).ratio()
    return round(100 * similarity, 2)
|
|
|
|
|
|
def archive_document(
    db: Session,
    source: Path,
    source_system: str,
    document_type: str = "receipt",
) -> Document:
    """Copy *source* into the archive, extract its text, and persist all records.

    Creates the Document row, an initial "original" DocumentVersion, and —
    when text extraction/OCR produced anything — a first "raw_ocr"
    TextVersion, all committed together at the end.

    Raises:
        FileNotFoundError: if *source* does not exist.
        ValueError: if the file extension is not in ALLOWED_EXTENSIONS.
    """
    if not source.exists():
        raise FileNotFoundError(f"Source file not found: {source}")

    if not is_supported_file(source):
        raise ValueError(f"Unsupported file type: {source.suffix}")

    # Random public identifier; also becomes the archive filename stem.
    document_id = f"doc_{uuid4().hex[:12]}"
    current_path = build_storage_path(document_id, source)

    current_path.parent.mkdir(parents=True, exist_ok=True)
    # copy2 preserves file metadata (e.g. timestamps) alongside the bytes.
    shutil.copy2(source, current_path)

    # Derive size/MIME/hash from the archived copy, not the original, so the
    # stored values describe exactly the bytes we now own.
    file_size = current_path.stat().st_size
    mime_type = guess_mime_type(current_path)
    sha256_current = sha256_for_file(current_path)

    raw_text, ocr_engine, ocr_engine_version, rerun_source = get_raw_text_for_document(current_path)

    document = Document(
        document_id=document_id,
        document_type=document_type,
        source_path=str(source),
        current_path=str(current_path),
        original_filename=source.name,
        canonical_filename=current_path.name,
        mime_type=mime_type,
        file_size=file_size,
        # NOTE(review): page_count is hard-coded to 1 for every PDF — multi-page
        # PDFs are not actually counted here; confirm whether that matters.
        page_count=1 if source.suffix.lower() == ".pdf" else None,
        sha256_current=sha256_current,
        storage_status="ingested",
        review_status="ocr_complete" if raw_text else "ingested",
    )
    db.add(document)
    # flush() assigns document.id so the child rows below can reference it.
    db.flush()

    version = DocumentVersion(
        document_id=document.id,
        version_number=1,
        version_type="original",
        file_path=str(current_path),
        sha256=sha256_current,
        created_by=source_system,
        notes=f"Ingested from {source_system}",
    )
    db.add(version)

    # Only record a text version when extraction/OCR produced something.
    if raw_text:
        text_version = TextVersion(
            document_id=document.id,
            version_number=1,
            version_type="raw_ocr",
            text_content=raw_text,
            created_by="system",
            is_current=True,
            ocr_engine=ocr_engine,
            ocr_engine_version=ocr_engine_version,
            rerun_source=rerun_source,
            quality_flags=[],
            quality_note=None,
        )
        db.add(text_version)

    db.commit()
    db.refresh(document)
    return document
|
|
|
|
|
|
def rerun_ocr_for_document(db: Session, document: Document) -> TextVersion:
    """Force a fresh OCR pass over a document's current file.

    Demotes any raw_ocr TextVersion rows still flagged current, inserts a
    new current raw_ocr version linked to the last demoted row via
    derived_from_version_id, marks the document ocr_complete, and commits.

    Raises:
        ValueError: if the document has no current_path, or OCR yields no text.
        FileNotFoundError: if current_path no longer exists on disk.
    """
    if not document.current_path:
        raise ValueError("Document has no current_path")

    current_file = Path(document.current_path)
    if not current_file.exists():
        raise FileNotFoundError(f"Current file not found: {current_file}")

    raw_text, ocr_engine, ocr_engine_version = run_ocr_only(current_file)
    if not raw_text:
        raise ValueError("OCR produced no text")

    # All raw_ocr versions still flagged current — normally a single row,
    # but handled as a list to be safe.
    existing_raw = (
        db.query(TextVersion)
        .filter(
            TextVersion.document_id == document.id,
            TextVersion.version_type == "raw_ocr",
            TextVersion.is_current.is_(True),
        )
        .all()
    )

    # Demote the old versions; keep the last demoted id so the new row can
    # record its lineage.
    previous_raw_id = None
    for tv in existing_raw:
        tv.is_current = False
        previous_raw_id = tv.id

    new_text = TextVersion(
        document_id=document.id,
        version_number=get_next_text_version_number(db, document.id),
        version_type="raw_ocr",
        text_content=raw_text,
        created_by="rerun_ocr",
        is_current=True,
        ocr_engine=ocr_engine,
        ocr_engine_version=ocr_engine_version,
        rerun_source="manual_rerun",
        quality_flags=[],
        quality_note=None,
        derived_from_version_id=previous_raw_id,
    )
    db.add(new_text)

    document.review_status = "ocr_complete"

    db.commit()
    db.refresh(new_text)
    return new_text
|
|
|
|
|
|
def ingest_file(
    db: Session,
    file_path: str,
    source_system: str,
    document_type: str = "receipt",
) -> Document:
    """Resolve *file_path* (expanding ~) and archive it as a new document."""
    resolved = Path(file_path).expanduser().resolve()
    return archive_document(db, resolved, source_system, document_type=document_type)
|
|
|
|
|
|
def ingest_uploaded_file(
    db: Session,
    filename: str,
    file_bytes: bytes,
    source_system: str = "upload_ingest",
    document_type: str = "receipt",
) -> Document:
    """Stage uploaded bytes under UPLOAD_ROOT and archive them as a document.

    The staged copy gets a random hex prefix so concurrent uploads with the
    same original name cannot collide.

    Raises:
        ValueError: when the filename's extension is unsupported.
    """
    extension = Path(filename).suffix.lower()
    if extension not in ALLOWED_EXTENSIONS:
        raise ValueError(f"Unsupported file type: {extension}")

    staging_dir = Path(UPLOAD_ROOT)
    staging_dir.mkdir(parents=True, exist_ok=True)

    staged_path = staging_dir / f"{uuid4().hex[:12]}_{Path(filename).name}"
    staged_path.write_bytes(file_bytes)

    return archive_document(
        db=db,
        source=staged_path,
        source_system=source_system,
        document_type=document_type,
    )
|
|
|
|
|
|
def ingest_directory(
    db: Session,
    directory_path: str,
    recursive: bool = True,
    source_system: str = "directory_ingest",
    document_type: str = "receipt",
) -> list[Document]:
    """Archive every supported file under *directory_path*.

    Best-effort batch: files that fail to ingest are skipped, and the
    returned list contains only the documents ingested successfully.

    Raises:
        NotADirectoryError: when the path is missing or not a directory.
    """
    root = Path(directory_path).expanduser().resolve()

    if not (root.exists() and root.is_dir()):
        raise NotADirectoryError(f"Directory not found: {root}")

    candidates = root.rglob("*") if recursive else root.glob("*")

    documents: list[Document] = []
    for candidate in candidates:
        if not is_supported_file(candidate):
            continue
        try:
            document = ingest_file(
                db=db,
                file_path=str(candidate),
                source_system=source_system,
                document_type=document_type,
            )
        except Exception:
            # One bad file must not abort the rest of the batch.
            continue
        documents.append(document)

    return documents
|
|
|
|
|
|
def ingest_inbox(db: Session) -> list[Document]:
    """Sweep the shared inbox directory recursively, ingesting every supported receipt."""
    inbox_settings = {
        "directory_path": INBOX_ROOT,
        "recursive": True,
        "source_system": "inbox_ingest",
        "document_type": "receipt",
    }
    return ingest_directory(db=db, **inbox_settings)
|