Add layout OCR pipeline and clean replica generation

This commit is contained in:
Sean McElwain 2026-05-09 15:06:20 -05:00
parent 610f25c2b8
commit d292b2d00d
7 changed files with 592 additions and 36 deletions

View File

@ -0,0 +1,207 @@
from __future__ import annotations
from typing import Any
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.models.document_analysis_version import DocumentAnalysisVersion
from app.logic.layout_ocr import run_layout_ocr
def _flatten_layout_lines(layout_json: dict | None) -> list[dict[str, Any]]:
if not layout_json:
return []
lines: list[dict[str, Any]] = []
for page in layout_json.get("pages", []) or []:
for line in page.get("lines", []) or []:
if isinstance(line, dict):
lines.append(line)
return lines
def _layout_has_any_text(layout_json: dict | None) -> bool:
for line in _flatten_layout_lines(layout_json):
if (line.get("text") or "").strip():
return True
return False
def _layout_has_usable_bboxes(layout_json: dict | None) -> bool:
for line in _flatten_layout_lines(layout_json):
bbox = line.get("bbox")
if (
isinstance(bbox, (list, tuple))
and len(bbox) == 4
and all(v is not None for v in bbox)
):
return True
return False
def _build_canonical_analysis_from_document(document) -> dict[str, Any]:
text_versions = sorted(
getattr(document, "text_versions", []) or [],
key=lambda tv: ((tv.version_number or 0), getattr(tv, "created_at", None) or 0),
reverse=True,
)
raw_ocr = next((tv for tv in text_versions if getattr(tv, "is_current", False) and tv.version_type == "raw_ocr"), None)
reviewed = next((tv for tv in text_versions if getattr(tv, "is_current", False) and tv.version_type == "reviewed"), None)
source_tv = reviewed or raw_ocr
layout_json = getattr(source_tv, "layout_json", None) if source_tv else None
extracted = None
extracted_rows = getattr(document, "extracted_fields", None) or []
if extracted_rows:
extracted = extracted_rows[0]
analysis = {
"schema_version": 1,
"analysis_type": "canonical",
"document_info": {
"document_id": document.document_id,
"document_type": getattr(document, "document_type", None),
"mime_type": getattr(document, "mime_type", None),
"current_path": getattr(document, "current_path", None),
"source_path": getattr(document, "source_path", None),
"canonical_filename": getattr(document, "canonical_filename", None),
},
"text_source": {
"raw_ocr_version_id": getattr(raw_ocr, "id", None) if raw_ocr else None,
"reviewed_version_id": getattr(reviewed, "id", None) if reviewed else None,
"active_version_id": getattr(source_tv, "id", None) if source_tv else None,
"active_version_type": getattr(source_tv, "version_type", None) if source_tv else None,
},
"pages": (layout_json or {}).get("pages", []) if isinstance(layout_json, dict) else [],
"semantic_candidates": {
"merchant": getattr(extracted, "merchant_normalized", None) if extracted else None,
"merchant_raw": getattr(extracted, "merchant_raw", None) if extracted else None,
"transaction_date": str(getattr(extracted, "transaction_date", None)) if extracted and getattr(extracted, "transaction_date", None) else None,
"total": str(getattr(extracted, "total", None)) if extracted and getattr(extracted, "total", None) is not None else None,
"tax": str(getattr(extracted, "tax", None)) if extracted and getattr(extracted, "tax", None) is not None else None,
"subtotal": str(getattr(extracted, "subtotal", None)) if extracted and getattr(extracted, "subtotal", None) is not None else None,
},
"quality": {
"text_present": _layout_has_any_text(layout_json),
"usable_layout": _layout_has_usable_bboxes(layout_json),
"usable_word_boxes": False,
"issues": [],
},
}
if not analysis["quality"]["text_present"]:
analysis["quality"]["issues"].append("no_text_in_layout")
if not analysis["quality"]["usable_layout"]:
analysis["quality"]["issues"].append("no_usable_bboxes")
return analysis
def get_current_document_analysis(db: Session, document) -> DocumentAnalysisVersion | None:
return (
db.query(DocumentAnalysisVersion)
.filter(
DocumentAnalysisVersion.document_id == document.id,
DocumentAnalysisVersion.is_current.is_(True),
)
.order_by(DocumentAnalysisVersion.version_number.desc(), DocumentAnalysisVersion.id.desc())
.first()
)
def ensure_document_analysis(db: Session, document, require_layout: bool = True) -> DocumentAnalysisVersion:
current = get_current_document_analysis(db, document)
if current and current.analysis_json:
quality = (current.analysis_json or {}).get("quality", {}) or {}
if not require_layout or quality.get("usable_layout"):
return current
analysis_json = _build_canonical_analysis_from_document(document)
quality = analysis_json.get("quality", {}) or {}
if require_layout and not quality.get("usable_layout"):
raise ValueError("document_analysis_missing_usable_layout")
db.query(DocumentAnalysisVersion).filter(
DocumentAnalysisVersion.document_id == document.id,
DocumentAnalysisVersion.is_current.is_(True),
).update({"is_current": False}, synchronize_session=False)
next_version = (
db.query(func.max(DocumentAnalysisVersion.version_number))
.filter(DocumentAnalysisVersion.document_id == document.id)
.scalar()
or 0
) + 1
row = DocumentAnalysisVersion(
document_id=document.id,
version_number=next_version,
analysis_type="canonical",
is_current=True,
created_by="ensure_document_analysis",
engine_name="internal_existing_ocr_adapter",
engine_version="v1",
model_name=None,
prompt_version=None,
quality_score=1.0 if quality.get("usable_layout") else 0.5 if quality.get("text_present") else 0.0,
quality_note=None,
quality_flags=quality.get("issues", []),
analysis_json=analysis_json,
)
db.add(row)
db.commit()
db.refresh(row)
return row
def build_layout_ocr_analysis_for_document(document) -> dict[str, Any]:
current_path = getattr(document, "current_path", None)
if not current_path:
raise ValueError("Document has no current_path")
result = run_layout_ocr(current_path)
analysis_json = result.to_analysis_json()
pages = analysis_json.get("pages", []) or []
text_lines = []
usable_layout = False
for page in pages:
for line in page.get("lines", []) or []:
line_text = (line.get("text") or "").strip()
if line_text:
text_lines.append(line_text)
bbox = line.get("bbox")
if isinstance(bbox, (list, tuple)) and len(bbox) == 4 and all(v is not None for v in bbox):
usable_layout = True
issues: list[str] = []
if not text_lines:
issues.append("no_text_detected")
if not usable_layout:
issues.append("no_usable_bboxes")
analysis_json["text_source"] = {
"active_version_id": None,
"raw_ocr_version_id": None,
"reviewed_version_id": None,
"active_version_type": "layout_ocr",
}
analysis_json["quality"] = {
"text_present": bool(text_lines),
"usable_layout": usable_layout,
"usable_word_boxes": usable_layout,
"issues": issues,
}
analysis_json["text_content"] = "\n".join(text_lines)
analysis_json["engine"] = {
"name": result.engine_name,
"version": result.engine_version,
}
return analysis_json

