# document-processor/app/logic/document_analysis.py
#
# 208 lines
# 7.7 KiB
# Python

from __future__ import annotations
from typing import Any
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.models.document_analysis_version import DocumentAnalysisVersion
from app.logic.layout_ocr import run_layout_ocr
def _flatten_layout_lines(layout_json: dict | None) -> list[dict[str, Any]]:
if not layout_json:
return []
lines: list[dict[str, Any]] = []
for page in layout_json.get("pages", []) or []:
for line in page.get("lines", []) or []:
if isinstance(line, dict):
lines.append(line)
return lines
def _layout_has_any_text(layout_json: dict | None) -> bool:
    """Report whether at least one layout line has non-whitespace text.

    Args:
        layout_json: Layout payload passed on to ``_flatten_layout_lines``.

    Returns:
        ``True`` as soon as a line whose ``"text"`` is non-empty after
        stripping is found, otherwise ``False``.
    """
    return any(
        (entry.get("text") or "").strip()
        for entry in _flatten_layout_lines(layout_json)
    )
def _layout_has_usable_bboxes(layout_json: dict | None) -> bool:
    """Report whether at least one layout line has a complete bounding box.

    A bounding box counts as usable when it is a list or tuple of exactly
    four entries, none of which is ``None``.

    Args:
        layout_json: Layout payload passed on to ``_flatten_layout_lines``.

    Returns:
        ``True`` if any line has a usable ``"bbox"``, otherwise ``False``.
    """

    def is_complete_box(box) -> bool:
        if not isinstance(box, (list, tuple)):
            return False
        return len(box) == 4 and all(coord is not None for coord in box)

    return any(
        is_complete_box(entry.get("bbox"))
        for entry in _flatten_layout_lines(layout_json)
    )
def _build_canonical_analysis_from_document(document) -> dict[str, Any]:
    """Assemble the canonical analysis payload from data already on *document*.

    The layout is taken from the current ``reviewed`` text version when one
    exists, otherwise from the current ``raw_ocr`` version. Semantic
    candidates come from the first row of ``document.extracted_fields``.

    Args:
        document: Object exposing ``document_id`` and, optionally,
            ``text_versions``, ``extracted_fields`` and the descriptive
            attributes copied into ``document_info``.

    Returns:
        A dict with ``schema_version``, ``analysis_type``, ``document_info``,
        ``text_source``, ``pages``, ``semantic_candidates`` and ``quality``
        keys. ``quality["issues"]`` contains ``"no_text_in_layout"`` and/or
        ``"no_usable_bboxes"`` when the respective check fails.
    """

    def version_order(tv):
        created_at = getattr(tv, "created_at", None)
        has_timestamp = bool(created_at)
        # The flag is compared before the timestamp itself, so a datetime is
        # never compared against the integer placeholder (that comparison
        # raises TypeError when version numbers tie).
        return (
            tv.version_number or 0,
            has_timestamp,
            created_at if has_timestamp else 0,
        )

    def attr_or_none(obj, name):
        return getattr(obj, name, None) if obj else None

    def current_of_type(version_type):
        return next(
            (
                tv
                for tv in text_versions
                if getattr(tv, "is_current", False)
                and tv.version_type == version_type
            ),
            None,
        )

    # Newest first, so the first current match of each type is the latest one.
    text_versions = sorted(
        getattr(document, "text_versions", []) or [],
        key=version_order,
        reverse=True,
    )
    raw_ocr = current_of_type("raw_ocr")
    reviewed = current_of_type("reviewed")
    source_tv = reviewed or raw_ocr  # a reviewed version wins over raw OCR

    layout_json = attr_or_none(source_tv, "layout_json")
    if not isinstance(layout_json, dict):
        # Anything that is not a dict is treated as "no layout" so that the
        # page list and the quality checks agree with each other.
        layout_json = None

    extracted_rows = getattr(document, "extracted_fields", None) or []
    extracted = extracted_rows[0] if extracted_rows else None

    def amount(name):
        # Only None means "missing"; a zero amount is still reported.
        value = attr_or_none(extracted, name)
        return None if value is None else str(value)

    transaction_date = attr_or_none(extracted, "transaction_date")

    text_present = _layout_has_any_text(layout_json)
    usable_layout = _layout_has_usable_bboxes(layout_json)
    issues: list[str] = []
    if not text_present:
        issues.append("no_text_in_layout")
    if not usable_layout:
        issues.append("no_usable_bboxes")

    return {
        "schema_version": 1,
        "analysis_type": "canonical",
        "document_info": {
            # NOTE(review): the query helpers in this module key on
            # ``document.id``; confirm ``document_id`` is the intended
            # identifier here.
            "document_id": document.document_id,
            "document_type": getattr(document, "document_type", None),
            "mime_type": getattr(document, "mime_type", None),
            "current_path": getattr(document, "current_path", None),
            "source_path": getattr(document, "source_path", None),
            "canonical_filename": getattr(document, "canonical_filename", None),
        },
        "text_source": {
            "raw_ocr_version_id": attr_or_none(raw_ocr, "id"),
            "reviewed_version_id": attr_or_none(reviewed, "id"),
            "active_version_id": attr_or_none(source_tv, "id"),
            "active_version_type": attr_or_none(source_tv, "version_type"),
        },
        # Always a list, even when the stored layout has ``"pages": None``.
        "pages": (layout_json or {}).get("pages") or [],
        "semantic_candidates": {
            "merchant": attr_or_none(extracted, "merchant_normalized"),
            "merchant_raw": attr_or_none(extracted, "merchant_raw"),
            "transaction_date": str(transaction_date) if transaction_date else None,
            "total": amount("total"),
            "tax": amount("tax"),
            "subtotal": amount("subtotal"),
        },
        "quality": {
            "text_present": text_present,
            "usable_layout": usable_layout,
            "usable_word_boxes": False,
            "issues": issues,
        },
    }
