def _render_pdf_page_to_png(path: Path, *, page_number: int = 0, dpi: int = 200) -> dict[str, Any]:
    """
    Render one page of a PDF to a PNG file under the system temp directory.

    Args:
        path: Location of the PDF to open with PyMuPDF.
        page_number: Zero-based index of the page to render.
        dpi: Target resolution; the page is scaled by ``dpi / 72``.

    Returns:
        A dict that always carries ``status`` and ``rendered_pages``.
        On success ``status`` is ``"image_rendered"`` and ``rendered_pages``
        holds one entry with the one-based page number, the PNG path, the
        pixel width/height and the dpi. On failure ``status`` is
        ``"render_failed"``, ``error`` names the cause and ``rendered_pages``
        is empty.
    """
    if fitz is None:
        return {
            "status": "render_failed",
            "error": "pymupdf_not_available",
            "rendered_pages": [],
        }

    try:
        doc = fitz.open(str(path))
    except Exception as exc:
        # An unreadable or corrupt file is reported like every other render
        # failure instead of escaping as an exception.
        return {
            "status": "render_failed",
            "error": "pdf_open_failed",
            "error_detail": repr(exc),
            "rendered_pages": [],
        }

    try:
        page_count = doc.page_count

        # Negative indexes are rejected too: PyMuPDF counts them back from the
        # last page, while the result would still label the page as
        # ``page_number + 1``.
        if not 0 <= page_number < page_count:
            return {
                "status": "render_failed",
                "error": "page_number_out_of_range",
                "page_count": page_count,
                "rendered_pages": [],
            }

        cache_root = Path(tempfile.gettempdir()) / "document_processor_vision"
        cache_root.mkdir(parents=True, exist_ok=True)

        # The path digest keeps same-named PDFs from different folders apart.
        digest = hashlib.sha256(str(path).encode("utf-8")).hexdigest()[:16]
        png_path = cache_root / f"{path.stem}_{digest}_page{page_number + 1}_{dpi}dpi.png"

        page = doc.load_page(page_number)
        matrix = fitz.Matrix(dpi / 72.0, dpi / 72.0)
        pix = page.get_pixmap(matrix=matrix, alpha=False)
        pix.save(str(png_path))

        return {
            "status": "image_rendered",
            "page_count": page_count,
            "rendered_pages": [
                {
                    "page": page_number + 1,
                    "png_path": str(png_path),
                    "width": pix.width,
                    "height": pix.height,
                    "dpi": dpi,
                }
            ],
        }
    finally:
        doc.close()
