208 lines
7.7 KiB
Python
208 lines
7.7 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.document_analysis_version import DocumentAnalysisVersion
|
|
from app.logic.layout_ocr import run_layout_ocr
|
|
|
|
|
|
def _flatten_layout_lines(layout_json: dict | None) -> list[dict[str, Any]]:
|
|
if not layout_json:
|
|
return []
|
|
lines: list[dict[str, Any]] = []
|
|
for page in layout_json.get("pages", []) or []:
|
|
for line in page.get("lines", []) or []:
|
|
if isinstance(line, dict):
|
|
lines.append(line)
|
|
return lines
|
|
|
|
|
|
def _layout_has_any_text(layout_json: dict | None) -> bool:
    """Report whether any layout line carries non-whitespace text.

    Args:
        layout_json: Layout payload, or ``None``.

    Returns:
        ``True`` if at least one line returned by ``_flatten_layout_lines``
        has a ``"text"`` value that is non-empty after stripping whitespace;
        ``False`` otherwise, including when there are no lines at all.
    """
    return any(
        (entry.get("text") or "").strip()
        for entry in _flatten_layout_lines(layout_json)
    )
|
|
|
|
|
|
def _layout_has_usable_bboxes(layout_json: dict | None) -> bool:
    """Report whether any layout line has a complete bounding box.

    A bounding box counts as usable when the line's ``"bbox"`` value is a
    list or tuple of exactly four entries, none of which is ``None``.

    Args:
        layout_json: Layout payload, or ``None``.

    Returns:
        ``True`` on the first line with a usable box, ``False`` if none of
        the lines returned by ``_flatten_layout_lines`` has one.
    """
    for entry in _flatten_layout_lines(layout_json):
        box = entry.get("bbox")
        if not isinstance(box, (list, tuple)):
            continue
        if len(box) != 4:
            continue
        if any(coord is None for coord in box):
            continue
        return True
    return False
|
|
|
|
|
|
def _build_canonical_analysis_from_document(document) -> dict[str, Any]:
    """Assemble the canonical analysis payload from data already on a document.

    The layout comes from the document's current ``"reviewed"`` text version
    when one exists, otherwise from the current ``"raw_ocr"`` version.
    Semantic candidates come from the first entry of
    ``document.extracted_fields``, if any.

    Args:
        document: Object exposing ``document_id`` and, optionally,
            ``text_versions``, ``extracted_fields``, ``document_type``,
            ``mime_type``, ``current_path``, ``source_path`` and
            ``canonical_filename``.

    Returns:
        A dict with the keys ``schema_version``, ``analysis_type``,
        ``document_info``, ``text_source``, ``pages``,
        ``semantic_candidates`` and ``quality``. ``quality["issues"]`` lists
        ``"no_text_in_layout"`` and/or ``"no_usable_bboxes"`` when the
        corresponding check fails. ``pages`` is always a list-like value,
        never ``None``.
    """

    def _version_sort_key(tv):
        created_at = getattr(tv, "created_at", None)
        # The boolean in the middle keeps a missing timestamp from ever being
        # compared with a real one: tuple comparison stops at the first
        # differing element, so the third element is only compared between
        # two timestamps or between two zero placeholders.
        return (
            tv.version_number or 0,
            created_at is not None,
            created_at if created_at is not None else 0,
        )

    text_versions = sorted(
        getattr(document, "text_versions", []) or [],
        key=_version_sort_key,
        reverse=True,
    )

    def _current_version(version_type):
        # Newest-first ordering means the first match wins if several
        # versions of the same type are flagged current.
        return next(
            (
                tv
                for tv in text_versions
                if getattr(tv, "is_current", False) and tv.version_type == version_type
            ),
            None,
        )

    def _attr(obj, name):
        # Read an attribute from an optional object; absent object -> None.
        return getattr(obj, name, None) if obj else None

    raw_ocr = _current_version("raw_ocr")
    reviewed = _current_version("reviewed")

    # Reviewed text takes precedence over raw OCR output.
    source_tv = reviewed or raw_ocr
    layout_json = _attr(source_tv, "layout_json")
    if not isinstance(layout_json, dict):
        # Anything that is not a dict cannot be traversed as a layout; treat
        # it as missing so the quality checks report issues instead of failing.
        layout_json = None

    extracted_rows = getattr(document, "extracted_fields", None) or []
    extracted = extracted_rows[0] if extracted_rows else None

    def _extracted_as_str(name):
        # Stringify a candidate value, keeping None (but not 0) as None.
        value = _attr(extracted, name)
        return str(value) if value is not None else None

    transaction_date = _attr(extracted, "transaction_date")

    text_present = _layout_has_any_text(layout_json)
    usable_layout = _layout_has_usable_bboxes(layout_json)
    issues: list[str] = []
    if not text_present:
        issues.append("no_text_in_layout")
    if not usable_layout:
        issues.append("no_usable_bboxes")

    return {
        "schema_version": 1,
        "analysis_type": "canonical",
        "document_info": {
            # NOTE(review): this reads document.document_id while the query
            # helpers in this module filter on document.id; confirm both
            # attributes are intended.
            "document_id": document.document_id,
            "document_type": getattr(document, "document_type", None),
            "mime_type": getattr(document, "mime_type", None),
            "current_path": getattr(document, "current_path", None),
            "source_path": getattr(document, "source_path", None),
            "canonical_filename": getattr(document, "canonical_filename", None),
        },
        "text_source": {
            "raw_ocr_version_id": _attr(raw_ocr, "id"),
            "reviewed_version_id": _attr(reviewed, "id"),
            "active_version_id": _attr(source_tv, "id"),
            "active_version_type": _attr(source_tv, "version_type"),
        },
        # `or []` also covers an explicit `"pages": None` in the payload.
        "pages": (layout_json or {}).get("pages") or [],
        "semantic_candidates": {
            "merchant": _attr(extracted, "merchant_normalized"),
            "merchant_raw": _attr(extracted, "merchant_raw"),
            # Truthiness (not `is not None`) is deliberate here: an empty
            # value is reported as None rather than as an empty string.
            "transaction_date": str(transaction_date) if transaction_date else None,
            "total": _extracted_as_str("total"),
            "tax": _extracted_as_str("tax"),
            "subtotal": _extracted_as_str("subtotal"),
        },
        "quality": {
            "text_present": text_present,
            "usable_layout": usable_layout,
            "usable_word_boxes": False,
            "issues": issues,
        },
    }
