"""Parsers that turn model responses into reasoning steps and ROI evidence.

Two public entry points are provided:

* ``parse_structured_reasoning`` -- JSON list (or free-text fallback) into
  ``ReasoningStep`` objects.
* ``parse_roi_evidence`` -- JSON list (or free-text fallback) into
  ``GroundedEvidence`` objects.
"""

from __future__ import annotations

import json
import re
from typing import Any, Iterable, List

from .types import GroundedEvidence, ReasoningStep

_JSON_FENCE_RE = re.compile(r"```(?:json)?(.*?)```", re.DOTALL | re.IGNORECASE)
_STEP_MARKER_RE = re.compile(r"(?im)(?:^|\n)\s*(?:step\s*(\d+)|(\d+)[\.\)])\s*[:\-]?\s*")
# Longer alternatives come first so that "not required" is not shadowed by "no".
_NEEDS_VISION_RE = re.compile(
    r"needs[\s_]*vision\s*[:\-]?\s*"
    r"(?P<value>not required|unnecessary|required|necessary|true|false|yes|no)",
    re.IGNORECASE,
)
_REASON_RE = re.compile(r"reason\s*[:\-]\s*(?P<value>.+)", re.IGNORECASE)
_BOX_RE = re.compile(
    r"\[\s*-?\d+(?:\.\d+)?\s*,\s*-?\d+(?:\.\d+)?\s*,\s*-?\d+(?:\.\d+)?\s*,\s*-?\d+(?:\.\d+)?\s*\]"
)
# The "needs vision" marker is matched with optional spaces/underscores
# everywhere, so "needs_vision" is a field boundary just like "needs vision".
_STATEMENT_RE = re.compile(
    r"statement\s*[:\-]\s*(.+?)(?=\s*(?:needs[\s_]*vision|reason\s*[:\-]|$))",
    re.IGNORECASE | re.DOTALL,
)
_METADATA_SPLIT_RE = re.compile(r"needs[\s_]*vision|reason\s*[:\-]", re.IGNORECASE)
_NEEDS_VISION_SPLIT_RE = re.compile(r"needs[\s_]*vision", re.IGNORECASE)
_BECAUSE_RE = re.compile(r"because\s+(.+?)(?:\.|$)", re.IGNORECASE)

_ORDINAL_WORD_MAP = {
    "first": 1,
    "second": 2,
    "third": 3,
    "fourth": 4,
    "fifth": 5,
    "sixth": 6,
    "seventh": 7,
    "eighth": 8,
    "ninth": 9,
    "tenth": 10,
}
_NUMBER_WORD_MAP = {
    "one": 1,
    "two": 2,
    "three": 3,
    "four": 4,
    "five": 5,
    "six": 6,
    "seven": 7,
    "eight": 8,
    "nine": 9,
    "ten": 10,
}
_ORDINAL_STEP_RE = re.compile(
    r"(?im)\b(?P<word>first|second|third|fourth|fifth|sixth|seventh|eighth|ninth|tenth)\s+step\b"
)
_WORD_STEP_RE = re.compile(
    r"(?im)\bstep\s+(?P<word>one|two|three|four|five|six|seven|eight|nine|ten)\b"
)
_META_TOKENS = {"maybe", "wait", "let's", "lets", "question", "protocol"}
_CONFIDENCE_KEYS = ("confidence", "score", "probability")


def _to_bool(value: Any) -> bool:
    """Coerce a loosely-typed value to ``bool``.

    Booleans pass through, numbers are true when non-zero, and strings are
    matched case-insensitively against a fixed set of true/false tokens.
    ``None``, unrecognised strings and any other type yield ``False``.
    """
    if isinstance(value, bool):
        return value
    if value is None:
        return False
    if isinstance(value, (int, float)):
        return value != 0
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in {"true", "t", "yes", "y", "1"}:
            return True
        if lowered in {"false", "f", "no", "n", "0"}:
            return False
    return False


def _extract_json_strings(text: str) -> Iterable[str]:
    """Return candidate JSON payloads from the response text.

    Bodies of fenced code blocks are yielded first (in order of appearance),
    followed by the whole stripped text when it is non-empty.
    """
    for body in _JSON_FENCE_RE.findall(text):
        yield body.strip()
    stripped = text.strip()
    if stripped:
        yield stripped


