# document-processor/app/logic/vision_analysis.py
#
# 810 lines
# 25 KiB
# Python

from __future__ import annotations
from pathlib import Path
from typing import Any
import hashlib
import tempfile
import re
# Optional third-party backends. Each name is bound to None when its import
# fails so the functions below can degrade gracefully instead of crashing at
# import time.
try:
    import fitz  # PyMuPDF
except Exception:  # pragma: no cover
    fitz = None
try:
    import cv2
except Exception:  # pragma: no cover
    cv2 = None
try:
    import pytesseract
except Exception:  # pragma: no cover
    pytesseract = None
def _render_pdf_page_to_png(path: Path, *, page_number: int = 0, dpi: int = 200) -> dict[str, Any]:
    """Render one page of a PDF to a PNG file under the system temp directory.

    Args:
        path: PDF file to open with PyMuPDF.
        page_number: Zero-based index of the page to render.
        dpi: Render resolution; the page is scaled by ``dpi / 72``.

    Returns:
        On success a dict with ``status == "image_rendered"``, the document's
        ``page_count`` and a single ``rendered_pages`` entry (1-based ``page``,
        ``png_path``, pixel ``width``/``height`` and ``dpi``). On failure a
        dict with ``status == "render_failed"``, an ``error`` code and an empty
        ``rendered_pages`` list.

    Exceptions raised by PyMuPDF while opening or rendering the document are
    not caught here.
    """
    if fitz is None:
        return {
            "status": "render_failed",
            "error": "pymupdf_not_available",
            "rendered_pages": [],
        }

    cache_root = Path(tempfile.gettempdir()) / "document_processor_vision"
    cache_root.mkdir(parents=True, exist_ok=True)
    # The digest keeps PNGs of same-named PDFs from different paths apart.
    digest = hashlib.sha256(str(path).encode("utf-8")).hexdigest()[:16]
    png_path = cache_root / f"{path.stem}_{digest}_page{page_number + 1}_{dpi}dpi.png"

    doc = fitz.open(str(path))
    try:
        page_count = doc.page_count
        # Reject negative indexes as well: PyMuPDF would resolve them from the
        # end of the document and the result would claim a page number <= 0.
        if not 0 <= page_number < page_count:
            return {
                "status": "render_failed",
                "error": "page_number_out_of_range",
                "page_count": page_count,
                "rendered_pages": [],
            }

        page = doc.load_page(page_number)
        zoom = dpi / 72.0
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom), alpha=False)
        pix.save(str(png_path))
        return {
            "status": "image_rendered",
            "page_count": page_count,
            "rendered_pages": [
                {
                    "page": page_number + 1,
                    "png_path": str(png_path),
                    "width": pix.width,
                    "height": pix.height,
                    "dpi": dpi,
                }
            ],
        }
    finally:
        doc.close()
