diff --git a/app/logic/vision_analysis.py b/app/logic/vision_analysis.py index 267c681..f3fa915 100644 --- a/app/logic/vision_analysis.py +++ b/app/logic/vision_analysis.py @@ -189,19 +189,261 @@ def analyze_document_image(image_path: str | Path, *, model_name: str = "placeho } + +def _bbox_iou(a: list[float] | tuple[float, ...], b: list[float] | tuple[float, ...]) -> float: + ax1, ay1, ax2, ay2 = [float(v) for v in a[:4]] + bx1, by1, bx2, by2 = [float(v) for v in b[:4]] + + ix1 = max(ax1, bx1) + iy1 = max(ay1, by1) + ix2 = min(ax2, bx2) + iy2 = min(ay2, by2) + + iw = max(0.0, ix2 - ix1) + ih = max(0.0, iy2 - iy1) + inter = iw * ih + + area_a = max(0.0, ax2 - ax1) * max(0.0, ay2 - ay1) + area_b = max(0.0, bx2 - bx1) * max(0.0, by2 - by1) + denom = area_a + area_b - inter + + return inter / denom if denom else 0.0 + + +def _region_contains_ratio(region: list[float] | tuple[float, ...], item: list[float] | tuple[float, ...]) -> float: + rx1, ry1, rx2, ry2 = [float(v) for v in region[:4]] + ix1, iy1, ix2, iy2 = [float(v) for v in item[:4]] + + x1 = max(rx1, ix1) + y1 = max(ry1, iy1) + x2 = min(rx2, ix2) + y2 = min(ry2, iy2) + + inter = max(0.0, x2 - x1) * max(0.0, y2 - y1) + item_area = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1) + + return inter / item_area if item_area else 0.0 + + + +def _scale_bbox( + bbox: list[float] | tuple[float, ...], + *, + scale_x: float, + scale_y: float, +) -> list[float]: + x1, y1, x2, y2 = [float(v) for v in bbox[:4]] + return [ + x1 * scale_x, + y1 * scale_y, + x2 * scale_x, + y2 * scale_y, + ] + + +def normalize_vision_regions_to_layout( + vision_result: dict[str, Any], + layout_json: dict[str, Any] | None, +) -> dict[str, Any]: + """ + Convert OpenCV rendered-image pixel coordinates into layout_json page coordinates. + """ + pages = (layout_json or {}).get("pages") or [] + rendered_pages = vision_result.get("rendered_pages") or [] + layers = vision_result.setdefault("layers", {}) + regions = layers.get("vision_regions") or [] + + if not pages or not rendered_pages or not regions: + vision_result["coordinate_space"] = "unknown_or_unscaled" + return vision_result + + page = pages[0] + rendered = rendered_pages[0] + + layout_w = float(page.get("page_width") or page.get("width") or 0) + layout_h = float(page.get("page_height") or page.get("height") or 0) + rendered_w = float(rendered.get("width") or 0) + rendered_h = float(rendered.get("height") or 0) + + if not layout_w or not layout_h or not rendered_w or not rendered_h: + vision_result["coordinate_space"] = "rendered_pixels_unscaled" + return vision_result + + scale_x = layout_w / rendered_w + scale_y = layout_h / rendered_h + + normalized = [] + for region in regions: + bbox = region.get("bbox") + if not bbox: + continue + + item = dict(region) + item["rendered_bbox"] = bbox + item["bbox"] = _scale_bbox(bbox, scale_x=scale_x, scale_y=scale_y) + item["coordinate_space"] = "layout_page" + item["scale_x"] = scale_x + item["scale_y"] = scale_y + normalized.append(item) + + layers["vision_regions"] = normalized + vision_result["coordinate_space"] = "layout_page" + vision_result["coordinate_normalization"] = { + "source": "rendered_pixels", + "target": "layout_page", + "layout_width": layout_w, + "layout_height": layout_h, + "rendered_width": rendered_w, + "rendered_height": rendered_h, + "scale_x": scale_x, + "scale_y": scale_y, + } + return vision_result + +def score_vision_regions_against_layout( + vision_result: dict[str, Any], + layout_json: dict[str, Any] | None, +) -> dict[str, Any]: + """ + Compare OpenCV regions against existing OCR layout lines. + + Purpose: + - measure whether CV regions line up with OCR line boxes + - identify OCR lines not covered by CV regions + - identify CV regions with no OCR coverage + """ + pages = (layout_json or {}).get("pages") or [] + regions = ((vision_result.get("layers") or {}).get("vision_regions")) or [] + + if not pages or not regions: + return { + "schema_version": "vision_region_scoring_v1", + "status": "not_enough_data", + "page_scores": [], + "summary": { + "pages": len(pages), + "regions": len(regions), + "lines": 0, + "matched_lines": 0, + "unmatched_lines": 0, + "unmatched_regions": len(regions), + }, + } + + page_scores: list[dict[str, Any]] = [] + total_lines = 0 + total_matched_lines = 0 + total_unmatched_regions = 0 + + for page in pages: + page_number = int(page.get("page") or 1) + page_lines = page.get("lines") or [] + page_regions = [r for r in regions if int(r.get("page") or 1) == page_number] + + matched_region_indexes: set[int] = set() + line_scores: list[dict[str, Any]] = [] + + for line in page_lines: + bbox = line.get("bbox") + if not bbox: + continue + + best = { + "region_index": None, + "iou": 0.0, + "contains_ratio": 0.0, + "region_bbox": None, + } + + for idx, region in enumerate(page_regions): + region_bbox = region.get("bbox") + if not region_bbox: + continue + + iou = _bbox_iou(region_bbox, bbox) + contains = _region_contains_ratio(region_bbox, bbox) + score = max(iou, contains) + + if score > max(best["iou"], best["contains_ratio"]): + best = { + "region_index": idx, + "iou": round(iou, 4), + "contains_ratio": round(contains, 4), + "region_bbox": region_bbox, + } + + matched = (best["contains_ratio"] >= 0.55) or (best["iou"] >= 0.10) + if matched and best["region_index"] is not None: + matched_region_indexes.add(int(best["region_index"])) + + line_scores.append( + { + "line_text": str(line.get("text") or "")[:120], + "line_bbox": bbox, + "matched": matched, + **best, + } + ) + + total_lines += len(line_scores) + matched_lines = sum(1 for item in line_scores if item["matched"]) + total_matched_lines += matched_lines + + unmatched_region_indexes = [ + idx for idx in range(len(page_regions)) if idx not in matched_region_indexes + ] + total_unmatched_regions += len(unmatched_region_indexes) + + page_scores.append( + { + "page": page_number, + "line_count": len(line_scores), + "region_count": len(page_regions), + "matched_line_count": matched_lines, + "unmatched_line_count": len(line_scores) - matched_lines, + "unmatched_region_count": len(unmatched_region_indexes), + "line_scores": line_scores[:200], + "unmatched_regions": [ + page_regions[idx] for idx in unmatched_region_indexes[:100] + ], + } + ) + + return { + "schema_version": "vision_region_scoring_v1", + "status": "scored", + "page_scores": page_scores, + "summary": { + "pages": len(pages), + "regions": len(regions), + "lines": total_lines, + "matched_lines": total_matched_lines, + "unmatched_lines": total_lines - total_matched_lines, + "unmatched_regions": total_unmatched_regions, + }, + } + def build_vision_assisted_layout(source_layout: dict[str, Any] | None, vision_result: dict[str, Any]) -> dict[str, Any]: """ Convert vision analysis into normal layout_json. Current phase: - - preserves the current source layout - - tags it as vision-assisted + - normalizes CV coordinates into layout page coordinates + - scores CV region coverage against OCR layout lines + - preserves the current source layout for editor stability + - stores diagnostics on the layout candidate """ layout = dict(source_layout or {"pages": []}) + + normalized_vision = normalize_vision_regions_to_layout(vision_result, layout) + region_score = score_vision_regions_against_layout(normalized_vision, layout) + layout["vision_assisted"] = True - layout["vision_assisted_status"] = vision_result.get("status", "unknown") - layout["vision_engine"] = vision_result.get("engine") - layout["vision_model_name"] = vision_result.get("model_name") + layout["vision_assisted_status"] = normalized_vision.get("status", "unknown") + layout["vision_engine"] = normalized_vision.get("engine") + layout["vision_model_name"] = normalized_vision.get("model_name") + layout["vision_coordinate_normalization"] = normalized_vision.get("coordinate_normalization") + layout["vision_region_score"] = region_score layout["layout_sync_source"] = "vision_assisted" layout["layout_needs_review"] = True return layout