import json
import math
import time
import hashlib
from pathlib import Path
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple
import gradio as gr
import requests
from huggingface_hub import HfApi, hf_hub_download
# -----------------------------------------------------------------------------
# CROVIA — CEP TERMINAL v2: EVIDENCE MACHINE
# World's first AI forensic evidence console
# Temporal proof + cryptographic anchoring + regulatory mapping + citation
# -----------------------------------------------------------------------------
# HF dataset that stores the static CRC-1 reference capsules.
CEP_DATASET_ID = "Crovia/cep-capsules"
# Base URL of the Crovia Temporal Proof Registry (all JSON endpoints hang off it).
REGISTRY_URL = "https://registry.croviatrust.com"
# NOTE(review): flag is not read anywhere in this chunk — confirm usage elsewhere in the file.
OPEN_EVIDENCE_MODE = True
# --- Caches ---
# Per-endpoint TTL cache. "ts" is the Unix time of the last successful fetch,
# "data" the parsed JSON payload (None / [] until first fetch succeeds).
_CACHE = {
    "tpa": {"ts": 0.0, "data": None},
    "lineage": {"ts": 0.0, "data": None},
    "outreach": {"ts": 0.0, "data": None},
    "capsules": {"ts": 0.0, "data": []},
}
_TTL = 300  # 5 min
def _now():
return time.time()
def _nowz():
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def _sha256_hex(b: bytes) -> str:
return hashlib.sha256(b).hexdigest()
def _canonical_json(obj):
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
def _canonical_hash(obj):
    """SHA-256 hex digest of obj's canonical JSON representation."""
    encoded = _canonical_json(obj).encode("utf-8")
    return _sha256_hex(encoded)
# --- Registry Data Fetchers ---
def _fetch_cached(key, url):
    """Fetch JSON from `url`, caching the result under `key` for `_TTL` seconds.

    Returns the cached payload while fresh. On any failure (network error,
    non-2xx status, bad JSON) it serves the last good payload — or {} when
    none exists — and leaves the cache timestamp untouched so the next call
    retries immediately.
    """
    now = _now()
    entry = _CACHE[key]
    if now - entry["ts"] < _TTL and entry["data"] is not None:
        return entry["data"]
    try:
        resp = requests.get(url, timeout=8)
        # Reject non-2xx responses so an error page is never cached as data.
        resp.raise_for_status()
        data = resp.json()
        entry["ts"] = now
        entry["data"] = data
        return data
    except Exception:
        # Best-effort: keep the UI alive on stale data rather than crash.
        return entry["data"] or {}
def fetch_tpa():
    """Latest Temporal Proof Registry snapshot (TTL-cached)."""
    url = f"{REGISTRY_URL}/registry/data/tpa_latest.json"
    return _fetch_cached("tpa", url)
def fetch_lineage():
    """Model lineage graph from the registry (TTL-cached)."""
    url = f"{REGISTRY_URL}/registry/data/lineage_graph.json"
    return _fetch_cached("lineage", url)
def fetch_outreach():
    """Organization outreach status data from the registry (TTL-cached)."""
    url = f"{REGISTRY_URL}/registry/data/outreach_status.json"
    return _fetch_cached("outreach", url)
def _list_capsules() -> List[str]:
    """List capsule IDs (file stems) from the HF dataset, TTL-cached.

    Only files named like ``CEP-*.json`` count; index files are skipped.
    The result is de-duplicated, sorted, and capped at 350 entries.

    On a fetch failure the previously cached list is returned unchanged and
    the cache timestamp is NOT refreshed, so the next call retries instead
    of serving an empty list for a whole TTL window.
    """
    now = _now()
    cache = _CACHE["capsules"]
    if (now - cache["ts"]) < _TTL and cache["data"]:
        return cache["data"]
    try:
        files = HfApi().list_repo_files(repo_id=CEP_DATASET_ID, repo_type="dataset")
        items = sorted({
            Path(f).stem
            for f in files
            if f.endswith(".json") and f.startswith("CEP-") and not f.lower().endswith("index.json")
        })[:350]
    except Exception:
        # Bug fix: the old code cached [] with a fresh timestamp here, hiding
        # previously known capsules for 5 minutes after a transient failure.
        return cache["data"] or []
    cache["ts"] = now
    cache["data"] = items
    return items