thresh = cv2.adaptiveThreshold( gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 35, 15, ) # Merge nearby characters into coarse rows/regions. kernel_w = max(12, width // 90) kernel_h = max(3, height // 350) kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_w, kernel_h)) merged = cv2.dilate(thresh, kernel, iterations=2) contours, _ = cv2.findContours(merged, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) regions: list[dict[str, Any]] = [] for contour in contours: x, y, w, h = cv2.boundingRect(contour) area = float(w * h) if area < page_area * 0.00008: continue if w < width * 0.04 or h < 4: continue if area > page_area * 0.65: continue regions.append( { "bbox": [int(x), int(y), int(x + w), int(y + h)], "label": "cv_region", "confidence": 0.35, "source": "opencv_adaptive_threshold_contours", "page": 1, } ) # Stable reading-ish order. regions.sort(key=lambda r: (r["bbox"][1], r["bbox"][0])) # Avoid huge payloads for now. return regions[:200] def analyze_document_image(image_path: str | Path, *, model_name: str = "placeholder") -> dict[str, Any]: """ Backend-only vision analysis entrypoint. 
Current phase: - renders the first PDF page to PNG - returns normalized metadata for later CV/Ollama processing """ path = Path(image_path) render_result: dict[str, Any] if path.exists() and path.suffix.lower() == ".pdf": render_result = _render_pdf_page_to_png(path) elif path.exists(): render_result = { "status": "image_available", "rendered_pages": [ { "page": 1, "png_path": str(path), "width": None, "height": None, "dpi": None, } ], } else: render_result = { "status": "source_missing", "error": "image_path_does_not_exist", "rendered_pages": [], } rendered_pages = render_result.get("rendered_pages") or [] vision_regions: list[dict[str, Any]] = [] if rendered_pages and rendered_pages[0].get("png_path"): vision_regions = _detect_visual_regions(rendered_pages[0]["png_path"]) return { "schema_version": "vision_analysis_v1", "engine": "local", "model_name": model_name, "image_path": str(path), **render_result, "layers": { "vision_regions": vision_regions, "vision_lines": [], "vision_boxes": [], "vision_fields": [], "vision_line_items": [], }, "notes": [ "Vision module rendered/located image input.", "OpenCV coarse region detection has run when available.", "No CV/Ollama model is connected yet.", ], } def _bbox_iou(a: list[float] | tuple[float, ...], b: list[float] | tuple[float, ...]) -> float: ax1, ay1, ax2, ay2 = [float(v) for v in a[:4]] bx1, by1, bx2, by2 = [float(v) for v in b[:4]] ix1 = max(ax1, bx1) iy1 = max(ay1, by1) ix2 = min(ax2, bx2) iy2 = min(ay2, by2) iw = max(0.0, ix2 - ix1) ih = max(0.0, iy2 - iy1) inter = iw * ih area_a = max(0.0, ax2 - ax1) * max(0.0, ay2 - ay1) area_b = max(0.0, bx2 - bx1) * max(0.0, by2 - by1) denom = area_a + area_b - inter return inter / denom if denom else 0.0 def _region_contains_ratio(region: list[float] | tuple[float, ...], item: list[float] | tuple[float, ...]) -> float: rx1, ry1, rx2, ry2 = [float(v) for v in region[:4]] ix1, iy1, ix2, iy2 = [float(v) for v in item[:4]] x1 = max(rx1, ix1) y1 = max(ry1, iy1) x2 = min(rx2, 
ix2) y2 = min(ry2, iy2) inter = max(0.0, x2 - x1) * max(0.0, y2 - y1) item_area = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1) return inter / item_area if item_area else 0.0 def _scale_bbox( bbox: list[float] | tuple[float, ...], *, scale_x: float, scale_y: float, ) -> list[float]: x1, y1, x2, y2 = [float(v) for v in bbox[:4]] return [ x1 * scale_x, y1 * scale_y, x2 * scale_x, y2 * scale_y, ] def normalize_vision_regions_to_layout( vision_result: dict[str, Any], layout_json: dict[str, Any] | None, ) -> dict[str, Any]: """ Convert OpenCV rendered-image pixel coordinates into layout_json page coordinates. """ pages = (layout_json or {}).get("pages") or [] rendered_pages = vision_result.get("rendered_pages") or [] layers = vision_result.setdefault("layers", {}) regions = layers.get("vision_regions") or [] if not pages or not rendered_pages or not regions: vision_result["coordinate_space"] = "unknown_or_unscaled" return vision_result page = pages[0] rendered = rendered_pages[0] layout_w = float(page.get("page_width") or page.get("width") or 0) layout_h = float(page.get("page_height") or page.get("height") or 0) rendered_w = float(rendered.get("width") or 0) rendered_h = float(rendered.get("height") or 0) if not layout_w or not layout_h or not rendered_w or not rendered_h: vision_result["coordinate_space"] = "rendered_pixels_unscaled" return vision_result scale_x = layout_w / rendered_w scale_y = layout_h / rendered_h normalized = [] for region in regions: bbox = region.get("bbox") if not bbox: continue item = dict(region) item["rendered_bbox"] = bbox item["bbox"] = _scale_bbox(bbox, scale_x=scale_x, scale_y=scale_y) item["coordinate_space"] = "layout_page" item["scale_x"] = scale_x item["scale_y"] = scale_y normalized.append(item) layers["vision_regions"] = normalized vision_result["coordinate_space"] = "layout_page" vision_result["coordinate_normalization"] = { "source": "rendered_pixels", "target": "layout_page", "layout_width": layout_w, "layout_height": layout_h, 
"rendered_width": rendered_w, "rendered_height": rendered_h, "scale_x": scale_x, "scale_y": scale_y, } return vision_result def score_vision_regions_against_layout( vision_result: dict[str, Any], layout_json: dict[str, Any] | None, ) -> dict[str, Any]: """ Compare OpenCV regions against existing OCR layout lines. Purpose: - measure whether CV regions line up with OCR line boxes - identify OCR lines not covered by CV regions - identify CV regions with no OCR coverage """ pages = (layout_json or {}).get("pages") or [] regions = ((vision_result.get("layers") or {}).get("vision_regions")) or [] if not pages or not regions: return { "schema_version": "vision_region_scoring_v1", "status": "not_enough_data", "page_scores": [], "summary": { "pages": len(pages), "regions": len(regions), "lines": 0, "matched_lines": 0, "unmatched_lines": 0, "unmatched_regions": len(regions), }, } page_scores: list[dict[str, Any]] = [] total_lines = 0 total_matched_lines = 0 total_unmatched_regions = 0 for page in pages: page_number = int(page.get("page") or 1) page_lines = page.get("lines") or [] page_regions = [r for r in regions if int(r.get("page") or 1) == page_number] matched_region_indexes: set[int] = set() line_scores: list[dict[str, Any]] = [] for line in page_lines: bbox = line.get("bbox") if not bbox: continue best = { "region_index": None, "iou": 0.0, "contains_ratio": 0.0, "region_bbox": None, } for idx, region in enumerate(page_regions): region_bbox = region.get("bbox") if not region_bbox: continue iou = _bbox_iou(region_bbox, bbox) contains = _region_contains_ratio(region_bbox, bbox) score = max(iou, contains) if score > max(best["iou"], best["contains_ratio"]): best = { "region_index": idx, "iou": round(iou, 4), "contains_ratio": round(contains, 4), "region_bbox": region_bbox, } matched = (best["contains_ratio"] >= 0.55) or (best["iou"] >= 0.10) if matched and best["region_index"] is not None: matched_region_indexes.add(int(best["region_index"])) line_scores.append( { 
"line_text": str(line.get("text") or "")[:120], "line_bbox": bbox, "matched": matched, **best, } ) total_lines += len(line_scores) matched_lines = sum(1 for item in line_scores if item["matched"]) total_matched_lines += matched_lines unmatched_region_indexes = [ idx for idx in range(len(page_regions)) if idx not in matched_region_indexes ] total_unmatched_regions += len(unmatched_region_indexes) page_scores.append( { "page": page_number, "line_count": len(line_scores), "region_count": len(page_regions), "matched_line_count": matched_lines, "unmatched_line_count": len(line_scores) - matched_lines, "unmatched_region_count": len(unmatched_region_indexes), "line_scores": line_scores[:200], "unmatched_regions": [ page_regions[idx] for idx in unmatched_region_indexes[:100] ], } ) return { "schema_version": "vision_region_scoring_v1", "status": "scored", "page_scores": page_scores, "summary": { "pages": len(pages), "regions": len(regions), "lines": total_lines, "matched_lines": total_matched_lines, "unmatched_lines": total_lines - total_matched_lines, "unmatched_regions": total_unmatched_regions, }, } def _classify_region_geometry(region: dict[str, Any], *, page_width: float, page_height: float) -> dict[str, Any]: bbox = region.get("bbox") or [0, 0, 0, 0] x1, y1, x2, y2 = [float(v) for v in bbox[:4]] w = max(0.0, x2 - x1) h = max(0.0, y2 - y1) area = w * h page_area = max(1.0, page_width * page_height) aspect = w / h if h else 0.0 label = "unknown_region" confidence = 0.20 if area > page_area * 0.18: label = "large_document_region" confidence = 0.35 elif w > page_width * 0.70 and aspect > 4: label = "wide_text_band" confidence = 0.45 elif h > page_height * 0.10 and w > page_width * 0.35: label = "large_text_block" confidence = 0.40 elif aspect > 8: label = "horizontal_rule_or_text_band" confidence = 0.35 elif w < page_width * 0.12 and h < page_height * 0.06: label = "small_symbol_or_short_text" confidence = 0.30 item = dict(region) item["geometry_class"] = label 
item["geometry_confidence"] = confidence item["geometry_features"] = { "width": w, "height": h, "area_ratio": area / page_area, "aspect_ratio": aspect, } return item def _write_region_crop( png_path: str | Path, region: dict[str, Any], *, crop_index: int, padding_px: int = 8, ) -> str | None: if cv2 is None: return None img = cv2.imread(str(png_path)) if img is None: return None height, width = img.shape[:2] bbox = region.get("rendered_bbox") or region.get("bbox") if not bbox: return None x1, y1, x2, y2 = [int(round(float(v))) for v in bbox[:4]] x1 = max(0, x1 - padding_px) y1 = max(0, y1 - padding_px) x2 = min(width, x2 + padding_px) y2 = min(height, y2 + padding_px) if x2 <= x1 or y2 <= y1: return None crop = img[y1:y2, x1:x2] crop_dir = Path(png_path).parent / "crops" / Path(png_path).stem crop_dir.mkdir(parents=True, exist_ok=True) crop_path = crop_dir / f"region_{crop_index:04d}.png" cv2.imwrite(str(crop_path), crop) return str(crop_path) def _ocr_crop(crop_path: str | Path) -> dict[str, Any]: """ Run OCR over a cropped unmatched region. Returns lightweight text/confidence metadata only. Full OCR/layout merging remains a later step. 
""" if pytesseract is None: return { "ocr_status": "unavailable", "ocr_engine": "tesseract", "ocr_text": "", "ocr_confidence": None, } path = Path(crop_path) if not path.exists(): return { "ocr_status": "missing_crop", "ocr_engine": "tesseract", "ocr_text": "", "ocr_confidence": None, } try: data = pytesseract.image_to_data( str(path), output_type=pytesseract.Output.DICT, config="--psm 6", ) except Exception as e: return { "ocr_status": "error", "ocr_engine": "tesseract", "ocr_error": repr(e), "ocr_text": "", "ocr_confidence": None, } words: list[str] = [] confidences: list[float] = [] for text, conf in zip(data.get("text", []), data.get("conf", [])): text = str(text or "").strip() if not text: continue try: c = float(conf) except Exception: c = -1.0 if c >= 0: confidences.append(c) words.append(text) ocr_text = " ".join(words).strip() avg_conf = round(sum(confidences) / len(confidences), 2) if confidences else None return { "ocr_status": "ok" if ocr_text else "no_text", "ocr_engine": "tesseract", "ocr_psm": 6, "ocr_text": ocr_text, "ocr_confidence": avg_conf, } def classify_and_crop_unmatched_regions( vision_result: dict[str, Any], layout_json: dict[str, Any] | None, region_score: dict[str, Any] | None = None, ) -> dict[str, Any]: """ Classify unmatched CV regions and write region crop images for later VLM analysis. 
""" pages = (layout_json or {}).get("pages") or [] rendered_pages = vision_result.get("rendered_pages") or [] if not pages or not rendered_pages: return { "schema_version": "vision_region_classification_v1", "status": "not_enough_data", "classified_regions": [], } page = pages[0] page_width = float(page.get("page_width") or page.get("width") or 1) page_height = float(page.get("page_height") or page.get("height") or 1) png_path = rendered_pages[0].get("png_path") unmatched_regions: list[dict[str, Any]] = [] if region_score: for page_score in region_score.get("page_scores") or []: unmatched_regions.extend(page_score.get("unmatched_regions") or []) if not unmatched_regions: unmatched_regions = ((vision_result.get("layers") or {}).get("vision_regions")) or [] classified: list[dict[str, Any]] = [] for idx, region in enumerate(unmatched_regions): item = _classify_region_geometry(region, page_width=page_width, page_height=page_height) if png_path: item["crop_path"] = _write_region_crop(png_path, item, crop_index=idx) if item.get("crop_path"): item.update(_ocr_crop(item["crop_path"])) item["classification_source"] = "opencv_geometry_classifier" classified.append(item) return { "schema_version": "vision_region_classification_v1", "status": "classified", "classified_region_count": len(classified), "classified_regions": classified, } def build_vision_candidate_fields(classification: dict[str, Any]) -> list[dict[str, Any]]: """ Convert crop OCR/classification results into lightweight structured field candidates. 
""" fields: list[dict[str, Any]] = [] regions = classification.get("classified_regions") or [] money_re = re.compile(r"(?= 60 else 0.25, }) elif "cvs" in lower or "pharmacy" in lower: fields.append({ **base, "candidate_type": "merchant_or_header", "value": text, "confidence": 0.75 if (conf or 0) >= 70 else 0.45, }) if time_re.search(text): fields.append({ **base, "candidate_type": "transaction_time", "value": time_re.search(text).group(0), "raw_text": text, "confidence": 0.80 if (conf or 0) >= 70 else 0.50, }) if item_count_re.search(text): fields.append({ **base, "candidate_type": "item_count", "value": item_count_re.search(text).group(0).upper(), "raw_text": text, "confidence": 0.65 if (conf or 0) >= 50 else 0.40, }) money_matches = money_re.findall(text) if money_matches: fields.append({ **base, "candidate_type": "money_amounts", "value": money_matches, "raw_text": text, "confidence": 0.65 if (conf or 0) >= 50 else 0.35, }) # Capture low-value symbol/noise so later filtering can learn from it. if len(text) <= 3 and not money_matches and not time_re.search(text): fields.append({ **base, "candidate_type": "symbol_or_noise", "value": text, "confidence": 0.20, }) return fields def build_vision_field_suggestions( candidate_fields: list[dict[str, Any]], existing_fields: dict[str, Any] | None = None, ) -> list[dict[str, Any]]: """ Convert vision candidate fields into simple add/update/ignore suggestions. 
This intentionally stays conservative: - high confidence merchant/time/item_count/money candidates are surfaced - symbol/noise is ignored - existing field comparison can be expanded later """ existing_fields = existing_fields or {} suggestions: list[dict[str, Any]] = [] type_to_existing_key = { "merchant_or_header": "merchant_raw", "transaction_time": "transaction_time", "item_count": "item_count", "money_amounts": "amount_candidates", } for field in candidate_fields or []: candidate_type = field.get("candidate_type") if candidate_type in {"symbol_or_noise", "receipt_message"}: continue confidence = float(field.get("confidence") or 0) ocr_confidence = field.get("ocr_confidence") value = field.get("value") if not value: continue min_conf = 0.40 if candidate_type in {"merchant_or_header", "transaction_time"}: min_conf = 0.60 elif candidate_type == "money_amounts": min_conf = 0.50 if confidence < min_conf: continue existing_key = type_to_existing_key.get(candidate_type, candidate_type) existing_value = existing_fields.get(existing_key) action = "add" if existing_value: action = "review_update" if str(existing_value).strip() != str(value).strip() else "already_present" suggestions.append( { "suggestion_type": candidate_type, "target_field": existing_key, "action": action, "value": value, "existing_value": existing_value, "confidence": confidence, "ocr_confidence": ocr_confidence, "source": "vision_candidate_fields", "source_region_index": field.get("source_region_index"), "source_bbox": field.get("source_bbox"), "source_crop_path": field.get("source_crop_path"), "raw_text": field.get("raw_text"), } ) return suggestions def build_vision_assisted_layout(source_layout: dict[str, Any] | None, vision_result: dict[str, Any]) -> dict[str, Any]: """ Convert vision analysis into normal layout_json. 
Current phase: - normalizes CV coordinates into layout page coordinates - scores CV region coverage against OCR layout lines - preserves the current source layout for editor stability - stores diagnostics on the layout candidate """ layout = dict(source_layout or {"pages": []}) normalized_vision = normalize_vision_regions_to_layout(vision_result, layout) region_score = score_vision_regions_against_layout(normalized_vision, layout) region_classification = classify_and_crop_unmatched_regions( normalized_vision, layout, region_score, ) candidate_fields = build_vision_candidate_fields(region_classification) field_suggestions = build_vision_field_suggestions(candidate_fields) layout["vision_assisted"] = True layout["vision_assisted_status"] = normalized_vision.get("status", "unknown") layout["vision_engine"] = normalized_vision.get("engine") layout["vision_model_name"] = normalized_vision.get("model_name") layout["vision_coordinate_normalization"] = normalized_vision.get("coordinate_normalization") layout["vision_region_score"] = region_score layout["vision_region_classification"] = region_classification layout["vision_candidate_fields"] = candidate_fields layout["vision_field_suggestions"] = field_suggestions layout["layout_sync_source"] = "vision_assisted" layout["layout_needs_review"] = True return layout