577 lines
18 KiB
Python
577 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
import hashlib
|
|
import os
|
|
import re
|
|
import shutil
|
|
|
|
def _mirror_to_secondary_owner(document, canonical_path: Path) -> Path | None:
|
|
additional = document.additional_fields[0] if getattr(document, "additional_fields", None) else None
|
|
if not additional:
|
|
return None
|
|
|
|
owner_secondary = getattr(additional, "owner_secondary", None)
|
|
if not owner_secondary:
|
|
return None
|
|
|
|
from app.logic.storage_paths import (
|
|
_split_person_name,
|
|
to_owner_filepath_name,
|
|
build_proposed_storage_path,
|
|
)
|
|
from app.core.storage_settings import get_default_save_root
|
|
|
|
first, last = _split_person_name(owner_secondary)
|
|
owner_folder = to_owner_filepath_name(first, last)
|
|
if not owner_folder:
|
|
return None
|
|
|
|
save_root = get_default_save_root()
|
|
naming_row = document.naming_fields[0] if getattr(document, "naming_fields", None) else None
|
|
|
|
mirror_path = Path(
|
|
build_proposed_storage_path(
|
|
document=document,
|
|
save_root=save_root,
|
|
naming_row=naming_row,
|
|
)
|
|
)
|
|
|
|
# replace owner segment
|
|
parts = list(mirror_path.parts)
|
|
for i, p in enumerate(parts):
|
|
if p == "records" and i + 1 < len(parts):
|
|
parts[i + 1] = owner_folder
|
|
break
|
|
|
|
mirror_path = Path(*parts)
|
|
mirror_path = mirror_path.with_name(
|
|
re.sub(r"_v\d+(?=\.[^.]+$)", "", mirror_path.name)
|
|
)
|
|
mirror_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
if canonical_path.resolve() != mirror_path.resolve():
|
|
import shutil
|
|
shutil.copy2(canonical_path, mirror_path)
|
|
|
|
return mirror_path
|
|
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
from PIL import Image
|
|
from pypdf import PdfReader, PdfWriter
|
|
from reportlab.lib.utils import ImageReader
|
|
from reportlab.pdfbase.pdfmetrics import stringWidth
|
|
from reportlab.pdfgen import canvas
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import FIELD_ENRICHED_ROOT, OCR_CORRECTED_ROOT
|
|
from app.models.document import Document
|
|
from app.models.document_version import DocumentVersion
|
|
from app.models.text_version import TextVersion
|
|
|
|
|
|
|
|
|
|
def _prune_old_saved_files(db: Session, document: Document, keep_paths: set[str]) -> None:
    """Delete files recorded by this document's prior versions.

    Paths in *keep_paths* and the document's source/original paths are
    protected; everything else referenced by a DocumentVersion row is
    removed best-effort (deletion errors are swallowed).
    """
    protected = {str(Path(p).resolve()) for p in keep_paths if p}
    for attr in ("source_path", "original_path"):
        value = getattr(document, attr, None)
        if value:
            protected.add(str(Path(value).resolve()))

    prior_versions = (
        db.query(DocumentVersion)
        .filter(DocumentVersion.document_id == document.id)
        .all()
    )

    candidate_paths: set[str] = set()
    for version in prior_versions:
        if not version.file_path:
            continue
        try:
            candidate_paths.add(str(Path(version.file_path).resolve()))
        except Exception:
            # Fall back to the raw stored string when resolution fails.
            candidate_paths.add(version.file_path)

    for candidate in sorted(candidate_paths):
        if candidate in protected:
            continue
        try:
            target = Path(candidate)
            if target.exists() and target.is_file():
                target.unlink()
        except Exception:
            # Best-effort cleanup: leave undeletable files in place.
            pass
|
|
|
|
|
|
def _build_pdf_keywords(document) -> str:
    """
    Currently returns location-only keywords.
    Easy to extend later.

    NOTE(review): this definition is shadowed by a later
    `_build_pdf_keywords` in this module (which reads `extracted_fields`
    instead of `additional_fields`), so this one is dead code — consider
    deleting one of the two.
    """
    # First additional_fields row, if any.
    additional = document.additional_fields[0] if getattr(document, "additional_fields", None) else None

    parts = []

    if additional:
        # adjust field names if needed
        for field in ["location_city", "location_area", "location_name"]:
            value = getattr(additional, field, None)
            if value:
                parts.append(str(value).strip().lower())

    # dedupe while preserving order
    seen = set()
    clean = []
    for p in parts:
        if p and p not in seen:
            seen.add(p)
            clean.append(p)

    return ", ".join(clean)