# --- Evidence Computation ---
def get_model_tpa(model_id: str) -> dict:
    """Return the TPA entry whose model_id matches (case-insensitive), else {}."""
    wanted = model_id.lower()
    for entry in fetch_tpa().get("tpas", []):
        if entry.get("model_id", "").lower() == wanted:
            return entry
    return {}
def get_model_lineage(model_id: str) -> dict:
    """Return the lineage-graph node whose id matches (case-insensitive), else {}."""
    wanted = model_id.lower()
    nodes = fetch_lineage().get("nodes", [])
    hits = (n for n in nodes if n.get("id", "").lower() == wanted)
    return next(hits, {})
def get_model_outreach(model_id: str) -> dict:
    """Return the outreach record for the model's organization, else {}."""
    org = model_id.split("/")[0] if "/" in model_id else ""
    if not org:
        return {}
    wanted = org.lower()
    outreach = fetch_outreach()
    # The endpoint has shipped several shapes: a bare list, or a dict with
    # "entries" / "organizations" — accept any of them.
    if isinstance(outreach, list):
        entries = outreach
    else:
        entries = outreach.get("entries", outreach.get("organizations", []))
    if isinstance(entries, list):
        for record in entries:
            name = record.get("org", record.get("organization", ""))
            if name.lower() == wanted:
                return record
    return {}
def compute_evidence_strength(tpa_entry: dict) -> dict:
    """Summarize NEC# observations into counts and a coverage score.

    The score is the percentage of observations flagged present, rounded to
    one decimal. "critical_gaps" counts absent observations whose severity
    label is CRITICAL. All-zero summary for an entry with no observations.
    """
    observations = tpa_entry.get("observations", [])
    if not observations:
        return {"score": 0, "total": 0, "present": 0, "absent": 0, "critical_gaps": 0}
    total = len(observations)
    present = len([o for o in observations if o.get("is_present")])
    critical = len([
        o for o in observations
        if not o.get("is_present") and o.get("severity_label") == "CRITICAL"
    ])
    return {
        "score": round((present / total) * 100, 1),
        "total": total,
        "present": present,
        "absent": total - present,
        "critical_gaps": critical,
    }
def compute_peer_context(model_id: str) -> dict:
    """Average NEC# coverage across the whole registry and the model's org."""
    org = (model_id.split("/")[0] if "/" in model_id else "").lower()
    all_scores = []
    org_scores = []
    for entry in fetch_tpa().get("tpas", []):
        obs = entry.get("observations", [])
        if not obs:
            continue
        coverage = sum(1 for o in obs if o.get("is_present")) / len(obs) * 100
        all_scores.append(coverage)
        peer_id = entry.get("model_id", "")
        if "/" in peer_id and peer_id.split("/")[0].lower() == org:
            org_scores.append(coverage)

    def _avg(scores):
        # 0 (not 0.0) when the bucket is empty, matching the header display.
        return round(sum(scores) / len(scores), 1) if scores else 0

    return {
        "industry_avg": _avg(all_scores),
        "org_avg": _avg(org_scores),
        "total_models": len(all_scores),
        "org_models": len(org_scores),
    }
