document-processor/app/logic/document_outputs.py

577 lines
18 KiB
Python

from __future__ import annotations
from datetime import datetime
import hashlib
import os
import re
import shutil
def _mirror_to_secondary_owner(document, canonical_path: Path) -> Path | None:
additional = document.additional_fields[0] if getattr(document, "additional_fields", None) else None
if not additional:
return None
owner_secondary = getattr(additional, "owner_secondary", None)
if not owner_secondary:
return None
from app.logic.storage_paths import (
_split_person_name,
to_owner_filepath_name,
build_proposed_storage_path,
)
from app.core.storage_settings import get_default_save_root
first, last = _split_person_name(owner_secondary)
owner_folder = to_owner_filepath_name(first, last)
if not owner_folder:
return None
save_root = get_default_save_root()
naming_row = document.naming_fields[0] if getattr(document, "naming_fields", None) else None
mirror_path = Path(
build_proposed_storage_path(
document=document,
save_root=save_root,
naming_row=naming_row,
)
)
# replace owner segment
parts = list(mirror_path.parts)
for i, p in enumerate(parts):
if p == "records" and i + 1 < len(parts):
parts[i + 1] = owner_folder
break
mirror_path = Path(*parts)
mirror_path = mirror_path.with_name(
re.sub(r"_v\d+(?=\.[^.]+$)", "", mirror_path.name)
)
mirror_path.parent.mkdir(parents=True, exist_ok=True)
if canonical_path.resolve() != mirror_path.resolve():
import shutil
shutil.copy2(canonical_path, mirror_path)
return mirror_path
import subprocess
import tempfile
from pathlib import Path
from PIL import Image
from pypdf import PdfReader, PdfWriter
from reportlab.lib.utils import ImageReader
from reportlab.pdfbase.pdfmetrics import stringWidth
from reportlab.pdfgen import canvas
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.core.config import FIELD_ENRICHED_ROOT, OCR_CORRECTED_ROOT
from app.models.document import Document
from app.models.document_version import DocumentVersion
from app.models.text_version import TextVersion
def _prune_old_saved_files(db: Session, document: Document, keep_paths: set[str]) -> None:
protected = {str(Path(p).resolve()) for p in keep_paths if p}
for p in [getattr(document, "source_path", None), getattr(document, "original_path", None)]:
if p:
protected.add(str(Path(p).resolve()))
prior_versions = (
db.query(DocumentVersion)
.filter(DocumentVersion.document_id == document.id)
.all()
)
candidate_paths: set[str] = set()
for version in prior_versions:
if version.file_path:
try:
candidate_paths.add(str(Path(version.file_path).resolve()))
except Exception:
candidate_paths.add(version.file_path)
for candidate in sorted(candidate_paths):
if candidate in protected:
continue
try:
candidate_path = Path(candidate)
if candidate_path.exists() and candidate_path.is_file():
candidate_path.unlink()
except Exception:
pass
def _build_pdf_keywords(document) -> str:
"""
Currently returns location-only keywords.
Easy to extend later.
"""
additional = document.additional_fields[0] if getattr(document, "additional_fields", None) else None
parts = []
if additional:
# adjust field names if needed
for field in ["location_city", "location_area", "location_name"]:
value = getattr(additional, field, None)
if value:
parts.append(str(value).strip().lower())
# dedupe while preserving order
seen = set()
clean = []
for p in parts:
if p and p not in seen:
seen.add(p)
clean.append(p)
return ", ".join(clean)
def _latest_additional(document):
rows = getattr(document, "additional_fields", None) or []
return rows[0] if rows else None
def _latest_extracted(document):
rows = getattr(document, "extracted_fields", None) or []
return rows[0] if rows else None
def _humanize_filename(path_obj: Path) -> str:
stem = path_obj.stem.replace("_", " ").replace("-", " ").strip()
stem = re.sub(r"\s+", " ", stem)
return stem.title()
def _build_pdf_title(document, out_path: Path) -> str:
return _humanize_filename(out_path)
def _build_pdf_author(document) -> str:
additional = _latest_additional(document)
owners = []
if additional:
for field in ["owner_primary", "owner_secondary"]:
value = getattr(additional, field, None)
if value:
owners.append(str(value).strip())
seen = set()
clean = []
for owner in owners:
key = owner.lower()
if key not in seen:
seen.add(key)
clean.append(owner)
return "; ".join(clean)
def _build_pdf_subject(document) -> str:
value = getattr(document, "document_type", None)
return str(value).replace("_", " ").title() if value else ""
def _build_pdf_keywords(document) -> str:
"""
Currently returns location-only keywords.
Easy to extend later.
"""
parts = []
extracted = _latest_extracted(document)
if extracted:
location = getattr(extracted, "location", None)
if location:
for chunk in re.split(r"[,;/|-]+", str(location)):
chunk = chunk.strip().lower()
if chunk:
parts.append(chunk)
seen = set()
clean = []
for p in parts:
if p and p not in seen:
seen.add(p)
clean.append(p)
return ", ".join(clean)
def _source_timestamp(document) -> datetime | None:
for attr in ["source_path", "original_path", "current_path"]:
value = getattr(document, attr, None)
if not value:
continue
try:
p = Path(value)
if p.exists():
return datetime.fromtimestamp(p.stat().st_mtime)
except Exception:
pass
return None
def _pdf_date(dt: datetime | None) -> str:
if not dt:
dt = datetime.now()
return dt.strftime("D:%Y%m%d%H%M%S")
def _write_pdf_metadata(path_obj: Path, document, version_number: int, version_type: str) -> None:
reader = PdfReader(str(path_obj))
writer = PdfWriter()
for page in reader.pages:
writer.add_page(page)
now = datetime.now()
source_dt = _source_timestamp(document)
metadata = {
"/Title": _build_pdf_title(document, path_obj),
"/Author": _build_pdf_author(document),
"/Subject": _build_pdf_subject(document),
"/Keywords": _build_pdf_keywords(document),
"/Creator": "Document Processor",
"/Producer": "Document Processor",
"/CreationDate": _pdf_date(source_dt),
"/ModDate": _pdf_date(now),
"/DocumentID": str(getattr(document, "document_id", "") or ""),
"/VersionNumber": str(version_number),
"/VersionType": str(version_type),
}
writer.add_metadata({k: v for k, v in metadata.items() if v is not None})
tmp_path = path_obj.with_suffix(path_obj.suffix + ".meta.tmp")
with tmp_path.open("wb") as f:
writer.write(f)
tmp_path.replace(path_obj)
def sha256_for_file(path: Path) -> str:
hasher = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
hasher.update(chunk)
return hasher.hexdigest()
def compress_pdf_with_ghostscript(path: Path) -> bool:
compressed_path = path.with_suffix(".compressed.pdf")
try:
subprocess.run(
[
"gs",
"-sDEVICE=pdfwrite",
"-dCompatibilityLevel=1.4",
"-dPDFSETTINGS=/ebook",
"-dNOPAUSE",
"-dQUIET",
"-dBATCH",
f"-sOutputFile={compressed_path}",
str(path),
],
check=True,
capture_output=True,
text=True,
)
if not compressed_path.exists() or compressed_path.stat().st_size == 0:
return False
original_size = path.stat().st_size if path.exists() else 0
compressed_size = compressed_path.stat().st_size
# Only replace if compression actually helped.
if original_size > 0 and compressed_size < original_size:
os.replace(compressed_path, path)
else:
compressed_path.unlink(missing_ok=True)
return True
except Exception:
compressed_path.unlink(missing_ok=True)
return False
def get_next_document_version_number(db: Session, document_id: int) -> int:
max_version = (
db.query(func.max(DocumentVersion.version_number))
.filter(DocumentVersion.document_id == document_id)
.scalar()
)
return (max_version or 0) + 1
def _build_output_path(root: str, document: Document, version_type: str, version_number: int) -> Path:
source = Path(document.current_path or "")
suffix = source.suffix.lower() if source.suffix else ".pdf"
filename = f"{document.document_id}_{version_type}_v{version_number}{suffix}"
return Path(root) / filename
def _latest_current_text_version(document: Document, version_type: str) -> TextVersion | None:
candidates = [tv for tv in document.text_versions if tv.version_type == version_type and tv.is_current]
if not candidates:
return None
return sorted(candidates, key=lambda x: (x.version_number, x.created_at), reverse=True)[0]
def _render_pdf_page_images(pdf_path: Path, tmpdir: Path) -> list[Path]:
prefix = tmpdir / "page"
subprocess.run(
["pdftoppm", "-r", "150", "-png", str(pdf_path), str(prefix)],
capture_output=True,
text=True,
check=True,
)
return sorted(tmpdir.glob("page-*.png"))
def _fit_font_size(text: str, box_width: float, box_height: float) -> float:
if not text:
return max(6.0, box_height * 0.80)
font_size = max(6.0, box_height * 0.88)
while font_size > 4.5 and stringWidth(text, "Helvetica", font_size) > box_width * 0.98:
font_size -= 0.25
min_reasonable = max(6.0, box_height * 0.68)
return max(min_reasonable, font_size)
def _flatten_layout_lines(layout_json: dict | None) -> list[dict]:
if not layout_json:
return []
flattened = []
for page in layout_json.get("pages", []):
for line in page.get("lines", []):
flattened.append(
{
"page": page["page"],
"bbox": line["bbox"],
"text": line.get("text", ""),
}
)
return flattened
def create_ocr_corrected_pdf_version(db: Session, document: Document, output_path: Path | None = None) -> DocumentVersion:
if not document.current_path:
raise ValueError("Document has no current_path")
current_file = Path(document.current_path)
if not current_file.exists():
raise FileNotFoundError(f"Current file not found: {current_file}")
raw_ocr = _latest_current_text_version(document, "raw_ocr")
reviewed = _latest_current_text_version(document, "reviewed")
if raw_ocr is None:
raise ValueError("No current raw OCR version found")
if reviewed is None:
raise ValueError("No current reviewed text found")
if current_file.suffix.lower() != ".pdf":
raise ValueError("C1 corrected PDF generation currently supports PDFs only")
raw_lines = _flatten_layout_lines(raw_ocr.layout_json)
reviewed_lines = _flatten_layout_lines(reviewed.layout_json)
if not raw_lines:
raise ValueError("No OCR line boxes found in raw OCR layout data")
if reviewed_lines and len(reviewed_lines) != len(raw_lines):
raise ValueError("Reviewed line layout does not match raw OCR line layout")
source_layout = reviewed.layout_json if reviewed.layout_json else raw_ocr.layout_json
if not source_layout:
raise ValueError("No source layout found")
next_version_number = get_next_document_version_number(db, document.id)
if output_path is None:
out_path = _build_output_path(OCR_CORRECTED_ROOT, document, "ocr_corrected", next_version_number)
else:
out_path = Path(output_path)
out_path.parent.mkdir(parents=True, exist_ok=True)
reader = PdfReader(str(current_file))
with tempfile.TemporaryDirectory() as tmpdirname:
tmpdir = Path(tmpdirname)
images = _render_pdf_page_images(current_file, tmpdir)
overlay_pdf_path = tmpdir / "overlay.pdf"
c = None
page_layouts = {page["page"]: page for page in source_layout.get("pages", [])}
for page_num, img_path in enumerate(images, start=1):
pdf_page = reader.pages[page_num - 1]
page_w = float(pdf_page.mediabox.width)
page_h = float(pdf_page.mediabox.height)
img = Image.open(img_path)
if c is None:
c = canvas.Canvas(str(overlay_pdf_path), pagesize=(page_w, page_h))
else:
c.setPageSize((page_w, page_h))
c.drawImage(ImageReader(str(img_path)), 0, 0, width=page_w, height=page_h)
page_layout = page_layouts.get(page_num, {"lines": []})
src_w = float(page_layout.get("image_width") or img.size[0])
src_h = float(page_layout.get("image_height") or img.size[1])
scale_x = page_w / src_w
scale_y = page_h / src_h
for line in page_layout.get("lines", []):
text_line = (line.get("text") or "").strip()
if not text_line:
continue
left, top, right, bottom = line["bbox"]
pdf_x = left * scale_x
pdf_y = page_h - (bottom * scale_y)
box_width = max(10.0, (right - left) * scale_x)
box_height = max(6.0, (bottom - top) * scale_y)
font_size = _fit_font_size(text_line, box_width, box_height)
text_obj = c.beginText()
text_obj.setTextRenderMode(3)
text_obj.setFont("Helvetica", font_size)
text_obj.setTextOrigin(pdf_x, pdf_y + 1)
text_obj.textLine(text_line)
c.drawText(text_obj)
c.showPage()
if c is None:
raise ValueError("Failed to build overlay PDF")
c.save()
shutil.copy2(overlay_pdf_path, out_path)
compress_pdf_with_ghostscript(out_path)
_write_pdf_metadata(out_path, document, next_version_number, "ocr_corrected")
file_hash = sha256_for_file(out_path)
try:
mirror_path = _mirror_to_secondary_owner(document, out_path)
share_path_value = str(mirror_path) if mirror_path else None
except Exception as e:
share_path_value = None
document.share_path = share_path_value
db.query(Document).filter(Document.id == document.id).update(
{"share_path": share_path_value},
synchronize_session=False,
)
try:
mirror_path = _mirror_to_secondary_owner(document, out_path)
document.share_path = str(mirror_path) if mirror_path else None
except Exception:
document.share_path = None
db.add(document)
version = DocumentVersion(
document_id=document.id,
version_number=next_version_number,
version_type="ocr_corrected",
file_path=str(out_path),
sha256=file_hash,
created_by="save_ocr_corrected_pdf",
notes="C1 output: rebuilt searchable PDF using reviewed text aligned to OCR line boxes.",
)
db.add(version)
document.current_path = str(out_path)
document.canonical_filename = out_path.name
document.sha256_current = file_hash
db.commit()
keep_paths = {str(out_path)}
if document.share_path:
keep_paths.add(str(document.share_path))
_prune_old_saved_files(db, document, keep_paths)
db.refresh(version)
return version
def create_field_enriched_pdf_version(db: Session, document: Document, output_path: Path | None = None) -> DocumentVersion:
if not document.current_path:
raise ValueError("Document has no current_path")
current_file = Path(document.current_path)
if not current_file.exists():
raise FileNotFoundError(f"Current file not found: {current_file}")
next_version_number = get_next_document_version_number(db, document.id)
if output_path is None:
out_path = _build_output_path(FIELD_ENRICHED_ROOT, document, "field_enriched", next_version_number)
else:
out_path = Path(output_path)
out_path = out_path.with_name(
re.sub(r"_v\d+(?=\.[^.]+$)", "", out_path.name)
)
out_path.parent.mkdir(parents=True, exist_ok=True)
if current_file.resolve() != out_path.resolve():
shutil.copy2(current_file, out_path)
_write_pdf_metadata(out_path, document, next_version_number, "field_enriched")
file_hash = sha256_for_file(out_path)
try:
mirror_path = _mirror_to_secondary_owner(document, out_path)
share_path_value = str(mirror_path) if mirror_path else None
except Exception:
share_path_value = None
document.share_path = share_path_value
db.add(document)
version = DocumentVersion(
document_id=document.id,
version_number=next_version_number,
version_type="field_enriched",
file_path=str(out_path),
sha256=file_hash,
created_by="save_field_enriched_pdf",
notes="Scaffold output: copied current file; extracted fields not yet embedded into PDF.",
)
db.add(version)
document.current_path = str(out_path)
document.canonical_filename = out_path.name
document.sha256_current = file_hash
db.commit()
keep_paths = {str(out_path)}
if document.share_path:
keep_paths.add(str(document.share_path))
_prune_old_saved_files(db, document, keep_paths)
db.refresh(version)
return version