# document-processor/app/logic/vision_analysis.py
#
# 450 lines
# 14 KiB
# Python

from __future__ import annotations
from pathlib import Path
from typing import Any
import hashlib
import tempfile
try:
import fitz # PyMuPDF
except Exception: # pragma: no cover
fitz = None
try:
import cv2
except Exception: # pragma: no cover
cv2 = None
def _render_pdf_page_to_png(path: Path, *, page_number: int = 0, dpi: int = 200) -> dict[str, Any]:
    """
    Render a single PDF page to a PNG inside the temp-dir cache folder.

    Args:
        path: Location of the PDF to open.
        page_number: Zero-based index of the page to render.
        dpi: Target resolution; the page is scaled by ``dpi / 72``.

    Returns:
        A dict that always carries ``status`` and ``rendered_pages``.
        On success ``status`` is ``"image_rendered"`` and ``rendered_pages``
        holds one entry with the one-based page number, the PNG path, the
        pixmap width/height and the dpi. On failure ``status`` is
        ``"render_failed"``, ``error`` names the reason and
        ``rendered_pages`` is empty.

    Exceptions raised by PyMuPDF while opening or rendering the document are
    not caught here and propagate to the caller.
    """
    if fitz is None:
        return {
            "status": "render_failed",
            "error": "pymupdf_not_available",
            "rendered_pages": [],
        }

    cache_root = Path(tempfile.gettempdir()) / "document_processor_vision"
    cache_root.mkdir(parents=True, exist_ok=True)

    # File name = stem + short hash of the path string + one-based page + dpi.
    digest = hashlib.sha256(str(path).encode("utf-8")).hexdigest()[:16]
    png_path = cache_root / f"{path.stem}_{digest}_page{page_number + 1}_{dpi}dpi.png"

    doc = fitz.open(str(path))
    try:
        page_count = doc.page_count

        # PyMuPDF accepts negative page numbers and counts them back from the
        # end, which would render a page while reporting a zero/negative
        # "page" value. Treat them as out of range instead.
        if page_number < 0 or page_number >= page_count:
            return {
                "status": "render_failed",
                "error": "page_number_out_of_range",
                "page_count": page_count,
                "rendered_pages": [],
            }

        page = doc.load_page(page_number)
        zoom = dpi / 72.0
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
        pix.save(str(png_path))

        return {
            "status": "image_rendered",
            "page_count": page_count,
            "rendered_pages": [
                {
                    "page": page_number + 1,
                    "png_path": str(png_path),
                    "width": pix.width,
                    "height": pix.height,
                    "dpi": dpi,
                }
            ],
        }
    finally:
        # Always release the document handle, including on early return.
        doc.close()
