# dung-vpt-uney
# Deploy CoRGI demo - 2025-10-29 14:17:23
# 58fe08c
from __future__ import annotations
import json
import re
from typing import Any, Iterable, List
from .types import GroundedEvidence, ReasoningStep
# Fenced code blocks (``` or ```json) — used to pull JSON payloads out of
# markdown-style model responses.
_JSON_FENCE_RE = re.compile(r"```(?:json)?(.*?)```", re.DOTALL | re.IGNORECASE)
# Step markers at a line start: "step 3", "3." or "3)" with optional ":"/"-".
# Group 1 captures the "step N" number, group 2 the "N." / "N)" number.
_STEP_MARKER_RE = re.compile(r"(?im)(?:^|\n)\s*(?:step\s*(\d+)|(\d+)[\.\)])\s*[:\-]?\s*")
# Captures the value of a "needs vision" / "needs_vision" flag in free text.
_NEEDS_VISION_RE = re.compile(
    r"needs[\s_]*vision\s*[:\-]?\s*(?P<value>true|false|yes|no|required|not required|necessary|unnecessary)",
    re.IGNORECASE,
)
# Captures the text following a "reason:" / "reason-" label.
_REASON_RE = re.compile(r"reason\s*[:\-]\s*(?P<value>.+)", re.IGNORECASE)
# A bracketed list of four numbers (possibly negative / fractional),
# e.g. "[0.1, 0.2, 0.9, 0.8]" — used to find bounding boxes in plain text.
_BOX_RE = re.compile(
    r"\[\s*-?\d+(?:\.\d+)?\s*,\s*-?\d+(?:\.\d+)?\s*,\s*-?\d+(?:\.\d+)?\s*,\s*-?\d+(?:\.\d+)?\s*\]"
)
# Ordinal words mapped to step numbers ("first step" -> 1).
_ORDINAL_WORD_MAP = {
    "first": 1,
    "second": 2,
    "third": 3,
    "fourth": 4,
    "fifth": 5,
    "sixth": 6,
    "seventh": 7,
    "eighth": 8,
    "ninth": 9,
    "tenth": 10,
}
# Cardinal number words mapped to integers ("step one" -> 1).
_NUMBER_WORD_MAP = {
    "one": 1,
    "two": 2,
    "three": 3,
    "four": 4,
    "five": 5,
    "six": 6,
    "seven": 7,
    "eight": 8,
    "nine": 9,
    "ten": 10,
}
# Phrases like "first step" — normalized into "Step 1" form.
_ORDINAL_STEP_RE = re.compile(
    r"(?im)\b(?P<word>first|second|third|fourth|fifth|sixth|seventh|eighth|ninth|tenth)\s+step\b"
)
# Phrases like "step one" — normalized into "Step 1" form.
_WORD_STEP_RE = re.compile(
    r"(?im)\bstep\s+(?P<word>one|two|three|four|five|six|seven|eight|nine|ten)\b"
)
# Tokens suggesting a "step" line is meta-commentary rather than reasoning.
_META_TOKENS = {"maybe", "wait", "let's", "lets", "question", "protocol"}
def _to_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
if value is None:
return False
if isinstance(value, (int, float)):
return value != 0
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in {"true", "t", "yes", "y", "1"}:
return True
if lowered in {"false", "f", "no", "n", "0"}:
return False
return False
def _extract_json_strings(text: str) -> Iterable[str]:
    """Yield candidate JSON payloads: fenced code bodies first, then the raw text."""
    for fenced_body in _JSON_FENCE_RE.findall(text):
        yield fenced_body.strip()
    whole = text.strip()
    if whole:
        yield whole
def _load_first_json(text: str) -> Any:
    """Return the first candidate payload that decodes as JSON.

    Raises:
        ValueError: If every candidate fails to decode, or there are none.
    """
    failure: json.JSONDecodeError | None = None
    for candidate in _extract_json_strings(text):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as err:
            failure = err
    if failure is None:
        raise ValueError("Empty response, cannot parse JSON.")
    raise ValueError(f"Unable to parse JSON from response: {failure}") from failure
