puyang2025's picture
update
f8d6dca
"""Generate structured explanations from evidence."""
from __future__ import annotations
from typing import List
from schemas.evidence_schema import EvidenceStore
from schemas.explanation_schema import Explanation
def _recommended_action(verdict: str) -> str:
if verdict == "benign":
return "allow"
if verdict == "suspicious":
return "warn"
return "quarantine"
def _top_signals(
hard_rules: list[str],
breakdown: list[dict[str, float]],
limit: int = 5,
) -> List[str]:
if hard_rules:
return [f"hard_rule:{code}" for code in hard_rules][:limit]
ranked = sorted(breakdown, key=lambda item: item.get("contribution", 0.0), reverse=True)
signals: list[str] = []
for item in ranked:
if item.get("contribution", 0.0) <= 0:
continue
signals.append(f"score_factor:{item.get('factor')}")
if len(signals) >= limit:
break
return signals
def build_explanation(
evidence: EvidenceStore,
verdict: str,
risk_score: int,
breakdown: list[dict[str, float]],
) -> Explanation:
"""Create a structured explanation without chain-of-thought."""
evidence_refs = {}
if evidence.header_auth:
evidence_refs["header_auth"] = evidence.header_auth.model_dump()
if evidence.url_chain:
evidence_refs["url_finals"] = [item.final_url for item in evidence.url_chain.chains]
evidence_refs["url_domains"] = [item.final_domain for item in evidence.url_chain.chains]
if evidence.domain_risk:
evidence_refs["domain_risk"] = [
{"domain": item.domain, "flags": item.risk_flags}
for item in evidence.domain_risk.items
]
if evidence.semantic:
evidence_refs["semantic"] = evidence.semantic.model_dump()
if evidence.attachment_scan:
evidence_refs["attachments"] = [item.model_dump() for item in evidence.attachment_scan.items]
top_signals = _top_signals(evidence.hard_rule_matches, breakdown)
return Explanation(
verdict=verdict,
risk_score=risk_score,
top_signals=top_signals,
recommended_action=_recommended_action(verdict),
evidence=evidence_refs,
score_breakdown=breakdown,
)