from __future__ import annotations from datetime import datetime import hashlib import os import re import shutil def _mirror_to_secondary_owner(document, canonical_path: Path) -> Path | None: additional = document.additional_fields[0] if getattr(document, "additional_fields", None) else None if not additional: return None owner_secondary = getattr(additional, "owner_secondary", None) if not owner_secondary: return None from app.logic.storage_paths import ( _split_person_name, to_owner_filepath_name, build_proposed_storage_path, ) from app.core.storage_settings import get_default_save_root first, last = _split_person_name(owner_secondary) owner_folder = to_owner_filepath_name(first, last) if not owner_folder: return None save_root = get_default_save_root() naming_row = document.naming_fields[0] if getattr(document, "naming_fields", None) else None mirror_path = Path( build_proposed_storage_path( document=document, save_root=save_root, naming_row=naming_row, ) ) # replace owner segment parts = list(mirror_path.parts) for i, p in enumerate(parts): if p == "records" and i + 1 < len(parts): parts[i + 1] = owner_folder break mirror_path = Path(*parts) mirror_path = mirror_path.with_name( re.sub(r"_v\d+(?=\.[^.]+$)", "", mirror_path.name) ) mirror_path.parent.mkdir(parents=True, exist_ok=True) if canonical_path.resolve() != mirror_path.resolve(): import shutil shutil.copy2(canonical_path, mirror_path) return mirror_path import subprocess import tempfile from pathlib import Path from PIL import Image from pypdf import PdfReader, PdfWriter from reportlab.lib.utils import ImageReader from reportlab.pdfbase.pdfmetrics import stringWidth from reportlab.pdfgen import canvas from sqlalchemy import func from sqlalchemy.orm import Session from app.core.config import FIELD_ENRICHED_ROOT, OCR_CORRECTED_ROOT from app.models.document import Document from app.models.document_version import DocumentVersion from app.models.document_replica_layout_version import DocumentReplicaLayoutVersion from app.models.document_replica_output import DocumentReplicaOutput from app.models.document_replica_review_state import DocumentReplicaReviewState from app.models.text_version import TextVersion def _prune_old_saved_files(db: Session, document: Document, keep_paths: set[str]) -> None: protected = {str(Path(p).resolve()) for p in keep_paths if p} for p in [getattr(document, "source_path", None), getattr(document, "original_path", None)]: if p: protected.add(str(Path(p).resolve())) prior_versions = ( db.query(DocumentVersion) .filter(DocumentVersion.document_id == document.id) .all() ) candidate_paths: set[str] = set() for version in prior_versions: if version.file_path: try: candidate_paths.add(str(Path(version.file_path).resolve())) except Exception: candidate_paths.add(version.file_path) for candidate in sorted(candidate_paths): if candidate in protected: continue try: candidate_path = Path(candidate) if candidate_path.exists() and candidate_path.is_file(): candidate_path.unlink() except Exception: pass def _build_pdf_keywords(document) -> str: """ Currently returns location-only keywords. Easy to extend later. """ additional = document.additional_fields[0] if getattr(document, "additional_fields", None) else None parts = [] if additional: # adjust field names if needed for field in ["location_city", "location_area", "location_name"]: value = getattr(additional, field, None) if value: parts.append(str(value).strip().lower()) # dedupe while preserving order seen = set() clean = [] for p in parts: if p and p not in seen: seen.add(p) clean.append(p) return ", ".join(clean) def _latest_additional(document): rows = getattr(document, "additional_fields", None) or [] return rows[0] if rows else None def _latest_extracted(document): rows = getattr(document, "extracted_fields", None) or [] return rows[0] if rows else None def _humanize_filename(path_obj: Path) -> str: stem = path_obj.stem.replace("_", " ").replace("-", " ").strip() stem = re.sub(r"\s+", " ", stem) return stem.title() def _build_pdf_title(document, out_path: Path) -> str: return _humanize_filename(out_path) def _build_pdf_author(document) -> str: additional = _latest_additional(document) owners = [] if additional: for field in ["owner_primary", "owner_secondary"]: value = getattr(additional, field, None) if value: owners.append(str(value).strip()) seen = set() clean = [] for owner in owners: key = owner.lower() if key not in seen: seen.add(key) clean.append(owner) return "; ".join(clean) def _build_pdf_subject(document) -> str: value = getattr(document, "document_type", None) return str(value).replace("_", " ").title() if value else "" def _build_pdf_keywords(document) -> str: """ Currently returns location-only keywords. Easy to extend later. """ parts = [] extracted = _latest_extracted(document) if extracted: location = getattr(extracted, "location", None) if location: for chunk in re.split(r"[,;/|-]+", str(location)): chunk = chunk.strip().lower() if chunk: parts.append(chunk) seen = set() clean = [] for p in parts: if p and p not in seen: seen.add(p) clean.append(p) return ", ".join(clean) def _source_timestamp(document) -> datetime | None: for attr in ["source_path", "original_path", "current_path"]: value = getattr(document, attr, None) if not value: continue try: p = Path(value) if p.exists(): return datetime.fromtimestamp(p.stat().st_mtime) except Exception: pass return None def _pdf_date(dt: datetime | None) -> str: if not dt: dt = datetime.now() return dt.strftime("D:%Y%m%d%H%M%S") def _write_pdf_metadata(path_obj: Path, document, version_number: int, version_type: str) -> None: reader = PdfReader(str(path_obj)) writer = PdfWriter() for page in reader.pages: writer.add_page(page) now = datetime.now() source_dt = _source_timestamp(document) metadata = { "/Title": _build_pdf_title(document, path_obj), "/Author": _build_pdf_author(document), "/Subject": _build_pdf_subject(document), "/Keywords": _build_pdf_keywords(document), "/Creator": "Document Processor", "/Producer": "Document Processor", "/CreationDate": _pdf_date(source_dt), "/ModDate": _pdf_date(now), "/DocumentID": str(getattr(document, "document_id", "") or ""), "/VersionNumber": str(version_number), "/VersionType": str(version_type), } writer.add_metadata({k: v for k, v in metadata.items() if v is not None}) tmp_path = path_obj.with_suffix(path_obj.suffix + ".meta.tmp") with tmp_path.open("wb") as f: writer.write(f) tmp_path.replace(path_obj) def sha256_for_file(path: Path) -> str: hasher = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): hasher.update(chunk) return hasher.hexdigest() def compress_pdf_with_ghostscript(path: Path) -> bool: compressed_path = path.with_suffix(".compressed.pdf") try: subprocess.run( [ "gs", "-sDEVICE=pdfwrite", "-dCompatibilityLevel=1.4", "-dPDFSETTINGS=/ebook", "-dNOPAUSE", "-dQUIET", "-dBATCH", f"-sOutputFile={compressed_path}", str(path), ], check=True, capture_output=True, text=True, ) if not compressed_path.exists() or compressed_path.stat().st_size == 0: return False original_size = path.stat().st_size if path.exists() else 0 compressed_size = compressed_path.stat().st_size # Only replace if compression actually helped. if original_size > 0 and compressed_size < original_size: os.replace(compressed_path, path) else: compressed_path.unlink(missing_ok=True) return True except Exception: compressed_path.unlink(missing_ok=True) return False def get_next_document_version_number(db: Session, document_id: int) -> int: max_version = ( db.query(func.max(DocumentVersion.version_number)) .filter(DocumentVersion.document_id == document_id) .scalar() ) return (max_version or 0) + 1 def _build_output_path(root: str, document: Document, version_type: str, version_number: int) -> Path: source = Path(document.current_path or "") suffix = source.suffix.lower() if source.suffix else ".pdf" filename = f"{document.document_id}_{version_type}_v{version_number}{suffix}" return Path(root) / filename def _latest_current_text_version(document: Document, version_type: str) -> TextVersion | None: candidates = [tv for tv in document.text_versions if tv.version_type == version_type and tv.is_current] if not candidates: return None return sorted(candidates, key=lambda x: (x.version_number, x.created_at), reverse=True)[0] def _render_pdf_page_images(pdf_path: Path, tmpdir: Path) -> list[Path]: prefix = tmpdir / "page" subprocess.run( ["pdftoppm", "-r", "150", "-png", str(pdf_path), str(prefix)], capture_output=True, text=True, check=True, ) return sorted(tmpdir.glob("page-*.png")) def _fit_font_size(text: str, box_width: float, box_height: float) -> float: if not text: return max(6.0, box_height * 0.80) font_size = max(6.0, box_height * 0.88) while font_size > 4.5 and stringWidth(text, "Helvetica", font_size) > box_width * 0.98: font_size -= 0.25 min_reasonable = max(6.0, box_height * 0.68) return max(min_reasonable, font_size) def _fit_font_size_for_bbox_text(text: str, box_width: float, box_height: float) -> float: text = (text or "").strip() if not text: return 8.0 approx = min(max(box_height * 0.8, 4.0), 18.0) if len(text) <= 2: return approx width_limited = max(4.0, box_width / max(len(text) * 0.55, 1.0)) return min(approx, width_limited, box_height * 0.9) def _build_word_entries_for_page(page_layout: dict, page_h: float) -> list[dict]: entries = [] for word in page_layout.get("words", []) or []: word_text = (word.get("text") or "").strip() bbox = word.get("bbox") if not word_text: continue if not bbox or not isinstance(bbox, (list, tuple)) or len(bbox) != 4: continue try: left, top, right, bottom = [float(v) for v in bbox] except (TypeError, ValueError): continue if right <= left or bottom <= top: continue box_width = max(1.0, right - left) box_height = max(1.0, bottom - top) entries.append( { "text": word_text, "pdf_x": left, "pdf_y": page_h - bottom, "box_width": box_width, "box_height": box_height, "font_family_guess": "Helvetica", "font_size_guess": _fit_font_size_for_bbox_text(word_text, box_width, box_height), "text_color_guess": "#000000", "text_render_mode_clean": 0, "text_render_mode_scan_backed": 3, "bbox_source": [left, top, right, bottom], } ) return entries def _page_layout_line_entries(page_layout: dict) -> list[dict]: region_lines = [] for region in page_layout.get("regions", []) or []: region_lines.extend(region.get("lines", []) or []) if region_lines: return region_lines return page_layout.get("lines", []) or [] def _flatten_layout_lines(layout_json: dict | None) -> list[dict]: if not layout_json: return [] flattened = [] for page in layout_json.get("pages", []): for line in page.get("lines", []): flattened.append( { "page": page["page"], "bbox": line["bbox"], "text": line.get("text", ""), } ) return flattened def create_ocr_corrected_pdf_version(db: Session, document: Document, output_path: Path | None = None) -> DocumentVersion: if not document.current_path: raise ValueError("Document has no current_path") current_file = Path(document.current_path) if not current_file.exists(): raise FileNotFoundError(f"Current file not found: {current_file}") raw_ocr = _latest_current_text_version(document, "raw_ocr") reviewed = _latest_current_text_version(document, "reviewed") if raw_ocr is None: raise ValueError("No current raw OCR version found") if reviewed is None: raise ValueError("No current reviewed text found") if current_file.suffix.lower() != ".pdf": raise ValueError("C1 corrected PDF generation currently supports PDFs only") raw_lines = _flatten_layout_lines(raw_ocr.layout_json) reviewed_lines = _flatten_layout_lines(reviewed.layout_json) if not raw_lines: raise ValueError("No OCR line boxes found in raw OCR layout data") if reviewed_lines and len(reviewed_lines) != len(raw_lines): raise ValueError("Reviewed line layout does not match raw OCR line layout") source_layout = reviewed.layout_json if reviewed.layout_json else raw_ocr.layout_json if not source_layout: raise ValueError("No source layout found") next_version_number = get_next_document_version_number(db, document.id) if output_path is None: out_path = _build_output_path(OCR_CORRECTED_ROOT, document, "ocr_corrected", next_version_number) else: out_path = Path(output_path) out_path.parent.mkdir(parents=True, exist_ok=True) reader = PdfReader(str(current_file)) with tempfile.TemporaryDirectory() as tmpdirname: tmpdir = Path(tmpdirname) images = _render_pdf_page_images(current_file, tmpdir) overlay_pdf_path = tmpdir / "overlay.pdf" c = None page_layouts = {page["page"]: page for page in source_layout.get("pages", [])} for page_num, img_path in enumerate(images, start=1): pdf_page = reader.pages[page_num - 1] page_w = float(pdf_page.mediabox.width) page_h = float(pdf_page.mediabox.height) img = Image.open(img_path) if c is None: c = canvas.Canvas(str(overlay_pdf_path), pagesize=(page_w, page_h)) else: c.setPageSize((page_w, page_h)) c.drawImage(ImageReader(str(img_path)), 0, 0, width=page_w, height=page_h) page_layout = page_layouts.get(page_num, {"lines": []}) src_w = float(page_layout.get("image_width") or img.size[0]) src_h = float(page_layout.get("image_height") or img.size[1]) scale_x = page_w / src_w scale_y = page_h / src_h for line in page_layout.get("lines", []): text_line = (line.get("text") or "").strip() if not text_line: continue bbox = line.get("bbox") if not bbox or not isinstance(bbox, (list, tuple)) or len(bbox) != 4: continue try: left, top, right, bottom = [float(v) for v in bbox] except (TypeError, ValueError): continue if right <= left or bottom <= top: continue pdf_x = left * scale_x pdf_y = page_h - (bottom * scale_y) box_width = max(10.0, (right - left) * scale_x) box_height = max(6.0, (bottom - top) * scale_y) box_height = max(6.0, (bottom - top) * scale_y) font_size = _fit_font_size(text_line, box_width, box_height) text_obj = c.beginText() text_obj.setTextRenderMode(3) text_obj.setFont("Helvetica", font_size) text_obj.setTextOrigin(pdf_x, pdf_y + 1) text_obj.textLine(text_line) c.drawText(text_obj) c.showPage() if c is None: raise ValueError("Failed to build overlay PDF") c.save() shutil.copy2(overlay_pdf_path, out_path) compress_pdf_with_ghostscript(out_path) _write_pdf_metadata(out_path, document, next_version_number, "ocr_corrected") file_size = out_path.stat().st_size file_hash = sha256_for_file(out_path) try: mirror_path = _mirror_to_secondary_owner(document, out_path) share_path_value = str(mirror_path) if mirror_path else None except Exception as e: share_path_value = None document.share_path = share_path_value db.query(Document).filter(Document.id == document.id).update( {"share_path": share_path_value}, synchronize_session=False, ) try: mirror_path = _mirror_to_secondary_owner(document, out_path) document.share_path = str(mirror_path) if mirror_path else None except Exception: document.share_path = None db.add(document) version = DocumentVersion( document_id=document.id, version_number=next_version_number, version_type="ocr_corrected", file_path=str(out_path), sha256=file_hash, file_size_bytes=file_size, created_by="save_ocr_corrected_pdf", notes="C1 output: rebuilt searchable PDF using reviewed text aligned to OCR line boxes.", ) db.add(version) document.current_path = str(out_path) document.canonical_filename = out_path.name document.sha256_current = file_hash db.commit() keep_paths = {str(out_path)} if document.share_path: keep_paths.add(str(document.share_path)) _prune_old_saved_files(db, document, keep_paths) db.refresh(version) return version def create_field_enriched_pdf_version(db: Session, document: Document, output_path: Path | None = None) -> DocumentVersion: if not document.current_path: raise ValueError("Document has no current_path") current_file = Path(document.current_path) if not current_file.exists(): raise FileNotFoundError(f"Current file not found: {current_file}") next_version_number = get_next_document_version_number(db, document.id) if output_path is None: out_path = _build_output_path(FIELD_ENRICHED_ROOT, document, "field_enriched", next_version_number) else: out_path = Path(output_path) out_path = out_path.with_name( re.sub(r"_v\d+(?=\.[^.]+$)", "", out_path.name) ) out_path.parent.mkdir(parents=True, exist_ok=True) if current_file.resolve() != out_path.resolve(): shutil.copy2(current_file, out_path) _write_pdf_metadata(out_path, document, next_version_number, "field_enriched") file_size = out_path.stat().st_size file_hash = sha256_for_file(out_path) try: mirror_path = _mirror_to_secondary_owner(document, out_path) share_path_value = str(mirror_path) if mirror_path else None except Exception: share_path_value = None document.share_path = share_path_value db.add(document) version = DocumentVersion( document_id=document.id, version_number=next_version_number, version_type="field_enriched", file_path=str(out_path), sha256=file_hash, file_size_bytes=file_size, created_by="save_field_enriched_pdf", notes="Scaffold output: copied current file; extracted fields not yet embedded into PDF.", ) db.add(version) document.current_path = str(out_path) document.canonical_filename = out_path.name document.sha256_current = file_hash db.commit() keep_paths = {str(out_path)} if document.share_path: keep_paths.add(str(document.share_path)) _prune_old_saved_files(db, document, keep_paths) db.refresh(version) return version def save_ocr_corrected_pdf_current(db: Session, document: Document, output_path: Path) -> None: if not document.current_path: raise ValueError("Document has no current_path") current_file = Path(document.current_path) if not current_file.exists(): raise FileNotFoundError(f"Current file not found: {current_file}") raw_ocr = _latest_current_text_version(document, "raw_ocr") reviewed = _latest_current_text_version(document, "reviewed") if raw_ocr is None: raise ValueError("No current raw OCR version found") if reviewed is None: raise ValueError("No current reviewed text found") if current_file.suffix.lower() != ".pdf": raise ValueError("C1 corrected PDF generation currently supports PDFs only") raw_lines = _flatten_layout_lines(raw_ocr.layout_json) reviewed_lines = _flatten_layout_lines(reviewed.layout_json) if not raw_lines: raise ValueError("No OCR line boxes found in raw OCR layout data") if reviewed_lines and len(reviewed_lines) != len(raw_lines): raise ValueError("Reviewed line layout does not match raw OCR line layout") source_layout = reviewed.layout_json if reviewed.layout_json else raw_ocr.layout_json if not source_layout: raise ValueError("No source layout found") out_path = Path(output_path) out_path.parent.mkdir(parents=True, exist_ok=True) reader = PdfReader(str(current_file)) with tempfile.TemporaryDirectory() as tmpdirname: tmpdir = Path(tmpdirname) images = _render_pdf_page_images(current_file, tmpdir) overlay_pdf_path = tmpdir / "overlay.pdf" c = None page_layouts = {page["page"]: page for page in source_layout.get("pages", [])} for page_num, img_path in enumerate(images, start=1): pdf_page = reader.pages[page_num - 1] page_w = float(pdf_page.mediabox.width) page_h = float(pdf_page.mediabox.height) img = Image.open(img_path) if c is None: c = canvas.Canvas(str(overlay_pdf_path), pagesize=(page_w, page_h)) else: c.setPageSize((page_w, page_h)) c.drawImage(ImageReader(str(img_path)), 0, 0, width=page_w, height=page_h) page_layout = page_layouts.get(page_num, {"lines": []}) src_w = float(page_layout.get("image_width") or img.size[0]) src_h = float(page_layout.get("image_height") or img.size[1]) scale_x = page_w / src_w scale_y = page_h / src_h for line in page_layout.get("lines", []): text_line = (line.get("text") or "").strip() if not text_line: continue left, top, right, bottom = line["bbox"] pdf_x = left * scale_x pdf_y = page_h - (bottom * scale_y) box_width = max(10.0, (right - left) * scale_x) box_height = max(6.0, (bottom - top) * scale_y) font_size = _fit_font_size(text_line, box_width, box_height) text_obj = c.beginText() text_obj.setTextRenderMode(3) text_obj.setFont("Helvetica", font_size) text_obj.setTextOrigin(pdf_x, pdf_y + 1) text_obj.textLine(text_line) c.drawText(text_obj) c.showPage() if c is None: raise ValueError("Failed to build overlay PDF") c.save() shutil.copy2(overlay_pdf_path, out_path) compress_pdf_with_ghostscript(out_path) file_hash = sha256_for_file(out_path) try: mirror_path = _mirror_to_secondary_owner(document, out_path) share_path_value = str(mirror_path) if mirror_path else None except Exception: share_path_value = None # Replica outputs are non-destructive exports for now. # Do not replace the primary/current document path. db.commit() def save_field_enriched_pdf_current(db: Session, document: Document, output_path: Path) -> None: if not document.current_path: raise ValueError("Document has no current_path") current_file = Path(document.current_path) if not current_file.exists(): raise FileNotFoundError(f"Current file not found: {current_file}") out_path = Path(output_path) out_path = out_path.with_name( re.sub(r"_v\d+(?=\.[^.]+$)", "", out_path.name) ) out_path.parent.mkdir(parents=True, exist_ok=True) if current_file.resolve() != out_path.resolve(): shutil.copy2(current_file, out_path) file_hash = sha256_for_file(out_path) try: mirror_path = _mirror_to_secondary_owner(document, out_path) share_path_value = str(mirror_path) if mirror_path else None except Exception: share_path_value = None document.share_path = share_path_value document.current_path = str(out_path) document.canonical_filename = out_path.name document.sha256_current = file_hash db.add(document) db.commit() keep_paths = {str(out_path)} if document.share_path: keep_paths.add(str(document.share_path)) _prune_old_saved_files(db, document, keep_paths) def _next_replica_layout_version_number(db: Session, document_id: int) -> int: return ( db.query(func.max(DocumentReplicaLayoutVersion.version_number)) .filter(DocumentReplicaLayoutVersion.document_id == document_id) .scalar() or 0 ) + 1 def _get_current_replica_review_state(document: Document) -> DocumentReplicaReviewState | None: rows = getattr(document, "replica_review_states", None) or [] def _layout_has_any_text(layout_json: dict | None) -> bool: if not layout_json: return False for page in layout_json.get("pages", []): for line in page.get("lines", []): if (line.get("text") or "").strip(): return True return False def _layout_has_usable_bboxes(layout_json: dict | None) -> bool: if not layout_json: return False for page in layout_json.get("pages", []): for line in page.get("lines", []): bbox = line.get("bbox") if ( isinstance(bbox, (list, tuple)) and len(bbox) == 4 and all(v is not None for v in bbox) ): return True return False return rows[0] if rows else None def _current_pdf_path(document: Document) -> Path: candidate = ( getattr(document, "current_path", None) or getattr(document, "original_path", None) or getattr(document, "source_path", None) ) if not candidate: raise ValueError("document_has_no_pdf_path") path = Path(candidate) if not path.exists() or not path.is_file(): raise ValueError(f"document_pdf_missing:{path}") return path def _get_replica_source_context(document: Document): current_file = _current_pdf_path(document) raw_ocr = _latest_current_text_version(document, "raw_ocr") reviewed = _latest_current_text_version(document, "reviewed_ocr") if reviewed is None: reviewed = _latest_current_text_version(document, "reviewed") reviewed_layout = getattr(reviewed, "layout_json", None) if reviewed is not None else None raw_layout = getattr(raw_ocr, "layout_json", None) if raw_ocr is not None else None if reviewed is not None and _layout_has_usable_bboxes(reviewed_layout): return current_file, raw_ocr, reviewed, reviewed_layout, "reviewed" if raw_ocr is not None and _layout_has_usable_bboxes(raw_layout): return current_file, raw_ocr, reviewed, raw_layout, "raw_ocr" if reviewed is not None and _layout_has_any_text(reviewed_layout): return current_file, raw_ocr, reviewed, reviewed_layout, "reviewed_text_only" if raw_ocr is not None and _layout_has_any_text(raw_layout): return current_file, raw_ocr, reviewed, raw_layout, "raw_text_only" return current_file, raw_ocr, reviewed, {"pages": []}, "no_layout" def build_replica_layout(document: Document, mode: str = "shared") -> dict: current_file, raw_ocr, reviewed, source_layout, layout_source = _get_replica_source_context(document) reader = PdfReader(str(current_file)) pages = [] page_layouts = {page["page"]: page for page in (source_layout.get("pages", []) if isinstance(source_layout, dict) else [])} for page_num, pdf_page in enumerate(reader.pages, start=1): page_w = float(pdf_page.mediabox.width) page_h = float(pdf_page.mediabox.height) page_layout = page_layouts.get(page_num, {"lines": [], "words": []}) src_w = float(page_layout.get("image_width") or page_layout.get("page_width") or 1.0) src_h = float(page_layout.get("image_height") or page_layout.get("page_height") or 1.0) scale_x = page_w / src_w scale_y = page_h / src_h source_lines = page_layout.get("lines", []) or [] line_entries = [] for line in source_lines: text_line = (line.get("text") or "").strip() if not text_line: continue bbox = line.get("bbox") if not bbox or not isinstance(bbox, (list, tuple)) or len(bbox) != 4: continue try: left, top, right, bottom = [float(v) for v in bbox] except (TypeError, ValueError): continue if right <= left or bottom <= top: continue pdf_x = left * scale_x pdf_y = page_h - (bottom * scale_y) box_width = max(0.5, (right - left) * scale_x) box_height = max(0.5, (bottom - top) * scale_y) source_font_size = line.get("font_size_guess") try: source_font_size = float(source_font_size) if source_font_size is not None else None except (TypeError, ValueError): source_font_size = None if not source_font_size or source_font_size <= 0: source_font_size = _fit_font_size(text_line, max(10.0, box_width), max(6.0, box_height)) font_size = max(1.0, source_font_size * scale_y) font_family = line.get("font_family_guess") or "Helvetica" line_entries.append( { "text": text_line, "bbox_source": [left, top, right, bottom], "pdf_x": pdf_x, "pdf_y": pdf_y, "box_width": box_width, "box_height": box_height, "font_family_guess": font_family, "font_size_guess": font_size, "text_color_guess": line.get("text_color_guess") or "#000000", "text_render_mode_clean": 0, "text_render_mode_scan_backed": 3, } ) pages.append( { "page": page_num, "page_width": page_w, "page_height": page_h, "image_width": src_w, "image_height": src_h, "lines": line_entries, "words": page_layout.get("words", []) or [], } ) return { "schema_version": 1, "mode_source": mode, "current_path": str(current_file), "layout_source": layout_source, "text_version_source": { "raw_ocr_version_id": raw_ocr.id if raw_ocr else None, "reviewed_version_id": reviewed.id if reviewed else None, }, "pages": pages, } def _save_replica_layout_version( db: Session, document: Document, layout_json: dict, mode: str, created_by: str = "save_replica_pdf", ) -> DocumentReplicaLayoutVersion: db.query(DocumentReplicaLayoutVersion).filter( DocumentReplicaLayoutVersion.document_id == document.id, DocumentReplicaLayoutVersion.is_current == True, # noqa: E712 ).update({"is_current": False}, synchronize_session=False) version = DocumentReplicaLayoutVersion( document_id=document.id, version_number=_next_replica_layout_version_number(db, document.id), version_type="heuristic", render_mode_source=mode, is_current=True, created_by=created_by, quality_flags=[], inference_metadata_json={"pipeline": "heuristic_replica_v1", "mode": mode}, layout_json=layout_json, ) db.add(version) db.flush() state = _get_current_replica_review_state(document) if state is None: state = DocumentReplicaReviewState(document_id=document.id) db.add(state) state.current_replica_layout_version_id = version.id state.is_reviewed = False state.is_approved = False state.needs_manual_adjustment = False state.needs_model_retry = False db.flush() return version def _render_replica_pdf_from_layout( current_file: Path, layout_json: dict, out_path: Path, mode: str, ) -> None: reader = PdfReader(str(current_file)) out_path.parent.mkdir(parents=True, exist_ok=True) with tempfile.TemporaryDirectory() as tmpdirname: tmpdir = Path(tmpdirname) images = _render_pdf_page_images(current_file, tmpdir) overlay_pdf_path = tmpdir / "replica.pdf" c = None pages = {page["page"]: page for page in layout_json.get("pages", [])} for page_num, img_path in enumerate(images, start=1): pdf_page = reader.pages[page_num - 1] page_w = float(pdf_page.mediabox.width) page_h = float(pdf_page.mediabox.height) if c is None: c = canvas.Canvas(str(overlay_pdf_path), pagesize=(page_w, page_h)) else: c.setPageSize((page_w, page_h)) if mode == "scan_backed": c.drawImage(ImageReader(str(img_path)), 0, 0, width=page_w, height=page_h) page_layout = pages.get(page_num, {"lines": []}) render_entries = [] if mode in {"scan_backed", "debug_overlay"} and (page_layout.get("words") or []): render_entries = _build_word_entries_for_page(page_layout, page_h) else: render_entries = _page_layout_line_entries(page_layout) for line in render_entries: text_line = (line.get("text") or "").strip() if not text_line: continue text_obj = c.beginText() if mode == "scan_backed": text_obj.setTextRenderMode(3) else: text_obj.setTextRenderMode(0) text_obj.setFont(line.get("font_family_guess") or "Helvetica", float(line.get("font_size_guess") or 10)) text_obj.setTextOrigin(float(line["pdf_x"]), float(line["pdf_y"]) + 1) if mode == "debug_overlay": c.setStrokeColorRGB(1, 0, 0) c.setFillColorRGB(1, 0, 0) else: c.setStrokeColorRGB(0, 0, 0) c.setFillColorRGB(0, 0, 0) text_obj.textLine(text_line) c.drawText(text_obj) if mode == "debug_overlay": bbox = line.get("bbox_source") if bbox and isinstance(bbox, (list, tuple)) and len(bbox) == 4: try: left, top, right, bottom = [float(v) for v in bbox] c.setStrokeColorRGB(1, 0, 0) c.setLineWidth(0.4) c.rect(left, page_h - bottom, max(0.5, right - left), max(0.5, bottom - top), stroke=1, fill=0) except Exception: pass c.showPage() if c is None: raise ValueError("Failed to build replica PDF") c.save() shutil.copy2(overlay_pdf_path, out_path) compress_pdf_with_ghostscript(out_path) def save_replica_pdf(db: Session, document: Document, output_path: Path, mode: str) -> None: if mode not in {"clean", "scan_backed", "debug_overlay"}: raise ValueError(f"Unsupported replica mode: {mode}") current_file, _, _, _, _ = _get_replica_source_context(document) out_path = Path(output_path) out_path = out_path.with_name(re.sub(r"_v\d+(?=\.[^.]+$)", "", out_path.name)) stem = re.sub(r"(_replica_clean|_replica_scan_backed)$", "", out_path.stem) suffix = out_path.suffix or ".pdf" if mode == "clean": out_path = out_path.with_name(f"{stem}_replica_clean{suffix}") elif mode == "scan_backed": out_path = out_path.with_name(f"{stem}_replica_scan_backed{suffix}") else: out_path = out_path.with_name(f"{stem}_replica_debug_overlay{suffix}") out_path.parent.mkdir(parents=True, exist_ok=True) requested_mode = mode actual_mode = mode layout_json = build_replica_layout(document, mode=mode) page_lines = [] for page in (layout_json.get("pages") or []): page_lines.extend(page.get("lines") or []) if mode == "clean" and not page_lines: raise ValueError("clean_replica_has_no_renderable_lines") if mode == "clean": has_text = False for page in layout_json.get("pages", []): if page.get("lines"): has_text = True break if not has_text: actual_mode = "scan_backed" out_path = out_path.with_name(f"{stem}_replica_scan_backed{suffix}") layout_json = build_replica_layout(document, mode="scan_backed") layout_version = _save_replica_layout_version(db, document, layout_json, mode=actual_mode) _render_replica_pdf_from_layout(current_file, layout_json, out_path, mode=actual_mode) file_hash = sha256_for_file(out_path) file_size = out_path.stat().st_size try: mirror_path = _mirror_to_secondary_owner(document, out_path) share_path_value = str(mirror_path) if mirror_path else None except Exception: share_path_value = None output = DocumentReplicaOutput( document_id=document.id, replica_layout_version_id=layout_version.id, output_type=actual_mode, file_path=str(out_path), sha256=file_hash, file_size_bytes=file_size, created_by="save_replica_pdf", render_settings_json={"requested_mode": requested_mode, "actual_mode": actual_mode}, ) db.add(output) # Replica outputs are non-destructive exports. # Do not replace the primary/current document path or prune sibling files. db.commit()