652 lines
20 KiB
Python
652 lines
20 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Any
|
|
import hashlib
|
|
import tempfile
|
|
|
|
try:
|
|
import fitz # PyMuPDF
|
|
except Exception: # pragma: no cover
|
|
fitz = None
|
|
|
|
try:
|
|
import cv2
|
|
except Exception: # pragma: no cover
|
|
cv2 = None
|
|
|
|
try:
|
|
import pytesseract
|
|
except Exception: # pragma: no cover
|
|
pytesseract = None
|
|
|
|
|
|
def _render_pdf_page_to_png(path: Path, *, page_number: int = 0, dpi: int = 200) -> dict[str, Any]:
    """
    Rasterize a single PDF page into a PNG under the system temp directory.

    Args:
        path: Location of the PDF to open with PyMuPDF.
        page_number: Zero-based index of the page to render.
        dpi: Requested resolution; the page is scaled by ``dpi / 72``.

    Returns:
        On success, a dict with ``status == "image_rendered"``, the document
        ``page_count`` and a single ``rendered_pages`` entry (1-based page
        number, PNG path, pixel width/height, dpi). When PyMuPDF could not be
        imported, or ``page_number`` is at/after the end of the document, a
        dict with ``status == "render_failed"``, an ``error`` code and an
        empty ``rendered_pages`` list.
    """
    if fitz is None:
        return {
            "status": "render_failed",
            "error": "pymupdf_not_available",
            "rendered_pages": [],
        }

    out_dir = Path(tempfile.gettempdir()) / "document_processor_vision"
    out_dir.mkdir(parents=True, exist_ok=True)

    # The name is derived from the path *string* (not the file content), the
    # 1-based page number and the dpi; the PNG is written on every call.
    path_hash = hashlib.sha256(str(path).encode("utf-8")).hexdigest()[:16]
    human_page = page_number + 1
    target = out_dir / f"{path.stem}_{path_hash}_page{human_page}_{dpi}dpi.png"

    document = fitz.open(str(path))
    try:
        total_pages = document.page_count
        if page_number >= total_pages:
            return {
                "status": "render_failed",
                "error": "page_number_out_of_range",
                "page_count": total_pages,
                "rendered_pages": [],
            }

        zoom = dpi / 72.0
        pixmap = document.load_page(page_number).get_pixmap(
            matrix=fitz.Matrix(zoom, zoom),
            alpha=False,
        )
        pixmap.save(str(target))

        page_entry = {
            "page": human_page,
            "png_path": str(target),
            "width": pixmap.width,
            "height": pixmap.height,
            "dpi": dpi,
        }
        return {
            "status": "image_rendered",
            "page_count": total_pages,
            "rendered_pages": [page_entry],
        }
    finally:
        # Always release the document handle, including on the early return.
        document.close()