def _load_first_json(text: str) -> Any:
    """Decode and return the first candidate payload that is valid JSON.

    Raises:
        ValueError: if no candidate decodes (chained from the last decode
            error), or if the text yields no candidates at all.
    """
    last_error = None
    for candidate in _extract_json_strings(text):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as err:
            last_error = err
    if last_error:
        raise ValueError(f"Unable to parse JSON from response: {last_error}") from last_error
    raise ValueError("Empty response, cannot parse JSON.")


def _trim_reasoning_text(text: str) -> str:
    """Drop everything before the last occurrence of a known anchor phrase.

    Anchors are tried in a fixed priority order; the first anchor that occurs
    anywhere in the text wins. The text is returned unchanged if none occurs.
    """
    lowered = text.lower()
    for anchor in ("let's draft", "draft:", "structured steps", "final reasoning"):
        pos = lowered.rfind(anchor)
        if pos != -1:
            return text[pos:]
    return text


def _clean_sentence(text: str) -> str:
    """Collapse every run of whitespace into a single space and trim the ends."""
    return " ".join(text.strip().split())


def _normalize_step_markers(text: str) -> str:
    """Convert worded step markers into numeric form.

    ``'First step'`` and ``'step one'`` both become ``'Step 1'``.
    """

    def make_replacer(mapping: dict[str, int]):
        def replace(match: re.Match[str]) -> str:
            num = mapping.get(match.group("word").lower())
            return f"Step {num}" if num is not None else match.group(0)

        return replace

    normalized = _ORDINAL_STEP_RE.sub(make_replacer(_ORDINAL_WORD_MAP), text)
    return _WORD_STEP_RE.sub(make_replacer(_NUMBER_WORD_MAP), normalized)


def _extract_statement(body: str) -> str | None:
    """Pull the statement text out of a step body.

    Prefers an explicit ``statement:`` field; otherwise takes everything in
    front of the first ``needs vision`` / ``reason:`` marker. Returns ``None``
    when the result is shorter than five characters.
    """
    statement_match = _STATEMENT_RE.search(body)
    if statement_match:
        candidate = statement_match.group(1)
    else:
        # Fallback: take the text that precedes any metadata field.
        candidate = _METADATA_SPLIT_RE.split(body)[0]

    candidate = candidate.strip().rstrip(".,;:")
    if not candidate or len(candidate) < 5:
        return None
    return _clean_sentence(candidate)


def _extract_needs_vision(body: str) -> bool:
    """Read the ``needs vision`` flag from a step body.

    Defaults to ``True`` when the body carries no such flag.
    """
    match = _NEEDS_VISION_RE.search(body)
    if not match:
        return True
    token = match.group("value").strip().lower()
    if token in {"not required", "unnecessary"}:
        return False
    if token in {"required", "necessary"}:
        return True
    return _to_bool(token)


def _extract_reason(body: str) -> str | None:
    """Extract the justification from a step body, if any.

    Looks for a ``reason:`` field first (cut at a following ``needs vision``
    marker), then falls back to the clause following the word ``because``.
    """
    match = _REASON_RE.search(body)
    if match:
        reason = match.group("value").strip()
        reason = _NEEDS_VISION_SPLIT_RE.split(reason)[0].strip()
        reason = reason.rstrip(".")
        return reason or None
    because_match = _BECAUSE_RE.search(body)
    if because_match:
        reason = because_match.group(1).strip().rstrip(".")
        return reason or None
    return None


def _parse_step_block(index_guess: int, body: str) -> ReasoningStep | None:
    """Build a ``ReasoningStep`` from one step body, or ``None`` without a statement.

    A non-positive ``index_guess`` is replaced by ``1``.
    """
    statement = _extract_statement(body)
    if not statement:
        return None
    needs_vision = _extract_needs_vision(body)
    reason = _extract_reason(body)
    index = index_guess if index_guess > 0 else 1
    return ReasoningStep(index=index, statement=statement, needs_vision=needs_vision, reason=reason)