# --- Main Evidence Function ---
def generate_evidence(model_id: str) -> str:
    """Generate the complete forensic evidence package for a model.

    Accepts dropdown-style input ("model_id | label"): only the part before
    the first "|" is used. Returns "" for empty input; otherwise a JSON
    string bundling TPA observations, lineage, outreach, peer context,
    trust level, and a citation-ready statement.
    """
    model_id = (model_id or "").split("|")[0].strip()
    if not model_id:
        return ""
    tpa_entry = get_model_tpa(model_id)
    lineage = get_model_lineage(model_id)
    outreach = get_model_outreach(model_id)
    strength = compute_evidence_strength(tpa_entry)
    peer = compute_peer_context(model_id)
    tpa_data = fetch_tpa()
    chain_height = tpa_data.get("chain_height", 0)
    # Build observations detail
    obs_detail = []
    jurisdictions = set()
    for o in tpa_entry.get("observations", []):
        obs_detail.append({
            "nec_id": o.get("necessity_id", ""),
            "name": o.get("necessity_name", ""),
            "present": o.get("is_present", False),
            "severity": o.get("severity_label", ""),
            "jurisdictions": o.get("jurisdictions_affected", 0),
            "jurisdiction_hints": o.get("jurisdictions_hint", []),
            "commitment_x": o.get("commitment_x", ""),
            "commitment_y": o.get("commitment_y", ""),
        })
        # Union of jurisdiction hints across all observations.
        for j in o.get("jurisdictions_hint", []):
            jurisdictions.add(j)
    # Trust level: >=80% coverage GREEN, >=40% YELLOW, else RED.
    if strength["score"] >= 80:
        trust = "GREEN"
    elif strength["score"] >= 40:
        trust = "YELLOW"
    else:
        trust = "RED"
    org = model_id.split("/")[0] if "/" in model_id else "unknown"
    # Citation text (human-readable statement for legal/journalistic use).
    citation = (
        f"As of {_nowz()}, model {model_id} published by {org} "
        f"has been monitored by the Crovia Temporal Proof Registry. "
        f"{strength['absent']}/{strength['total']} NEC# documentation requirements "
        f"remain absent ({strength['critical_gaps']} critical). "
        f"Cryptographic anchor: chain height {chain_height}, "
        f"TPA-ID {tpa_entry.get('tpa_id', 'N/A')}. "
        f"Source: registry.croviatrust.com"
    )
    payload = {
        "model_id": model_id,
        "org": org,
        "timestamp": _nowz(),
        "found": bool(tpa_entry),
        # UNKNOWN when the registry has no TPA entry for this model at all.
        "trust_level": trust if tpa_entry else "UNKNOWN",
        "tpa_id": tpa_entry.get("tpa_id", ""),
        "chain_height": chain_height,
        "strength": strength,
        "peer": peer,
        "observations": obs_detail,
        "jurisdictions": sorted(jurisdictions),
        "lineage": {
            "compliance_score": lineage.get("compliance_score"),
            "severity": lineage.get("severity"),
            "nec_absent": lineage.get("nec_absent"),
            "card_length": lineage.get("card_length"),
        } if lineage else None,
        "outreach": {
            # Registry payloads have used both key spellings; accept either.
            "status": outreach.get("status", outreach.get("outreach_status", "unknown")),
            "contacted": outreach.get("contacted", outreach.get("discussion_sent", False)),
            "response": outreach.get("response", outreach.get("response_received", False)),
        } if outreach else None,
        "citation": citation,
    }
    return json.dumps(payload, ensure_ascii=False)
# --- Startup data ---
def get_targets_list() -> str:
    """JSON list of monitored models with their NEC# gap counts, worst-first."""
    targets = []
    for entry in fetch_tpa().get("tpas", []):
        mid = entry.get("model_id", "")
        if not mid:
            continue
        gaps = sum(1 for o in entry.get("observations", []) if not o.get("is_present"))
        targets.append({"id": mid, "gaps": gaps})
    # Stable descending sort: most gaps first, ties keep registry order.
    targets.sort(key=lambda t: t["gaps"], reverse=True)
    return json.dumps(targets, ensure_ascii=False)
def get_registry_stats() -> str:
    """Registry-wide header stats as JSON: model/org counts, chain height, gaps.

    Entries missing a model_id are skipped entirely so the empty string is
    never counted as a "model" (the old code added "" to the models set).
    """
    tpa = fetch_tpa()
    tpas = tpa.get("tpas", [])
    lg = fetch_lineage()
    models = set()
    orgs = set()
    total_gaps = 0
    for t in tpas:
        mid = t.get("model_id", "")
        if not mid:
            continue  # malformed entry — don't count it or its observations
        models.add(mid)
        if "/" in mid:
            orgs.add(mid.split("/")[0])
        for o in t.get("observations", []):
            if not o.get("is_present"):
                total_gaps += 1
    return json.dumps({
        "models": len(models),
        "orgs": len(orgs),
        "chain_height": tpa.get("chain_height", 0),
        "total_gaps": total_gaps,
        "lineage_nodes": len(lg.get("nodes", [])),
    }, ensure_ascii=False)
