from __future__ import annotations

import json
import re
from typing import Any, Iterable, List

from .types import GroundedEvidence, ReasoningStep

_JSON_FENCE_RE = re.compile(r"```(?:json)?(.*?)```", re.DOTALL | re.IGNORECASE)
_STEP_MARKER_RE = re.compile(r"(?im)(?:^|\n)\s*(?:step\s*(\d+)|(\d+)[\.\)])\s*[:\-]?\s*")
_NEEDS_VISION_RE = re.compile(
    r"needs[\s_]*vision\s*[:\-]?\s*(?P<value>true|false|yes|no|required|not required|necessary|unnecessary)",
    re.IGNORECASE,
)
_REASON_RE = re.compile(r"reason\s*[:\-]\s*(?P<value>.+)", re.IGNORECASE)
_BOX_RE = re.compile(
    r"\[\s*-?\d+(?:\.\d+)?\s*,\s*-?\d+(?:\.\d+)?\s*,\s*-?\d+(?:\.\d+)?\s*,\s*-?\d+(?:\.\d+)?\s*\]"
)
_ORDINAL_WORD_MAP = {
    "first": 1,
    "second": 2,
    "third": 3,
    "fourth": 4,
    "fifth": 5,
    "sixth": 6,
    "seventh": 7,
    "eighth": 8,
    "ninth": 9,
    "tenth": 10,
}
_NUMBER_WORD_MAP = {
    "one": 1,
    "two": 2,
    "three": 3,
    "four": 4,
    "five": 5,
    "six": 6,
    "seven": 7,
    "eight": 8,
    "nine": 9,
    "ten": 10,
}
_ORDINAL_STEP_RE = re.compile(
    r"(?im)\b(?P<word>first|second|third|fourth|fifth|sixth|seventh|eighth|ninth|tenth)\s+step\b"
)
_WORD_STEP_RE = re.compile(
    r"(?im)\bstep\s+(?P<word>one|two|three|four|five|six|seven|eight|nine|ten)\b"
)
_META_TOKENS = {"maybe", "wait", "let's", "lets", "question", "protocol"}
def _to_bool(value: Any) -> bool:
    if isinstance(value, bool):
        return value
    if value is None:
        return False
    if isinstance(value, (int, float)):
        return value != 0
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in {"true", "t", "yes", "y", "1"}:
            return True
        if lowered in {"false", "f", "no", "n", "0"}:
            return False
    return False
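# Illustrative behaviour of _to_bool (a sketch; these calls are not part of the module):
#   _to_bool(True)    -> True
#   _to_bool("Yes")   -> True
#   _to_bool("0")     -> False
#   _to_bool(None)    -> False
#   _to_bool("maybe") -> False  (unrecognised strings fall back to False)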
def _extract_json_strings(text: str) -> Iterable[str]:
    """Return candidate JSON payloads from the response text."""
    fenced = _JSON_FENCE_RE.findall(text)
    if fenced:
        for body in fenced:
            yield body.strip()
    stripped = text.strip()
    if stripped:
        yield stripped


def _load_first_json(text: str) -> Any:
    last_error = None
    for candidate in _extract_json_strings(text):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as err:
            last_error = err
            continue
    if last_error:
        raise ValueError(f"Unable to parse JSON from response: {last_error}") from last_error
    raise ValueError("Empty response, cannot parse JSON.")
def _trim_reasoning_text(text: str) -> str:
    lowered = text.lower()
    for anchor in ("let's draft", "draft:", "structured steps", "final reasoning"):
        pos = lowered.rfind(anchor)
        if pos != -1:
            return text[pos:]
    return text


def _clean_sentence(text: str) -> str:
    return " ".join(text.strip().split())


def _normalize_step_markers(text: str) -> str:
    """Convert ordinal step markers into numeric form (e.g., 'First step' -> 'Step 1')."""

    def replace_ordinal(match: re.Match[str]) -> str:
        word = match.group("word").lower()
        num = _ORDINAL_WORD_MAP.get(word)
        return f"Step {num}" if num is not None else match.group(0)

    def replace_word_number(match: re.Match[str]) -> str:
        word = match.group("word").lower()
        num = _NUMBER_WORD_MAP.get(word)
        return f"Step {num}" if num is not None else match.group(0)

    normalized = _ORDINAL_STEP_RE.sub(replace_ordinal, text)
    normalized = _WORD_STEP_RE.sub(replace_word_number, normalized)
    return normalized
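# Hedged example of the marker normalisation above (the sentences are invented,
# not taken from a real model transcript):
#   _normalize_step_markers("First step: look at the chart")
#       -> "Step 1: look at the chart"
#   _normalize_step_markers("In step two we compare the bars")
#       -> "In Step 2 we compare the bars"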
def _extract_statement(body: str) -> str | None:
    statement_match = re.search(
        r"statement\s*[:\-]\s*(.+?)(?=\s*(?:needs\s*vision|reason\s*[:\-]|$))",
        body,
        re.IGNORECASE | re.DOTALL,
    )
    if statement_match:
        candidate = statement_match.group(1)
    else:
        # Fallback: take first sentence or line before metadata
        candidate = re.split(r"(?i)needs\s*vision|reason\s*[:\-]", body)[0]
    # Clean up the candidate
    candidate = candidate.strip().rstrip(".,;:")
    # If still empty or too short, return None
    if not candidate or len(candidate) < 5:
        return None
    return _clean_sentence(candidate)


def _extract_needs_vision(body: str) -> bool:
    match = _NEEDS_VISION_RE.search(body)
    if not match:
        return True
    token = match.group("value").strip().lower()
    if token in {"not required", "unnecessary"}:
        return False
    if token in {"required", "necessary"}:
        return True
    return _to_bool(token)


def _extract_reason(body: str) -> str | None:
    match = _REASON_RE.search(body)
    if match:
        reason = match.group("value").strip()
        reason = re.split(r"(?i)needs\s*vision", reason)[0].strip()
        reason = reason.rstrip(".")
        return reason or None
    because_match = re.search(r"because\s+(.+?)(?:\.|$)", body, re.IGNORECASE)
    if because_match:
        reason = because_match.group(1).strip().rstrip(".")
        return reason or None
    return None


def _parse_step_block(index_guess: int, body: str) -> ReasoningStep | None:
    statement = _extract_statement(body)
    if not statement:
        return None
    needs_vision = _extract_needs_vision(body)
    reason = _extract_reason(body)
    index = index_guess if index_guess > 0 else 1
    return ReasoningStep(index=index, statement=statement, needs_vision=needs_vision, reason=reason)
def _parse_reasoning_from_text(response_text: str, max_steps: int) -> List[ReasoningStep]:
    text = _trim_reasoning_text(response_text)
    text = _normalize_step_markers(text)
    matches = list(_STEP_MARKER_RE.finditer(text))
    if not matches:
        return []
    steps_map: dict[int, ReasoningStep] = {}
    ordering: List[int] = []
    fallback_index = 1
    for idx, marker in enumerate(matches):
        start = marker.end()
        end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text)
        body = text[start:end].strip()
        if not body:
            continue
        raw_index = marker.group(1) or marker.group(2)
        try:
            index_guess = int(raw_index) if raw_index else fallback_index
        except (TypeError, ValueError):
            index_guess = fallback_index
        if raw_index is None:
            fallback_index += 1
        step = _parse_step_block(index_guess, body)
        if step is None:
            continue
        if step.index not in steps_map:
            ordering.append(step.index)
            steps_map[step.index] = step
        if len(ordering) >= max_steps:
            break
    return [steps_map[idx] for idx in ordering[:max_steps]]


def _looks_like_meta_statement(statement: str) -> bool:
    lowered = statement.lower()
    if any(token in lowered for token in _META_TOKENS) and "step" in lowered:
        return True
    if lowered.startswith(("maybe", "wait", "let's", "lets")):
        return True
    if len(statement) > 260 and "step" in lowered:
        return True
    return False


def _prune_steps(steps: List[ReasoningStep]) -> List[ReasoningStep]:
    filtered: List[ReasoningStep] = []
    seen_statements: set[str] = set()
    for step in steps:
        normalized = step.statement.strip().lower()
        if _looks_like_meta_statement(step.statement):
            continue
        if normalized in seen_statements:
            continue
        seen_statements.add(normalized)
        filtered.append(step)
    return filtered or steps
def _extract_description(text: str, start_index: int) -> str | None:
    boundary = max(text.rfind("\n", 0, start_index), text.rfind(".", 0, start_index))
    if boundary == -1:
        boundary = 0
    snippet = text[boundary:start_index].strip(" \n.:–-")
    if not snippet:
        return None
    return _clean_sentence(snippet)


def _parse_roi_from_text(response_text: str, default_step_index: int) -> List[GroundedEvidence]:
    evidences: List[GroundedEvidence] = []
    seen: set[tuple[float, float, float, float]] = set()
    for match in _BOX_RE.finditer(response_text):
        coords_str = match.group(0).strip("[]")
        try:
            coords = [float(part.strip()) for part in coords_str.split(",")]
        except ValueError:
            continue
        if len(coords) != 4:
            continue
        try:
            bbox = _normalize_bbox(coords)
        except ValueError:
            continue
        key = tuple(round(c, 4) for c in bbox)
        if key in seen:
            continue
        description = _extract_description(response_text, match.start())
        evidences.append(
            GroundedEvidence(
                step_index=default_step_index,
                bbox=bbox,
                description=description,
                confidence=None,
                raw_source={"bbox": coords, "description": description},
            )
        )
        seen.add(key)
    return evidences
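# Rough sketch of the free-text fallback above (coordinates are made up): given
#   "The legend sits at [120, 40, 480, 110]. The peak is near [500, 600, 900, 980]."
# _parse_roi_from_text(text, default_step_index=1) yields two GroundedEvidence
# entries whose boxes are normalised by _normalize_bbox to the 0..1 range,
# e.g. (0.12, 0.04, 0.48, 0.11) for the first box, each tagged with step_index 1.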
def parse_structured_reasoning(response_text: str, max_steps: int) -> List[ReasoningStep]:
    """Parse Qwen3-VL structured reasoning output into dataclasses."""
    try:
        payload = _load_first_json(response_text)
    except ValueError as json_error:
        steps = _parse_reasoning_from_text(response_text, max_steps=max_steps)
        if steps:
            return _prune_steps(steps)[:max_steps]
        raise json_error
    if not isinstance(payload, list):
        raise ValueError("Structured reasoning response must be a JSON list.")
    steps: List[ReasoningStep] = []
    for idx, item in enumerate(payload, start=1):
        if not isinstance(item, dict):
            continue
        statement = item.get("statement") or item.get("step") or item.get("text")
        if not isinstance(statement, str):
            continue
        statement = statement.strip()
        if not statement:
            continue
        step_index = item.get("index")
        if not isinstance(step_index, int):
            step_index = idx
        needs_vision = _to_bool(item.get("needs_vision") or item.get("requires_vision"))
        reason = item.get("reason") or item.get("justification")
        if isinstance(reason, str):
            reason = reason.strip() or None
        else:
            reason = None
        steps.append(ReasoningStep(index=step_index, statement=statement, needs_vision=needs_vision, reason=reason))
        if len(steps) >= max_steps:
            break
    steps = _prune_steps(steps)[:max_steps]
    if not steps:
        raise ValueError("No reasoning steps parsed from response.")
    return steps
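# Hedged usage sketch (the JSON payload below is invented for illustration):
#   raw = '[{"index": 1, "statement": "Locate the axis labels", "needs_vision": true,
#            "reason": "Labels are only visible in the image"}]'
#   parse_structured_reasoning(raw, max_steps=5)
#       -> [ReasoningStep(index=1, statement="Locate the axis labels",
#                         needs_vision=True, reason="Labels are only visible in the image")]
# Plain-text step lists ("Step 1: ...") fall back to _parse_reasoning_from_text.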
def _normalize_bbox(bbox: Any) -> tuple[float, float, float, float]:
    if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
        raise ValueError(f"Bounding box must be a list of 4 numbers, got {bbox!r}")
    coords = []
    for raw in bbox:
        if isinstance(raw, str):
            raw = raw.strip()
            if not raw:
                raw = 0
            else:
                raw = float(raw)
        elif isinstance(raw, (int, float)):
            raw = float(raw)
        else:
            raw = 0.0
        coords.append(raw)
    scale = max(abs(v) for v in coords) if coords else 1.0
    if scale > 1.5:  # assume 0..1000 or pixel coordinates
        coords = [max(0.0, min(v / 1000.0, 1.0)) for v in coords]
    else:
        coords = [max(0.0, min(v, 1.0)) for v in coords]
    x1, y1, x2, y2 = coords
    x_min, x_max = sorted((x1, x2))
    y_min, y_max = sorted((y1, y2))
    return (x_min, y_min, x_max, y_max)
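# Examples of the coordinate heuristic above (inputs are hypothetical):
#   _normalize_bbox([250, 100, 750, 600])   -> (0.25, 0.1, 0.75, 0.6)   # treated as a 0..1000 grid
#   _normalize_bbox([0.2, 0.9, 0.6, 0.3])   -> (0.2, 0.3, 0.6, 0.9)     # already 0..1, corners re-ordered
#   _normalize_bbox(["10", "", "40", "80"]) -> (0.01, 0.0, 0.04, 0.08)  # strings coerced, blanks -> 0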
def parse_roi_evidence(response_text: str, default_step_index: int) -> List[GroundedEvidence]:
    """Parse ROI grounding output into evidence structures."""
    try:
        payload = _load_first_json(response_text)
    except ValueError:
        return _parse_roi_from_text(response_text, default_step_index=default_step_index)
    if not isinstance(payload, list):
        raise ValueError("ROI extraction response must be a JSON list.")
    evidences: List[GroundedEvidence] = []
    for item in payload:
        if not isinstance(item, dict):
            continue
        raw_bbox = item.get("bbox") or item.get("bbox_2d") or item.get("box")
        if raw_bbox is None:
            continue
        try:
            bbox = _normalize_bbox(raw_bbox)
        except ValueError:
            continue
        step_index = item.get("step") or item.get("step_index") or default_step_index
        if not isinstance(step_index, int):
            step_index = default_step_index
        description = item.get("description") or item.get("caption") or item.get("detail")
        if isinstance(description, str):
            description = description.strip() or None
        else:
            description = None
        confidence = item.get("confidence") or item.get("score") or item.get("probability")
        if isinstance(confidence, str):
            confidence = confidence.strip()
            confidence = float(confidence) if confidence else None
        elif isinstance(confidence, (int, float)):
            confidence = float(confidence)
        else:
            confidence = None
        evidences.append(
            GroundedEvidence(
                step_index=step_index,
                bbox=bbox,
                description=description,
                confidence=confidence,
                raw_source=item,
            )
        )
    return evidences
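# Hedged usage sketch (field values invented): a grounding reply such as
#   '[{"bbox_2d": [100, 200, 300, 400], "description": "red warning icon",
#      "score": 0.87, "step": 2}]'
# passed through parse_roi_evidence(reply, default_step_index=1) becomes a single
# GroundedEvidence with bbox (0.1, 0.2, 0.3, 0.4), description "red warning icon",
# confidence 0.87 and step_index 2; non-JSON replies fall back to
# _parse_roi_from_text above.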