# document-processor/app/logic/document_outputs.py
#
# 288 lines
# 9.4 KiB
# Python

from __future__ import annotations
import hashlib
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from PIL import Image
from pypdf import PdfReader
from reportlab.lib.utils import ImageReader
from reportlab.pdfbase.pdfmetrics import stringWidth
from reportlab.pdfgen import canvas
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.core.config import FIELD_ENRICHED_ROOT, OCR_CORRECTED_ROOT
from app.models.document import Document
from app.models.document_version import DocumentVersion
from app.models.text_version import TextVersion
def sha256_for_file(path: Path) -> str:
    """Return the hex-encoded SHA-256 digest of the file at *path*.

    The file is consumed in 1 MiB pieces so that large files are never held
    in memory in full.
    """
    piece_size = 1024 * 1024
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            piece = stream.read(piece_size)
            if not piece:
                # An empty read means end of file.
                break
            digest.update(piece)
    return digest.hexdigest()
def compress_pdf_with_ghostscript(path: Path) -> bool:
    """Try to shrink the PDF at *path* in place using Ghostscript.

    Ghostscript (``gs``) writes a re-encoded copy next to the original
    (same name with a ``.compressed.pdf`` suffix) using the ``/ebook``
    preset. The copy replaces the original only when it is strictly
    smaller; in every other case the copy is removed again.

    This is a best-effort step: a missing ``gs`` binary, a non-zero exit
    status, or any other error while running it is reported through the
    return value rather than raised.

    Args:
        path: Location of the PDF to compress.

    Returns:
        True when Ghostscript ran successfully and produced a non-empty
        output (whether or not that output replaced the original), False
        otherwise.
    """
    compressed_path = path.with_suffix(".compressed.pdf")
    command = [
        "gs",
        "-sDEVICE=pdfwrite",
        "-dCompatibilityLevel=1.4",
        "-dPDFSETTINGS=/ebook",
        "-dNOPAUSE",
        "-dQUIET",
        "-dBATCH",
        f"-sOutputFile={compressed_path}",
        str(path),
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)

        compressed_size = compressed_path.stat().st_size if compressed_path.exists() else 0
        if compressed_size == 0:
            # Ghostscript exited cleanly but produced nothing usable.
            return False

        original_size = path.stat().st_size if path.exists() else 0
        # Only replace if compression actually helped.
        if original_size > 0 and compressed_size < original_size:
            os.replace(compressed_path, path)
        return True
    except Exception:
        # Deliberate best-effort: compression failures must not abort the caller.
        return False
    finally:
        # Never leave the intermediate file behind, including the
        # "empty output" case. After a successful os.replace it is already gone.
        compressed_path.unlink(missing_ok=True)
def get_next_document_version_number(db: Session, document_id: int) -> int:
    """Return the version number to use for the next version of a document.

    Looks up the highest ``version_number`` stored for *document_id* and
    adds one. When the query yields no value the numbering starts at 1.
    """
    highest_query = db.query(func.max(DocumentVersion.version_number)).filter(
        DocumentVersion.document_id == document_id
    )
    highest = highest_query.scalar()
    if not highest:
        return 1
    return highest + 1
def _build_output_path(root: str, document: Document, version_type: str, version_number: int) -> Path:
    """Compose the output file path for a new document version.

    The file name is ``<document_id>_<version_type>_v<version_number>``
    followed by the lower-cased extension of the document's current file,
    or ``.pdf`` when the current path is unset or has no extension. The
    file is placed directly under *root*.
    """
    extension = Path(document.current_path or "").suffix.lower() or ".pdf"
    return Path(root) / f"{document.document_id}_{version_type}_v{version_number}{extension}"
def _latest_current_text_version(document: Document, version_type: str) -> TextVersion | None:
    """Return the newest current text version of the given type, if any.

    Only entries of ``document.text_versions`` whose ``version_type``
    matches and whose ``is_current`` flag is truthy are considered. Among
    those, the one with the greatest ``(version_number, created_at)`` pair
    wins; on an exact tie the entry that appears first is returned.

    Returns:
        The selected text version, or None when nothing matches.
    """
    current_versions = (
        tv
        for tv in document.text_versions
        if tv.version_type == version_type and tv.is_current
    )
    # A single pass with max() replaces sorting the whole list just to take
    # its first element. max() also returns the first maximal item.
    return max(
        current_versions,
        key=lambda tv: (tv.version_number, tv.created_at),
        default=None,
    )
def _render_pdf_page_images(pdf_path: Path, tmpdir: Path) -> list[Path]:
    """Render the pages of *pdf_path* to PNG files inside *tmpdir*.

    Runs ``pdftoppm -r 150 -png`` with ``<tmpdir>/page`` as the output
    prefix and returns the resulting ``page-*.png`` paths in sorted order.

    Raises:
        subprocess.CalledProcessError: If ``pdftoppm`` exits with a
            non-zero status.
    """
    output_prefix = str(tmpdir / "page")
    command = ["pdftoppm", "-r", "150", "-png", str(pdf_path), output_prefix]
    subprocess.run(command, check=True, text=True, capture_output=True)

    rendered = list(tmpdir.glob("page-*.png"))
    rendered.sort()
    return rendered