|
|
|
|
|
|
|
|
def _latest_additional(document):
|
|
rows = getattr(document, "additional_fields", None) or []
|
|
return rows[0] if rows else None
|
|
|
|
|
|
def _latest_extracted(document):
|
|
rows = getattr(document, "extracted_fields", None) or []
|
|
return rows[0] if rows else None
|
|
|
|
|
|
def _humanize_filename(path_obj: Path) -> str:
|
|
stem = path_obj.stem.replace("_", " ").replace("-", " ").strip()
|
|
stem = re.sub(r"\s+", " ", stem)
|
|
return stem.title()
|
|
|
|
|
|
def _build_pdf_title(document, out_path: Path) -> str:
    """Derive the PDF /Title from the output file name; *document* is unused."""
    title = _humanize_filename(out_path)
    return title
|
|
|
|
|
|
def _build_pdf_author(document) -> str:
|
|
additional = _latest_additional(document)
|
|
owners = []
|
|
if additional:
|
|
for field in ["owner_primary", "owner_secondary"]:
|
|
value = getattr(additional, field, None)
|
|
if value:
|
|
owners.append(str(value).strip())
|
|
seen = set()
|
|
clean = []
|
|
for owner in owners:
|
|
key = owner.lower()
|
|
if key not in seen:
|
|
seen.add(key)
|
|
clean.append(owner)
|
|
return "; ".join(clean)
|
|
|
|
|
|
def _build_pdf_subject(document) -> str:
|
|
value = getattr(document, "document_type", None)
|
|
return str(value).replace("_", " ").title() if value else ""
|
|
|
|
|
|
def _build_pdf_keywords(document) -> str:
|
|
"""
|
|
Currently returns location-only keywords.
|
|
Easy to extend later.
|
|
"""
|
|
parts = []
|
|
|
|
extracted = _latest_extracted(document)
|
|
if extracted:
|
|
location = getattr(extracted, "location", None)
|
|
if location:
|
|
for chunk in re.split(r"[,;/|-]+", str(location)):
|
|
chunk = chunk.strip().lower()
|
|
if chunk:
|
|
parts.append(chunk)
|
|
|
|
seen = set()
|
|
clean = []
|
|
for p in parts:
|
|
if p and p not in seen:
|
|
seen.add(p)
|
|
clean.append(p)
|
|
|
|
return ", ".join(clean)
|
|
|
|
|
|
def _source_timestamp(document) -> datetime | None:
|
|
for attr in ["source_path", "original_path", "current_path"]:
|
|
value = getattr(document, attr, None)
|
|
if not value:
|
|
continue
|
|
try:
|
|
p = Path(value)
|
|
if p.exists():
|
|
return datetime.fromtimestamp(p.stat().st_mtime)
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _pdf_date(dt: datetime | None) -> str:
|
|
if not dt:
|
|
dt = datetime.now()
|
|
return dt.strftime("D:%Y%m%d%H%M%S")
|
|
|
|
|
|
def _write_pdf_metadata(path_obj: Path, document, version_number: int, version_type: str) -> None:
    """Rewrite the PDF at *path_obj* in place with refreshed metadata.

    Copies every page into a fresh writer, attaches Info-dictionary
    entries (title/author/subject/keywords plus custom version keys),
    writes to a temporary sibling file, and atomically replaces the
    original.
    """
    reader = PdfReader(str(path_obj))
    writer = PdfWriter()
    for page in reader.pages:
        writer.add_page(page)

    source_dt = _source_timestamp(document)

    metadata = {
        "/Title": _build_pdf_title(document, path_obj),
        "/Author": _build_pdf_author(document),
        "/Subject": _build_pdf_subject(document),
        "/Keywords": _build_pdf_keywords(document),
        "/Creator": "Document Processor",
        "/Producer": "Document Processor",
        # CreationDate reflects the source file's mtime when available.
        "/CreationDate": _pdf_date(source_dt),
        "/ModDate": _pdf_date(datetime.now()),
        "/DocumentID": str(getattr(document, "document_id", "") or ""),
        "/VersionNumber": str(version_number),
        "/VersionType": str(version_type),
    }
    writer.add_metadata({key: value for key, value in metadata.items() if value is not None})

    # Write to a sibling temp file, then atomically swap it into place.
    tmp_path = path_obj.with_suffix(path_obj.suffix + ".meta.tmp")
    with tmp_path.open("wb") as handle:
        writer.write(handle)
    tmp_path.replace(path_obj)