|
|
|
|
|
|
def get_current_document_analysis(db: Session, document) -> DocumentAnalysisVersion | None:
    """Fetch the analysis version row flagged as current for a document.

    Args:
        db: Session used to run the query.
        document: Object whose ``id`` is matched against
            ``DocumentAnalysisVersion.document_id``.

    Returns:
        The matching row with the highest ``version_number`` (ties broken by
        the highest ``id``), or ``None`` when no row is flagged current.
    """
    belongs_to_document = DocumentAnalysisVersion.document_id == document.id
    flagged_current = DocumentAnalysisVersion.is_current.is_(True)
    newest_first = (
        DocumentAnalysisVersion.version_number.desc(),
        DocumentAnalysisVersion.id.desc(),
    )

    query = db.query(DocumentAnalysisVersion)
    query = query.filter(belongs_to_document, flagged_current)
    query = query.order_by(*newest_first)
    return query.first()
|
|
|
|
|
|
def ensure_document_analysis(db: Session, document, require_layout: bool = True) -> DocumentAnalysisVersion:
    """Return a current analysis row for the document, creating one if needed.

    The existing current row is reused when it has a non-empty
    ``analysis_json`` and either ``require_layout`` is false or its stored
    quality reports ``usable_layout``. Otherwise a canonical analysis is
    rebuilt from the document, every row currently flagged current is
    demoted, and a new row with the next version number is committed.

    Args:
        db: Session used for the queries and the commit.
        document: Document whose ``id`` identifies its analysis rows.
        require_layout: When true, an analysis without usable layout is
            neither reused nor stored.

    Returns:
        The reused row, or the newly committed and refreshed row.

    Raises:
        ValueError: With message ``"document_analysis_missing_usable_layout"``
            when ``require_layout`` is true and the rebuilt analysis has no
            usable layout. Nothing is written in that case.
    """
    existing = get_current_document_analysis(db, document)
    if existing and existing.analysis_json:
        existing_quality = (existing.analysis_json or {}).get("quality", {}) or {}
        if not require_layout:
            return existing
        if existing_quality.get("usable_layout"):
            return existing

    analysis_json = _build_canonical_analysis_from_document(document)
    quality = analysis_json.get("quality", {}) or {}
    has_usable_layout = quality.get("usable_layout")

    if require_layout and not has_usable_layout:
        raise ValueError("document_analysis_missing_usable_layout")

    # Demote whatever is flagged current before inserting the replacement.
    currently_flagged = db.query(DocumentAnalysisVersion).filter(
        DocumentAnalysisVersion.document_id == document.id,
        DocumentAnalysisVersion.is_current.is_(True),
    )
    currently_flagged.update({"is_current": False}, synchronize_session=False)

    highest_version = (
        db.query(func.max(DocumentAnalysisVersion.version_number))
        .filter(DocumentAnalysisVersion.document_id == document.id)
        .scalar()
    )
    next_version = (highest_version or 0) + 1

    # Score tiers: usable layout > text only > nothing.
    if has_usable_layout:
        quality_score = 1.0
    elif quality.get("text_present"):
        quality_score = 0.5
    else:
        quality_score = 0.0

    row_fields = {
        "document_id": document.id,
        "version_number": next_version,
        "analysis_type": "canonical",
        "is_current": True,
        "created_by": "ensure_document_analysis",
        "engine_name": "internal_existing_ocr_adapter",
        "engine_version": "v1",
        "model_name": None,
        "prompt_version": None,
        "quality_score": quality_score,
        "quality_note": None,
        "quality_flags": quality.get("issues", []),
        "analysis_json": analysis_json,
    }
    new_row = DocumentAnalysisVersion(**row_fields)

    db.add(new_row)
    db.commit()
    db.refresh(new_row)
    return new_row