def _trim_reasoning_text(text: str) -> str:
lowered = text.lower()
for anchor in ("let's draft", "draft:", "structured steps", "final reasoning"):
pos = lowered.rfind(anchor)
if pos != -1:
return text[pos:]
return text
def _clean_sentence(text: str) -> str:
return " ".join(text.strip().split())
def _normalize_step_markers(text: str) -> str:
    """Rewrite ordinal/word step phrases ('First step', 'step two') as 'Step N'."""

    def _make_replacer(word_to_num: dict[str, int]):
        def _replace(match: re.Match[str]) -> str:
            number = word_to_num.get(match.group("word").lower())
            # Leave the original text untouched for unmapped words.
            return match.group(0) if number is None else f"Step {number}"

        return _replace

    with_ordinals = _ORDINAL_STEP_RE.sub(_make_replacer(_ORDINAL_WORD_MAP), text)
    return _WORD_STEP_RE.sub(_make_replacer(_NUMBER_WORD_MAP), with_ordinals)
def _extract_statement(body: str) -> str | None:
    """Pull the step statement from a step body; None when nothing usable remains."""
    labelled = re.search(
        r"statement\s*[:\-]\s*(.+?)(?=\s*(?:needs\s*vision|reason\s*[:\-]|$))",
        body,
        re.IGNORECASE | re.DOTALL,
    )
    if labelled:
        candidate = labelled.group(1)
    else:
        # No explicit "statement:" label — keep everything before the metadata.
        candidate = re.split(r"(?i)needs\s*vision|reason\s*[:\-]", body)[0]
    candidate = candidate.strip().rstrip(".,;:")
    # Reject empty or too-short fragments (covers the empty case too).
    if len(candidate) < 5:
        return None
    return _clean_sentence(candidate)
def _extract_needs_vision(body: str) -> bool:
    """Read the needs-vision flag from a step body; defaults to True when absent."""
    found = _NEEDS_VISION_RE.search(body)
    if found is None:
        return True
    token = found.group("value").strip().lower()
    explicit = {
        "not required": False,
        "unnecessary": False,
        "required": True,
        "necessary": True,
    }
    if token in explicit:
        return explicit[token]
    return _to_bool(token)
def _extract_reason(body: str) -> str | None:
    """Extract the step's justification, from a 'reason:' label or a 'because' clause."""
    labelled = _REASON_RE.search(body)
    if labelled:
        value = labelled.group("value").strip()
        # Cut off any trailing needs-vision metadata that leaked into the value.
        value = re.split(r"(?i)needs\s*vision", value)[0].strip().rstrip(".")
        return value or None
    causal = re.search(r"because\s+(.+?)(?:\.|$)", body, re.IGNORECASE)
    if causal:
        value = causal.group(1).strip().rstrip(".")
        return value or None
    return None
def _parse_step_block(index_guess: int, body: str) -> ReasoningStep | None:
    """Build a ReasoningStep from one marker-delimited chunk, or None if no statement."""
    statement = _extract_statement(body)
    if statement is None:
        return None
    return ReasoningStep(
        # Non-positive guesses are clamped to 1.
        index=max(index_guess, 1),
        statement=statement,
        needs_vision=_extract_needs_vision(body),
        reason=_extract_reason(body),
    )
def _parse_reasoning_from_text(response_text: str, max_steps: int) -> List[ReasoningStep]:
    """Parse free-form 'Step N: ...' text into at most ``max_steps`` steps.

    The text is trimmed to the final draft section and its ordinal/word step
    markers normalized; the result is split on _STEP_MARKER_RE matches and
    each chunk between markers is parsed by _parse_step_block.
    """
    text = _trim_reasoning_text(response_text)
    text = _normalize_step_markers(text)
    matches = list(_STEP_MARKER_RE.finditer(text))
    if not matches:
        return []
    steps_map: dict[int, ReasoningStep] = {}
    ordering: List[int] = []
    # Used only when a marker carries no parsable number (defensive; the
    # regex normally captures one of its two numeric groups).
    fallback_index = 1
    for idx, marker in enumerate(matches):
        # The step body runs from the end of this marker to the start of the next.
        start = marker.end()
        end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text)
        body = text[start:end].strip()
        if not body:
            continue
        # Group 1 captures "step N"; group 2 captures "N." / "N)".
        raw_index = marker.group(1) or marker.group(2)
        try:
            index_guess = int(raw_index) if raw_index else fallback_index
        except (TypeError, ValueError):
            index_guess = fallback_index
        if raw_index is None:
            fallback_index += 1
        step = _parse_step_block(index_guess, body)
        if step is None:
            continue
        # First occurrence of a step index wins; later duplicates are ignored.
        if step.index not in steps_map:
            ordering.append(step.index)
            steps_map[step.index] = step
        if len(ordering) >= max_steps:
            break
    return [steps_map[idx] for idx in ordering[:max_steps]]