def _parse_reasoning_from_text(response_text: str, max_steps: int) -> List[ReasoningStep]:
    """Parse free-text reasoning by slicing it at ``Step N`` / ``N.`` markers.

    When the same index appears more than once, the last parsed step for that
    index wins while the position of its first appearance is kept. At most
    ``max_steps`` steps are returned; an empty list means no marker was found.
    """
    text = _trim_reasoning_text(response_text)
    text = _normalize_step_markers(text)

    matches = list(_STEP_MARKER_RE.finditer(text))
    if not matches:
        return []

    steps_map: dict[int, ReasoningStep] = {}
    ordering: List[int] = []
    fallback_index = 1

    for idx, marker in enumerate(matches):
        # A step body runs from the end of its marker to the next marker.
        start = marker.end()
        end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text)
        body = text[start:end].strip()
        if not body:
            continue

        raw_index = marker.group(1) or marker.group(2)
        try:
            index_guess = int(raw_index) if raw_index else fallback_index
        except (TypeError, ValueError):
            index_guess = fallback_index
        if raw_index is None:
            fallback_index += 1

        step = _parse_step_block(index_guess, body)
        if step is None:
            continue

        if step.index not in steps_map:
            ordering.append(step.index)
        steps_map[step.index] = step

        if len(ordering) >= max_steps:
            break

    return [steps_map[idx] for idx in ordering[:max_steps]]


def _looks_like_meta_statement(statement: str) -> bool:
    """Heuristically flag statements that talk about the steps themselves."""
    lowered = statement.lower()
    if any(token in lowered for token in _META_TOKENS) and "step" in lowered:
        return True
    if lowered.startswith(("maybe", "wait", "let's", "lets")):
        return True
    if len(statement) > 260 and "step" in lowered:
        return True
    return False


def _prune_steps(steps: List[ReasoningStep]) -> List[ReasoningStep]:
    """Drop meta statements and case-insensitive duplicate statements.

    If pruning would remove every step, the original list is returned instead.
    """
    filtered: List[ReasoningStep] = []
    seen_statements: set[str] = set()
    for step in steps:
        normalized = step.statement.strip().lower()
        if _looks_like_meta_statement(step.statement):
            continue
        if normalized in seen_statements:
            continue
        seen_statements.add(normalized)
        filtered.append(step)
    return filtered or steps


def _extract_description(text: str, start_index: int) -> str | None:
    """Return the text between the previous newline/period and ``start_index``.

    Returns ``None`` when nothing but whitespace and punctuation is left.
    """
    boundary = max(text.rfind("\n", 0, start_index), text.rfind(".", 0, start_index))
    if boundary == -1:
        boundary = 0
    snippet = text[boundary:start_index].strip(" \n.:–-")
    if not snippet:
        return None
    return _clean_sentence(snippet)


def _parse_roi_from_text(response_text: str, default_step_index: int) -> List[GroundedEvidence]:
    """Collect ``[a, b, c, d]`` boxes from free text as evidence entries.

    Boxes that normalise to the same coordinates (rounded to four decimals)
    are reported once. Each entry is described by the text preceding its box.
    """
    evidences: List[GroundedEvidence] = []
    seen: set[tuple[float, float, float, float]] = set()

    for match in _BOX_RE.finditer(response_text):
        coords_str = match.group(0).strip("[]")
        try:
            coords = [float(part.strip()) for part in coords_str.split(",")]
        except ValueError:
            continue
        if len(coords) != 4:
            continue
        try:
            bbox = _normalize_bbox(coords)
        except ValueError:
            continue
        key = tuple(round(c, 4) for c in bbox)
        if key in seen:
            continue
        description = _extract_description(response_text, match.start())
        evidences.append(
            GroundedEvidence(
                step_index=default_step_index,
                bbox=bbox,
                description=description,
                confidence=None,
                raw_source={"bbox": coords, "description": description},
            )
        )
        seen.add(key)

    return evidences


def parse_structured_reasoning(response_text: str, max_steps: int) -> List[ReasoningStep]:
    """Parse Qwen3-VL structured reasoning output into dataclasses.

    The response is first decoded as JSON (fenced or bare). If that fails,
    the free-text parser is tried before the JSON error is re-raised.

    Args:
        response_text: Raw model response.
        max_steps: Upper bound on the number of steps returned.

    Returns:
        The pruned reasoning steps, at most ``max_steps`` of them.

    Raises:
        ValueError: if neither JSON nor free-text parsing yields steps, if the
            JSON payload is not a list, or if the list contains no usable step.
    """
    try:
        payload = _load_first_json(response_text)
    except ValueError:
        steps = _parse_reasoning_from_text(response_text, max_steps=max_steps)
        if steps:
            return _prune_steps(steps)[:max_steps]
        raise

    if not isinstance(payload, list):
        raise ValueError("Structured reasoning response must be a JSON list.")

    steps: List[ReasoningStep] = []
    for idx, item in enumerate(payload, start=1):
        if not isinstance(item, dict):
            continue
        statement = item.get("statement") or item.get("step") or item.get("text")
        if not isinstance(statement, str):
            continue
        statement = statement.strip()
        if not statement:
            continue
        step_index = item.get("index")
        if not isinstance(step_index, int):
            # Fall back to the 1-based position in the payload.
            step_index = idx
        needs_vision = _to_bool(item.get("needs_vision") or item.get("requires_vision"))
        reason = item.get("reason") or item.get("justification")
        if isinstance(reason, str):
            reason = reason.strip() or None
        else:
            reason = None
        steps.append(
            ReasoningStep(index=step_index, statement=statement, needs_vision=needs_vision, reason=reason)
        )
        if len(steps) >= max_steps:
            break

    steps = _prune_steps(steps)[:max_steps]
    if not steps:
        raise ValueError("No reasoning steps parsed from response.")
    return steps


