from __future__ import annotations

from typing import Any

from sqlalchemy import func
from sqlalchemy.orm import Session

from app.models.document_analysis_version import DocumentAnalysisVersion
from app.logic.layout_ocr import run_layout_ocr


def _flatten_layout_lines(layout_json: dict | None) -> list[dict[str, Any]]:
    """Collect every dict-shaped line from all pages of a layout payload.

    Args:
        layout_json: Layout payload with a ``"pages"`` list whose entries
            carry a ``"lines"`` list. Anything that is not a dict yields
            an empty result.

    Returns:
        The line dicts in page order. Pages and lines that are not dicts
        are skipped.
    """
    if not isinstance(layout_json, dict):
        return []

    lines: list[dict[str, Any]] = []
    for page in layout_json.get("pages", []) or []:
        # Skip malformed pages the same way malformed lines are skipped.
        if not isinstance(page, dict):
            continue
        lines.extend(
            line for line in page.get("lines", []) or [] if isinstance(line, dict)
        )
    return lines


def _line_text(line: dict[str, Any]) -> str:
    """Return the stripped ``"text"`` of a line, or ``""`` if it is not a string."""
    text = line.get("text")
    return text.strip() if isinstance(text, str) else ""


def _is_usable_bbox(bbox: Any) -> bool:
    """Return True when ``bbox`` is a 4-item list/tuple with no ``None`` entries."""
    return (
        isinstance(bbox, (list, tuple))
        and len(bbox) == 4
        and all(value is not None for value in bbox)
    )


def _layout_has_any_text(layout_json: dict | None) -> bool:
    """Return True if at least one layout line has non-blank text."""
    return any(_line_text(line) for line in _flatten_layout_lines(layout_json))


def _layout_has_usable_bboxes(layout_json: dict | None) -> bool:
    """Return True if at least one layout line carries a usable bbox."""
    return any(
        _is_usable_bbox(line.get("bbox"))
        for line in _flatten_layout_lines(layout_json)
    )


def _version_sort_key(text_version) -> tuple:
    """Build a sort key of version number, then creation time, for a text version.

    A missing ``created_at`` sorts below any present value. The presence flag
    comes before the value so that ``None`` is never compared against a real
    timestamp, which would raise ``TypeError``.
    """
    created_at = getattr(text_version, "created_at", None)
    return (text_version.version_number or 0, created_at is not None, created_at)


def _attr_or_none(obj, name: str):
    """Return ``getattr(obj, name, None)``, or ``None`` when ``obj`` is falsy."""
    return getattr(obj, name, None) if obj else None


def _quality_score(quality: dict[str, Any]) -> float:
    """Map quality flags to a score: 1.0 usable layout, 0.5 text only, else 0.0."""
    if quality.get("usable_layout"):
        return 1.0
    if quality.get("text_present"):
        return 0.5
    return 0.0


def _build_canonical_analysis_from_document(document) -> dict[str, Any]:
    """Assemble the canonical analysis payload from data already on ``document``.

    The active text version is the current ``"reviewed"`` version when there
    is one, otherwise the current ``"raw_ocr"`` version. Its ``layout_json``
    supplies the pages and the quality flags. Semantic candidates come from
    the first entry of ``document.extracted_fields``, if any.

    Args:
        document: Object exposing ``document_id`` and, optionally,
            ``text_versions``, ``extracted_fields`` and the descriptive
            attributes copied into ``"document_info"``.

    Returns:
        A dict with the keys ``schema_version``, ``analysis_type``,
        ``document_info``, ``text_source``, ``pages``,
        ``semantic_candidates`` and ``quality``.
    """
    # Newest first, so next() below picks the highest current version of each type.
    text_versions = sorted(
        getattr(document, "text_versions", []) or [],
        key=_version_sort_key,
        reverse=True,
    )

    def current_of_type(version_type: str):
        return next(
            (
                tv
                for tv in text_versions
                if getattr(tv, "is_current", False)
                and tv.version_type == version_type
            ),
            None,
        )

    raw_ocr = current_of_type("raw_ocr")
    reviewed = current_of_type("reviewed")
    source_tv = reviewed or raw_ocr
    layout_json = _attr_or_none(source_tv, "layout_json")

    extracted_rows = getattr(document, "extracted_fields", None) or []
    extracted = extracted_rows[0] if extracted_rows else None

    def amount(name: str) -> str | None:
        # Amounts are kept whenever they are not None, so a zero value survives.
        value = _attr_or_none(extracted, name)
        return None if value is None else str(value)

    transaction_date = _attr_or_none(extracted, "transaction_date")

    if isinstance(layout_json, dict):
        # A stored "pages": None is normalised to an empty list.
        pages = layout_json.get("pages", []) or []
    else:
        pages = []

    text_present = _layout_has_any_text(layout_json)
    usable_layout = _layout_has_usable_bboxes(layout_json)
    issues: list[str] = []
    if not text_present:
        issues.append("no_text_in_layout")
    if not usable_layout:
        issues.append("no_usable_bboxes")

    return {
        "schema_version": 1,
        "analysis_type": "canonical",
        "document_info": {
            "document_id": document.document_id,
            "document_type": getattr(document, "document_type", None),
            "mime_type": getattr(document, "mime_type", None),
            "current_path": getattr(document, "current_path", None),
            "source_path": getattr(document, "source_path", None),
            "canonical_filename": getattr(document, "canonical_filename", None),
        },
        "text_source": {
            "raw_ocr_version_id": _attr_or_none(raw_ocr, "id"),
            "reviewed_version_id": _attr_or_none(reviewed, "id"),
            "active_version_id": _attr_or_none(source_tv, "id"),
            "active_version_type": _attr_or_none(source_tv, "version_type"),
        },
        "pages": pages,
        "semantic_candidates": {
            "merchant": _attr_or_none(extracted, "merchant_normalized"),
            "merchant_raw": _attr_or_none(extracted, "merchant_raw"),
            # Unlike the amounts, the date is dropped when it is merely falsy.
            "transaction_date": str(transaction_date) if transaction_date else None,
            "total": amount("total"),
            "tax": amount("tax"),
            "subtotal": amount("subtotal"),
        },
        "quality": {
            "text_present": text_present,
            "usable_layout": usable_layout,
            "usable_word_boxes": False,
            "issues": issues,
        },
    }