|
|
|
|
|
|
|
|
def _detect_visual_regions(png_path: str | Path) -> list[dict[str, Any]]:
    """
    Locate coarse text/graphic blobs in a rendered page image with OpenCV.

    The result is deliberately rough: it is a first set of image-derived
    boxes that later stages can score, merge, or hand to a VLM. It is not a
    replacement for OCR boxes.

    Args:
        png_path: Image file to analyse.

    Returns:
        Up to 200 region dicts ordered top-to-bottom, then left-to-right.
        Each has a pixel ``bbox`` as ``[x1, y1, x2, y2]``, a fixed label,
        confidence and source tag, and ``page`` hard-coded to 1. An empty
        list is returned when OpenCV is unavailable or the image cannot be
        read.
    """
    if cv2 is None:
        return []

    image = cv2.imread(str(png_path))
    if image is None:
        return []

    img_h, img_w = image.shape[:2]
    total_area = float(img_w * img_h)

    # Inverted adaptive threshold: dark ink/lines become white foreground.
    binary = cv2.adaptiveThreshold(
        cv2.cvtColor(image, cv2.COLOR_BGR2GRAY),
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV,
        35,
        15,
    )

    # Dilate with a wide, short rectangle so neighbouring characters fuse
    # into row-like blobs.
    structuring = cv2.getStructuringElement(
        cv2.MORPH_RECT,
        (max(12, img_w // 90), max(3, img_h // 350)),
    )
    blobs = cv2.dilate(binary, structuring, iterations=2)
    outlines, _ = cv2.findContours(blobs, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    found: list[dict[str, Any]] = []
    for left, top, box_w, box_h in map(cv2.boundingRect, outlines):
        box_area = float(box_w * box_h)

        # Discard specks, slivers, and blobs that swallow most of the page.
        too_small = box_area < total_area * 0.00008
        too_thin = box_w < img_w * 0.04 or box_h < 4
        too_big = box_area > total_area * 0.65
        if too_small or too_thin or too_big:
            continue

        found.append(
            {
                "bbox": [int(left), int(top), int(left + box_w), int(top + box_h)],
                "label": "cv_region",
                "confidence": 0.35,
                "source": "opencv_adaptive_threshold_contours",
                "page": 1,
            }
        )

    # Stable, reading-like order; the slice caps the payload size.
    ordered = sorted(found, key=lambda entry: (entry["bbox"][1], entry["bbox"][0]))
    return ordered[:200]
|
|
|
|
def analyze_document_image(image_path: str | Path, *, model_name: str = "placeholder") -> dict[str, Any]:
    """
    Backend-only entry point for vision analysis of one document.

    Current phase:
    - a ``.pdf`` input has its first page rendered to PNG
    - any other existing path is passed through as the image itself
    - coarse OpenCV regions are detected on the first available image
    - normalized metadata is returned for later CV/Ollama processing

    Args:
        image_path: PDF or image file to analyse.
        model_name: Recorded verbatim in the result; no model is called here.

    Returns:
        A ``vision_analysis_v1`` dict combining the render outcome
        (``status``, ``rendered_pages``, optional ``error``) with the
        ``layers`` container and explanatory ``notes``.
    """
    path = Path(image_path)

    render_result: dict[str, Any]
    if not path.exists():
        render_result = {
            "status": "source_missing",
            "error": "image_path_does_not_exist",
            "rendered_pages": [],
        }
    elif path.suffix.lower() == ".pdf":
        render_result = _render_pdf_page_to_png(path)
    else:
        # Non-PDF input is used as-is; its pixel size is not probed here.
        render_result = {
            "status": "image_available",
            "rendered_pages": [
                {
                    "page": 1,
                    "png_path": str(path),
                    "width": None,
                    "height": None,
                    "dpi": None,
                }
            ],
        }

    pages_out = render_result.get("rendered_pages") or []
    first_png = pages_out[0].get("png_path") if pages_out else None
    vision_regions: list[dict[str, Any]] = (
        _detect_visual_regions(first_png) if first_png else []
    )

    result: dict[str, Any] = {
        "schema_version": "vision_analysis_v1",
        "engine": "local",
        "model_name": model_name,
        "image_path": str(path),
    }
    result.update(render_result)
    result["layers"] = {
        "vision_regions": vision_regions,
        "vision_lines": [],
        "vision_boxes": [],
        "vision_fields": [],
        "vision_line_items": [],
    }
    result["notes"] = [
        "Vision module rendered/located image input.",
        "OpenCV coarse region detection has run when available.",
        "No CV/Ollama model is connected yet.",
    ]
    return result
|
|
|
|
|
|
|
|
def _bbox_iou(a: list[float] | tuple[float, ...], b: list[float] | tuple[float, ...]) -> float:
|
|
ax1, ay1, ax2, ay2 = [float(v) for v in a[:4]]
|
|
bx1, by1, bx2, by2 = [float(v) for v in b[:4]]
|
|
|
|
ix1 = max(ax1, bx1)
|
|
iy1 = max(ay1, by1)
|
|
ix2 = min(ax2, bx2)
|
|
iy2 = min(ay2, by2)
|
|
|
|
iw = max(0.0, ix2 - ix1)
|
|
ih = max(0.0, iy2 - iy1)
|
|
inter = iw * ih
|
|
|
|
area_a = max(0.0, ax2 - ax1) * max(0.0, ay2 - ay1)
|
|
area_b = max(0.0, bx2 - bx1) * max(0.0, by2 - by1)
|
|
denom = area_a + area_b - inter
|
|
|
|
return inter / denom if denom else 0.0
|
|
|
|
|
|
def _region_contains_ratio(region: list[float] | tuple[float, ...], item: list[float] | tuple[float, ...]) -> float:
|
|
rx1, ry1, rx2, ry2 = [float(v) for v in region[:4]]
|
|
ix1, iy1, ix2, iy2 = [float(v) for v in item[:4]]
|
|
|
|
x1 = max(rx1, ix1)
|
|
y1 = max(ry1, iy1)
|
|
x2 = min(rx2, ix2)
|
|
y2 = min(ry2, iy2)
|
|
|
|
inter = max(0.0, x2 - x1) * max(0.0, y2 - y1)
|
|
item_area = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
|
|
|
|
return inter / item_area if item_area else 0.0
|
|
|
|
|
|
|
|
def _scale_bbox(
|
|
bbox: list[float] | tuple[float, ...],
|
|
*,
|
|
scale_x: float,
|
|
scale_y: float,
|
|
) -> list[float]:
|
|
x1, y1, x2, y2 = [float(v) for v in bbox[:4]]
|
|
return [
|
|
x1 * scale_x,
|
|
y1 * scale_y,
|
|
x2 * scale_x,
|
|
y2 * scale_y,
|
|
]
|
|
|
|
|
|
def normalize_vision_regions_to_layout(
    vision_result: dict[str, Any],
    layout_json: dict[str, Any] | None,
) -> dict[str, Any]:
    """
    Convert OpenCV rendered-image pixel coordinates into layout_json page coordinates.

    Only the first layout page and the first rendered page are consulted.
    ``vision_result`` is updated in place (its ``layers["vision_regions"]``
    list is replaced by scaled copies) and is also returned.

    Scaling always starts from the pixel-space box: a region that already
    carries ``rendered_bbox`` from an earlier call is re-derived from that
    value instead of from its already-scaled ``bbox``. Calling this function
    again on the same ``vision_result`` therefore does not compound the
    scale or overwrite ``rendered_bbox`` with layout coordinates.

    Args:
        vision_result: Vision analysis dict with ``rendered_pages`` and
            ``layers["vision_regions"]``.
        layout_json: Layout dict with a ``pages`` list, or None.

    Returns:
        The same ``vision_result`` object. ``coordinate_space`` is set to
        ``"unknown_or_unscaled"`` when pages, rendered pages or regions are
        missing, ``"rendered_pixels_unscaled"`` when any page/render
        dimension is missing or zero, and ``"layout_page"`` after scaling
        (in which case ``coordinate_normalization`` records the factors).
        Regions without any box are dropped from the scaled list.
    """
    pages = (layout_json or {}).get("pages") or []
    rendered_pages = vision_result.get("rendered_pages") or []
    layers = vision_result.setdefault("layers", {})
    regions = layers.get("vision_regions") or []

    if not pages or not rendered_pages or not regions:
        vision_result["coordinate_space"] = "unknown_or_unscaled"
        return vision_result

    page = pages[0]
    rendered = rendered_pages[0]

    layout_w = float(page.get("page_width") or page.get("width") or 0)
    layout_h = float(page.get("page_height") or page.get("height") or 0)
    rendered_w = float(rendered.get("width") or 0)
    rendered_h = float(rendered.get("height") or 0)

    if not layout_w or not layout_h or not rendered_w or not rendered_h:
        vision_result["coordinate_space"] = "rendered_pixels_unscaled"
        return vision_result

    scale_x = layout_w / rendered_w
    scale_y = layout_h / rendered_h

    normalized: list[dict[str, Any]] = []
    for region in regions:
        # Prefer the preserved pixel box so repeated calls stay idempotent.
        pixel_bbox = region.get("rendered_bbox") or region.get("bbox")
        if not pixel_bbox:
            continue

        item = dict(region)
        item["rendered_bbox"] = pixel_bbox
        item["bbox"] = _scale_bbox(pixel_bbox, scale_x=scale_x, scale_y=scale_y)
        item["coordinate_space"] = "layout_page"
        item["scale_x"] = scale_x
        item["scale_y"] = scale_y
        normalized.append(item)

    layers["vision_regions"] = normalized
    vision_result["coordinate_space"] = "layout_page"
    vision_result["coordinate_normalization"] = {
        "source": "rendered_pixels",
        "target": "layout_page",
        "layout_width": layout_w,
        "layout_height": layout_h,
        "rendered_width": rendered_w,
        "rendered_height": rendered_h,
        "scale_x": scale_x,
        "scale_y": scale_y,
    }
    return vision_result
|
|
|
|
def score_vision_regions_against_layout(
    vision_result: dict[str, Any],
    layout_json: dict[str, Any] | None,
) -> dict[str, Any]:
    """
    Measure how well OpenCV regions line up with existing OCR layout lines.

    For every layout line that has a ``bbox``, the same-page region with the
    highest ``max(IoU, containment)`` is chosen. A line counts as matched
    when that region covers at least 55% of the line or their IoU is at
    least 0.10. Regions never chosen by a matched line are reported as
    unmatched.

    Args:
        vision_result: Vision analysis dict; regions are read from
            ``layers["vision_regions"]``.
        layout_json: Layout dict with a ``pages`` list, or None.

    Returns:
        A ``vision_region_scoring_v1`` dict. ``status`` is
        ``"not_enough_data"`` when there are no pages or no regions,
        otherwise ``"scored"`` with per-page details (at most 200 line
        scores and 100 unmatched regions listed per page; the counts are
        not truncated) and an overall ``summary``.
    """
    pages = (layout_json or {}).get("pages") or []
    regions = ((vision_result.get("layers") or {}).get("vision_regions")) or []

    if not (pages and regions):
        return {
            "schema_version": "vision_region_scoring_v1",
            "status": "not_enough_data",
            "page_scores": [],
            "summary": {
                "pages": len(pages),
                "regions": len(regions),
                "lines": 0,
                "matched_lines": 0,
                "unmatched_lines": 0,
                "unmatched_regions": len(regions),
            },
        }

    page_scores: list[dict[str, Any]] = []

    for page in pages:
        page_number = int(page.get("page") or 1)
        page_lines = page.get("lines") or []
        candidates = [r for r in regions if int(r.get("page") or 1) == page_number]

        used_indexes: set[int] = set()
        line_scores: list[dict[str, Any]] = []

        for line in page_lines:
            line_bbox = line.get("bbox")
            if not line_bbox:
                continue

            # Running best; the stored scores are the rounded values, and
            # later candidates are compared against those rounded values.
            best_index = None
            best_iou = 0.0
            best_contains = 0.0
            best_bbox = None

            for idx, region in enumerate(candidates):
                region_bbox = region.get("bbox")
                if not region_bbox:
                    continue

                iou = _bbox_iou(region_bbox, line_bbox)
                contains = _region_contains_ratio(region_bbox, line_bbox)

                if max(iou, contains) > max(best_iou, best_contains):
                    best_index, best_bbox = idx, region_bbox
                    best_iou, best_contains = round(iou, 4), round(contains, 4)

            matched = (best_contains >= 0.55) or (best_iou >= 0.10)
            if matched and best_index is not None:
                used_indexes.add(int(best_index))

            line_scores.append(
                {
                    "line_text": str(line.get("text") or "")[:120],
                    "line_bbox": line_bbox,
                    "matched": matched,
                    "region_index": best_index,
                    "iou": best_iou,
                    "contains_ratio": best_contains,
                    "region_bbox": best_bbox,
                }
            )

        matched_lines = sum(1 for entry in line_scores if entry["matched"])
        leftover = [i for i, _ in enumerate(candidates) if i not in used_indexes]

        page_scores.append(
            {
                "page": page_number,
                "line_count": len(line_scores),
                "region_count": len(candidates),
                "matched_line_count": matched_lines,
                "unmatched_line_count": len(line_scores) - matched_lines,
                "unmatched_region_count": len(leftover),
                "line_scores": line_scores[:200],
                "unmatched_regions": [candidates[i] for i in leftover[:100]],
            }
        )

    # Document-wide totals are the sums of the per-page counters.
    total_lines = sum(entry["line_count"] for entry in page_scores)
    total_matched_lines = sum(entry["matched_line_count"] for entry in page_scores)
    total_unmatched_regions = sum(entry["unmatched_region_count"] for entry in page_scores)

    return {
        "schema_version": "vision_region_scoring_v1",
        "status": "scored",
        "page_scores": page_scores,
        "summary": {
            "pages": len(pages),
            "regions": len(regions),
            "lines": total_lines,
            "matched_lines": total_matched_lines,
            "unmatched_lines": total_lines - total_matched_lines,
            "unmatched_regions": total_unmatched_regions,
        },
    }
|
|
|
|
|
|
def _classify_region_geometry(region: dict[str, Any], *, page_width: float, page_height: float) -> dict[str, Any]:
|
|
bbox = region.get("bbox") or [0, 0, 0, 0]
|
|
x1, y1, x2, y2 = [float(v) for v in bbox[:4]]
|
|
w = max(0.0, x2 - x1)
|
|
h = max(0.0, y2 - y1)
|
|
area = w * h
|
|
page_area = max(1.0, page_width * page_height)
|
|
aspect = w / h if h else 0.0
|
|
|
|
label = "unknown_region"
|
|
confidence = 0.20
|
|
|
|
if area > page_area * 0.18:
|
|
label = "large_document_region"
|
|
confidence = 0.35
|
|
elif w > page_width * 0.70 and aspect > 4:
|
|
label = "wide_text_band"
|
|
confidence = 0.45
|
|
elif h > page_height * 0.10 and w > page_width * 0.35:
|
|
label = "large_text_block"
|
|
confidence = 0.40
|
|
elif aspect > 8:
|
|
label = "horizontal_rule_or_text_band"
|
|
confidence = 0.35
|
|
elif w < page_width * 0.12 and h < page_height * 0.06:
|
|
label = "small_symbol_or_short_text"
|
|
confidence = 0.30
|
|
|
|
item = dict(region)
|
|
item["geometry_class"] = label
|
|
item["geometry_confidence"] = confidence
|
|
item["geometry_features"] = {
|
|
"width": w,
|
|
"height": h,
|
|
"area_ratio": area / page_area,
|
|
"aspect_ratio": aspect,
|
|
}
|
|
return item
|
|
|
|
|
|
def _write_region_crop(
    png_path: str | Path,
    region: dict[str, Any],
    *,
    crop_index: int,
    padding_px: int = 8,
) -> str | None:
    """
    Cut one region out of the rendered page image and save it as a PNG.

    The crop is taken in rendered-pixel space: ``rendered_bbox`` is used when
    present, otherwise ``bbox``. The box is grown by ``padding_px`` on every
    side and clamped to the image bounds. Crops are written to
    ``<png directory>/crops/<png stem>/region_NNNN.png``.

    Args:
        png_path: Rendered page image to crop from.
        region: Region dict carrying ``rendered_bbox`` and/or ``bbox``.
        crop_index: Number used in the output file name (zero-padded to 4).
        padding_px: Extra pixels added around the box before clamping.

    Returns:
        The crop path as a string, or None when OpenCV is unavailable, the
        image cannot be read, the region has no box, the clamped box is
        empty, or OpenCV reports that the file could not be written.
    """
    if cv2 is None:
        return None

    img = cv2.imread(str(png_path))
    if img is None:
        return None

    height, width = img.shape[:2]
    bbox = region.get("rendered_bbox") or region.get("bbox")
    if not bbox:
        return None

    x1, y1, x2, y2 = [int(round(float(v))) for v in bbox[:4]]
    x1 = max(0, x1 - padding_px)
    y1 = max(0, y1 - padding_px)
    x2 = min(width, x2 + padding_px)
    y2 = min(height, y2 + padding_px)

    if x2 <= x1 or y2 <= y1:
        return None

    crop = img[y1:y2, x1:x2]
    source = Path(png_path)
    crop_dir = source.parent / "crops" / source.stem
    crop_dir.mkdir(parents=True, exist_ok=True)

    crop_path = crop_dir / f"region_{crop_index:04d}.png"
    # imwrite signals failure through its boolean result; do not hand back
    # a path to a file that was never written.
    if not cv2.imwrite(str(crop_path), crop):
        return None
    return str(crop_path)
|
|
|
|
|
|
|
|
def _ocr_crop(crop_path: str | Path) -> dict[str, Any]:
    """
    Run Tesseract over one cropped region and summarise what it read.

    Only lightweight text/confidence metadata is produced; merging the
    result into the full OCR layout is left to a later step.

    Args:
        crop_path: Image file of the cropped region.

    Returns:
        A dict with ``ocr_status`` (``"unavailable"``, ``"missing_crop"``,
        ``"error"``, ``"no_text"`` or ``"ok"``), ``ocr_engine``,
        ``ocr_text`` and ``ocr_confidence`` (mean of the non-negative word
        confidences, rounded to 2 places, or None). ``ocr_error`` is added
        on ``"error"`` and ``ocr_psm`` on a completed run.
    """

    def _no_result(status: str, **extra: Any) -> dict[str, Any]:
        # Shared shape for every outcome that produced no text.
        return {
            "ocr_status": status,
            "ocr_engine": "tesseract",
            **extra,
            "ocr_text": "",
            "ocr_confidence": None,
        }

    if pytesseract is None:
        return _no_result("unavailable")

    path = Path(crop_path)
    if not path.exists():
        return _no_result("missing_crop")

    try:
        data = pytesseract.image_to_data(
            str(path),
            output_type=pytesseract.Output.DICT,
            config="--psm 6",
        )
    except Exception as e:
        # Best effort: an OCR failure is reported, never raised.
        return _no_result("error", ocr_error=repr(e))

    words: list[str] = []
    confidences: list[float] = []

    for raw_text, raw_conf in zip(data.get("text", []), data.get("conf", [])):
        token = str(raw_text or "").strip()
        if not token:
            continue
        try:
            score = float(raw_conf)
        except Exception:
            score = -1.0
        # Negative (or unparsable) confidences are left out of the average,
        # but the word itself is still kept.
        if score >= 0:
            confidences.append(score)
        words.append(token)

    ocr_text = " ".join(words).strip()
    if confidences:
        avg_conf = round(sum(confidences) / len(confidences), 2)
    else:
        avg_conf = None

    return {
        "ocr_status": "ok" if ocr_text else "no_text",
        "ocr_engine": "tesseract",
        "ocr_psm": 6,
        "ocr_text": ocr_text,
        "ocr_confidence": avg_conf,
    }
|
|
|
|
def classify_and_crop_unmatched_regions(
    vision_result: dict[str, Any],
    layout_json: dict[str, Any] | None,
    region_score: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Classify unmatched CV regions and write region crop images for later VLM analysis.

    Which regions are processed:
    - when ``region_score`` carries ``page_scores``, exactly the
      ``unmatched_regions`` listed there are used, even if that is none
      (a fully matched page produces no crops);
    - when no usable score is supplied, every region in
      ``vision_result["layers"]["vision_regions"]`` is used.

    Each selected region is classified by geometry against the first layout
    page's size, cropped from the first rendered page image when one is
    available, and OCR'd when the crop was written.

    Args:
        vision_result: Vision analysis dict with ``rendered_pages`` and
            ``layers``.
        layout_json: Layout dict with a ``pages`` list, or None.
        region_score: Optional scoring result whose ``page_scores`` entries
            list ``unmatched_regions``.

    Returns:
        A ``vision_region_classification_v1`` dict. ``status`` is
        ``"not_enough_data"`` when there are no layout pages or no rendered
        pages, otherwise ``"classified"`` with the classified regions and
        their count.
    """
    pages = (layout_json or {}).get("pages") or []
    rendered_pages = vision_result.get("rendered_pages") or []
    if not pages or not rendered_pages:
        return {
            "schema_version": "vision_region_classification_v1",
            "status": "not_enough_data",
            "classified_regions": [],
        }

    page = pages[0]
    page_width = float(page.get("page_width") or page.get("width") or 1)
    page_height = float(page.get("page_height") or page.get("height") or 1)
    png_path = rendered_pages[0].get("png_path")

    page_scores = (region_score or {}).get("page_scores") or []
    unmatched_regions: list[dict[str, Any]] = []
    if page_scores:
        # A real score is authoritative: an empty unmatched list means
        # everything was matched, not "fall back to all regions".
        for page_score in page_scores:
            unmatched_regions.extend(page_score.get("unmatched_regions") or [])
    else:
        unmatched_regions = ((vision_result.get("layers") or {}).get("vision_regions")) or []

    classified: list[dict[str, Any]] = []
    for idx, region in enumerate(unmatched_regions):
        item = _classify_region_geometry(region, page_width=page_width, page_height=page_height)
        if png_path:
            item["crop_path"] = _write_region_crop(png_path, item, crop_index=idx)
            if item.get("crop_path"):
                item.update(_ocr_crop(item["crop_path"]))
        item["classification_source"] = "opencv_geometry_classifier"
        classified.append(item)

    return {
        "schema_version": "vision_region_classification_v1",
        "status": "classified",
        "classified_region_count": len(classified),
        "classified_regions": classified,
    }
|
|
|
|
def build_vision_assisted_layout(source_layout: dict[str, Any] | None, vision_result: dict[str, Any]) -> dict[str, Any]:
    """
    Produce a layout_json candidate annotated with vision diagnostics.

    Current phase:
    - CV coordinates are normalized into layout page coordinates
    - CV region coverage is scored against the OCR layout lines
    - unmatched regions are classified and cropped
    - the source layout's own content is kept as-is for editor stability;
      only diagnostic/top-level keys are added to a shallow copy

    Args:
        source_layout: Existing layout dict, or None for an empty layout.
        vision_result: Vision analysis dict; it is passed to the
            normalization step, which updates it in place.

    Returns:
        A shallow copy of ``source_layout`` (or ``{"pages": []}``) with the
        ``vision_*`` and ``layout_*`` keys set.
    """
    layout = dict(source_layout or {"pages": []})

    vision = normalize_vision_regions_to_layout(vision_result, layout)
    coverage = score_vision_regions_against_layout(vision, layout)
    classification = classify_and_crop_unmatched_regions(vision, layout, coverage)

    layout.update(
        {
            "vision_assisted": True,
            "vision_assisted_status": vision.get("status", "unknown"),
            "vision_engine": vision.get("engine"),
            "vision_model_name": vision.get("model_name"),
            "vision_coordinate_normalization": vision.get("coordinate_normalization"),
            "vision_region_score": coverage,
            "vision_region_classification": classification,
            "layout_sync_source": "vision_assisted",
            # Vision output is advisory for now, so the result is always
            # flagged for review.
            "layout_needs_review": True,
        }
    )
    return layout
|