from __future__ import annotations

import hashlib
import logging
import os
import re
import shutil
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path

from PIL import Image
from pypdf import PdfReader, PdfWriter
from reportlab.lib.utils import ImageReader
from reportlab.pdfbase.pdfmetrics import stringWidth
from reportlab.pdfgen import canvas
from sqlalchemy import func
from sqlalchemy.orm import Session

from app.core.config import FIELD_ENRICHED_ROOT, OCR_CORRECTED_ROOT
from app.models.document import Document
from app.models.document_version import DocumentVersion
from app.models.text_version import TextVersion

logger = logging.getLogger(__name__)

# Matches a trailing "_v<digits>" that sits directly before the file extension.
_VERSION_SUFFIX_RE = re.compile(r"_v\d+(?=\.[^.]+$)")


def _strip_version_suffix(path_obj: Path) -> Path:
    """Return *path_obj* with a trailing ``_v<N>`` removed from its file name."""
    return path_obj.with_name(_VERSION_SUFFIX_RE.sub("", path_obj.name))


def _latest_additional(document):
    """Return the first row of ``document.additional_fields``, or None if absent/empty."""
    rows = getattr(document, "additional_fields", None) or []
    return rows[0] if rows else None


def _latest_extracted(document):
    """Return the first row of ``document.extracted_fields``, or None if absent/empty."""
    rows = getattr(document, "extracted_fields", None) or []
    return rows[0] if rows else None


def _mirror_to_secondary_owner(document, canonical_path: Path) -> Path | None:
    """
    Copy the canonical file into the secondary owner's folder, if there is one.

    Returns the mirror path, or None when the document has no additional-fields
    row, no ``owner_secondary`` value, or no owner folder name can be derived.
    The mirror path is the proposed storage path with the path segment that
    follows the first ``"records"`` segment replaced by the secondary owner's
    folder, and with any ``_v<N>`` suffix stripped from the file name.
    """
    additional = _latest_additional(document)
    if not additional:
        return None

    owner_secondary = getattr(additional, "owner_secondary", None)
    if not owner_secondary:
        return None

    # Function-scope imports are kept from the original
    # (presumably to avoid an import cycle -- TODO confirm).
    from app.logic.storage_paths import (
        _split_person_name,
        to_owner_filepath_name,
        build_proposed_storage_path,
    )
    from app.core.storage_settings import get_default_save_root

    first, last = _split_person_name(owner_secondary)
    owner_folder = to_owner_filepath_name(first, last)
    if not owner_folder:
        return None

    naming_rows = getattr(document, "naming_fields", None)
    naming_row = document.naming_fields[0] if naming_rows else None

    mirror_path = Path(
        build_proposed_storage_path(
            document=document,
            save_root=get_default_save_root(),
            naming_row=naming_row,
        )
    )

    # Replace the owner segment: the part right after the first "records".
    parts = list(mirror_path.parts)
    for index, part in enumerate(parts):
        if part == "records" and index + 1 < len(parts):
            parts[index + 1] = owner_folder
            break
    mirror_path = _strip_version_suffix(Path(*parts))

    mirror_path.parent.mkdir(parents=True, exist_ok=True)

    # Copying a file onto itself would fail, so skip when both resolve equal.
    if canonical_path.resolve() != mirror_path.resolve():
        shutil.copy2(canonical_path, mirror_path)

    return mirror_path


def _mirror_share_path(document, out_path: Path) -> str | None:
    """
    Best-effort wrapper around ``_mirror_to_secondary_owner``.

    Returns the mirror path as a string, or None when no mirror was produced
    or mirroring failed. Failures are logged instead of raised so that saving
    the canonical file is never blocked by the secondary copy.
    """
    try:
        mirror_path = _mirror_to_secondary_owner(document, out_path)
    except Exception:
        logger.warning(
            "Could not mirror %s to the secondary owner", out_path, exc_info=True
        )
        return None
    return str(mirror_path) if mirror_path else None


def _prune_old_saved_files(db: Session, document: Document, keep_paths: set[str]) -> None:
    """
    Delete files referenced by this document's versions that are no longer needed.

    Every ``DocumentVersion.file_path`` of the document is a deletion candidate
    unless it resolves to one of *keep_paths* or to the document's
    ``source_path`` / ``original_path``. Deletion is best-effort: a file that
    cannot be removed is logged and skipped.
    """
    protected = {str(Path(p).resolve()) for p in keep_paths if p}
    for attr in ("source_path", "original_path"):
        value = getattr(document, attr, None)
        if value:
            protected.add(str(Path(value).resolve()))

    prior_versions = (
        db.query(DocumentVersion)
        .filter(DocumentVersion.document_id == document.id)
        .all()
    )

    candidate_paths: set[str] = set()
    for version in prior_versions:
        if not version.file_path:
            continue
        try:
            candidate_paths.add(str(Path(version.file_path).resolve()))
        except Exception:
            # Fall back to the stored value when the path cannot be resolved.
            candidate_paths.add(version.file_path)

    for candidate in sorted(candidate_paths):
        if candidate in protected:
            continue
        candidate_path = Path(candidate)
        try:
            if candidate_path.is_file():
                candidate_path.unlink()
        except OSError:
            logger.warning("Could not prune old file %s", candidate, exc_info=True)