def get_current_document_analysis(db: Session, document) -> DocumentAnalysisVersion | None:
    """Return the current analysis version row for ``document``, if any.

    When several rows are flagged current, the one with the highest
    ``version_number`` (then highest ``id``) is returned.
    """
    return (
        db.query(DocumentAnalysisVersion)
        .filter(
            DocumentAnalysisVersion.document_id == document.id,
            DocumentAnalysisVersion.is_current.is_(True),
        )
        .order_by(
            DocumentAnalysisVersion.version_number.desc(),
            DocumentAnalysisVersion.id.desc(),
        )
        .first()
    )


def ensure_document_analysis(
    db: Session, document, require_layout: bool = True
) -> DocumentAnalysisVersion:
    """Return a usable current analysis for ``document``, creating one if needed.

    The existing current row is reused when it has an ``analysis_json`` and
    either ``require_layout`` is false or its quality reports a usable
    layout. Otherwise a canonical analysis is rebuilt from the document,
    every current row is demoted, and a new current row with the next
    version number is committed.

    Args:
        db: SQLAlchemy session used for the lookup and the write.
        document: Document whose ``id`` keys the analysis rows.
        require_layout: When true, an analysis without usable bounding
            boxes is not acceptable.

    Returns:
        The reused or newly created ``DocumentAnalysisVersion`` row.

    Raises:
        ValueError: ``"document_analysis_missing_usable_layout"`` when
            ``require_layout`` is true and the rebuilt analysis has no
            usable layout. Nothing is written in that case.
    """
    current = get_current_document_analysis(db, document)
    if current and current.analysis_json:
        current_quality = current.analysis_json.get("quality", {}) or {}
        if not require_layout or current_quality.get("usable_layout"):
            return current

    analysis_json = _build_canonical_analysis_from_document(document)
    quality = analysis_json.get("quality", {}) or {}
    if require_layout and not quality.get("usable_layout"):
        raise ValueError("document_analysis_missing_usable_layout")

    try:
        # Demote every current row first so only the new row is flagged current.
        db.query(DocumentAnalysisVersion).filter(
            DocumentAnalysisVersion.document_id == document.id,
            DocumentAnalysisVersion.is_current.is_(True),
        ).update({"is_current": False}, synchronize_session=False)

        latest_version = (
            db.query(func.max(DocumentAnalysisVersion.version_number))
            .filter(DocumentAnalysisVersion.document_id == document.id)
            .scalar()
            or 0
        )

        row = DocumentAnalysisVersion(
            document_id=document.id,
            version_number=latest_version + 1,
            analysis_type="canonical",
            is_current=True,
            created_by="ensure_document_analysis",
            engine_name="internal_existing_ocr_adapter",
            engine_version="v1",
            model_name=None,
            prompt_version=None,
            quality_score=_quality_score(quality),
            quality_note=None,
            quality_flags=quality.get("issues", []),
            analysis_json=analysis_json,
        )
        db.add(row)
        db.commit()
    except Exception:
        # Leave the session usable for the caller instead of stuck in a
        # failed transaction; the original error still propagates.
        db.rollback()
        raise

    db.refresh(row)
    return row


def build_layout_ocr_analysis_for_document(document) -> dict[str, Any]:
    """Run layout OCR on the document's current file and build an analysis payload.

    The payload returned by ``result.to_analysis_json()`` is extended in
    place with ``text_source``, ``quality``, ``text_content`` and ``engine``
    entries.

    Args:
        document: Object whose ``current_path`` is passed to ``run_layout_ocr``.

    Returns:
        The extended analysis dict.

    Raises:
        ValueError: When the document has no ``current_path``.
    """
    current_path = getattr(document, "current_path", None)
    if not current_path:
        raise ValueError("Document has no current_path")

    result = run_layout_ocr(current_path)
    analysis_json = result.to_analysis_json()

    lines = _flatten_layout_lines(analysis_json)
    text_lines = [text for text in map(_line_text, lines) if text]
    usable_layout = any(_is_usable_bbox(line.get("bbox")) for line in lines)

    issues: list[str] = []
    if not text_lines:
        issues.append("no_text_detected")
    if not usable_layout:
        issues.append("no_usable_bboxes")

    analysis_json["text_source"] = {
        "active_version_id": None,
        "raw_ocr_version_id": None,
        "reviewed_version_id": None,
        "active_version_type": "layout_ocr",
    }
    analysis_json["quality"] = {
        "text_present": bool(text_lines),
        "usable_layout": usable_layout,
        # NOTE(review): this mirrors the line-level bbox check; word-level
        # boxes are not inspected here. Confirm that is intended.
        "usable_word_boxes": usable_layout,
        "issues": issues,
    }
    analysis_json["text_content"] = "\n".join(text_lines)
    analysis_json["engine"] = {
        "name": result.engine_name,
        "version": result.engine_version,
    }
    return analysis_json