def _normalize_bbox(bbox: Any) -> tuple[float, float, float, float]:
    """Normalise a 4-element box to ``(x_min, y_min, x_max, y_max)`` in ``[0, 1]``.

    Numeric strings are converted, blank strings and unsupported element types
    count as ``0.0``. When the largest magnitude exceeds ``1.5`` the values are
    divided by 1000 before clamping; otherwise they are clamped as they are.

    Raises:
        ValueError: if ``bbox`` is not a list/tuple of length 4, or if a
            string element is not a valid number.
    """
    if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
        raise ValueError(f"Bounding box must be a list of 4 numbers, got {bbox!r}")
    coords = []
    for raw in bbox:
        if isinstance(raw, str):
            raw = raw.strip()
            raw = float(raw) if raw else 0.0
        elif isinstance(raw, (int, float)):
            raw = float(raw)
        else:
            raw = 0.0
        coords.append(raw)

    scale = max(abs(v) for v in coords) if coords else 1.0
    if scale > 1.5:
        # assume 0..1000 or pixel coordinates
        coords = [max(0.0, min(v / 1000.0, 1.0)) for v in coords]
    else:
        coords = [max(0.0, min(v, 1.0)) for v in coords]

    x1, y1, x2, y2 = coords
    x_min, x_max = sorted((x1, x2))
    y_min, y_max = sorted((y1, y2))
    return (x_min, y_min, x_max, y_max)


def _extract_confidence(item: dict) -> float | None:
    """Return the first usable confidence value found in ``item``.

    The keys ``confidence``, ``score`` and ``probability`` are checked in that
    order. A numeric ``0`` is a valid confidence; blank or non-numeric strings
    and ``False`` are skipped so that a later key can still provide a value.
    """
    for key in _CONFIDENCE_KEYS:
        raw = item.get(key)
        if isinstance(raw, bool):
            # Keep the historical mapping True -> 1.0; False carries no value.
            if raw:
                return 1.0
            continue
        if isinstance(raw, (int, float)):
            return float(raw)
        if isinstance(raw, str):
            text = raw.strip()
            if not text:
                continue
            try:
                return float(text)
            except ValueError:
                # e.g. "high": not a number, so try the next key.
                continue
    return None


def parse_roi_evidence(response_text: str, default_step_index: int) -> List[GroundedEvidence]:
    """Parse ROI grounding output into evidence structures.

    The response is decoded as JSON (fenced or bare); if that fails, boxes are
    scraped from the free text instead.

    Args:
        response_text: Raw model response.
        default_step_index: Step index used when an item does not carry an
            integer one.

    Returns:
        One ``GroundedEvidence`` per item with a valid bounding box.

    Raises:
        ValueError: if the decoded JSON payload is not a list.
    """
    try:
        payload = _load_first_json(response_text)
    except ValueError:
        return _parse_roi_from_text(response_text, default_step_index=default_step_index)

    if not isinstance(payload, list):
        raise ValueError("ROI extraction response must be a JSON list.")

    evidences: List[GroundedEvidence] = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        raw_bbox = item.get("bbox") or item.get("bbox_2d") or item.get("box")
        if raw_bbox is None:
            continue
        try:
            bbox = _normalize_bbox(raw_bbox)
        except ValueError:
            continue
        step_index = item.get("step") or item.get("step_index") or default_step_index
        if not isinstance(step_index, int):
            step_index = default_step_index
        description = item.get("description") or item.get("caption") or item.get("detail")
        if isinstance(description, str):
            description = description.strip() or None
        else:
            description = None
        evidences.append(
            GroundedEvidence(
                step_index=step_index,
                bbox=bbox,
                description=description,
                confidence=_extract_confidence(item),
                raw_source=item,
            )
        )
    return evidences