295 lines
9.7 KiB
Python
295 lines
9.7 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
from PIL import Image
|
|
from pypdf import PdfReader
|
|
from reportlab.lib.utils import ImageReader
|
|
from reportlab.pdfbase.pdfmetrics import stringWidth
|
|
from reportlab.pdfgen import canvas
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import FIELD_ENRICHED_ROOT, OCR_CORRECTED_ROOT
|
|
from app.models.document import Document
|
|
from app.models.document_version import DocumentVersion
|
|
from app.models.text_version import TextVersion
|
|
|
|
|
|
def sha256_for_file(path: Path) -> str:
    """Return the SHA-256 hex digest of the file at *path*.

    The file is read in 1 MiB chunks so arbitrarily large files are
    hashed with constant memory.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(1024 * 1024):
            digest.update(block)
    return digest.hexdigest()
|
|
|
|
|
|
def compress_pdf_with_ghostscript(path: Path) -> bool:
    """Shrink the PDF at *path* in place using Ghostscript.

    Runs ``gs`` with the ``/ebook`` preset into a sibling temp file and
    swaps it in only when the result is strictly smaller.  Returns True
    when Ghostscript produced a usable output (even if the smaller file
    was discarded because it did not help), False on any failure.
    """
    candidate = path.with_suffix(".compressed.pdf")
    gs_cmd = [
        "gs",
        "-sDEVICE=pdfwrite",
        "-dCompatibilityLevel=1.4",
        "-dPDFSETTINGS=/ebook",
        "-dNOPAUSE",
        "-dQUIET",
        "-dBATCH",
        f"-sOutputFile={candidate}",
        str(path),
    ]

    try:
        subprocess.run(gs_cmd, check=True, capture_output=True, text=True)

        if not candidate.exists() or candidate.stat().st_size == 0:
            return False

        before = path.stat().st_size if path.exists() else 0
        after = candidate.stat().st_size

        # Keep the original when compression did not actually shrink it.
        if before > 0 and after < before:
            os.replace(candidate, path)
        else:
            candidate.unlink(missing_ok=True)

        return True
    except Exception:
        # Best-effort: clean up any partial output and report failure.
        candidate.unlink(missing_ok=True)
        return False
|
|
|
|
|
|
def get_next_document_version_number(db: Session, document_id: int) -> int:
    """Return the next free version number for *document_id*.

    Looks up the highest existing ``DocumentVersion.version_number`` for
    the document and returns one past it; returns 1 when the document has
    no versions yet.
    """
    current_max = (
        db.query(func.max(DocumentVersion.version_number))
        .filter(DocumentVersion.document_id == document_id)
        .scalar()
    )
    if not current_max:
        return 1
    return current_max + 1
|
|
|
|
|
|
def _build_output_path(root: str, document: Document, version_type: str, version_number: int) -> Path:
|
|
source = Path(document.current_path or "")
|
|
suffix = source.suffix.lower() if source.suffix else ".pdf"
|
|
filename = f"{document.document_id}_{version_type}_v{version_number}{suffix}"
|
|
return Path(root) / filename
|
|
|
|
|
|
def _latest_current_text_version(document: Document, version_type: str) -> TextVersion | None:
|
|
candidates = [tv for tv in document.text_versions if tv.version_type == version_type and tv.is_current]
|
|
if not candidates:
|
|
return None
|
|
return sorted(candidates, key=lambda x: (x.version_number, x.created_at), reverse=True)[0]
|
|
|
|
|
|
def _render_pdf_page_images(pdf_path: Path, tmpdir: Path) -> list[Path]:
    """Rasterize every page of *pdf_path* into 150-dpi PNGs under *tmpdir*.

    Delegates to poppler's ``pdftoppm``, which writes files named
    ``page-<n>.png``; the resulting paths are returned sorted by name.
    Raises CalledProcessError if the conversion fails.
    """
    out_prefix = tmpdir / "page"
    cmd = ["pdftoppm", "-r", "150", "-png", str(pdf_path), str(out_prefix)]
    subprocess.run(cmd, capture_output=True, text=True, check=True)
    return sorted(tmpdir.glob("page-*.png"))
|
|
|
|
|
|
def _fit_font_size(text: str, box_width: float, box_height: float) -> float:
    """Pick a Helvetica font size that fits *text* inside the given box.

    Starts at ~88% of the box height and steps down in 0.25pt increments
    until the rendered width fits (with a 2% tolerance), but never returns
    less than ~68% of the box height (floored at 6pt) so the overlaid text
    stays roughly line-sized even for very long lines.
    """
    if not text:
        # Nothing to measure: use a size proportional to the box height.
        return max(6.0, 0.80 * box_height)

    size = max(6.0, 0.88 * box_height)
    usable_width = 0.98 * box_width

    # Shrink until the string fits or we reach the hard lower stop.
    while size > 4.5 and stringWidth(text, "Helvetica", size) > usable_width:
        size -= 0.25

    floor = max(6.0, 0.68 * box_height)
    return max(floor, size)
|
|
|
|
|
|
def _flatten_layout_lines(layout_json: dict | None) -> list[dict]:
|
|
if not layout_json:
|
|
return []
|
|
|
|
flattened = []
|
|
for page in layout_json.get("pages", []):
|
|
for line in page.get("lines", []):
|
|
flattened.append(
|
|
{
|
|
"page": page["page"],
|
|
"bbox": line["bbox"],
|
|
"text": line.get("text", ""),
|
|
}
|
|
)
|
|
return flattened
|
|
|
|
|
|
def create_ocr_corrected_pdf_version(db: Session, document: Document, output_path: Path | None = None) -> DocumentVersion:
    """Build a searchable "OCR corrected" PDF for *document* (C1 output).

    Re-renders each page of the current PDF as a raster background and lays
    the reviewed text invisibly over the raw-OCR line boxes, producing a
    selectable/searchable PDF.  The result is registered as a new
    DocumentVersion, becomes the document's current file, and the session
    is committed.

    Args:
        db: Active SQLAlchemy session; committed before returning.
        document: Document whose current file is a PDF with OCR data.
        output_path: Optional explicit destination; defaults to a path
            under OCR_CORRECTED_ROOT derived from the document.

    Returns:
        The newly created (and refreshed) DocumentVersion row.

    Raises:
        ValueError: missing current_path, missing raw OCR / reviewed text,
            layout mismatch, non-PDF current file, or no pages rendered.
        FileNotFoundError: the document's current file is gone.
    """
    if not document.current_path:
        raise ValueError("Document has no current_path")

    current_file = Path(document.current_path)
    if not current_file.exists():
        raise FileNotFoundError(f"Current file not found: {current_file}")

    raw_ocr = _latest_current_text_version(document, "raw_ocr")
    reviewed = _latest_current_text_version(document, "reviewed")

    if raw_ocr is None:
        raise ValueError("No current raw OCR version found")
    if reviewed is None:
        raise ValueError("No current reviewed text found")
    if current_file.suffix.lower() != ".pdf":
        raise ValueError("C1 corrected PDF generation currently supports PDFs only")

    raw_lines = _flatten_layout_lines(raw_ocr.layout_json)
    reviewed_lines = _flatten_layout_lines(reviewed.layout_json)

    if not raw_lines:
        raise ValueError("No OCR line boxes found in raw OCR layout data")

    # When the reviewed layout exists it must align line-for-line with the
    # raw OCR layout so each corrected line lands in its original box.
    if reviewed_lines and len(reviewed_lines) != len(raw_lines):
        raise ValueError("Reviewed line layout does not match raw OCR line layout")

    source_layout = reviewed.layout_json if reviewed.layout_json else raw_ocr.layout_json
    if not source_layout:
        raise ValueError("No source layout found")

    next_version_number = get_next_document_version_number(db, document.id)
    if output_path is None:
        out_path = _build_output_path(OCR_CORRECTED_ROOT, document, "ocr_corrected", next_version_number)
    else:
        out_path = Path(output_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    reader = PdfReader(str(current_file))

    with tempfile.TemporaryDirectory() as tmpdirname:
        tmpdir = Path(tmpdirname)
        images = _render_pdf_page_images(current_file, tmpdir)

        overlay_pdf_path = tmpdir / "overlay.pdf"
        c = None

        page_layouts = {page["page"]: page for page in source_layout.get("pages", [])}

        for page_num, img_path in enumerate(images, start=1):
            pdf_page = reader.pages[page_num - 1]
            page_w = float(pdf_page.mediabox.width)
            page_h = float(pdf_page.mediabox.height)

            # BUGFIX: the image was previously opened and never closed,
            # leaking one PIL file handle per page.  Only the raster size
            # is needed (as a fallback when the layout lacks dimensions),
            # so read it and close immediately.
            with Image.open(img_path) as img:
                img_width, img_height = img.size

            if c is None:
                c = canvas.Canvas(str(overlay_pdf_path), pagesize=(page_w, page_h))
            else:
                c.setPageSize((page_w, page_h))

            # Full-page raster background; text is drawn invisibly on top.
            c.drawImage(ImageReader(str(img_path)), 0, 0, width=page_w, height=page_h)

            page_layout = page_layouts.get(page_num, {"lines": []})
            src_w = float(page_layout.get("image_width") or img_width)
            src_h = float(page_layout.get("image_height") or img_height)

            # Map layout-image pixel coordinates onto PDF points.
            scale_x = page_w / src_w
            scale_y = page_h / src_h

            for line in page_layout.get("lines", []):
                text_line = (line.get("text") or "").strip()
                if not text_line:
                    continue

                left, top, right, bottom = line["bbox"]

                # PDF origin is bottom-left; the layout bbox is top-left.
                pdf_x = left * scale_x
                pdf_y = page_h - (bottom * scale_y)
                box_width = max(10.0, (right - left) * scale_x)
                box_height = max(6.0, (bottom - top) * scale_y)

                font_size = _fit_font_size(text_line, box_width, box_height)

                text_obj = c.beginText()
                # Render mode 3 = invisible text: selectable/searchable
                # while the raster background stays the visible layer.
                text_obj.setTextRenderMode(3)
                text_obj.setFont("Helvetica", font_size)
                text_obj.setTextOrigin(pdf_x, pdf_y + 1)
                text_obj.textLine(text_line)
                c.drawText(text_obj)

            c.showPage()

        if c is None:
            raise ValueError("Failed to build overlay PDF")

        c.save()
        shutil.copy2(overlay_pdf_path, out_path)

    # Best-effort size reduction; failure leaves the uncompressed PDF.
    compress_pdf_with_ghostscript(out_path)

    file_hash = sha256_for_file(out_path)

    version = DocumentVersion(
        document_id=document.id,
        version_number=next_version_number,
        version_type="ocr_corrected",
        file_path=str(out_path),
        sha256=file_hash,
        created_by="save_ocr_corrected_pdf",
        notes="C1 output: rebuilt searchable PDF using reviewed text aligned to OCR line boxes.",
    )
    db.add(version)

    document.current_path = str(out_path)
    document.canonical_filename = out_path.name
    document.sha256_current = file_hash

    db.commit()
    db.refresh(version)
    return version
|
|
|
|
|
|
def create_field_enriched_pdf_version(db: Session, document: Document, output_path: Path | None = None) -> DocumentVersion:
    """Register a "field enriched" version of *document*'s current file.

    Scaffold implementation: the current file is copied unchanged to the
    enriched location (extracted fields are not yet embedded into the
    PDF).  A new DocumentVersion row is created, the document's current
    path/filename/hash are updated, and the session is committed.

    Raises:
        ValueError: the document has no current_path.
        FileNotFoundError: the document's current file is gone.
    """
    if not document.current_path:
        raise ValueError("Document has no current_path")

    current_file = Path(document.current_path)
    if not current_file.exists():
        raise FileNotFoundError(f"Current file not found: {current_file}")

    next_version_number = get_next_document_version_number(db, document.id)
    out_path = (
        _build_output_path(FIELD_ENRICHED_ROOT, document, "field_enriched", next_version_number)
        if output_path is None
        else Path(output_path)
    )
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Skip the copy when source and destination are already the same file.
    if current_file.resolve() != out_path.resolve():
        shutil.copy2(current_file, out_path)
    file_hash = sha256_for_file(out_path)

    version = DocumentVersion(
        document_id=document.id,
        version_number=next_version_number,
        version_type="field_enriched",
        file_path=str(out_path),
        sha256=file_hash,
        created_by="save_field_enriched_pdf",
        notes="Scaffold output: copied current file; extracted fields not yet embedded into PDF.",
    )
    db.add(version)

    document.current_path = str(out_path)
    document.canonical_filename = out_path.name
    document.sha256_current = file_hash

    db.commit()
    db.refresh(version)
    return version
|