def _detect_visual_regions(
    png_path: str | Path,
    *,
    page: int = 1,
    max_regions: int = 200,
) -> list[dict[str, Any]]:
    """
    Detect coarse visual/text regions from a rendered document image.

    This is intentionally conservative. It does not replace OCR boxes yet;
    it gives the vision pipeline a first set of image-derived regions that
    can later be scored, merged, or sent to a VLM.

    Args:
        png_path: Image file to analyse.
        page: Page number stamped on every returned region.
        max_regions: Upper bound on the number of regions returned.

    Returns:
        Region dicts with a pixel-space ``bbox`` of ``[x1, y1, x2, y2]``,
        sorted top-to-bottom then left-to-right. Empty when OpenCV is
        unavailable or the image cannot be read.
    """
    if cv2 is None:
        return []
    img = cv2.imread(str(png_path))
    if img is None:
        return []

    height, width = img.shape[:2]
    page_area = float(width * height)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Convert dark text/lines to white foreground.
    thresh = cv2.adaptiveThreshold(
        gray,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV,
        35,
        15,
    )

    # Merge nearby characters into coarse rows/regions.
    kernel_w = max(12, width // 90)
    kernel_h = max(3, height // 350)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_w, kernel_h))
    merged = cv2.dilate(thresh, kernel, iterations=2)

    # OpenCV 4.x returns (contours, hierarchy) while 3.x returns
    # (image, contours, hierarchy); the contours are second-to-last in both.
    contours = cv2.findContours(merged, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

    min_area = page_area * 0.00008
    max_area = page_area * 0.65
    min_width = width * 0.04
    regions: list[dict[str, Any]] = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        area = float(w * h)
        # Drop specks, slivers and page-sized blobs.
        if area < min_area or area > max_area:
            continue
        if w < min_width or h < 4:
            continue
        regions.append(
            {
                "bbox": [int(x), int(y), int(x + w), int(y + h)],
                "label": "cv_region",
                "confidence": 0.35,
                "source": "opencv_adaptive_threshold_contours",
                "page": page,
            }
        )

    # Stable reading-ish order.
    regions.sort(key=lambda r: (r["bbox"][1], r["bbox"][0]))
    # Avoid huge payloads for now.
    return regions[:max_regions]
def analyze_document_image(image_path: str | Path, *, model_name: str = "placeholder") -> dict[str, Any]:
    """
    Backend-only vision analysis entrypoint.

    Current phase:
    - renders the first PDF page to PNG, or uses an existing image file as-is
    - runs coarse OpenCV region detection on that image
    - returns normalized metadata for later CV/Ollama processing

    Args:
        image_path: Path to a PDF or an image file.
        model_name: Label copied into the result; no model is invoked here.

    Returns:
        A ``vision_analysis_v1`` dict containing the render result
        (``status``, ``rendered_pages`` and, on failure, ``error``) plus a
        ``layers`` mapping whose ``vision_regions`` holds the detected regions.
    """
    path = Path(image_path)
    render_result: dict[str, Any]
    if path.exists() and path.suffix.lower() == ".pdf":
        render_result = _render_pdf_page_to_png(path)
    elif path.exists():
        # Report the pixel size when OpenCV can decode the file; downstream
        # coordinate normalization needs it and is skipped while it is None.
        width = height = None
        if cv2 is not None:
            image = cv2.imread(str(path))
            if image is not None:
                height, width = (int(v) for v in image.shape[:2])
        render_result = {
            "status": "image_available",
            "rendered_pages": [
                {
                    "page": 1,
                    "png_path": str(path),
                    "width": width,
                    "height": height,
                    "dpi": None,
                }
            ],
        }
    else:
        render_result = {
            "status": "source_missing",
            "error": "image_path_does_not_exist",
            "rendered_pages": [],
        }

    rendered_pages = render_result.get("rendered_pages") or []
    vision_regions: list[dict[str, Any]] = []
    if rendered_pages and rendered_pages[0].get("png_path"):
        vision_regions = _detect_visual_regions(rendered_pages[0]["png_path"])

    return {
        "schema_version": "vision_analysis_v1",
        "engine": "local",
        "model_name": model_name,
        "image_path": str(path),
        **render_result,
        "layers": {
            "vision_regions": vision_regions,
            "vision_lines": [],
            "vision_boxes": [],
            "vision_fields": [],
            "vision_line_items": [],
        },
        "notes": [
            "Vision module rendered/located image input.",
            "OpenCV coarse region detection has run when available.",
            "No CV/Ollama model is connected yet.",
        ],
    }
def _bbox_iou(a: list[float] | tuple[float, ...], b: list[float] | tuple[float, ...]) -> float:
    """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

    Only the first four values of each box are used. Boxes with zero or
    negative extent contribute no area, and the result is ``0.0`` when the
    union is empty.
    """
    ax1, ay1, ax2, ay2 = (float(v) for v in a[:4])
    bx1, by1, bx2, by2 = (float(v) for v in b[:4])

    overlap_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = overlap_w * overlap_h

    area_a = max(0.0, ax2 - ax1) * max(0.0, ay2 - ay1)
    area_b = max(0.0, bx2 - bx1) * max(0.0, by2 - by1)
    union = area_a + area_b - inter
    return inter / union if union else 0.0
def _region_contains_ratio(region: list[float] | tuple[float, ...], item: list[float] | tuple[float, ...]) -> float:
    """Return the fraction of ``item``'s area that lies inside ``region``.

    Both boxes are ``[x1, y1, x2, y2]``; only the first four values are used.
    The result is ``0.0`` when ``item`` has no area.
    """
    rx1, ry1, rx2, ry2 = (float(v) for v in region[:4])
    ix1, iy1, ix2, iy2 = (float(v) for v in item[:4])

    overlap_w = max(0.0, min(rx2, ix2) - max(rx1, ix1))
    overlap_h = max(0.0, min(ry2, iy2) - max(ry1, iy1))
    inter = overlap_w * overlap_h

    item_area = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    return inter / item_area if item_area else 0.0
def _scale_bbox(
    bbox: list[float] | tuple[float, ...],
    *,
    scale_x: float,
    scale_y: float,
) -> list[float]:
    """Scale the first four values of an ``[x1, y1, x2, y2]`` box per axis."""
    x1, y1, x2, y2 = (float(v) for v in bbox[:4])
    return [
        x1 * scale_x,
        y1 * scale_y,
        x2 * scale_x,
        y2 * scale_y,
    ]


def normalize_vision_regions_to_layout(
    vision_result: dict[str, Any],
    layout_json: dict[str, Any] | None,
) -> dict[str, Any]:
    """
    Convert OpenCV rendered-image pixel coordinates into layout_json page coordinates.

    The first layout page and the first rendered page supply the scale
    factors. ``vision_result`` is updated in place and also returned:
    ``coordinate_space`` is always set, and when scaling is possible
    ``layers["vision_regions"]`` is replaced by scaled copies that keep the
    pixel box under ``rendered_bbox``. Regions without a ``bbox`` are dropped.

    Calling this again on an already-normalized result is safe: regions
    marked ``layout_page`` are rescaled from their stored ``rendered_bbox``
    rather than from the already-scaled ``bbox``.
    """
    pages = (layout_json or {}).get("pages") or []
    rendered_pages = vision_result.get("rendered_pages") or []
    layers = vision_result.setdefault("layers", {})
    regions = layers.get("vision_regions") or []
    if not pages or not rendered_pages or not regions:
        vision_result["coordinate_space"] = "unknown_or_unscaled"
        return vision_result

    page = pages[0]
    rendered = rendered_pages[0]
    layout_w = float(page.get("page_width") or page.get("width") or 0)
    layout_h = float(page.get("page_height") or page.get("height") or 0)
    rendered_w = float(rendered.get("width") or 0)
    rendered_h = float(rendered.get("height") or 0)
    if not layout_w or not layout_h or not rendered_w or not rendered_h:
        vision_result["coordinate_space"] = "rendered_pixels_unscaled"
        return vision_result

    scale_x = layout_w / rendered_w
    scale_y = layout_h / rendered_h

    normalized = []
    for region in regions:
        pixel_bbox = region.get("bbox")
        if region.get("coordinate_space") == "layout_page" and region.get("rendered_bbox"):
            # Already normalized by an earlier call: start again from the
            # preserved pixel box so the scale is not applied twice.
            pixel_bbox = region["rendered_bbox"]
        if not pixel_bbox:
            continue
        item = dict(region)
        item["rendered_bbox"] = pixel_bbox
        item["bbox"] = _scale_bbox(pixel_bbox, scale_x=scale_x, scale_y=scale_y)
        item["coordinate_space"] = "layout_page"
        item["scale_x"] = scale_x
        item["scale_y"] = scale_y
        normalized.append(item)

    layers["vision_regions"] = normalized
    vision_result["coordinate_space"] = "layout_page"
    vision_result["coordinate_normalization"] = {
        "source": "rendered_pixels",
        "target": "layout_page",
        "layout_width": layout_w,
        "layout_height": layout_h,
        "rendered_width": rendered_w,
        "rendered_height": rendered_h,
        "scale_x": scale_x,
        "scale_y": scale_y,
    }
    return vision_result
def score_vision_regions_against_layout(
    vision_result: dict[str, Any],
    layout_json: dict[str, Any] | None,
    *,
    min_contains_ratio: float = 0.55,
    min_iou: float = 0.10,
) -> dict[str, Any]:
    """
    Compare OpenCV regions against existing OCR layout lines.

    Purpose:
    - measure whether CV regions line up with OCR line boxes
    - identify OCR lines not covered by CV regions
    - identify CV regions with no OCR coverage

    Args:
        vision_result: Result dict whose ``layers["vision_regions"]`` holds
            the regions to score.
        layout_json: Layout whose ``pages[*]["lines"]`` hold the OCR lines.
        min_contains_ratio: Minimum share of a line's area inside a region
            for the pair to count as a match.
        min_iou: Alternative minimum intersection-over-union for a match.

    Returns:
        A ``vision_region_scoring_v1`` dict with per-page scores and a
        summary. ``status`` is ``"not_enough_data"`` when there are no pages
        or no regions, otherwise ``"scored"``.

    A line is matched when its best-overlapping region reaches a threshold.
    A region counts as covered when it reaches a threshold for any line, even
    if another region overlaps that line better; only regions that reach no
    threshold for any line are reported as unmatched.
    """
    pages = (layout_json or {}).get("pages") or []
    regions = (vision_result.get("layers") or {}).get("vision_regions") or []
    if not pages or not regions:
        return {
            "schema_version": "vision_region_scoring_v1",
            "status": "not_enough_data",
            "page_scores": [],
            "summary": {
                "pages": len(pages),
                "regions": len(regions),
                "lines": 0,
                "matched_lines": 0,
                "unmatched_lines": 0,
                "unmatched_regions": len(regions),
            },
        }

    page_scores: list[dict[str, Any]] = []
    total_lines = 0
    total_matched_lines = 0
    total_unmatched_regions = 0

    for page in pages:
        page_number = int(page.get("page") or 1)
        page_regions = [r for r in regions if int(r.get("page") or 1) == page_number]
        covered_region_indexes: set[int] = set()
        line_scores: list[dict[str, Any]] = []

        for line in page.get("lines") or []:
            line_bbox = line.get("bbox")
            if not line_bbox:
                continue
            best: dict[str, Any] = {
                "region_index": None,
                "iou": 0.0,
                "contains_ratio": 0.0,
                "region_bbox": None,
            }
            # Tracked unrounded so near-ties are not decided by rounding.
            best_score = 0.0
            for idx, region in enumerate(page_regions):
                region_bbox = region.get("bbox")
                if not region_bbox:
                    continue
                raw_iou = _bbox_iou(region_bbox, line_bbox)
                raw_contains = _region_contains_ratio(region_bbox, line_bbox)
                iou = round(raw_iou, 4)
                contains = round(raw_contains, 4)
                if contains >= min_contains_ratio or iou >= min_iou:
                    covered_region_indexes.add(idx)
                score = max(raw_iou, raw_contains)
                if score > best_score:
                    best_score = score
                    best = {
                        "region_index": idx,
                        "iou": iou,
                        "contains_ratio": contains,
                        "region_bbox": region_bbox,
                    }
            matched = (best["contains_ratio"] >= min_contains_ratio) or (best["iou"] >= min_iou)
            line_scores.append(
                {
                    "line_text": str(line.get("text") or "")[:120],
                    "line_bbox": line_bbox,
                    "matched": matched,
                    **best,
                }
            )

        matched_lines = sum(1 for item in line_scores if item["matched"])
        unmatched_region_indexes = [
            idx for idx, _ in enumerate(page_regions) if idx not in covered_region_indexes
        ]
        total_lines += len(line_scores)
        total_matched_lines += matched_lines
        total_unmatched_regions += len(unmatched_region_indexes)

        page_scores.append(
            {
                "page": page_number,
                "line_count": len(line_scores),
                "region_count": len(page_regions),
                "matched_line_count": matched_lines,
                "unmatched_line_count": len(line_scores) - matched_lines,
                "unmatched_region_count": len(unmatched_region_indexes),
                # Detail lists are capped to keep the payload small.
                "line_scores": line_scores[:200],
                "unmatched_regions": [
                    page_regions[idx] for idx in unmatched_region_indexes[:100]
                ],
            }
        )

    return {
        "schema_version": "vision_region_scoring_v1",
        "status": "scored",
        "page_scores": page_scores,
        "summary": {
            "pages": len(pages),
            "regions": len(regions),
            "lines": total_lines,
            "matched_lines": total_matched_lines,
            "unmatched_lines": total_lines - total_matched_lines,
            "unmatched_regions": total_unmatched_regions,
        },
    }
def _classify_region_geometry(region: dict[str, Any], *, page_width: float, page_height: float) -> dict[str, Any]:
    """Label a region from the size and shape of its bbox relative to the page.

    Args:
        region: Region dict; its ``bbox`` (``[x1, y1, x2, y2]``) is read and
            must be in the same units as the page size.
        page_width: Page width used for the relative-size rules.
        page_height: Page height used for the relative-size rules.

    Returns:
        A copy of ``region`` with ``geometry_class``, ``geometry_confidence``
        and ``geometry_features`` added. The input dict is not modified.
    """
    bbox = region.get("bbox") or [0, 0, 0, 0]
    x1, y1, x2, y2 = (float(v) for v in bbox[:4])
    w = max(0.0, x2 - x1)
    h = max(0.0, y2 - y1)
    area = w * h
    page_area = max(1.0, page_width * page_height)
    aspect = w / h if h else 0.0

    # Rules are ordered; the first one that applies wins.
    if area <= 0.0:
        # A missing or degenerate box says nothing about the content, so it
        # must not fall through to the "small symbol" rule below.
        label, confidence = "unknown_region", 0.20
    elif area > page_area * 0.18:
        label, confidence = "large_document_region", 0.35
    elif w > page_width * 0.70 and aspect > 4:
        label, confidence = "wide_text_band", 0.45
    elif h > page_height * 0.10 and w > page_width * 0.35:
        label, confidence = "large_text_block", 0.40
    elif aspect > 8:
        label, confidence = "horizontal_rule_or_text_band", 0.35
    elif w < page_width * 0.12 and h < page_height * 0.06:
        label, confidence = "small_symbol_or_short_text", 0.30
    else:
        label, confidence = "unknown_region", 0.20

    item = dict(region)
    item["geometry_class"] = label
    item["geometry_confidence"] = confidence
    item["geometry_features"] = {
        "width": w,
        "height": h,
        "area_ratio": area / page_area,
        "aspect_ratio": aspect,
    }
    return item
def _write_region_crop(
    png_path: str | Path,
    region: dict[str, Any],
    *,
    crop_index: int,
    padding_px: int = 8,
) -> str | None:
    """Crop one region out of a page image and save it as its own PNG.

    Args:
        png_path: Page image the region was detected on.
        region: Region dict; ``rendered_bbox`` is preferred over ``bbox`` and
            is interpreted as pixel coordinates of ``png_path``.
        crop_index: Number used in the crop's file name.
        padding_px: Margin added on every side, clamped to the image bounds.

    Returns:
        Path of the written crop (``crops/<image stem>/region_NNNN.png`` next
        to the page image), or ``None`` when OpenCV is unavailable, the image
        or bbox is unusable, the padded box is empty, or the write fails.
    """
    if cv2 is None:
        return None
    bbox = region.get("rendered_bbox") or region.get("bbox")
    # Checked before decoding the image so unusable regions cost no I/O.
    if not bbox or len(bbox) < 4:
        return None
    img = cv2.imread(str(png_path))
    if img is None:
        return None

    height, width = img.shape[:2]
    x1, y1, x2, y2 = (int(round(float(v))) for v in bbox[:4])
    x1 = max(0, x1 - padding_px)
    y1 = max(0, y1 - padding_px)
    x2 = min(width, x2 + padding_px)
    y2 = min(height, y2 + padding_px)
    if x2 <= x1 or y2 <= y1:
        return None

    source = Path(png_path)
    crop_dir = source.parent / "crops" / source.stem
    crop_dir.mkdir(parents=True, exist_ok=True)
    crop_path = crop_dir / f"region_{crop_index:04d}.png"
    # imwrite reports failure through its return value; do not hand back a
    # path to a file that was never written.
    if not cv2.imwrite(str(crop_path), img[y1:y2, x1:x2]):
        return None
    return str(crop_path)
def _ocr_crop(crop_path: str | Path, *, psm: int = 6) -> dict[str, Any]:
    """
    Run OCR over a cropped unmatched region.

    Returns lightweight text/confidence metadata only. Full OCR/layout merging
    remains a later step.

    Args:
        crop_path: Image file to pass to Tesseract.
        psm: Tesseract page segmentation mode, passed as ``--psm``.

    Returns:
        A dict with ``ocr_status`` (``"ok"``, ``"no_text"``, ``"unavailable"``,
        ``"missing_crop"`` or ``"error"``), ``ocr_engine``, ``ocr_text`` and
        ``ocr_confidence`` (mean of the non-negative word confidences, or
        ``None``). Successful runs also carry ``ocr_psm``; errors carry
        ``ocr_error``.
    """
    failure = {
        "ocr_engine": "tesseract",
        "ocr_text": "",
        "ocr_confidence": None,
    }
    if pytesseract is None:
        return {"ocr_status": "unavailable", **failure}

    path = Path(crop_path)
    if not path.exists():
        return {"ocr_status": "missing_crop", **failure}

    try:
        data = pytesseract.image_to_data(
            str(path),
            output_type=pytesseract.Output.DICT,
            config=f"--psm {psm}",
        )
    except Exception as e:
        # OCR is best-effort: report the failure instead of aborting the run.
        return {
            "ocr_status": "error",
            "ocr_engine": "tesseract",
            "ocr_error": repr(e),
            "ocr_text": "",
            "ocr_confidence": None,
        }

    words: list[str] = []
    confidences: list[float] = []
    for text, conf in zip(data.get("text", []), data.get("conf", [])):
        text = str(text or "").strip()
        if not text:
            continue
        try:
            c = float(conf)
        except Exception:
            c = -1.0
        # Negative or unparsable confidences are left out of the average.
        if c >= 0:
            confidences.append(c)
        # NOTE(review): the word is kept regardless of its confidence; the
        # original nesting here was ambiguous - confirm this is intended.
        words.append(text)

    ocr_text = " ".join(words).strip()
    avg_conf = round(sum(confidences) / len(confidences), 2) if confidences else None
    return {
        "ocr_status": "ok" if ocr_text else "no_text",
        "ocr_engine": "tesseract",
        "ocr_psm": psm,
        "ocr_text": ocr_text,
        "ocr_confidence": avg_conf,
    }
def classify_and_crop_unmatched_regions(
    vision_result: dict[str, Any],
    layout_json: dict[str, Any] | None,
    region_score: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Classify unmatched CV regions and write region crop images for later VLM analysis.

    Args:
        vision_result: Vision result providing ``rendered_pages`` and, as a
            fallback region source, ``layers["vision_regions"]``.
        layout_json: Layout whose first page supplies the page size.
        region_score: Optional scoring result. When it contains page scores,
            only their ``unmatched_regions`` are processed, which may be none.
            Without page scores every vision region is processed.

    Returns:
        A ``vision_region_classification_v1`` dict. ``status`` is
        ``"not_enough_data"`` when the layout has no pages or nothing was
        rendered, otherwise ``"classified"`` with one entry per region,
        including ``crop_path`` and OCR fields where a crop could be written.
    """
    pages = (layout_json or {}).get("pages") or []
    rendered_pages = vision_result.get("rendered_pages") or []
    if not pages or not rendered_pages:
        return {
            "schema_version": "vision_region_classification_v1",
            "status": "not_enough_data",
            "classified_regions": [],
        }

    page = pages[0]
    page_width = float(page.get("page_width") or page.get("width") or 1)
    page_height = float(page.get("page_height") or page.get("height") or 1)
    png_path = rendered_pages[0].get("png_path")

    page_scores = (region_score or {}).get("page_scores") or []
    if page_scores:
        # Scoring ran, so its verdict is authoritative even when it found no
        # unmatched regions; falling back here would crop and OCR every
        # region that OCR already covers.
        unmatched_regions = [
            region
            for page_score in page_scores
            for region in page_score.get("unmatched_regions") or []
        ]
    else:
        unmatched_regions = list((vision_result.get("layers") or {}).get("vision_regions") or [])

    classified: list[dict[str, Any]] = []
    for idx, region in enumerate(unmatched_regions):
        item = _classify_region_geometry(region, page_width=page_width, page_height=page_height)
        if png_path:
            item["crop_path"] = _write_region_crop(png_path, item, crop_index=idx)
        if item.get("crop_path"):
            item.update(_ocr_crop(item["crop_path"]))
        item["classification_source"] = "opencv_geometry_classifier"
        classified.append(item)

    return {
        "schema_version": "vision_region_classification_v1",
        "status": "classified",
        "classified_region_count": len(classified),
        "classified_regions": classified,
    }
def build_vision_candidate_fields(classification: dict[str, Any]) -> list[dict[str, Any]]:
    """
    Convert crop OCR/classification results into lightweight structured field candidates.

    Args:
        classification: Classification result; its ``classified_regions``
            entries are read for ``ocr_text``, ``ocr_confidence``, ``bbox``,
            ``crop_path`` and ``geometry_class``.

    Returns:
        Candidate dicts in region order. One region can yield several
        candidates (header or message, time, item count, money amounts,
        symbol/noise); regions without OCR text yield none. Extracted time
        and money values are returned without surrounding whitespace.
    """
    money_re = re.compile(r"(?<!\d)(?:\$?\s*)\d+\.\d{2}[A-Z]?(?!\d)")
    time_re = re.compile(r"\b\d{1,2}:\d{2}\s*(?:AM|PM)?\b", re.IGNORECASE)
    item_count_re = re.compile(r"\b\d+\s+ITEMS?\b", re.IGNORECASE)
    message_tokens = ("extracare", "member", "coupon", "survey", "rewards")

    fields: list[dict[str, Any]] = []
    for idx, region in enumerate(classification.get("classified_regions") or []):
        text = str(region.get("ocr_text") or "").strip()
        if not text:
            continue

        conf = region.get("ocr_confidence")
        ocr_conf = conf or 0  # a missing confidence counts as zero
        base = {
            "source": "vision_crop_ocr",
            "source_region_index": idx,
            "source_bbox": region.get("bbox"),
            "source_crop_path": region.get("crop_path"),
            "ocr_confidence": conf,
            "geometry_class": region.get("geometry_class"),
        }

        lower = text.lower()
        if any(token in lower for token in message_tokens):
            fields.append({
                **base,
                "candidate_type": "receipt_message",
                "value": text,
                "confidence": 0.45 if ocr_conf >= 60 else 0.25,
            })
        elif "cvs" in lower or "pharmacy" in lower:
            fields.append({
                **base,
                "candidate_type": "merchant_or_header",
                "value": text,
                "confidence": 0.75 if ocr_conf >= 70 else 0.45,
            })

        time_match = time_re.search(text)
        if time_match:
            fields.append({
                **base,
                "candidate_type": "transaction_time",
                # The pattern's optional whitespace can trail a time that has
                # no AM/PM suffix.
                "value": time_match.group(0).strip(),
                "raw_text": text,
                "confidence": 0.80 if ocr_conf >= 70 else 0.50,
            })

        item_count_match = item_count_re.search(text)
        if item_count_match:
            fields.append({
                **base,
                "candidate_type": "item_count",
                "value": item_count_match.group(0).upper(),
                "raw_text": text,
                "confidence": 0.65 if ocr_conf >= 50 else 0.40,
            })

        # The pattern's optional whitespace can lead an amount.
        money_matches = [match.strip() for match in money_re.findall(text)]
        if money_matches:
            fields.append({
                **base,
                "candidate_type": "money_amounts",
                "value": money_matches,
                "raw_text": text,
                "confidence": 0.65 if ocr_conf >= 50 else 0.35,
            })

        # Capture low-value symbol/noise so later filtering can learn from it.
        if len(text) <= 3 and not money_matches and not time_match:
            fields.append({
                **base,
                "candidate_type": "symbol_or_noise",
                "value": text,
                "confidence": 0.20,
            })
    return fields
def build_vision_field_suggestions(
    candidate_fields: list[dict[str, Any]],
    existing_fields: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """
    Convert vision candidate fields into simple add/update/ignore suggestions.

    This intentionally stays conservative:
    - high confidence merchant/time/item_count/money candidates are surfaced
    - symbol/noise and receipt messages are ignored
    - existing field comparison can be expanded later

    Args:
        candidate_fields: Candidate dicts carrying ``candidate_type``,
            ``value`` and ``confidence``.
        existing_fields: Already-known field values keyed by target field.

    Returns:
        One suggestion per surviving candidate. ``action`` is ``"add"`` when
        the target field has no existing value, ``"already_present"`` when
        the stripped string forms are equal, otherwise ``"review_update"``.
    """
    existing_fields = existing_fields or {}
    ignored_types = {"symbol_or_noise", "receipt_message"}
    type_to_existing_key = {
        "merchant_or_header": "merchant_raw",
        "transaction_time": "transaction_time",
        "item_count": "item_count",
        "money_amounts": "amount_candidates",
    }
    # Minimum candidate confidence per type; other types use the default.
    min_confidence_by_type = {
        "merchant_or_header": 0.60,
        "transaction_time": 0.60,
        "money_amounts": 0.50,
    }
    default_min_confidence = 0.40

    suggestions: list[dict[str, Any]] = []
    for field in candidate_fields or []:
        candidate_type = field.get("candidate_type")
        if candidate_type in ignored_types:
            continue

        confidence = float(field.get("confidence") or 0)
        value = field.get("value")
        if not value:
            continue
        if confidence < min_confidence_by_type.get(candidate_type, default_min_confidence):
            continue

        existing_key = type_to_existing_key.get(candidate_type, candidate_type)
        existing_value = existing_fields.get(existing_key)
        if not existing_value:
            action = "add"
        elif str(existing_value).strip() == str(value).strip():
            action = "already_present"
        else:
            action = "review_update"

        suggestions.append(
            {
                "suggestion_type": candidate_type,
                "target_field": existing_key,
                "action": action,
                "value": value,
                "existing_value": existing_value,
                "confidence": confidence,
                "ocr_confidence": field.get("ocr_confidence"),
                "source": "vision_candidate_fields",
                "source_region_index": field.get("source_region_index"),
                "source_bbox": field.get("source_bbox"),
                "source_crop_path": field.get("source_crop_path"),
                "raw_text": field.get("raw_text"),
            }
        )
    return suggestions
def build_vision_assisted_layout(
    source_layout: dict[str, Any] | None,
    vision_result: dict[str, Any],
    *,
    existing_fields: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Convert vision analysis into normal layout_json.

    Current phase:
    - normalizes CV coordinates into layout page coordinates
    - scores CV region coverage against OCR layout lines
    - preserves the current source layout for editor stability
    - stores diagnostics on the layout candidate

    Args:
        source_layout: Existing layout; a shallow copy is returned with the
            vision diagnostics added. ``None`` is treated as a layout with
            no pages.
        vision_result: Vision analysis result. It is updated in place by the
            coordinate normalization step.
        existing_fields: Already-known field values, forwarded to the
            suggestion builder so it can report ``already_present`` and
            ``review_update`` instead of always suggesting ``add``.

    Returns:
        The layout copy carrying the ``vision_*`` diagnostic keys, marked as
        vision-assisted and needing review.
    """
    layout = dict(source_layout or {"pages": []})

    normalized_vision = normalize_vision_regions_to_layout(vision_result, layout)
    region_score = score_vision_regions_against_layout(normalized_vision, layout)
    region_classification = classify_and_crop_unmatched_regions(
        normalized_vision,
        layout,
        region_score,
    )
    candidate_fields = build_vision_candidate_fields(region_classification)
    field_suggestions = build_vision_field_suggestions(candidate_fields, existing_fields)

    layout.update(
        {
            "vision_assisted": True,
            "vision_assisted_status": normalized_vision.get("status", "unknown"),
            "vision_engine": normalized_vision.get("engine"),
            "vision_model_name": normalized_vision.get("model_name"),
            "vision_coordinate_normalization": normalized_vision.get("coordinate_normalization"),
            "vision_region_score": region_score,
            "vision_region_classification": region_classification,
            "vision_candidate_fields": candidate_fields,
            "vision_field_suggestions": field_suggestions,
            "layout_sync_source": "vision_assisted",
            "layout_needs_review": True,
        }
    )
    return layout