import os
import json
from typing import List, Dict, Any

import qa_store

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")
MANUAL_QA_PATH = os.path.join(DATA_DIR, "manual_qa.jsonl")

CACHE_FILENAME = "cached_embeddings.pt"
CACHE_PATH = os.path.join(DATA_DIR, CACHE_FILENAME)

DATASET_REPO_ID = "Heng2004/lao-science-qa-store"
DATASET_FILENAME = "manual_qa.jsonl"


def sync_upload_cache() -> str:
    """Upload cached_embeddings.pt to the Hugging Face dataset repo."""
    if not DATASET_REPO_ID or "YOUR_USERNAME" in DATASET_REPO_ID:
        return "⚠️ Upload Skipped (Repo ID not set)"
    try:
        from huggingface_hub import HfApi
        api = HfApi()
        api.upload_file(
            path_or_fileobj=CACHE_PATH,
            path_in_repo=CACHE_FILENAME,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            commit_message="System: Updated embedding cache",
        )
        return "☁️ Cache Upload Success"
    except Exception as e:
        print(f"[ERROR] Upload cache failed: {e}")
        return f"⚠️ Cache Upload Failed: {e}"


def sync_download_cache() -> None:
    """Download cached_embeddings.pt at startup."""
    if not DATASET_REPO_ID:
        return
    try:
        from huggingface_hub import hf_hub_download
        import shutil

        downloaded_path = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=CACHE_FILENAME,
            repo_type="dataset",
            token=os.environ.get("HF_TOKEN"),
        )
        shutil.copy(downloaded_path, CACHE_PATH)
        print("[INFO] Cache download success!")
    except Exception as e:
        print(f"[WARN] Could not download cache (first run?): {e}")


def sync_upload_manual_qa() -> str:
    """Upload manual_qa.jsonl to the Hugging Face dataset repo."""
    if not DATASET_REPO_ID or "YOUR_USERNAME" in DATASET_REPO_ID:
        return "⚠️ Upload Skipped"
    try:
        from huggingface_hub import HfApi
        api = HfApi()
        api.upload_file(
            path_or_fileobj=MANUAL_QA_PATH,
            path_in_repo=DATASET_FILENAME,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            commit_message="Teacher Panel: Updated Q&A data",
        )
        return "☁️ Cloud Upload Success"
    except Exception as e:
        return f"⚠️ Cloud Upload Failed: {e}"


def sync_download_manual_qa() -> None:
    """Download manual_qa.jsonl from the dataset repo at startup."""
    if not DATASET_REPO_ID:
        return
    try:
        from huggingface_hub import hf_hub_download
        import shutil

        downloaded_path = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=DATASET_FILENAME,
            repo_type="dataset",
            token=os.environ.get("HF_TOKEN"),
        )
        shutil.copy(downloaded_path, MANUAL_QA_PATH)
        print("[INFO] Manual QA download success!")
    except Exception as e:
        print(f"[WARN] Could not download manual_qa.jsonl: {e}")


def load_curriculum() -> None:
    """
    Recursively find and load all textbook JSONL files under data/.

    Matches files named 'textbook.jsonl' or files starting with 'M' and
    ending in '.jsonl'.
    """
    qa_store.ENTRIES.clear()
    qa_store.AUTO_QA_KNOWLEDGE.clear()

    print(f"[INFO] Scanning {DATA_DIR} for textbook content...")

    file_count = 0
    for root, dirs, files in os.walk(DATA_DIR):
        for file in files:
            is_textbook = (
                file == "textbook.jsonl"
                or (file.startswith("M") and file.endswith(".jsonl"))
            )
            if is_textbook:
                full_path = os.path.join(root, file)
                _parse_curriculum_file(full_path)
                file_count += 1

    if qa_store.ENTRIES:
        qa_store.RAW_KNOWLEDGE = "\n\n".join(e["text"] for e in qa_store.ENTRIES)
        print(f"[INFO] Loaded {len(qa_store.ENTRIES)} entries from {file_count} files.")
    else:
        qa_store.RAW_KNOWLEDGE = "ຍັງບໍ່ມີຂໍ້ມູນ."  # "No data yet."
        print("[WARN] No curriculum files found.")


def _parse_curriculum_file(path: str) -> None:
    """Read a single textbook JSONL file and append its records to qa_store.

    Each line is a JSON object with a required 'text' field, an optional
    'qa' list of {'q': ..., 'a': ...} pairs, and an optional 'id'.
    """
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
                if "text" not in obj:
                    continue

                qa_store.ENTRIES.append(obj)

                # Collect the auto-generated Q&A pairs attached to this entry.
                for pair in obj.get("qa", []):
                    q = (pair.get("q") or "").strip()
                    a = (pair.get("a") or "").strip()
                    if q and a:
                        norm_q = qa_store.normalize_question(q)
                        qa_store.AUTO_QA_KNOWLEDGE.append({
                            "norm_q": norm_q,
                            "q": q,
                            "a": a,
                            "source": "auto",
                            "id": obj.get("id", ""),
                        })
            except json.JSONDecodeError:
                continue


def load_glossary() -> None:
    """
    Recursively find and load all glossary JSONL files under data/.

    Matches any '.jsonl' file whose name contains 'glossary'.
    """
    qa_store.GLOSSARY.clear()

    print(f"[INFO] Scanning {DATA_DIR} for glossary files...")

    for root, dirs, files in os.walk(DATA_DIR):
        for file in files:
            is_glossary = "glossary" in file and file.endswith(".jsonl")
            if is_glossary:
                full_path = os.path.join(root, file)
                with open(full_path, "r", encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            obj = json.loads(line)
                            qa_store.GLOSSARY.append(obj)
                        except json.JSONDecodeError:
                            continue

    print(f"[INFO] Loaded {len(qa_store.GLOSSARY)} glossary terms.")


def load_manual_qa() -> None:
    """Load teacher-entered Q&A pairs from manual_qa.jsonl into qa_store."""
    import re

    qa_store.MANUAL_QA_LIST.clear()
    qa_store.MANUAL_QA_INDEX.clear()
    max_num = 0

    if not os.path.exists(MANUAL_QA_PATH):
        print(f"[WARN] Manual QA file not found: {MANUAL_QA_PATH}")
        qa_store.NEXT_MANUAL_ID = 1
        return

    with open(MANUAL_QA_PATH, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
                entry_id = str(obj.get("id") or "")

                # Track the highest numeric suffix so new IDs keep counting up.
                m = re.search(r"(\d+)$", entry_id)
                if m:
                    max_num = max(max_num, int(m.group(1)))

                q = (obj.get("q") or "").strip()
                a = (obj.get("a") or "").strip()
                if q and a:
                    norm_q = qa_store.normalize_question(q)
                    entry = {"id": entry_id, "q": q, "a": a, "norm_q": norm_q}
                    qa_store.MANUAL_QA_LIST.append(entry)
                    qa_store.MANUAL_QA_INDEX[norm_q] = entry
            except json.JSONDecodeError:
                continue

    qa_store.NEXT_MANUAL_ID = max_num + 1 if max_num > 0 else 1


def generate_new_manual_id() -> str:
    """Return the lowest unused 'manual_NNNN' ID."""
    import re

    used_nums = set()
    for e in qa_store.MANUAL_QA_LIST:
        raw_id = str(e.get("id") or "")
        m = re.search(r"(\d+)$", raw_id)
        if m:
            used_nums.add(int(m.group(1)))

    i = 1
    while i in used_nums:
        i += 1
    return f"manual_{i:04d}"


def save_manual_qa_file() -> None:
    """Write qa_store.MANUAL_QA_LIST back to manual_qa.jsonl, one JSON object per line."""
    os.makedirs(os.path.dirname(MANUAL_QA_PATH), exist_ok=True)
    with open(MANUAL_QA_PATH, "w", encoding="utf-8") as f:
        for e in qa_store.MANUAL_QA_LIST:
            obj = {"id": e["id"], "q": e["q"], "a": e["a"]}
            f.write(json.dumps(obj, ensure_ascii=False) + "\n")


def rebuild_combined_qa() -> None:
    """Rebuild QA_INDEX and ALL_QA_KNOWLEDGE from auto + manual Q&A (manual entries overwrite auto ones on the same normalized question)."""
    qa_store.QA_INDEX.clear()
    qa_store.ALL_QA_KNOWLEDGE.clear()
    for item in qa_store.AUTO_QA_KNOWLEDGE:
        norm_q = item["norm_q"]
        qa_store.QA_INDEX[norm_q] = item["a"]
        qa_store.ALL_QA_KNOWLEDGE.append(item)
    for e in qa_store.MANUAL_QA_LIST:
        item = {"norm_q": e["norm_q"], "q": e["q"], "a": e["a"], "source": "manual", "id": e["id"]}
        qa_store.QA_INDEX[item["norm_q"]] = item["a"]
        qa_store.ALL_QA_KNOWLEDGE.append(item)


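# Illustrative sketch only (not part of the original module): one plausible way a
# teacher-panel "add Q&A" handler could compose the helpers above. The function
# name, signature, and return value here are assumptions; the real handler lives
# in the UI code and may differ.
def _example_add_manual_qa(q: str, a: str) -> str:
    q, a = q.strip(), a.strip()
    if not (q and a):
        return "⚠️ Both question and answer are required"
    entry = {
        "id": generate_new_manual_id(),
        "q": q,
        "a": a,
        "norm_q": qa_store.normalize_question(q),
    }
    qa_store.MANUAL_QA_LIST.append(entry)
    qa_store.MANUAL_QA_INDEX[entry["norm_q"]] = entry
    save_manual_qa_file()           # persist locally
    rebuild_combined_qa()           # refresh the combined lookup structures
    return sync_upload_manual_qa()  # push manual_qa.jsonl to the dataset repo

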
def manual_qa_table_data() -> List[List[str]]:
    """Return manual Q&A entries as [id, question, answer] rows for table display."""
    return [[e["id"], e["q"], e["a"]] for e in qa_store.MANUAL_QA_LIST]
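

# Minimal usage sketch (assumption): this module is normally driven by the app's
# startup code, and the exact order used there may differ. Running this file
# directly just exercises the sync/load path against whatever exists under data/.
if __name__ == "__main__":
    sync_download_manual_qa()   # pull the latest teacher Q&A from the dataset repo, if reachable
    sync_download_cache()       # pull the embedding cache, if one exists remotely
    load_curriculum()
    load_glossary()
    load_manual_qa()
    rebuild_combined_qa()
    print(f"[INFO] {len(qa_store.ALL_QA_KNOWLEDGE)} combined Q&A pairs ready.")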