View File

@ -446,12 +446,20 @@ def create_ocr_corrected_pdf_version(db: Session, document: Document, output_pat
if not text_line:
continue
left, top, right, bottom = line["bbox"]
bbox = line.get("bbox")
if not bbox or not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
continue
try:
left, top, right, bottom = [float(v) for v in bbox]
except (TypeError, ValueError):
continue
if right <= left or bottom <= top:
continue
pdf_x = left * scale_x
pdf_y = page_h - (bottom * scale_y)
box_width = max(10.0, (right - left) * scale_x)
box_height = max(6.0, (bottom - top) * scale_y)
box_height = max(6.0, (bottom - top) * scale_y)
font_size = _fit_font_size(text_line, box_width, box_height)
@ -744,6 +752,31 @@ def _next_replica_layout_version_number(db: Session, document_id: int) -> int:
def _get_current_replica_review_state(document: Document) -> DocumentReplicaReviewState | None:
rows = getattr(document, "replica_review_states", None) or []
def _layout_has_any_text(layout_json: dict | None) -> bool:
if not layout_json:
return False
for page in layout_json.get("pages", []):
for line in page.get("lines", []):
if (line.get("text") or "").strip():
return True
return False
def _layout_has_usable_bboxes(layout_json: dict | None) -> bool:
if not layout_json:
return False
for page in layout_json.get("pages", []):
for line in page.get("lines", []):
bbox = line.get("bbox")
if (
isinstance(bbox, (list, tuple))
and len(bbox) == 4
and all(v is not None for v in bbox)
):
return True
return False
return rows[0] if rows else None
@ -758,30 +791,25 @@ def _get_replica_source_context(document: Document):
raw_ocr = _latest_current_text_version(document, "raw_ocr")
reviewed = _latest_current_text_version(document, "reviewed")
if raw_ocr is None:
raise ValueError("No current raw OCR version found")
if reviewed is None:
raise ValueError("No current reviewed text found")
if current_file.suffix.lower() != ".pdf":
raise ValueError("Replica PDF generation currently supports PDFs only")
raw_lines = _flatten_layout_lines(raw_ocr.layout_json)
reviewed_lines = _flatten_layout_lines(reviewed.layout_json)
if reviewed is not None and _layout_has_usable_bboxes(reviewed.layout_json):
return current_file, raw_ocr, reviewed, reviewed.layout_json, "reviewed"
if not raw_lines:
raise ValueError("No OCR line boxes found in raw OCR layout data")
if reviewed_lines and len(reviewed_lines) != len(raw_lines):
raise ValueError("Reviewed line layout does not match raw OCR line layout")
if raw_ocr is not None and _layout_has_usable_bboxes(raw_ocr.layout_json):
return current_file, raw_ocr, reviewed, raw_ocr.layout_json, "raw_ocr"
source_layout = reviewed.layout_json if reviewed.layout_json else raw_ocr.layout_json
if not source_layout:
raise ValueError("No source layout found")
if reviewed is not None and _layout_has_any_text(reviewed.layout_json):
return current_file, raw_ocr, reviewed, reviewed.layout_json, "reviewed_text_only"
return current_file, raw_ocr, reviewed, source_layout
if raw_ocr is not None and _layout_has_any_text(raw_ocr.layout_json):
return current_file, raw_ocr, reviewed, raw_ocr.layout_json, "raw_text_only"
return current_file, raw_ocr, reviewed, {"pages": []}, "no_layout"
def build_replica_layout(document: Document, mode: str = "shared") -> dict:
current_file, raw_ocr, reviewed, source_layout = _get_replica_source_context(document)
current_file, raw_ocr, reviewed, source_layout, layout_source = _get_replica_source_context(document)
reader = PdfReader(str(current_file))
pages = []
@ -802,7 +830,15 @@ def build_replica_layout(document: Document, mode: str = "shared") -> dict:
if not text_line:
continue
left, top, right, bottom = line["bbox"]
bbox = line.get("bbox")
if not bbox or not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
continue
try:
left, top, right, bottom = [float(v) for v in bbox]
except (TypeError, ValueError):
continue
if right <= left or bottom <= top:
continue
pdf_x = left * scale_x
pdf_y = page_h - (bottom * scale_y)
box_width = max(10.0, (right - left) * scale_x)
@ -948,7 +984,7 @@ def save_replica_pdf(db: Session, document: Document, output_path: Path, mode: s
if mode not in {"clean", "scan_backed"}:
raise ValueError(f"Unsupported replica mode: {mode}")
current_file, _, _, _ = _get_replica_source_context(document)
current_file, _, _, _, _ = _get_replica_source_context(document)
out_path = Path(output_path)
out_path = out_path.with_name(re.sub(r"_v\d+(?=\.[^.]+$)", "", out_path.name))
@ -962,10 +998,31 @@ def save_replica_pdf(db: Session, document: Document, output_path: Path, mode: s
out_path.parent.mkdir(parents=True, exist_ok=True)
layout_json = build_replica_layout(document, mode=mode)
layout_version = _save_replica_layout_version(db, document, layout_json, mode=mode)
requested_mode = mode
actual_mode = mode
_render_replica_pdf_from_layout(current_file, layout_json, out_path, mode=mode)
layout_json = build_replica_layout(document, mode=mode)
page_lines = []
for page in (layout_json.get("pages") or []):
page_lines.extend(page.get("lines") or [])
if mode == "clean" and not page_lines:
raise ValueError("clean_replica_has_no_renderable_lines")
if mode == "clean":
has_text = False
for page in layout_json.get("pages", []):
if page.get("lines"):
has_text = True
break
if not has_text:
actual_mode = "scan_backed"
out_path = out_path.with_name(f"{stem}_replica_scan_backed{suffix}")
layout_json = build_replica_layout(document, mode="scan_backed")
layout_version = _save_replica_layout_version(db, document, layout_json, mode=actual_mode)
_render_replica_pdf_from_layout(current_file, layout_json, out_path, mode=actual_mode)
file_hash = sha256_for_file(out_path)
file_size = out_path.stat().st_size
@ -979,12 +1036,12 @@ def save_replica_pdf(db: Session, document: Document, output_path: Path, mode: s
output = DocumentReplicaOutput(
document_id=document.id,
replica_layout_version_id=layout_version.id,
output_type=mode,
output_type=actual_mode,
file_path=str(out_path),
sha256=file_hash,
file_size_bytes=file_size,
created_by="save_replica_pdf",
render_settings_json={"mode": mode},
render_settings_json={"requested_mode": requested_mode, "actual_mode": actual_mode},
)
db.add(output)

153
app/logic/layout_ocr.py Normal file
View File

@ -0,0 +1,153 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import fitz
import pytesseract
from pdf2image import convert_from_path
from PIL import Image
@dataclass
class LayoutOCRResult:
engine_name: str
engine_version: str
pages: list[dict[str, Any]]
def to_analysis_json(self) -> dict[str, Any]:
return {
"schema_version": 1,
"analysis_type": "canonical",
"engine": {
"name": self.engine_name,
"version": self.engine_version,
},
"pages": self.pages,
}
def _group_words_into_lines(words: list[dict[str, Any]], y_tol: float = 12.0) -> list[dict[str, Any]]:
if not words:
return []
words = sorted(words, key=lambda w: (w["bbox"][1], w["bbox"][0]))
groups: list[list[dict[str, Any]]] = []
for word in words:
placed = False
wy = word["bbox"][1]
for group in groups:
gy = sum(item["bbox"][1] for item in group) / len(group)
if abs(wy - gy) <= y_tol:
group.append(word)
placed = True
break
if not placed:
groups.append([word])
lines: list[dict[str, Any]] = []
for group in groups:
group = sorted(group, key=lambda w: w["bbox"][0])
text = " ".join((w.get("text") or "").strip() for w in group).strip()
if not text:
continue
left = min(w["bbox"][0] for w in group)
top = min(w["bbox"][1] for w in group)
right = max(w["bbox"][2] for w in group)
bottom = max(w["bbox"][3] for w in group)
avg_height = max(1.0, sum((w["bbox"][3] - w["bbox"][1]) for w in group) / len(group))
lines.append(
{
"text": text,
"bbox": [left, top, right, bottom],
"confidence": None,
"font_family_guess": "Helvetica",
"font_size_guess": max(6.0, avg_height * 0.75),
"text_color_guess": "#000000",
"words": group,
}
)
return lines
def run_layout_ocr(pdf_path: str | Path, dpi: int = 300) -> LayoutOCRResult:
pdf_path = Path(pdf_path)
if not pdf_path.exists():
raise FileNotFoundError(f"PDF not found: {pdf_path}")
doc = fitz.open(pdf_path)
pil_pages = convert_from_path(str(pdf_path), dpi=dpi)
pages: list[dict[str, Any]] = []
for idx, (pdf_page, pil_img) in enumerate(zip(doc, pil_pages), start=1):
page_w = float(pdf_page.rect.width)
page_h = float(pdf_page.rect.height)
if not isinstance(pil_img, Image.Image):
raise ValueError(f"Rendered page {idx} is not a PIL image")
img_w, img_h = pil_img.size
scale_x = page_w / float(img_w)
scale_y = page_h / float(img_h)
data = pytesseract.image_to_data(
pil_img,
output_type=pytesseract.Output.DICT,
config="--oem 3 --psm 6",
)
words: list[dict[str, Any]] = []
n = len(data.get("text", []))
for i in range(n):
text = (data["text"][i] or "").strip()
if not text:
continue
try:
conf = float(data["conf"][i])
except Exception:
conf = None
left_px = float(data["left"][i])
top_px = float(data["top"][i])
width_px = float(data["width"][i])
height_px = float(data["height"][i])
if width_px <= 0 or height_px <= 0:
continue
left = left_px * scale_x
top = top_px * scale_y
right = (left_px + width_px) * scale_x
bottom = (top_px + height_px) * scale_y
words.append(
{
"text": text,
"bbox": [left, top, right, bottom],
"confidence": conf,
}
)
lines = _group_words_into_lines(words)
pages.append(
{
"page": idx,
"page_width": page_w,
"page_height": page_h,
"image_width": page_w,
"image_height": page_h,
"lines": lines,
"words": words,
}
)
return LayoutOCRResult(
engine_name="tesseract_layout",
engine_version=str(pytesseract.get_tesseract_version()),
pages=pages,
)

View File

@ -8,6 +8,7 @@ from app.models.document_additional_field import DocumentAdditionalField
from app.models.document_preset import DocumentPreset
__all__ = [
"DocumentAnalysisVersion",
"Document",
"DocumentVersion",
"TextVersion",
@ -21,3 +22,5 @@ from app.models.document_naming_field import DocumentNamingField
from app.models.document_replica_layout_version import DocumentReplicaLayoutVersion
from app.models.document_replica_output import DocumentReplicaOutput
from app.models.document_replica_review_state import DocumentReplicaReviewState
import app.models.document_analysis_version
from app.models.document_analysis_version import DocumentAnalysisVersion

View File

@ -115,6 +115,9 @@ class Document(Base):
back_populates="document",
cascade="all, delete-orphan",
)
analysis_versions: Mapped[list["DocumentAnalysisVersion"]] = relationship(
"DocumentAnalysisVersion", back_populates="document", cascade="all, delete-orphan"
)
replica_review_states: Mapped[list["DocumentReplicaReviewState"]] = relationship(
back_populates="document",
cascade="all, delete-orphan",

View File

@ -0,0 +1,33 @@
from __future__ import annotations
from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String, Text, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.db.base import Base
class DocumentAnalysisVersion(Base):
__tablename__ = "document_analysis_versions"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
document_id: Mapped[int] = mapped_column(ForeignKey("documents.id", ondelete="CASCADE"), index=True, nullable=False)
version_number: Mapped[int] = mapped_column(Integer, nullable=False)
analysis_type: Mapped[str] = mapped_column(String(50), nullable=False, default="canonical")
is_current: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
created_by: Mapped[str | None] = mapped_column(String(100), nullable=True)
engine_name: Mapped[str | None] = mapped_column(String(100), nullable=True)
engine_version: Mapped[str | None] = mapped_column(String(100), nullable=True)
model_name: Mapped[str | None] = mapped_column(String(200), nullable=True)
prompt_version: Mapped[str | None] = mapped_column(String(100), nullable=True)
quality_score: Mapped[float | None] = mapped_column(nullable=True)
quality_note: Mapped[str | None] = mapped_column(Text, nullable=True)
quality_flags: Mapped[dict | list | None] = mapped_column(JSONB, nullable=True)
analysis_json: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
created_at: Mapped[DateTime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
document = relationship("Document", back_populates="analysis_versions")

View File

@ -35,6 +35,9 @@ from app.logic.extraction import (
_replace_document_line_items,
)
from app.logic.ingest import compute_quality_score, rerun_ocr_for_document
from app.models.document_analysis_version import DocumentAnalysisVersion
from app.logic.document_analysis import build_layout_ocr_analysis_for_document
from app.logic.layout_ocr import run_layout_ocr
from app.models.document import Document
from app.models.document_line_item import DocumentLineItem
from app.models.document_line_item_set import DocumentLineItemSet
@ -946,17 +949,93 @@ def save_document_type_route(
@router.post("/{document_id}/rerun-ocr", response_class=RedirectResponse)
def rerun_ocr(document_id: str, db: Session = Depends(get_db)):
document = db.query(Document).filter(Document.document_id == document_id).first()
document = (
db.query(Document)
.options(
selectinload(Document.text_versions),
selectinload(Document.analysis_versions),
)
.filter(Document.document_id == document_id)
.first()
)
if document is None:
return RedirectResponse(url="/documents/", status_code=303)
try:
rerun_ocr_for_document(db, document)
if not document.current_path:
return RedirectResponse(
url=f"/documents/{document.document_id}?error=rerun_ocr_failed&tab=ocr-review",
status_code=303,
)
layout_result = run_layout_ocr(document.current_path)
analysis_json = build_layout_ocr_analysis_for_document(document)
text_content = analysis_json.get("text_content") or ""
for row in getattr(document, "text_versions", []) or []:
if getattr(row, "is_current", False):
row.is_current = False
next_version = (
max((getattr(v, "version_number", 0) or 0) for v in getattr(document, "text_versions", []) or []) + 1
if getattr(document, "text_versions", None) else 1
)
text_row = TextVersion(
document_id=document.id,
version_number=next_version,
version_type="raw_ocr",
text_content=text_content,
created_by="rerun_ocr_layout",
is_current=True,
ocr_engine=layout_result.engine_name,
ocr_engine_version=layout_result.engine_version,
rerun_source="layout_ocr",
quality_score=0.9 if analysis_json.get("quality", {}).get("usable_layout") else 0.5,
quality_flags=analysis_json.get("quality", {}).get("issues", []),
quality_note="Layout OCR generated line and word boxes for replica workflow.",
layout_json={"pages": analysis_json.get("pages", [])},
)
db.add(text_row)
db.flush()
for row in getattr(document, "analysis_versions", []) or []:
if getattr(row, "is_current", False):
row.is_current = False
next_analysis_version = (
max((getattr(v, "version_number", 0) or 0) for v in getattr(document, "analysis_versions", []) or []) + 1
if getattr(document, "analysis_versions", None) else 1
)
analysis_row = DocumentAnalysisVersion(
document_id=document.id,
version_number=next_analysis_version,
analysis_type="canonical",
is_current=True,
created_by="rerun_ocr_layout",
engine_name=layout_result.engine_name,
engine_version=layout_result.engine_version,
quality_score=0.9 if analysis_json.get("quality", {}).get("usable_layout") else 0.5,
quality_flags=analysis_json.get("quality", {}).get("issues", []),
quality_note="Canonical analysis refreshed from layout OCR result.",
analysis_json=analysis_json,
)
db.add(analysis_row)
db.commit()
except Exception:
return RedirectResponse(url=f"/documents/{document.document_id}?error=rerun_ocr_failed", status_code=303)
return RedirectResponse(url=f"/documents/{document.document_id}?editor_source=raw&tab=ocr-review", status_code=303)
traceback.print_exc()
db.rollback()
return RedirectResponse(
url=f"/documents/{document.document_id}?error=rerun_ocr_failed&tab=ocr-review",
status_code=303,
)
return RedirectResponse(
url=f"/documents/{document.document_id}?success=rerun_ocr&editor_source=raw&tab=ocr-review",
status_code=303,
)
@router.post("/{document_id}/save-ocr-corrected-pdf", response_class=RedirectResponse)
def save_ocr_corrected_pdf(document_id: str, db: Session = Depends(get_db)):
@ -1097,6 +1176,9 @@ def save_replica_pdf_clean(document_id: str, output_path: str = Form(""), db: Se
selectinload(Document.text_versions),
selectinload(Document.naming_fields),
selectinload(Document.replica_review_states),
selectinload(Document.replica_outputs),
selectinload(Document.extracted_fields),
selectinload(Document.analysis_versions),
)
.filter(Document.document_id == document_id)
.first()
@ -1107,16 +1189,34 @@ def save_replica_pdf_clean(document_id: str, output_path: str = Form(""), db: Se
try:
output_path_obj = _resolve_document_output_path(document, output_path)
save_replica_pdf(db, document, output_path_obj, mode="clean")
return RedirectResponse(
url=f"/documents/{document.document_id}?success=saved_replica_pdf&tab=ocr-review&viewer_source=replica",
status_code=303,
)
except ValueError as e:
if "invalid_output_path" in str(e):
return RedirectResponse(url=f"/documents/{document.document_id}?error=invalid_output_path", status_code=303)
return RedirectResponse(url=f"/documents/{document.document_id}?error=save_replica_pdf_failed&tab=ocr-review", status_code=303)
msg = str(e)
if "invalid_output_path" in msg:
return RedirectResponse(
url=f"/documents/{document.document_id}?error=invalid_output_path",
status_code=303,
)
if "document_analysis_missing_usable_layout" in msg or "clean_replica_has_no_renderable_lines" in msg:
return RedirectResponse(
url=f"/documents/{document.document_id}?error=clean_replica_requires_layout_ocr&tab=ocr-review&viewer_source=scan",
status_code=303,
)
traceback.print_exc()
return RedirectResponse(
url=f"/documents/{document.document_id}?error=save_replica_pdf_failed&tab=ocr-review",
status_code=303,
)
except Exception:
traceback.print_exc()
return RedirectResponse(url=f"/documents/{document.document_id}?error=save_replica_pdf_failed&tab=ocr-review", status_code=303)
return RedirectResponse(url=f"/documents/{document.document_id}?success=saved_replica_pdf&tab=ocr-review", status_code=303)
return RedirectResponse(
url=f"/documents/{document.document_id}?error=save_replica_pdf_failed&tab=ocr-review",
status_code=303,
)
@router.post("/{document_id}/save-replica-pdf-scan-backed", response_class=RedirectResponse)
def save_replica_pdf_scan_backed(document_id: str, output_path: str = Form(""), db: Session = Depends(get_db)):