def get_current_document_analysis(db: Session, document) -> DocumentAnalysisVersion | None:
    """Fetch the analysis version currently flagged as current for *document*.

    Args:
        db: SQLAlchemy session used to run the query.
        document: Object whose ``id`` is matched against
            ``DocumentAnalysisVersion.document_id``.

    Returns:
        The matching row with the highest ``version_number`` (ties broken by
        highest ``id``), or ``None`` when no current row exists.
    """
    model = DocumentAnalysisVersion
    query = db.query(model)
    query = query.filter(
        model.document_id == document.id,
        model.is_current.is_(True),
    )
    # Newest version first, so ``first()`` picks the latest current row.
    query = query.order_by(model.version_number.desc(), model.id.desc())
    return query.first()
def ensure_document_analysis(db: Session, document, require_layout: bool = True) -> DocumentAnalysisVersion:
    """Return the document's current analysis row, creating a new one if needed.

    The stored current row is reused when it has analysis JSON and either
    ``require_layout`` is false or its quality section reports a usable
    layout. Otherwise a canonical analysis is rebuilt from the document,
    existing current rows are demoted, and the new analysis is stored as the
    next version and committed.

    Args:
        db: SQLAlchemy session used for the queries and the commit.
        document: Document to analyse; its ``id`` keys the version rows.
        require_layout: When true, an analysis without a usable layout is
            not accepted.

    Returns:
        The reused or newly created ``DocumentAnalysisVersion`` row.

    Raises:
        ValueError: ``"document_analysis_missing_usable_layout"`` when
            ``require_layout`` is true and the rebuilt analysis has no
            usable layout.
    """
    existing = get_current_document_analysis(db, document)
    stored_json = existing.analysis_json if existing else None
    if stored_json:
        stored_quality = stored_json.get("quality", {}) or {}
        if not require_layout or stored_quality.get("usable_layout"):
            return existing

    fresh_json = _build_canonical_analysis_from_document(document)
    fresh_quality = fresh_json.get("quality", {}) or {}
    if require_layout and not fresh_quality.get("usable_layout"):
        raise ValueError("document_analysis_missing_usable_layout")

    # Demote whatever is currently flagged as current for this document.
    currently_flagged = db.query(DocumentAnalysisVersion).filter(
        DocumentAnalysisVersion.document_id == document.id,
        DocumentAnalysisVersion.is_current.is_(True),
    )
    currently_flagged.update({"is_current": False}, synchronize_session=False)

    highest_version = (
        db.query(func.max(DocumentAnalysisVersion.version_number))
        .filter(DocumentAnalysisVersion.document_id == document.id)
        .scalar()
    )
    next_version = (highest_version or 0) + 1

    # Score: full marks for a usable layout, half for text only, else zero.
    if fresh_quality.get("usable_layout"):
        score = 1.0
    elif fresh_quality.get("text_present"):
        score = 0.5
    else:
        score = 0.0

    new_row = DocumentAnalysisVersion(
        document_id=document.id,
        version_number=next_version,
        analysis_type="canonical",
        is_current=True,
        created_by="ensure_document_analysis",
        engine_name="internal_existing_ocr_adapter",
        engine_version="v1",
        model_name=None,
        prompt_version=None,
        quality_score=score,
        quality_note=None,
        quality_flags=fresh_quality.get("issues", []),
        analysis_json=fresh_json,
    )
    db.add(new_row)
    db.commit()
    db.refresh(new_row)
    return new_row