def _fit_font_size(text: str, box_width: float, box_height: float) -> float:
    """Pick a Helvetica font size for *text* inside a box of the given size.

    For empty text the size is 80% of the box height, but at least 6.0.
    Otherwise the size starts at 88% of the box height (at least 6.0) and
    is reduced in 0.25 steps while the rendered width exceeds 98% of the
    box width. The result is never smaller than 68% of the box height or
    6.0, whichever is larger.
    """
    if not text:
        return max(6.0, box_height * 0.80)

    width_limit = box_width * 0.98
    lower_bound = max(6.0, box_height * 0.68)

    size = max(6.0, box_height * 0.88)
    while size > 4.5 and stringWidth(text, "Helvetica", size) > width_limit:
        size -= 0.25

    # Text that still overflows is clamped to the lower bound rather than
    # shrunk further.
    return max(lower_bound, size)
def _flatten_layout_lines(layout_json: dict | None) -> list[dict]:
    """Flatten a per-page layout structure into one list of line records.

    Every entry of each page's ``"lines"`` list becomes a dict with the
    page's ``"page"`` value, the line's ``"bbox"`` and its ``"text"``
    (empty string when the line has no text key). Pages are visited in the
    order they appear under ``"pages"``. A missing or empty layout yields
    an empty list.
    """
    if not layout_json:
        return []

    return [
        {
            "page": page_entry["page"],
            "bbox": line_entry["bbox"],
            "text": line_entry.get("text", ""),
        }
        for page_entry in layout_json.get("pages", [])
        for line_entry in page_entry.get("lines", [])
    ]