# --- Live Capsule Generator ---
def generate_live_capsule(model_id: str) -> str:
    """Generate a live evidence capsule from TPA registry data.

    Returns a JSON string: "" for empty input, {"error": ...} when the model
    has no TPA entry or any step fails, otherwise a
    crovia_evidence_capsule.v2 capsule with observations, commitments,
    chain anchors, content hash, and trust level.
    """
    try:
        model_id = (model_id or "").split("|")[0].strip()
        if not model_id:
            return ""
        tpa_entry = get_model_tpa(model_id)
        if not tpa_entry:
            return json.dumps({"error": f"No TPA data found for {model_id}"})
        lineage = get_model_lineage(model_id)
        outreach = get_model_outreach(model_id)
        strength = compute_evidence_strength(tpa_entry)
        tpa_data = fetch_tpa()
        chain_height = tpa_data.get("chain_height", 0)
        org = model_id.split("/")[0] if "/" in model_id else "unknown"
        ts = _nowz()
        # Build observations with full commitment data
        observations = []
        for o in tpa_entry.get("observations", []):
            observations.append({
                "necessity_id": o.get("necessity_id", ""),
                "necessity_name": o.get("necessity_name", ""),
                "is_present": o.get("is_present", False),
                "severity_label": o.get("severity_label", ""),
                "severity_weight": o.get("severity_weight", 0),
                "jurisdictions_affected": o.get("jurisdictions_affected", 0),
                # Commitment point coordinates as provided by the registry.
                "commitment": {
                    "x": o.get("commitment_x", ""),
                    "y": o.get("commitment_y", ""),
                },
            })
        # Compute capsule content hash (canonical JSON -> SHA-256); also
        # seeds the capsule_id below.
        capsule_content = {
            "model_id": model_id,
            "tpa_id": tpa_entry.get("tpa_id", ""),
            "observations": observations,
            "chain_height": chain_height,
            "generated": ts,
        }
        content_hash = _canonical_hash(capsule_content)
        # Build CRC-1-like capsule
        capsule = {
            "type": "live_capsule",
            "schema": "crovia_evidence_capsule.v2",
            "capsule_id": f"CEP-LIVE-{content_hash[:8].upper()}",
            "generated": ts,
            "mode": "OPEN_EVIDENCE",
            "model": {
                "model_id": model_id,
                "organization": org,
                "source": "huggingface.co",
            },
            "evidence": {
                "tpa_id": tpa_entry.get("tpa_id", ""),
                "chain_height": chain_height,
                "anchor_hash": tpa_entry.get("anchor_hash", ""),
                "epoch": tpa_entry.get("epoch", ""),
                "observation_count": len(observations),
                "present_count": strength["present"],
                "absent_count": strength["absent"],
                "critical_gaps": strength["critical_gaps"],
                "evidence_strength": strength["score"],
                "observations": observations,
            },
            "cryptographic_anchors": {
                "type": "pedersen_secp256k1",
                # Count only observations that actually carry a commitment.
                "commitment_count": sum(1 for o in observations if o["commitment"]["x"]),
                "chain_type": "sha256_temporal",
                "chain_height": chain_height,
                "anchor_hash": tpa_entry.get("anchor_hash", ""),
                "merkle_root": tpa_entry.get("merkle_root", ""),
            },
            "lineage": {
                "compliance_score": lineage.get("compliance_score"),
                "severity": lineage.get("severity"),
                "card_length": lineage.get("card_length"),
            } if lineage else None,
            "outreach": {
                # Registry payloads have used both key spellings; accept either.
                "status": outreach.get("status", outreach.get("outreach_status", "unknown")),
                "contacted": outreach.get("contacted", outreach.get("discussion_sent", False)),
            } if outreach else None,
            "content_hash": content_hash,
            # Same thresholds as generate_evidence: >=80 GREEN, >=40 YELLOW, else RED.
            "trust_level": "GREEN" if strength["score"] >= 80 else "YELLOW" if strength["score"] >= 40 else "RED",
        }
        return json.dumps(capsule, ensure_ascii=False)
    except Exception as e:
        # Surface the failure to the UI instead of crashing the handler.
        return json.dumps({"error": f"capsule_gen: {str(e)}"})
