from __future__ import annotations import logging import itertools from dataclasses import dataclass from typing import Callable, Dict, List, Optional, Tuple try: import spaces # type: ignore except ImportError: # pragma: no cover - spaces library only on HF Spaces spaces = None # type: ignore from PIL import Image, ImageDraw from .cli import DEFAULT_MODEL_ID from .pipeline import CoRGIPipeline, PipelineResult from .qwen_client import Qwen3VLClient, QwenGenerationConfig from .types import GroundedEvidence, PromptLog @dataclass class PipelineState: model_id: str pipeline: Optional[CoRGIPipeline] _PIPELINE_CACHE: dict[str, CoRGIPipeline] = {} _GLOBAL_FACTORY: Callable[[Optional[str]], CoRGIPipeline] | None = None logger = logging.getLogger("corgi.gradio_app") MAX_UI_STEPS = 6 GALLERY_MAX_DIM = 768 EVIDENCE_COLORS: Tuple[Tuple[int, int, int], ...] = ( (244, 67, 54), # red (255, 193, 7), # amber (76, 175, 80), # green (33, 150, 243), # blue (156, 39, 176), # purple (255, 87, 34), # deep orange ) try: _THUMBNAIL_RESAMPLE = Image.Resampling.LANCZOS # type: ignore[attr-defined] except AttributeError: # pragma: no cover - Pillow < 9.1 _THUMBNAIL_RESAMPLE = Image.LANCZOS # type: ignore def _default_factory(model_id: Optional[str]) -> CoRGIPipeline: config = QwenGenerationConfig(model_id=model_id or DEFAULT_MODEL_ID) return CoRGIPipeline(vlm_client=Qwen3VLClient(config=config)) def _warm_default_pipeline() -> None: if DEFAULT_MODEL_ID in _PIPELINE_CACHE: return try: logger.info("Preloading default pipeline for model_id=%s", DEFAULT_MODEL_ID) _PIPELINE_CACHE[DEFAULT_MODEL_ID] = _default_factory(DEFAULT_MODEL_ID) except Exception as exc: # pragma: no cover - defensive logger.exception("Failed to preload default model %s: %s", DEFAULT_MODEL_ID, exc) _GLOBAL_FACTORY = _default_factory # type: ignore[assignment] _warm_default_pipeline() def _get_pipeline(model_id: str, factory: Callable[[Optional[str]], CoRGIPipeline]) -> CoRGIPipeline: pipeline = _PIPELINE_CACHE.get(model_id) if pipeline is None: logger.info("Creating new pipeline for model_id=%s", model_id) pipeline = factory(model_id) _PIPELINE_CACHE[model_id] = pipeline else: logger.debug("Reusing cached pipeline for model_id=%s", model_id) return pipeline # @spaces.GPU(duration=120) def _execute_pipeline( image: Image.Image, question: str, max_steps: int, max_regions: int, model_id: str, ) -> PipelineResult: factory = _GLOBAL_FACTORY or _default_factory pipeline = _get_pipeline(model_id, factory) logger.info( "Executing pipeline for model_id=%s | max_steps=%s | max_regions=%s", model_id, max_steps, max_regions, ) return pipeline.run( image=image, question=question, max_steps=max_steps, max_regions=max_regions, ) def _group_evidence_by_step(evidences: List[GroundedEvidence]) -> Dict[int, List[GroundedEvidence]]: grouped: Dict[int, List[GroundedEvidence]] = {} for ev in evidences: grouped.setdefault(ev.step_index, []).append(ev) return grouped def _format_evidence_caption(evidence: GroundedEvidence) -> str: bbox_str = ", ".join(f"{coord:.2f}" for coord in evidence.bbox) parts = [f"Step {evidence.step_index}"] if evidence.description: parts.append(evidence.description) if evidence.confidence is not None: parts.append(f"Confidence: {evidence.confidence:.2f}") parts.append(f"BBox: ({bbox_str})") return "\n".join(parts) def _annotate_evidence_image( image: Image.Image, evidence: GroundedEvidence, color: Tuple[int, int, int], ) -> Image.Image: base = image.copy().convert("RGBA") overlay = Image.new("RGBA", base.size, (0, 0, 0, 0)) draw = ImageDraw.Draw(overlay) width, height = base.size x1 = max(0, min(int(evidence.bbox[0] * width), width - 1)) y1 = max(0, min(int(evidence.bbox[1] * height), height - 1)) x2 = max(0, min(int(evidence.bbox[2] * width), width - 1)) y2 = max(0, min(int(evidence.bbox[3] * height), height - 1)) x1, x2 = sorted((x1, x2)) y1, y2 = sorted((y1, y2)) outline_width = max(2, int(min(width, height) * 0.005)) rgba_color = color + (255,) fill_color = color + (64,) draw.rectangle([x1, y1, x2, y2], fill=fill_color, outline=rgba_color, width=outline_width) annotated = Image.alpha_composite(base, overlay).convert("RGB") if max(annotated.size) > GALLERY_MAX_DIM: annotated.thumbnail((GALLERY_MAX_DIM, GALLERY_MAX_DIM), _THUMBNAIL_RESAMPLE) return annotated def _empty_ui_payload(message: str) -> Dict[str, object]: placeholder_prompt = f"```text\n{message}\n```" return { "answer_markdown": f"### Final Answer\n{message}", "chain_markdown": message, "chain_prompt": placeholder_prompt, "roi_overview": None, "roi_gallery": [], "roi_prompt": placeholder_prompt, "evidence_markdown": message, "evidence_prompt": placeholder_prompt, "answer_process_markdown": message, "answer_prompt": placeholder_prompt, "timing_markdown": message, } def _annotate_overview_image(image: Image.Image, evidences: List[GroundedEvidence]) -> Optional[Image.Image]: if not evidences: return None base = image.copy().convert("RGBA") overlay = Image.new("RGBA", base.size, (0, 0, 0, 0)) draw = ImageDraw.Draw(overlay) width, height = base.size step_colors: Dict[int, Tuple[int, int, int]] = {} color_cycle = itertools.cycle(EVIDENCE_COLORS) for ev in evidences: color = step_colors.setdefault(ev.step_index, next(color_cycle)) x1 = max(0, min(int(ev.bbox[0] * width), width - 1)) y1 = max(0, min(int(ev.bbox[1] * height), height - 1)) x2 = max(0, min(int(ev.bbox[2] * width), width - 1)) y2 = max(0, min(int(ev.bbox[3] * height), height - 1)) x1, x2 = sorted((x1, x2)) y1, y2 = sorted((y1, y2)) outline_width = max(2, int(min(width, height) * 0.005)) rgba_color = color + (255,) fill_color = color + (60,) draw.rectangle([x1, y1, x2, y2], outline=rgba_color, width=outline_width) label = f"S{ev.step_index}" draw.text((x1 + 4, y1 + 4), label, fill=rgba_color) annotated = Image.alpha_composite(base, overlay).convert("RGB") if max(annotated.size) > GALLERY_MAX_DIM: annotated.thumbnail((GALLERY_MAX_DIM, GALLERY_MAX_DIM), _THUMBNAIL_RESAMPLE) return annotated def _format_prompt_markdown(log: Optional[PromptLog], title: str) -> str: if log is None: return f"**{title} Prompt**\n_Prompt unavailable._" lines = [f"**{title} Prompt**", "```text", log.prompt, "```"] if log.response: lines.extend(["**Model Response**", "```text", log.response, "```"]) return "\n".join(lines) def _format_grounding_prompts(logs: List[PromptLog]) -> str: if not logs: return "_No ROI prompts available._" blocks: List[str] = [] for log in logs: heading = f"#### Step {log.step_index}" if log.step_index is not None else "#### ROI Prompt" sections = [heading, "**Prompt**", "```text", log.prompt, "```"] if log.response: sections.extend(["**Model Response**", "```text", log.response, "```"]) blocks.append("\n".join(sections)) return "\n\n".join(blocks) def _prepare_ui_payload( image: Image.Image, result: PipelineResult, max_slots: int = MAX_UI_STEPS, ) -> Dict[str, object]: answer_text = f"### Final Answer\n{result.answer or '(no answer returned)'}" step_lines: List[str] = [] evidences_by_step = _group_evidence_by_step(result.evidence) for step in result.steps[:max_slots]: lines = [ f"**Step {step.index}:** {step.statement}", f"- Needs vision: {'yes' if step.needs_vision else 'no'}", ] if step.reason: lines.append(f"- Reason: {step.reason}") evs = evidences_by_step.get(step.index, []) if evs: lines.append(f"- Visual evidence items: {len(evs)}") else: lines.append("- No visual evidence returned for this step.") step_lines.append("\n".join(lines)) if len(result.steps) > max_slots: step_lines.append(f"_Only the first {max_slots} steps are shown._") chain_markdown = "\n\n".join(step_lines) if step_lines else "_No reasoning steps returned._" roi_overview = _annotate_overview_image(image, result.evidence) aggregated_gallery: List[Tuple[Image.Image, str]] = [] for idx, evidence in enumerate(result.evidence): color = EVIDENCE_COLORS[idx % len(EVIDENCE_COLORS)] annotated = _annotate_evidence_image(image, evidence, color) aggregated_gallery.append((annotated, _format_evidence_caption(evidence))) evidence_blocks: List[str] = [] for idx, evidence in enumerate(result.evidence, start=1): bbox = ", ".join(f"{coord:.2f}" for coord in evidence.bbox) desc = evidence.description or "(no description)" conf = f"Confidence: {evidence.confidence:.2f}" if evidence.confidence is not None else "Confidence: n/a" evidence_blocks.append( f"**Evidence {idx} — Step {evidence.step_index}**\n- {desc}\n- {conf}\n- BBox: ({bbox})" ) evidence_markdown = "\n\n".join(evidence_blocks) if evidence_blocks else "_No visual evidence collected._" reasoning_prompt_md = _format_prompt_markdown(result.reasoning_log, "Reasoning") roi_prompt_md = _format_grounding_prompts(result.grounding_logs) evidence_prompt_md = roi_prompt_md if result.grounding_logs else "_No ROI prompts available._" answer_prompt_md = _format_prompt_markdown(result.answer_log, "Answer Synthesis") answer_process_lines = [ f"**Question:** {result.question}", f"**Final Answer:** {result.answer or '(no answer returned)'}", f"**Steps considered:** {len(result.steps)}", f"**Visual evidence items:** {len(result.evidence)}", ] answer_process_markdown = "\n".join(answer_process_lines) timing_lines: List[str] = [] if result.timings: total_entry = next((t for t in result.timings if t.name == "total_pipeline"), None) if total_entry: timing_lines.append(f"**Total pipeline:** {total_entry.duration_ms/1000:.2f} s") for timing in result.timings: if timing.name == "total_pipeline": continue label = timing.name.replace("_", " ") if timing.step_index is not None: label += f" (step {timing.step_index})" timing_lines.append(f"- {label}: {timing.duration_ms/1000:.2f} s") timing_markdown = "\n".join(timing_lines) if timing_lines else "_No timing data available._" return { "answer_markdown": answer_text, "chain_markdown": chain_markdown, "chain_prompt": reasoning_prompt_md, "roi_overview": roi_overview, "roi_gallery": aggregated_gallery, "roi_prompt": roi_prompt_md, "evidence_markdown": evidence_markdown, "evidence_prompt": evidence_prompt_md, "answer_process_markdown": answer_process_markdown, "answer_prompt": answer_prompt_md, "timing_markdown": timing_markdown, } if spaces is not None: @spaces.GPU(duration=120) # type: ignore[attr-defined] def _execute_pipeline_gpu( image: Image.Image, question: str, max_steps: int, max_regions: int, model_id: str, ) -> PipelineResult: logger.debug("Running GPU-decorated pipeline.") return _execute_pipeline(image, question, max_steps, max_regions, model_id) else: def _execute_pipeline_gpu( image: Image.Image, question: str, max_steps: int, max_regions: int, model_id: str, ) -> PipelineResult: return _execute_pipeline(image, question, max_steps, max_regions, model_id) def ensure_pipeline_state( previous: Optional[PipelineState], model_id: Optional[str], factory: Callable[[Optional[str]], CoRGIPipeline] | None = None, ) -> PipelineState: target_model = model_id or DEFAULT_MODEL_ID factory = factory or _default_factory if previous is not None and previous.model_id == target_model: return previous pipeline = factory(target_model) return PipelineState(model_id=target_model, pipeline=pipeline) def format_result_markdown(result: PipelineResult) -> str: lines: list[str] = [] lines.append("### Answer") lines.append(result.answer or "(no answer returned)") lines.append("") lines.append("### Reasoning Steps") if result.steps: for step in result.steps: needs = "yes" if step.needs_vision else "no" reason = f" — {step.reason}" if step.reason else "" lines.append(f"- **Step {step.index}**: {step.statement} _(needs vision: {needs})_{reason}") else: lines.append("- No reasoning steps returned.") lines.append("") lines.append("### Visual Evidence") if result.evidence: for ev in result.evidence: bbox = ", ".join(f"{coord:.2f}" for coord in ev.bbox) desc = ev.description or "(no description)" conf = f" — confidence {ev.confidence:.2f}" if ev.confidence is not None else "" lines.append(f"- Step {ev.step_index}: bbox=({bbox}) — {desc}{conf}") else: lines.append("- No visual evidence collected.") return "\n".join(lines) def _run_pipeline( state: Optional[PipelineState], image: Image.Image | None, question: str, max_steps: int, max_regions: int, model_id: Optional[str], ) -> tuple[PipelineState, Dict[str, object]]: target_model = (model_id or DEFAULT_MODEL_ID).strip() or DEFAULT_MODEL_ID cached_pipeline = _PIPELINE_CACHE.get(target_model) base_state = state or PipelineState(model_id=target_model, pipeline=cached_pipeline) if image is None: logger.info("Request skipped: no image provided.") return base_state, _empty_ui_payload("Please provide an image before running the demo.") if not question.strip(): logger.info("Request skipped: question empty.") return base_state, _empty_ui_payload("Please enter a question before running the demo.") logger.info("Received request for model_id=%s", target_model) rgb_image = image.convert("RGB") try: result = _execute_pipeline_gpu( image=rgb_image, question=question.strip(), max_steps=int(max_steps), max_regions=int(max_regions), model_id=target_model, ) except Exception as exc: # pragma: no cover - defensive error handling logger.exception("Pipeline execution failed: %s", exc) return PipelineState(model_id=target_model, pipeline=_PIPELINE_CACHE.get(target_model)), _empty_ui_payload( f"Pipeline error: {exc}" ) new_state = PipelineState(model_id=target_model, pipeline=_PIPELINE_CACHE.get(target_model)) payload = _prepare_ui_payload(rgb_image, result, MAX_UI_STEPS) return new_state, payload def build_demo( pipeline_factory: Callable[[Optional[str]], CoRGIPipeline] | None = None, ) -> "gradio.Blocks": try: import gradio as gr except ImportError as exc: # pragma: no cover - exercised when gradio missing raise RuntimeError("Gradio is required to build the demo. Install gradio>=4.0.") from exc factory = pipeline_factory or _default_factory global _GLOBAL_FACTORY _GLOBAL_FACTORY = factory logger.info("Registering pipeline factory %s", factory) try: logger.info("Preloading pipeline with factory for model_id=%s", DEFAULT_MODEL_ID) _PIPELINE_CACHE[DEFAULT_MODEL_ID] = factory(DEFAULT_MODEL_ID) except Exception as exc: # pragma: no cover - defensive logger.exception("Unable to preload pipeline via factory: %s", exc) with gr.Blocks(title="CoRGI Qwen3-VL Demo") as demo: state = gr.State() # stores PipelineState with gr.Row(): with gr.Column(scale=1, min_width=320): image_input = gr.Image(label="Input image", type="pil") question_input = gr.Textbox(label="Question", placeholder="What is happening in the image?", lines=2) model_id_input = gr.Textbox( label="Model ID", value=DEFAULT_MODEL_ID, placeholder="Leave blank to use default", ) max_steps_slider = gr.Slider( label="Max reasoning steps", minimum=1, maximum=6, step=1, value=3, ) max_regions_slider = gr.Slider( label="Max regions per step", minimum=1, maximum=6, step=1, value=3, ) run_button = gr.Button("Run CoRGI") with gr.Column(scale=1, min_width=320): answer_markdown = gr.Markdown(value="### Final Answer\nUpload an image and ask a question to begin.") with gr.Tabs(): with gr.Tab("Chain of Thought"): chain_markdown = gr.Markdown("_No reasoning steps yet._") chain_prompt = gr.Markdown("```text\nAwaiting prompt...\n```") with gr.Tab("ROI Extraction"): roi_overview_image = gr.Image(label="Annotated image", value=None) roi_gallery = gr.Gallery( label="Evidence gallery", columns=2, height=280, allow_preview=True, ) roi_prompt_markdown = gr.Markdown("```text\nAwaiting ROI prompts...\n```") with gr.Tab("Evidence Descriptions"): evidence_markdown = gr.Markdown("_No visual evidence collected._") evidence_prompt_markdown = gr.Markdown("```text\nAwaiting ROI prompts...\n```") with gr.Tab("Answer Synthesis"): answer_process_markdown = gr.Markdown("_No answer generated yet._") answer_prompt_markdown = gr.Markdown("```text\nAwaiting answer prompt...\n```") with gr.Tab("Performance"): timing_markdown = gr.Markdown("_No timing data available._") def _on_submit(state_data, image, question, model_id, max_steps, max_regions): pipeline_state = state_data if isinstance(state_data, PipelineState) else None new_state, payload = _run_pipeline( pipeline_state, image, question, int(max_steps), int(max_regions), model_id if model_id else None, ) return [ new_state, payload["answer_markdown"], payload["chain_markdown"], payload["chain_prompt"], payload["roi_overview"], payload["roi_gallery"], payload["roi_prompt"], payload["evidence_markdown"], payload["evidence_prompt"], payload["answer_process_markdown"], payload["answer_prompt"], payload["timing_markdown"], ] output_components = [ state, answer_markdown, chain_markdown, chain_prompt, roi_overview_image, roi_gallery, roi_prompt_markdown, evidence_markdown, evidence_prompt_markdown, answer_process_markdown, answer_prompt_markdown, timing_markdown, ] run_button.click( fn=_on_submit, inputs=[state, image_input, question_input, model_id_input, max_steps_slider, max_regions_slider], outputs=output_components, ) return demo def launch_demo( *, pipeline_factory: Callable[[Optional[str]], CoRGIPipeline] | None = None, **launch_kwargs, ) -> None: demo = build_demo(pipeline_factory=pipeline_factory) demo.launch(**launch_kwargs) __all__ = [ "PipelineState", "ensure_pipeline_state", "format_result_markdown", "build_demo", "launch_demo", "DEFAULT_MODEL_ID", ]