def create_ocr_corrected_pdf_version(db: Session, document: Document) -> DocumentVersion:
    """Build a searchable "OCR corrected" PDF for *document* and record it as a new version.

    Each page of the document's current PDF is rasterized, drawn as a
    full-page image into a new PDF, and overlaid with invisible text lines
    taken from the reviewed layout (or from the raw OCR layout when the
    reviewed version carries no layout). The result is written under
    ``OCR_CORRECTED_ROOT``, passed through Ghostscript compression, hashed,
    stored as a ``DocumentVersion`` and made the document's current file.

    Args:
        db: Session used to compute the next version number and to persist
            the new version and the updated document.
        document: Document whose ``current_path`` points at an existing
            PDF and which has current ``raw_ocr`` and ``reviewed`` text
            versions.

    Returns:
        The newly committed and refreshed ``DocumentVersion``.

    Raises:
        ValueError: If the document has no current path, a required text
            version or layout is missing, the current file is not a PDF,
            the reviewed and raw line counts differ, or no page image was
            rendered.
        FileNotFoundError: If the current file does not exist.
        subprocess.CalledProcessError: If page rendering with ``pdftoppm``
            fails.
    """
    if not document.current_path:
        raise ValueError("Document has no current_path")
    current_file = Path(document.current_path)
    if not current_file.exists():
        raise FileNotFoundError(f"Current file not found: {current_file}")

    raw_ocr = _latest_current_text_version(document, "raw_ocr")
    reviewed = _latest_current_text_version(document, "reviewed")
    if raw_ocr is None:
        raise ValueError("No current raw OCR version found")
    if reviewed is None:
        raise ValueError("No current reviewed text found")
    if current_file.suffix.lower() != ".pdf":
        raise ValueError("C1 corrected PDF generation currently supports PDFs only")

    # The raw OCR layout is the reference for line boxes. A reviewed layout,
    # when present, must describe the same number of lines.
    raw_lines = _flatten_layout_lines(raw_ocr.layout_json)
    reviewed_lines = _flatten_layout_lines(reviewed.layout_json)
    if not raw_lines:
        raise ValueError("No OCR line boxes found in raw OCR layout data")
    if reviewed_lines and len(reviewed_lines) != len(raw_lines):
        raise ValueError("Reviewed line layout does not match raw OCR line layout")

    # NOTE(review): when the reviewed version has no layout, the overlay text
    # comes from the raw OCR layout. Confirm this fallback is intended.
    source_layout = reviewed.layout_json if reviewed.layout_json else raw_ocr.layout_json
    if not source_layout:
        raise ValueError("No source layout found")

    next_version_number = get_next_document_version_number(db, document.id)
    out_path = _build_output_path(OCR_CORRECTED_ROOT, document, "ocr_corrected", next_version_number)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    reader = PdfReader(str(current_file))
    with tempfile.TemporaryDirectory() as tmpdirname:
        tmpdir = Path(tmpdirname)
        images = _render_pdf_page_images(current_file, tmpdir)
        overlay_pdf_path = tmpdir / "overlay.pdf"
        overlay = None
        page_layouts = {page["page"]: page for page in source_layout.get("pages", [])}

        for page_num, img_path in enumerate(images, start=1):
            pdf_page = reader.pages[page_num - 1]
            page_w = float(pdf_page.mediabox.width)
            page_h = float(pdf_page.mediabox.height)

            # Only the pixel dimensions are needed. Close the file right
            # away instead of leaking one open handle per page.
            with Image.open(img_path) as img:
                img_w, img_h = img.size

            if overlay is None:
                overlay = canvas.Canvas(str(overlay_pdf_path), pagesize=(page_w, page_h))
            else:
                overlay.setPageSize((page_w, page_h))
            overlay.drawImage(ImageReader(str(img_path)), 0, 0, width=page_w, height=page_h)

            page_layout = page_layouts.get(page_num, {"lines": []})
            # Scale from layout coordinates to PDF points. Fall back to the
            # rendered image size when the layout records no dimensions.
            src_w = float(page_layout.get("image_width") or img_w)
            src_h = float(page_layout.get("image_height") or img_h)
            scale_x = page_w / src_w
            scale_y = page_h / src_h

            for line in page_layout.get("lines", []):
                text_line = (line.get("text") or "").strip()
                if not text_line:
                    continue

                # Assumes bbox is (left, top, right, bottom) with a top-left
                # origin. The y axis is flipped for the PDF coordinate
                # system. TODO confirm against the layout producer.
                left, top, right, bottom = line["bbox"]
                pdf_x = left * scale_x
                pdf_y = page_h - (bottom * scale_y)
                box_width = max(10.0, (right - left) * scale_x)
                box_height = max(6.0, (bottom - top) * scale_y)
                font_size = _fit_font_size(text_line, box_width, box_height)

                text_obj = overlay.beginText()
                # Render mode 3 draws invisible text, so the page image stays
                # the visible content and the text layer is selectable.
                text_obj.setTextRenderMode(3)
                text_obj.setFont("Helvetica", font_size)
                text_obj.setTextOrigin(pdf_x, pdf_y + 1)
                text_obj.textLine(text_line)
                overlay.drawText(text_obj)

            overlay.showPage()

        if overlay is None:
            # No page images were rendered, so no canvas was ever created.
            raise ValueError("Failed to build overlay PDF")
        overlay.save()
        # Copy out before the temporary directory is removed.
        shutil.copy2(overlay_pdf_path, out_path)

    # Best-effort size reduction. The hash is taken afterwards so that it
    # describes the file that is actually stored.
    compress_pdf_with_ghostscript(out_path)
    file_hash = sha256_for_file(out_path)

    version = DocumentVersion(
        document_id=document.id,
        version_number=next_version_number,
        version_type="ocr_corrected",
        file_path=str(out_path),
        sha256=file_hash,
        created_by="save_ocr_corrected_pdf",
        notes="C1 output: rebuilt searchable PDF using reviewed text aligned to OCR line boxes.",
    )
    db.add(version)

    # Point the document at the freshly generated file.
    document.current_path = str(out_path)
    document.canonical_filename = out_path.name
    document.sha256_current = file_hash

    db.commit()
    db.refresh(version)
    return version
def create_field_enriched_pdf_version(db: Session, document: Document) -> DocumentVersion:
    """Record a "field enriched" version of *document*.

    The current file is copied unchanged to a new path under
    ``FIELD_ENRICHED_ROOT``, hashed, stored as a ``DocumentVersion`` and
    made the document's current file. As the stored note says, this is a
    scaffold: extracted fields are not embedded into the file.

    Raises:
        ValueError: If the document has no current path.
        FileNotFoundError: If the current file does not exist.
    """
    source_location = document.current_path
    if not source_location:
        raise ValueError("Document has no current_path")

    source_file = Path(source_location)
    if not source_file.exists():
        raise FileNotFoundError(f"Current file not found: {source_file}")

    version_no = get_next_document_version_number(db, document.id)
    target = _build_output_path(FIELD_ENRICHED_ROOT, document, "field_enriched", version_no)
    target.parent.mkdir(parents=True, exist_ok=True)

    # Plain copy (metadata included via copy2); the digest is of the copy.
    shutil.copy2(source_file, target)
    digest = sha256_for_file(target)
    target_location = str(target)

    record = DocumentVersion(
        document_id=document.id,
        version_number=version_no,
        version_type="field_enriched",
        file_path=target_location,
        sha256=digest,
        created_by="save_field_enriched_pdf",
        notes="Scaffold output: copied current file; extracted fields not yet embedded into PDF.",
    )
    db.add(record)

    # Make the copy the document's current file.
    document.current_path = target_location
    document.canonical_filename = target.name
    document.sha256_current = digest

    db.commit()
    db.refresh(record)
    return record