Fix layout review selected-word style persistence

This commit is contained in:
Sean McElwain 2026-05-25 16:55:27 -05:00
parent 746757e19f
commit ea314d733d
4 changed files with 2582 additions and 279 deletions

View File

@ -67,6 +67,7 @@ from reportlab.lib.utils import ImageReader
from reportlab.pdfbase.pdfmetrics import stringWidth
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from sqlalchemy import func
from sqlalchemy.orm import Session
@ -368,7 +369,32 @@ def _fit_font_size_for_bbox_text(text: str, box_width: float, box_height: float)
def _safe_pdf_font_name(font_name: str | None) -> str:
candidate = (font_name or "Helvetica").strip()
raw = (font_name or "Helvetica").strip()
key = raw.lower()
# ReportLab core PDF fonts only unless TTFs are registered.
# Map common UI font names to visible built-in PDF fonts.
aliases = {
"arial": "Helvetica",
"calibri": "Helvetica",
"verdana": "Helvetica",
"trebuchet ms": "Helvetica",
"helvetica": "Helvetica",
"times new roman": "Times-Roman",
"times": "Times-Roman",
"georgia": "Times-Roman",
"cambria": "Times-Roman",
"liberation serif": "Times-Roman",
"dejavu serif": "Times-Roman",
"courier new": "Courier",
"courier": "Courier",
"liberation mono": "Courier",
"dejavu sans mono": "Courier",
}
candidate = aliases.get(key, raw)
try:
pdfmetrics.getFont(candidate)
return candidate
@ -380,8 +406,9 @@ def _font_size_for_box(text: str, font_name: str, box_width: float, box_height:
fitted = _fit_font_size_for_bbox_text(text, box_width, box_height)
if saved_size and saved_size > 0:
# Saved UI/editor font size is allowed, but geometry wins for replica output.
return max(1.0, min(float(saved_size), float(fitted)))
# Layout Review is the source of truth after manual editing.
# Do not silently shrink manual font edits back to the fitted estimate.
return max(1.0, float(saved_size))
return max(1.0, float(fitted))
@ -993,6 +1020,12 @@ def build_replica_layout(document: Document, mode: str = "shared") -> dict:
current_file, raw_ocr, reviewed, source_layout, layout_source = _get_replica_source_context(document)
reader = PdfReader(str(current_file))
source_layout_meta = source_layout if isinstance(source_layout, dict) else {}
prefer_word_entries = (
source_layout_meta.get("layout_sync_source") == "layout_review"
or source_layout_meta.get("layout_sync_status") == "synced"
)
pages = []
page_layouts = {page["page"]: page for page in (source_layout.get("pages", []) if isinstance(source_layout, dict) else [])}
@ -1066,6 +1099,7 @@ def build_replica_layout(document: Document, mode: str = "shared") -> dict:
"page_height": page_h,
"image_width": src_w,
"image_height": src_h,
"prefer_word_entries": prefer_word_entries,
"lines": line_entries,
"words": page_layout.get("words", []) or [],
}
@ -1156,8 +1190,35 @@ def _render_replica_pdf_from_layout(
page_layout = pages.get(page_num, {"lines": []})
edited_words = [
w for w in (page_layout.get("words") or [])
if (isinstance(w.get("manual_flags"), dict) and w.get("manual_flags", {}).get("style_edited"))
or str(w.get("text_color_guess") or "#000000").lower() != "#000000"
]
if edited_words:
print(
"[replica-render-debug]",
"page=", page_num,
"prefer_word_entries=", page_layout.get("prefer_word_entries"),
"edited_words=",
[
(
w.get("id"),
w.get("text"),
w.get("font_size_guess"),
w.get("font_family_guess"),
w.get("text_color_guess"),
w.get("manual_flags"),
)
for w in edited_words[:20]
],
flush=True,
)
render_entries = []
if page_layout.get("lines"):
if page_layout.get("prefer_word_entries") and page_layout.get("words"):
render_entries = _build_word_entries_for_page(page_layout, page_h)
if not render_entries and page_layout.get("lines"):
render_entries = _build_line_entries_for_page(page_layout, page_h)
if not render_entries and page_layout.get("words"):
render_entries = _build_word_entries_for_page(page_layout, page_h)
@ -1189,8 +1250,18 @@ def _render_replica_pdf_from_layout(
c.setStrokeColorRGB(1, 0, 0)
c.setFillColorRGB(1, 0, 0)
else:
c.setStrokeColorRGB(0, 0, 0)
c.setFillColorRGB(0, 0, 0)
color = str(line.get("text_color_guess") or "#000000").lstrip("#")
try:
if len(color) == 6:
r = int(color[0:2], 16) / 255.0
g = int(color[2:4], 16) / 255.0
b = int(color[4:6], 16) / 255.0
else:
r = g = b = 0
except Exception:
r = g = b = 0
c.setStrokeColorRGB(r, g, b)
c.setFillColorRGB(r, g, b)
text_obj.textLine(text_line)
c.drawText(text_obj)

View File

@ -1,4 +1,5 @@
from app.models.document import Document
from app.models.document_review_state import DocumentReviewState
from app.models.document_version import DocumentVersion
from app.models.text_version import TextVersion
from app.models.extracted_field import ExtractedField
@ -8,6 +9,8 @@ from app.models.document_additional_field import DocumentAdditionalField
from app.models.document_preset import DocumentPreset
__all__ = [
"DocumentReplicaReviewState",
"DocumentReviewState",
"DocumentAnalysisVersion",
"Document",
"DocumentVersion",

View File

@ -19,7 +19,7 @@ from io import BytesIO
from fastapi import APIRouter, Depends, Form, Query, Request
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse, Response
from fastapi.templating import Jinja2Templates
from sqlalchemy import distinct
from sqlalchemy import distinct, text
from sqlalchemy import func
from sqlalchemy.orm import Session, selectinload
from pypdf import PdfReader
@ -2023,12 +2023,30 @@ async def save_layout_review(document_id: str, request: Request, db: Session = D
x_right = max(x1, x2)
y_top = min(y1, y2)
y_bottom = max(y1, y2)
if abs(x_right - x_left) < 1.0 or abs(y_bottom - y_top) < 1.0:
continue
font_size_guess = float(word.get("font_size_guess") or max(6.0, (y_bottom - y_top) * 0.75))
font_family_guess = (word.get("font_family_guess") or "Helvetica")
manual_flags = word.get("manual_flags") if isinstance(word.get("manual_flags"), dict) else {}
style_edited = manual_flags.get("style_edited") is True
override_style = word.get("override_style") if style_edited and isinstance(word.get("override_style"), dict) else {}
resolved_style = word.get("resolved_style") if style_edited and isinstance(word.get("resolved_style"), dict) else {}
font_size_guess = float(word.get("font_size_guess") or override_style.get("font_size") or max(6.0, (y_bottom - y_top) * 0.75))
font_family_guess = word.get("font_family_guess") or override_style.get("font_family") or "Helvetica"
font_weight_guess = int(word.get("font_weight_guess") or resolved_style.get("font_weight") or 400)
font_style_guess = word.get("font_style_guess") or resolved_style.get("font_style") or "normal"
letter_spacing_guess = float(word.get("letter_spacing_guess") or resolved_style.get("letter_spacing") or 0)
text_color_guess = word.get("text_color_guess") or override_style.get("text_color") or "#000000"
if style_edited:
override_style = dict(override_style)
override_style.update({"font_family": font_family_guess, "font_size": font_size_guess, "text_color": text_color_guess})
resolved_style = dict(resolved_style)
resolved_style.update(override_style)
else:
override_style = {}
resolved_style = {}
manual_flags["style_edited"] = False
words.append({
"id": int(word.get("id") or word_idx),
@ -2037,6 +2055,13 @@ async def save_layout_review(document_id: str, request: Request, db: Session = D
"confidence": None,
"font_size_guess": font_size_guess,
"font_family_guess": font_family_guess,
"font_weight_guess": font_weight_guess,
"font_style_guess": font_style_guess,
"letter_spacing_guess": letter_spacing_guess,
"text_color_guess": text_color_guess,
"override_style": override_style,
"resolved_style": resolved_style,
"manual_flags": manual_flags,
})
words.sort(key=lambda w: (w["bbox"][1], w["bbox"][0]))
@ -2186,12 +2211,27 @@ def document_detail(document_id: str, request: Request, queue: str | None = None
bbox = word.get("bbox") or [0, 0, 0, 0]
if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
continue
resolved_style = word.get("resolved_style") if isinstance(word.get("resolved_style"), dict) else {}
override_style = word.get("override_style") if isinstance(word.get("override_style"), dict) else {}
inferred_style = word.get("inferred_style") if isinstance(word.get("inferred_style"), dict) else {}
manual_flags = word.get("manual_flags") if isinstance(word.get("manual_flags"), dict) else {}
font_size_value = word.get("font_size_guess") or override_style.get("font_size") or resolved_style.get("font_size") or max(6.0, (float(bbox[3]) - float(bbox[1])) * 0.75)
font_family_value = word.get("font_family_guess") or override_style.get("font_family") or resolved_style.get("font_family") or "Helvetica"
text_color_value = word.get("text_color_guess") or override_style.get("text_color") or resolved_style.get("text_color") or "#000000"
word_row = {
"id": idx,
"text": (word.get("text") or "").strip(),
"bbox": [float(bbox[0]), float(bbox[1]), float(bbox[2]), float(bbox[3])],
"font_size_guess": float(word.get("font_size_guess") or max(6.0, (float(bbox[3]) - float(bbox[1])) * 0.75)),
"font_family_guess": (word.get("font_family_guess") or "Helvetica"),
"font_size_guess": float(font_size_value),
"font_family_guess": font_family_value,
"font_weight_guess": int(word.get("font_weight_guess") or resolved_style.get("font_weight") or 400),
"font_style_guess": word.get("font_style_guess") or resolved_style.get("font_style") or "normal",
"letter_spacing_guess": float(word.get("letter_spacing_guess") or resolved_style.get("letter_spacing") or 0),
"text_color_guess": text_color_value,
"inferred_style": inferred_style,
"override_style": override_style,
"resolved_style": resolved_style,
"manual_flags": manual_flags,
}
words.append(word_row)
@ -3293,78 +3333,151 @@ async def run_diagnostic_candidates(document_id: str, db: Session = Depends(get_
)
@router.get("/{document_id}/diagnostic-output/{output_id}/download")
async def download_diagnostic_output(document_id: str, output_id: int, db: Session = Depends(get_db)):
document = db.query(Document).filter(Document.document_id == document_id).first()
if document is None:
return HTMLResponse(content="Document not found", status_code=404)
row = db.execute(
text("""
SELECT file_path, engine, output_type, version_number
FROM document_diagnostic_outputs
WHERE id = :id AND document_id = :document_id
"""),
{"id": output_id, "document_id": document.id},
).mappings().first()
@router.get("/{document_id}/diagnostic-output/{output_id}/download")
async def download_diagnostic_output(document_id: str, output_id: int):
with engine.connect() as conn:
row = conn.execute(
text("""
SELECT ddo.file_path, ddo.engine, ddo.output_type, ddo.version_number
FROM document_diagnostic_outputs ddo
JOIN documents d ON d.id = ddo.document_id
WHERE ddo.id = :id AND d.document_id = :document_id
"""),
{"id": output_id, "document_id": document_id},
).mappings().first()
if not row or not row["file_path"]:
return HTMLResponse(content="Diagnostic output not found", status_code=404)
path = Path(row["file_path"])
if not path.exists():
return HTMLResponse(content="Diagnostic output file missing", status_code=404)
return HTMLResponse(content=f"Diagnostic output file missing: {path}", status_code=404)
return FileResponse(path=str(path), filename=path.name)
@router.post("/{document_id}/diagnostic-output/{output_id}/select")
async def select_diagnostic_output(document_id: str, output_id: int, db: Session = Depends(get_db)):
document = db.query(Document).filter(Document.document_id == document_id).first()
if document is None:
return HTMLResponse(content="Document not found", status_code=404)
async def select_diagnostic_output(document_id: str, output_id: int):
with engine.begin() as conn:
row = conn.execute(
text("""
SELECT ddo.id, ddo.document_id, ddo.engine, ddo.output_type
FROM document_diagnostic_outputs ddo
JOIN documents d ON d.id = ddo.document_id
WHERE ddo.id = :id AND d.document_id = :document_id
"""),
{"id": output_id, "document_id": document_id},
).mappings().first()
row = db.execute(
text("""
SELECT engine, output_type
FROM document_diagnostic_outputs
WHERE id = :id AND document_id = :document_id
"""),
{"id": output_id, "document_id": document.id},
).mappings().first()
if not row:
return HTMLResponse(content="Diagnostic output not found", status_code=404)
if not row:
return HTMLResponse(content="Diagnostic output not found", status_code=404)
conn.execute(
text("""
UPDATE document_diagnostic_outputs
SET is_selected = false
WHERE document_id = :document_pk
AND engine = :engine
AND output_type = :output_type
"""),
{
"document_pk": row["document_id"],
"engine": row["engine"],
"output_type": row["output_type"],
},
)
db.execute(
text("""
UPDATE document_diagnostic_outputs
SET is_selected = false
WHERE document_id = :document_id
AND engine = :engine
AND output_type = :output_type
"""),
{
"document_id": document.id,
"engine": row["engine"],
"output_type": row["output_type"],
},
)
db.execute(
text("""
UPDATE document_diagnostic_outputs
SET is_selected = true, updated_at = NOW()
WHERE id = :id AND document_id = :document_id
"""),
{"id": output_id, "document_id": document.id},
)
db.commit()
conn.execute(
text("""
UPDATE document_diagnostic_outputs
SET is_selected = true, updated_at = NOW()
WHERE id = :id
"""),
{"id": output_id},
)
return RedirectResponse(
url=f"/documents/{document_id}?tab=ocr-review&success=diagnostic_candidate_selected",
status_code=303,
)
@router.post("/{document_id}/diagnostic-output/select")
async def select_diagnostic_output_from_form(document_id: str, diagnostic_output_id: int = Form(...)):
return await select_diagnostic_output(document_id, diagnostic_output_id)
@router.get("/{document_id}/diagnostic-output/{output_id}/view")
async def view_diagnostic_output(document_id: str, output_id: int):
with engine.connect() as conn:
row = conn.execute(
text("""
SELECT ddo.file_path, ddo.engine, ddo.output_type, ddo.version_number
FROM document_diagnostic_outputs ddo
JOIN documents d ON d.id = ddo.document_id
WHERE ddo.id = :id AND d.document_id = :document_id
"""),
{"id": output_id, "document_id": document_id},
).mappings().first()
if not row or not row["file_path"]:
return HTMLResponse(content="Diagnostic output not found", status_code=404)
path = Path(row["file_path"])
if not path.exists():
return HTMLResponse(content=f"Diagnostic output file missing: {path}", status_code=404)
suffix = path.suffix.lower()
if suffix == ".pdf":
return FileResponse(path=str(path), filename=path.name, media_type="application/pdf")
if suffix == ".docx":
with open(path, "rb") as f:
result = mammoth.convert_to_html(f)
body = result.value or ""
return HTMLResponse(content=f"""
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
body {{
margin: 0;
padding: 1rem;
background: #2b2b31;
font-family: Arial, Helvetica, sans-serif;
}}
.page {{
background: white;
color: #111827;
max-width: 8.5in;
min-height: 11in;
margin: 0 auto;
padding: 0.5in;
box-sizing: border-box;
box-shadow: 0 0 0.25rem rgba(0,0,0,0.35);
}}
@media (max-width: 900px) {{
body {{ padding: 0.5rem; }}
.page {{ padding: 0.35in; font-size: 0.8rem; }}
}}
</style>
</head>
<body>
<main class="page">
{body}
</main>
</body>
</html>
""")
return FileResponse(path=str(path), filename=path.name)
# --- diagnostic candidate routes end ---

File diff suppressed because it is too large Load Diff