|
|
|
|
|
|
|
|
def build_layout_ocr_analysis_for_document(document) -> dict[str, Any]:
    """Run layout OCR on a document's current file and build its analysis payload.

    The payload returned by ``run_layout_ocr(...).to_analysis_json()`` is
    extended in place with ``text_source``, ``quality``, ``text_content`` and
    ``engine`` entries. Lines are read through the same helpers as the
    canonical analysis path, so malformed page or line entries are skipped
    rather than raising.

    Args:
        document: Object exposing ``current_path``, the path handed to
            ``run_layout_ocr``.

    Returns:
        The analysis dict. ``text_content`` is the newline-joined stripped
        text of every non-empty line; ``quality["issues"]`` lists
        ``"no_text_detected"`` and/or ``"no_usable_bboxes"`` when applicable.

    Raises:
        ValueError: With message ``"Document has no current_path"`` when the
            document has no (or an empty) ``current_path``.
    """
    current_path = getattr(document, "current_path", None)
    if not current_path:
        raise ValueError("Document has no current_path")

    result = run_layout_ocr(current_path)
    analysis_json = result.to_analysis_json()

    # Use the shared traversal helpers so this path applies the same entry
    # guards and the same bounding-box rule as the canonical path.
    stripped_texts = (
        (line.get("text") or "").strip()
        for line in _flatten_layout_lines(analysis_json)
    )
    text_lines = [text for text in stripped_texts if text]
    usable_layout = _layout_has_usable_bboxes(analysis_json)

    issues: list[str] = []
    if not text_lines:
        issues.append("no_text_detected")
    if not usable_layout:
        issues.append("no_usable_bboxes")

    analysis_json["text_source"] = {
        # No stored text version backs this payload; it comes straight from OCR.
        "active_version_id": None,
        "raw_ocr_version_id": None,
        "reviewed_version_id": None,
        "active_version_type": "layout_ocr",
    }
    analysis_json["quality"] = {
        "text_present": bool(text_lines),
        "usable_layout": usable_layout,
        # NOTE(review): this mirrors the line-level bbox check; confirm the
        # OCR payload really carries word-level boxes whenever lines have them.
        "usable_word_boxes": usable_layout,
        "issues": issues,
    }
    analysis_json["text_content"] = "\n".join(text_lines)
    analysis_json["engine"] = {
        "name": result.engine_name,
        "version": result.engine_version,
    }

    return analysis_json
|