208 lines
6.1 KiB
Python
208 lines
6.1 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Any
|
|
import hashlib
|
|
import tempfile
|
|
|
|
try:
|
|
import fitz # PyMuPDF
|
|
except Exception: # pragma: no cover
|
|
fitz = None
|
|
|
|
try:
|
|
import cv2
|
|
except Exception: # pragma: no cover
|
|
cv2 = None
|
|
|
|
|
|
def _render_pdf_page_to_png(path: Path, *, page_number: int = 0, dpi: int = 200) -> dict[str, Any]:
    """Render a single page of a PDF to a PNG file under the temp directory.

    Args:
        path: Location of the PDF to render.
        page_number: Zero-based index of the page to render. Must satisfy
            ``0 <= page_number < page_count``.
        dpi: Render resolution; the page is scaled by ``dpi / 72.0`` on
            both axes.

    Returns:
        A dict that always contains ``status`` and ``rendered_pages``.

        * Success: ``status == "image_rendered"``, ``page_count`` is set and
          ``rendered_pages`` holds one entry with the 1-based ``page``,
          ``png_path``, pixel ``width``/``height`` and ``dpi``.
        * Failure: ``status == "render_failed"``, ``error`` names the reason
          (``"pymupdf_not_available"`` or ``"page_number_out_of_range"``)
          and ``rendered_pages`` is empty.

    Exceptions raised by PyMuPDF while opening or rendering the document are
    not caught here and propagate to the caller.
    """
    if fitz is None:
        return {
            "status": "render_failed",
            "error": "pymupdf_not_available",
            "rendered_pages": [],
        }

    cache_root = Path(tempfile.gettempdir()) / "document_processor_vision"
    cache_root.mkdir(parents=True, exist_ok=True)

    # The digest of the path string keeps same-named PDFs from different
    # directories from writing to the same PNG file.
    digest = hashlib.sha256(str(path).encode("utf-8")).hexdigest()[:16]
    png_path = cache_root / f"{path.stem}_{digest}_page{page_number + 1}_{dpi}dpi.png"

    doc = fitz.open(str(path))
    try:
        page_count = doc.page_count

        # Reject negative indices as well as indices past the end: PyMuPDF's
        # load_page() wraps negative numbers around, which would render a
        # different page than the one reported in the result and filename.
        if not 0 <= page_number < page_count:
            return {
                "status": "render_failed",
                "error": "page_number_out_of_range",
                "page_count": page_count,
                "rendered_pages": [],
            }

        page = doc.load_page(page_number)
        scale = dpi / 72.0
        pix = page.get_pixmap(matrix=fitz.Matrix(scale, scale), alpha=False)
        pix.save(str(png_path))

        return {
            "status": "image_rendered",
            "page_count": page_count,
            "rendered_pages": [
                {
                    "page": page_number + 1,
                    "png_path": str(png_path),
                    "width": pix.width,
                    "height": pix.height,
                    "dpi": dpi,
                }
            ],
        }
    finally:
        # Always release the document handle, including on early return.
        doc.close()