def _humanize_filename(path_obj: Path) -> str:
    """Turn a file stem into a title: ``_``/``-`` become spaces, whitespace is collapsed, then title-cased."""
    stem = path_obj.stem.replace("_", " ").replace("-", " ").strip()
    stem = re.sub(r"\s+", " ", stem)
    return stem.title()


def _build_pdf_title(document, out_path: Path) -> str:
    """Return the PDF title, derived from the output file name (*document* is unused)."""
    return _humanize_filename(out_path)


def _build_pdf_author(document) -> str:
    """
    Return ``owner_primary`` and ``owner_secondary`` joined by ``"; "``.

    Values come from the first additional-fields row. Duplicates are dropped
    case-insensitively, keeping the first spelling and the original order.
    """
    additional = _latest_additional(document)
    if not additional:
        return ""

    unique: dict[str, str] = {}
    for field in ("owner_primary", "owner_secondary"):
        value = getattr(additional, field, None)
        if value:
            owner = str(value).strip()
            unique.setdefault(owner.lower(), owner)
    return "; ".join(unique.values())


def _build_pdf_subject(document) -> str:
    """Return ``document.document_type`` with underscores as spaces, title-cased, or ``""``."""
    value = getattr(document, "document_type", None)
    return str(value).replace("_", " ").title() if value else ""


def _build_pdf_keywords(document) -> str:
    """
    Currently returns location-only keywords. Easy to extend later.

    The ``location`` value of the first extracted-fields row is split on
    ``,`` ``;`` ``/`` ``|`` and ``-``; chunks are stripped, lower-cased,
    de-duplicated in order, and joined with ``", "``.
    """
    # NOTE: an earlier definition of this function (reading location_city /
    # location_area / location_name from additional_fields) was shadowed by
    # this one and never ran; it has been removed.
    extracted = _latest_extracted(document)
    location = getattr(extracted, "location", None) if extracted else None
    if not location:
        return ""

    chunks = (
        chunk.strip().lower() for chunk in re.split(r"[,;/|-]+", str(location))
    )
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return ", ".join(dict.fromkeys(chunk for chunk in chunks if chunk))


def _source_timestamp(document) -> datetime | None:
    """
    Return the mtime of the first existing file among the document's
    ``source_path``, ``original_path`` and ``current_path``, or None.
    """
    for attr in ("source_path", "original_path", "current_path"):
        value = getattr(document, attr, None)
        if not value:
            continue
        try:
            candidate = Path(value)
            if candidate.exists():
                return datetime.fromtimestamp(candidate.stat().st_mtime)
        except (OSError, OverflowError, TypeError, ValueError):
            # Best-effort: an unreadable path just means "try the next one".
            continue
    return None


def _pdf_date(dt: datetime | None) -> str:
    """Format *dt* (or the current local time when falsy) as ``D:YYYYMMDDHHMMSS``."""
    if not dt:
        dt = datetime.now()
    return dt.strftime("D:%Y%m%d%H%M%S")


def _write_pdf_metadata(path_obj: Path, document, version_number: int, version_type: str) -> None:
    """
    Rewrite the PDF at *path_obj* in place with document-info metadata.

    All pages are copied into a new writer, the metadata is attached, the
    result is written to a sibling ``*.meta.tmp`` file and then moved over the
    original. The temporary file is removed if anything fails along the way.
    """
    reader = PdfReader(str(path_obj))
    writer = PdfWriter()
    for page in reader.pages:
        writer.add_page(page)

    now = datetime.now()
    source_dt = _source_timestamp(document)

    metadata = {
        "/Title": _build_pdf_title(document, path_obj),
        "/Author": _build_pdf_author(document),
        "/Subject": _build_pdf_subject(document),
        "/Keywords": _build_pdf_keywords(document),
        "/Creator": "Document Processor",
        "/Producer": "Document Processor",
        "/CreationDate": _pdf_date(source_dt),
        "/ModDate": _pdf_date(now),
        "/DocumentID": str(getattr(document, "document_id", "") or ""),
        "/VersionNumber": str(version_number),
        "/VersionType": str(version_type),
    }
    writer.add_metadata({k: v for k, v in metadata.items() if v is not None})

    tmp_path = path_obj.with_suffix(path_obj.suffix + ".meta.tmp")
    try:
        with tmp_path.open("wb") as f:
            writer.write(f)
        tmp_path.replace(path_obj)
    finally:
        # After a successful replace the temp file is already gone; this only
        # cleans up a partially written file when writing or replacing failed.
        tmp_path.unlink(missing_ok=True)