def _looks_like_meta_statement(statement: str) -> bool:
    """Heuristically detect meta-commentary masquerading as a reasoning step."""
    lowered = statement.lower()
    mentions_step = "step" in lowered
    if mentions_step and any(token in lowered for token in _META_TOKENS):
        return True
    if lowered.startswith(("maybe", "wait", "let's", "lets")):
        return True
    # Very long sentences that talk about steps are treated as commentary.
    return mentions_step and len(statement) > 260
def _prune_steps(steps: List[ReasoningStep]) -> List[ReasoningStep]:
    """Drop meta-commentary and duplicate statements, keeping first occurrences.

    Falls back to the original list if pruning would remove everything.
    """
    kept: List[ReasoningStep] = []
    seen: set[str] = set()
    for step in steps:
        if _looks_like_meta_statement(step.statement):
            continue
        key = step.statement.strip().lower()
        if key in seen:
            continue
        seen.add(key)
        kept.append(step)
    return kept if kept else steps
def _extract_description(text: str, start_index: int) -> str | None:
    """Return the line/sentence fragment immediately preceding ``start_index``."""
    # Start after the nearest newline or period before the match (or at 0).
    cut = max(text.rfind("\n", 0, start_index), text.rfind(".", 0, start_index))
    snippet = text[max(cut, 0):start_index].strip(" \n.:–-")
    return _clean_sentence(snippet) if snippet else None
def _parse_roi_from_text(response_text: str, default_step_index: int) -> List[GroundedEvidence]:
    """Scan free-form text for bracketed boxes and build deduplicated evidence."""
    results: List[GroundedEvidence] = []
    dedupe: set[tuple[float, float, float, float]] = set()
    for hit in _BOX_RE.finditer(response_text):
        raw_parts = hit.group(0).strip("[]").split(",")
        try:
            numbers = [float(part.strip()) for part in raw_parts]
        except ValueError:
            continue
        if len(numbers) != 4:
            continue
        try:
            box = _normalize_bbox(numbers)
        except ValueError:
            continue
        # Deduplicate on the rounded normalized box.
        key = tuple(round(coord, 4) for coord in box)
        if key in dedupe:
            continue
        dedupe.add(key)
        caption = _extract_description(response_text, hit.start())
        results.append(
            GroundedEvidence(
                step_index=default_step_index,
                bbox=box,
                description=caption,
                confidence=None,
                raw_source={"bbox": numbers, "description": caption},
            )
        )
    return results
def parse_structured_reasoning(response_text: str, max_steps: int) -> List[ReasoningStep]:
    """Parse Qwen3-VL structured reasoning output into dataclasses.

    Tries JSON first (including fenced ```json blocks); when that fails,
    falls back to free-text "Step N: ..." parsing.

    Args:
        response_text: Raw model response.
        max_steps: Maximum number of steps to return.

    Returns:
        At most ``max_steps`` pruned ReasoningStep objects.

    Raises:
        ValueError: If the payload is not a JSON list, or no steps could be
            parsed by either strategy.
    """
    try:
        payload = _load_first_json(response_text)
    except ValueError as json_error:
        # Not JSON — fall back to parsing prose-style step markers.
        steps = _parse_reasoning_from_text(response_text, max_steps=max_steps)
        if steps:
            return _prune_steps(steps)[:max_steps]
        raise json_error
    if not isinstance(payload, list):
        raise ValueError("Structured reasoning response must be a JSON list.")
    steps: List[ReasoningStep] = []
    for idx, item in enumerate(payload, start=1):
        if not isinstance(item, dict):
            continue
        statement = item.get("statement") or item.get("step") or item.get("text")
        if not isinstance(statement, str):
            continue
        statement = statement.strip()
        if not statement:
            continue
        step_index = item.get("index")
        if not isinstance(step_index, int):
            step_index = idx  # fall back to positional numbering
        # Consult "requires_vision" only when "needs_vision" is absent, so an
        # explicit needs_vision=False is not overridden by the secondary key
        # (the previous `or`-chain treated False as missing).
        vision_flag = item.get("needs_vision")
        if vision_flag is None:
            vision_flag = item.get("requires_vision")
        needs_vision = _to_bool(vision_flag)
        reason = item.get("reason") or item.get("justification")
        if isinstance(reason, str):
            reason = reason.strip() or None
        else:
            reason = None
        steps.append(ReasoningStep(index=step_index, statement=statement, needs_vision=needs_vision, reason=reason))
        if len(steps) >= max_steps:
            break
    steps = _prune_steps(steps)[:max_steps]
    if not steps:
        raise ValueError("No reasoning steps parsed from response.")
    return steps