|
|
|
|
|
|
def sha256_for_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while chunk := stream.read(1024 * 1024):
            digest.update(chunk)
    return digest.hexdigest()
|
|
|
|
|
|
def compress_pdf_with_ghostscript(path: Path) -> bool:
    """Compress the PDF at *path* in place using Ghostscript.

    Runs `gs` with /ebook settings into a sibling ".compressed.pdf" file
    and replaces the original only when the result is strictly smaller.

    Returns True when Ghostscript ran successfully (even if the compressed
    output was discarded for being no smaller), False on any failure —
    including Ghostscript being absent from PATH.
    """
    compressed_path = path.with_suffix(".compressed.pdf")

    try:
        subprocess.run(
            [
                "gs",
                "-sDEVICE=pdfwrite",
                "-dCompatibilityLevel=1.4",
                "-dPDFSETTINGS=/ebook",
                "-dNOPAUSE",
                "-dQUIET",
                "-dBATCH",
                f"-sOutputFile={compressed_path}",
                str(path),
            ],
            check=True,
            capture_output=True,
            text=True,
        )

        if not compressed_path.exists() or compressed_path.stat().st_size == 0:
            # Fix: previously a zero-byte artifact was left on disk here.
            compressed_path.unlink(missing_ok=True)
            return False

        original_size = path.stat().st_size if path.exists() else 0
        compressed_size = compressed_path.stat().st_size

        # Only replace if compression actually helped.
        if original_size > 0 and compressed_size < original_size:
            os.replace(compressed_path, path)
        else:
            compressed_path.unlink(missing_ok=True)

        return True
    except Exception:
        # Best-effort by design: any failure (gs missing, bad input, I/O
        # error) means "not compressed"; never raise out of this helper.
        compressed_path.unlink(missing_ok=True)
        return False
|
|
|
|
|
|
def get_next_document_version_number(db: Session, document_id: int) -> int:
    """Return one more than the highest stored version number (1 when none exist)."""
    current_max = (
        db.query(func.max(DocumentVersion.version_number))
        .filter(DocumentVersion.document_id == document_id)
        .scalar()
    )
    if not current_max:
        # No prior versions (scalar() returned None/0).
        return 1
    return current_max + 1
|
|
|
|
|
|
def _build_output_path(root: str, document: Document, version_type: str, version_number: int) -> Path:
|
|
source = Path(document.current_path or "")
|
|
suffix = source.suffix.lower() if source.suffix else ".pdf"
|
|
filename = f"{document.document_id}_{version_type}_v{version_number}{suffix}"
|
|
return Path(root) / filename
|
|
|
|
|
|
def _latest_current_text_version(document: Document, version_type: str) -> TextVersion | None:
|
|
candidates = [tv for tv in document.text_versions if tv.version_type == version_type and tv.is_current]
|
|
if not candidates:
|
|
return None
|
|
return sorted(candidates, key=lambda x: (x.version_number, x.created_at), reverse=True)[0]
|
|
|
|
|
|
def _render_pdf_page_images(pdf_path: Path, tmpdir: Path) -> list[Path]:
    """Rasterize each PDF page to a 150-dpi PNG via pdftoppm.

    Output files are named '<tmpdir>/page-*.png'; the sorted list of
    their paths is returned. Raises CalledProcessError on tool failure.
    """
    output_prefix = tmpdir / "page"
    command = ["pdftoppm", "-r", "150", "-png", str(pdf_path), str(output_prefix)]
    subprocess.run(command, capture_output=True, text=True, check=True)
    return sorted(tmpdir.glob("page-*.png"))
|
|
|
|
|
|
def _fit_font_size(text: str, box_width: float, box_height: float) -> float:
    """Pick a Helvetica font size that fits *text* inside the given box.

    Starts near the box height and shrinks in 0.25 pt steps until the
    rendered width fits within 98% of the box, but never returns less
    than a height-derived floor.
    """
    if not text:
        return max(6.0, box_height * 0.80)

    size = max(6.0, box_height * 0.88)
    width_limit = box_width * 0.98
    while size > 4.5 and stringWidth(text, "Helvetica", size) > width_limit:
        size -= 0.25

    floor = max(6.0, box_height * 0.68)
    return max(floor, size)