def sha256_for_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 1 MiB chunks."""
    hasher = hashlib.sha256()
    with path.open("rb") as f:
        while chunk := f.read(1024 * 1024):
            hasher.update(chunk)
    return hasher.hexdigest()


def compress_pdf_with_ghostscript(path: Path) -> bool:
    """
    Try to shrink the PDF at *path* in place using the ``gs`` executable.

    Returns True when ghostscript ran and produced a non-empty output (the
    original is only replaced if the output is actually smaller), and False
    when ghostscript failed, is unavailable, or produced no usable output.
    Never raises for ghostscript failures.
    """
    compressed_path = path.with_suffix(".compressed.pdf")

    try:
        subprocess.run(
            [
                "gs",
                "-sDEVICE=pdfwrite",
                "-dCompatibilityLevel=1.4",
                "-dPDFSETTINGS=/ebook",
                "-dNOPAUSE",
                "-dQUIET",
                "-dBATCH",
                f"-sOutputFile={compressed_path}",
                str(path),
            ],
            check=True,
            capture_output=True,
            text=True,
        )

        if not compressed_path.exists() or compressed_path.stat().st_size == 0:
            return False

        original_size = path.stat().st_size if path.exists() else 0
        compressed_size = compressed_path.stat().st_size

        # Only replace if compression actually helped.
        if original_size > 0 and compressed_size < original_size:
            os.replace(compressed_path, path)
        else:
            compressed_path.unlink(missing_ok=True)

        return True
    except Exception:
        # Best-effort step: log and report failure rather than abort the save.
        logger.warning("Ghostscript compression failed for %s", path, exc_info=True)
        compressed_path.unlink(missing_ok=True)
        return False


def get_next_document_version_number(db: Session, document_id: int) -> int:
    """Return one more than the highest stored version number for the document (1 if none)."""
    max_version = (
        db.query(func.max(DocumentVersion.version_number))
        .filter(DocumentVersion.document_id == document_id)
        .scalar()
    )
    return (max_version or 0) + 1


def _build_output_path(root: str, document: Document, version_type: str, version_number: int) -> Path:
    """
    Build ``<root>/<document_id>_<version_type>_v<N><suffix>``.

    The suffix is the lower-cased suffix of ``document.current_path``,
    defaulting to ``.pdf`` when there is none.
    """
    source = Path(document.current_path or "")
    suffix = source.suffix.lower() if source.suffix else ".pdf"
    filename = f"{document.document_id}_{version_type}_v{version_number}{suffix}"
    return Path(root) / filename


def _latest_current_text_version(document: Document, version_type: str) -> TextVersion | None:
    """
    Return the current text version of *version_type* with the highest
    ``(version_number, created_at)``, or None when there is none.
    """
    candidates = [
        tv
        for tv in document.text_versions
        if tv.version_type == version_type and tv.is_current
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda tv: (tv.version_number, tv.created_at))


def _render_pdf_page_images(pdf_path: Path, tmpdir: Path) -> list[Path]:
    """
    Render *pdf_path* to PNG files at 150 dpi in *tmpdir* using ``pdftoppm``.

    Returns the ``page-*.png`` files sorted by name. Raises
    ``subprocess.CalledProcessError`` when ``pdftoppm`` exits non-zero.
    """
    prefix = tmpdir / "page"
    subprocess.run(
        ["pdftoppm", "-r", "150", "-png", str(pdf_path), str(prefix)],
        capture_output=True,
        text=True,
        check=True,
    )
    return sorted(tmpdir.glob("page-*.png"))


def _fit_font_size(text: str, box_width: float, box_height: float) -> float:
    """
    Pick a Helvetica font size for *text* inside a box of the given size.

    Starts at 88% of the box height (at least 6.0) and shrinks in 0.25 steps
    while the text is wider than 98% of the box, but never returns less than
    ``max(6.0, 0.68 * box_height)``. Empty text gets ``max(6.0, 0.80 * box_height)``.
    """
    if not text:
        return max(6.0, box_height * 0.80)

    font_size = max(6.0, box_height * 0.88)
    while font_size > 4.5 and stringWidth(text, "Helvetica", font_size) > box_width * 0.98:
        font_size -= 0.25

    min_reasonable = max(6.0, box_height * 0.68)
    return max(min_reasonable, font_size)


def _flatten_layout_lines(layout_json: dict | None) -> list[dict]:
    """
    Flatten ``layout_json["pages"][*]["lines"]`` into one list of
    ``{"page", "bbox", "text"}`` dicts; returns ``[]`` for empty/None input.
    """
    if not layout_json:
        return []

    return [
        {
            "page": page["page"],
            "bbox": line["bbox"],
            "text": line.get("text", ""),
        }
        for page in layout_json.get("pages", [])
        for line in page.get("lines", [])
    ]


def _build_overlay_pdf(
    reader: PdfReader,
    images: list[Path],
    source_layout: dict,
    overlay_pdf_path: Path,
) -> None:
    """
    Write a PDF whose pages show the rendered page images with the layout's
    text lines drawn on top in invisible text (render mode 3).

    Page sizes are taken from the matching page of *reader*. Line boxes are
    scaled from layout-image coordinates (top-left origin) to PDF coordinates
    (bottom-left origin). Raises ValueError when *images* is empty.
    """
    page_layouts = {page["page"]: page for page in source_layout.get("pages", [])}
    c = None

    for page_num, img_path in enumerate(images, start=1):
        pdf_page = reader.pages[page_num - 1]
        page_w = float(pdf_page.mediabox.width)
        page_h = float(pdf_page.mediabox.height)

        # Only the pixel size is needed; close the handle straight away so
        # the temporary directory can be cleaned up reliably.
        with Image.open(img_path) as img:
            img_w, img_h = img.size

        if c is None:
            c = canvas.Canvas(str(overlay_pdf_path), pagesize=(page_w, page_h))
        else:
            c.setPageSize((page_w, page_h))

        c.drawImage(ImageReader(str(img_path)), 0, 0, width=page_w, height=page_h)

        page_layout = page_layouts.get(page_num, {"lines": []})
        src_w = float(page_layout.get("image_width") or img_w)
        src_h = float(page_layout.get("image_height") or img_h)
        scale_x = page_w / src_w
        scale_y = page_h / src_h

        for line in page_layout.get("lines", []):
            text_line = (line.get("text") or "").strip()
            if not text_line:
                continue

            left, top, right, bottom = line["bbox"]
            pdf_x = left * scale_x
            # Flip the y axis: layout boxes are measured from the top edge.
            pdf_y = page_h - (bottom * scale_y)
            box_width = max(10.0, (right - left) * scale_x)
            box_height = max(6.0, (bottom - top) * scale_y)
            font_size = _fit_font_size(text_line, box_width, box_height)

            text_obj = c.beginText()
            text_obj.setTextRenderMode(3)
            text_obj.setFont("Helvetica", font_size)
            text_obj.setTextOrigin(pdf_x, pdf_y + 1)
            text_obj.textLine(text_line)
            c.drawText(text_obj)

        c.showPage()

    if c is None:
        raise ValueError("Failed to build overlay PDF")

    c.save()


def _finalize_saved_version(
    db: Session,
    document: Document,
    out_path: Path,
    version_number: int,
    version_type: str,
    created_by: str,
    notes: str,
) -> DocumentVersion:
    """
    Record *out_path* as the document's new current version.

    Hashes the file, mirrors it to the secondary owner (best-effort), stores a
    ``DocumentVersion`` row, points the document at the new file, commits,
    prunes files of older versions, and returns the refreshed version row.
    """
    file_size = out_path.stat().st_size
    file_hash = sha256_for_file(out_path)

    document.share_path = _mirror_share_path(document, out_path)
    db.add(document)

    version = DocumentVersion(
        document_id=document.id,
        version_number=version_number,
        version_type=version_type,
        file_path=str(out_path),
        sha256=file_hash,
        file_size_bytes=file_size,
        created_by=created_by,
        notes=notes,
    )
    db.add(version)

    document.current_path = str(out_path)
    document.canonical_filename = out_path.name
    document.sha256_current = file_hash

    db.commit()

    keep_paths = {str(out_path)}
    if document.share_path:
        keep_paths.add(str(document.share_path))
    _prune_old_saved_files(db, document, keep_paths)

    db.refresh(version)
    return version


def create_ocr_corrected_pdf_version(db: Session, document: Document, output_path: Path | None = None) -> DocumentVersion:
    """
    Rebuild the document's current PDF as a searchable PDF and save it as a
    new ``ocr_corrected`` version.

    Each page is rendered to an image and overlaid with invisible text taken
    from the reviewed layout (or the raw OCR layout when the reviewed version
    has none). The result is compressed (best-effort), stamped with metadata,
    mirrored to the secondary owner (best-effort, once), and recorded.

    Raises:
        ValueError: no ``current_path``; missing current raw OCR or reviewed
            text; the current file is not a PDF; no raw OCR line boxes;
            reviewed/raw line counts differ; no layout; or no pages rendered.
        FileNotFoundError: the current file does not exist.
    """
    if not document.current_path:
        raise ValueError("Document has no current_path")

    current_file = Path(document.current_path)
    if not current_file.exists():
        raise FileNotFoundError(f"Current file not found: {current_file}")

    raw_ocr = _latest_current_text_version(document, "raw_ocr")
    reviewed = _latest_current_text_version(document, "reviewed")

    if raw_ocr is None:
        raise ValueError("No current raw OCR version found")
    if reviewed is None:
        raise ValueError("No current reviewed text found")
    if current_file.suffix.lower() != ".pdf":
        raise ValueError("C1 corrected PDF generation currently supports PDFs only")

    raw_lines = _flatten_layout_lines(raw_ocr.layout_json)
    reviewed_lines = _flatten_layout_lines(reviewed.layout_json)

    if not raw_lines:
        raise ValueError("No OCR line boxes found in raw OCR layout data")
    if reviewed_lines and len(reviewed_lines) != len(raw_lines):
        raise ValueError("Reviewed line layout does not match raw OCR line layout")

    source_layout = reviewed.layout_json if reviewed.layout_json else raw_ocr.layout_json
    if not source_layout:
        raise ValueError("No source layout found")

    next_version_number = get_next_document_version_number(db, document.id)
    if output_path is None:
        out_path = _build_output_path(
            OCR_CORRECTED_ROOT, document, "ocr_corrected", next_version_number
        )
    else:
        out_path = Path(output_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    reader = PdfReader(str(current_file))

    with tempfile.TemporaryDirectory() as tmpdirname:
        tmpdir = Path(tmpdirname)
        images = _render_pdf_page_images(current_file, tmpdir)
        overlay_pdf_path = tmpdir / "overlay.pdf"
        _build_overlay_pdf(reader, images, source_layout, overlay_pdf_path)
        # Must happen before the temporary directory is removed.
        shutil.copy2(overlay_pdf_path, out_path)

    compress_pdf_with_ghostscript(out_path)
    _write_pdf_metadata(out_path, document, next_version_number, "ocr_corrected")

    return _finalize_saved_version(
        db,
        document,
        out_path,
        next_version_number,
        version_type="ocr_corrected",
        created_by="save_ocr_corrected_pdf",
        notes="C1 output: rebuilt searchable PDF using reviewed text aligned to OCR line boxes.",
    )


def create_field_enriched_pdf_version(db: Session, document: Document, output_path: Path | None = None) -> DocumentVersion:
    """
    Save a copy of the document's current file as a new ``field_enriched``
    version with refreshed PDF metadata.

    The file content is copied as-is (skipped when source and destination
    resolve to the same file); only the metadata is rewritten. The result is
    mirrored to the secondary owner (best-effort) and recorded.

    Raises:
        ValueError: the document has no ``current_path``.
        FileNotFoundError: the current file does not exist.
    """
    if not document.current_path:
        raise ValueError("Document has no current_path")

    current_file = Path(document.current_path)
    if not current_file.exists():
        raise FileNotFoundError(f"Current file not found: {current_file}")

    next_version_number = get_next_document_version_number(db, document.id)
    if output_path is None:
        out_path = _build_output_path(
            FIELD_ENRICHED_ROOT, document, "field_enriched", next_version_number
        )
    else:
        out_path = Path(output_path)

    # NOTE(review): the version suffix is stripped for both the default and the
    # caller-supplied path; confirm this was not meant for the latter only.
    out_path = _strip_version_suffix(out_path)

    out_path.parent.mkdir(parents=True, exist_ok=True)

    # With a stable (unversioned) name the destination can be the current file
    # itself; copying a file onto itself would fail.
    if current_file.resolve() != out_path.resolve():
        shutil.copy2(current_file, out_path)

    _write_pdf_metadata(out_path, document, next_version_number, "field_enriched")

    return _finalize_saved_version(
        db,
        document,
        out_path,
        next_version_number,
        version_type="field_enriched",
        created_by="save_field_enriched_pdf",
        notes="Scaffold output: copied current file; extracted fields not yet embedded into PDF.",
    )