def _detect_visual_regions(png_path: str | Path, *, page: int = 1) -> list[dict[str, Any]]:
    """
    Detect coarse visual/text regions from a rendered document image.

    This is intentionally conservative. It does not replace OCR boxes yet;
    it gives the vision pipeline a first set of image-derived regions that
    can later be scored, merged, or sent to a VLM.

    Args:
        png_path: Image file to analyse.
        page: Page number stamped on every returned region (defaults to 1,
            the value that used to be hard-coded).

    Returns:
        Up to 200 region dicts sorted top-to-bottom, then left-to-right.
        Each has ``bbox`` as ``[x1, y1, x2, y2]`` in image pixels plus
        ``label``, ``confidence``, ``source`` and ``page``. An empty list is
        returned when OpenCV is unavailable or the image cannot be read.
    """
    if cv2 is None:
        return []

    img = cv2.imread(str(png_path))
    if img is None:
        return []

    height, width = img.shape[:2]
    page_area = float(width * height)

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Convert dark text/lines to white foreground.
    thresh = cv2.adaptiveThreshold(
        gray,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV,
        35,
        15,
    )

    # Merge nearby characters into coarse rows/regions.
    kernel_w = max(12, width // 90)
    kernel_h = max(3, height // 350)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_w, kernel_h))
    merged = cv2.dilate(thresh, kernel, iterations=2)

    # OpenCV 3.x returns (image, contours, hierarchy) while 2.x/4.x return
    # (contours, hierarchy); the contours are second-from-last in both.
    contours = cv2.findContours(merged, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

    # Size limits are relative to the page so they hold at any render dpi.
    min_area = page_area * 0.00008
    max_area = page_area * 0.65
    min_width = width * 0.04

    regions: list[dict[str, Any]] = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        area = float(w * h)

        if area < min_area:
            continue  # speck / noise
        if w < min_width or h < 4:
            continue  # too narrow or too flat to be a useful region
        if area > max_area:
            continue  # effectively the whole page

        regions.append(
            {
                "bbox": [int(x), int(y), int(x + w), int(y + h)],
                "label": "cv_region",
                "confidence": 0.35,
                "source": "opencv_adaptive_threshold_contours",
                "page": page,
            }
        )

    # Stable reading-ish order.
    regions.sort(key=lambda r: (r["bbox"][1], r["bbox"][0]))

    # Avoid huge payloads for now.
    return regions[:200]
def analyze_document_image(image_path: str | Path, *, model_name: str = "placeholder") -> dict[str, Any]:
    """
    Backend-only vision analysis entrypoint.

    Current phase:
    - renders the first PDF page to PNG
    - returns normalized metadata for later CV/Ollama processing

    Args:
        image_path: A PDF or an image file.
        model_name: Echoed back in the result under ``model_name``.

    Returns:
        A ``vision_analysis_v1`` dict. It contains the render result
        (``status``, ``rendered_pages`` and, on failure, ``error``), a
        ``layers`` dict whose ``vision_regions`` holds the regions detected on
        the first rendered page, and a list of ``notes``. When the path does
        not exist or is not a regular file, ``status`` is ``"source_missing"``
        and no rendering or detection is attempted.
    """
    path = Path(image_path)

    render_result: dict[str, Any]
    if not path.exists():
        render_result = {
            "status": "source_missing",
            "error": "image_path_does_not_exist",
            "rendered_pages": [],
        }
    elif not path.is_file():
        # A directory (even one named "*.pdf") is not a usable input; report
        # it instead of handing it to the PDF renderer or the image reader.
        render_result = {
            "status": "source_missing",
            "error": "image_path_is_not_a_file",
            "rendered_pages": [],
        }
    elif path.suffix.lower() == ".pdf":
        render_result = _render_pdf_page_to_png(path)
    else:
        # Non-PDF files are used as-is; their pixel size is not read here.
        render_result = {
            "status": "image_available",
            "rendered_pages": [
                {
                    "page": 1,
                    "png_path": str(path),
                    "width": None,
                    "height": None,
                    "dpi": None,
                }
            ],
        }

    rendered_pages = render_result.get("rendered_pages") or []

    vision_regions: list[dict[str, Any]] = []
    if rendered_pages and rendered_pages[0].get("png_path"):
        vision_regions = _detect_visual_regions(rendered_pages[0]["png_path"])

    return {
        "schema_version": "vision_analysis_v1",
        "engine": "local",
        "model_name": model_name,
        "image_path": str(path),
        **render_result,
        "layers": {
            "vision_regions": vision_regions,
            "vision_lines": [],
            "vision_boxes": [],
            "vision_fields": [],
            "vision_line_items": [],
        },
        "notes": [
            "Vision module rendered/located image input.",
            "OpenCV coarse region detection has run when available.",
            "No CV/Ollama model is connected yet.",
        ],
    }
def _bbox_iou(a: list[float] | tuple[float, ...], b: list[float] | tuple[float, ...]) -> float:
    """
    Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

    Only the first four values of each box are used. Boxes that do not
    overlap, or whose union has zero area, give ``0.0``.
    """
    left_a, top_a, right_a, bottom_a = map(float, a[:4])
    left_b, top_b, right_b, bottom_b = map(float, b[:4])

    # Overlap rectangle; a negative extent means the boxes are disjoint.
    overlap_w = max(0.0, min(right_a, right_b) - max(left_a, left_b))
    overlap_h = max(0.0, min(bottom_a, bottom_b) - max(top_a, top_b))
    overlap = overlap_w * overlap_h

    size_a = max(0.0, right_a - left_a) * max(0.0, bottom_a - top_a)
    size_b = max(0.0, right_b - left_b) * max(0.0, bottom_b - top_b)

    union = size_a + size_b - overlap
    if not union:
        return 0.0
    return overlap / union
def _region_contains_ratio(region: list[float] | tuple[float, ...], item: list[float] | tuple[float, ...]) -> float:
    """
    Return the fraction of ``item``'s area that lies inside ``region``.

    Both arguments are ``[x1, y1, x2, y2]`` boxes; only the first four values
    are used. An ``item`` with zero area gives ``0.0``.
    """
    region_left, region_top, region_right, region_bottom = map(float, region[:4])
    item_left, item_top, item_right, item_bottom = map(float, item[:4])

    shared_w = max(0.0, min(region_right, item_right) - max(region_left, item_left))
    shared_h = max(0.0, min(region_bottom, item_bottom) - max(region_top, item_top))
    shared_area = shared_w * shared_h

    item_area = max(0.0, item_right - item_left) * max(0.0, item_bottom - item_top)
    if not item_area:
        return 0.0
    return shared_area / item_area
def _scale_bbox(
    bbox: list[float] | tuple[float, ...],
    *,
    scale_x: float,
    scale_y: float,
) -> list[float]:
    """
    Return ``bbox`` with x values multiplied by ``scale_x`` and y values by ``scale_y``.

    Only the first four values (``x1, y1, x2, y2``) are used; they are
    converted to ``float`` before scaling.
    """
    left, top, right, bottom = map(float, bbox[:4])
    scaled_left, scaled_right = left * scale_x, right * scale_x
    scaled_top, scaled_bottom = top * scale_y, bottom * scale_y
    return [scaled_left, scaled_top, scaled_right, scaled_bottom]
def normalize_vision_regions_to_layout(
    vision_result: dict[str, Any],
    layout_json: dict[str, Any] | None,
) -> dict[str, Any]:
    """
    Convert OpenCV rendered-image pixel coordinates into layout_json page coordinates.

    ``vision_result`` is updated in place and also returned. The scale is
    derived from the first layout page and the first rendered page.

    Outcome, reported in ``vision_result["coordinate_space"]``:
    - ``"unknown_or_unscaled"``: no layout pages, rendered pages or regions.
    - ``"rendered_pixels_unscaled"``: a page or render dimension is missing/zero.
    - ``"layout_page"``: regions were scaled; each keeps its pixel box under
      ``rendered_bbox`` and ``coordinate_normalization`` records the scale.

    Calling this again on an already normalized result does not scale the
    boxes a second time: regions tagged ``"layout_page"`` are recomputed from
    their ``rendered_bbox`` (or left untouched when they have none).
    Untagged regions without a ``bbox`` are dropped from the layer.
    """
    pages = (layout_json or {}).get("pages") or []
    rendered_pages = vision_result.get("rendered_pages") or []

    # Tolerate a missing or null "layers" entry (e.g. from deserialized JSON).
    layers = vision_result.get("layers")
    if layers is None:
        layers = {}
        vision_result["layers"] = layers
    regions = layers.get("vision_regions") or []

    if not pages or not rendered_pages or not regions:
        vision_result["coordinate_space"] = "unknown_or_unscaled"
        return vision_result

    page = pages[0]
    rendered = rendered_pages[0]

    layout_w = float(page.get("page_width") or page.get("width") or 0)
    layout_h = float(page.get("page_height") or page.get("height") or 0)
    rendered_w = float(rendered.get("width") or 0)
    rendered_h = float(rendered.get("height") or 0)

    if not layout_w or not layout_h or not rendered_w or not rendered_h:
        vision_result["coordinate_space"] = "rendered_pixels_unscaled"
        return vision_result

    scale_x = layout_w / rendered_w
    scale_y = layout_h / rendered_h

    normalized = []
    for region in regions:
        if region.get("coordinate_space") == "layout_page":
            # Already converted by an earlier call: always start from the
            # original pixel box so the scale is never applied twice.
            pixel_bbox = region.get("rendered_bbox")
            if not pixel_bbox:
                normalized.append(region)
                continue
        else:
            pixel_bbox = region.get("bbox")
            if not pixel_bbox:
                continue

        item = dict(region)
        item["rendered_bbox"] = pixel_bbox
        item["bbox"] = _scale_bbox(pixel_bbox, scale_x=scale_x, scale_y=scale_y)
        item["coordinate_space"] = "layout_page"
        item["scale_x"] = scale_x
        item["scale_y"] = scale_y
        normalized.append(item)

    layers["vision_regions"] = normalized
    vision_result["coordinate_space"] = "layout_page"
    vision_result["coordinate_normalization"] = {
        "source": "rendered_pixels",
        "target": "layout_page",
        "layout_width": layout_w,
        "layout_height": layout_h,
        "rendered_width": rendered_w,
        "rendered_height": rendered_h,
        "scale_x": scale_x,
        "scale_y": scale_y,
    }
    return vision_result
def score_vision_regions_against_layout(
    vision_result: dict[str, Any],
    layout_json: dict[str, Any] | None,
) -> dict[str, Any]:
    """
    Compare OpenCV regions against existing OCR layout lines.

    Purpose:
    - measure whether CV regions line up with OCR line boxes
    - identify OCR lines not covered by CV regions
    - identify CV regions with no OCR coverage

    For every layout line that has a ``bbox``, the region on the same page
    with the highest ``max(iou, contains_ratio)`` is chosen. The line counts
    as matched when that region's reported ``contains_ratio`` is at least
    0.55 or its reported ``iou`` is at least 0.10. Pages and regions without
    a ``page`` value are treated as page 1.

    Returns:
        A ``vision_region_scoring_v1`` dict with ``status`` (``"scored"`` or
        ``"not_enough_data"`` when there are no pages or no regions),
        per-page ``page_scores`` (at most 200 line scores and 100 unmatched
        regions each) and an overall ``summary``.
    """
    pages = (layout_json or {}).get("pages") or []
    regions = ((vision_result.get("layers") or {}).get("vision_regions")) or []

    if not pages or not regions:
        return {
            "schema_version": "vision_region_scoring_v1",
            "status": "not_enough_data",
            "page_scores": [],
            "summary": {
                "pages": len(pages),
                "regions": len(regions),
                "lines": 0,
                "matched_lines": 0,
                "unmatched_lines": 0,
                "unmatched_regions": len(regions),
            },
        }

    page_scores: list[dict[str, Any]] = []
    total_lines = 0
    total_matched_lines = 0
    total_unmatched_regions = 0

    for page in pages:
        page_number = int(page.get("page") or 1)
        page_lines = page.get("lines") or []
        page_regions = [r for r in regions if int(r.get("page") or 1) == page_number]

        # Regions that can actually be compared, paired with their index in
        # page_regions so matches can be recorded against the full list.
        candidates = [
            (idx, region.get("bbox"))
            for idx, region in enumerate(page_regions)
            if region.get("bbox")
        ]

        matched_region_indexes: set[int] = set()
        line_scores: list[dict[str, Any]] = []

        for line in page_lines:
            bbox = line.get("bbox")
            if not bbox:
                continue

            best = {
                "region_index": None,
                "iou": 0.0,
                "contains_ratio": 0.0,
                "region_bbox": None,
            }
            # Keep the unrounded winning score: comparing against the rounded
            # values stored in `best` would let a slightly worse later region
            # replace a better earlier one.
            best_score = 0.0

            for idx, region_bbox in candidates:
                iou = _bbox_iou(region_bbox, bbox)
                contains = _region_contains_ratio(region_bbox, bbox)
                score = max(iou, contains)
                if score > best_score:
                    best_score = score
                    best = {
                        "region_index": idx,
                        "iou": round(iou, 4),
                        "contains_ratio": round(contains, 4),
                        "region_bbox": region_bbox,
                    }

            # Thresholds are applied to the reported (rounded) values.
            matched = (best["contains_ratio"] >= 0.55) or (best["iou"] >= 0.10)
            if matched and best["region_index"] is not None:
                matched_region_indexes.add(int(best["region_index"]))

            line_scores.append(
                {
                    "line_text": str(line.get("text") or "")[:120],
                    "line_bbox": bbox,
                    "matched": matched,
                    **best,
                }
            )

        total_lines += len(line_scores)
        matched_lines = sum(1 for item in line_scores if item["matched"])
        total_matched_lines += matched_lines

        unmatched_region_indexes = [
            idx for idx in range(len(page_regions)) if idx not in matched_region_indexes
        ]
        total_unmatched_regions += len(unmatched_region_indexes)

        page_scores.append(
            {
                "page": page_number,
                "line_count": len(line_scores),
                "region_count": len(page_regions),
                "matched_line_count": matched_lines,
                "unmatched_line_count": len(line_scores) - matched_lines,
                "unmatched_region_count": len(unmatched_region_indexes),
                # Capped to keep the diagnostic payload small.
                "line_scores": line_scores[:200],
                "unmatched_regions": [
                    page_regions[idx] for idx in unmatched_region_indexes[:100]
                ],
            }
        )

    return {
        "schema_version": "vision_region_scoring_v1",
        "status": "scored",
        "page_scores": page_scores,
        "summary": {
            "pages": len(pages),
            "regions": len(regions),
            "lines": total_lines,
            "matched_lines": total_matched_lines,
            "unmatched_lines": total_lines - total_matched_lines,
            "unmatched_regions": total_unmatched_regions,
        },
    }
def build_vision_assisted_layout(source_layout: dict[str, Any] | None, vision_result: dict[str, Any]) -> dict[str, Any]:
    """
    Convert vision analysis into normal layout_json.

    Current phase:
    - normalizes CV coordinates into layout page coordinates
    - scores CV region coverage against OCR layout lines
    - preserves the current source layout for editor stability
    - stores diagnostics on the layout candidate

    The returned dict is a shallow copy of ``source_layout`` (or a layout with
    an empty ``pages`` list when none is given) with the vision diagnostics
    added as top-level keys. ``vision_result`` is passed through the
    normalization step, which updates it in place.
    """
    base_layout = source_layout or {"pages": []}
    layout = dict(base_layout)

    normalized_vision = normalize_vision_regions_to_layout(vision_result, layout)
    region_score = score_vision_regions_against_layout(normalized_vision, layout)

    # Diagnostics first, then the flags that mark this layout as a
    # vision-assisted candidate that still needs review.
    layout.update(
        {
            "vision_assisted": True,
            "vision_assisted_status": normalized_vision.get("status", "unknown"),
            "vision_engine": normalized_vision.get("engine"),
            "vision_model_name": normalized_vision.get("model_name"),
            "vision_coordinate_normalization": normalized_vision.get("coordinate_normalization"),
            "vision_region_score": region_score,
            "layout_sync_source": "vision_assisted",
            "layout_needs_review": True,
        }
    )
    return layout