|
|
|
|
|
|
def _flatten_layout_lines(layout_json: dict | None) -> list[dict]:
|
|
if not layout_json:
|
|
return []
|
|
|
|
flattened = []
|
|
for page in layout_json.get("pages", []):
|
|
for line in page.get("lines", []):
|
|
flattened.append(
|
|
{
|
|
"page": page["page"],
|
|
"bbox": line["bbox"],
|
|
"text": line.get("text", ""),
|
|
}
|
|
)
|
|
return flattened
|
|
|
|
|
|
def create_ocr_corrected_pdf_version(db: Session, document: Document, output_path: Path | None = None) -> DocumentVersion:
    """Build the C1 "OCR corrected" searchable PDF and register it as a version.

    Re-renders each page of the current PDF as an image, overlays the
    reviewed (or raw OCR) text as invisible glyphs aligned to the OCR line
    boxes, compresses the result, stamps PDF metadata, mirrors the file to
    the secondary owner's folder, records a DocumentVersion row, updates
    the document pointers, commits, and prunes files from superseded
    versions.

    Fix over the previous revision: the mirror-to-secondary-owner step was
    performed twice (two try blocks copying the file back to back, the
    second overwriting the first's result); it now runs once.

    Raises ValueError / FileNotFoundError when the document lacks the
    required current file, OCR layout data, or reviewed text.
    """
    if not document.current_path:
        raise ValueError("Document has no current_path")

    current_file = Path(document.current_path)
    if not current_file.exists():
        raise FileNotFoundError(f"Current file not found: {current_file}")

    raw_ocr = _latest_current_text_version(document, "raw_ocr")
    reviewed = _latest_current_text_version(document, "reviewed")

    if raw_ocr is None:
        raise ValueError("No current raw OCR version found")
    if reviewed is None:
        raise ValueError("No current reviewed text found")
    if current_file.suffix.lower() != ".pdf":
        raise ValueError("C1 corrected PDF generation currently supports PDFs only")

    raw_lines = _flatten_layout_lines(raw_ocr.layout_json)
    reviewed_lines = _flatten_layout_lines(reviewed.layout_json)

    if not raw_lines:
        raise ValueError("No OCR line boxes found in raw OCR layout data")

    if reviewed_lines and len(reviewed_lines) != len(raw_lines):
        raise ValueError("Reviewed line layout does not match raw OCR line layout")

    # Prefer the reviewed layout; fall back to the raw OCR layout.
    source_layout = reviewed.layout_json if reviewed.layout_json else raw_ocr.layout_json
    if not source_layout:
        raise ValueError("No source layout found")

    next_version_number = get_next_document_version_number(db, document.id)
    if output_path is None:
        out_path = _build_output_path(OCR_CORRECTED_ROOT, document, "ocr_corrected", next_version_number)
    else:
        out_path = Path(output_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    reader = PdfReader(str(current_file))

    with tempfile.TemporaryDirectory() as tmpdirname:
        tmpdir = Path(tmpdirname)
        images = _render_pdf_page_images(current_file, tmpdir)

        overlay_pdf_path = tmpdir / "overlay.pdf"
        c = None

        page_layouts = {page["page"]: page for page in source_layout.get("pages", [])}

        for page_num, img_path in enumerate(images, start=1):
            pdf_page = reader.pages[page_num - 1]
            page_w = float(pdf_page.mediabox.width)
            page_h = float(pdf_page.mediabox.height)

            img = Image.open(img_path)

            if c is None:
                c = canvas.Canvas(str(overlay_pdf_path), pagesize=(page_w, page_h))
            else:
                c.setPageSize((page_w, page_h))

            # Visible layer: the rasterized page image scaled to the page.
            c.drawImage(ImageReader(str(img_path)), 0, 0, width=page_w, height=page_h)

            page_layout = page_layouts.get(page_num, {"lines": []})
            src_w = float(page_layout.get("image_width") or img.size[0])
            src_h = float(page_layout.get("image_height") or img.size[1])

            # Scale OCR pixel coordinates into PDF points.
            scale_x = page_w / src_w
            scale_y = page_h / src_h

            for line in page_layout.get("lines", []):
                text_line = (line.get("text") or "").strip()
                if not text_line:
                    continue

                left, top, right, bottom = line["bbox"]

                # PDF origin is bottom-left; OCR boxes are top-left based.
                pdf_x = left * scale_x
                pdf_y = page_h - (bottom * scale_y)
                box_width = max(10.0, (right - left) * scale_x)
                box_height = max(6.0, (bottom - top) * scale_y)

                font_size = _fit_font_size(text_line, box_width, box_height)

                text_obj = c.beginText()
                text_obj.setTextRenderMode(3)  # mode 3 = invisible: searchable, not drawn
                text_obj.setFont("Helvetica", font_size)
                text_obj.setTextOrigin(pdf_x, pdf_y + 1)
                text_obj.textLine(text_line)
                c.drawText(text_obj)

            c.showPage()

        if c is None:
            raise ValueError("Failed to build overlay PDF")

        c.save()
        shutil.copy2(overlay_pdf_path, out_path)

    compress_pdf_with_ghostscript(out_path)
    _write_pdf_metadata(out_path, document, next_version_number, "ocr_corrected")

    file_hash = sha256_for_file(out_path)

    # Mirror to the secondary owner exactly once, best-effort.
    try:
        mirror_path = _mirror_to_secondary_owner(document, out_path)
        share_path_value = str(mirror_path) if mirror_path else None
    except Exception:
        share_path_value = None

    document.share_path = share_path_value
    db.query(Document).filter(Document.id == document.id).update(
        {"share_path": share_path_value},
        synchronize_session=False,
    )
    db.add(document)

    version = DocumentVersion(
        document_id=document.id,
        version_number=next_version_number,
        version_type="ocr_corrected",
        file_path=str(out_path),
        sha256=file_hash,
        created_by="save_ocr_corrected_pdf",
        notes="C1 output: rebuilt searchable PDF using reviewed text aligned to OCR line boxes.",
    )
    db.add(version)

    # Point the document at the newly saved file.
    document.current_path = str(out_path)
    document.canonical_filename = out_path.name
    document.sha256_current = file_hash

    db.commit()

    # Remove files from prior versions, keeping the new file and its mirror.
    keep_paths = {str(out_path)}
    if document.share_path:
        keep_paths.add(str(document.share_path))
    _prune_old_saved_files(db, document, keep_paths)

    db.refresh(version)
    return version
|
|
|
|
|
|
def create_field_enriched_pdf_version(db: Session, document: Document, output_path: Path | None = None) -> DocumentVersion:
    """Create a "field enriched" PDF version of the document.

    Scaffold implementation: copies the current file to the target path,
    stamps PDF metadata, mirrors the copy to the secondary owner's
    folder (best-effort), records a DocumentVersion row, updates the
    document pointers, commits, and prunes files from superseded
    versions.

    Raises ValueError when the document has no current_path and
    FileNotFoundError when the current file is missing on disk.
    """
    if not document.current_path:
        raise ValueError("Document has no current_path")

    current_file = Path(document.current_path)
    if not current_file.exists():
        raise FileNotFoundError(f"Current file not found: {current_file}")

    next_version_number = get_next_document_version_number(db, document.id)
    if output_path is None:
        out_path = _build_output_path(FIELD_ENRICHED_ROOT, document, "field_enriched", next_version_number)
    else:
        out_path = Path(output_path)

    # Strip any "_vN" suffix so the saved file name is unversioned.
    # NOTE(review): this also undoes the version suffix that
    # _build_output_path just added above — confirm that is intended.
    out_path = out_path.with_name(
        re.sub(r"_v\d+(?=\.[^.]+$)", "", out_path.name)
    )
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Avoid copying a file onto itself when paths coincide.
    if current_file.resolve() != out_path.resolve():
        shutil.copy2(current_file, out_path)

    _write_pdf_metadata(out_path, document, next_version_number, "field_enriched")
    file_hash = sha256_for_file(out_path)

    # Best-effort mirror to the secondary owner; any failure clears share_path.
    try:
        mirror_path = _mirror_to_secondary_owner(document, out_path)
        share_path_value = str(mirror_path) if mirror_path else None
    except Exception:
        share_path_value = None

    document.share_path = share_path_value
    db.add(document)

    version = DocumentVersion(
        document_id=document.id,
        version_number=next_version_number,
        version_type="field_enriched",
        file_path=str(out_path),
        sha256=file_hash,
        created_by="save_field_enriched_pdf",
        notes="Scaffold output: copied current file; extracted fields not yet embedded into PDF.",
    )
    db.add(version)

    # Point the document at the newly saved file.
    document.current_path = str(out_path)
    document.canonical_filename = out_path.name
    document.sha256_current = file_hash

    db.commit()

    # Remove files from prior versions, keeping the new file and its mirror.
    keep_paths = {str(out_path)}
    if document.share_path:
        keep_paths.add(str(document.share_path))
    _prune_old_saved_files(db, document, keep_paths)

    db.refresh(version)
    return version
|