def _normalize_bbox(bbox: Any) -> tuple[float, float, float, float]:
if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
raise ValueError(f"Bounding box must be a list of 4 numbers, got {bbox!r}")
coords = []
for raw in bbox:
if isinstance(raw, str):
raw = raw.strip()
if not raw:
raw = 0
else:
raw = float(raw)
elif isinstance(raw, (int, float)):
raw = float(raw)
else:
raw = 0.0
coords.append(raw)
scale = max(abs(v) for v in coords) if coords else 1.0
if scale > 1.5: # assume 0..1000 or pixel coordinates
coords = [max(0.0, min(v / 1000.0, 1.0)) for v in coords]
else:
coords = [max(0.0, min(v, 1.0)) for v in coords]
x1, y1, x2, y2 = coords
x_min, x_max = sorted((x1, x2))
y_min, y_max = sorted((y1, y2))
return (x_min, y_min, x_max, y_max)
def parse_roi_evidence(response_text: str, default_step_index: int) -> List[GroundedEvidence]:
    """Parse ROI grounding output into evidence structures.

    Tries JSON first; on JSON failure falls back to scanning the raw text for
    bracketed bounding boxes. Malformed items are skipped, never fatal.

    Args:
        response_text: Raw model response.
        default_step_index: Step index used when an item does not carry one.

    Returns:
        One GroundedEvidence per well-formed item.

    Raises:
        ValueError: If the JSON payload decodes but is not a list.
    """
    try:
        payload = _load_first_json(response_text)
    except ValueError:
        return _parse_roi_from_text(response_text, default_step_index=default_step_index)
    if not isinstance(payload, list):
        raise ValueError("ROI extraction response must be a JSON list.")
    evidences: List[GroundedEvidence] = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        raw_bbox = item.get("bbox") or item.get("bbox_2d") or item.get("box")
        if raw_bbox is None:
            continue
        try:
            bbox = _normalize_bbox(raw_bbox)
        except ValueError:
            continue
        step_index = item.get("step") or item.get("step_index") or default_step_index
        if not isinstance(step_index, int):
            step_index = default_step_index
        description = item.get("description") or item.get("caption") or item.get("detail")
        if isinstance(description, str):
            description = description.strip() or None
        else:
            description = None
        # Take the first confidence key that is PRESENT (not merely truthy),
        # so an explicit confidence of 0 / 0.0 survives instead of being
        # dropped by `or`-chaining.
        confidence_raw = next(
            (item[key] for key in ("confidence", "score", "probability") if item.get(key) is not None),
            None,
        )
        if isinstance(confidence_raw, str):
            token = confidence_raw.strip()
            try:
                confidence = float(token) if token else None
            except ValueError:
                # Non-numeric string ("high", "n/a", ...) — treat as missing
                # instead of aborting the whole parse with an exception.
                confidence = None
        elif isinstance(confidence_raw, (int, float)):
            confidence = float(confidence_raw)
        else:
            confidence = None
        evidences.append(
            GroundedEvidence(
                step_index=step_index,
                bbox=bbox,
                description=description,
                confidence=confidence,
                raw_source=item,
            )
        )
    return evidences