# --- Capsule Inspector (backward compat) ---
def fetch_capsule(cep_id: str) -> Dict[str, Any]:
    """Download a reference capsule JSON from the HF dataset and parse it."""
    local_path = hf_hub_download(
        repo_id=CEP_DATASET_ID,
        filename=f"{cep_id}.json",
        repo_type="dataset",
    )
    return json.loads(Path(local_path).read_text(encoding="utf-8"))
def inspect_capsule(cep_id: str) -> str:
    """Summarize a reference capsule as JSON: schema, model, hashes, signature."""
    cep_id = (cep_id or "").strip()
    if not cep_id:
        return json.dumps({"error": "empty"})
    try:
        cap = fetch_capsule(cep_id)
        model = cap.get("model", {})
        model_id = model.get("model_id", "unknown") if isinstance(model, dict) else "unknown"
        # Tolerate malformed capsules where these fields are not dicts.
        evidence = cap.get("evidence", {})
        if not isinstance(evidence, dict):
            evidence = {}
        meta = cap.get("meta", {})
        if not isinstance(meta, dict):
            meta = {}
        hashchain_root = meta.get("hashchain_sha256", "")
        summary = {
            "type": "capsule",
            "cep_id": cep_id,
            "schema": cap.get("schema", "unknown"),
            "model_id": model_id,
            "evidence_nodes": len(evidence),
            "signature": "signature" in cap,
            "hashchain": bool(hashchain_root),
            "hashchain_short": hashchain_root[:16] if hashchain_root else "",
            "capsule_sha256": _sha256_hex(_canonical_json(cap).encode("utf-8")),
            "evidence_keys": list(evidence.keys())[:20],
        }
        return json.dumps(summary, ensure_ascii=False)
    except Exception as e:
        return json.dumps({"error": f"{type(e).__name__}: {e}"})
# =============================================================================
# CSS
# =============================================================================
# Stylesheet injected into the Gradio UI — currently an empty placeholder.
CSS = """
"""
# =============================================================================
# HTML
# =============================================================================
UI_HTML = """
C
Crovia · Evidence Machine
AI Forensic Evidence Console
LIVE · — models · chain:—
⌘
INVESTIGATING:—
—
Trust Level
—
Evidence Strength
—
Chain Height
—
NEC# Gaps
—
Jurisdictions
0%
Evidence Coverage
NEC# Constellation
—
NEC# Element Grid
20 elements
Forensic Report
LIVE
Select a model to generate forensic evidence report.
Jurisdictions & Outreach
Applicable Jurisdictions
—
Outreach Status
Select a model to see outreach status
Peer Context
—
Citation Engine
LEGAL-GRADE
Select a model to generate a citation-ready evidence statement.
Evidence Package ready — Download the complete forensic evidence as JSON for legal, journalistic, or audit use.
LIVE EVIDENCE CAPSULES— models
Each monitored model has a live evidence capsule generated from the Temporal Proof Registry.
Contains NEC# observations, Pedersen commitments, chain anchors, and trust assessment.
Select a model below to inspect its capsule.
SELECT MODEL
Live Capsule Inspector
LIVE
Select a model above to generate and inspect its live evidence capsule.
Each capsule contains:
- schema identifier (crovia_evidence_capsule.v2)
- model metadata (ID, organization, source)
- NEC# observations (20 documentation requirements)
- Pedersen commitments (cryptographic anchors per observation)
- temporal chain data (height, anchor hash, merkle root)
- trust level assessment (GREEN/YELLOW/RED)
- content hash (SHA-256 of capsule content)
Capsules are generated in real-time from the Crovia Temporal Proof Registry.
\u25b8Reference Capsules (CRC-1 Format)
— capsules
Static reference capsules from Crovia/cep-capsules.
These are offline-verifiable CRC-1 evidence envelopes with schema, fingerprint, receipts, hashchain, and signatures.
📦
Reference Capsule Inspector
CRC-1
Select a reference capsule to inspect.
CROVIA EVIDENCE MACHINE · Observation, not judgment. All data derived from publicly observable artifacts.
No model audit. No legal claim. Presence/absence only. Cryptographic commitments anchor observations immutably.
Source: registry.croviatrust.com
· Crovia/cep-capsules