|
|
|
|
|
|
|
|
def _detect_visual_regions(png_path: str | Path) -> list[dict[str, Any]]:
    """
    Find coarse candidate regions in a rendered document image with OpenCV.

    The image is binarised with an adaptive threshold, dilated with a wide
    rectangular kernel so neighbouring marks fuse together, and the bounding
    boxes of the resulting external contours are reported. Boxes that are
    too small, too narrow, too short, or that cover most of the page are
    dropped.

    Returns an empty list when OpenCV is unavailable or the image cannot be
    read. Otherwise returns at most 200 region dicts, ordered by top edge
    and then left edge, each with ``bbox`` as ``[x0, y0, x1, y1]`` in pixels
    plus fixed ``label``, ``confidence``, ``source`` and ``page`` values.
    """
    if cv2 is None:
        return []

    image = cv2.imread(str(png_path))
    if image is None:
        return []

    img_h, img_w = image.shape[:2]
    page_area = float(img_w * img_h)

    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Inverted threshold: dark text/lines become white foreground pixels.
    binary = cv2.adaptiveThreshold(
        grayscale,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV,
        35,
        15,
    )

    # A wide, short kernel fuses nearby characters into coarse rows/regions.
    kernel_size = (max(12, img_w // 90), max(3, img_h // 350))
    structuring = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)
    dilated = cv2.dilate(binary, structuring, iterations=2)

    contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Size limits, expressed relative to the page.
    min_area = page_area * 0.00008
    max_area = page_area * 0.65
    min_width = img_w * 0.04

    bounding_boxes = (cv2.boundingRect(contour) for contour in contours)
    found: list[dict[str, Any]] = [
        {
            "bbox": [int(left), int(top), int(left + box_w), int(top + box_h)],
            "label": "cv_region",
            "confidence": 0.35,
            "source": "opencv_adaptive_threshold_contours",
            "page": 1,
        }
        for left, top, box_w, box_h in bounding_boxes
        if min_area <= float(box_w * box_h) <= max_area
        and box_w >= min_width
        and box_h >= 4
    ]

    # Stable reading-ish order: top edge first, then left edge.
    ordered = sorted(found, key=lambda region: (region["bbox"][1], region["bbox"][0]))

    # Cap the payload size for now.
    return ordered[:200]
|
|
|
|
def analyze_document_image(image_path: str | Path, *, model_name: str = "placeholder") -> dict[str, Any]:
    """
    Backend-only vision analysis entrypoint.

    Current phase:
    - renders the first PDF page to PNG (any other existing file is passed
      through as the image itself)
    - runs coarse OpenCV region detection on that image
    - returns normalized metadata for later CV/Ollama processing

    Args:
        image_path: Path to a PDF or image file.
        model_name: Echoed back in the result as ``model_name``.

    Returns:
        A ``vision_analysis_v1`` dict containing the render result keys
        (``status``, ``rendered_pages`` and, on failure, ``error``), a
        ``layers`` mapping whose ``vision_regions`` holds the detected
        regions, and static ``notes``. ``status`` is ``"source_missing"``
        when the path does not exist or is not a regular file.
    """
    path = Path(image_path)

    render_result: dict[str, Any]
    if not path.exists():
        render_result = {
            "status": "source_missing",
            "error": "image_path_does_not_exist",
            "rendered_pages": [],
        }
    elif not path.is_file():
        # A directory (or other non-file) is not a usable source; reporting
        # it as an available image, or handing it to the PDF renderer, would
        # be wrong.
        render_result = {
            "status": "source_missing",
            "error": "image_path_is_not_a_file",
            "rendered_pages": [],
        }
    elif path.suffix.lower() == ".pdf":
        render_result = _render_pdf_page_to_png(path)
    else:
        # Non-PDF files are used as-is; size and dpi are not known here.
        render_result = {
            "status": "image_available",
            "rendered_pages": [
                {
                    "page": 1,
                    "png_path": str(path),
                    "width": None,
                    "height": None,
                    "dpi": None,
                }
            ],
        }

    rendered_pages = render_result.get("rendered_pages") or []
    vision_regions: list[dict[str, Any]] = []
    if rendered_pages and rendered_pages[0].get("png_path"):
        vision_regions = _detect_visual_regions(rendered_pages[0]["png_path"])

    return {
        "schema_version": "vision_analysis_v1",
        "engine": "local",
        "model_name": model_name,
        "image_path": str(path),
        **render_result,
        "layers": {
            "vision_regions": vision_regions,
            "vision_lines": [],
            "vision_boxes": [],
            "vision_fields": [],
            "vision_line_items": [],
        },
        "notes": [
            "Vision module rendered/located image input.",
            "OpenCV coarse region detection has run when available.",
            "No CV/Ollama model is connected yet.",
        ],
    }
|
|
|
|
|
|
def build_vision_assisted_layout(source_layout: dict[str, Any] | None, vision_result: dict[str, Any]) -> dict[str, Any]:
    """
    Produce a layout_json dict tagged as vision-assisted.

    Current phase:
    - starts from a shallow copy of ``source_layout`` (or ``{"pages": []}``
      when it is ``None`` or empty); the input dict itself is not modified
    - overlays the vision tagging keys, taking status, engine and model
      name from ``vision_result``
    - always marks the layout as needing review
    """
    base = source_layout if source_layout else {"pages": []}
    tagged = dict(base)
    tagged.update(
        {
            "vision_assisted": True,
            "vision_assisted_status": vision_result.get("status", "unknown"),
            "vision_engine": vision_result.get("engine"),
            "vision_model_name": vision_result.get("model_name"),
            "layout_sync_source": "vision_assisted",
            "layout_needs_review": True,
        }
    )
    return tagged
|