def build_layout_ocr_analysis_for_document(document) -> dict[str, Any]:
    """Run layout OCR on the document's current file and build its analysis.

    Args:
        document: Object exposing ``current_path``, the file handed to
            ``run_layout_ocr``.

    Returns:
        The dict returned by the OCR result's ``to_analysis_json()``, with
        the ``text_source``, ``quality``, ``text_content`` and ``engine``
        keys set by this function.

    Raises:
        ValueError: If the document has no ``current_path``.
    """
    current_path = getattr(document, "current_path", None)
    if not current_path:
        raise ValueError("Document has no current_path")

    result = run_layout_ocr(current_path)
    analysis_json = result.to_analysis_json()

    text_lines: list[str] = []
    usable_layout = False
    for page in analysis_json.get("pages", []) or []:
        # Malformed pages and lines are skipped rather than raising, the same
        # tolerance ``_flatten_layout_lines`` applies to line entries.
        if not isinstance(page, dict):
            continue
        for line in page.get("lines", []) or []:
            if not isinstance(line, dict):
                continue
            line_text = (line.get("text") or "").strip()
            if line_text:
                text_lines.append(line_text)
            bbox = line.get("bbox")
            if (
                isinstance(bbox, (list, tuple))
                and len(bbox) == 4
                and all(v is not None for v in bbox)
            ):
                usable_layout = True

    issues: list[str] = []
    if not text_lines:
        issues.append("no_text_detected")
    if not usable_layout:
        issues.append("no_usable_bboxes")

    # This analysis comes straight from OCR, so no stored text version backs it.
    analysis_json["text_source"] = {
        "active_version_id": None,
        "raw_ocr_version_id": None,
        "reviewed_version_id": None,
        "active_version_type": "layout_ocr",
    }
    analysis_json["quality"] = {
        "text_present": bool(text_lines),
        "usable_layout": usable_layout,
        # NOTE(review): this mirrors the line-level bbox check above; confirm
        # that word-level boxes are guaranteed whenever line boxes exist.
        "usable_word_boxes": usable_layout,
        "issues": issues,
    }
    analysis_json["text_content"] = "\n".join(text_lines)
    analysis_json["engine"] = {
        "name": result.engine_name,
        "version": result.engine_version,
    }
    return analysis_json